repository: stream packs during repacking

This commit is contained in:
Michael Eischer 2021-08-20 10:10:35 +02:00
parent c4a2bfcb39
commit f00f690658
1 changed files with 31 additions and 119 deletions

View File

@ -2,13 +2,9 @@ package repository
import (
"context"
"os"
"sync"
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/ui/progress"
@ -27,147 +23,63 @@ const numRepackWorkers = 8
func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
var keepMutex sync.Mutex
wg, wgCtx := errgroup.WithContext(ctx)
downloadQueue := make(chan restic.ID)
downloadQueue := make(chan restic.PackBlobs)
wg.Go(func() error {
defer close(downloadQueue)
for packID := range packs {
select {
case downloadQueue <- packID:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
type repackJob struct {
tempfile *os.File
hash restic.ID
packLength int64
}
processQueue := make(chan repackJob)
// used to close processQueue once all downloaders have finished
var downloadWG sync.WaitGroup
downloader := func() error {
defer downloadWG.Done()
for packID := range downloadQueue {
// load the complete pack into a temp file
h := restic.Handle{Type: restic.PackFile, Name: packID.String()}
tempfile, hash, packLength, err := DownloadAndHash(wgCtx, repo.Backend(), h)
if err != nil {
return errors.Wrap(err, "Repack")
}
debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash)
if !packID.Equal(hash) {
return errors.Errorf("hash does not match id: want %v, got %v", packID, hash)
}
select {
case processQueue <- repackJob{tempfile, hash, packLength}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
}
downloadWG.Add(numRepackWorkers)
for i := 0; i < numRepackWorkers; i++ {
wg.Go(downloader)
}
wg.Go(func() error {
downloadWG.Wait()
close(processQueue)
return nil
})
var keepMutex sync.Mutex
worker := func() error {
for job := range processQueue {
tempfile, packID, packLength := job.tempfile, job.hash, job.packLength
blobs, _, err := pack.List(repo.Key(), tempfile, packLength)
if err != nil {
return err
}
debug.Log("processing pack %v, blobs: %v", packID, len(blobs))
var buf []byte
for _, entry := range blobs {
for pbs := range repo.Index().ListPacks(ctx, packs) {
var packBlobs []restic.Blob
keepMutex.Lock()
// filter out unnecessary blobs
for _, entry := range pbs.Blobs {
h := restic.BlobHandle{ID: entry.ID, Type: entry.Type}
keepMutex.Lock()
shouldKeep := keepBlobs.Has(h)
keepMutex.Unlock()
if !shouldKeep {
continue
if keepBlobs.Has(h) {
packBlobs = append(packBlobs, entry)
}
}
keepMutex.Unlock()
debug.Log(" process blob %v", h)
select {
case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}:
case <-wgCtx.Done():
return wgCtx.Err()
}
}
return nil
})
if uint(cap(buf)) < entry.Length {
buf = make([]byte, entry.Length)
}
buf = buf[:entry.Length]
n, err := tempfile.ReadAt(buf, int64(entry.Offset))
if err != nil {
return errors.Wrap(err, "ReadAt")
}
if n != len(buf) {
return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v",
h, tempfile.Name(), len(buf), n)
}
nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():]
plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil)
worker := func() error {
for t := range downloadQueue {
err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
if err != nil {
return err
}
id := restic.Hash(plaintext)
if !id.Equal(entry.ID) {
debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v",
h.Type, h.ID, tempfile.Name(), id)
return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v",
h, tempfile.Name(), id)
}
keepMutex.Lock()
// recheck whether some other worker was faster
shouldKeep = keepBlobs.Has(h)
shouldKeep := keepBlobs.Has(blob)
if shouldKeep {
keepBlobs.Delete(h)
keepBlobs.Delete(blob)
}
keepMutex.Unlock()
if !shouldKeep {
continue
return nil
}
// We do want to save already saved blobs!
_, _, err = repo.SaveBlob(wgCtx, entry.Type, plaintext, entry.ID, true)
_, _, err = repo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true)
if err != nil {
return err
}
debug.Log(" saved blob %v", entry.ID)
}
if err = tempfile.Close(); err != nil {
return errors.Wrap(err, "Close")
}
if err = fs.RemoveIfExists(tempfile.Name()); err != nil {
return errors.Wrap(err, "Remove")
debug.Log(" saved blob %v", blob.ID)
return nil
})
if err != nil {
return err
}
p.Add(1)
}