diff --git a/changelog/unreleased/issue-3806 b/changelog/unreleased/issue-3806 new file mode 100644 index 000000000..d3ae9b507 --- /dev/null +++ b/changelog/unreleased/issue-3806 @@ -0,0 +1,11 @@ +Enhancement: Make `prune` command resumable +When `prune` was interrupted, a later `prune` run previously started repacking the pack files from the start, as `prune` did not update the index while repacking. +The `prune` command now supports resuming interrupted prune runs. The update +of the repository index has also been optimized to use less memory and only +rewrite parts of the index that have changed. +https://github.com/restic/restic/issues/3806 +https://github.com/restic/restic/pull/4812 diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index 8d11a9dc4..23205771a 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -167,8 +167,7 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error { } for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} { - bh := restic.BlobHandle{ID: id, Type: t} - if !repo.Index().Has(bh) { + if _, ok := repo.LookupBlobSize(t, id); !ok { continue } diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index ad6c58a25..d12501dd9 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -187,7 +187,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep packList := restic.NewIDSet() enqueue := func(h restic.BlobHandle) { - pb := srcRepo.Index().Lookup(h) + pb := srcRepo.LookupBlob(h.Type, h.ID) copyBlobs.Insert(h) for _, p := range pb { packList.Insert(p.PackID) @@ -202,7 +202,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Do we already have this tree blob? treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob} - if !dstRepo.Index().Has(treeHandle) { + if _, ok := dstRepo.LookupBlobSize(treeHandle.Type, treeHandle.ID); !ok { // copy raw tree bytes to avoid problems if the serialization changes enqueue(treeHandle) } @@ -212,7 +212,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Copy the blobs for this file.
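The `cmd_cat.go` and `cmd_copy.go` hunks above replace direct `repo.Index()` access with the repository-level `LookupBlobSize`/`LookupBlob` helpers (note the `(type, id)` argument order, which later hunks also fix in pre-existing `LookupBlobSize` calls). A minimal sketch of the new calling convention; the wrapper names are hypothetical, only the two repository methods come from this patch:

```go
package example // illustrative sketch, not part of the patch

import "github.com/restic/restic/internal/restic"

// blobKnown reports whether the index knows a data blob, replacing the
// old repo.Index().Has(restic.BlobHandle{...}) pattern.
func blobKnown(repo restic.Repository, id restic.ID) bool {
	_, ok := repo.LookupBlobSize(restic.DataBlob, id)
	return ok
}

// packsWithTree lists where a tree blob is stored, replacing
// repo.Index().Lookup(handle).
func packsWithTree(repo restic.Repository, id restic.ID) []restic.PackedBlob {
	return repo.LookupBlob(restic.TreeBlob, id)
}
```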
for _, blobID := range entry.Content { h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID} - if !dstRepo.Index().Has(h) { + if _, ok := dstRepo.LookupBlobSize(h.Type, h.ID); !ok { enqueue(h) } } diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index 9fb6969d0..7b0cdb53e 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -492,7 +492,7 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo blobsLoaded := false // examine all data the indexes have for the pack file - for b := range repo.Index().ListPacks(ctx, restic.NewIDSet(id)) { + for b := range repo.ListPacksFromIndex(ctx, restic.NewIDSet(id)) { blobs := b.Blobs if len(blobs) == 0 { continue diff --git a/cmd/restic/cmd_diff.go b/cmd/restic/cmd_diff.go index b156191dc..28c742625 100644 --- a/cmd/restic/cmd_diff.go +++ b/cmd/restic/cmd_diff.go @@ -156,7 +156,7 @@ func updateBlobs(repo restic.Loader, blobs restic.BlobSet, stats *DiffStat) { stats.TreeBlobs++ } - size, found := repo.LookupBlobSize(h.ID, h.Type) + size, found := repo.LookupBlobSize(h.Type, h.ID) if !found { Warnf("unable to find blob size for %v\n", h) continue diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index 81df0ab98..59e34c468 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -465,7 +465,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc // remember which packs were found in the index indexPackIDs := make(map[string]struct{}) - err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) { + err := f.repo.ListBlobs(wctx, func(pb restic.PackedBlob) { idStr := pb.PackID.String() // keep entry in packIDs as Each() returns individual index entries matchingID := false @@ -503,15 +503,13 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc } func (f *Finder) findObjectPack(id string, t restic.BlobType) { - idx := f.repo.Index() - rid, err := restic.ParseID(id) if err != nil { Printf("Note: cannot find pack for object '%s', unable to parse ID: %v\n", id, err) return } - blobs := idx.Lookup(restic.BlobHandle{ID: rid, Type: t}) + blobs := f.repo.LookupBlob(t, rid) if len(blobs) == 0 { Printf("Object %s not found in the index\n", rid.Str()) return diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index d62a7df75..7872589be 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -162,9 +162,6 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term } func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet, term *termstatus.Terminal) error { - // we do not need index updates while pruning! - repo.DisableAutoIndexUpdate() - if repo.Cache == nil { Print("warning: running prune without a cache, this may be very slow!\n") } diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index debaa4e5b..726f1bf65 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -61,7 +61,7 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error { // tree. If it is not referenced, we have a root tree. 
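Several hunks in this region (`cmd_find.go` above, `cmd_recover.go` resuming below, `cmd_stats.go`) switch whole-index iteration from `repo.Index().Each(...)` to `repo.ListBlobs(...)`. A hedged sketch of the callback style against the `restic.ListBlobser` interface introduced by this patch; the function name is invented:

```go
package example // sketch of the ListBlobs iteration style

import (
	"context"

	"github.com/restic/restic/internal/restic"
)

// countTreeBlobs visits every index entry exactly once via the callback,
// the same shape cmd_recover uses below to collect tree blob IDs.
func countTreeBlobs(ctx context.Context, repo restic.ListBlobser) (uint, error) {
	var n uint
	err := repo.ListBlobs(ctx, func(pb restic.PackedBlob) {
		if pb.Type == restic.TreeBlob {
			n++
		}
	})
	return n, err
}
```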
trees := make(map[restic.ID]bool) - err = repo.Index().Each(ctx, func(blob restic.PackedBlob) { + err = repo.ListBlobs(ctx, func(blob restic.PackedBlob) { if blob.Type == restic.TreeBlob { trees[blob.Blob.ID] = false } diff --git a/cmd/restic/cmd_repair_snapshots.go b/cmd/restic/cmd_repair_snapshots.go index b200d100a..be5ef4ad9 100644 --- a/cmd/restic/cmd_repair_snapshots.go +++ b/cmd/restic/cmd_repair_snapshots.go @@ -97,7 +97,7 @@ func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOpt var newSize uint64 // check all contents and remove if not available for _, id := range node.Content { - if size, found := repo.LookupBlobSize(id, restic.DataBlob); !found { + if size, found := repo.LookupBlobSize(restic.DataBlob, id); !found { ok = false } else { newContent = append(newContent, id) diff --git a/cmd/restic/cmd_stats.go b/cmd/restic/cmd_stats.go index a7891e5b0..0f8e45f36 100644 --- a/cmd/restic/cmd_stats.go +++ b/cmd/restic/cmd_stats.go @@ -124,7 +124,7 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args if opts.countMode == countModeRawData { // the blob handles have been collected, but not yet counted for blobHandle := range stats.blobs { - pbs := repo.Index().Lookup(blobHandle) + pbs := repo.LookupBlob(blobHandle.Type, blobHandle.ID) if len(pbs) == 0 { return fmt.Errorf("blob %v not found", blobHandle) } @@ -238,7 +238,7 @@ func statsWalkTree(repo restic.Loader, opts StatsOptions, stats *statsContainer, } if _, ok := stats.fileBlobs[nodePath][blobID]; !ok { // is always a data blob since we're accessing it via a file's Content array - blobSize, found := repo.LookupBlobSize(blobID, restic.DataBlob) + blobSize, found := repo.LookupBlobSize(restic.DataBlob, blobID) if !found { return fmt.Errorf("blob %s not found for tree %s", blobID, parentTreeID) } @@ -378,7 +378,7 @@ func statsDebugBlobs(ctx context.Context, repo restic.Repository) ([restic.NumBl hist[i] = newSizeHistogram(2 * chunker.MaxSize) } - err := repo.Index().Each(ctx, func(pb restic.PackedBlob) { + err := repo.ListBlobs(ctx, func(pb restic.PackedBlob) { hist[pb.Type].Add(uint64(pb.Length)) }) diff --git a/cmd/restic/integration_helpers_test.go b/cmd/restic/integration_helpers_test.go index 2812eda6d..978deab3d 100644 --- a/cmd/restic/integration_helpers_test.go +++ b/cmd/restic/integration_helpers_test.go @@ -252,7 +252,7 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet { rtest.OK(t, r.LoadIndex(ctx, nil)) treePacks := restic.NewIDSet() - rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) { + rtest.OK(t, r.ListBlobs(ctx, func(pb restic.PackedBlob) { if pb.Type == restic.TreeBlob { treePacks.Insert(pb.PackID) } @@ -280,7 +280,7 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem rtest.OK(t, r.LoadIndex(ctx, nil)) treePacks := restic.NewIDSet() - rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) { + rtest.OK(t, r.ListBlobs(ctx, func(pb restic.PackedBlob) { if pb.Type == restic.TreeBlob { treePacks.Insert(pb.PackID) } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index c1f73eea6..86b329a9a 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -64,9 +64,19 @@ func (s *ItemStats) Add(other ItemStats) { s.TreeSizeInRepo += other.TreeSizeInRepo } +type archiverRepo interface { + restic.Loader + restic.BlobSaver + restic.SaverUnpacked + + Config() restic.Config + StartPackUploader(ctx context.Context, wg *errgroup.Group) + Flush(ctx context.Context) error 
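The new `archiverRepo` interface above (its closing brace follows this note) narrows the archiver's repository dependency from the full `restic.Repository` to the handful of methods it actually calls, which is what lets the test wrappers further below embed `archiverRepo` instead of a concrete repository. A hedged sketch of a compile-time satisfaction check one might keep next to the interface; this check is an assumption, not part of the patch:

```go
package archiver // sketch: hypothetical compile-time interface check

import "github.com/restic/restic/internal/repository"

// The concrete repository must keep satisfying the narrowed interface;
// test doubles such as blobCountingRepo get the same guarantee for free
// by embedding archiverRepo.
var _ archiverRepo = (*repository.Repository)(nil)
```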
+} + // Archiver saves a directory structure to the repo. type Archiver struct { - Repo restic.Repository + Repo archiverRepo SelectByName SelectByNameFunc Select SelectFunc FS fs.FS @@ -160,7 +170,7 @@ func (o Options) ApplyDefaults() Options { } // New initializes a new archiver. -func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver { +func New(repo archiverRepo, fs fs.FS, opts Options) *Archiver { arch := &Archiver{ Repo: repo, SelectByName: func(_ string) bool { return true }, @@ -276,7 +286,7 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) (*rest } func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { - if arch.Repo.Index().Has(restic.BlobHandle{ID: id, Type: restic.TreeBlob}) { + if _, ok := arch.Repo.LookupBlobSize(restic.TreeBlob, id); ok { err = errors.Errorf("tree %v could not be loaded; the repository could be damaged: %v", id, err) } else { err = errors.Errorf("tree %v is not known; the repository could be damaged, run `repair index` to try to repair it", id) @@ -390,7 +400,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult { func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { // check if all blobs are contained in index for _, id := range previous.Content { - if !arch.Repo.Index().Has(restic.BlobHandle{ID: id, Type: restic.DataBlob}) { + if _, ok := arch.Repo.LookupBlobSize(restic.DataBlob, id); !ok { return false } } diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 8d0c2c02f..f38d5b0de 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -36,7 +36,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, restic.Repository return tempdir, repo } -func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { +func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { wg, ctx := errgroup.WithContext(context.TODO()) repo.StartPackUploader(ctx, wg) @@ -416,14 +416,14 @@ func BenchmarkArchiverSaveFileLarge(b *testing.B) { } type blobCountingRepo struct { - restic.Repository + archiverRepo m sync.Mutex saved map[restic.BlobHandle]uint } func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) { - id, exists, size, err := repo.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate) + id, exists, size, err := repo.archiverRepo.SaveBlob(ctx, t, buf, id, storeDuplicate) if exists { return id, exists, size, err } @@ -435,7 +435,7 @@ func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, b } func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) { - id, err := restic.SaveTree(ctx, repo.Repository, t) + id, err := restic.SaveTree(ctx, repo.archiverRepo, t) h := restic.BlobHandle{ID: id, Type: restic.TreeBlob} repo.m.Lock() repo.saved[h]++ @@ -465,8 +465,8 @@ func TestArchiverSaveFileIncremental(t *testing.T) { tempdir := rtest.TempDir(t) repo := &blobCountingRepo{ - Repository: repository.TestRepository(t), - saved: make(map[restic.BlobHandle]uint), + archiverRepo: repository.TestRepository(t), + saved: make(map[restic.BlobHandle]uint), } data := rtest.Random(23, 512*1024+887898) @@ -902,8 +902,8 @@ func TestArchiverSaveDirIncremental(t *testing.T) { tempdir := rtest.TempDir(t) repo := &blobCountingRepo{ - Repository: 
repository.TestRepository(t), - saved: make(map[restic.BlobHandle]uint), + archiverRepo: repository.TestRepository(t), + saved: make(map[restic.BlobHandle]uint), } appendToFile(t, filepath.Join(tempdir, "testfile"), []byte("foobar")) @@ -2017,7 +2017,7 @@ func (m *TrackFS) OpenFile(name string, flag int, perm os.FileMode) (fs.File, er } type failSaveRepo struct { - restic.Repository + archiverRepo failAfter int32 cnt int32 err error @@ -2029,7 +2029,7 @@ func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []by return restic.Hash(buf), false, 0, f.err } - return f.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate) + return f.archiverRepo.SaveBlob(ctx, t, buf, id, storeDuplicate) } func TestArchiverAbortEarlyOnError(t *testing.T) { @@ -2105,9 +2105,9 @@ func TestArchiverAbortEarlyOnError(t *testing.T) { } testRepo := &failSaveRepo{ - Repository: repo, - failAfter: int32(test.failAfter), - err: test.err, + archiverRepo: repo, + failAfter: int32(test.failAfter), + err: test.err, } // at most two files may be queued @@ -2134,7 +2134,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) { } } -func snapshot(t testing.TB, repo restic.Repository, fs fs.FS, parent *restic.Snapshot, filename string) (*restic.Snapshot, *restic.Node) { +func snapshot(t testing.TB, repo archiverRepo, fs fs.FS, parent *restic.Snapshot, filename string) (*restic.Snapshot, *restic.Node) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/archiver/archiver_unix_test.go b/internal/archiver/archiver_unix_test.go index a6b1aad2e..4a380dff8 100644 --- a/internal/archiver/archiver_unix_test.go +++ b/internal/archiver/archiver_unix_test.go @@ -46,7 +46,7 @@ func wrapFileInfo(fi os.FileInfo) os.FileInfo { return res } -func statAndSnapshot(t *testing.T, repo restic.Repository, name string) (*restic.Node, *restic.Node) { +func statAndSnapshot(t *testing.T, repo archiverRepo, name string) (*restic.Node, *restic.Node) { fi := lstat(t, name) want, err := restic.NodeFromFileInfo(name, fi, false) rtest.OK(t, err) diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go index 180f95b3d..f7ef2f47d 100644 --- a/internal/archiver/blob_saver_test.go +++ b/internal/archiver/blob_saver_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" "golang.org/x/sync/errgroup" @@ -19,7 +18,6 @@ import ( var errTest = errors.New("test error") type saveFail struct { - idx restic.MasterIndex cnt int32 failAt int32 } @@ -33,18 +31,12 @@ func (b *saveFail) SaveBlob(_ context.Context, _ restic.BlobType, _ []byte, id r return id, false, 0, nil } -func (b *saveFail) Index() restic.MasterIndex { - return b.idx -} - func TestBlobSaver(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg, ctx := errgroup.WithContext(ctx) - saver := &saveFail{ - idx: index.NewMasterIndex(), - } + saver := &saveFail{} b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU())) @@ -100,7 +92,6 @@ func TestBlobSaverError(t *testing.T) { wg, ctx := errgroup.WithContext(ctx) saver := &saveFail{ - idx: index.NewMasterIndex(), failAt: int32(test.failAt), } diff --git a/internal/archiver/testing.go b/internal/archiver/testing.go index a186a4ee5..106e68445 100644 --- a/internal/archiver/testing.go +++ b/internal/archiver/testing.go @@ -11,7 +11,6 @@ import ( "testing" "time" - 
"github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/restic" @@ -26,7 +25,7 @@ func TestSnapshot(t testing.TB, repo restic.Repository, path string, parent *res Tags: []string{"test"}, } if parent != nil { - sn, err := restic.LoadSnapshot(context.TODO(), arch.Repo, *parent) + sn, err := restic.LoadSnapshot(context.TODO(), repo, *parent) if err != nil { t.Fatal(err) } @@ -239,7 +238,7 @@ func TestEnsureFileContent(ctx context.Context, t testing.TB, repo restic.BlobLo return } - content := make([]byte, crypto.CiphertextLength(len(file.Content))) + content := make([]byte, len(file.Content)) pos := 0 for _, id := range node.Content { part, err := repo.LoadBlob(ctx, restic.DataBlob, id, content[pos:]) diff --git a/internal/backend/watchdog_roundtriper_test.go b/internal/backend/watchdog_roundtriper_test.go index a13d670e0..b1f589bc0 100644 --- a/internal/backend/watchdog_roundtriper_test.go +++ b/internal/backend/watchdog_roundtriper_test.go @@ -64,7 +64,7 @@ func TestRoundtrip(t *testing.T) { })) defer srv.Close() - rt := newWatchdogRoundtripper(http.DefaultTransport, 50*time.Millisecond, 2) + rt := newWatchdogRoundtripper(http.DefaultTransport, 100*time.Millisecond, 2) req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), time.Duration(delay)*time.Millisecond))) rtest.OK(t, err) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index dc83aef5b..61c017414 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -91,9 +91,9 @@ func (c *Checker) LoadSnapshots(ctx context.Context) error { return err } -func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.ID]restic.BlobType, error) { +func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.ID]restic.BlobType, error) { packs := make(map[restic.ID]restic.BlobType) - err := idx.Each(ctx, func(pb restic.PackedBlob) { + err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) { tpe, exists := packs[pb.PackID] if exists { if pb.Type != tpe { @@ -111,33 +111,10 @@ func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.I func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []error, errs []error) { debug.Log("Start") - indexList, err := restic.MemorizeList(ctx, c.repo, restic.IndexFile) - if err != nil { - // abort if an error occurs while listing the indexes - return hints, append(errs, err) - } - - if p != nil { - var numIndexFiles uint64 - err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error { - numIndexFiles++ - return nil - }) - if err != nil { - return hints, append(errs, err) - } - p.SetMax(numIndexFiles) - defer p.Done() - } - packToIndex := make(map[restic.ID]restic.IDSet) - err = index.ForAllIndexes(ctx, indexList, c.repo, func(id restic.ID, index *index.Index, oldFormat bool, err error) error { + err := c.masterIndex.Load(ctx, c.repo, p, func(id restic.ID, idx *index.Index, oldFormat bool, err error) error { debug.Log("process index %v, err %v", id, err) - if p != nil { - p.Add(1) - } - if oldFormat { debug.Log("index %v has old format", id) hints = append(hints, &ErrOldIndexFormat{id}) @@ -150,11 +127,9 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e return nil } - c.masterIndex.Insert(index) - debug.Log("process blobs") cnt := 0 - err = index.Each(ctx, func(blob restic.PackedBlob) { 
+ err = idx.Each(ctx, func(blob restic.PackedBlob) { cnt++ if _, ok := packToIndex[blob.PackID]; !ok { @@ -167,22 +142,22 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e return err }) if err != nil { + // failed to load the index + return hints, append(errs, err) + } + + err = c.repo.SetIndex(c.masterIndex) + if err != nil { + debug.Log("SetIndex returned error: %v", err) errs = append(errs, err) } - // Merge index before computing pack sizes, as this needs removed duplicates - err = c.masterIndex.MergeFinalIndexes() - if err != nil { - // abort if an error occurs merging the indexes - return hints, append(errs, err) - } - // compute pack size using index entries - c.packs, err = pack.Size(ctx, c.masterIndex, false) + c.packs, err = pack.Size(ctx, c.repo, false) if err != nil { return hints, append(errs, err) } - packTypes, err := computePackTypes(ctx, c.masterIndex) + packTypes, err := computePackTypes(ctx, c.repo) if err != nil { return hints, append(errs, err) } @@ -203,12 +178,6 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e } } - err = c.repo.SetIndex(c.masterIndex) - if err != nil { - debug.Log("SetIndex returned error: %v", err) - errs = append(errs, err) - } - return hints, errs } @@ -429,7 +398,7 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) { // unfortunately fails in some cases that are not resolvable // by users, so we omit this check, see #1887 - _, found := c.repo.LookupBlobSize(blobID, restic.DataBlob) + _, found := c.repo.LookupBlobSize(restic.DataBlob, blobID) if !found { debug.Log("tree %v references blob %v which isn't contained in index", id, blobID) errs = append(errs, &Error{TreeID: id, Err: errors.Errorf("file %q blob %v not found in index", node.Name, blobID)}) @@ -488,7 +457,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er ctx, cancel := context.WithCancel(ctx) defer cancel() - err = c.repo.Index().Each(ctx, func(blob restic.PackedBlob) { + err = c.repo.ListBlobs(ctx, func(blob restic.PackedBlob) { h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} if !c.blobRefs.M.Has(h) { debug.Log("blob %v not referenced", h) @@ -573,7 +542,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } // push packs to ch - for pbs := range c.repo.Index().ListPacks(ctx, packSet) { + for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) { size := packs[pbs.PackID] debug.Log("listed %v", pbs.PackID) select { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index baec88628..1219f4e2b 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -461,11 +461,11 @@ func (r *delayRepository) LoadTree(ctx context.Context, id restic.ID) (*restic.T return restic.LoadTree(ctx, r.Repository, id) } -func (r *delayRepository) LookupBlobSize(id restic.ID, t restic.BlobType) (uint, bool) { +func (r *delayRepository) LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool) { if id == r.DelayTree && t == restic.DataBlob { r.Unblock() } - return r.Repository.LookupBlobSize(id, t) + return r.Repository.LookupBlobSize(t, id) } func (r *delayRepository) Unblock() { diff --git a/internal/fuse/file.go b/internal/fuse/file.go index 5190febbb..e2e0cf9a0 100644 --- a/internal/fuse/file.go +++ b/internal/fuse/file.go @@ -72,7 +72,7 @@ func (f *file) Open(_ context.Context, _ *fuse.OpenRequest, _ *fuse.OpenResponse var bytes uint64 cumsize := make([]uint64, 
1+len(f.node.Content)) for i, id := range f.node.Content { - size, found := f.root.repo.LookupBlobSize(id, restic.DataBlob) + size, found := f.root.repo.LookupBlobSize(restic.DataBlob, id) if !found { return nil, errors.Errorf("id %v not found in repository", id) } diff --git a/internal/fuse/fuse_test.go b/internal/fuse/fuse_test.go index 1053d49a4..aebcb1272 100644 --- a/internal/fuse/fuse_test.go +++ b/internal/fuse/fuse_test.go @@ -89,7 +89,7 @@ func TestFuseFile(t *testing.T) { memfile []byte ) for _, id := range content { - size, found := repo.LookupBlobSize(id, restic.DataBlob) + size, found := repo.LookupBlobSize(restic.DataBlob, id) rtest.Assert(t, found, "Expected to find blob id %v", id) filesize += uint64(size) diff --git a/internal/index/index.go b/internal/index/index.go index 1c20fe38d..12ef6b18a 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -1,6 +1,7 @@ package index import ( + "bytes" "context" "encoding/json" "fmt" @@ -50,10 +51,9 @@ type Index struct { byType [restic.NumBlobTypes]indexMap packs restic.IDs - final bool // set to true for all indexes read from the backend ("finalized") - ids restic.IDs // set to the IDs of the contained finalized indexes - supersedes restic.IDs - created time.Time + final bool // set to true for all indexes read from the backend ("finalized") + ids restic.IDs // set to the IDs of the contained finalized indexes + created time.Time } // NewIndex returns a new index. @@ -197,25 +197,6 @@ func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found return uint(crypto.PlaintextLength(int(e.length))), true } -// Supersedes returns the list of indexes this index supersedes, if any. -func (idx *Index) Supersedes() restic.IDs { - return idx.supersedes -} - -// AddToSupersedes adds the ids to the list of indexes superseded by this -// index. If the index has already been finalized, an error is returned. -func (idx *Index) AddToSupersedes(ids ...restic.ID) error { - idx.m.Lock() - defer idx.m.Unlock() - - if idx.final { - return errors.New("index already finalized") - } - - idx.supersedes = append(idx.supersedes, ids...) - return nil -} - // Each passes all blobs known to the index to the callback fn. This blocks any // modification of the index. func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error { @@ -356,8 +337,8 @@ func (idx *Index) generatePackList() ([]packJSON, error) { } type jsonIndex struct { - Supersedes restic.IDs `json:"supersedes,omitempty"` - Packs []packJSON `json:"packs"` + // removed: Supersedes restic.IDs `json:"supersedes,omitempty"` + Packs []packJSON `json:"packs"` } // Encode writes the JSON serialization of the index to the writer w. @@ -373,12 +354,29 @@ func (idx *Index) Encode(w io.Writer) error { enc := json.NewEncoder(w) idxJSON := jsonIndex{ - Supersedes: idx.supersedes, - Packs: list, + Packs: list, } return enc.Encode(idxJSON) } +// SaveIndex saves an index in the repository. +func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked) (restic.ID, error) { + buf := bytes.NewBuffer(nil) + + err := idx.Encode(buf) + if err != nil { + return restic.ID{}, err + } + + id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes()) + ierr := idx.SetID(id) + if ierr != nil { + // logic bug + panic(ierr) + } + return id, err +} + // Finalize sets the index to final. 
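`SaveIndex` above moves from a package-level function to a method on `*Index` (the old function is deleted from `master_index.go` further down). A short sketch of the call sequence the save workers in `Rewrite`/`SaveFallback` use; the helper name and its placement are assumed:

```go
package index // sketch of the method form introduced above

import (
	"context"

	"github.com/restic/restic/internal/restic"
)

// saveOne finalizes an index and uploads it as an index file. SaveIndex
// encodes the index to JSON, stores it via SaveUnpacked, then records the
// resulting ID on the index through SetID, which requires a final index.
func saveOne(ctx context.Context, repo restic.SaverUnpacked, idx *Index) (restic.ID, error) {
	idx.Finalize()
	return idx.SaveIndex(ctx, repo)
}
```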
func (idx *Index) Finalize() { debug.Log("finalizing index") @@ -433,8 +431,7 @@ func (idx *Index) Dump(w io.Writer) error { } outer := jsonIndex{ - Supersedes: idx.Supersedes(), - Packs: list, + Packs: list, } buf, err := json.MarshalIndent(outer, "", " ") @@ -495,7 +492,6 @@ func (idx *Index) merge(idx2 *Index) error { } idx.ids = append(idx.ids, idx2.ids...) - idx.supersedes = append(idx.supersedes, idx2.supersedes...) return nil } @@ -545,7 +541,6 @@ func DecodeIndex(buf []byte, id restic.ID) (idx *Index, oldFormat bool, err erro }) } } - idx.supersedes = idxJSON.Supersedes idx.ids = append(idx.ids, id) idx.final = true diff --git a/internal/index/index_parallel.go b/internal/index/index_parallel.go index d51d5930f..3d5621a2d 100644 --- a/internal/index/index_parallel.go +++ b/internal/index/index_parallel.go @@ -11,7 +11,7 @@ import ( // ForAllIndexes loads all index files in parallel and calls the given callback. // It is guaranteed that the function is not run concurrently. If the callback // returns an error, this function is cancelled and also returns that error. -func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.ListerLoaderUnpacked, +func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.LoaderUnpacked, fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error { // decoding an index can take quite some time such that this can be both CPU- or IO-bound diff --git a/internal/index/index_test.go b/internal/index/index_test.go index bafd95c48..66cec23f6 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -309,8 +309,6 @@ func TestIndexUnserialize(t *testing.T) { {docExampleV1, 1}, {docExampleV2, 2}, } { - oldIdx := restic.IDs{restic.TestParseID("ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452")} - idx, oldFormat, err := index.DecodeIndex(task.idxBytes, restic.NewRandomID()) rtest.OK(t, err) rtest.Assert(t, !oldFormat, "new index format recognized as old format") @@ -337,8 +335,6 @@ func TestIndexUnserialize(t *testing.T) { } } - rtest.Equals(t, oldIdx, idx.Supersedes()) - blobs := listPack(t, idx, exampleLookupTest.packID) if len(blobs) != len(exampleLookupTest.blobs) { t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs)) @@ -446,8 +442,6 @@ func TestIndexUnserializeOld(t *testing.T) { rtest.Equals(t, test.offset, blob.Offset) rtest.Equals(t, test.length, blob.Length) } - - rtest.Equals(t, 0, len(idx.Supersedes())) } func TestIndexPacks(t *testing.T) { diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 21ab344d6..f9fc4505b 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -1,7 +1,6 @@ package index import ( - "bytes" "context" "fmt" "runtime" @@ -9,6 +8,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" ) @@ -22,12 +22,15 @@ type MasterIndex struct { // NewMasterIndex creates a new master index. func NewMasterIndex() *MasterIndex { + mi := &MasterIndex{pendingBlobs: restic.NewBlobSet()} + mi.clear() + return mi +} + +func (mi *MasterIndex) clear() { // Always add an empty final index, such that MergeFinalIndexes can merge into this. - // Note that removing this index could lead to a race condition in the rare - // situation that only two indexes exist which are saved and merged concurrently. 
- idx := []*Index{NewIndex()} - idx[0].Finalize() - return &MasterIndex{idx: idx, pendingBlobs: restic.NewBlobSet()} + mi.idx = []*Index{NewIndex()} + mi.idx[0].Finalize() } func (mi *MasterIndex) MarkCompressed() { @@ -267,11 +270,236 @@ func (mi *MasterIndex) MergeFinalIndexes() error { return nil } -// Save saves all known indexes to index files, leaving out any -// packs whose ID is contained in packBlacklist from finalized indexes. -// It also removes the old index files and those listed in extraObsolete. -func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error { +func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, p *progress.Counter, cb func(id restic.ID, idx *Index, oldFormat bool, err error) error) error { + indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile) + if err != nil { + return err + } + + if p != nil { + var numIndexFiles uint64 + err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error { + numIndexFiles++ + return nil + }) + if err != nil { + return err + } + p.SetMax(numIndexFiles) + defer p.Done() + } + + err = ForAllIndexes(ctx, indexList, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error { + if p != nil { + p.Add(1) + } + if cb != nil { + err = cb(id, idx, oldFormat, err) + } + if err != nil { + return err + } + // special case to allow check to ignore index loading errors + if idx == nil { + return nil + } + mi.Insert(idx) + return nil + }) + + if err != nil { + return err + } + + return mi.MergeFinalIndexes() +} + +type MasterIndexRewriteOpts struct { + SaveProgress *progress.Counter + DeleteProgress func() *progress.Counter + DeleteReport func(id restic.ID, err error) +} + +// Rewrite removes packs whose ID is in excludePacks from all known indexes. +// It also removes the rewritten index files and those listed in extraObsolete. +// If oldIndexes is not nil, then only the indexes in this set are processed. +// This is used by repair index to only rewrite and delete the old indexes. +// +// Must not be called concurrently to any other MasterIndex operation. 
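The new `MasterIndex.Load` above folds the list/decode/insert/`MergeFinalIndexes` sequence (previously spelled out in `checker.LoadIndex`) into one call. A minimal usage sketch matching how the tests below invoke it; `nil` is valid for both the progress counter and the callback:

```go
package example // sketch: loading all index files with MasterIndex.Load

import (
	"context"

	"github.com/restic/restic/internal/index"
	"github.com/restic/restic/internal/restic"
)

func loadIndexes(ctx context.Context, repo restic.ListerLoaderUnpacked) (*index.MasterIndex, error) {
	mi := index.NewMasterIndex()
	// nil counter: no progress reporting; nil callback: fail on first error
	if err := mi.Load(ctx, repo, nil, nil); err != nil {
		return nil, err
	}
	return mi, nil
}
```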
+func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error { + for _, idx := range mi.idx { + if !idx.Final() { + panic("internal error - index must be saved before calling MasterIndex.Rewrite") + } + } + + var indexes restic.IDSet + if oldIndexes != nil { + // repair index adds new index entries for already existing pack files + // only remove the old (possibly broken) entries by only processing old indexes + indexes = oldIndexes + } else { + indexes = mi.IDs() + } + p := opts.SaveProgress + p.SetMax(uint64(len(indexes))) + + // reset state which is not necessary for Rewrite and just consumes a lot of memory + // the index state would be invalid after Rewrite completes anyways + mi.clear() + runtime.GC() + + // copy excludePacks to prevent unintended sideeffects + excludePacks = excludePacks.Clone() + debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks) + wg, wgCtx := errgroup.WithContext(ctx) + + idxCh := make(chan restic.ID) + wg.Go(func() error { + defer close(idxCh) + for id := range indexes { + select { + case idxCh <- id: + case <-wgCtx.Done(): + return wgCtx.Err() + } + } + return nil + }) + + var rewriteWg sync.WaitGroup + type rewriteTask struct { + idx *Index + oldFormat bool + } + rewriteCh := make(chan rewriteTask) + loader := func() error { + defer rewriteWg.Done() + for id := range idxCh { + buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id) + if err != nil { + return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err) + } + idx, oldFormat, err := DecodeIndex(buf, id) + if err != nil { + return err + } + + select { + case rewriteCh <- rewriteTask{idx, oldFormat}: + case <-wgCtx.Done(): + return wgCtx.Err() + } + + } + return nil + } + // loading an index can take quite some time such that this is probably CPU-bound + // the index files are probably already cached at this point + loaderCount := runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < loaderCount; i++ { + rewriteWg.Add(1) + wg.Go(loader) + } + wg.Go(func() error { + rewriteWg.Wait() + close(rewriteCh) + return nil + }) + + obsolete := restic.NewIDSet(extraObsolete...) 
+ saveCh := make(chan *Index) + + wg.Go(func() error { + defer close(saveCh) + newIndex := NewIndex() + for task := range rewriteCh { + // always rewrite indexes using the old format, that include a pack that must be removed or that are not full + if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) { + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + // index is already up to date + p.Add(1) + continue + } + + ids, err := task.idx.IDs() + if err != nil || len(ids) != 1 { + panic("internal error, index has no ID") + } + obsolete.Merge(restic.NewIDSet(ids...)) + + for pbs := range task.idx.EachByPack(wgCtx, excludePacks) { + newIndex.StorePack(pbs.PackID, pbs.Blobs) + if IndexFull(newIndex, mi.compress) { + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + return wgCtx.Err() + } + newIndex = NewIndex() + } + } + if wgCtx.Err() != nil { + return wgCtx.Err() + } + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + p.Add(1) + } + + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + } + return nil + }) + + // a worker receives an index from ch, and saves the index + worker := func() error { + for idx := range saveCh { + idx.Finalize() + if _, err := idx.SaveIndex(wgCtx, repo); err != nil { + return err + } + } + return nil + } + + // encoding an index can take quite some time such that this can be CPU- or IO-bound + // do not add repo.Connections() here as there are already the loader goroutines. + workerCount := runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < workerCount; i++ { + wg.Go(worker) + } + err := wg.Wait() + p.Done() + if err != nil { + return fmt.Errorf("failed to rewrite indexes: %w", err) + } + + p = nil + if opts.DeleteProgress != nil { + p = opts.DeleteProgress() + } + defer p.Done() + return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { + if opts.DeleteReport != nil { + opts.DeleteReport(id, err) + } + return err + }, p) +} + +// SaveFallback saves all known indexes to index files, leaving out any +// packs whose ID is contained in packBlacklist from finalized indexes. +// It is only intended for use by prune with the UnsafeRecovery option. +// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error { p.SetMax(uint64(len(mi.Packs(excludePacks)))) mi.idxMutex.Lock() @@ -279,38 +507,23 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks) - newIndex := NewIndex() obsolete := restic.NewIDSet() - - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. wg, wgCtx := errgroup.WithContext(ctx) ch := make(chan *Index) - wg.Go(func() error { defer close(ch) - for i, idx := range mi.idx { + newIndex := NewIndex() + for _, idx := range mi.idx { if idx.Final() { ids, err := idx.IDs() if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return err + panic("internal error - finalized index without ID") } - debug.Log("adding index ids %v to supersedes field", ids) - - err = newIndex.AddToSupersedes(ids...) 
- if err != nil { - return err - } obsolete.Merge(restic.NewIDSet(ids...)) - } else { - debug.Log("index %d isn't final, don't add to supersedes field", i) } - debug.Log("adding index %d", i) - for pbs := range idx.EachByPack(wgCtx, excludePacks) { newIndex.StorePack(pbs.PackID, pbs.Blobs) p.Add(1) @@ -328,12 +541,6 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke } } - err := newIndex.AddToSupersedes(extraObsolete...) - if err != nil { - return err - } - obsolete.Merge(restic.NewIDSet(extraObsolete...)) - select { case ch <- newIndex: case <-wgCtx.Done(): @@ -345,58 +552,25 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke worker := func() error { for idx := range ch { idx.Finalize() - if _, err := SaveIndex(wgCtx, repo, idx); err != nil { + if _, err := idx.SaveIndex(wgCtx, repo); err != nil { return err } } return nil } - // encoding an index can take quite some time such that this can be both CPU- or IO-bound - workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // keep concurrency bounded as we're on a fallback path + workerCount := int(repo.Connections()) // run workers on ch for i := 0; i < workerCount; i++ { wg.Go(worker) } err := wg.Wait() p.Done() - if err != nil { - return err - } + // the index no longer matches to stored state + mi.clear() - if opts.SkipDeletion { - return nil - } - - p = nil - if opts.DeleteProgress != nil { - p = opts.DeleteProgress() - } - defer p.Done() - return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { - if opts.DeleteReport != nil { - opts.DeleteReport(id, err) - } - return err - }, p) -} - -// SaveIndex saves an index in the repository. -func SaveIndex(ctx context.Context, repo restic.SaverUnpacked, index *Index) (restic.ID, error) { - buf := bytes.NewBuffer(nil) - - err := index.Encode(buf) - if err != nil { - return restic.ID{}, err - } - - id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes()) - ierr := index.SetID(id) - if ierr != nil { - // logic bug - panic(ierr) - } - return id, err + return err } // saveIndex saves all indexes in the backend. 
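With `Rewrite` as the primary path and `SaveFallback` reserved for `prune --unsafe-recovery`, a caller that has deleted pack files needs a single call to drop the stale index entries and the superseded index files. A hedged sketch; `dropPacks` and `removedPacks` are assumed names:

```go
package example // sketch of a Rewrite caller

import (
	"context"

	"github.com/restic/restic/internal/index"
	"github.com/restic/restic/internal/restic"
)

// dropPacks removes all index entries pointing into removedPacks and
// deletes the rewritten index files. Passing nil for oldIndexes processes
// every known index; repair index instead passes the pre-repair index IDs.
func dropPacks(ctx context.Context, repo restic.Unpacked, mi *index.MasterIndex, removedPacks restic.IDSet) error {
	return mi.Rewrite(ctx, repo, removedPacks, nil, nil, index.MasterIndexRewriteOpts{})
}
```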
@@ -404,7 +578,7 @@ func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, in for i, idx := range indexes { debug.Log("Saving index %d", i) - sid, err := SaveIndex(ctx, r, idx) + sid, err := idx.SaveIndex(ctx, r) if err != nil { return err } diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index c3560a7fb..c42484c55 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -355,46 +355,102 @@ func TestIndexSave(t *testing.T) { } func testIndexSave(t *testing.T, version uint) { - repo := createFilledRepo(t, 3, version) + for _, test := range []struct { + name string + saver func(idx *index.MasterIndex, repo restic.Repository) error + }{ + {"rewrite no-op", func(idx *index.MasterIndex, repo restic.Repository) error { + return idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{}) + }}, + {"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Repository) error { + return idx.Rewrite(context.TODO(), repo, nil, restic.NewIDSet(), nil, index.MasterIndexRewriteOpts{}) + }}, + {"SaveFallback", func(idx *index.MasterIndex, repo restic.Repository) error { + err := restic.ParallelRemove(context.TODO(), repo, idx.IDs(), restic.IndexFile, nil, nil) + if err != nil { + return nil + } + return idx.SaveFallback(context.TODO(), repo, restic.NewIDSet(), nil) + }}, + } { + t.Run(test.name, func(t *testing.T) { + repo := createFilledRepo(t, 3, version) - err := repo.LoadIndex(context.TODO(), nil) - if err != nil { - t.Fatal(err) - } + idx := index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + blobs := make(map[restic.PackedBlob]struct{}) + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + blobs[pb] = struct{}{} + })) - err = repo.Index().Save(context.TODO(), repo, nil, nil, restic.MasterIndexSaveOpts{}) - if err != nil { - t.Fatalf("unable to save new index: %v", err) - } + rtest.OK(t, test.saver(idx, repo)) + idx = index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) - checker := checker.New(repo, false) - err = checker.LoadSnapshots(context.TODO()) - if err != nil { - t.Error(err) - } + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + if _, ok := blobs[pb]; ok { + delete(blobs, pb) + } else { + t.Fatalf("unexpected blobs %v", pb) + } + })) + rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") - hints, errs := checker.LoadIndex(context.TODO(), nil) - for _, h := range hints { - t.Logf("hint: %v\n", h) - } - - for _, err := range errs { - t.Errorf("checker found error: %v", err) - } - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - errCh := make(chan error) - go checker.Structure(ctx, nil, errCh) - i := 0 - for err := range errCh { - t.Errorf("checker returned error: %v", err) - i++ - if i == 10 { - t.Errorf("more than 10 errors returned, skipping the rest") - cancel() - break - } + checker.TestCheckRepo(t, repo, false) + }) } } + +func TestIndexSavePartial(t *testing.T) { + repository.TestAllVersions(t, testIndexSavePartial) +} + +func testIndexSavePartial(t *testing.T, version uint) { + repo := createFilledRepo(t, 3, version) + + // capture blob list before adding fourth snapshot + idx := index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + blobs := make(map[restic.PackedBlob]struct{}) + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + blobs[pb] = struct{}{} + })) + + // add+remove new snapshot and track its 
pack files + packsBefore := listPacks(t, repo) + sn := restic.TestCreateSnapshot(t, repo, snapshotTime.Add(time.Duration(4)*time.Second), depth) + rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, *sn.ID())) + packsAfter := listPacks(t, repo) + newPacks := packsAfter.Sub(packsBefore) + + // rewrite index and remove pack files of new snapshot + idx = index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + rtest.OK(t, idx.Rewrite(context.TODO(), repo, newPacks, nil, nil, index.MasterIndexRewriteOpts{})) + + // check blobs + idx = index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + if _, ok := blobs[pb]; ok { + delete(blobs, pb) + } else { + t.Fatalf("unexpected blobs %v", pb) + } + })) + rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") + + // remove pack files to make check happy + rtest.OK(t, restic.ParallelRemove(context.TODO(), repo, newPacks, restic.PackFile, nil, nil)) + + checker.TestCheckRepo(t, repo, false) +} + +func listPacks(t testing.TB, repo restic.Lister) restic.IDSet { + s := restic.NewIDSet() + rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, _ int64) error { + s.Insert(id) + return nil + })) + return s +} diff --git a/internal/pack/pack.go b/internal/pack/pack.go index 7d8d87e71..57957ce91 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -389,10 +389,10 @@ func CalculateHeaderSize(blobs []restic.Blob) int { // If onlyHdr is set to true, only the size of the header is returned // Note that this function only gives correct sizes, if there are no // duplicates in the index. -func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) (map[restic.ID]int64, error) { +func Size(ctx context.Context, mi restic.ListBlobser, onlyHdr bool) (map[restic.ID]int64, error) { packSize := make(map[restic.ID]int64) - err := mi.Each(ctx, func(blob restic.PackedBlob) { + err := mi.ListBlobs(ctx, func(blob restic.PackedBlob) { size, ok := packSize[blob.PackID] if !ok { size = headerSize diff --git a/internal/repository/check.go b/internal/repository/check.go index 8018f4902..f16cd7492 100644 --- a/internal/repository/check.go +++ b/internal/repository/check.go @@ -158,11 +158,10 @@ func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []re errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)) } - idx := r.Index() for _, blob := range blobs { // Check if blob is contained in index and position is correct idxHas := false - for _, pb := range idx.Lookup(blob.BlobHandle) { + for _, pb := range r.LookupBlob(blob.BlobHandle.Type, blob.BlobHandle.ID) { if pb.PackID == id && pb.Blob == blob { idxHas = true break diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index 22eca0c2e..76734fb87 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -21,8 +21,8 @@ import ( "github.com/minio/sha256-simd" ) -// Packer holds a pack.Packer together with a hash writer. -type Packer struct { +// packer holds a pack.Packer together with a hash writer.
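`pack.Size` above now accepts any `restic.ListBlobser` rather than a full `restic.MasterIndex`, so callers can hand in the repository itself (as the `checker` and `repair index` hunks do). A short sketch under that assumption; the wrapper name is invented:

```go
package example // sketch: pack.Size against the narrowed interface

import (
	"context"

	"github.com/restic/restic/internal/pack"
	"github.com/restic/restic/internal/restic"
)

// packSizes computes the size of every pack from its index entries;
// onlyHdr=false returns full pack sizes rather than just header sizes.
func packSizes(ctx context.Context, repo restic.ListBlobser) (map[restic.ID]int64, error) {
	return pack.Size(ctx, repo, false)
}
```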
+type packer struct { *pack.Packer tmpfile *os.File bufWr *bufio.Writer @@ -32,16 +32,16 @@ type Packer struct { type packerManager struct { tpe restic.BlobType key *crypto.Key - queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + queueFn func(ctx context.Context, t restic.BlobType, p *packer) error pm sync.Mutex - packer *Packer + packer *packer packSize uint } // newPackerManager returns a new packer manager which writes temporary files // to a temporary directory -func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { +func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager { return &packerManager{ tpe: tpe, key: key, @@ -114,7 +114,7 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. -func (r *packerManager) newPacker() (packer *Packer, err error) { +func (r *packerManager) newPacker() (pck *packer, err error) { debug.Log("create new pack") tmpfile, err := fs.TempFile("", "restic-temp-pack-") if err != nil { @@ -123,17 +123,17 @@ func (r *packerManager) newPacker() (packer *Packer, err error) { bufWr := bufio.NewWriter(tmpfile) p := pack.NewPacker(r.key, bufWr) - packer = &Packer{ + pck = &packer{ Packer: p, tmpfile: tmpfile, bufWr: bufWr, } - return packer, nil + return pck, nil } // savePacker stores p in the backend. -func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { +func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) err := p.Packer.Finalize() if err != nil { @@ -200,8 +200,5 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packe r.idx.StorePack(id, p.Packer.Blobs()) // Save index if full - if r.noAutoIndexUpdate { - return nil - } return r.idx.SaveFullIndex(ctx, r) } diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 8984073da..0f3aea05f 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -70,7 +70,7 @@ func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) savedBytes := int(0) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *packer) error { err := p.Finalize() if err != nil { return err @@ -92,7 +92,7 @@ func testPackerManager(t testing.TB) int64 { func TestPackerManagerWithOversizeBlob(t *testing.T) { packFiles := int(0) sizeLimit := uint(512 * 1024) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *packer) error { packFiles++ return nil }) @@ -122,7 +122,7 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(crypto.NewRandomKey(), 
restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *packer) error { return nil }) fillPacks(t, rnd, pm, blobBuf) diff --git a/internal/repository/packer_uploader.go b/internal/repository/packer_uploader.go index 30c8f77af..936e7ea1d 100644 --- a/internal/repository/packer_uploader.go +++ b/internal/repository/packer_uploader.go @@ -7,13 +7,13 @@ import ( "golang.org/x/sync/errgroup" ) -// SavePacker implements saving a pack in the repository. -type SavePacker interface { - savePacker(ctx context.Context, t restic.BlobType, p *Packer) error +// savePacker implements saving a pack in the repository. +type savePacker interface { + savePacker(ctx context.Context, t restic.BlobType, p *packer) error } type uploadTask struct { - packer *Packer + packer *packer tpe restic.BlobType } @@ -21,7 +21,7 @@ type packerUploader struct { uploadQueue chan uploadTask } -func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader { +func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo savePacker, connections uint) *packerUploader { pu := &packerUploader{ uploadQueue: make(chan uploadTask), } @@ -48,7 +48,7 @@ func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, return pu } -func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) { +func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *packer) (err error) { select { case <-ctx.Done(): return ctx.Err() diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 479439e6a..49869fcac 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -7,7 +7,6 @@ import ( "sort" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" @@ -67,16 +66,18 @@ type PrunePlan struct { removePacks restic.IDSet // packs to remove ignorePacks restic.IDSet // packs to ignore when rebuilding the index - repo restic.Repository + repo *Repository stats PruneStats opts PruneOptions } type packInfo struct { - usedBlobs uint - unusedBlobs uint - usedSize uint64 - unusedSize uint64 + usedBlobs uint + unusedBlobs uint + duplicateBlobs uint + usedSize uint64 + unusedSize uint64 + tpe restic.BlobType uncompressed bool } @@ -89,7 +90,7 @@ type packInfoWithID struct { // PlanPrune selects which files to rewrite and which to delete and which blobs to keep. // Also some summary statistics are returned. 
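`PlanPrune` (its declaration follows) now takes the concrete `*repository.Repository`. From a caller's perspective the plan/execute split looks roughly like the hedged wrapper below; the function is hypothetical, and `getUsedBlobs` is supplied by the CLI after walking all snapshots:

```go
package example // sketch: the prune plan/execute flow

import (
	"context"

	"github.com/restic/restic/internal/repository"
	"github.com/restic/restic/internal/restic"
	"github.com/restic/restic/internal/ui/progress"
)

func prune(ctx context.Context, repo *repository.Repository, opts repository.PruneOptions,
	getUsedBlobs func(context.Context, restic.Repository) (restic.CountedBlobSet, error)) error {
	plan, err := repository.PlanPrune(ctx, opts, repo, getUsedBlobs, &progress.NoopPrinter{})
	if err != nil {
		return err
	}
	// Execute deletes unreferenced packs, repacks partly-used ones, and
	// rewrites the index incrementally, which is what makes prune resumable.
	return plan.Execute(ctx, &progress.NoopPrinter{})
}
```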
-func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) { +func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) { var stats PruneStats if opts.UnsafeRecovery { @@ -109,7 +110,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g } printer.P("searching used packs...\n") - keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo.Index(), usedBlobs, &stats, printer) + keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo, usedBlobs, &stats, printer) if err != nil { return nil, err } @@ -124,7 +125,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g blobCount := keepBlobs.Len() // when repacking, we do not want to keep blobs which are // already contained in kept packs, so delete them from keepBlobs - err := repo.Index().Each(ctx, func(blob restic.PackedBlob) { + err := repo.ListBlobs(ctx, func(blob restic.PackedBlob) { if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) { return } @@ -151,11 +152,11 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g return &plan, nil } -func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) { +func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) { // iterate over all blobs in index to find out which blobs are duplicates // The counter in usedBlobs describes how many instances of the blob exist in the repository index // Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist - err := idx.Each(ctx, func(blob restic.PackedBlob) { + err := idx.ListBlobs(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, ok := usedBlobs[bh] if ok { @@ -205,7 +206,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re hasDuplicates := false // iterate over all blobs in index to generate packInfo - err = idx.Each(ctx, func(blob restic.PackedBlob) { + err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) { ip := indexPack[blob.PackID] // Set blob type if not yet set @@ -227,6 +228,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re // mark as unused for now, we will later on select one copy ip.unusedSize += size ip.unusedBlobs++ + ip.duplicateBlobs++ // count as duplicate, will later on change one copy to be counted as used stats.Size.Duplicate += size @@ -257,10 +259,12 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re // if duplicate blobs exist, those will be set to either "used" or "unused": // - mark only one occurrence of duplicate blobs as used // - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used" + // - if a pack only consists of duplicates (which by definition are used blobs), mark it as "used". This + // ensures that already rewritten packs are kept. 
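The rule in the comment above (its final bullet continues just below) is the heart of resumability: a pack holding nothing but duplicates must have been written by an earlier, interrupted repack pass, so its copies are the ones to keep. A toy restatement of the switch condition that follows, with hypothetical parameter names mirroring `packInfo`:

```go
package example

// keepDuplicateHere mirrors `case ip.usedBlobs > 0, (ip.duplicateBlobs ==
// ip.unusedBlobs), count == 0` below: count this copy of a duplicate blob
// as used if the pack already has used blobs, consists purely of duplicates
// (an already-repacked pack), or this is the blob's last remaining copy.
func keepDuplicateHere(usedBlobs, unusedBlobs, duplicateBlobs uint, remainingCopies uint) bool {
	return usedBlobs > 0 || duplicateBlobs == unusedBlobs || remainingCopies == 0
}
```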
// - if there are no used blobs in a pack, possibly mark duplicates as "unused" if hasDuplicates { // iterate again over all blobs in index (this is pretty cheap, all in-mem) - err = idx.Each(ctx, func(blob restic.PackedBlob) { + err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, ok := usedBlobs[bh] // skip non-duplicate, aka. normal blobs @@ -272,8 +276,10 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re ip := indexPack[blob.PackID] size := uint64(blob.Length) switch { - case ip.usedBlobs > 0, count == 0: - // other used blobs in pack or "last" occurrence -> transition to used + case ip.usedBlobs > 0, (ip.duplicateBlobs == ip.unusedBlobs), count == 0: + // other used blobs in pack, only duplicate blobs or "last" occurrence -> transition to used + // a pack file created by an interrupted prune run will consist of only duplicate blobs + // thus select such already repacked pack files ip.usedSize += size ip.usedBlobs++ ip.unusedSize -= size @@ -314,7 +320,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re return usedBlobs, indexPack, nil } -func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) { +func decidePackAction(ctx context.Context, opts PruneOptions, repo *Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) { removePacksFirst := restic.NewIDSet() removePacks := restic.NewIDSet() repackPacks := restic.NewIDSet() @@ -323,10 +329,10 @@ func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Reposi var repackSmallCandidates []packInfoWithID repoVersion := repo.Config().Version // only repack very small files by default - targetPackSize := repo.PackSize() / 25 + targetPackSize := repo.packSize() / 25 if opts.RepackSmall { // consider files with at least 80% of the target size as large enough - targetPackSize = repo.PackSize() / 5 * 4 + targetPackSize = repo.packSize() / 5 * 4 } // loop over all packs and decide what to do @@ -523,7 +529,7 @@ func (plan *PrunePlan) Stats() PruneStats { // - rebuild the index while ignoring all files that will be deleted // - delete the files // plan.removePacks and plan.ignorePacks are modified in this function. -func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) { +func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) error { if plan.opts.DryRun { printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. 
This is expected behavior.\n\n") if len(plan.removePacksFirst) > 0 { @@ -581,13 +587,13 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") - indexFiles := repo.Index().(*index.MasterIndex).IDs() - err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) + indexFiles := repo.idx.IDs() + err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) } } else if len(plan.ignorePacks) != 0 { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer) + err := rewriteIndexFiles(ctx, repo, plan.ignorePacks, nil, nil, printer) if err != nil { return errors.Fatalf("%s", err) } @@ -602,18 +608,14 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e } if plan.opts.UnsafeRecovery { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer) + err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed")) if err != nil { return errors.Fatalf("%s", err) } } - if err != nil { - return err - } - // drop outdated in-memory index - repo.ClearIndex() + repo.clearIndex() printer.P("done\n") return nil diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 5bedcfa56..8c9ca28bb 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -54,7 +54,7 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) - for pbs := range repo.Index().ListPacks(wgCtx, packs) { + for pbs := range repo.ListPacksFromIndex(wgCtx, packs) { var packBlobs []restic.Blob keepMutex.Lock() // filter out unnecessary blobs diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 3fd56ccb1..476e63b47 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -7,10 +7,10 @@ import ( "time" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" ) @@ -145,9 +145,8 @@ func listFiles(t *testing.T, repo restic.Lister, tpe backend.FileType) restic.ID func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSet) restic.IDSet { packs := restic.NewIDSet() - idx := repo.Index() for h := range blobs { - list := idx.Lookup(h) + list := repo.LookupBlob(h.Type, h.ID) if len(list) == 0 { t.Fatal("Failed to find blob", h.ID.Str(), "with type", h.Type) } @@ -174,40 +173,12 @@ func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs rest } } -func rebuildIndex(t *testing.T, repo restic.Repository) { - err := repo.SetIndex(index.NewMasterIndex()) - rtest.OK(t, err) +func rebuildAndReloadIndex(t *testing.T, repo *repository.Repository) { + rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{ + ReadAllPacks: true, + }, &progress.NoopPrinter{})) - packs := make(map[restic.ID]int64) - err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { - packs[id] = size - return nil - }) - rtest.OK(t, err) - - _, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil) - rtest.OK(t, err) - - var 
obsoleteIndexes restic.IDs - err = repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error { - obsoleteIndexes = append(obsoleteIndexes, id) - return nil - }) - rtest.OK(t, err) - - err = repo.Index().Save(context.TODO(), repo, restic.NewIDSet(), obsoleteIndexes, restic.MasterIndexSaveOpts{}) - rtest.OK(t, err) -} - -func reloadIndex(t *testing.T, repo restic.Repository) { - err := repo.SetIndex(index.NewMasterIndex()) - if err != nil { - t.Fatal(err) - } - - if err := repo.LoadIndex(context.TODO(), nil); err != nil { - t.Fatalf("error loading new index: %v", err) - } + rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) } func TestRepack(t *testing.T) { @@ -242,8 +213,7 @@ func testRepack(t *testing.T, version uint) { removePacks := findPacksForBlobs(t, repo, removeBlobs) repack(t, repo, removePacks, keepBlobs) - rebuildIndex(t, repo) - reloadIndex(t, repo) + rebuildAndReloadIndex(t, repo) packsAfter = listPacks(t, repo) for id := range removePacks { @@ -252,10 +222,8 @@ func testRepack(t *testing.T, version uint) { } } - idx := repo.Index() - for h := range keepBlobs { - list := idx.Lookup(h) + list := repo.LookupBlob(h.Type, h.ID) if len(list) == 0 { t.Errorf("unable to find blob %v in repo", h.ID.Str()) continue @@ -274,7 +242,7 @@ func testRepack(t *testing.T, version uint) { } for h := range removeBlobs { - if _, found := repo.LookupBlobSize(h.ID, h.Type); found { + if _, found := repo.LookupBlobSize(h.Type, h.ID); found { t.Errorf("blob %v still contained in the repo", h) } } @@ -315,13 +283,10 @@ func testRepackCopy(t *testing.T, version uint) { if err != nil { t.Fatal(err) } - rebuildIndex(t, dstRepo) - reloadIndex(t, dstRepo) - - idx := dstRepo.Index() + rebuildAndReloadIndex(t, dstRepo) for h := range keepBlobs { - list := idx.Lookup(h) + list := dstRepo.LookupBlob(h.Type, h.ID) if len(list) == 0 { t.Errorf("unable to find blob %v in repo", h.ID.Str()) continue diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index a6e732b44..e01131923 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -28,6 +28,8 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, if err != nil { return err } + repo.clearIndex() + } else { printer.P("loading indexes...\n") mi := index.NewMasterIndex() @@ -54,12 +56,14 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, if err != nil { return err } - packSizeFromIndex, err = pack.Size(ctx, repo.Index(), false) + packSizeFromIndex, err = pack.Size(ctx, repo, false) if err != nil { return err } } + oldIndexes := repo.idx.IDs() + printer.P("getting pack files to read...\n") err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { size, ok := packSizeFromIndex[id] @@ -90,7 +94,7 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, printer.P("reading pack files\n") bar := printer.NewCounter("packs") bar.SetMax(uint64(len(packSizeFromList))) - invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar) + invalidFiles, err := repo.createIndexFromPacks(ctx, packSizeFromList, bar) bar.Done() if err != nil { return err @@ -101,21 +105,25 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } - err = rebuildIndexFiles(ctx, repo, removePacks, obsoleteIndexes, false, printer) + if err := repo.Flush(ctx); err != nil { + return err + } + + err = rewriteIndexFiles(ctx, repo, removePacks, oldIndexes, 
obsoleteIndexes, printer) if err != nil { return err } // drop outdated in-memory index - repo.ClearIndex() + repo.clearIndex() return nil } -func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error { +func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, printer progress.Printer) error { printer.P("rebuilding index\n") - bar := printer.NewCounter("packs processed") - return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{ + bar := printer.NewCounter("indexes processed") + return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") @@ -127,6 +135,5 @@ func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks printer.VV("removed index %v\n", id.String()) } }, - SkipDeletion: skipDeletion, }) } diff --git a/internal/repository/repair_index_test.go b/internal/repository/repair_index_test.go index 79922e9ec..ac47d59ff 100644 --- a/internal/repository/repair_index_test.go +++ b/internal/repository/repair_index_test.go @@ -30,10 +30,6 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, ReadAllPacks: readAllPacks, }, &progress.NoopPrinter{})) - newIndexes := listIndex(t, repo) - old := indexes.Intersect(newIndexes) - rtest.Assert(t, len(old) == 0, "expected old indexes to be removed, found %v", old) - checker.TestCheckRepo(t, repo, true) } diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index a0bd56012..811388cc9 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -10,7 +10,7 @@ import ( "golang.org/x/sync/errgroup" ) -func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, printer progress.Printer) error { +func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printer progress.Printer) error { wg, wgCtx := errgroup.WithContext(ctx) repo.StartPackUploader(wgCtx, wg) @@ -21,7 +21,7 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, wg.Go(func() error { // examine all data the indexes have for the pack file - for b := range repo.Index().ListPacks(wgCtx, ids) { + for b := range repo.ListPacksFromIndex(wgCtx, ids) { blobs := b.Blobs if len(blobs) == 0 { printer.E("no blobs found for pack %v", b.PackID) @@ -56,7 +56,7 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, } // remove salvaged packs from index - err = rebuildIndexFiles(ctx, repo, ids, nil, false, printer) + err = rewriteIndexFiles(ctx, repo, ids, nil, nil, printer) if err != nil { return err } diff --git a/internal/repository/repair_pack_test.go b/internal/repository/repair_pack_test.go index 7acdc646e..0d6d340f4 100644 --- a/internal/repository/repair_pack_test.go +++ b/internal/repository/repair_pack_test.go @@ -8,7 +8,6 @@ import ( "github.com/restic/restic/internal/backend" backendtest "github.com/restic/restic/internal/backend/test" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" @@ -18,7 +17,7 @@ import ( func listBlobs(repo restic.Repository) restic.BlobSet { blobs := restic.NewBlobSet() - 
_ = repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) { + _ = repo.ListBlobs(context.TODO(), func(pb restic.PackedBlob) { blobs.Insert(pb.BlobHandle) }) return blobs @@ -68,7 +67,7 @@ func testRepairBrokenPack(t *testing.T, version uint) { // find blob that starts at offset 0 var damagedBlob restic.BlobHandle - for blobs := range repo.Index().ListPacks(context.TODO(), restic.NewIDSet(damagedID)) { + for blobs := range repo.ListPacksFromIndex(context.TODO(), restic.NewIDSet(damagedID)) { for _, blob := range blobs.Blobs { if blob.Offset == 0 { damagedBlob = blob.BlobHandle @@ -91,7 +90,7 @@ func testRepairBrokenPack(t *testing.T, version uint) { // all blobs in the file are broken damagedBlobs := restic.NewBlobSet() - for blobs := range repo.Index().ListPacks(context.TODO(), restic.NewIDSet(damagedID)) { + for blobs := range repo.ListPacksFromIndex(context.TODO(), restic.NewIDSet(damagedID)) { for _, blob := range blobs.Blobs { damagedBlobs.Insert(blob.BlobHandle) } @@ -118,7 +117,6 @@ func testRepairBrokenPack(t *testing.T, version uint) { rtest.OK(t, repository.RepairPacks(context.TODO(), repo, toRepair, &progress.NoopPrinter{})) // reload index - rtest.OK(t, repo.SetIndex(index.NewMasterIndex())) rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) packsAfter := listPacks(t, repo) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 534edc9fd..f0ef93ecf 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -42,8 +42,6 @@ type Repository struct { opts Options - noAutoIndexUpdate bool - packerWg *errgroup.Group uploader *packerUploader treePM *packerManager @@ -130,12 +128,6 @@ func New(be backend.Backend, opts Options) (*Repository, error) { return repo, nil } -// DisableAutoIndexUpdate deactives the automatic finalization and upload of new -// indexes once these are full -func (r *Repository) DisableAutoIndexUpdate() { - r.noAutoIndexUpdate = true -} - // setConfig assigns the given config and updates the repository parameters accordingly func (r *Repository) setConfig(cfg restic.Config) { r.cfg = cfg @@ -146,8 +138,8 @@ func (r *Repository) Config() restic.Config { return r.cfg } -// PackSize return the target size of a pack file when uploading -func (r *Repository) PackSize() uint { +// packSize returns the target size of a pack file when uploading +func (r *Repository) packSize() uint { return r.opts.PackSize } @@ -300,11 +292,6 @@ func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, bu return nil, errors.Errorf("loading %v from %v packs failed", blobs[0].BlobHandle, len(blobs)) } -// LookupBlobSize returns the size of blob id. 
-func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) { - return r.idx.LookupSize(restic.BlobHandle{ID: id, Type: tpe}) -} - func (r *Repository) getZstdEncoder() *zstd.Encoder { r.allocEnc.Do(func() { level := zstd.SpeedDefault @@ -531,10 +518,6 @@ func (r *Repository) Flush(ctx context.Context) error { return err } - // Save index after flushing only if noAutoIndexUpdate is not set - if r.noAutoIndexUpdate { - return nil - } return r.idx.SaveIndex(ctx, r) } @@ -546,8 +529,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) innerWg, ctx := errgroup.WithContext(ctx) r.packerWg = innerWg r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) - r.treePM = newPackerManager(r.key, restic.TreeBlob, r.PackSize(), r.uploader.QueuePacker) - r.dataPM = newPackerManager(r.key, restic.DataBlob, r.PackSize(), r.uploader.QueuePacker) + r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker) wg.Go(func() error { return innerWg.Wait() @@ -583,9 +566,23 @@ func (r *Repository) Connections() uint { return r.be.Connections() } -// Index returns the currently used MasterIndex. -func (r *Repository) Index() restic.MasterIndex { - return r.idx +func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob { + return r.idx.Lookup(restic.BlobHandle{Type: tpe, ID: id}) +} + +// LookupBlobSize returns the size of blob id. +func (r *Repository) LookupBlobSize(tpe restic.BlobType, id restic.ID) (uint, bool) { + return r.idx.LookupSize(restic.BlobHandle{Type: tpe, ID: id}) +} + +// ListBlobs runs fn on all blobs known to the index. When the context is cancelled, +// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. +func (r *Repository) ListBlobs(ctx context.Context, fn func(restic.PackedBlob)) error { + return r.idx.Each(ctx, fn) +} + +func (r *Repository) ListPacksFromIndex(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs { + return r.idx.ListPacks(ctx, packs) } // SetIndex instructs the repository to use the given index. 
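With the Index() accessor gone, blob lookups go through the Repository itself. Below is a minimal sketch of a caller written against the new surface; the example package, the blobIndex interface, and both helpers are illustrative names rather than part of this change, while the method signatures come from the hunk above.

package example

import (
	"context"

	"github.com/restic/restic/internal/restic"
)

// blobIndex is the subset of the new repository methods used below.
type blobIndex interface {
	LookupBlob(t restic.BlobType, id restic.ID) []restic.PackedBlob
	LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool)
	ListBlobs(ctx context.Context, fn func(restic.PackedBlob)) error
}

// countBlobsPerPack tallies the indexed blobs per pack file; before this
// change it would have iterated via repo.Index().Each instead of ListBlobs.
func countBlobsPerPack(ctx context.Context, repo blobIndex) (map[restic.ID]int, error) {
	counts := make(map[restic.ID]int)
	err := repo.ListBlobs(ctx, func(pb restic.PackedBlob) {
		counts[pb.PackID]++
	})
	return counts, err
}

// hasBlob reports whether the index knows a blob, replacing the old
// repo.Index().Has(restic.BlobHandle{...}) pattern seen in cmd_copy.go.
func hasBlob(repo blobIndex, t restic.BlobType, id restic.ID) bool {
	_, ok := repo.LookupBlobSize(t, id)
	return ok
}

Accepting a narrow interface instead of *repository.Repository keeps such callers easy to test and mirrors how the diff threads restic.ListBlobser through packInfoFromIndex.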
@@ -595,7 +592,7 @@ func (r *Repository) SetIndex(i restic.MasterIndex) error { return r.prepareCache() } -func (r *Repository) ClearIndex() { +func (r *Repository) clearIndex() { r.idx = index.NewMasterIndex() r.configureIndex() } @@ -610,43 +607,10 @@ func (r *Repository) configureIndex() { func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error { debug.Log("Loading index") - indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile) - if err != nil { - return err - } - - if p != nil { - var numIndexFiles uint64 - err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error { - numIndexFiles++ - return nil - }) - if err != nil { - return err - } - p.SetMax(numIndexFiles) - defer p.Done() - } - // reset in-memory index before loading it from the repository - r.ClearIndex() + r.clearIndex() - err = index.ForAllIndexes(ctx, indexList, r, func(_ restic.ID, idx *index.Index, _ bool, err error) error { - if err != nil { - return err - } - r.idx.Insert(idx) - if p != nil { - p.Add(1) - } - return nil - }) - - if err != nil { - return err - } - - err = r.idx.MergeFinalIndexes() + err := r.idx.Load(ctx, r, p, nil) if err != nil { return err } @@ -680,10 +644,10 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error { return r.prepareCache() } -// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes). +// createIndexFromPacks creates a new index by reading all given pack files (with sizes). // The index is added to the MasterIndex but not marked as finalized. // Returned is the list of pack files which could not be read. -func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { +func (r *Repository) createIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { var m sync.Mutex debug.Log("Loading index from pack files") diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index f0d3ae486..05b790e33 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -161,7 +161,7 @@ func TestLoadBlobBroken(t *testing.T) { data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil) rtest.OK(t, err) rtest.Assert(t, bytes.Equal(buf, data), "data mismatch") - pack := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + pack := repo.LookupBlob(restic.TreeBlob, id)[0].PackID rtest.Assert(t, c.Has(backend.Handle{Type: restic.PackFile, Name: pack.String()}), "expected tree pack to be cached") } @@ -336,7 +336,7 @@ func benchmarkLoadIndex(b *testing.B, version uint) { } idx.Finalize() - id, err := index.SaveIndex(context.TODO(), repo, idx) + id, err := idx.SaveIndex(context.TODO(), repo) rtest.OK(b, err) b.Logf("index saved as %v", id.Str()) @@ -439,7 +439,7 @@ func TestListPack(t *testing.T) { repo.UseCache(c) // Forcibly cache pack file - packID := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + packID := repo.LookupBlob(restic.TreeBlob, id)[0].PackID rtest.OK(t, be.Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil })) // Get size to list pack diff --git a/internal/restic/find.go b/internal/restic/find.go index 08670a49f..cefef2196 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -11,7 +11,7 @@ 
import ( // Loader loads a blob from a repository. type Loader interface { LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) - LookupBlobSize(id ID, tpe BlobType) (uint, bool) + LookupBlobSize(tpe BlobType, id ID) (uint, bool) Connections() uint } diff --git a/internal/restic/find_test.go b/internal/restic/find_test.go index 1ae30ded9..9b8315ad4 100644 --- a/internal/restic/find_test.go +++ b/internal/restic/find_test.go @@ -166,7 +166,7 @@ func (r ForbiddenRepo) LoadBlob(context.Context, restic.BlobType, restic.ID, []b return nil, errors.New("should not be called") } -func (r ForbiddenRepo) LookupBlobSize(_ restic.ID, _ restic.BlobType) (uint, bool) { +func (r ForbiddenRepo) LookupBlobSize(_ restic.BlobType, _ restic.ID) (uint, bool) { return 0, false } diff --git a/internal/restic/idset.go b/internal/restic/idset.go index 1b12a6398..9e6e3c6fd 100644 --- a/internal/restic/idset.go +++ b/internal/restic/idset.go @@ -105,3 +105,9 @@ func (s IDSet) String() string { str := s.List().String() return "{" + str[1:len(str)-1] + "}" } + +func (s IDSet) Clone() IDSet { + c := NewIDSet() + c.Merge(s) + return c +} diff --git a/internal/restic/idset_test.go b/internal/restic/idset_test.go index 734b31237..14c88b314 100644 --- a/internal/restic/idset_test.go +++ b/internal/restic/idset_test.go @@ -35,4 +35,7 @@ func TestIDSet(t *testing.T) { } rtest.Equals(t, "{1285b303 7bb086db f658198b}", set.String()) + + copied := set.Clone() + rtest.Equals(t, "{1285b303 7bb086db f658198b}", copied.String()) } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index bc0ec2d43..b18b036a7 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -18,17 +18,32 @@ var ErrInvalidData = errors.New("invalid data returned") type Repository interface { // Connections returns the maximum number of concurrent backend operations Connections() uint - + Config() Config Key() *crypto.Key - Index() MasterIndex - LoadIndex(context.Context, *progress.Counter) error - ClearIndex() - SetIndex(MasterIndex) error - LookupBlobSize(ID, BlobType) (uint, bool) + LoadIndex(ctx context.Context, p *progress.Counter) error + SetIndex(mi MasterIndex) error - Config() Config - PackSize() uint + LookupBlob(t BlobType, id ID) []PackedBlob + LookupBlobSize(t BlobType, id ID) (size uint, exists bool) + + // ListBlobs runs fn on all blobs known to the index. When the context is cancelled, + // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. + ListBlobs(ctx context.Context, fn func(PackedBlob)) error + ListPacksFromIndex(ctx context.Context, packs IDSet) <-chan PackBlobs + // ListPack returns the list of blobs saved in the pack id and the length of + // the pack header. + ListPack(ctx context.Context, id ID, packSize int64) (entries []Blob, hdrSize uint32, err error) + + LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error) + LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error + + // StartPackUploader starts goroutines to upload new pack files. The errgroup + // is used to immediately notify about an upload error. Flush() will also return + // that error. 
+ StartPackUploader(ctx context.Context, wg *errgroup.Group) + SaveBlob(ctx context.Context, t BlobType, buf []byte, id ID, storeDuplicate bool) (newID ID, known bool, size int, err error) + Flush(ctx context.Context) error // List calls the function fn for each file of type t in the repository. // When an error is returned by fn, processing stops and List() returns the @@ -36,31 +51,15 @@ type Repository interface { // // The function fn is called in the same Goroutine List() was called from. List(ctx context.Context, t FileType, fn func(ID, int64) error) error - - // ListPack returns the list of blobs saved in the pack id and the length of - // the pack header. - ListPack(context.Context, ID, int64) ([]Blob, uint32, error) - - LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) - LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error - SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error) - - // StartPackUploader start goroutines to upload new pack files. The errgroup - // is used to immediately notify about an upload error. Flush() will also return - // that error. - StartPackUploader(ctx context.Context, wg *errgroup.Group) - Flush(context.Context) error - - // LoadUnpacked loads and decrypts the file with the given type and ID. - LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error) - SaveUnpacked(context.Context, FileType, []byte) (ID, error) - // RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots. - RemoveUnpacked(ctx context.Context, t FileType, id ID) error - // LoadRaw reads all data stored in the backend for the file with id and filetype t. // If the backend returns data that does not match the id, then the buffer is returned // along with an error that is a restic.ErrInvalidData error. LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error) + // LoadUnpacked loads and decrypts the file with the given type and ID. + LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error) + SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) + // RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots. + RemoveUnpacked(ctx context.Context, t FileType, id ID) error } type FileType = backend.FileType @@ -86,7 +85,7 @@ type LoaderUnpacked interface { type SaverUnpacked interface { // Connections returns the maximum number of concurrent backend operations Connections() uint - SaveUnpacked(context.Context, FileType, []byte) (ID, error) + SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) } // RemoverUnpacked allows removing an unpacked blob @@ -106,24 +105,15 @@ type PackBlobs struct { Blobs []Blob } -type MasterIndexSaveOpts struct { - SaveProgress *progress.Counter - DeleteProgress func() *progress.Counter - DeleteReport func(id ID, err error) - SkipDeletion bool -} - // MasterIndex keeps track of the blobs are stored within files. type MasterIndex interface { - Has(BlobHandle) bool - Lookup(BlobHandle) []PackedBlob + Has(bh BlobHandle) bool + Lookup(bh BlobHandle) []PackedBlob // Each runs fn on all blobs known to the index. When the context is cancelled, // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. 
Each(ctx context.Context, fn func(PackedBlob)) error ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs - - Save(ctx context.Context, repo SaverRemoverUnpacked, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error } // Lister allows listing files in a backend. @@ -141,3 +131,7 @@ type Unpacked interface { SaverUnpacked RemoverUnpacked } + +type ListBlobser interface { + ListBlobs(ctx context.Context, fn func(PackedBlob)) error +} diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go index 4110a5e8d..123295533 100644 --- a/internal/restic/tree_stream.go +++ b/internal/restic/tree_stream.go @@ -77,7 +77,7 @@ func filterTrees(ctx context.Context, repo Loader, trees IDs, loaderChan chan<- continue } - treeSize, found := repo.LookupBlobSize(nextTreeID.ID, TreeBlob) + treeSize, found := repo.LookupBlobSize(TreeBlob, nextTreeID.ID) if found && treeSize > 50*1024*1024 { loadCh = hugeTreeLoaderChan } else { diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index f2c134ea9..3551857dd 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -48,7 +48,7 @@ type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Bl // fileRestorer restores set of files type fileRestorer struct { - idx func(restic.BlobHandle) []restic.PackedBlob + idx func(restic.BlobType, restic.ID) []restic.PackedBlob blobsLoader blobsLoaderFn workerCount int @@ -64,7 +64,7 @@ type fileRestorer struct { func newFileRestorer(dst string, blobsLoader blobsLoaderFn, - idx func(restic.BlobHandle) []restic.PackedBlob, + idx func(restic.BlobType, restic.ID) []restic.PackedBlob, connections uint, sparse bool, progress *restore.Progress) *fileRestorer { @@ -99,7 +99,7 @@ func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID } for _, blobID := range blobIDs { - packs := r.idx(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) + packs := r.idx(restic.DataBlob, blobID) if len(packs) == 0 { return errors.Errorf("Unknown blob %s", blobID.String()) } @@ -227,7 +227,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok { for _, blob := range packsMap[pack.id] { - idxPacks := r.idx(restic.BlobHandle{ID: blob.id, Type: restic.DataBlob}) + idxPacks := r.idx(restic.DataBlob, blob.id) for _, idxPack := range idxPacks { if idxPack.PackID.Equal(pack.id) { addBlob(idxPack.Blob, blob.offset) diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index befeb5d2c..03797e0c8 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -35,8 +35,8 @@ type TestRepo struct { loader blobsLoaderFn } -func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob { - packs := i.blobs[bh.ID] +func (i *TestRepo) Lookup(tpe restic.BlobType, id restic.ID) []restic.PackedBlob { + packs := i.blobs[id] return packs } diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index 9f41f5cf2..c471800df 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -240,7 +240,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { } idx := NewHardlinkIndex[string]() - filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.Index().Lookup, + filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.LookupBlob, res.repo.Connections(), 
res.sparse, res.progress) filerestorer.Error = res.Error @@ -435,7 +435,7 @@ func (res *Restorer) verifyFile(target string, node *restic.Node, buf []byte) ([ var offset int64 for _, blobID := range node.Content { - length, found := res.repo.LookupBlobSize(blobID, restic.DataBlob) + length, found := res.repo.LookupBlobSize(restic.DataBlob, blobID) if !found { return buf, errors.Errorf("Unable to fetch blob %s", blobID) }
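The last hunks show the flipped argument order of LookupBlobSize, from (id, type) to (type, id), which every caller has to mirror. A small sketch of an updated caller follows; sizeLookuper and totalContentSize are hypothetical names, while the method signature is the one from internal/restic/find.go above.

package example

import (
	"fmt"

	"github.com/restic/restic/internal/restic"
)

// sizeLookuper mirrors the updated Loader method from internal/restic/find.go.
type sizeLookuper interface {
	LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool)
}

// totalContentSize sums the sizes of a file's data blobs, walking the
// content list the same way verifyFile walks node.Content above.
func totalContentSize(repo sizeLookuper, content restic.IDs) (uint64, error) {
	var total uint64
	for _, id := range content {
		size, found := repo.LookupBlobSize(restic.DataBlob, id) // was LookupBlobSize(id, restic.DataBlob)
		if !found {
			return 0, fmt.Errorf("unable to fetch blob %v", id)
		}
		total += uint64(size)
	}
	return total, nil
}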