diff --git a/src/cmds/restic/cmd_init.go b/src/cmds/restic/cmd_init.go
index ec790fe90..887d34166 100644
--- a/src/cmds/restic/cmd_init.go
+++ b/src/cmds/restic/cmd_init.go
@@ -27,6 +27,7 @@ func (cmd CmdInit) Execute(args []string) error {
 	}
 
 	s := repository.New(be)
+
 	err = s.Init(cmd.global.password)
 	if err != nil {
 		cmd.global.Exitf(1, "creating key in backend at %s failed: %v\n", cmd.global.Repo, err)
diff --git a/src/restic/archiver_duplication_test.go b/src/restic/archiver_duplication_test.go
index 3edc70cb3..ffcbacee4 100644
--- a/src/restic/archiver_duplication_test.go
+++ b/src/restic/archiver_duplication_test.go
@@ -79,6 +79,7 @@ func testArchiverDuplication(t *testing.T) {
 	}
 
 	repo := repository.New(forgetfulBackend())
+
 	err = repo.Init("foo")
 	if err != nil {
 		t.Fatal(err)
diff --git a/src/restic/pack/pack.go b/src/restic/pack/pack.go
index 7aa8360be..931bda869 100644
--- a/src/restic/pack/pack.go
+++ b/src/restic/pack/pack.go
@@ -83,26 +83,29 @@ type Packer struct {
 
 	bytes uint
 	k     *crypto.Key
-	buf   *bytes.Buffer
+	wr    io.Writer
 
 	m sync.Mutex
 }
 
 // NewPacker returns a new Packer that can be used to pack blobs
-// together.
-func NewPacker(k *crypto.Key, buf []byte) *Packer {
-	return &Packer{k: k, buf: bytes.NewBuffer(buf)}
+// together. If wr is nil, a bytes.Buffer is used.
+func NewPacker(k *crypto.Key, wr io.Writer) *Packer {
+	if wr == nil {
+		wr = bytes.NewBuffer(nil)
+	}
+	return &Packer{k: k, wr: wr}
 }
 
 // Add saves the data read from rd as a new blob to the packer. Returned is the
 // number of bytes written to the pack.
-func (p *Packer) Add(t BlobType, id backend.ID, rd io.Reader) (int64, error) {
+func (p *Packer) Add(t BlobType, id backend.ID, data []byte) (int, error) {
 	p.m.Lock()
 	defer p.m.Unlock()
 
 	c := Blob{Type: t, ID: id}
 
-	n, err := io.Copy(p.buf, rd)
+	n, err := p.wr.Write(data)
 	c.Length = uint(n)
 	c.Offset = p.bytes
 	p.bytes += uint(n)
@@ -121,8 +124,9 @@ type headerEntry struct {
 }
 
 // Finalize writes the header for all added blobs and finalizes the pack.
-// Returned are all bytes written, including the header.
-func (p *Packer) Finalize() ([]byte, error) {
+// Returned are the number of bytes written, including the header. If the
+// underlying writer implements io.Closer, it is closed.
+func (p *Packer) Finalize() (uint, error) {
 	p.m.Lock()
 	defer p.m.Unlock()
 
@@ -131,37 +135,41 @@ func (p *Packer) Finalize() ([]byte, error) {
 	hdrBuf := bytes.NewBuffer(nil)
 	bytesHeader, err := p.writeHeader(hdrBuf)
 	if err != nil {
-		return nil, err
+		return 0, err
 	}
 
 	encryptedHeader, err := crypto.Encrypt(p.k, nil, hdrBuf.Bytes())
 	if err != nil {
-		return nil, err
+		return 0, err
 	}
 
 	// append the header
-	n, err := p.buf.Write(encryptedHeader)
+	n, err := p.wr.Write(encryptedHeader)
 	if err != nil {
-		return nil, err
+		return 0, err
 	}
 
 	hdrBytes := bytesHeader + crypto.Extension
 	if uint(n) != hdrBytes {
-		return nil, errors.New("wrong number of bytes written")
+		return 0, errors.New("wrong number of bytes written")
 	}
 
 	bytesWritten += hdrBytes
 
 	// write length
-	err = binary.Write(p.buf, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
+	err = binary.Write(p.wr, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension))
 	if err != nil {
-		return nil, err
+		return 0, err
 	}
 	bytesWritten += uint(binary.Size(uint32(0)))
 
 	p.bytes = uint(bytesWritten)
 
-	return p.buf.Bytes(), nil
+	if w, ok := p.wr.(io.Closer); ok {
+		return bytesWritten, w.Close()
+	}
+
+	return bytesWritten, nil
 }
 
 // writeHeader constructs and writes the header to wr.
@@ -208,6 +216,11 @@ func (p *Packer) Blobs() []Blob {
 	return p.blobs
 }
 
+// Writer returns the underlying writer.
+func (p *Packer) Writer() io.Writer {
+	return p.wr
+}
+
 func (p *Packer) String() string {
 	return fmt.Sprintf("<Packer %d blobs, %d bytes>", len(p.blobs), p.bytes)
 }
diff --git a/src/restic/pack/pack_test.go b/src/restic/pack/pack_test.go
index 18a7a86f4..e987ced7c 100644
--- a/src/restic/pack/pack_test.go
+++ b/src/restic/pack/pack_test.go
@@ -38,12 +38,13 @@ func newPack(t testing.TB, k *crypto.Key) ([]Buf, []byte, uint) {
 
 	// pack blobs
 	p := pack.NewPacker(k, nil)
 	for _, b := range bufs {
-		p.Add(pack.Tree, b.id, bytes.NewReader(b.data))
+		p.Add(pack.Tree, b.id, b.data)
 	}
 
-	packData, err := p.Finalize()
+	_, err := p.Finalize()
 	OK(t, err)
 
+	packData := p.Writer().(*bytes.Buffer).Bytes()
 	return bufs, packData, p.Size()
 }
diff --git a/src/restic/repository/packer_manager.go b/src/restic/repository/packer_manager.go
index a7716418e..de1f97c74 100644
--- a/src/restic/repository/packer_manager.go
+++ b/src/restic/repository/packer_manager.go
@@ -1,6 +1,10 @@
 package repository
 
 import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
 	"sync"
 
 	"restic/backend"
@@ -9,21 +13,42 @@ import (
 	"restic/pack"
 )
 
+// Saver implements saving data in a backend.
+type Saver interface {
+	Save(h backend.Handle, jp []byte) error
+}
+
 // packerManager keeps a list of open packs and creates new on demand.
 type packerManager struct {
-	be    backend.Backend
+	be    Saver
 	key   *crypto.Key
 	pm    sync.Mutex
 	packs []*pack.Packer
+
+	pool sync.Pool
 }
 
 const minPackSize = 4 * 1024 * 1024
 const maxPackSize = 16 * 1024 * 1024
 const maxPackers = 200
 
+// newPackerManager returns a new packer manager which writes temporary files
+// to a temporary directory.
+func newPackerManager(be Saver, key *crypto.Key) *packerManager {
+	return &packerManager{
+		be:  be,
+		key: key,
+		pool: sync.Pool{
+			New: func() interface{} {
+				return make([]byte, (minPackSize+maxPackSize)/2)
+			},
+		},
+	}
+}
+
 // findPacker returns a packer for a new blob of size bytes. Either a new one is
 // created or one is returned that already has some blobs.
-func (r *packerManager) findPacker(size uint) (*pack.Packer, error) {
+func (r *packerManager) findPacker(size uint) (packer *pack.Packer, err error) {
 	r.pm.Lock()
 	defer r.pm.Unlock()
 
@@ -42,7 +67,12 @@ func (r *packerManager) findPacker(size uint) (*pack.Packer, error) {
 
 	// no suitable packer found, return new
 	debug.Log("Repo.findPacker", "create new pack for %d bytes", size)
-	return pack.NewPacker(r.key, nil), nil
+	tmpfile, err := ioutil.TempFile("", "restic-temp-pack-")
+	if err != nil {
+		return nil, err
+	}
+
+	return pack.NewPacker(r.key, tmpfile), nil
 }
 
 // insertPacker appends p to s.packs.
@@ -57,11 +87,28 @@ func (r *packerManager) insertPacker(p *pack.Packer) {
 
 // savePacker stores p in the backend.
 func (r *Repository) savePacker(p *pack.Packer) error {
 	debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count())
 
-	data, err := p.Finalize()
+	n, err := p.Finalize()
 	if err != nil {
 		return err
 	}
+	tmpfile := p.Writer().(*os.File)
+	f, err := os.Open(tmpfile.Name())
+	if err != nil {
+		return err
+	}
+
+	data := make([]byte, n)
+	m, err := io.ReadFull(f, data)
+
+	if uint(m) != n {
+		return fmt.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m)
+	}
+
+	if err = f.Close(); err != nil {
+		return err
+	}
+
 	id := backend.Hash(data)
 	h := backend.Handle{Type: backend.Data, Name: id.String()}
@@ -73,6 +120,11 @@ func (r *Repository) savePacker(p *pack.Packer) error {
 
 	debug.Log("Repo.savePacker", "saved as %v", h)
 
+	err = os.Remove(tmpfile.Name())
+	if err != nil {
+		return err
+	}
+
 	// update blobs in the index
 	for _, b := range p.Blobs() {
 		debug.Log("Repo.savePacker", "  updating blob %v to pack %v", b.ID.Str(), id.Str())
diff --git a/src/restic/repository/packer_manager_test.go b/src/restic/repository/packer_manager_test.go
new file mode 100644
index 000000000..78d91bc37
--- /dev/null
+++ b/src/restic/repository/packer_manager_test.go
@@ -0,0 +1,174 @@
+package repository
+
+import (
+	"io"
+	"math/rand"
+	"os"
+	"restic/backend"
+	"restic/backend/mem"
+	"restic/crypto"
+	"restic/pack"
+	"testing"
+)
+
+type randReader struct {
+	src  rand.Source
+	rand *rand.Rand
+}
+
+func newRandReader(src rand.Source) *randReader {
+	return &randReader{
+		src:  src,
+		rand: rand.New(src),
+	}
+}
+
+// Read generates len(p) random bytes and writes them into p. It
+// always returns len(p) and a nil error.
+func (r *randReader) Read(p []byte) (n int, err error) {
+	for i := 0; i < len(p); i += 7 {
+		val := r.src.Int63()
+		for j := 0; i+j < len(p) && j < 7; j++ {
+			p[i+j] = byte(val)
+			val >>= 8
+		}
+	}
+	return len(p), nil
+}
+
+func randomID(rd io.Reader) backend.ID {
+	id := backend.ID{}
+	_, err := io.ReadFull(rd, id[:])
+	if err != nil {
+		panic(err)
+	}
+	return id
+}
+
+const maxBlobSize = 1 << 20
+
+func saveFile(t testing.TB, be Saver, filename string, n int) {
+	f, err := os.Open(filename)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	data := make([]byte, n)
+	m, err := io.ReadFull(f, data)
+
+	if m != n {
+		t.Fatalf("read wrong number of bytes from %v: want %v, got %v", filename, n, m)
+	}
+
+	if err = f.Close(); err != nil {
+		t.Fatal(err)
+	}
+
+	h := backend.Handle{Type: backend.Data, Name: backend.Hash(data).String()}
+
+	err = be.Save(h, data)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = os.Remove(filename)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func fillPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager, buf []byte) (bytes int) {
+	for i := 0; i < 100; i++ {
+		l := rnd.rand.Intn(1 << 20)
+		seed := rnd.rand.Int63()
+
+		packer, err := pm.findPacker(uint(l))
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		rd := newRandReader(rand.NewSource(seed))
+		id := randomID(rd)
+		buf = buf[:l]
+		_, err = io.ReadFull(rd, buf)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		n, err := packer.Add(pack.Data, id, buf)
+		if n != l {
+			t.Errorf("Add() returned invalid number of bytes: want %v, got %v", l, n)
+		}
+		bytes += l
+
+		if packer.Size() < minPackSize && pm.countPacker() < maxPackers {
+			pm.insertPacker(packer)
+			continue
+		}
+
+		bytesWritten, err := packer.Finalize()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		tmpfile := packer.Writer().(*os.File)
+		saveFile(t, be, tmpfile.Name(), int(bytesWritten))
+	}
+
+	return bytes
+}
+
+func flushRemainingPacks(t testing.TB, rnd *randReader, be Saver, pm *packerManager) (bytes int) {
+	if pm.countPacker() > 0 {
+		for _, packer := range pm.packs {
+			n, err := packer.Finalize()
+			if err != nil {
+				t.Fatal(err)
+			}
+			bytes += int(n)
+
+			tmpfile := packer.Writer().(*os.File)
+			saveFile(t, be, tmpfile.Name(), bytes)
+		}
+	}
+
+	return bytes
+}
+
+type fakeBackend struct{}
+
+func (f *fakeBackend) Save(h backend.Handle, data []byte) error {
+	return nil
+}
+
+func TestPackerManager(t *testing.T) {
+	rnd := newRandReader(rand.NewSource(23))
+
+	be := mem.New()
+	pm := newPackerManager(be, crypto.NewRandomKey())
+
+	blobBuf := make([]byte, maxBlobSize)
+
+	bytes := fillPacks(t, rnd, be, pm, blobBuf)
+	bytes += flushRemainingPacks(t, rnd, be, pm)
+
+	t.Logf("saved %d bytes", bytes)
+}
+
+func BenchmarkPackerManager(t *testing.B) {
+	rnd := newRandReader(rand.NewSource(23))
+
+	be := &fakeBackend{}
+	pm := newPackerManager(be, crypto.NewRandomKey())
+	blobBuf := make([]byte, maxBlobSize)
+
+	t.ResetTimer()
+
+	bytes := 0
+	for i := 0; i < t.N; i++ {
+		bytes += fillPacks(t, rnd, be, pm, blobBuf)
+	}
+
+	bytes += flushRemainingPacks(t, rnd, be, pm)
+	t.Logf("saved %d bytes", bytes)
+}
diff --git a/src/restic/repository/repository.go b/src/restic/repository/repository.go
index f981f5b76..e7e2e5ada 100644
--- a/src/restic/repository/repository.go
+++ b/src/restic/repository/repository.go
@@ -28,13 +28,13 @@ type Repository struct {
 
 // New returns a new repository with backend be.
 func New(be backend.Backend) *Repository {
-	return &Repository{
-		be:  be,
-		idx: NewMasterIndex(),
-		packerManager: &packerManager{
-			be: be,
-		},
+	repo := &Repository{
+		be:            be,
+		idx:           NewMasterIndex(),
+		packerManager: newPackerManager(be, nil),
 	}
+
+	return repo
 }
 
 // Find loads the list of all blobs of type t and searches for names which start
@@ -195,7 +195,7 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID) (backend.ID, error) {
 	}
 
 	// save ciphertext
-	_, err = packer.Add(t, *id, bytes.NewReader(ciphertext))
+	_, err = packer.Add(t, *id, ciphertext)
 	if err != nil {
 		return backend.ID{}, err
 	}
@@ -299,7 +299,6 @@ func (r *Repository) Flush() error {
 		}
 	}
 	r.packs = r.packs[:0]
-
 	return nil
 }
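
For orientation, here is a minimal usage sketch that is not part of the patch: it shows how the reworked Packer API fits together after this change. Add now takes the blob as a []byte, Finalize reports the number of bytes written instead of returning the assembled pack, and the pack contents are recovered from the writer passed to (or created by) NewPacker. Import paths and identifiers follow the restic source tree as it appears in the diff above; the standalone main package is illustrative only.

package main

import (
	"bytes"
	"fmt"

	"restic/backend"
	"restic/crypto"
	"restic/pack"
)

func main() {
	k := crypto.NewRandomKey()

	// Passing nil makes the Packer fall back to an in-memory bytes.Buffer.
	p := pack.NewPacker(k, nil)

	data := []byte("example blob")
	id := backend.Hash(data)

	// Add now takes the blob as a []byte instead of an io.Reader.
	if _, err := p.Add(pack.Data, id, data); err != nil {
		panic(err)
	}

	// Finalize now returns the number of bytes written, not the raw pack;
	// the assembled pack is read back through the underlying writer.
	n, err := p.Finalize()
	if err != nil {
		panic(err)
	}

	packData := p.Writer().(*bytes.Buffer).Bytes()
	fmt.Printf("wrote %d bytes, pack id %v\n", n, backend.Hash(packData))
}

In the repository itself the same API is driven with a temporary file instead of a buffer: findPacker creates the pack via ioutil.TempFile, and savePacker re-reads the finalized file, uploads it to the backend, and removes it, so packs no longer have to be held in memory while they are being built.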