diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index e3b43b24e..32b2c9b7a 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -1,9 +1,10 @@ package repository import ( + "bufio" "context" - "hash" "io" + "io/ioutil" "os" "runtime" "sync" @@ -23,32 +24,29 @@ import ( // Packer holds a pack.Packer together with a hash writer. type Packer struct { *pack.Packer - hw *hashing.Writer - beHw *hashing.Writer tmpfile *os.File + bufWr *bufio.Writer } // packerManager keeps a list of open packs and creates new on demand. type packerManager struct { - tpe restic.BlobType - key *crypto.Key - hasherFn func() hash.Hash - queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + tpe restic.BlobType + key *crypto.Key + queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error - pm sync.Mutex - packers []*Packer + pm sync.Mutex + packer *Packer } const minPackSize = 4 * 1024 * 1024 // newPackerManager returns an new packer manager which writes temporary files // to a temporary directory -func newPackerManager(key *crypto.Key, hasherFn func() hash.Hash, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { +func newPackerManager(key *crypto.Key, tpe restic.BlobType, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { return &packerManager{ - tpe: tpe, - key: key, - hasherFn: hasherFn, - queueFn: queueFn, + tpe: tpe, + key: key, + queueFn: queueFn, } } @@ -56,24 +54,34 @@ func (r *packerManager) Flush(ctx context.Context) error { r.pm.Lock() defer r.pm.Unlock() - debug.Log("manually flushing %d packs", len(r.packers)) - for _, packer := range r.packers { - err := r.queueFn(ctx, r.tpe, packer) + if r.packer != nil { + debug.Log("manually flushing pending pack") + err := r.queueFn(ctx, r.tpe, r.packer) if err != nil { return err } + r.packer = nil } - r.packers = r.packers[:0] return 
nil } func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id restic.ID, ciphertext []byte, uncompressedLength int) (int, error) { - packer, err := r.findPacker() - if err != nil { - return 0, err + r.pm.Lock() + defer r.pm.Unlock() + + var err error + packer := r.packer + if r.packer == nil { + packer, err = r.newPacker() + if err != nil { + return 0, err + } } + // remember packer + r.packer = packer // save ciphertext + // Add only appends bytes in memory to avoid being a scaling bottleneck size, err := packer.Add(t, id, ciphertext, uncompressedLength) if err != nil { return 0, err @@ -82,10 +90,12 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest // if the pack is not full enough, put back to the list if packer.Size() < minPackSize { debug.Log("pack is not full enough (%d bytes)", packer.Size()) - r.insertPacker(packer) return size, nil } + // forget full packer + r.packer = nil + + // call while holding lock to prevent SaveBlob from creating new packers if the uploaders are busy // else write the pack to the backend err = r.queueFn(ctx, t, packer) if err != nil { @@ -97,56 +107,24 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. -func (r *packerManager) findPacker() (packer *Packer, err error) { - r.pm.Lock() - defer r.pm.Unlock() - - // search for a suitable packer - if len(r.packers) > 0 { - p := r.packers[0] - last := len(r.packers) - 1 - r.packers[0] = r.packers[last] - r.packers[last] = nil // Allow GC of stale reference. 
- r.packers = r.packers[:last] - return p, nil - } - - // no suitable packer found, return new +func (r *packerManager) newPacker() (packer *Packer, err error) { debug.Log("create new pack") tmpfile, err := fs.TempFile("", "restic-temp-pack-") if err != nil { return nil, errors.Wrap(err, "fs.TempFile") } - w := io.Writer(tmpfile) - beHasher := r.hasherFn() - var beHw *hashing.Writer - if beHasher != nil { - beHw = hashing.NewWriter(w, beHasher) - w = beHw - } - - hw := hashing.NewWriter(w, sha256.New()) - p := pack.NewPacker(r.key, hw) + bufWr := bufio.NewWriter(tmpfile) + p := pack.NewPacker(r.key, bufWr) packer = &Packer{ Packer: p, - beHw: beHw, - hw: hw, tmpfile: tmpfile, + bufWr: bufWr, } return packer, nil } -// insertPacker appends p to s.packs. -func (r *packerManager) insertPacker(p *Packer) { - r.pm.Lock() - defer r.pm.Unlock() - - r.packers = append(r.packers, p) - debug.Log("%d packers\n", len(r.packers)) -} - // savePacker stores p in the backend. func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) @@ -154,20 +132,43 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe if err != nil { return err } - - id := restic.IDFromHash(p.hw.Sum(nil)) - h := restic.Handle{Type: restic.PackFile, Name: id.String(), - ContainedBlobType: t} - var beHash []byte - if p.beHw != nil { - beHash = p.beHw.Sum(nil) - } - rd, err := restic.NewFileReader(p.tmpfile, beHash) + err = p.bufWr.Flush() if err != nil { return err } - err = r.be.Save(ctx, h, rd) + // calculate sha256 hash in a second pass + var rd io.Reader + rd, err = restic.NewFileReader(p.tmpfile, nil) + if err != nil { + return err + } + beHasher := r.be.Hasher() + var beHr *hashing.Reader + if beHasher != nil { + beHr = hashing.NewReader(rd, beHasher) + rd = beHr + } + + hr := hashing.NewReader(rd, sha256.New()) + _, err = io.Copy(ioutil.Discard, hr) 
+ if err != nil { + return err + } + + id := restic.IDFromHash(hr.Sum(nil)) + h := restic.Handle{Type: restic.PackFile, Name: id.String(), + ContainedBlobType: t} + var beHash []byte + if beHr != nil { + beHash = beHr.Sum(nil) + } + rrd, err := restic.NewFileReader(p.tmpfile, beHash) + if err != nil { + return err + } + + err = r.be.Save(ctx, h, rrd) if err != nil { debug.Log("Save(%v) error: %v", h, err) return err @@ -198,11 +199,3 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe } return r.idx.SaveFullIndex(ctx, r) } - -// countPacker returns the number of open (unfinished) packers. -func (r *packerManager) countPacker() int { - r.pm.Lock() - defer r.pm.Unlock() - - return len(r.packers) -} diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 8713aad4e..67a33c757 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -2,26 +2,16 @@ package repository import ( "context" - "hash" "io" "math/rand" - "os" "sync" "testing" - "github.com/restic/restic/internal/backend/mem" "github.com/restic/restic/internal/crypto" - "github.com/restic/restic/internal/fs" - "github.com/restic/restic/internal/mock" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" ) -// Saver implements saving data in a backend. 
-type Saver interface { - Save(context.Context, restic.Handle, restic.RewindReader) error - Hasher() hash.Hash -} - func randomID(rd io.Reader) restic.ID { id := restic.ID{} _, err := io.ReadFull(rd, id[:]) @@ -40,91 +30,27 @@ func min(a, b int) int { return b } -func saveFile(t testing.TB, be Saver, length int, f *os.File, id restic.ID, hash []byte) { - h := restic.Handle{Type: restic.PackFile, Name: id.String()} - t.Logf("save file %v", h) - - rd, err := restic.NewFileReader(f, hash) - if err != nil { - t.Fatal(err) - } - - err = be.Save(context.TODO(), h, rd) - if err != nil { - t.Fatal(err) - } - - if err := f.Close(); err != nil { - t.Fatal(err) - } - - if err := fs.RemoveIfExists(f.Name()); err != nil { - t.Fatal(err) - } -} - -func fillPacks(t testing.TB, rnd *rand.Rand, be Saver, pm *packerManager, buf []byte) (bytes int) { +func fillPacks(t testing.TB, rnd *rand.Rand, pm *packerManager, buf []byte) (bytes int) { for i := 0; i < 100; i++ { l := rnd.Intn(maxBlobSize) - - packer, err := pm.findPacker() - if err != nil { - t.Fatal(err) - } - id := randomID(rnd) buf = buf[:l] // Only change a few bytes so we know we're not benchmarking the RNG. 
rnd.Read(buf[:min(l, 4)]) - n, err := packer.Add(restic.DataBlob, id, buf, 0) + n, err := pm.SaveBlob(context.TODO(), restic.DataBlob, id, buf, 0) if err != nil { t.Fatal(err) } - if n != l+37 { + if n != l+37 && n != l+37+36 { t.Errorf("Add() returned invalid number of bytes: want %v, got %v", l, n) } - bytes += l - - if packer.Size() < minPackSize { - pm.insertPacker(packer) - continue - } - - err = packer.Finalize() - if err != nil { - t.Fatal(err) - } - - packID := restic.IDFromHash(packer.hw.Sum(nil)) - var beHash []byte - if packer.beHw != nil { - beHash = packer.beHw.Sum(nil) - } - saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash) + bytes += n } - - return bytes - } - - func flushRemainingPacks(t testing.TB, be Saver, pm *packerManager) (bytes int) { - if pm.countPacker() > 0 { - for _, packer := range pm.packers { - err := packer.Finalize() - if err != nil { - t.Fatal(err) - } - bytes += packer.HeaderOverhead() - - packID := restic.IDFromHash(packer.hw.Sum(nil)) - var beHash []byte - if packer.beHw != nil { - beHash = packer.beHw.Sum(nil) - } - saveFile(t, be, int(packer.Size()), packer.tmpfile, packID, beHash) - } + err := pm.Flush(context.TODO()) + if err != nil { + t.Fatal(err) } - return bytes - } @@ -143,13 +69,21 @@ func TestPackerManager(t *testing.T) { func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) - be := mem.New() - pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) + savedBytes := int(0) + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + err := p.Finalize() + if err != nil { + return err + } + savedBytes += int(p.Size()) + return nil + }) blobBuf := make([]byte, maxBlobSize) - bytes := fillPacks(t, rnd, be, pm, blobBuf) - bytes += flushRemainingPacks(t, be, pm) + bytes := fillPacks(t, rnd, pm, blobBuf) + // bytes does not include the last pack's header + test.Equals(t, savedBytes, 
bytes+36) t.Logf("saved %d bytes", bytes) return int64(bytes) @@ -162,10 +96,6 @@ func BenchmarkPackerManager(t *testing.B) { }) rnd := rand.New(rand.NewSource(randomSeed)) - - be := &mock.Backend{ - SaveFn: func(context.Context, restic.Handle, restic.RewindReader) error { return nil }, - } blobBuf := make([]byte, maxBlobSize) t.ReportAllocs() @@ -174,8 +104,9 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(crypto.NewRandomKey(), be.Hasher, restic.DataBlob, nil) - fillPacks(t, rnd, be, pm, blobBuf) - flushRemainingPacks(t, be, pm) + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, func(ctx context.Context, t restic.BlobType, p *Packer) error { + return nil + }) + fillPacks(t, rnd, pm, blobBuf) } } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 7f42dbdd5..a185032b5 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -520,8 +520,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) innerWg, ctx := errgroup.WithContext(ctx) r.packerWg = innerWg r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) - r.treePM = newPackerManager(r.key, r.be.Hasher, restic.TreeBlob, r.uploader.QueuePacker) - r.dataPM = newPackerManager(r.key, r.be.Hasher, restic.DataBlob, r.uploader.QueuePacker) + r.treePM = newPackerManager(r.key, restic.TreeBlob, r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, restic.DataBlob, r.uploader.QueuePacker) wg.Go(func() error { return innerWg.Wait()