diff --git a/cmd/restic/cmd_repair_packs.go b/cmd/restic/cmd_repair_packs.go index 7d1a3a392..723bdbccb 100644 --- a/cmd/restic/cmd_repair_packs.go +++ b/cmd/restic/cmd_repair_packs.go @@ -116,7 +116,7 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo continue } - err = repository.StreamPack(wgCtx, repo.Backend().Load, repo.Key(), b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + err = repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { // Fallback path buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index c82e63f28..5588984f6 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -77,7 +77,7 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito worker := func() error { for t := range downloadQueue { - err := StreamPack(wgCtx, repo.Backend().Load, repo.Key(), t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + err := repo.LoadBlobsFromPack(wgCtx, t.PackID, t.Blobs, func(blob restic.BlobHandle, buf []byte, err error) error { if err != nil { var ierr error // check whether we can get a valid copy somewhere else diff --git a/internal/repository/repository.go b/internal/repository/repository.go index e13220741..407b6429c 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -875,16 +875,20 @@ func (r *Repository) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte return newID, known, size, err } -type BackendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error +type backendLoadFn func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error // Skip sections with more than 4MB unused blobs const maxUnusedRange = 4 * 1024 * 1024 -// StreamPack loads the listed blobs from the specified pack file. The plaintext blob is passed to +// LoadBlobsFromPack loads the listed blobs from the specified pack file. The plaintext blob is passed to // the handleBlobFn callback or an error if decryption failed or the blob hash does not match. // handleBlobFn is called at most once for each blob. If the callback returns an error, -// then StreamPack will abort and not retry it. -func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +// then LoadBlobsFromPack will abort and not retry it. +func (r *Repository) LoadBlobsFromPack(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + return streamPack(ctx, r.Backend().Load, r.key, packID, blobs, handleBlobFn) +} + +func streamPack(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { if len(blobs) == 0 { // nothing to do return nil @@ -915,7 +919,7 @@ func StreamPack(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, pack return streamPackPart(ctx, beLoad, key, packID, blobs[lowerIdx:], handleBlobFn) } -func streamPackPart(ctx context.Context, beLoad BackendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { +func streamPackPart(ctx context.Context, beLoad backendLoadFn, key *crypto.Key, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { h := backend.Handle{Type: restic.PackFile, Name: packID.String(), IsMetadata: false} dataStart := blobs[0].Offset diff --git a/internal/repository/repository_internal_test.go b/internal/repository/repository_internal_test.go index fc408910c..eed99c7e0 100644 --- a/internal/repository/repository_internal_test.go +++ b/internal/repository/repository_internal_test.go @@ -276,7 +276,7 @@ func testStreamPack(t *testing.T, version uint) { loadCalls = 0 shortFirstLoad = test.shortFirstLoad - err = StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) if err != nil { t.Fatal(err) } @@ -339,7 +339,7 @@ func testStreamPack(t *testing.T, version uint) { return err } - err = StreamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) + err = streamPack(ctx, load, &key, restic.ID{}, test.blobs, handleBlob) if err == nil { t.Fatalf("wanted error %v, got nil", test.err) } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 895c930dd..6818847c0 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -44,6 +44,7 @@ type Repository interface { ListPack(context.Context, ID, int64) ([]Blob, uint32, error) LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) + LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error) // StartPackUploader start goroutines to upload new pack files. The errgroup diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index 99a460321..f2c134ea9 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -7,7 +7,6 @@ import ( "golang.org/x/sync/errgroup" - "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" @@ -45,11 +44,12 @@ type packInfo struct { files map[*fileInfo]struct{} // set of files that use blobs from this pack } +type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error + // fileRestorer restores set of files type fileRestorer struct { - key *crypto.Key - idx func(restic.BlobHandle) []restic.PackedBlob - packLoader repository.BackendLoadFn + idx func(restic.BlobHandle) []restic.PackedBlob + blobsLoader blobsLoaderFn workerCount int filesWriter *filesWriter @@ -63,8 +63,7 @@ type fileRestorer struct { } func newFileRestorer(dst string, - packLoader repository.BackendLoadFn, - key *crypto.Key, + blobsLoader blobsLoaderFn, idx func(restic.BlobHandle) []restic.PackedBlob, connections uint, sparse bool, @@ -74,9 +73,8 @@ func newFileRestorer(dst string, workerCount := int(connections) return &fileRestorer{ - key: key, idx: idx, - packLoader: packLoader, + blobsLoader: blobsLoader, filesWriter: newFilesWriter(workerCount), zeroChunk: repository.ZeroChunk(), sparse: sparse, @@ -310,7 +308,7 @@ func (r *fileRestorer) downloadBlobs(ctx context.Context, packID restic.ID, for _, entry := range blobs { blobList = append(blobList, entry.blob) } - return repository.StreamPack(ctx, r.packLoader, r.key, packID, blobList, + return r.blobsLoader(ctx, packID, blobList, func(h restic.BlobHandle, blobData []byte, err error) error { processedBlobs.Insert(h) blob := blobs[h.ID] diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index c5bc3fe31..befeb5d2c 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -4,14 +4,11 @@ import ( "bytes" "context" "fmt" - "io" "os" + "sort" "testing" - "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" ) @@ -27,11 +24,6 @@ type TestFile struct { } type TestRepo struct { - key *crypto.Key - - // pack names and ids - packsNameToID map[string]restic.ID - packsIDToName map[restic.ID]string packsIDToData map[restic.ID][]byte // blobs and files @@ -40,7 +32,7 @@ type TestRepo struct { filesPathToContent map[string]string // - loader repository.BackendLoadFn + loader blobsLoaderFn } func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob { @@ -59,16 +51,6 @@ func newTestRepo(content []TestFile) *TestRepo { blobs map[restic.ID]restic.Blob } packs := make(map[string]Pack) - - key := crypto.NewRandomKey() - seal := func(data []byte) []byte { - ciphertext := crypto.NewBlobBuffer(len(data)) - ciphertext = ciphertext[:0] // truncate the slice - nonce := crypto.NewRandomNonce() - ciphertext = append(ciphertext, nonce...) - return key.Seal(ciphertext, nonce, data, nil) - } - filesPathToContent := make(map[string]string) for _, file := range content { @@ -86,14 +68,15 @@ func newTestRepo(content []TestFile) *TestRepo { // calculate blob id and add to the pack as necessary blobID := restic.Hash([]byte(blob.data)) if _, found := pack.blobs[blobID]; !found { - blobData := seal([]byte(blob.data)) + blobData := []byte(blob.data) pack.blobs[blobID] = restic.Blob{ BlobHandle: restic.BlobHandle{ Type: restic.DataBlob, ID: blobID, }, - Length: uint(len(blobData)), - Offset: uint(len(pack.data)), + Length: uint(len(blobData)), + UncompressedLength: uint(len(blobData)), + Offset: uint(len(pack.data)), } pack.data = append(pack.data, blobData...) } @@ -104,15 +87,11 @@ func newTestRepo(content []TestFile) *TestRepo { } blobs := make(map[restic.ID][]restic.PackedBlob) - packsIDToName := make(map[restic.ID]string) packsIDToData := make(map[restic.ID][]byte) - packsNameToID := make(map[string]restic.ID) for _, pack := range packs { packID := restic.Hash(pack.data) - packsIDToName[packID] = pack.name packsIDToData[packID] = pack.data - packsNameToID[pack.name] = packID for blobID, blob := range pack.blobs { blobs[blobID] = append(blobs[blobID], restic.PackedBlob{Blob: blob, PackID: packID}) } @@ -128,30 +107,44 @@ func newTestRepo(content []TestFile) *TestRepo { } repo := &TestRepo{ - key: key, - packsIDToName: packsIDToName, packsIDToData: packsIDToData, - packsNameToID: packsNameToID, blobs: blobs, files: files, filesPathToContent: filesPathToContent, } - repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - packID, err := restic.ParseID(h.Name) - if err != nil { - return err + repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + blobs = append([]restic.Blob{}, blobs...) + sort.Slice(blobs, func(i, j int) bool { + return blobs[i].Offset < blobs[j].Offset + }) + + for _, blob := range blobs { + found := false + for _, e := range repo.blobs[blob.ID] { + if packID == e.PackID { + found = true + buf := repo.packsIDToData[packID][e.Offset : e.Offset+e.Length] + err := handleBlobFn(e.BlobHandle, buf, nil) + if err != nil { + return err + } + } + } + if !found { + return fmt.Errorf("missing blob: %v", blob) + } } - rd := bytes.NewReader(repo.packsIDToData[packID][int(offset) : int(offset)+length]) - return fn(rd) + return nil } return repo } func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files map[string]bool, sparse bool) { + t.Helper() repo := newTestRepo(content) - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, sparse, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, sparse, nil) if files == nil { r.files = repo.files @@ -170,6 +163,7 @@ func restoreAndVerify(t *testing.T, tempdir string, content []TestFile, files ma } func verifyRestore(t *testing.T, r *fileRestorer, repo *TestRepo) { + t.Helper() for _, file := range r.files { target := r.targetPath(file.location) data, err := os.ReadFile(target) @@ -283,62 +277,17 @@ func TestErrorRestoreFiles(t *testing.T) { loadError := errors.New("load error") // loader always returns an error - repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { return loadError } - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, false, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, nil) r.files = repo.files err := r.restoreFiles(context.TODO()) rtest.Assert(t, errors.Is(err, loadError), "got %v, expected contained error %v", err, loadError) } -func TestDownloadError(t *testing.T) { - for i := 0; i < 100; i += 10 { - testPartialDownloadError(t, i) - } -} - -func testPartialDownloadError(t *testing.T, part int) { - tempdir := rtest.TempDir(t) - content := []TestFile{ - { - name: "file1", - blobs: []TestBlob{ - {"data1-1", "pack1"}, - {"data1-2", "pack1"}, - {"data1-3", "pack1"}, - }, - }} - - repo := newTestRepo(content) - - // loader always returns an error - loader := repo.loader - repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - // only load partial data to exercise fault handling in different places - err := loader(ctx, h, length*part/100, offset, fn) - if err == nil { - return nil - } - fmt.Println("Retry after error", err) - return loader(ctx, h, length, offset, fn) - } - - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, false, nil) - r.files = repo.files - r.Error = func(s string, e error) error { - // ignore errors as in the `restore` command - fmt.Println("error during restore", s, e) - return nil - } - - err := r.restoreFiles(context.TODO()) - rtest.OK(t, err) - verifyRestore(t, r, repo) -} - func TestFatalDownloadError(t *testing.T) { tempdir := rtest.TempDir(t) content := []TestFile{ @@ -361,12 +310,19 @@ func TestFatalDownloadError(t *testing.T) { repo := newTestRepo(content) loader := repo.loader - repo.loader = func(ctx context.Context, h backend.Handle, length int, offset int64, fn func(rd io.Reader) error) error { - // only return half the data to break file2 - return loader(ctx, h, length/2, offset, fn) + repo.loader = func(ctx context.Context, packID restic.ID, blobs []restic.Blob, handleBlobFn func(blob restic.BlobHandle, buf []byte, err error) error) error { + ctr := 0 + return loader(ctx, packID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error { + if ctr < 2 { + ctr++ + return handleBlobFn(blob, buf, err) + } + // break file2 + return errors.New("failed to load blob") + }) } - r := newFileRestorer(tempdir, repo.loader, repo.key, repo.Lookup, 2, false, nil) + r := newFileRestorer(tempdir, repo.loader, repo.Lookup, 2, false, nil) r.files = repo.files var errors []string diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index e973316c0..2ce1ee98e 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -231,7 +231,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { } idx := NewHardlinkIndex[string]() - filerestorer := newFileRestorer(dst, res.repo.Backend().Load, res.repo.Key(), res.repo.Index().Lookup, + filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.Index().Lookup, res.repo.Connections(), res.sparse, res.progress) filerestorer.Error = res.Error