diff --git a/src/restic/pack/pack.go b/src/restic/pack/pack.go index 7aa8360be..e983eae3f 100644 --- a/src/restic/pack/pack.go +++ b/src/restic/pack/pack.go @@ -83,26 +83,26 @@ type Packer struct { bytes uint k *crypto.Key - buf *bytes.Buffer + wr io.Writer m sync.Mutex } // NewPacker returns a new Packer that can be used to pack blobs -// together. -func NewPacker(k *crypto.Key, buf []byte) *Packer { - return &Packer{k: k, buf: bytes.NewBuffer(buf)} +// together. If wr is nil, a bytes.Buffer is used. +func NewPacker(k *crypto.Key, wr io.Writer) *Packer { + return &Packer{k: k, wr: wr} } // Add saves the data read from rd as a new blob to the packer. Returned is the // number of bytes written to the pack. -func (p *Packer) Add(t BlobType, id backend.ID, rd io.Reader) (int64, error) { +func (p *Packer) Add(t BlobType, id backend.ID, data []byte) (int, error) { p.m.Lock() defer p.m.Unlock() c := Blob{Type: t, ID: id} - n, err := io.Copy(p.buf, rd) + n, err := p.wr.Write(data) c.Length = uint(n) c.Offset = p.bytes p.bytes += uint(n) @@ -121,8 +121,9 @@ type headerEntry struct { } // Finalize writes the header for all added blobs and finalizes the pack. -// Returned are all bytes written, including the header. -func (p *Packer) Finalize() ([]byte, error) { +// Returned are the number of bytes written, including the header. If the +// underlying writer implements io.Closer, it is closed. +func (p *Packer) Finalize() (uint, error) { p.m.Lock() defer p.m.Unlock() @@ -131,37 +132,41 @@ func (p *Packer) Finalize() ([]byte, error) { hdrBuf := bytes.NewBuffer(nil) bytesHeader, err := p.writeHeader(hdrBuf) if err != nil { - return nil, err + return 0, err } encryptedHeader, err := crypto.Encrypt(p.k, nil, hdrBuf.Bytes()) if err != nil { - return nil, err + return 0, err } // append the header - n, err := p.buf.Write(encryptedHeader) + n, err := p.wr.Write(encryptedHeader) if err != nil { - return nil, err + return 0, err } hdrBytes := bytesHeader + crypto.Extension if uint(n) != hdrBytes { - return nil, errors.New("wrong number of bytes written") + return 0, errors.New("wrong number of bytes written") } bytesWritten += hdrBytes // write length - err = binary.Write(p.buf, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension)) + err = binary.Write(p.wr, binary.LittleEndian, uint32(uint(len(p.blobs))*entrySize+crypto.Extension)) if err != nil { - return nil, err + return 0, err } bytesWritten += uint(binary.Size(uint32(0))) p.bytes = uint(bytesWritten) - return p.buf.Bytes(), nil + if w, ok := p.wr.(io.Closer); ok { + return bytesWritten, w.Close() + } + + return bytesWritten, nil } // writeHeader constructs and writes the header to wr. @@ -208,6 +213,11 @@ func (p *Packer) Blobs() []Blob { return p.blobs } +// Writer return the underlying writer. +func (p *Packer) Writer() io.Writer { + return p.wr +} + func (p *Packer) String() string { return fmt.Sprintf("", len(p.blobs), p.bytes) } diff --git a/src/restic/repository/packer_manager.go b/src/restic/repository/packer_manager.go index 2791af51c..88a3e1ac7 100644 --- a/src/restic/repository/packer_manager.go +++ b/src/restic/repository/packer_manager.go @@ -1,6 +1,10 @@ package repository import ( + "fmt" + "io" + "io/ioutil" + "os" "sync" "restic/backend" @@ -11,7 +15,7 @@ import ( // Saver implements saving data in a backend. type Saver interface { - Save(h backend.Handle, p []byte) error + Save(h backend.Handle, jp []byte) error } // packerManager keeps a list of open packs and creates new on demand. @@ -20,12 +24,30 @@ type packerManager struct { key *crypto.Key pm sync.Mutex packs []*pack.Packer + + tempdir string } const minPackSize = 4 * 1024 * 1024 const maxPackSize = 16 * 1024 * 1024 const maxPackers = 200 +// NewPackerManager returns an new packer manager which writes temporary files +// to a temporary directory +func NewPackerManager(be Saver, key *crypto.Key) (pm *packerManager, err error) { + pm = &packerManager{ + be: be, + key: key, + } + + pm.tempdir, err = ioutil.TempDir("", fmt.Sprintf("restic-packs-%d-", os.Getpid())) + if err != nil { + return nil, err + } + + return pm, nil +} + // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. func (r *packerManager) findPacker(size uint) (*pack.Packer, error) { @@ -47,7 +69,13 @@ func (r *packerManager) findPacker(size uint) (*pack.Packer, error) { // no suitable packer found, return new debug.Log("Repo.findPacker", "create new pack for %d bytes", size) - return pack.NewPacker(r.key, nil), nil + tmpfile, err := ioutil.TempFile(r.tempdir, "restic-pack-") + if err != nil { + return nil, err + } + + fmt.Printf("tmpfile: %v, tempdir %v\n", tmpfile.Name(), r.tempdir) + return pack.NewPacker(r.key, tmpfile), nil } // insertPacker appends p to s.packs. @@ -62,11 +90,28 @@ func (r *packerManager) insertPacker(p *pack.Packer) { // savePacker stores p in the backend. func (r *Repository) savePacker(p *pack.Packer) error { debug.Log("Repo.savePacker", "save packer with %d blobs\n", p.Count()) - data, err := p.Finalize() + n, err := p.Finalize() if err != nil { return err } + tmpfile := p.Writer().(*os.File) + f, err := os.Open(tmpfile.Name()) + if err != nil { + return err + } + + data := make([]byte, n) + m, err := io.ReadFull(f, data) + + if uint(m) != n { + return fmt.Errorf("read wrong number of bytes from %v: want %v, got %v", tmpfile.Name(), n, m) + } + + if err = f.Close(); err != nil { + return err + } + id := backend.Hash(data) h := backend.Handle{Type: backend.Data, Name: id.String()} @@ -100,3 +145,10 @@ func (r *packerManager) countPacker() int { return len(r.packs) } + +// removeTempdir deletes the temporary directory. +func (r *packerManager) removeTempdir() error { + err := os.RemoveAll(r.tempdir) + r.tempdir = "" + return err +} diff --git a/src/restic/repository/packer_manager_test.go b/src/restic/repository/packer_manager_test.go index 4af073e4d..79a7d9904 100644 --- a/src/restic/repository/packer_manager_test.go +++ b/src/restic/repository/packer_manager_test.go @@ -3,6 +3,7 @@ package repository import ( "io" "math/rand" + "os" "restic/backend" "restic/backend/mem" "restic/crypto" @@ -19,7 +20,39 @@ func randomID(rd io.Reader) backend.ID { return id } -func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes int) { +const maxBlobSize = 1 << 20 + +func saveFile(t testing.TB, be Saver, filename string, n int) { + f, err := os.Open(filename) + if err != nil { + t.Fatal(err) + } + + data := make([]byte, n) + m, err := io.ReadFull(f, data) + + if m != n { + t.Fatalf("read wrong number of bytes from %v: want %v, got %v", filename, m, n) + } + + if err = f.Close(); err != nil { + t.Fatal(err) + } + + h := backend.Handle{Type: backend.Data, Name: backend.Hash(data).String()} + + err = be.Save(h, data) + if err != nil { + t.Fatal(err) + } + + err = os.Remove(filename) + if err != nil { + t.Fatal(err) + } +} + +func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) { for i := 0; i < 100; i++ { l := rnd.Intn(1 << 20) seed := rnd.Int63() @@ -31,9 +64,14 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes rd := rand.New(rand.NewSource(seed)) id := randomID(rd) - n, err := packer.Add(pack.Data, id, io.LimitReader(rd, int64(l))) + buf = buf[:l] + _, err = io.ReadFull(rd, buf) + if err != nil { + t.Fatal(err) + } - if n != int64(l) { + n, err := packer.Add(pack.Data, id, buf) + if n != l { t.Errorf("Add() returned invalid number of bytes: want %v, got %v", n, l) } bytes += l @@ -43,17 +81,13 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes continue } - data, err := packer.Finalize() + bytesWritten, err := packer.Finalize() if err != nil { t.Fatal(err) } - h := backend.Handle{Type: backend.Data, Name: randomID(rd).String()} - - err = be.Save(h, data) - if err != nil { - t.Fatal(err) - } + tmpfile := packer.Writer().(*os.File) + saveFile(t, be, tmpfile.Name(), int(bytesWritten)) } return bytes @@ -62,18 +96,14 @@ func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes func flushRemainingPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager) (bytes int) { if pm.countPacker() > 0 { for _, packer := range pm.packs { - data, err := packer.Finalize() + n, err := packer.Finalize() if err != nil { t.Fatal(err) } - bytes += len(data) + bytes += int(n) - h := backend.Handle{Type: backend.Data, Name: randomID(rnd).String()} - - err = be.Save(h, data) - if err != nil { - t.Fatal(err) - } + tmpfile := packer.Writer().(*os.File) + saveFile(t, be, tmpfile.Name(), bytes) } } @@ -90,33 +120,45 @@ func TestPackerManager(t *testing.T) { rnd := rand.New(rand.NewSource(23)) be := mem.New() - pm := &packerManager{ - be: be, - key: crypto.NewRandomKey(), + pm, err := NewPackerManager(be, crypto.NewRandomKey()) + if err != nil { + t.Fatal(err) } - bytes := fillPacks(t, rnd, be, pm) + blobBuf := make([]byte, maxBlobSize) + + bytes := fillPacks(t, rnd, be, pm, blobBuf) bytes += flushRemainingPacks(t, rnd, be, pm) t.Logf("saved %d bytes", bytes) + err = pm.removeTempdir() + if err != nil { + t.Fatal(err) + } } func BenchmarkPackerManager(t *testing.B) { rnd := rand.New(rand.NewSource(23)) be := &fakeBackend{} - pm := &packerManager{ - be: be, - key: crypto.NewRandomKey(), + pm, err := NewPackerManager(be, crypto.NewRandomKey()) + if err != nil { + t.Fatal(err) } + blobBuf := make([]byte, maxBlobSize) t.ResetTimer() bytes := 0 for i := 0; i < t.N; i++ { - bytes += fillPacks(t, rnd, be, pm) + bytes += fillPacks(t, rnd, be, pm, blobBuf) } bytes += flushRemainingPacks(t, rnd, be, pm) t.Logf("saved %d bytes", bytes) + + err = pm.removeTempdir() + if err != nil { + t.Fatal(err) + } } diff --git a/src/restic/repository/repository.go b/src/restic/repository/repository.go index f981f5b76..eb86902b6 100644 --- a/src/restic/repository/repository.go +++ b/src/restic/repository/repository.go @@ -27,14 +27,19 @@ type Repository struct { } // New returns a new repository with backend be. -func New(be backend.Backend) *Repository { - return &Repository{ - be: be, - idx: NewMasterIndex(), - packerManager: &packerManager{ - be: be, - }, +func New(be backend.Backend) (*Repository, error) { + pm, err := NewPackerManager(be, nil) + if err != nil { + return nil, err } + + repo := &Repository{ + be: be, + idx: NewMasterIndex(), + packerManager: pm, + } + + return repo, nil } // Find loads the list of all blobs of type t and searches for names which start @@ -195,7 +200,7 @@ func (r *Repository) SaveAndEncrypt(t pack.BlobType, data []byte, id *backend.ID } // save ciphertext - _, err = packer.Add(t, *id, bytes.NewReader(ciphertext)) + _, err = packer.Add(t, *id, ciphertext) if err != nil { return backend.ID{}, err } diff --git a/src/restic/test/backend.go b/src/restic/test/backend.go index 5516cecdf..4d9ca6a44 100644 --- a/src/restic/test/backend.go +++ b/src/restic/test/backend.go @@ -61,7 +61,10 @@ func SetupRepo() *repository.Repository { panic(err) } - repo := repository.New(b) + repo, err := repository.New(b) + if err != nil { + panic(err) + } err = repo.Init(TestPassword) if err != nil { panic(err) diff --git a/src/restic/test/helpers.go b/src/restic/test/helpers.go index 4c3280fba..c43e15b70 100644 --- a/src/restic/test/helpers.go +++ b/src/restic/test/helpers.go @@ -213,7 +213,10 @@ func OpenLocalRepo(t testing.TB, dir string) *repository.Repository { be, err := local.Open(dir) OK(t, err) - repo := repository.New(be) + repo, err := repository.New(be) + if err != nil { + t.Fatal(err) + } err = repo.SearchKey(TestPassword) OK(t, err)