From 153e2ba85919da899f182369a111188ce84d783c Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 10:09:34 +0200 Subject: [PATCH 01/13] repository: Implement listing blobs per pack file --- internal/repository/master_index.go | 39 +++++++++++++++++++++++++++++ internal/restic/repository.go | 6 +++++ 2 files changed, 45 insertions(+) diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index 5e099a3a5..adc110df8 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -400,3 +400,42 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla return obsolete, err } + +// ListPacks returns the blobs of the specified pack files grouped by pack file. +func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs { + out := make(chan restic.PackBlobs) + go func() { + defer close(out) + // only resort a part of the index to keep the memory overhead bounded + for i := byte(0); i < 16; i++ { + if ctx.Err() != nil { + return + } + + packBlob := make(map[restic.ID][]restic.Blob) + for pack := range packs { + if pack[0]&0xf == i { + packBlob[pack] = nil + } + } + if len(packBlob) == 0 { + continue + } + for pb := range mi.Each(ctx) { + if packs.Has(pb.PackID) && pb.PackID[0]&0xf == i { + packBlob[pb.PackID] = append(packBlob[pb.PackID], pb.Blob) + } + } + + // pass on packs + for packID, pbs := range packBlob { + select { + case out <- restic.PackBlobs{PackID: packID, Blobs: pbs}: + case <-ctx.Done(): + return + } + } + } + }() + return out +} diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 38c611ce6..c2a6e9f74 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -60,6 +60,11 @@ type Lister interface { List(context.Context, FileType, func(FileInfo) error) error } +type PackBlobs struct { + PackID ID + Blobs []Blob +} + // MasterIndex keeps track of which blobs are stored within files. type MasterIndex interface { Has(BlobHandle) bool @@ -71,4 +76,5 @@ type MasterIndex interface { // the context is cancelled, the background goroutine terminates. This // blocks any modification of the index. Each(ctx context.Context) <-chan PackedBlob + ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs } From c4a2bfcb3925f9789745d6783ab5f95758638858 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 23:21:05 +0200 Subject: [PATCH 02/13] repository: Add StreamPack function The function supports efficiently loading a specified list of blobs from a single pack in a streaming fashion. That is, there is no need for temporary files, regardless of the pack size. --- internal/repository/repository.go | 85 +++++++++++++++++++++++++++++++ 1 file changed, 85 insertions(+) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 1ed65083c..df62f3ad7 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -1,12 +1,14 @@ package repository import ( + "bufio" "bytes" "context" "encoding/json" "fmt" "io" "os" + "sort" "sync" "github.com/restic/chunker" @@ -25,6 +27,8 @@ import ( "golang.org/x/sync/errgroup" ) +const maxStreamBufferSize = 4 * 1024 * 1024 + // Repository is used to access a repository in a backend. 
type Repository struct { be restic.Backend @@ -782,3 +786,84 @@ func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile * return tmpfile, hash, size, err } + +type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + +func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + if len(blobs) == 0 { + // nothing to do + return nil + } + + sort.Slice(blobs, func(i, j int) bool { + return blobs[i].Offset < blobs[j].Offset + }) + h := restic.Handle{Type: restic.PackFile, Name: packID.String()} + + dataStart := blobs[0].Offset + dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length + + debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs)) + + // stream blobs in pack + err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { + bufferSize := int(dataEnd - dataStart) + if bufferSize > maxStreamBufferSize { + bufferSize = maxStreamBufferSize + } + bufRd := bufio.NewReaderSize(rd, bufferSize) + currentBlobEnd := dataStart + var buf []byte + for _, entry := range blobs { + skipBytes := int(entry.Offset - currentBlobEnd) + if skipBytes < 0 { + return errors.Errorf("overlapping blobs in pack %v", packID) + } + + _, err := bufRd.Discard(skipBytes) + if err != nil { + return err + } + + h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} + debug.Log(" process blob %v, skipped %d, %v", h, skipBytes, entry) + + if uint(cap(buf)) < entry.Length { + buf = make([]byte, entry.Length) + } + buf = buf[:entry.Length] + + n, err := io.ReadFull(bufRd, buf) + if err != nil { + debug.Log(" read error %v", err) + return errors.Wrap(err, "ReadFull") + } + + if n != len(buf) { + return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", + h, packID.Str(), len(buf), n) + } + currentBlobEnd = entry.Offset + entry.Length + + // decryption errors are likely permanent, give the caller a chance to skip them + nonce, ciphertext := buf[:key.NonceSize()], buf[key.NonceSize():] + plaintext, err := key.Open(ciphertext[:0], nonce, ciphertext, nil) + if err == nil { + id := restic.Hash(plaintext) + if !id.Equal(entry.ID) { + debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", + h.Type, h.ID, packID.Str(), id) + err = errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", + h, packID.Str(), id) + } + } + + err = handleBlobFn(entry.BlobHandle, plaintext, err) + if err != nil { + return err + } + } + return nil + }) + return errors.Wrap(err, "StreamPack") +} From f00f690658a5fa147acc2941569d1375b8a07bd5 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 10:10:35 +0200 Subject: [PATCH 03/13] repository: stream packs during repacking --- internal/repository/repack.go | 150 +++++++--------------------------- 1 file changed, 31 insertions(+), 119 deletions(-) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 423f3c831..0734c8206 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -2,13 +2,9 @@ package repository import ( "context" - "os" "sync" "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/fs" - "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" 
"github.com/restic/restic/internal/ui/progress" @@ -27,147 +23,63 @@ const numRepackWorkers = 8 func Repack(ctx context.Context, repo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) { debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs)) + var keepMutex sync.Mutex wg, wgCtx := errgroup.WithContext(ctx) - downloadQueue := make(chan restic.ID) + downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) - for packID := range packs { - select { - case downloadQueue <- packID: - case <-wgCtx.Done(): - return wgCtx.Err() - } - } - return nil - }) - - type repackJob struct { - tempfile *os.File - hash restic.ID - packLength int64 - } - processQueue := make(chan repackJob) - // used to close processQueue once all downloaders have finished - var downloadWG sync.WaitGroup - - downloader := func() error { - defer downloadWG.Done() - for packID := range downloadQueue { - // load the complete pack into a temp file - h := restic.Handle{Type: restic.PackFile, Name: packID.String()} - - tempfile, hash, packLength, err := DownloadAndHash(wgCtx, repo.Backend(), h) - if err != nil { - return errors.Wrap(err, "Repack") - } - - debug.Log("pack %v loaded (%d bytes), hash %v", packID, packLength, hash) - - if !packID.Equal(hash) { - return errors.Errorf("hash does not match id: want %v, got %v", packID, hash) - } - - select { - case processQueue <- repackJob{tempfile, hash, packLength}: - case <-wgCtx.Done(): - return wgCtx.Err() - } - } - return nil - } - - downloadWG.Add(numRepackWorkers) - for i := 0; i < numRepackWorkers; i++ { - wg.Go(downloader) - } - wg.Go(func() error { - downloadWG.Wait() - close(processQueue) - return nil - }) - - var keepMutex sync.Mutex - worker := func() error { - for job := range processQueue { - tempfile, packID, packLength := job.tempfile, job.hash, job.packLength - - blobs, _, err := pack.List(repo.Key(), tempfile, packLength) - if err != nil { - return err - } - - debug.Log("processing pack %v, blobs: %v", packID, len(blobs)) - var buf []byte - for _, entry := range blobs { + for pbs := range repo.Index().ListPacks(ctx, packs) { + var packBlobs []restic.Blob + keepMutex.Lock() + // filter out unnecessary blobs + for _, entry := range pbs.Blobs { h := restic.BlobHandle{ID: entry.ID, Type: entry.Type} - - keepMutex.Lock() - shouldKeep := keepBlobs.Has(h) - keepMutex.Unlock() - - if !shouldKeep { - continue + if keepBlobs.Has(h) { + packBlobs = append(packBlobs, entry) } + } + keepMutex.Unlock() - debug.Log(" process blob %v", h) + select { + case downloadQueue <- restic.PackBlobs{PackID: pbs.PackID, Blobs: packBlobs}: + case <-wgCtx.Done(): + return wgCtx.Err() + } + } + return nil + }) - if uint(cap(buf)) < entry.Length { - buf = make([]byte, entry.Length) - } - buf = buf[:entry.Length] - - n, err := tempfile.ReadAt(buf, int64(entry.Offset)) - if err != nil { - return errors.Wrap(err, "ReadAt") - } - - if n != len(buf) { - return errors.Errorf("read blob %v from %v: not enough bytes read, want %v, got %v", - h, tempfile.Name(), len(buf), n) - } - - nonce, ciphertext := buf[:repo.Key().NonceSize()], buf[repo.Key().NonceSize():] - plaintext, err := repo.Key().Open(ciphertext[:0], nonce, ciphertext, nil) + worker := func() error { + for t := range downloadQueue { + err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { return err } - id := 
restic.Hash(plaintext) - if !id.Equal(entry.ID) { - debug.Log("read blob %v/%v from %v: wrong data returned, hash is %v", - h.Type, h.ID, tempfile.Name(), id) - return errors.Errorf("read blob %v from %v: wrong data returned, hash is %v", - h, tempfile.Name(), id) - } - keepMutex.Lock() // recheck whether some other worker was faster - shouldKeep = keepBlobs.Has(h) + shouldKeep := keepBlobs.Has(blob) if shouldKeep { - keepBlobs.Delete(h) + keepBlobs.Delete(blob) } keepMutex.Unlock() if !shouldKeep { - continue + return nil } // We do want to save already saved blobs! - _, _, err = repo.SaveBlob(wgCtx, entry.Type, plaintext, entry.ID, true) + _, _, err = repo.SaveBlob(wgCtx, blob.Type, buf, blob.ID, true) if err != nil { return err } - debug.Log(" saved blob %v", entry.ID) - } - - if err = tempfile.Close(); err != nil { - return errors.Wrap(err, "Close") - } - - if err = fs.RemoveIfExists(tempfile.Name()); err != nil { - return errors.Wrap(err, "Remove") + debug.Log(" saved blob %v", blob.ID) + return nil + }) + if err != nil { + return err } p.Add(1) } From f40abd92fa97e5e54fc0b24666cc0498d889a41d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 12:12:38 +0200 Subject: [PATCH 04/13] restorer: convert to use StreamPack --- internal/repository/repository.go | 2 +- internal/restorer/filerestorer.go | 144 +++++++------------------ internal/restorer/filerestorer_test.go | 5 +- 3 files changed, 40 insertions(+), 111 deletions(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index df62f3ad7..291e00da6 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -798,7 +798,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack sort.Slice(blobs, func(i, j int) bool { return blobs[i].Offset < blobs[j].Offset }) - h := restic.Handle{Type: restic.PackFile, Name: packID.String()} + h := restic.Handle{Type: restic.PackFile, Name: packID.String(), ContainedBlobType: restic.DataBlob} dataStart := blobs[0].Offset dataEnd := blobs[len(blobs)-1].Offset + blobs[len(blobs)-1].Length diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index d3d52f13a..323b69cf8 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -1,12 +1,9 @@ package restorer import ( - "bufio" "context" - "io" "math" "path/filepath" - "sort" "sync" "golang.org/x/sync/errgroup" @@ -14,6 +11,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" ) @@ -52,7 +50,7 @@ type packInfo struct { type fileRestorer struct { key *crypto.Key idx func(restic.BlobHandle) []restic.PackedBlob - packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + packLoader repository.BackendLoadFn filesWriter *filesWriter @@ -62,7 +60,7 @@ type fileRestorer struct { } func newFileRestorer(dst string, - packLoader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error, + packLoader repository.BackendLoadFn, key *crypto.Key, idx func(restic.BlobHandle) []restic.PackedBlob) *fileRestorer { @@ -175,17 +173,14 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { return wg.Wait() } -const maxBufferSize = 4 * 1024 * 1024 - func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) 
error { // calculate pack byte range and blob->[]files->[]offsets mappings start, end := int64(math.MaxInt64), int64(0) blobs := make(map[restic.ID]struct { - offset int64 // offset of the blob in the pack - length int // length of the blob - files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file + files map[*fileInfo][]int64 // file -> offsets (plural!) of the blob in the file }) + var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { if start > int64(blob.Offset) { @@ -196,9 +191,8 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } blobInfo, ok := blobs[blob.ID] if !ok { - blobInfo.offset = int64(blob.Offset) - blobInfo.length = int(blob.Length) blobInfo.files = make(map[*fileInfo][]int64) + blobList = append(blobList, blob) blobs[blob.ID] = blobInfo } blobInfo.files[file] = append(blobInfo.files[file], fileOffset) @@ -228,14 +222,6 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } - sortedBlobs := make([]restic.ID, 0, len(blobs)) - for blobID := range blobs { - sortedBlobs = append(sortedBlobs, blobID) - } - sort.Slice(sortedBlobs, func(i, j int) bool { - return blobs[sortedBlobs[i]].offset < blobs[sortedBlobs[j]].offset - }) - sanitizeError := func(file *fileInfo, err error) error { if err != nil { err = r.Error(file.location, err) @@ -243,59 +229,39 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return err } - h := restic.Handle{Type: restic.PackFile, Name: pack.id.String(), ContainedBlobType: restic.DataBlob} - err := r.packLoader(ctx, h, int(end-start), start, func(rd io.Reader) error { - bufferSize := int(end - start) - if bufferSize > maxBufferSize { - bufferSize = maxBufferSize - } - bufRd := bufio.NewReaderSize(rd, bufferSize) - currentBlobEnd := start - var blobData, buf []byte - for _, blobID := range sortedBlobs { - blob := blobs[blobID] - _, err := bufRd.Discard(int(blob.offset - currentBlobEnd)) - if err != nil { - return err - } - buf, err = r.downloadBlob(bufRd, blobID, blob.length, buf) - if err != nil { - return err - } - blobData, err = r.decryptBlob(blobID, buf) - if err != nil { - for file := range blob.files { - if errFile := sanitizeError(file, err); errFile != nil { - return errFile - } + err := repository.StreamPack(ctx, r.packLoader, r.key, pack.id, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { + blob := blobs[h.ID] + if err != nil { + for file := range blob.files { + if errFile := sanitizeError(file, err); errFile != nil { + return errFile } - continue } - currentBlobEnd = blob.offset + int64(blob.length) - for file, offsets := range blob.files { - for _, offset := range offsets { - writeToFile := func() error { - // this looks overly complicated and needs explanation - // two competing requirements: - // - must create the file once and only once - // - should allow concurrent writes to the file - // so write the first blob while holding file lock - // write other blobs after releasing the lock - createSize := int64(-1) - file.lock.Lock() - if file.inProgress { - file.lock.Unlock() - } else { - defer file.lock.Unlock() - file.inProgress = true - createSize = file.size - } - return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) - } - err := sanitizeError(file, writeToFile()) - if err != nil { - return err + return nil + } + for file, offsets := range blob.files { + for _, offset := range offsets { + 
writeToFile := func() error { + // this looks overly complicated and needs explanation + // two competing requirements: + // - must create the file once and only once + // - should allow concurrent writes to the file + // so write the first blob while holding file lock + // write other blobs after releasing the lock + createSize := int64(-1) + file.lock.Lock() + if file.inProgress { + file.lock.Unlock() + } else { + defer file.lock.Unlock() + file.inProgress = true + createSize = file.size } + return r.filesWriter.writeToFile(r.targetPath(file.location), blobData, offset, createSize) + } + err := sanitizeError(file, writeToFile()) + if err != nil { + return err } } } @@ -312,41 +278,3 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { return nil } - -func (r *fileRestorer) downloadBlob(rd io.Reader, blobID restic.ID, length int, buf []byte) ([]byte, error) { - // TODO reconcile with Repository#loadBlob implementation - - if cap(buf) < length { - buf = make([]byte, length) - } else { - buf = buf[:length] - } - - n, err := io.ReadFull(rd, buf) - if err != nil { - return nil, err - } - - if n != length { - return nil, errors.Errorf("error loading blob %v: wrong length returned, want %d, got %d", blobID.Str(), length, n) - } - return buf, nil -} - -func (r *fileRestorer) decryptBlob(blobID restic.ID, buf []byte) ([]byte, error) { - // TODO reconcile with Repository#loadBlob implementation - - // decrypt - nonce, ciphertext := buf[:r.key.NonceSize()], buf[r.key.NonceSize():] - plaintext, err := r.key.Open(ciphertext[:0], nonce, ciphertext, nil) - if err != nil { - return nil, errors.Errorf("decrypting blob %v failed: %v", blobID, err) - } - - // check hash - if !restic.Hash(plaintext).Equal(blobID) { - return nil, errors.Errorf("blob %v returned invalid hash", blobID) - } - - return plaintext, nil -} diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index 333420b70..f5760f54a 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -10,6 +10,7 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -38,7 +39,7 @@ type TestRepo struct { filesPathToContent map[string]string // - loader func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error + loader repository.BackendLoadFn } func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob { @@ -267,7 +268,7 @@ func TestErrorRestoreFiles(t *testing.T) { r.files = repo.files err := r.restoreFiles(context.TODO()) - rtest.Equals(t, loadError, err) + rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError) } func TestDownloadError(t *testing.T) { From f1e58e7c7f72206ac3bc648f6c2c63b80fe34b4f Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 16:15:40 +0200 Subject: [PATCH 05/13] checker: rewrite ReadData to stream packs --- cmd/restic/integration_test.go | 3 - internal/checker/checker.go | 184 ++++++++++++++++++------------ internal/pack/pack.go | 5 +- internal/repository/repository.go | 6 +- 4 files changed, 119 insertions(+), 79 deletions(-) diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go index 42936d2ea..f926306eb 100644 --- a/cmd/restic/integration_test.go +++ b/cmd/restic/integration_test.go @@ -2135,7 
+2135,4 @@ func TestBackendLoadWriteTo(t *testing.T) { firstSnapshot := testRunList(t, "snapshots", env.gopts) rtest.Assert(t, len(firstSnapshot) == 1, "expected one snapshot, got %v", firstSnapshot) - - // test readData using the hashing.Reader - testRunCheck(t, env.gopts) } diff --git a/internal/checker/checker.go b/internal/checker/checker.go index e842a08be..3186b90d0 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -1,14 +1,18 @@ package checker import ( + "bufio" + "bytes" "context" "fmt" "io" - "os" + "sort" "sync" + "github.com/minio/sha256-simd" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" @@ -436,78 +440,112 @@ func (c *Checker) GetPacks() map[restic.ID]int64 { } // checkPack reads a pack and checks the integrity of all blobs. -func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int64) error { - debug.Log("checking pack %v", id) - h := restic.Handle{Type: restic.PackFile, Name: id.String()} +func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64) error { + debug.Log("checking pack %v", id.String()) - packfile, hash, realSize, err := repository.DownloadAndHash(ctx, r.Backend(), h) - if err != nil { - return errors.Wrap(err, "checkPack") + if len(blobs) == 0 { + return errors.Errorf("pack %v is empty or not indexed", id) } - defer func() { - _ = packfile.Close() - _ = os.Remove(packfile.Name()) - }() + // sanity check blobs in index + sort.Slice(blobs, func(i, j int) bool { + return blobs[i].Offset < blobs[j].Offset + }) + idxHdrSize := pack.HeaderSize + len(blobs)*int(pack.EntrySize) + lastBlobEnd := 0 + nonContinuousPack := false + for _, blob := range blobs { + if lastBlobEnd != int(blob.Offset) { + nonContinuousPack = true + } + lastBlobEnd = int(blob.Offset + blob.Length) + } + // size was calculated by masterindex.PackSize, thus there's no need to recalculate it here - debug.Log("hash for pack %v is %v", id, hash) + var errs []error + if nonContinuousPack { + debug.Log("Index for pack contains gaps / overlaps, blobs: %v", blobs) + errs = append(errs, errors.New("Index for pack contains gaps / overlapping blobs")) + } + // calculate hash on-the-fly while reading the pack and capture pack header + var hash restic.ID + var hdrBuf *bytes.Buffer + hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { + hrd := hashing.NewReader(rd, sha256.New()) + + // create a buffer that is large enough to be reused by repository.StreamPack + // this ensures that we can read the pack header later on + bufferSize := int(size) + if bufferSize > repository.MaxStreamBufferSize { + bufferSize = repository.MaxStreamBufferSize + } + bufRd := bufio.NewReaderSize(hrd, bufferSize) + + // skip to start of first blob, offset == 0 for correct pack files + _, err := bufRd.Discard(int(offset)) + if err != nil { + return err + } + + err = fn(bufRd) + if err != nil { + return err + } + + // skip enough bytes until we reach the possible header start + curPos := length + int(offset) + minHdrStart := int(size) - pack.MaxHeaderSize + if minHdrStart > curPos { + _, err := bufRd.Discard(minHdrStart - curPos) + if err != nil { + return err + } + } + + // read 
remainder, which should be the pack header + hdrBuf = new(bytes.Buffer) + _, err = io.Copy(hdrBuf, bufRd) + if err != nil { + return err + } + + hash = restic.IDFromHash(hrd.Sum(nil)) + return nil + }) + } + + err := repository.StreamPack(ctx, hashingLoader, r.Key(), id, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + debug.Log(" check blob %v: %v", blob.ID, blob) + if err != nil { + debug.Log(" error verifying blob %v: %v", blob.ID, err) + errs = append(errs, errors.Errorf("blob %v: %v", blob.ID, err)) + } + return nil + }) + if err != nil { + // failed to load the pack file, return as further checks cannot succeed anyway + debug.Log(" error streaming pack: %v", err) + return errors.Errorf("pack %v failed to download: %v", id, err) + } if !hash.Equal(id) { debug.Log("Pack ID does not match, want %v, got %v", id, hash) return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str()) } - if realSize != size { - debug.Log("Pack size does not match, want %v, got %v", size, realSize) - return errors.Errorf("Pack size does not match, want %v, got %v", size, realSize) - } - - blobs, hdrSize, err := pack.List(r.Key(), packfile, size) + blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf.Bytes()), int64(hdrBuf.Len())) if err != nil { return err } - var errs []error - var buf []byte - sizeFromBlobs := uint(hdrSize) + if uint32(idxHdrSize) != hdrSize { + debug.Log("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize) + errs = append(errs, errors.Errorf("Pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)) + } + idx := r.Index() - for i, blob := range blobs { - sizeFromBlobs += blob.Length - debug.Log(" check blob %d: %v", i, blob) - - buf = buf[:cap(buf)] - if uint(len(buf)) < blob.Length { - buf = make([]byte, blob.Length) - } - buf = buf[:blob.Length] - - _, err := packfile.Seek(int64(blob.Offset), 0) - if err != nil { - return errors.Errorf("Seek(%v): %v", blob.Offset, err) - } - - _, err = io.ReadFull(packfile, buf) - if err != nil { - debug.Log(" error loading blob %v: %v", blob.ID, err) - errs = append(errs, errors.Errorf("blob %v: %v", i, err)) - continue - } - - nonce, ciphertext := buf[:r.Key().NonceSize()], buf[r.Key().NonceSize():] - plaintext, err := r.Key().Open(ciphertext[:0], nonce, ciphertext, nil) - if err != nil { - debug.Log(" error decrypting blob %v: %v", blob.ID, err) - errs = append(errs, errors.Errorf("blob %v: %v", i, err)) - continue - } - - hash := restic.Hash(plaintext) - if !hash.Equal(blob.ID) { - debug.Log(" Blob ID does not match, want %v, got %v", blob.ID, hash) - errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str())) - continue - } - + for _, blob := range blobs { // Check if blob is contained in index and position is correct idxHas := false for _, pb := range idx.Lookup(blob.BlobHandle) { @@ -522,11 +560,6 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, size int6 } } - if int64(sizeFromBlobs) != size { - debug.Log("Pack size does not match, want %v, got %v", size, sizeFromBlobs) - errs = append(errs, errors.Errorf("Pack size does not match, want %v, got %v", size, sizeFromBlobs)) - } - if len(errs) > 0 { return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs) } @@ -544,17 +577,18 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p defer close(errChan) g, ctx := errgroup.WithContext(ctx) - type packsize struct { - id restic.ID - size int64 + 
type checkTask struct { + id restic.ID + size int64 + blobs []restic.Blob } - ch := make(chan packsize) + ch := make(chan checkTask) // run workers for i := 0; i < defaultParallelism; i++ { g.Go(func() error { for { - var ps packsize + var ps checkTask var ok bool select { @@ -565,7 +599,8 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p return nil } } - err := checkPack(ctx, c.repo, ps.id, ps.size) + + err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size) p.Add(1) if err == nil { continue @@ -580,10 +615,17 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p }) } + packSet := restic.NewIDSet() + for pack := range packs { + packSet.Insert(pack) + } + // push packs to ch - for pack, size := range packs { + for pbs := range c.repo.Index().ListPacks(ctx, packSet) { + size := packs[pbs.PackID] + debug.Log("listed %v", pbs.PackID) select { - case ch <- packsize{id: pack, size: size}: + case ch <- checkTask{id: pbs.PackID, size: size, blobs: pbs.Blobs}: case <-ctx.Done(): } } diff --git a/internal/pack/pack.go b/internal/pack/pack.go index d679c658b..95f298acb 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -160,7 +160,8 @@ const ( // HeaderSize is the header's constant overhead (independent of #entries) HeaderSize = headerLengthSize + crypto.Extension - maxHeaderSize = 16 * 1024 * 1024 + // MaxHeaderSize is the max size of header including header-length field + MaxHeaderSize = 16*1024*1024 + headerLengthSize // number of header enries to download as part of header-length request eagerEntries = 15 ) @@ -199,7 +200,7 @@ func readRecords(rd io.ReaderAt, size int64, max int) ([]byte, int, error) { err = InvalidFileError{Message: "header length is invalid"} case int64(hlen) > size-int64(headerLengthSize): err = InvalidFileError{Message: "header is larger than file"} - case int64(hlen) > maxHeaderSize: + case int64(hlen) > MaxHeaderSize-int64(headerLengthSize): err = InvalidFileError{Message: "header is larger than maxHeaderSize"} } if err != nil { diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 291e00da6..8bb81d390 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -27,7 +27,7 @@ import ( "golang.org/x/sync/errgroup" ) -const maxStreamBufferSize = 4 * 1024 * 1024 +const MaxStreamBufferSize = 4 * 1024 * 1024 // Repository is used to access a repository in a backend. 
type Repository struct { @@ -808,8 +808,8 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack // stream blobs in pack err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { bufferSize := int(dataEnd - dataStart) - if bufferSize > maxStreamBufferSize { - bufferSize = maxStreamBufferSize + if bufferSize > MaxStreamBufferSize { + bufferSize = MaxStreamBufferSize } bufRd := bufio.NewReaderSize(rd, bufferSize) currentBlobEnd := dataStart From becebf5d883e04daa66d61dd503545d13074b499 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 16:16:10 +0200 Subject: [PATCH 06/13] repository: remove unused DownloadAndHash --- internal/repository/repository.go | 44 ---------- internal/repository/repository_test.go | 106 ------------------------- 2 files changed, 150 deletions(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 8bb81d390..b43c3cec9 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -17,13 +17,10 @@ import ( "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/fs" - "github.com/restic/restic/internal/hashing" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" - "github.com/minio/sha256-simd" "golang.org/x/sync/errgroup" ) @@ -746,47 +743,6 @@ type Loader interface { Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error } -// DownloadAndHash is all-in-one helper to download content of the file at h to a temporary filesystem location -// and calculate ID of the contents. Returned (temporary) file is positioned at the beginning of the file; -// it is the reponsibility of the caller to close and delete the file. 
-func DownloadAndHash(ctx context.Context, be Loader, h restic.Handle) (tmpfile *os.File, hash restic.ID, size int64, err error) { - tmpfile, err = fs.TempFile("", "restic-temp-") - if err != nil { - return nil, restic.ID{}, -1, errors.Wrap(err, "TempFile") - } - - err = be.Load(ctx, h, 0, 0, func(rd io.Reader) (ierr error) { - _, ierr = tmpfile.Seek(0, io.SeekStart) - if ierr == nil { - ierr = tmpfile.Truncate(0) - } - if ierr != nil { - return ierr - } - hrd := hashing.NewReader(rd, sha256.New()) - size, ierr = io.Copy(tmpfile, hrd) - hash = restic.IDFromHash(hrd.Sum(nil)) - return ierr - }) - - if err != nil { - // ignore subsequent errors - _ = tmpfile.Close() - _ = os.Remove(tmpfile.Name()) - return nil, restic.ID{}, -1, errors.Wrap(err, "Load") - } - - _, err = tmpfile.Seek(0, io.SeekStart) - if err != nil { - // ignore subsequent errors - _ = tmpfile.Close() - _ = os.Remove(tmpfile.Name()) - return nil, restic.ID{}, -1, errors.Wrap(err, "Seek") - } - - return tmpfile, hash, size, err -} - type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 22a424556..083b6edff 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -13,8 +13,6 @@ import ( "time" "github.com/restic/restic/internal/archiver" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" @@ -411,107 +409,3 @@ func TestRepositoryIncrementalIndex(t *testing.T) { } } } - -type backend struct { - rd io.Reader -} - -func (be backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - return fn(be.rd) -} - -type retryBackend struct { - buf []byte -} - -func (be retryBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - err := fn(bytes.NewReader(be.buf[:len(be.buf)/2])) - if err != nil { - return err - } - - return fn(bytes.NewReader(be.buf)) -} - -func TestDownloadAndHash(t *testing.T) { - buf := make([]byte, 5*1024*1024+881) - _, err := io.ReadFull(rnd, buf) - if err != nil { - t.Fatal(err) - } - - var tests = []struct { - be repository.Loader - want []byte - }{ - { - be: backend{rd: bytes.NewReader(buf)}, - want: buf, - }, - { - be: retryBackend{buf: buf}, - want: buf, - }, - } - - for _, test := range tests { - t.Run("", func(t *testing.T) { - f, id, size, err := repository.DownloadAndHash(context.TODO(), test.be, restic.Handle{}) - if err != nil { - t.Error(err) - } - - want := restic.Hash(test.want) - if !want.Equal(id) { - t.Errorf("wrong hash returned, want %v, got %v", want.Str(), id.Str()) - } - - if size != int64(len(test.want)) { - t.Errorf("wrong size returned, want %v, got %v", test.want, size) - } - - err = f.Close() - if err != nil { - t.Error(err) - } - - err = fs.RemoveIfExists(f.Name()) - if err != nil { - t.Fatal(err) - } - }) - } -} - -type errorReader struct { - err error -} - -func (er errorReader) Read(p []byte) (n int, err error) { - return 0, er.err -} - -func TestDownloadAndHashErrors(t *testing.T) { - var tests = []struct 
{ - be repository.Loader - err string - }{ - { - be: backend{rd: errorReader{errors.New("test error 1")}}, - err: "test error 1", - }, - } - - for _, test := range tests { - t.Run("", func(t *testing.T) { - _, _, _, err := repository.DownloadAndHash(context.TODO(), test.be, restic.Handle{}) - if err == nil { - t.Fatalf("wanted error %q, got nil", test.err) - } - - if errors.Cause(err).Error() != test.err { - t.Fatalf("wanted error %q, got %q", test.err, err) - } - }) - } -} From 34ebafb8b65cef86fdda9bef64bc33e968e0d319 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 20 Aug 2021 16:16:45 +0200 Subject: [PATCH 07/13] repository: don't crash if blob size is too short --- internal/repository/repository.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index b43c3cec9..472c4cd15 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -801,6 +801,11 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack } currentBlobEnd = entry.Offset + entry.Length + if int(entry.Length) <= key.NonceSize() { + debug.Log("%v", blobs) + return errors.Errorf("invalid blob length %v", entry) + } + // decryption errors are likely permanent, give the caller a chance to skip them nonce, ciphertext := buf[:key.NonceSize()], buf[key.NonceSize():] plaintext, err := key.Open(ciphertext[:0], nonce, ciphertext, nil) From 930a00ad54b545bdd5e9fa6a4d4fb6a030f37f4a Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 22 Aug 2021 14:38:27 +0200 Subject: [PATCH 08/13] checker: reuse bufio reader --- internal/checker/checker.go | 16 ++++++---------- internal/repository/repository.go | 1 + 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 3186b90d0..ce6eafd74 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -440,7 +440,7 @@ func (c *Checker) GetPacks() map[restic.ID]int64 { } // checkPack reads a pack and checks the integrity of all blobs. 
-func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64) error { +func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []restic.Blob, size int64, bufRd *bufio.Reader) error { debug.Log("checking pack %v", id.String()) if len(blobs) == 0 { @@ -474,14 +474,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { hrd := hashing.NewReader(rd, sha256.New()) - - // create a buffer that is large enough to be reused by repository.StreamPack - // this ensures that we can read the pack header later on - bufferSize := int(size) - if bufferSize > repository.MaxStreamBufferSize { - bufferSize = repository.MaxStreamBufferSize - } - bufRd := bufio.NewReaderSize(hrd, bufferSize) + bufRd.Reset(hrd) // skip to start of first blob, offset == 0 for correct pack files _, err := bufRd.Discard(int(offset)) @@ -587,6 +580,9 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p // run workers for i := 0; i < defaultParallelism; i++ { g.Go(func() error { + // create a buffer that is large enough to be reused by repository.StreamPack + // this ensures that we can read the pack header later on + bufRd := bufio.NewReaderSize(nil, repository.MaxStreamBufferSize) for { var ps checkTask var ok bool @@ -600,7 +596,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } } - err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size) + err := checkPack(ctx, c.repo, ps.id, ps.blobs, ps.size, bufRd) p.Add(1) if err == nil { continue diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 472c4cd15..bbacd5196 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -767,6 +767,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack if bufferSize > MaxStreamBufferSize { bufferSize = MaxStreamBufferSize } + // create reader here to allow reusing the buffered reader from checker.checkData bufRd := bufio.NewReaderSize(rd, bufferSize) currentBlobEnd := dataStart var buf []byte From 4b3dc415ef41593fdd5e3cc0db3b78b3bbef5411 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 4 Sep 2021 14:08:53 +0200 Subject: [PATCH 09/13] checker: cleanup header extraction --- internal/checker/checker.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index ce6eafd74..5262789d0 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "io" + "io/ioutil" "sort" "sync" @@ -470,7 +471,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r // calculate hash on-the-fly while reading the pack and capture pack header var hash restic.ID - var hdrBuf *bytes.Buffer + var hdrBuf []byte hashingLoader := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { return r.Backend().Load(ctx, h, int(size), 0, func(rd io.Reader) error { hrd := hashing.NewReader(rd, sha256.New()) @@ -498,8 +499,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r } // read remainder, which should be the pack header - hdrBuf = new(bytes.Buffer) - _, err = io.Copy(hdrBuf, bufRd) + hdrBuf, 
err = ioutil.ReadAll(bufRd) if err != nil { return err } @@ -527,7 +527,7 @@ func checkPack(ctx context.Context, r restic.Repository, id restic.ID, blobs []r return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str()) } - blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf.Bytes()), int64(hdrBuf.Len())) + blobs, hdrSize, err := pack.List(r.Key(), bytes.NewReader(hdrBuf), int64(len(hdrBuf))) if err != nil { return err } From 47554a3428d5babcd490522660afe5fc5ec4beea Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 4 Sep 2021 16:09:34 +0200 Subject: [PATCH 10/13] repository: Fix error handling in repack When storing a blob fails, this is a fatal error which must not be retried. --- internal/repository/repository.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index bbacd5196..31bd76685 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -11,6 +11,7 @@ import ( "sort" "sync" + "github.com/cenkalti/backoff" "github.com/restic/chunker" "github.com/restic/restic/internal/backend/dryrun" "github.com/restic/restic/internal/cache" @@ -822,7 +823,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack err = handleBlobFn(entry.BlobHandle, plaintext, err) if err != nil { - return err + return backoff.Permanent(err) } } return nil From bba8ba7a5bf118b1edb10f1bf6993b8fe428092a Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 5 Sep 2021 12:20:07 +0200 Subject: [PATCH 11/13] repository: cancel streampack context after error --- internal/repository/repository.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 31bd76685..39953cd88 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -11,7 +11,7 @@ import ( "sort" "sync" - "github.com/cenkalti/backoff" + "github.com/cenkalti/backoff/v4" "github.com/restic/chunker" "github.com/restic/restic/internal/backend/dryrun" "github.com/restic/restic/internal/cache" @@ -746,6 +746,10 @@ type Loader interface { type BackendLoadFn func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error +// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to +// the handleBlobFn callback or an error if decryption failed or the blob hash does not match. In +// case of download errors handleBlobFn might be called multiple times for the same blob. If the +// callback returns an error, then StreamPack will abort and not retry it. 
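+// The blobs may be given in any order, as StreamPack sorts them by their offset within the pack file before downloading. 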
func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { if len(blobs) == 0 { // nothing to do @@ -762,8 +766,13 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack debug.Log("streaming pack %v (%d to %d bytes), blobs: %v", packID, dataStart, dataEnd, len(blobs)) + ctx, cancel := context.WithCancel(ctx) // stream blobs in pack err := beLoad(ctx, h, int(dataEnd-dataStart), int64(dataStart), func(rd io.Reader) error { + // prevent callbacks after cancelation + if ctx.Err() != nil { + return ctx.Err() + } bufferSize := int(dataEnd - dataStart) if bufferSize > MaxStreamBufferSize { bufferSize = MaxStreamBufferSize @@ -823,6 +832,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack err = handleBlobFn(entry.BlobHandle, plaintext, err) if err != nil { + cancel() return backoff.Permanent(err) } } From 27524979e860b3bb1f447d8cedad09a48f7811d4 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 13 Feb 2022 11:43:09 +0100 Subject: [PATCH 12/13] restorer: Remove dead code --- internal/restorer/filerestorer.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 323b69cf8..84c7834df 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -2,7 +2,6 @@ package restorer import ( "context" - "math" "path/filepath" "sync" @@ -118,7 +117,7 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { err := r.forEachBlob(fileBlobs, func(packID restic.ID, blob restic.Blob) { if largeFile { packsMap[packID] = append(packsMap[packID], fileBlobInfo{id: blob.ID, offset: fileOffset}) - fileOffset += int64(blob.Length) - crypto.Extension + fileOffset += int64(restic.PlaintextLength(int(blob.Length))) } pack, ok := packs[packID] if !ok { @@ -175,20 +174,13 @@ func (r *fileRestorer) restoreFiles(ctx context.Context) error { func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { - // calculate pack byte range and blob->[]files->[]offsets mappings - start, end := int64(math.MaxInt64), int64(0) + // calculate blob->[]files->[]offsets mappings blobs := make(map[restic.ID]struct { files map[*fileInfo][]int64 // file -> offsets (plural!) 
of the blob in the file }) var blobList []restic.Blob for file := range pack.files { addBlob := func(blob restic.Blob, fileOffset int64) { - if start > int64(blob.Offset) { - start = int64(blob.Offset) - } - if end < int64(blob.Offset+blob.Length) { - end = int64(blob.Offset + blob.Length) - } blobInfo, ok := blobs[blob.ID] if !ok { blobInfo.files = make(map[*fileInfo][]int64) From e682f7c0d610b1b8334ba3cf108a344c3a1709dc Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Mon, 21 Mar 2022 20:38:53 +0100 Subject: [PATCH 13/13] Add tests for StreamPack --- internal/repository/repository_test.go | 203 +++++++++++++++++++++++++ 1 file changed, 203 insertions(+) diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 083b6edff..167ffa535 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -4,17 +4,22 @@ import ( "bytes" "context" "crypto/sha256" + "encoding/json" "fmt" "io" "math/rand" "os" "path/filepath" + "strings" "testing" "time" + "github.com/google/go-cmp/cmp" "github.com/restic/restic/internal/archiver" + "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" rtest "github.com/restic/restic/internal/test" ) @@ -408,4 +413,202 @@ func TestRepositoryIncrementalIndex(t *testing.T) { t.Errorf("pack %v listed in %d indexes\n", packID, len(ids)) } } + +} + +// buildPackfileWithoutHeader returns a manually built pack file without a header. +func buildPackfileWithoutHeader(t testing.TB, blobSizes []int, key *crypto.Key) (blobs []restic.Blob, packfile []byte) { + var offset uint + for i, size := range blobSizes { + plaintext := test.Random(800+i, size) + + // we use a deterministic nonce here so the whole process is + // deterministic, last byte is the blob index + var nonce = []byte{ + 0x15, 0x98, 0xc0, 0xf7, 0xb9, 0x65, 0x97, 0x74, + 0x12, 0xdc, 0xd3, 0x62, 0xa9, 0x6e, 0x20, byte(i), + } + + before := len(packfile) + packfile = append(packfile, nonce...) 
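+ // Seal appends the ciphertext and MAC directly after the nonce, yielding the nonce||ciphertext||MAC blob layout that StreamPack later splits apart for decryption 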
+ packfile = key.Seal(packfile, nonce, plaintext, nil) + after := len(packfile) + + ciphertextLength := after - before + + blobs = append(blobs, restic.Blob{ + BlobHandle: restic.BlobHandle{ + ID: restic.Hash(plaintext), + Type: restic.DataBlob, + }, + Length: uint(ciphertextLength), + Offset: offset, + }) + + offset = uint(len(packfile)) + } + + return blobs, packfile +} + +func TestStreamPack(t *testing.T) { + // always use the same key for deterministic output + const jsonKey = `{"mac":{"k":"eQenuI8adktfzZMuC8rwdA==","r":"k8cfAly2qQSky48CQK7SBA=="},"encrypt":"MKO9gZnRiQFl8mDUurSDa9NMjiu9MUifUrODTHS05wo="}` + + var key crypto.Key + err := json.Unmarshal([]byte(jsonKey), &key) + if err != nil { + t.Fatal(err) + } + + blobSizes := []int{ + 10, + 5231, + 18812, + 123123, + 12301, + 892242, + 28616, + 13351, + 252287, + 188883, + 2522811, + 18883, + } + + packfileBlobs, packfile := buildPackfileWithoutHeader(t, blobSizes, &key) + + load := func(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + data := packfile + + if offset > int64(len(data)) { + offset = 0 + length = 0 + } + data = data[offset:] + + if length > len(data) { + length = len(data) + } + + data = data[:length] + + return fn(bytes.NewReader(data)) + + } + + // first, test regular usage + t.Run("regular", func(t *testing.T) { + tests := []struct { + blobs []restic.Blob + }{ + {packfileBlobs[1:2]}, + {packfileBlobs[2:5]}, + {packfileBlobs[2:8]}, + {[]restic.Blob{ + packfileBlobs[0], + packfileBlobs[8], + packfileBlobs[4], + }}, + {[]restic.Blob{ + packfileBlobs[0], + packfileBlobs[len(packfileBlobs)-1], + }}, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gotBlobs := make(map[restic.ID]int) + + handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error { + gotBlobs[blob.ID]++ + + id := restic.Hash(buf) + if !id.Equal(blob.ID) { + t.Fatalf("wrong id %v for blob %s returned", id, blob.ID) + } + + return err + } + + wantBlobs := make(map[restic.ID]int) + for _, blob := range test.blobs { + wantBlobs[blob.ID] = 1 + } + + err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + if err != nil { + t.Fatal(err) + } + + if !cmp.Equal(wantBlobs, gotBlobs) { + t.Fatal(cmp.Diff(wantBlobs, gotBlobs)) + } + }) + } + }) + + // next, test invalid uses, which should return an error + t.Run("invalid", func(t *testing.T) { + tests := []struct { + blobs []restic.Blob + err string + }{ + { + // pass one blob several times + blobs: []restic.Blob{ + packfileBlobs[3], + packfileBlobs[8], + packfileBlobs[3], + packfileBlobs[4], + }, + err: "overlapping blobs in pack", + }, + + { + // pass something that's not a valid blob in the current pack file + blobs: []restic.Blob{ + { + Offset: 123, + Length: 20000, + }, + }, + err: "ciphertext verification failed", + }, + + { + // pass a blob that's too small + blobs: []restic.Blob{ + { + Offset: 123, + Length: 10, + }, + }, + err: "invalid blob length", + }, + } + + for _, test := range tests { + t.Run("", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + handleBlob := func(blob restic.BlobHandle, buf []byte, err error) error { + return err + } + + err = repository.StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + if err == nil { + t.Fatalf("wanted error %v, got nil", test.err) + } + + if !strings.Contains(err.Error(), test.err) { + t.Fatalf("wrong error 
returned, it should contain %q but was %q", test.err, err) + } + }) + } + }) }
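
Usage sketch (not part of the patch series; the package clause, function name, and copy-semantics comment are assumptions for illustration): the pieces introduced above combine the same way the repack worker from patch 03 does — ListPacks groups the wanted blobs by pack file, and StreamPack then downloads each pack's byte range once, handing every decrypted blob to a callback.

// streamAllBlobs streams every indexed blob of the given packs exactly once.
package example

import (
	"context"

	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
)

func streamAllBlobs(ctx context.Context, repo restic.Repository, packs restic.IDSet) error {
	for pbs := range repo.Index().ListPacks(ctx, packs) {
		err := repository.StreamPack(ctx, repo.Backend().Load, repo.Key(), pbs.PackID, pbs.Blobs,
			func(blob restic.BlobHandle, buf []byte, err error) error {
				if err != nil {
					// decryption failed or the blob hash did not match
					return err
				}
				// buf holds the plaintext blob; StreamPack reuses the buffer,
				// so copy the data if it must outlive this callback
				return nil
			})
		if err != nil {
			return err
		}
	}
	return nil
}

Patches 04 and 05 drive the restorer and checker through this same StreamPack entry point.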