From 4df887406ff42320ca5d157e66847c3fba36ae5b Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 12:41:56 +0200 Subject: [PATCH 01/25] repository: inline MasterIndex interface into Repository interface --- cmd/restic/cmd_cat.go | 2 +- cmd/restic/cmd_copy.go | 6 +-- cmd/restic/cmd_debug.go | 2 +- cmd/restic/cmd_find.go | 6 +-- cmd/restic/cmd_recover.go | 2 +- cmd/restic/cmd_stats.go | 4 +- cmd/restic/integration_helpers_test.go | 4 +- internal/archiver/archiver.go | 4 +- internal/checker/checker.go | 24 +++++------ internal/index/master_index_test.go | 2 +- internal/pack/pack.go | 4 +- internal/repository/check.go | 3 +- internal/repository/prune.go | 19 ++++----- internal/repository/repack.go | 2 +- internal/repository/repack_test.go | 13 ++---- internal/repository/repair_index.go | 4 +- internal/repository/repair_pack.go | 2 +- internal/repository/repair_pack_test.go | 6 +-- internal/repository/repository.go | 33 +++++++++++---- internal/repository/repository_test.go | 4 +- internal/restic/repository.go | 55 ++++++++++++++----------- internal/restorer/restorer.go | 2 +- 22 files changed, 109 insertions(+), 94 deletions(-) diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index 8d11a9dc4..e776b67a1 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -168,7 +168,7 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error { for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} { bh := restic.BlobHandle{ID: id, Type: t} - if !repo.Index().Has(bh) { + if !repo.HasBlob(bh) { continue } diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index ad6c58a25..26b16a374 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -187,7 +187,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep packList := restic.NewIDSet() enqueue := func(h restic.BlobHandle) { - pb := srcRepo.Index().Lookup(h) + pb := srcRepo.LookupBlob(h) copyBlobs.Insert(h) for _, p := range pb { packList.Insert(p.PackID) @@ -202,7 +202,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Do we already have this tree blob? treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob} - if !dstRepo.Index().Has(treeHandle) { + if !dstRepo.HasBlob(treeHandle) { // copy raw tree bytes to avoid problems if the serialization changes enqueue(treeHandle) } @@ -212,7 +212,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Copy the blobs for this file. 
for _, blobID := range entry.Content { h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID} - if !dstRepo.Index().Has(h) { + if !dstRepo.HasBlob(h) { enqueue(h) } } diff --git a/cmd/restic/cmd_debug.go b/cmd/restic/cmd_debug.go index 9fb6969d0..7b0cdb53e 100644 --- a/cmd/restic/cmd_debug.go +++ b/cmd/restic/cmd_debug.go @@ -492,7 +492,7 @@ func examinePack(ctx context.Context, opts DebugExamineOptions, repo restic.Repo blobsLoaded := false // examine all data the indexes have for the pack file - for b := range repo.Index().ListPacks(ctx, restic.NewIDSet(id)) { + for b := range repo.ListPacksFromIndex(ctx, restic.NewIDSet(id)) { blobs := b.Blobs if len(blobs) == 0 { continue diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index 81df0ab98..7ad8886c8 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -465,7 +465,7 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc // remember which packs were found in the index indexPackIDs := make(map[string]struct{}) - err := f.repo.Index().Each(wctx, func(pb restic.PackedBlob) { + err := f.repo.ListBlobs(wctx, func(pb restic.PackedBlob) { idStr := pb.PackID.String() // keep entry in packIDs as Each() returns individual index entries matchingID := false @@ -503,15 +503,13 @@ func (f *Finder) indexPacksToBlobs(ctx context.Context, packIDs map[string]struc } func (f *Finder) findObjectPack(id string, t restic.BlobType) { - idx := f.repo.Index() - rid, err := restic.ParseID(id) if err != nil { Printf("Note: cannot find pack for object '%s', unable to parse ID: %v\n", id, err) return } - blobs := idx.Lookup(restic.BlobHandle{ID: rid, Type: t}) + blobs := f.repo.LookupBlob(restic.BlobHandle{ID: rid, Type: t}) if len(blobs) == 0 { Printf("Object %s not found in the index\n", rid.Str()) return diff --git a/cmd/restic/cmd_recover.go b/cmd/restic/cmd_recover.go index debaa4e5b..726f1bf65 100644 --- a/cmd/restic/cmd_recover.go +++ b/cmd/restic/cmd_recover.go @@ -61,7 +61,7 @@ func runRecover(ctx context.Context, gopts GlobalOptions) error { // tree. If it is not referenced, we have a root tree. 
trees := make(map[restic.ID]bool) - err = repo.Index().Each(ctx, func(blob restic.PackedBlob) { + err = repo.ListBlobs(ctx, func(blob restic.PackedBlob) { if blob.Type == restic.TreeBlob { trees[blob.Blob.ID] = false } diff --git a/cmd/restic/cmd_stats.go b/cmd/restic/cmd_stats.go index a7891e5b0..3bec18f4c 100644 --- a/cmd/restic/cmd_stats.go +++ b/cmd/restic/cmd_stats.go @@ -124,7 +124,7 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args if opts.countMode == countModeRawData { // the blob handles have been collected, but not yet counted for blobHandle := range stats.blobs { - pbs := repo.Index().Lookup(blobHandle) + pbs := repo.LookupBlob(blobHandle) if len(pbs) == 0 { return fmt.Errorf("blob %v not found", blobHandle) } @@ -378,7 +378,7 @@ func statsDebugBlobs(ctx context.Context, repo restic.Repository) ([restic.NumBl hist[i] = newSizeHistogram(2 * chunker.MaxSize) } - err := repo.Index().Each(ctx, func(pb restic.PackedBlob) { + err := repo.ListBlobs(ctx, func(pb restic.PackedBlob) { hist[pb.Type].Add(uint64(pb.Length)) }) diff --git a/cmd/restic/integration_helpers_test.go b/cmd/restic/integration_helpers_test.go index 2812eda6d..978deab3d 100644 --- a/cmd/restic/integration_helpers_test.go +++ b/cmd/restic/integration_helpers_test.go @@ -252,7 +252,7 @@ func listTreePacks(gopts GlobalOptions, t *testing.T) restic.IDSet { rtest.OK(t, r.LoadIndex(ctx, nil)) treePacks := restic.NewIDSet() - rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) { + rtest.OK(t, r.ListBlobs(ctx, func(pb restic.PackedBlob) { if pb.Type == restic.TreeBlob { treePacks.Insert(pb.PackID) } @@ -280,7 +280,7 @@ func removePacksExcept(gopts GlobalOptions, t testing.TB, keep restic.IDSet, rem rtest.OK(t, r.LoadIndex(ctx, nil)) treePacks := restic.NewIDSet() - rtest.OK(t, r.Index().Each(ctx, func(pb restic.PackedBlob) { + rtest.OK(t, r.ListBlobs(ctx, func(pb restic.PackedBlob) { if pb.Type == restic.TreeBlob { treePacks.Insert(pb.PackID) } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index c1f73eea6..10034afa1 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -276,7 +276,7 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) (*rest } func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { - if arch.Repo.Index().Has(restic.BlobHandle{ID: id, Type: restic.TreeBlob}) { + if arch.Repo.HasBlob(restic.BlobHandle{ID: id, Type: restic.TreeBlob}) { err = errors.Errorf("tree %v could not be loaded; the repository could be damaged: %v", id, err) } else { err = errors.Errorf("tree %v is not known; the repository could be damaged, run `repair index` to try to repair it", id) @@ -390,7 +390,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult { func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { // check if all blobs are contained in index for _, id := range previous.Content { - if !arch.Repo.Index().Has(restic.BlobHandle{ID: id, Type: restic.DataBlob}) { + if !arch.Repo.HasBlob(restic.BlobHandle{ID: id, Type: restic.DataBlob}) { return false } } diff --git a/internal/checker/checker.go b/internal/checker/checker.go index dc83aef5b..09b1dd7eb 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -91,9 +91,9 @@ func (c *Checker) LoadSnapshots(ctx context.Context) error { return err } -func computePackTypes(ctx context.Context, idx restic.MasterIndex) (map[restic.ID]restic.BlobType, error) { +func computePackTypes(ctx context.Context, idx 
restic.ListBlobser) (map[restic.ID]restic.BlobType, error) { packs := make(map[restic.ID]restic.BlobType) - err := idx.Each(ctx, func(pb restic.PackedBlob) { + err := idx.ListBlobs(ctx, func(pb restic.PackedBlob) { tpe, exists := packs[pb.PackID] if exists { if pb.Type != tpe { @@ -177,12 +177,18 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e return hints, append(errs, err) } + err = c.repo.SetIndex(c.masterIndex) + if err != nil { + debug.Log("SetIndex returned error: %v", err) + errs = append(errs, err) + } + // compute pack size using index entries - c.packs, err = pack.Size(ctx, c.masterIndex, false) + c.packs, err = pack.Size(ctx, c.repo, false) if err != nil { return hints, append(errs, err) } - packTypes, err := computePackTypes(ctx, c.masterIndex) + packTypes, err := computePackTypes(ctx, c.repo) if err != nil { return hints, append(errs, err) } @@ -203,12 +209,6 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e } } - err = c.repo.SetIndex(c.masterIndex) - if err != nil { - debug.Log("SetIndex returned error: %v", err) - errs = append(errs, err) - } - return hints, errs } @@ -488,7 +488,7 @@ func (c *Checker) UnusedBlobs(ctx context.Context) (blobs restic.BlobHandles, er ctx, cancel := context.WithCancel(ctx) defer cancel() - err = c.repo.Index().Each(ctx, func(blob restic.PackedBlob) { + err = c.repo.ListBlobs(ctx, func(blob restic.PackedBlob) { h := restic.BlobHandle{ID: blob.ID, Type: blob.Type} if !c.blobRefs.M.Has(h) { debug.Log("blob %v not referenced", h) @@ -573,7 +573,7 @@ func (c *Checker) ReadPacks(ctx context.Context, packs map[restic.ID]int64, p *p } // push packs to ch - for pbs := range c.repo.Index().ListPacks(ctx, packSet) { + for pbs := range c.repo.ListPacksFromIndex(ctx, packSet) { size := packs[pbs.PackID] debug.Log("listed %v", pbs.PackID) select { diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index c3560a7fb..36a028768 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -362,7 +362,7 @@ func testIndexSave(t *testing.T, version uint) { t.Fatal(err) } - err = repo.Index().Save(context.TODO(), repo, nil, nil, restic.MasterIndexSaveOpts{}) + err = repo.SaveIndex(context.TODO(), nil, nil, restic.MasterIndexSaveOpts{}) if err != nil { t.Fatalf("unable to save new index: %v", err) } diff --git a/internal/pack/pack.go b/internal/pack/pack.go index 7d8d87e71..57957ce91 100644 --- a/internal/pack/pack.go +++ b/internal/pack/pack.go @@ -389,10 +389,10 @@ func CalculateHeaderSize(blobs []restic.Blob) int { // If onlyHdr is set to true, only the size of the header is returned // Note that this function only gives correct sizes, if there are no // duplicates in the index. 
-func Size(ctx context.Context, mi restic.MasterIndex, onlyHdr bool) (map[restic.ID]int64, error) { +func Size(ctx context.Context, mi restic.ListBlobser, onlyHdr bool) (map[restic.ID]int64, error) { packSize := make(map[restic.ID]int64) - err := mi.Each(ctx, func(blob restic.PackedBlob) { + err := mi.ListBlobs(ctx, func(blob restic.PackedBlob) { size, ok := packSize[blob.PackID] if !ok { size = headerSize diff --git a/internal/repository/check.go b/internal/repository/check.go index 8018f4902..05605db86 100644 --- a/internal/repository/check.go +++ b/internal/repository/check.go @@ -158,11 +158,10 @@ func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []re errs = append(errs, errors.Errorf("pack header size does not match, want %v, got %v", idxHdrSize, hdrSize)) } - idx := r.Index() for _, blob := range blobs { // Check if blob is contained in index and position is correct idxHas := false - for _, pb := range idx.Lookup(blob.BlobHandle) { + for _, pb := range r.LookupBlob(blob.BlobHandle) { if pb.PackID == id && pb.Blob == blob { idxHas = true break diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 479439e6a..66eab28b2 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -7,7 +7,6 @@ import ( "sort" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" @@ -67,7 +66,7 @@ type PrunePlan struct { removePacks restic.IDSet // packs to remove ignorePacks restic.IDSet // packs to ignore when rebuilding the index - repo restic.Repository + repo *Repository stats PruneStats opts PruneOptions } @@ -89,7 +88,7 @@ type packInfoWithID struct { // PlanPrune selects which files to rewrite and which to delete and which blobs to keep. // Also some summary statistics are returned. 
-func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) { +func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) { var stats PruneStats if opts.UnsafeRecovery { @@ -109,7 +108,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g } printer.P("searching used packs...\n") - keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo.Index(), usedBlobs, &stats, printer) + keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo, usedBlobs, &stats, printer) if err != nil { return nil, err } @@ -124,7 +123,7 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g blobCount := keepBlobs.Len() // when repacking, we do not want to keep blobs which are // already contained in kept packs, so delete them from keepBlobs - err := repo.Index().Each(ctx, func(blob restic.PackedBlob) { + err := repo.ListBlobs(ctx, func(blob restic.PackedBlob) { if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) { return } @@ -151,11 +150,11 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, g return &plan, nil } -func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) { +func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) { // iterate over all blobs in index to find out which blobs are duplicates // The counter in usedBlobs describes how many instances of the blob exist in the repository index // Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist - err := idx.Each(ctx, func(blob restic.PackedBlob) { + err := idx.ListBlobs(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, ok := usedBlobs[bh] if ok { @@ -205,7 +204,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re hasDuplicates := false // iterate over all blobs in index to generate packInfo - err = idx.Each(ctx, func(blob restic.PackedBlob) { + err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) { ip := indexPack[blob.PackID] // Set blob type if not yet set @@ -260,7 +259,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re // - if there are no used blobs in a pack, possibly mark duplicates as "unused" if hasDuplicates { // iterate again over all blobs in index (this is pretty cheap, all in-mem) - err = idx.Each(ctx, func(blob restic.PackedBlob) { + err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle count, ok := usedBlobs[bh] // skip non-duplicate, aka. 
normal blobs @@ -581,7 +580,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") - indexFiles := repo.Index().(*index.MasterIndex).IDs() + indexFiles := repo.idx.IDs() err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) diff --git a/internal/repository/repack.go b/internal/repository/repack.go index 5bedcfa56..8c9ca28bb 100644 --- a/internal/repository/repack.go +++ b/internal/repository/repack.go @@ -54,7 +54,7 @@ func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito downloadQueue := make(chan restic.PackBlobs) wg.Go(func() error { defer close(downloadQueue) - for pbs := range repo.Index().ListPacks(wgCtx, packs) { + for pbs := range repo.ListPacksFromIndex(wgCtx, packs) { var packBlobs []restic.Blob keepMutex.Lock() // filter out unnecessary blobs diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 3fd56ccb1..e0f1b4254 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -145,9 +145,8 @@ func listFiles(t *testing.T, repo restic.Lister, tpe backend.FileType) restic.ID func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSet) restic.IDSet { packs := restic.NewIDSet() - idx := repo.Index() for h := range blobs { - list := idx.Lookup(h) + list := repo.LookupBlob(h) if len(list) == 0 { t.Fatal("Failed to find blob", h.ID.Str(), "with type", h.Type) } @@ -195,7 +194,7 @@ func rebuildIndex(t *testing.T, repo restic.Repository) { }) rtest.OK(t, err) - err = repo.Index().Save(context.TODO(), repo, restic.NewIDSet(), obsoleteIndexes, restic.MasterIndexSaveOpts{}) + err = repo.SaveIndex(context.TODO(), restic.NewIDSet(), obsoleteIndexes, restic.MasterIndexSaveOpts{}) rtest.OK(t, err) } @@ -252,10 +251,8 @@ func testRepack(t *testing.T, version uint) { } } - idx := repo.Index() - for h := range keepBlobs { - list := idx.Lookup(h) + list := repo.LookupBlob(h) if len(list) == 0 { t.Errorf("unable to find blob %v in repo", h.ID.Str()) continue @@ -318,10 +315,8 @@ func testRepackCopy(t *testing.T, version uint) { rebuildIndex(t, dstRepo) reloadIndex(t, dstRepo) - idx := dstRepo.Index() - for h := range keepBlobs { - list := idx.Lookup(h) + list := dstRepo.LookupBlob(h) if len(list) == 0 { t.Errorf("unable to find blob %v in repo", h.ID.Str()) continue diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index a6e732b44..a7d94fcf8 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -54,7 +54,7 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, if err != nil { return err } - packSizeFromIndex, err = pack.Size(ctx, repo.Index(), false) + packSizeFromIndex, err = pack.Size(ctx, repo, false) if err != nil { return err } @@ -115,7 +115,7 @@ func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks printer.P("rebuilding index\n") bar := printer.NewCounter("packs processed") - return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{ + return repo.SaveIndex(ctx, removePacks, extraObsolete, restic.MasterIndexSaveOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index a0bd56012..cac7aac10 100644 --- 
a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -21,7 +21,7 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, wg.Go(func() error { // examine all data the indexes have for the pack file - for b := range repo.Index().ListPacks(wgCtx, ids) { + for b := range repo.ListPacksFromIndex(wgCtx, ids) { blobs := b.Blobs if len(blobs) == 0 { printer.E("no blobs found for pack %v", b.PackID) diff --git a/internal/repository/repair_pack_test.go b/internal/repository/repair_pack_test.go index 7acdc646e..28a5525a2 100644 --- a/internal/repository/repair_pack_test.go +++ b/internal/repository/repair_pack_test.go @@ -18,7 +18,7 @@ import ( func listBlobs(repo restic.Repository) restic.BlobSet { blobs := restic.NewBlobSet() - _ = repo.Index().Each(context.TODO(), func(pb restic.PackedBlob) { + _ = repo.ListBlobs(context.TODO(), func(pb restic.PackedBlob) { blobs.Insert(pb.BlobHandle) }) return blobs @@ -68,7 +68,7 @@ func testRepairBrokenPack(t *testing.T, version uint) { // find blob that starts at offset 0 var damagedBlob restic.BlobHandle - for blobs := range repo.Index().ListPacks(context.TODO(), restic.NewIDSet(damagedID)) { + for blobs := range repo.ListPacksFromIndex(context.TODO(), restic.NewIDSet(damagedID)) { for _, blob := range blobs.Blobs { if blob.Offset == 0 { damagedBlob = blob.BlobHandle @@ -91,7 +91,7 @@ func testRepairBrokenPack(t *testing.T, version uint) { // all blobs in the file are broken damagedBlobs := restic.NewBlobSet() - for blobs := range repo.Index().ListPacks(context.TODO(), restic.NewIDSet(damagedID)) { + for blobs := range repo.ListPacksFromIndex(context.TODO(), restic.NewIDSet(damagedID)) { for _, blob := range blobs.Blobs { damagedBlobs.Insert(blob.BlobHandle) } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 534edc9fd..4aa8106ab 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -300,11 +300,6 @@ func (r *Repository) loadBlob(ctx context.Context, blobs []restic.PackedBlob, bu return nil, errors.Errorf("loading %v from %v packs failed", blobs[0].BlobHandle, len(blobs)) } -// LookupBlobSize returns the size of blob id. -func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) { - return r.idx.LookupSize(restic.BlobHandle{ID: id, Type: tpe}) -} - func (r *Repository) getZstdEncoder() *zstd.Encoder { r.allocEnc.Do(func() { level := zstd.SpeedDefault @@ -583,9 +578,31 @@ func (r *Repository) Connections() uint { return r.be.Connections() } -// Index returns the currently used MasterIndex. -func (r *Repository) Index() restic.MasterIndex { - return r.idx +func (r *Repository) HasBlob(bh restic.BlobHandle) bool { + return r.idx.Has(bh) +} + +func (r *Repository) LookupBlob(bh restic.BlobHandle) []restic.PackedBlob { + return r.idx.Lookup(bh) +} + +// LookupBlobSize returns the size of blob id. +func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) { + return r.idx.LookupSize(restic.BlobHandle{ID: id, Type: tpe}) +} + +func (r *Repository) SaveIndex(ctx context.Context, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error { + return r.idx.Save(ctx, r, excludePacks, extraObsolete, opts) +} + +// ListBlobs runs fn on all blobs known to the index. When the context is cancelled, +// the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. 
+func (r *Repository) ListBlobs(ctx context.Context, fn func(restic.PackedBlob)) error { + return r.idx.Each(ctx, fn) +} + +func (r *Repository) ListPacksFromIndex(ctx context.Context, packs restic.IDSet) <-chan restic.PackBlobs { + return r.idx.ListPacks(ctx, packs) } // SetIndex instructs the repository to use the given index. diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index f0d3ae486..31a588f62 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -161,7 +161,7 @@ func TestLoadBlobBroken(t *testing.T) { data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil) rtest.OK(t, err) rtest.Assert(t, bytes.Equal(buf, data), "data mismatch") - pack := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + pack := repo.LookupBlob(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID rtest.Assert(t, c.Has(backend.Handle{Type: restic.PackFile, Name: pack.String()}), "expected tree pack to be cached") } @@ -439,7 +439,7 @@ func TestListPack(t *testing.T) { repo.UseCache(c) // Forcibly cache pack file - packID := repo.Index().Lookup(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + packID := repo.LookupBlob(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID rtest.OK(t, be.Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil })) // Get size to list pack diff --git a/internal/restic/repository.go b/internal/restic/repository.go index bc0ec2d43..ee32beb7e 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -18,17 +18,36 @@ var ErrInvalidData = errors.New("invalid data returned") type Repository interface { // Connections returns the maximum number of concurrent backend operations Connections() uint - + Config() Config + PackSize() uint Key() *crypto.Key - Index() MasterIndex LoadIndex(context.Context, *progress.Counter) error ClearIndex() SetIndex(MasterIndex) error + SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error + + HasBlob(BlobHandle) bool + LookupBlob(BlobHandle) []PackedBlob LookupBlobSize(ID, BlobType) (uint, bool) - Config() Config - PackSize() uint + // ListBlobs runs fn on all blobs known to the index. When the context is cancelled, + // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. + ListBlobs(ctx context.Context, fn func(PackedBlob)) error + ListPacksFromIndex(ctx context.Context, packs IDSet) <-chan PackBlobs + // ListPack returns the list of blobs saved in the pack id and the length of + // the pack header. + ListPack(context.Context, ID, int64) ([]Blob, uint32, error) + + LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) + LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error + + // StartPackUploader start goroutines to upload new pack files. The errgroup + // is used to immediately notify about an upload error. Flush() will also return + // that error. + StartPackUploader(ctx context.Context, wg *errgroup.Group) + SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error) + Flush(context.Context) error // List calls the function fn for each file of type t in the repository. 
// When an error is returned by fn, processing stops and List() returns the @@ -36,31 +55,15 @@ type Repository interface { // // The function fn is called in the same Goroutine List() was called from. List(ctx context.Context, t FileType, fn func(ID, int64) error) error - - // ListPack returns the list of blobs saved in the pack id and the length of - // the pack header. - ListPack(context.Context, ID, int64) ([]Blob, uint32, error) - - LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) - LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error - SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error) - - // StartPackUploader start goroutines to upload new pack files. The errgroup - // is used to immediately notify about an upload error. Flush() will also return - // that error. - StartPackUploader(ctx context.Context, wg *errgroup.Group) - Flush(context.Context) error - + // LoadRaw reads all data stored in the backend for the file with id and filetype t. + // If the backend returns data that does not match the id, then the buffer is returned + // along with an error that is a restic.ErrInvalidData error. + LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error) // LoadUnpacked loads and decrypts the file with the given type and ID. LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error) SaveUnpacked(context.Context, FileType, []byte) (ID, error) // RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots. RemoveUnpacked(ctx context.Context, t FileType, id ID) error - - // LoadRaw reads all data stored in the backend for the file with id and filetype t. - // If the backend returns data that does not match the id, then the buffer is returned - // along with an error that is a restic.ErrInvalidData error. 
- LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error) } type FileType = backend.FileType @@ -141,3 +144,7 @@ type Unpacked interface { SaverUnpacked RemoverUnpacked } + +type ListBlobser interface { + ListBlobs(ctx context.Context, fn func(PackedBlob)) error +} diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index 9f41f5cf2..721330a8c 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -240,7 +240,7 @@ func (res *Restorer) RestoreTo(ctx context.Context, dst string) error { } idx := NewHardlinkIndex[string]() - filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.Index().Lookup, + filerestorer := newFileRestorer(dst, res.repo.LoadBlobsFromPack, res.repo.LookupBlob, res.repo.Connections(), res.sparse, res.progress) filerestorer.Error = res.Error From 8f1e70cd9b3ceac6f35562abdbff8044f15f9a89 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 12:51:58 +0200 Subject: [PATCH 02/25] repository: remove clearIndex and packSize from public interface --- internal/repository/prune.go | 8 ++++---- internal/repository/repair_index.go | 2 +- internal/repository/repository.go | 12 ++++++------ internal/restic/repository.go | 2 -- 4 files changed, 11 insertions(+), 13 deletions(-) diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 66eab28b2..712986e61 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -313,7 +313,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re return usedBlobs, indexPack, nil } -func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) { +func decidePackAction(ctx context.Context, opts PruneOptions, repo *Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) { removePacksFirst := restic.NewIDSet() removePacks := restic.NewIDSet() repackPacks := restic.NewIDSet() @@ -322,10 +322,10 @@ func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Reposi var repackSmallCandidates []packInfoWithID repoVersion := repo.Config().Version // only repack very small files by default - targetPackSize := repo.PackSize() / 25 + targetPackSize := repo.packSize() / 25 if opts.RepackSmall { // consider files with at least 80% of the target size as large enough - targetPackSize = repo.PackSize() / 5 * 4 + targetPackSize = repo.packSize() / 5 * 4 } // loop over all packs and decide what to do @@ -612,7 +612,7 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e } // drop outdated in-memory index - repo.ClearIndex() + repo.clearIndex() printer.P("done\n") return nil diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index a7d94fcf8..c20641d50 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -107,7 +107,7 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } // drop outdated in-memory index - repo.ClearIndex() + repo.clearIndex() return nil } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 4aa8106ab..48bb9a0ad 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -146,8 +146,8 @@ func (r *Repository) Config() restic.Config { return r.cfg } -// PackSize return the target size of a pack file when 
uploading -func (r *Repository) PackSize() uint { +// packSize return the target size of a pack file when uploading +func (r *Repository) packSize() uint { return r.opts.PackSize } @@ -541,8 +541,8 @@ func (r *Repository) StartPackUploader(ctx context.Context, wg *errgroup.Group) innerWg, ctx := errgroup.WithContext(ctx) r.packerWg = innerWg r.uploader = newPackerUploader(ctx, innerWg, r, r.be.Connections()) - r.treePM = newPackerManager(r.key, restic.TreeBlob, r.PackSize(), r.uploader.QueuePacker) - r.dataPM = newPackerManager(r.key, restic.DataBlob, r.PackSize(), r.uploader.QueuePacker) + r.treePM = newPackerManager(r.key, restic.TreeBlob, r.packSize(), r.uploader.QueuePacker) + r.dataPM = newPackerManager(r.key, restic.DataBlob, r.packSize(), r.uploader.QueuePacker) wg.Go(func() error { return innerWg.Wait() @@ -612,7 +612,7 @@ func (r *Repository) SetIndex(i restic.MasterIndex) error { return r.prepareCache() } -func (r *Repository) ClearIndex() { +func (r *Repository) clearIndex() { r.idx = index.NewMasterIndex() r.configureIndex() } @@ -646,7 +646,7 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error { } // reset in-memory index before loading it from the repository - r.ClearIndex() + r.clearIndex() err = index.ForAllIndexes(ctx, indexList, r, func(_ restic.ID, idx *index.Index, _ bool, err error) error { if err != nil { diff --git a/internal/restic/repository.go b/internal/restic/repository.go index ee32beb7e..06344a630 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -19,11 +19,9 @@ type Repository interface { // Connections returns the maximum number of concurrent backend operations Connections() uint Config() Config - PackSize() uint Key() *crypto.Key LoadIndex(context.Context, *progress.Counter) error - ClearIndex() SetIndex(MasterIndex) error SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error From 0aa5c5384263a398ef87927151c204f8eeed6d49 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 12:58:41 +0200 Subject: [PATCH 03/25] repository: replace HasBlob with LookupBlobSize --- cmd/restic/cmd_cat.go | 3 +-- cmd/restic/cmd_copy.go | 4 ++-- internal/archiver/archiver.go | 4 ++-- internal/repository/repository.go | 4 ---- internal/restic/repository.go | 1 - 5 files changed, 5 insertions(+), 11 deletions(-) diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index e776b67a1..de579587f 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -167,8 +167,7 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error { } for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} { - bh := restic.BlobHandle{ID: id, Type: t} - if !repo.HasBlob(bh) { + if _, ok := repo.LookupBlobSize(id, t); !ok { continue } diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 26b16a374..0df899321 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -202,7 +202,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Do we already have this tree blob? treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob} - if !dstRepo.HasBlob(treeHandle) { + if _, ok := dstRepo.LookupBlobSize(treeHandle.ID, treeHandle.Type); !ok { // copy raw tree bytes to avoid problems if the serialization changes enqueue(treeHandle) } @@ -212,7 +212,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Copy the blobs for this file. 
for _, blobID := range entry.Content { h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID} - if !dstRepo.HasBlob(h) { + if _, ok := dstRepo.LookupBlobSize(h.ID, h.Type); !ok { enqueue(h) } } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 10034afa1..1de28082b 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -276,7 +276,7 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) (*rest } func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { - if arch.Repo.HasBlob(restic.BlobHandle{ID: id, Type: restic.TreeBlob}) { + if _, ok := arch.Repo.LookupBlobSize(id, restic.TreeBlob); ok { err = errors.Errorf("tree %v could not be loaded; the repository could be damaged: %v", id, err) } else { err = errors.Errorf("tree %v is not known; the repository could be damaged, run `repair index` to try to repair it", id) @@ -390,7 +390,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult { func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { // check if all blobs are contained in index for _, id := range previous.Content { - if !arch.Repo.HasBlob(restic.BlobHandle{ID: id, Type: restic.DataBlob}) { + if _, ok := arch.Repo.LookupBlobSize(id, restic.DataBlob); !ok { return false } } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 48bb9a0ad..e5983ee16 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -578,10 +578,6 @@ func (r *Repository) Connections() uint { return r.be.Connections() } -func (r *Repository) HasBlob(bh restic.BlobHandle) bool { - return r.idx.Has(bh) -} - func (r *Repository) LookupBlob(bh restic.BlobHandle) []restic.PackedBlob { return r.idx.Lookup(bh) } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 06344a630..d16c12018 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -25,7 +25,6 @@ type Repository interface { SetIndex(MasterIndex) error SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error - HasBlob(BlobHandle) bool LookupBlob(BlobHandle) []PackedBlob LookupBlobSize(ID, BlobType) (uint, bool) From c01bcb10018ac6f416d7f7750f83fb814b4d7d55 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 13:03:14 +0200 Subject: [PATCH 04/25] archiver: remove unused masterIndex from test --- internal/archiver/blob_saver_test.go | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go index 180f95b3d..f7ef2f47d 100644 --- a/internal/archiver/blob_saver_test.go +++ b/internal/archiver/blob_saver_test.go @@ -10,7 +10,6 @@ import ( "testing" "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" "golang.org/x/sync/errgroup" @@ -19,7 +18,6 @@ import ( var errTest = errors.New("test error") type saveFail struct { - idx restic.MasterIndex cnt int32 failAt int32 } @@ -33,18 +31,12 @@ func (b *saveFail) SaveBlob(_ context.Context, _ restic.BlobType, _ []byte, id r return id, false, 0, nil } -func (b *saveFail) Index() restic.MasterIndex { - return b.idx -} - func TestBlobSaver(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() wg, ctx := errgroup.WithContext(ctx) - saver := &saveFail{ - idx: index.NewMasterIndex(), - } + 
saver := &saveFail{} b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU())) @@ -100,7 +92,6 @@ func TestBlobSaverError(t *testing.T) { wg, ctx := errgroup.WithContext(ctx) saver := &saveFail{ - idx: index.NewMasterIndex(), failAt: int32(test.failAt), } From 0bb0720348c055222c04b105fe44e40c68e6a3a2 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 14:41:11 +0200 Subject: [PATCH 05/25] test cleanups --- internal/archiver/testing.go | 3 +-- internal/repository/repack_test.go | 5 ----- internal/repository/repair_pack_test.go | 2 -- 3 files changed, 1 insertion(+), 9 deletions(-) diff --git a/internal/archiver/testing.go b/internal/archiver/testing.go index a186a4ee5..278c6a448 100644 --- a/internal/archiver/testing.go +++ b/internal/archiver/testing.go @@ -11,7 +11,6 @@ import ( "testing" "time" - "github.com/restic/restic/internal/crypto" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/fs" "github.com/restic/restic/internal/restic" @@ -239,7 +238,7 @@ func TestEnsureFileContent(ctx context.Context, t testing.TB, repo restic.BlobLo return } - content := make([]byte, crypto.CiphertextLength(len(file.Content))) + content := make([]byte, len(file.Content)) pos := 0 for _, id := range node.Content { part, err := repo.LoadBlob(ctx, restic.DataBlob, id, content[pos:]) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index e0f1b4254..f47f32881 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -199,11 +199,6 @@ func rebuildIndex(t *testing.T, repo restic.Repository) { } func reloadIndex(t *testing.T, repo restic.Repository) { - err := repo.SetIndex(index.NewMasterIndex()) - if err != nil { - t.Fatal(err) - } - if err := repo.LoadIndex(context.TODO(), nil); err != nil { t.Fatalf("error loading new index: %v", err) } diff --git a/internal/repository/repair_pack_test.go b/internal/repository/repair_pack_test.go index 28a5525a2..0d6d340f4 100644 --- a/internal/repository/repair_pack_test.go +++ b/internal/repository/repair_pack_test.go @@ -8,7 +8,6 @@ import ( "github.com/restic/restic/internal/backend" backendtest "github.com/restic/restic/internal/backend/test" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" @@ -118,7 +117,6 @@ func testRepairBrokenPack(t *testing.T, version uint) { rtest.OK(t, repository.RepairPacks(context.TODO(), repo, toRepair, &progress.NoopPrinter{})) // reload index - rtest.OK(t, repo.SetIndex(index.NewMasterIndex())) rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) packsAfter := listPacks(t, repo) From e848ad651a253def63d989c044d374ee40265c8c Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 14:41:31 +0200 Subject: [PATCH 06/25] restic: name parameters in restic interface --- internal/restic/repository.go | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/internal/restic/repository.go b/internal/restic/repository.go index d16c12018..7c0c747bf 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -21,12 +21,12 @@ type Repository interface { Config() Config Key() *crypto.Key - LoadIndex(context.Context, *progress.Counter) error - SetIndex(MasterIndex) error + LoadIndex(ctx context.Context, p *progress.Counter) error + SetIndex(mi MasterIndex) error SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts 
MasterIndexSaveOpts) error - LookupBlob(BlobHandle) []PackedBlob - LookupBlobSize(ID, BlobType) (uint, bool) + LookupBlob(bh BlobHandle) []PackedBlob + LookupBlobSize(id ID, t BlobType) (size uint, exists bool) // ListBlobs runs fn on all blobs known to the index. When the context is cancelled, // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. @@ -34,17 +34,17 @@ type Repository interface { ListPacksFromIndex(ctx context.Context, packs IDSet) <-chan PackBlobs // ListPack returns the list of blobs saved in the pack id and the length of // the pack header. - ListPack(context.Context, ID, int64) ([]Blob, uint32, error) + ListPack(ctx context.Context, id ID, packSize int64) (entries []Blob, hdrSize uint32, err error) - LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) + LoadBlob(ctx context.Context, t BlobType, id ID, buf []byte) ([]byte, error) LoadBlobsFromPack(ctx context.Context, packID ID, blobs []Blob, handleBlobFn func(blob BlobHandle, buf []byte, err error) error) error // StartPackUploader start goroutines to upload new pack files. The errgroup // is used to immediately notify about an upload error. Flush() will also return // that error. StartPackUploader(ctx context.Context, wg *errgroup.Group) - SaveBlob(context.Context, BlobType, []byte, ID, bool) (ID, bool, int, error) - Flush(context.Context) error + SaveBlob(ctx context.Context, t BlobType, buf []byte, id ID, storeDuplicate bool) (newID ID, known bool, size int, err error) + Flush(ctx context.Context) error // List calls the function fn for each file of type t in the repository. // When an error is returned by fn, processing stops and List() returns the @@ -58,7 +58,7 @@ type Repository interface { LoadRaw(ctx context.Context, t FileType, id ID) (data []byte, err error) // LoadUnpacked loads and decrypts the file with the given type and ID. LoadUnpacked(ctx context.Context, t FileType, id ID) (data []byte, err error) - SaveUnpacked(context.Context, FileType, []byte) (ID, error) + SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) // RemoveUnpacked removes a file from the repository. This will eventually be restricted to deleting only snapshots. RemoveUnpacked(ctx context.Context, t FileType, id ID) error } @@ -86,7 +86,7 @@ type LoaderUnpacked interface { type SaverUnpacked interface { // Connections returns the maximum number of concurrent backend operations Connections() uint - SaveUnpacked(context.Context, FileType, []byte) (ID, error) + SaveUnpacked(ctx context.Context, t FileType, buf []byte) (ID, error) } // RemoverUnpacked allows removing an unpacked blob @@ -115,8 +115,8 @@ type MasterIndexSaveOpts struct { // MasterIndex keeps track of the blobs are stored within files. type MasterIndex interface { - Has(BlobHandle) bool - Lookup(BlobHandle) []PackedBlob + Has(bh BlobHandle) bool + Lookup(bh BlobHandle) []PackedBlob // Each runs fn on all blobs known to the index. When the context is cancelled, // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. From 1266a4932f2974d67fcc75c951584e55c5713362 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 14:54:50 +0200 Subject: [PATCH 07/25] repository: fix parameter order of LookupBlobSize All methods should use blobType followed by ID. 
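As a rough illustration of the new argument order (a sketch, not part of this patch; the helper name and its package are hypothetical, and the internal/restic import only resolves inside the restic module):

package example

import "github.com/restic/restic/internal/restic"

// blobSizeOrZero shows the updated call: the blob type now comes first,
// followed by the ID (previously: repo.LookupBlobSize(id, restic.DataBlob)).
func blobSizeOrZero(repo restic.Repository, id restic.ID) uint {
	size, found := repo.LookupBlobSize(restic.DataBlob, id)
	if !found {
		return 0
	}
	return size
}
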
--- cmd/restic/cmd_cat.go | 2 +- cmd/restic/cmd_copy.go | 4 ++-- cmd/restic/cmd_diff.go | 2 +- cmd/restic/cmd_repair_snapshots.go | 2 +- cmd/restic/cmd_stats.go | 2 +- internal/archiver/archiver.go | 4 ++-- internal/checker/checker.go | 2 +- internal/checker/checker_test.go | 4 ++-- internal/fuse/file.go | 2 +- internal/fuse/fuse_test.go | 2 +- internal/repository/repack_test.go | 2 +- internal/repository/repository.go | 4 ++-- internal/restic/find.go | 2 +- internal/restic/find_test.go | 2 +- internal/restic/repository.go | 2 +- internal/restic/tree_stream.go | 2 +- internal/restorer/restorer.go | 2 +- 17 files changed, 21 insertions(+), 21 deletions(-) diff --git a/cmd/restic/cmd_cat.go b/cmd/restic/cmd_cat.go index de579587f..23205771a 100644 --- a/cmd/restic/cmd_cat.go +++ b/cmd/restic/cmd_cat.go @@ -167,7 +167,7 @@ func runCat(ctx context.Context, gopts GlobalOptions, args []string) error { } for _, t := range []restic.BlobType{restic.DataBlob, restic.TreeBlob} { - if _, ok := repo.LookupBlobSize(id, t); !ok { + if _, ok := repo.LookupBlobSize(t, id); !ok { continue } diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 0df899321..4b2f95bf2 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -202,7 +202,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Do we already have this tree blob? treeHandle := restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob} - if _, ok := dstRepo.LookupBlobSize(treeHandle.ID, treeHandle.Type); !ok { + if _, ok := dstRepo.LookupBlobSize(treeHandle.Type, treeHandle.ID); !ok { // copy raw tree bytes to avoid problems if the serialization changes enqueue(treeHandle) } @@ -212,7 +212,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep // Copy the blobs for this file. 
for _, blobID := range entry.Content { h := restic.BlobHandle{Type: restic.DataBlob, ID: blobID} - if _, ok := dstRepo.LookupBlobSize(h.ID, h.Type); !ok { + if _, ok := dstRepo.LookupBlobSize(h.Type, h.ID); !ok { enqueue(h) } } diff --git a/cmd/restic/cmd_diff.go b/cmd/restic/cmd_diff.go index b156191dc..28c742625 100644 --- a/cmd/restic/cmd_diff.go +++ b/cmd/restic/cmd_diff.go @@ -156,7 +156,7 @@ func updateBlobs(repo restic.Loader, blobs restic.BlobSet, stats *DiffStat) { stats.TreeBlobs++ } - size, found := repo.LookupBlobSize(h.ID, h.Type) + size, found := repo.LookupBlobSize(h.Type, h.ID) if !found { Warnf("unable to find blob size for %v\n", h) continue diff --git a/cmd/restic/cmd_repair_snapshots.go b/cmd/restic/cmd_repair_snapshots.go index b200d100a..be5ef4ad9 100644 --- a/cmd/restic/cmd_repair_snapshots.go +++ b/cmd/restic/cmd_repair_snapshots.go @@ -97,7 +97,7 @@ func runRepairSnapshots(ctx context.Context, gopts GlobalOptions, opts RepairOpt var newSize uint64 // check all contents and remove if not available for _, id := range node.Content { - if size, found := repo.LookupBlobSize(id, restic.DataBlob); !found { + if size, found := repo.LookupBlobSize(restic.DataBlob, id); !found { ok = false } else { newContent = append(newContent, id) diff --git a/cmd/restic/cmd_stats.go b/cmd/restic/cmd_stats.go index 3bec18f4c..0926a54ef 100644 --- a/cmd/restic/cmd_stats.go +++ b/cmd/restic/cmd_stats.go @@ -238,7 +238,7 @@ func statsWalkTree(repo restic.Loader, opts StatsOptions, stats *statsContainer, } if _, ok := stats.fileBlobs[nodePath][blobID]; !ok { // is always a data blob since we're accessing it via a file's Content array - blobSize, found := repo.LookupBlobSize(blobID, restic.DataBlob) + blobSize, found := repo.LookupBlobSize(restic.DataBlob, blobID) if !found { return fmt.Errorf("blob %s not found for tree %s", blobID, parentTreeID) } diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 1de28082b..50b09583c 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -276,7 +276,7 @@ func (arch *Archiver) loadSubtree(ctx context.Context, node *restic.Node) (*rest } func (arch *Archiver) wrapLoadTreeError(id restic.ID, err error) error { - if _, ok := arch.Repo.LookupBlobSize(id, restic.TreeBlob); ok { + if _, ok := arch.Repo.LookupBlobSize(restic.TreeBlob, id); ok { err = errors.Errorf("tree %v could not be loaded; the repository could be damaged: %v", id, err) } else { err = errors.Errorf("tree %v is not known; the repository could be damaged, run `repair index` to try to repair it", id) @@ -390,7 +390,7 @@ func (fn *FutureNode) take(ctx context.Context) futureNodeResult { func (arch *Archiver) allBlobsPresent(previous *restic.Node) bool { // check if all blobs are contained in index for _, id := range previous.Content { - if _, ok := arch.Repo.LookupBlobSize(id, restic.DataBlob); !ok { + if _, ok := arch.Repo.LookupBlobSize(restic.DataBlob, id); !ok { return false } } diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 09b1dd7eb..db3bf807d 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -429,7 +429,7 @@ func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) { // unfortunately fails in some cases that are not resolvable // by users, so we omit this check, see #1887 - _, found := c.repo.LookupBlobSize(blobID, restic.DataBlob) + _, found := c.repo.LookupBlobSize(restic.DataBlob, blobID) if !found { debug.Log("tree %v references blob %v which isn't contained 
in index", id, blobID) errs = append(errs, &Error{TreeID: id, Err: errors.Errorf("file %q blob %v not found in index", node.Name, blobID)}) diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index baec88628..1219f4e2b 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -461,11 +461,11 @@ func (r *delayRepository) LoadTree(ctx context.Context, id restic.ID) (*restic.T return restic.LoadTree(ctx, r.Repository, id) } -func (r *delayRepository) LookupBlobSize(id restic.ID, t restic.BlobType) (uint, bool) { +func (r *delayRepository) LookupBlobSize(t restic.BlobType, id restic.ID) (uint, bool) { if id == r.DelayTree && t == restic.DataBlob { r.Unblock() } - return r.Repository.LookupBlobSize(id, t) + return r.Repository.LookupBlobSize(t, id) } func (r *delayRepository) Unblock() { diff --git a/internal/fuse/file.go b/internal/fuse/file.go index 5190febbb..e2e0cf9a0 100644 --- a/internal/fuse/file.go +++ b/internal/fuse/file.go @@ -72,7 +72,7 @@ func (f *file) Open(_ context.Context, _ *fuse.OpenRequest, _ *fuse.OpenResponse var bytes uint64 cumsize := make([]uint64, 1+len(f.node.Content)) for i, id := range f.node.Content { - size, found := f.root.repo.LookupBlobSize(id, restic.DataBlob) + size, found := f.root.repo.LookupBlobSize(restic.DataBlob, id) if !found { return nil, errors.Errorf("id %v not found in repository", id) } diff --git a/internal/fuse/fuse_test.go b/internal/fuse/fuse_test.go index 1053d49a4..aebcb1272 100644 --- a/internal/fuse/fuse_test.go +++ b/internal/fuse/fuse_test.go @@ -89,7 +89,7 @@ func TestFuseFile(t *testing.T) { memfile []byte ) for _, id := range content { - size, found := repo.LookupBlobSize(id, restic.DataBlob) + size, found := repo.LookupBlobSize(restic.DataBlob, id) rtest.Assert(t, found, "Expected to find blob id %v", id) filesize += uint64(size) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index f47f32881..524ab6485 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -266,7 +266,7 @@ func testRepack(t *testing.T, version uint) { } for h := range removeBlobs { - if _, found := repo.LookupBlobSize(h.ID, h.Type); found { + if _, found := repo.LookupBlobSize(h.Type, h.ID); found { t.Errorf("blob %v still contained in the repo", h) } } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index e5983ee16..d68ed8837 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -583,8 +583,8 @@ func (r *Repository) LookupBlob(bh restic.BlobHandle) []restic.PackedBlob { } // LookupBlobSize returns the size of blob id. -func (r *Repository) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) { - return r.idx.LookupSize(restic.BlobHandle{ID: id, Type: tpe}) +func (r *Repository) LookupBlobSize(tpe restic.BlobType, id restic.ID) (uint, bool) { + return r.idx.LookupSize(restic.BlobHandle{Type: tpe, ID: id}) } func (r *Repository) SaveIndex(ctx context.Context, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error { diff --git a/internal/restic/find.go b/internal/restic/find.go index 08670a49f..cefef2196 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -11,7 +11,7 @@ import ( // Loader loads a blob from a repository. 
type Loader interface { LoadBlob(context.Context, BlobType, ID, []byte) ([]byte, error) - LookupBlobSize(id ID, tpe BlobType) (uint, bool) + LookupBlobSize(tpe BlobType, id ID) (uint, bool) Connections() uint } diff --git a/internal/restic/find_test.go b/internal/restic/find_test.go index 1ae30ded9..9b8315ad4 100644 --- a/internal/restic/find_test.go +++ b/internal/restic/find_test.go @@ -166,7 +166,7 @@ func (r ForbiddenRepo) LoadBlob(context.Context, restic.BlobType, restic.ID, []b return nil, errors.New("should not be called") } -func (r ForbiddenRepo) LookupBlobSize(_ restic.ID, _ restic.BlobType) (uint, bool) { +func (r ForbiddenRepo) LookupBlobSize(_ restic.BlobType, _ restic.ID) (uint, bool) { return 0, false } diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 7c0c747bf..3d5bccec0 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -26,7 +26,7 @@ type Repository interface { SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error LookupBlob(bh BlobHandle) []PackedBlob - LookupBlobSize(id ID, t BlobType) (size uint, exists bool) + LookupBlobSize(t BlobType, id ID) (size uint, exists bool) // ListBlobs runs fn on all blobs known to the index. When the context is cancelled, // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go index 4110a5e8d..123295533 100644 --- a/internal/restic/tree_stream.go +++ b/internal/restic/tree_stream.go @@ -77,7 +77,7 @@ func filterTrees(ctx context.Context, repo Loader, trees IDs, loaderChan chan<- continue } - treeSize, found := repo.LookupBlobSize(nextTreeID.ID, TreeBlob) + treeSize, found := repo.LookupBlobSize(TreeBlob, nextTreeID.ID) if found && treeSize > 50*1024*1024 { loadCh = hugeTreeLoaderChan } else { diff --git a/internal/restorer/restorer.go b/internal/restorer/restorer.go index 721330a8c..c471800df 100644 --- a/internal/restorer/restorer.go +++ b/internal/restorer/restorer.go @@ -435,7 +435,7 @@ func (res *Restorer) verifyFile(target string, node *restic.Node, buf []byte) ([ var offset int64 for _, blobID := range node.Content { - length, found := res.repo.LookupBlobSize(blobID, restic.DataBlob) + length, found := res.repo.LookupBlobSize(restic.DataBlob, blobID) if !found { return buf, errors.Errorf("Unable to fetch blob %s", blobID) } From 864995271ec7413594ce8a186385b6b61cdb310d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 14:56:17 +0200 Subject: [PATCH 08/25] repository: unwrap BlobHandle parameters of LookupBlob The method now uses the same parameters as LookupBlobSize. 
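As a rough sketch of the unwrapped parameters (not part of this patch; the helper and its package are hypothetical and would have to live inside the restic module to import internal/restic):

package example

import "github.com/restic/restic/internal/restic"

// packsContaining shows the updated call: LookupBlob now takes the blob
// type and ID directly (previously: repo.LookupBlob(restic.BlobHandle{...})).
func packsContaining(repo restic.Repository, h restic.BlobHandle) restic.IDSet {
	packs := restic.NewIDSet()
	for _, pb := range repo.LookupBlob(h.Type, h.ID) {
		packs.Insert(pb.PackID)
	}
	return packs
}
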
--- cmd/restic/cmd_copy.go | 2 +- cmd/restic/cmd_find.go | 2 +- cmd/restic/cmd_stats.go | 2 +- internal/repository/check.go | 2 +- internal/repository/repack_test.go | 6 +++--- internal/repository/repository.go | 4 ++-- internal/repository/repository_test.go | 4 ++-- internal/restic/repository.go | 2 +- internal/restorer/filerestorer.go | 8 ++++---- internal/restorer/filerestorer_test.go | 4 ++-- 10 files changed, 18 insertions(+), 18 deletions(-) diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 4b2f95bf2..d12501dd9 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -187,7 +187,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep packList := restic.NewIDSet() enqueue := func(h restic.BlobHandle) { - pb := srcRepo.LookupBlob(h) + pb := srcRepo.LookupBlob(h.Type, h.ID) copyBlobs.Insert(h) for _, p := range pb { packList.Insert(p.PackID) diff --git a/cmd/restic/cmd_find.go b/cmd/restic/cmd_find.go index 7ad8886c8..59e34c468 100644 --- a/cmd/restic/cmd_find.go +++ b/cmd/restic/cmd_find.go @@ -509,7 +509,7 @@ func (f *Finder) findObjectPack(id string, t restic.BlobType) { return } - blobs := f.repo.LookupBlob(restic.BlobHandle{ID: rid, Type: t}) + blobs := f.repo.LookupBlob(t, rid) if len(blobs) == 0 { Printf("Object %s not found in the index\n", rid.Str()) return diff --git a/cmd/restic/cmd_stats.go b/cmd/restic/cmd_stats.go index 0926a54ef..0f8e45f36 100644 --- a/cmd/restic/cmd_stats.go +++ b/cmd/restic/cmd_stats.go @@ -124,7 +124,7 @@ func runStats(ctx context.Context, opts StatsOptions, gopts GlobalOptions, args if opts.countMode == countModeRawData { // the blob handles have been collected, but not yet counted for blobHandle := range stats.blobs { - pbs := repo.LookupBlob(blobHandle) + pbs := repo.LookupBlob(blobHandle.Type, blobHandle.ID) if len(pbs) == 0 { return fmt.Errorf("blob %v not found", blobHandle) } diff --git a/internal/repository/check.go b/internal/repository/check.go index 05605db86..f16cd7492 100644 --- a/internal/repository/check.go +++ b/internal/repository/check.go @@ -161,7 +161,7 @@ func checkPackInner(ctx context.Context, r *Repository, id restic.ID, blobs []re for _, blob := range blobs { // Check if blob is contained in index and position is correct idxHas := false - for _, pb := range r.LookupBlob(blob.BlobHandle) { + for _, pb := range r.LookupBlob(blob.BlobHandle.Type, blob.BlobHandle.ID) { if pb.PackID == id && pb.Blob == blob { idxHas = true break diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 524ab6485..96b75ca46 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -146,7 +146,7 @@ func findPacksForBlobs(t *testing.T, repo restic.Repository, blobs restic.BlobSe packs := restic.NewIDSet() for h := range blobs { - list := repo.LookupBlob(h) + list := repo.LookupBlob(h.Type, h.ID) if len(list) == 0 { t.Fatal("Failed to find blob", h.ID.Str(), "with type", h.Type) } @@ -247,7 +247,7 @@ func testRepack(t *testing.T, version uint) { } for h := range keepBlobs { - list := repo.LookupBlob(h) + list := repo.LookupBlob(h.Type, h.ID) if len(list) == 0 { t.Errorf("unable to find blob %v in repo", h.ID.Str()) continue @@ -311,7 +311,7 @@ func testRepackCopy(t *testing.T, version uint) { reloadIndex(t, dstRepo) for h := range keepBlobs { - list := dstRepo.LookupBlob(h) + list := dstRepo.LookupBlob(h.Type, h.ID) if len(list) == 0 { t.Errorf("unable to find blob %v in repo", h.ID.Str()) continue diff --git 
a/internal/repository/repository.go b/internal/repository/repository.go index d68ed8837..73d05fe7b 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -578,8 +578,8 @@ func (r *Repository) Connections() uint { return r.be.Connections() } -func (r *Repository) LookupBlob(bh restic.BlobHandle) []restic.PackedBlob { - return r.idx.Lookup(bh) +func (r *Repository) LookupBlob(tpe restic.BlobType, id restic.ID) []restic.PackedBlob { + return r.idx.Lookup(restic.BlobHandle{Type: tpe, ID: id}) } // LookupBlobSize returns the size of blob id. diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 31a588f62..bc950d0b0 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -161,7 +161,7 @@ func TestLoadBlobBroken(t *testing.T) { data, err := repo.LoadBlob(context.TODO(), restic.TreeBlob, id, nil) rtest.OK(t, err) rtest.Assert(t, bytes.Equal(buf, data), "data mismatch") - pack := repo.LookupBlob(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + pack := repo.LookupBlob(restic.TreeBlob, id)[0].PackID rtest.Assert(t, c.Has(backend.Handle{Type: restic.PackFile, Name: pack.String()}), "expected tree pack to be cached") } @@ -439,7 +439,7 @@ func TestListPack(t *testing.T) { repo.UseCache(c) // Forcibly cache pack file - packID := repo.LookupBlob(restic.BlobHandle{Type: restic.TreeBlob, ID: id})[0].PackID + packID := repo.LookupBlob(restic.TreeBlob, id)[0].PackID rtest.OK(t, be.Load(context.TODO(), backend.Handle{Type: restic.PackFile, IsMetadata: true, Name: packID.String()}, 0, 0, func(rd io.Reader) error { return nil })) // Get size to list pack diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 3d5bccec0..9e6d6b99b 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -25,7 +25,7 @@ type Repository interface { SetIndex(mi MasterIndex) error SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error - LookupBlob(bh BlobHandle) []PackedBlob + LookupBlob(t BlobType, id ID) []PackedBlob LookupBlobSize(t BlobType, id ID) (size uint, exists bool) // ListBlobs runs fn on all blobs known to the index. 
When the context is cancelled, diff --git a/internal/restorer/filerestorer.go b/internal/restorer/filerestorer.go index f2c134ea9..3551857dd 100644 --- a/internal/restorer/filerestorer.go +++ b/internal/restorer/filerestorer.go @@ -48,7 +48,7 @@ type blobsLoaderFn func(ctx context.Context, packID restic.ID, blobs []restic.Bl // fileRestorer restores set of files type fileRestorer struct { - idx func(restic.BlobHandle) []restic.PackedBlob + idx func(restic.BlobType, restic.ID) []restic.PackedBlob blobsLoader blobsLoaderFn workerCount int @@ -64,7 +64,7 @@ type fileRestorer struct { func newFileRestorer(dst string, blobsLoader blobsLoaderFn, - idx func(restic.BlobHandle) []restic.PackedBlob, + idx func(restic.BlobType, restic.ID) []restic.PackedBlob, connections uint, sparse bool, progress *restore.Progress) *fileRestorer { @@ -99,7 +99,7 @@ func (r *fileRestorer) forEachBlob(blobIDs []restic.ID, fn func(packID restic.ID } for _, blobID := range blobIDs { - packs := r.idx(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) + packs := r.idx(restic.DataBlob, blobID) if len(packs) == 0 { return errors.Errorf("Unknown blob %s", blobID.String()) } @@ -227,7 +227,7 @@ func (r *fileRestorer) downloadPack(ctx context.Context, pack *packInfo) error { } } else if packsMap, ok := file.blobs.(map[restic.ID][]fileBlobInfo); ok { for _, blob := range packsMap[pack.id] { - idxPacks := r.idx(restic.BlobHandle{ID: blob.id, Type: restic.DataBlob}) + idxPacks := r.idx(restic.DataBlob, blob.id) for _, idxPack := range idxPacks { if idxPack.PackID.Equal(pack.id) { addBlob(idxPack.Blob, blob.offset) diff --git a/internal/restorer/filerestorer_test.go b/internal/restorer/filerestorer_test.go index befeb5d2c..03797e0c8 100644 --- a/internal/restorer/filerestorer_test.go +++ b/internal/restorer/filerestorer_test.go @@ -35,8 +35,8 @@ type TestRepo struct { loader blobsLoaderFn } -func (i *TestRepo) Lookup(bh restic.BlobHandle) []restic.PackedBlob { - packs := i.blobs[bh.ID] +func (i *TestRepo) Lookup(tpe restic.BlobType, id restic.ID) []restic.PackedBlob { + packs := i.blobs[id] return packs } From 6ca12c1b4aaf28b8a0a322d8b5e1f46e480e3802 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 15:11:32 +0200 Subject: [PATCH 09/25] archiver: replace most uses of restic.Repository --- internal/archiver/archiver.go | 14 +++++++++++-- internal/archiver/archiver_test.go | 28 ++++++++++++------------- internal/archiver/archiver_unix_test.go | 2 +- internal/archiver/testing.go | 2 +- 4 files changed, 28 insertions(+), 18 deletions(-) diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go index 50b09583c..86b329a9a 100644 --- a/internal/archiver/archiver.go +++ b/internal/archiver/archiver.go @@ -64,9 +64,19 @@ func (s *ItemStats) Add(other ItemStats) { s.TreeSizeInRepo += other.TreeSizeInRepo } +type archiverRepo interface { + restic.Loader + restic.BlobSaver + restic.SaverUnpacked + + Config() restic.Config + StartPackUploader(ctx context.Context, wg *errgroup.Group) + Flush(ctx context.Context) error +} + // Archiver saves a directory structure to the repo. type Archiver struct { - Repo restic.Repository + Repo archiverRepo SelectByName SelectByNameFunc Select SelectFunc FS fs.FS @@ -160,7 +170,7 @@ func (o Options) ApplyDefaults() Options { } // New initializes a new archiver. 
-func New(repo restic.Repository, fs fs.FS, opts Options) *Archiver { +func New(repo archiverRepo, fs fs.FS, opts Options) *Archiver { arch := &Archiver{ Repo: repo, SelectByName: func(_ string) bool { return true }, diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go index 8d0c2c02f..f38d5b0de 100644 --- a/internal/archiver/archiver_test.go +++ b/internal/archiver/archiver_test.go @@ -36,7 +36,7 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (string, restic.Repository return tempdir, repo } -func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { +func saveFile(t testing.TB, repo archiverRepo, filename string, filesystem fs.FS) (*restic.Node, ItemStats) { wg, ctx := errgroup.WithContext(context.TODO()) repo.StartPackUploader(ctx, wg) @@ -416,14 +416,14 @@ func BenchmarkArchiverSaveFileLarge(b *testing.B) { } type blobCountingRepo struct { - restic.Repository + archiverRepo m sync.Mutex saved map[restic.BlobHandle]uint } func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID, storeDuplicate bool) (restic.ID, bool, int, error) { - id, exists, size, err := repo.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate) + id, exists, size, err := repo.archiverRepo.SaveBlob(ctx, t, buf, id, storeDuplicate) if exists { return id, exists, size, err } @@ -435,7 +435,7 @@ func (repo *blobCountingRepo) SaveBlob(ctx context.Context, t restic.BlobType, b } func (repo *blobCountingRepo) SaveTree(ctx context.Context, t *restic.Tree) (restic.ID, error) { - id, err := restic.SaveTree(ctx, repo.Repository, t) + id, err := restic.SaveTree(ctx, repo.archiverRepo, t) h := restic.BlobHandle{ID: id, Type: restic.TreeBlob} repo.m.Lock() repo.saved[h]++ @@ -465,8 +465,8 @@ func TestArchiverSaveFileIncremental(t *testing.T) { tempdir := rtest.TempDir(t) repo := &blobCountingRepo{ - Repository: repository.TestRepository(t), - saved: make(map[restic.BlobHandle]uint), + archiverRepo: repository.TestRepository(t), + saved: make(map[restic.BlobHandle]uint), } data := rtest.Random(23, 512*1024+887898) @@ -902,8 +902,8 @@ func TestArchiverSaveDirIncremental(t *testing.T) { tempdir := rtest.TempDir(t) repo := &blobCountingRepo{ - Repository: repository.TestRepository(t), - saved: make(map[restic.BlobHandle]uint), + archiverRepo: repository.TestRepository(t), + saved: make(map[restic.BlobHandle]uint), } appendToFile(t, filepath.Join(tempdir, "testfile"), []byte("foobar")) @@ -2017,7 +2017,7 @@ func (m *TrackFS) OpenFile(name string, flag int, perm os.FileMode) (fs.File, er } type failSaveRepo struct { - restic.Repository + archiverRepo failAfter int32 cnt int32 err error @@ -2029,7 +2029,7 @@ func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []by return restic.Hash(buf), false, 0, f.err } - return f.Repository.SaveBlob(ctx, t, buf, id, storeDuplicate) + return f.archiverRepo.SaveBlob(ctx, t, buf, id, storeDuplicate) } func TestArchiverAbortEarlyOnError(t *testing.T) { @@ -2105,9 +2105,9 @@ func TestArchiverAbortEarlyOnError(t *testing.T) { } testRepo := &failSaveRepo{ - Repository: repo, - failAfter: int32(test.failAfter), - err: test.err, + archiverRepo: repo, + failAfter: int32(test.failAfter), + err: test.err, } // at most two files may be queued @@ -2134,7 +2134,7 @@ func TestArchiverAbortEarlyOnError(t *testing.T) { } } -func snapshot(t testing.TB, repo restic.Repository, fs fs.FS, parent *restic.Snapshot, filename string) (*restic.Snapshot, 
*restic.Node) { +func snapshot(t testing.TB, repo archiverRepo, fs fs.FS, parent *restic.Snapshot, filename string) (*restic.Snapshot, *restic.Node) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/internal/archiver/archiver_unix_test.go b/internal/archiver/archiver_unix_test.go index a6b1aad2e..4a380dff8 100644 --- a/internal/archiver/archiver_unix_test.go +++ b/internal/archiver/archiver_unix_test.go @@ -46,7 +46,7 @@ func wrapFileInfo(fi os.FileInfo) os.FileInfo { return res } -func statAndSnapshot(t *testing.T, repo restic.Repository, name string) (*restic.Node, *restic.Node) { +func statAndSnapshot(t *testing.T, repo archiverRepo, name string) (*restic.Node, *restic.Node) { fi := lstat(t, name) want, err := restic.NodeFromFileInfo(name, fi, false) rtest.OK(t, err) diff --git a/internal/archiver/testing.go b/internal/archiver/testing.go index 278c6a448..106e68445 100644 --- a/internal/archiver/testing.go +++ b/internal/archiver/testing.go @@ -25,7 +25,7 @@ func TestSnapshot(t testing.TB, repo restic.Repository, path string, parent *res Tags: []string{"test"}, } if parent != nil { - sn, err := restic.LoadSnapshot(context.TODO(), arch.Repo, *parent) + sn, err := restic.LoadSnapshot(context.TODO(), repo, *parent) if err != nil { t.Fatal(err) } From 447b486c20c4933eb06011fb435a93641550db4e Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 15:37:54 +0200 Subject: [PATCH 10/25] index: deduplicate index loading of check and repository --- internal/checker/checker.go | 37 +++---------------------- internal/index/master_index.go | 45 +++++++++++++++++++++++++++++++ internal/repository/repository.go | 35 +----------------------- 3 files changed, 49 insertions(+), 68 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index db3bf807d..61c017414 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -111,33 +111,10 @@ func computePackTypes(ctx context.Context, idx restic.ListBlobser) (map[restic.I func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []error, errs []error) { debug.Log("Start") - indexList, err := restic.MemorizeList(ctx, c.repo, restic.IndexFile) - if err != nil { - // abort if an error occurs while listing the indexes - return hints, append(errs, err) - } - - if p != nil { - var numIndexFiles uint64 - err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error { - numIndexFiles++ - return nil - }) - if err != nil { - return hints, append(errs, err) - } - p.SetMax(numIndexFiles) - defer p.Done() - } - packToIndex := make(map[restic.ID]restic.IDSet) - err = index.ForAllIndexes(ctx, indexList, c.repo, func(id restic.ID, index *index.Index, oldFormat bool, err error) error { + err := c.masterIndex.Load(ctx, c.repo, p, func(id restic.ID, idx *index.Index, oldFormat bool, err error) error { debug.Log("process index %v, err %v", id, err) - if p != nil { - p.Add(1) - } - if oldFormat { debug.Log("index %v has old format", id) hints = append(hints, &ErrOldIndexFormat{id}) @@ -150,11 +127,9 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e return nil } - c.masterIndex.Insert(index) - debug.Log("process blobs") cnt := 0 - err = index.Each(ctx, func(blob restic.PackedBlob) { + err = idx.Each(ctx, func(blob restic.PackedBlob) { cnt++ if _, ok := packToIndex[blob.PackID]; !ok { @@ -167,13 +142,7 @@ func (c *Checker) LoadIndex(ctx context.Context, p *progress.Counter) (hints []e return err }) if err != nil { - 
errs = append(errs, err) - } - - // Merge index before computing pack sizes, as this needs removed duplicates - err = c.masterIndex.MergeFinalIndexes() - if err != nil { - // abort if an error occurs merging the indexes + // failed to load the index return hints, append(errs, err) } diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 21ab344d6..796559fd7 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -9,6 +9,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" ) @@ -267,6 +268,50 @@ func (mi *MasterIndex) MergeFinalIndexes() error { return nil } +func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, p *progress.Counter, cb func(id restic.ID, idx *Index, oldFormat bool, err error) error) error { + indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile) + if err != nil { + return err + } + + if p != nil { + var numIndexFiles uint64 + err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error { + numIndexFiles++ + return nil + }) + if err != nil { + return err + } + p.SetMax(numIndexFiles) + defer p.Done() + } + + err = ForAllIndexes(ctx, indexList, r, func(id restic.ID, idx *Index, oldFormat bool, err error) error { + if p != nil { + p.Add(1) + } + if cb != nil { + err = cb(id, idx, oldFormat, err) + } + if err != nil { + return err + } + // special case to allow check to ignore index loading errors + if idx == nil { + return nil + } + mi.Insert(idx) + return nil + }) + + if err != nil { + return err + } + + return mi.MergeFinalIndexes() +} + // Save saves all known indexes to index files, leaving out any // packs whose ID is contained in packBlacklist from finalized indexes. // It also removes the old index files and those listed in extraObsolete. 
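For orientation, a minimal sketch (not part of this patch) of how the new
MasterIndex.Load helper shown above can be driven. The surrounding function and
its name are hypothetical; both the progress counter and the callback may be nil.

    func loadIndex(ctx context.Context, repo restic.ListerLoaderUnpacked) (*index.MasterIndex, error) {
        mi := index.NewMasterIndex()
        err := mi.Load(ctx, repo, nil, func(id restic.ID, idx *index.Index, oldFormat bool, err error) error {
            if err != nil {
                // returning nil tolerates a broken index file: idx is nil in that
                // case and Load skips the entry instead of aborting (the checker
                // relies on this to report, rather than fail on, bad indexes)
                return nil
            }
            if oldFormat {
                // a caller could record a hint that index id should be migrated
                _ = id
            }
            return nil
        })
        return mi, err
    }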
diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 73d05fe7b..bd7de0de4 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -623,43 +623,10 @@ func (r *Repository) configureIndex() { func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error { debug.Log("Loading index") - indexList, err := restic.MemorizeList(ctx, r, restic.IndexFile) - if err != nil { - return err - } - - if p != nil { - var numIndexFiles uint64 - err := indexList.List(ctx, restic.IndexFile, func(_ restic.ID, _ int64) error { - numIndexFiles++ - return nil - }) - if err != nil { - return err - } - p.SetMax(numIndexFiles) - defer p.Done() - } - // reset in-memory index before loading it from the repository r.clearIndex() - err = index.ForAllIndexes(ctx, indexList, r, func(_ restic.ID, idx *index.Index, _ bool, err error) error { - if err != nil { - return err - } - r.idx.Insert(idx) - if p != nil { - p.Add(1) - } - return nil - }) - - if err != nil { - return err - } - - err = r.idx.MergeFinalIndexes() + err := r.idx.Load(ctx, r, p, nil) if err != nil { return err } From fb59e0061426ed5ad5e9d3ee9ba30285678f49eb Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 15:48:25 +0200 Subject: [PATCH 11/25] index: rewrite MasterIndex load/save test to be independent of repository --- internal/index/master_index_test.go | 55 ++++++++++------------------- 1 file changed, 18 insertions(+), 37 deletions(-) diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index 36a028768..3a7f3da88 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -357,44 +357,25 @@ func TestIndexSave(t *testing.T) { func testIndexSave(t *testing.T, version uint) { repo := createFilledRepo(t, 3, version) - err := repo.LoadIndex(context.TODO(), nil) - if err != nil { - t.Fatal(err) - } + idx := index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + blobs := make(map[restic.PackedBlob]struct{}) + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + blobs[pb] = struct{}{} + })) - err = repo.SaveIndex(context.TODO(), nil, nil, restic.MasterIndexSaveOpts{}) - if err != nil { - t.Fatalf("unable to save new index: %v", err) - } + rtest.OK(t, idx.Save(context.TODO(), repo, nil, nil, restic.MasterIndexSaveOpts{})) + idx = index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) - checker := checker.New(repo, false) - err = checker.LoadSnapshots(context.TODO()) - if err != nil { - t.Error(err) - } - - hints, errs := checker.LoadIndex(context.TODO(), nil) - for _, h := range hints { - t.Logf("hint: %v\n", h) - } - - for _, err := range errs { - t.Errorf("checker found error: %v", err) - } - - ctx, cancel := context.WithCancel(context.TODO()) - defer cancel() - - errCh := make(chan error) - go checker.Structure(ctx, nil, errCh) - i := 0 - for err := range errCh { - t.Errorf("checker returned error: %v", err) - i++ - if i == 10 { - t.Errorf("more than 10 errors returned, skipping the rest") - cancel() - break + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + if _, ok := blobs[pb]; ok { + delete(blobs, pb) + } else { + t.Fatalf("unexpected blobs %v", pb) } - } + })) + rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") + + checker.TestCheckRepo(t, repo, false) } From 550d1eeac3540812a811a3ba9fac2eb9a9923f20 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 15:55:42 +0200 
Subject: [PATCH 12/25] repository: remove SaveIndex from interface The method is now only indirectly accessible via Prune or RepairIndex. --- internal/index/master_index.go | 9 ++++++- internal/index/master_index_test.go | 2 +- internal/repository/repack_test.go | 41 ++++++----------------------- internal/repository/repair_index.go | 6 +++-- internal/repository/repair_pack.go | 2 +- internal/repository/repository.go | 4 --- internal/restic/repository.go | 10 ------- 7 files changed, 22 insertions(+), 52 deletions(-) diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 796559fd7..981a2b31b 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -312,10 +312,17 @@ func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, return mi.MergeFinalIndexes() } +type MasterIndexSaveOpts struct { + SaveProgress *progress.Counter + DeleteProgress func() *progress.Counter + DeleteReport func(id restic.ID, err error) + SkipDeletion bool +} + // Save saves all known indexes to index files, leaving out any // packs whose ID is contained in packBlacklist from finalized indexes. // It also removes the old index files and those listed in extraObsolete. -func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error { +func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts MasterIndexSaveOpts) error { p := opts.SaveProgress p.SetMax(uint64(len(mi.Packs(excludePacks)))) diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index 3a7f3da88..41f4cc534 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -364,7 +364,7 @@ func testIndexSave(t *testing.T, version uint) { blobs[pb] = struct{}{} })) - rtest.OK(t, idx.Save(context.TODO(), repo, nil, nil, restic.MasterIndexSaveOpts{})) + rtest.OK(t, idx.Save(context.TODO(), repo, nil, nil, index.MasterIndexSaveOpts{})) idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) diff --git a/internal/repository/repack_test.go b/internal/repository/repack_test.go index 96b75ca46..476e63b47 100644 --- a/internal/repository/repack_test.go +++ b/internal/repository/repack_test.go @@ -7,10 +7,10 @@ import ( "time" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" rtest "github.com/restic/restic/internal/test" + "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" ) @@ -173,35 +173,12 @@ func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs rest } } -func rebuildIndex(t *testing.T, repo restic.Repository) { - err := repo.SetIndex(index.NewMasterIndex()) - rtest.OK(t, err) +func rebuildAndReloadIndex(t *testing.T, repo *repository.Repository) { + rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{ + ReadAllPacks: true, + }, &progress.NoopPrinter{})) - packs := make(map[restic.ID]int64) - err = repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error { - packs[id] = size - return nil - }) - rtest.OK(t, err) - - _, err = repo.(*repository.Repository).CreateIndexFromPacks(context.TODO(), packs, nil) - rtest.OK(t, err) - - var obsoleteIndexes restic.IDs - err = 
repo.List(context.TODO(), restic.IndexFile, func(id restic.ID, size int64) error { - obsoleteIndexes = append(obsoleteIndexes, id) - return nil - }) - rtest.OK(t, err) - - err = repo.SaveIndex(context.TODO(), restic.NewIDSet(), obsoleteIndexes, restic.MasterIndexSaveOpts{}) - rtest.OK(t, err) -} - -func reloadIndex(t *testing.T, repo restic.Repository) { - if err := repo.LoadIndex(context.TODO(), nil); err != nil { - t.Fatalf("error loading new index: %v", err) - } + rtest.OK(t, repo.LoadIndex(context.TODO(), nil)) } func TestRepack(t *testing.T) { @@ -236,8 +213,7 @@ func testRepack(t *testing.T, version uint) { removePacks := findPacksForBlobs(t, repo, removeBlobs) repack(t, repo, removePacks, keepBlobs) - rebuildIndex(t, repo) - reloadIndex(t, repo) + rebuildAndReloadIndex(t, repo) packsAfter = listPacks(t, repo) for id := range removePacks { @@ -307,8 +283,7 @@ func testRepackCopy(t *testing.T, version uint) { if err != nil { t.Fatal(err) } - rebuildIndex(t, dstRepo) - reloadIndex(t, dstRepo) + rebuildAndReloadIndex(t, dstRepo) for h := range keepBlobs { list := dstRepo.LookupBlob(h.Type, h.ID) diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index c20641d50..5674a3963 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -28,6 +28,8 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, if err != nil { return err } + repo.clearIndex() + } else { printer.P("loading indexes...\n") mi := index.NewMasterIndex() @@ -111,11 +113,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, return nil } -func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error { +func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error { printer.P("rebuilding index\n") bar := printer.NewCounter("packs processed") - return repo.SaveIndex(ctx, removePacks, extraObsolete, restic.MasterIndexSaveOpts{ + return repo.idx.Save(ctx, repo, removePacks, extraObsolete, index.MasterIndexSaveOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index cac7aac10..7cb9d9f3e 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -10,7 +10,7 @@ import ( "golang.org/x/sync/errgroup" ) -func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet, printer progress.Printer) error { +func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printer progress.Printer) error { wg, wgCtx := errgroup.WithContext(ctx) repo.StartPackUploader(wgCtx, wg) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index bd7de0de4..7abc5911a 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -587,10 +587,6 @@ func (r *Repository) LookupBlobSize(tpe restic.BlobType, id restic.ID) (uint, bo return r.idx.LookupSize(restic.BlobHandle{Type: tpe, ID: id}) } -func (r *Repository) SaveIndex(ctx context.Context, excludePacks restic.IDSet, extraObsolete restic.IDs, opts restic.MasterIndexSaveOpts) error { - return r.idx.Save(ctx, r, excludePacks, extraObsolete, opts) -} - // ListBlobs runs fn on all blobs known to the index. 
When the context is cancelled, // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. func (r *Repository) ListBlobs(ctx context.Context, fn func(restic.PackedBlob)) error { diff --git a/internal/restic/repository.go b/internal/restic/repository.go index 9e6d6b99b..b18b036a7 100644 --- a/internal/restic/repository.go +++ b/internal/restic/repository.go @@ -23,7 +23,6 @@ type Repository interface { LoadIndex(ctx context.Context, p *progress.Counter) error SetIndex(mi MasterIndex) error - SaveIndex(ctx context.Context, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error LookupBlob(t BlobType, id ID) []PackedBlob LookupBlobSize(t BlobType, id ID) (size uint, exists bool) @@ -106,13 +105,6 @@ type PackBlobs struct { Blobs []Blob } -type MasterIndexSaveOpts struct { - SaveProgress *progress.Counter - DeleteProgress func() *progress.Counter - DeleteReport func(id ID, err error) - SkipDeletion bool -} - // MasterIndex keeps track of the blobs are stored within files. type MasterIndex interface { Has(bh BlobHandle) bool @@ -122,8 +114,6 @@ type MasterIndex interface { // the index iteration returns immediately with ctx.Err(). This blocks any modification of the index. Each(ctx context.Context, fn func(PackedBlob)) error ListPacks(ctx context.Context, packs IDSet) <-chan PackBlobs - - Save(ctx context.Context, repo SaverRemoverUnpacked, excludePacks IDSet, extraObsolete IDs, opts MasterIndexSaveOpts) error } // Lister allows listing files in a backend. From 04ad9f0c0c5f28d47d9c921a5219d820ef1af364 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 16:10:48 +0200 Subject: [PATCH 13/25] repository: remove Packer and SavePacker from public interface --- internal/repository/packer_manager.go | 18 +++++++++--------- internal/repository/packer_manager_test.go | 6 +++--- internal/repository/packer_uploader.go | 12 ++++++------ 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index 22eca0c2e..c7599f5af 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -21,8 +21,8 @@ import ( "github.com/minio/sha256-simd" ) -// Packer holds a pack.Packer together with a hash writer. -type Packer struct { +// packer holds a pack.packer together with a hash writer. +type packer struct { *pack.Packer tmpfile *os.File bufWr *bufio.Writer @@ -32,16 +32,16 @@ type Packer struct { type packerManager struct { tpe restic.BlobType key *crypto.Key - queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error + queueFn func(ctx context.Context, t restic.BlobType, p *packer) error pm sync.Mutex - packer *Packer + packer *packer packSize uint } // newPackerManager returns a new packer manager which writes temporary files // to a temporary directory -func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *Packer) error) *packerManager { +func newPackerManager(key *crypto.Key, tpe restic.BlobType, packSize uint, queueFn func(ctx context.Context, t restic.BlobType, p *packer) error) *packerManager { return &packerManager{ tpe: tpe, key: key, @@ -114,7 +114,7 @@ func (r *packerManager) SaveBlob(ctx context.Context, t restic.BlobType, id rest // findPacker returns a packer for a new blob of size bytes. Either a new one is // created or one is returned that already has some blobs. 
-func (r *packerManager) newPacker() (packer *Packer, err error) { +func (r *packerManager) newPacker() (pck *packer, err error) { debug.Log("create new pack") tmpfile, err := fs.TempFile("", "restic-temp-pack-") if err != nil { @@ -123,17 +123,17 @@ func (r *packerManager) newPacker() (packer *Packer, err error) { bufWr := bufio.NewWriter(tmpfile) p := pack.NewPacker(r.key, bufWr) - packer = &Packer{ + pck = &packer{ Packer: p, tmpfile: tmpfile, bufWr: bufWr, } - return packer, nil + return pck, nil } // savePacker stores p in the backend. -func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *Packer) error { +func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packer) error { debug.Log("save packer for %v with %d blobs (%d bytes)\n", t, p.Packer.Count(), p.Packer.Size()) err := p.Packer.Finalize() if err != nil { diff --git a/internal/repository/packer_manager_test.go b/internal/repository/packer_manager_test.go index 8984073da..0f3aea05f 100644 --- a/internal/repository/packer_manager_test.go +++ b/internal/repository/packer_manager_test.go @@ -70,7 +70,7 @@ func testPackerManager(t testing.TB) int64 { rnd := rand.New(rand.NewSource(randomSeed)) savedBytes := int(0) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, tp restic.BlobType, p *packer) error { err := p.Finalize() if err != nil { return err @@ -92,7 +92,7 @@ func testPackerManager(t testing.TB) int64 { func TestPackerManagerWithOversizeBlob(t *testing.T) { packFiles := int(0) sizeLimit := uint(512 * 1024) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, sizeLimit, func(ctx context.Context, tp restic.BlobType, p *packer) error { packFiles++ return nil }) @@ -122,7 +122,7 @@ func BenchmarkPackerManager(t *testing.B) { for i := 0; i < t.N; i++ { rnd.Seed(randomSeed) - pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *Packer) error { + pm := newPackerManager(crypto.NewRandomKey(), restic.DataBlob, DefaultPackSize, func(ctx context.Context, t restic.BlobType, p *packer) error { return nil }) fillPacks(t, rnd, pm, blobBuf) diff --git a/internal/repository/packer_uploader.go b/internal/repository/packer_uploader.go index 30c8f77af..936e7ea1d 100644 --- a/internal/repository/packer_uploader.go +++ b/internal/repository/packer_uploader.go @@ -7,13 +7,13 @@ import ( "golang.org/x/sync/errgroup" ) -// SavePacker implements saving a pack in the repository. -type SavePacker interface { - savePacker(ctx context.Context, t restic.BlobType, p *Packer) error +// savePacker implements saving a pack in the repository. 
+type savePacker interface { + savePacker(ctx context.Context, t restic.BlobType, p *packer) error } type uploadTask struct { - packer *Packer + packer *packer tpe restic.BlobType } @@ -21,7 +21,7 @@ type packerUploader struct { uploadQueue chan uploadTask } -func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, connections uint) *packerUploader { +func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo savePacker, connections uint) *packerUploader { pu := &packerUploader{ uploadQueue: make(chan uploadTask), } @@ -48,7 +48,7 @@ func newPackerUploader(ctx context.Context, wg *errgroup.Group, repo SavePacker, return pu } -func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *Packer) (err error) { +func (pu *packerUploader) QueuePacker(ctx context.Context, t restic.BlobType, p *packer) (err error) { select { case <-ctx.Done(): return ctx.Err() From 76e6719f2e8b6366eef3a5cba876f1dfb789a0bb Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 16:14:31 +0200 Subject: [PATCH 14/25] repository: make CreateIndexFromPacks method private --- internal/repository/repair_index.go | 2 +- internal/repository/repository.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index 5674a3963..4ac6cdd3a 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -92,7 +92,7 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, printer.P("reading pack files\n") bar := printer.NewCounter("packs") bar.SetMax(uint64(len(packSizeFromList))) - invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar) + invalidFiles, err := repo.createIndexFromPacks(ctx, packSizeFromList, bar) bar.Done() if err != nil { return err diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 7abc5911a..d9dfc4556 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -656,10 +656,10 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error { return r.prepareCache() } -// CreateIndexFromPacks creates a new index by reading all given pack files (with sizes). +// createIndexFromPacks creates a new index by reading all given pack files (with sizes). // The index is added to the MasterIndex but not marked as finalized. // Returned is the list of pack files which could not be read. -func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { +func (r *Repository) createIndexFromPacks(ctx context.Context, packsize map[restic.ID]int64, p *progress.Counter) (invalid restic.IDs, err error) { var m sync.Mutex debug.Log("Loading index from pack files") From 9aa0c90fb2d5eae91b74b58c822da6b030471182 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 16:40:40 +0200 Subject: [PATCH 15/25] index: remove supersedes field Using the field with its current semantics is nearly impossible to get right. Remove it as it will be replaced anyways in repository format 3. 
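A standalone sketch (not part of this patch) of why dropping the field stays
backwards compatible for reading: encoding/json ignores unknown keys, so index
files written by older versions that still carry a supersedes array decode fine
once the struct field is gone. The trimmed-down type below is hypothetical; the
real one lives in internal/index.

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // stand-in for the index JSON structure after this patch
    type jsonIndex struct {
        Packs []json.RawMessage `json:"packs"`
    }

    func main() {
        // an index written by an older version, still containing "supersedes"
        old := []byte(`{"supersedes":["ed54ae36"],"packs":[{"id":"73d04e61"}]}`)

        var idx jsonIndex
        if err := json.Unmarshal(old, &idx); err != nil {
            panic(err)
        }
        // the unknown "supersedes" key is ignored, the packs are still read
        fmt.Println(len(idx.Packs)) // prints 1
    }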
--- internal/index/index.go | 38 +++++++--------------------------- internal/index/index_test.go | 6 ------ internal/index/master_index.go | 13 +----------- 3 files changed, 8 insertions(+), 49 deletions(-) diff --git a/internal/index/index.go b/internal/index/index.go index 1c20fe38d..e09b683f1 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -50,10 +50,9 @@ type Index struct { byType [restic.NumBlobTypes]indexMap packs restic.IDs - final bool // set to true for all indexes read from the backend ("finalized") - ids restic.IDs // set to the IDs of the contained finalized indexes - supersedes restic.IDs - created time.Time + final bool // set to true for all indexes read from the backend ("finalized") + ids restic.IDs // set to the IDs of the contained finalized indexes + created time.Time } // NewIndex returns a new index. @@ -197,25 +196,6 @@ func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found return uint(crypto.PlaintextLength(int(e.length))), true } -// Supersedes returns the list of indexes this index supersedes, if any. -func (idx *Index) Supersedes() restic.IDs { - return idx.supersedes -} - -// AddToSupersedes adds the ids to the list of indexes superseded by this -// index. If the index has already been finalized, an error is returned. -func (idx *Index) AddToSupersedes(ids ...restic.ID) error { - idx.m.Lock() - defer idx.m.Unlock() - - if idx.final { - return errors.New("index already finalized") - } - - idx.supersedes = append(idx.supersedes, ids...) - return nil -} - // Each passes all blobs known to the index to the callback fn. This blocks any // modification of the index. func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error { @@ -356,8 +336,8 @@ func (idx *Index) generatePackList() ([]packJSON, error) { } type jsonIndex struct { - Supersedes restic.IDs `json:"supersedes,omitempty"` - Packs []packJSON `json:"packs"` + // removed: Supersedes restic.IDs `json:"supersedes,omitempty"` + Packs []packJSON `json:"packs"` } // Encode writes the JSON serialization of the index to the writer w. @@ -373,8 +353,7 @@ func (idx *Index) Encode(w io.Writer) error { enc := json.NewEncoder(w) idxJSON := jsonIndex{ - Supersedes: idx.supersedes, - Packs: list, + Packs: list, } return enc.Encode(idxJSON) } @@ -433,8 +412,7 @@ func (idx *Index) Dump(w io.Writer) error { } outer := jsonIndex{ - Supersedes: idx.Supersedes(), - Packs: list, + Packs: list, } buf, err := json.MarshalIndent(outer, "", " ") @@ -495,7 +473,6 @@ func (idx *Index) merge(idx2 *Index) error { } idx.ids = append(idx.ids, idx2.ids...) - idx.supersedes = append(idx.supersedes, idx2.supersedes...) 
return nil } @@ -545,7 +522,6 @@ func DecodeIndex(buf []byte, id restic.ID) (idx *Index, oldFormat bool, err erro }) } } - idx.supersedes = idxJSON.Supersedes idx.ids = append(idx.ids, id) idx.final = true diff --git a/internal/index/index_test.go b/internal/index/index_test.go index bafd95c48..66cec23f6 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -309,8 +309,6 @@ func TestIndexUnserialize(t *testing.T) { {docExampleV1, 1}, {docExampleV2, 2}, } { - oldIdx := restic.IDs{restic.TestParseID("ed54ae36197f4745ebc4b54d10e0f623eaaaedd03013eb7ae90df881b7781452")} - idx, oldFormat, err := index.DecodeIndex(task.idxBytes, restic.NewRandomID()) rtest.OK(t, err) rtest.Assert(t, !oldFormat, "new index format recognized as old format") @@ -337,8 +335,6 @@ func TestIndexUnserialize(t *testing.T) { } } - rtest.Equals(t, oldIdx, idx.Supersedes()) - blobs := listPack(t, idx, exampleLookupTest.packID) if len(blobs) != len(exampleLookupTest.blobs) { t.Fatalf("expected %d blobs in pack, got %d", len(exampleLookupTest.blobs), len(blobs)) @@ -446,8 +442,6 @@ func TestIndexUnserializeOld(t *testing.T) { rtest.Equals(t, test.offset, blob.Offset) rtest.Equals(t, test.length, blob.Length) } - - rtest.Equals(t, 0, len(idx.Supersedes())) } func TestIndexPacks(t *testing.T) { diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 981a2b31b..17e681411 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -332,7 +332,7 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks) newIndex := NewIndex() - obsolete := restic.NewIDSet() + obsolete := restic.NewIDSet(extraObsolete...) // track spawned goroutines using wg, create a new context which is // cancelled as soon as an error occurs. @@ -351,11 +351,6 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke } debug.Log("adding index ids %v to supersedes field", ids) - - err = newIndex.AddToSupersedes(ids...) - if err != nil { - return err - } obsolete.Merge(restic.NewIDSet(ids...)) } else { debug.Log("index %d isn't final, don't add to supersedes field", i) @@ -380,12 +375,6 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke } } - err := newIndex.AddToSupersedes(extraObsolete...) - if err != nil { - return err - } - obsolete.Merge(restic.NewIDSet(extraObsolete...)) - select { case ch <- newIndex: case <-wgCtx.Done(): From 68fa0e0305421a5cd33779398a345d7c9b15178f Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 19:10:08 +0200 Subject: [PATCH 16/25] prune: no longer disable automatic index updates this allows prune to resume an interrupted prune run. --- cmd/restic/cmd_prune.go | 3 --- internal/repository/packer_manager.go | 3 --- internal/repository/repository.go | 12 ------------ 3 files changed, 18 deletions(-) diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index d62a7df75..7872589be 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -162,9 +162,6 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term } func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet, term *termstatus.Terminal) error { - // we do not need index updates while pruning! 
- repo.DisableAutoIndexUpdate() - if repo.Cache == nil { Print("warning: running prune without a cache, this may be very slow!\n") } diff --git a/internal/repository/packer_manager.go b/internal/repository/packer_manager.go index c7599f5af..76734fb87 100644 --- a/internal/repository/packer_manager.go +++ b/internal/repository/packer_manager.go @@ -200,8 +200,5 @@ func (r *Repository) savePacker(ctx context.Context, t restic.BlobType, p *packe r.idx.StorePack(id, p.Packer.Blobs()) // Save index if full - if r.noAutoIndexUpdate { - return nil - } return r.idx.SaveFullIndex(ctx, r) } diff --git a/internal/repository/repository.go b/internal/repository/repository.go index d9dfc4556..f0ef93ecf 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -42,8 +42,6 @@ type Repository struct { opts Options - noAutoIndexUpdate bool - packerWg *errgroup.Group uploader *packerUploader treePM *packerManager @@ -130,12 +128,6 @@ func New(be backend.Backend, opts Options) (*Repository, error) { return repo, nil } -// DisableAutoIndexUpdate deactives the automatic finalization and upload of new -// indexes once these are full -func (r *Repository) DisableAutoIndexUpdate() { - r.noAutoIndexUpdate = true -} - // setConfig assigns the given config and updates the repository parameters accordingly func (r *Repository) setConfig(cfg restic.Config) { r.cfg = cfg @@ -526,10 +518,6 @@ func (r *Repository) Flush(ctx context.Context) error { return err } - // Save index after flushing only if noAutoIndexUpdate is not set - if r.noAutoIndexUpdate { - return nil - } return r.idx.SaveIndex(ctx, r) } From 72482ce5bd173ed5a8a4e9dda393925e24c3196b Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 20:36:16 +0200 Subject: [PATCH 17/25] index: misc cleanups --- internal/index/index_parallel.go | 2 +- internal/index/master_index.go | 13 ++++++++----- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/internal/index/index_parallel.go b/internal/index/index_parallel.go index d51d5930f..3d5621a2d 100644 --- a/internal/index/index_parallel.go +++ b/internal/index/index_parallel.go @@ -11,7 +11,7 @@ import ( // ForAllIndexes loads all index files in parallel and calls the given callback. // It is guaranteed that the function is not run concurrently. If the callback // returns an error, this function is cancelled and also returns that error. -func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.ListerLoaderUnpacked, +func ForAllIndexes(ctx context.Context, lister restic.Lister, repo restic.LoaderUnpacked, fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error { // decoding an index can take quite some time such that this can be both CPU- or IO-bound diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 17e681411..a5ee40b52 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -23,12 +23,15 @@ type MasterIndex struct { // NewMasterIndex creates a new master index. func NewMasterIndex() *MasterIndex { + mi := &MasterIndex{pendingBlobs: restic.NewBlobSet()} + mi.clear() + return mi +} + +func (mi *MasterIndex) clear() { // Always add an empty final index, such that MergeFinalIndexes can merge into this. - // Note that removing this index could lead to a race condition in the rare - // situation that only two indexes exist which are saved and merged concurrently. 
- idx := []*Index{NewIndex()} - idx[0].Finalize() - return &MasterIndex{idx: idx, pendingBlobs: restic.NewBlobSet()} + mi.idx = []*Index{NewIndex()} + mi.idx[0].Finalize() } func (mi *MasterIndex) MarkCompressed() { From ad98fbf7dd847e4a5888475f1ef6456ffb491dc3 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 20:37:19 +0200 Subject: [PATCH 18/25] restic: add IDSet.Clone() method --- internal/restic/idset.go | 6 ++++++ internal/restic/idset_test.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/internal/restic/idset.go b/internal/restic/idset.go index 1b12a6398..9e6e3c6fd 100644 --- a/internal/restic/idset.go +++ b/internal/restic/idset.go @@ -105,3 +105,9 @@ func (s IDSet) String() string { str := s.List().String() return "{" + str[1:len(str)-1] + "}" } + +func (s IDSet) Clone() IDSet { + c := NewIDSet() + c.Merge(s) + return c +} diff --git a/internal/restic/idset_test.go b/internal/restic/idset_test.go index 734b31237..14c88b314 100644 --- a/internal/restic/idset_test.go +++ b/internal/restic/idset_test.go @@ -35,4 +35,7 @@ func TestIDSet(t *testing.T) { } rtest.Equals(t, "{1285b303 7bb086db f658198b}", set.String()) + + copied := set.Clone() + rtest.Equals(t, "{1285b303 7bb086db f658198b}", copied.String()) } From 5f7b48e65f302c2ca4965a4f815ef4bba0d2e842 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 20:38:58 +0200 Subject: [PATCH 19/25] index: replace Save() method with Rewrite and SaveFallback Rewrite implements a streaming rewrite of the index that excludes the given packs. For this it loads all index files from the repository and only modifies those that require changes. This will reduce the index churn when running prune. Rewrite does not require the in-memory index and thus can drop it to significantly reduce the memory usage. However, `prune --unsafe-recovery` cannot use this strategy and requires a separate method to save the whole in-memory index. This is now handled using SaveFallback. --- internal/index/master_index.go | 227 +++++++++++++++++++---- internal/index/master_index_test.go | 2 +- internal/repository/prune.go | 12 +- internal/repository/repair_index.go | 15 +- internal/repository/repair_index_test.go | 4 - internal/repository/repair_pack.go | 2 +- 6 files changed, 203 insertions(+), 59 deletions(-) diff --git a/internal/index/master_index.go b/internal/index/master_index.go index a5ee40b52..40d1e3446 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -315,18 +315,190 @@ func (mi *MasterIndex) Load(ctx context.Context, r restic.ListerLoaderUnpacked, return mi.MergeFinalIndexes() } -type MasterIndexSaveOpts struct { +type MasterIndexRewriteOpts struct { SaveProgress *progress.Counter DeleteProgress func() *progress.Counter DeleteReport func(id restic.ID, err error) - SkipDeletion bool } -// Save saves all known indexes to index files, leaving out any -// packs whose ID is contained in packBlacklist from finalized indexes. -// It also removes the old index files and those listed in extraObsolete. -func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, extraObsolete restic.IDs, opts MasterIndexSaveOpts) error { +// Rewrite removes packs whose ID is in excludePacks from all known indexes. +// It also removes the rewritten index files and those listed in extraObsolete. +// If oldIndexes is not nil, then only the indexes in this set are processed. +// This is used by repair index to only rewrite and delete the old indexes. 
+// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, excludePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, opts MasterIndexRewriteOpts) error { + for _, idx := range mi.idx { + if !idx.Final() { + panic("internal error - index must be saved before calling MasterIndex.Rewrite") + } + } + + var indexes restic.IDSet + if oldIndexes != nil { + // repair index adds new index entries for already existing pack files + // only remove the old (possibly broken) entries by only processing old indexes + indexes = oldIndexes + } else { + indexes = mi.IDs() + } + p := opts.SaveProgress + p.SetMax(uint64(len(indexes))) + + // reset state which is not necessary for Rewrite and just consumes a lot of memory + // the index state would be invalid after Rewrite completes anyways + mi.clear() + runtime.GC() + + // copy excludePacks to prevent unintended sideeffects + excludePacks = excludePacks.Clone() + debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(indexes), excludePacks) + wg, wgCtx := errgroup.WithContext(ctx) + + idxCh := make(chan restic.ID) + wg.Go(func() error { + defer close(idxCh) + for id := range indexes { + select { + case idxCh <- id: + case <-wgCtx.Done(): + return wgCtx.Err() + } + } + return nil + }) + + var rewriteWg sync.WaitGroup + type rewriteTask struct { + idx *Index + oldFormat bool + } + rewriteCh := make(chan rewriteTask) + loader := func() error { + defer rewriteWg.Done() + for id := range idxCh { + buf, err := repo.LoadUnpacked(wgCtx, restic.IndexFile, id) + if err != nil { + return fmt.Errorf("LoadUnpacked(%v): %w", id.Str(), err) + } + idx, oldFormat, err := DecodeIndex(buf, id) + if err != nil { + return err + } + + select { + case rewriteCh <- rewriteTask{idx, oldFormat}: + case <-wgCtx.Done(): + return wgCtx.Err() + } + + } + return nil + } + // loading an index can take quite some time such that this can be both CPU- or IO-bound + loaderCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < loaderCount; i++ { + rewriteWg.Add(1) + wg.Go(loader) + } + wg.Go(func() error { + rewriteWg.Wait() + close(rewriteCh) + return nil + }) + + obsolete := restic.NewIDSet(extraObsolete...) 
+ saveCh := make(chan *Index) + + wg.Go(func() error { + defer close(saveCh) + newIndex := NewIndex() + for task := range rewriteCh { + // always rewrite indexes using the old format, that include a pack that must be removed or that are not full + if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) { + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + // index is already up to date + p.Add(1) + continue + } + + ids, err := task.idx.IDs() + if err != nil || len(ids) != 1 { + panic("internal error, index has no ID") + } + obsolete.Merge(restic.NewIDSet(ids...)) + + for pbs := range task.idx.EachByPack(wgCtx, excludePacks) { + newIndex.StorePack(pbs.PackID, pbs.Blobs) + if IndexFull(newIndex, mi.compress) { + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + return wgCtx.Err() + } + newIndex = NewIndex() + } + } + if wgCtx.Err() != nil { + return wgCtx.Err() + } + // make sure that each pack is only stored exactly once in the index + excludePacks.Merge(task.idx.Packs()) + p.Add(1) + } + + select { + case saveCh <- newIndex: + case <-wgCtx.Done(): + } + return nil + }) + + // a worker receives an index from ch, and saves the index + worker := func() error { + for idx := range saveCh { + idx.Finalize() + if _, err := SaveIndex(wgCtx, repo, idx); err != nil { + return err + } + } + return nil + } + + // encoding an index can take quite some time such that this can be both CPU- or IO-bound + workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // run workers on ch + for i := 0; i < workerCount; i++ { + wg.Go(worker) + } + err := wg.Wait() + p.Done() + if err != nil { + return fmt.Errorf("failed to rewrite indexes: %w", err) + } + + p = nil + if opts.DeleteProgress != nil { + p = opts.DeleteProgress() + } + defer p.Done() + return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { + if opts.DeleteReport != nil { + opts.DeleteReport(id, err) + } + return err + }, p) +} + +// SaveFallback saves all known indexes to index files, leaving out any +// packs whose ID is contained in packBlacklist from finalized indexes. +// It is only intended for use by prune with the UnsafeRecovery option. +// +// Must not be called concurrently to any other MasterIndex operation. +func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemoverUnpacked, excludePacks restic.IDSet, p *progress.Counter) error { p.SetMax(uint64(len(mi.Packs(excludePacks)))) mi.idxMutex.Lock() @@ -334,33 +506,23 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke debug.Log("start rebuilding index of %d indexes, excludePacks: %v", len(mi.idx), excludePacks) - newIndex := NewIndex() - obsolete := restic.NewIDSet(extraObsolete...) - - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. 
+ obsolete := restic.NewIDSet() wg, wgCtx := errgroup.WithContext(ctx) ch := make(chan *Index) - wg.Go(func() error { defer close(ch) - for i, idx := range mi.idx { + newIndex := NewIndex() + for _, idx := range mi.idx { if idx.Final() { ids, err := idx.IDs() if err != nil { - debug.Log("index %d does not have an ID: %v", err) - return err + panic("internal error - finalized index without ID") } - debug.Log("adding index ids %v to supersedes field", ids) obsolete.Merge(restic.NewIDSet(ids...)) - } else { - debug.Log("index %d isn't final, don't add to supersedes field", i) } - debug.Log("adding index %d", i) - for pbs := range idx.EachByPack(wgCtx, excludePacks) { newIndex.StorePack(pbs.PackID, pbs.Blobs) p.Add(1) @@ -396,33 +558,18 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.SaverRemoverUnpacke return nil } - // encoding an index can take quite some time such that this can be both CPU- or IO-bound - workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // keep concurrency bounded as we're on a fallback path + workerCount := int(repo.Connections()) // run workers on ch for i := 0; i < workerCount; i++ { wg.Go(worker) } err := wg.Wait() p.Done() - if err != nil { - return err - } + // the index no longer matches to stored state + mi.clear() - if opts.SkipDeletion { - return nil - } - - p = nil - if opts.DeleteProgress != nil { - p = opts.DeleteProgress() - } - defer p.Done() - return restic.ParallelRemove(ctx, repo, obsolete, restic.IndexFile, func(id restic.ID, err error) error { - if opts.DeleteReport != nil { - opts.DeleteReport(id, err) - } - return err - }, p) + return err } // SaveIndex saves an index in the repository. diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index 41f4cc534..b8a29262e 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -364,7 +364,7 @@ func testIndexSave(t *testing.T, version uint) { blobs[pb] = struct{}{} })) - rtest.OK(t, idx.Save(context.TODO(), repo, nil, nil, index.MasterIndexSaveOpts{})) + rtest.OK(t, idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})) idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 712986e61..895b07994 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -522,7 +522,7 @@ func (plan *PrunePlan) Stats() PruneStats { // - rebuild the index while ignoring all files that will be deleted // - delete the files // plan.removePacks and plan.ignorePacks are modified in this function. -func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) { +func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) error { if plan.opts.DryRun { printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. 
This is expected behavior.\n\n") if len(plan.removePacksFirst) > 0 { @@ -581,12 +581,12 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e if plan.opts.UnsafeRecovery { printer.P("deleting index files\n") indexFiles := repo.idx.IDs() - err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) + err := deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer) if err != nil { return errors.Fatalf("%s", err) } } else if len(plan.ignorePacks) != 0 { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer) + err := rewriteIndexFiles(ctx, repo, plan.ignorePacks, nil, nil, printer) if err != nil { return errors.Fatalf("%s", err) } @@ -601,16 +601,12 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (e } if plan.opts.UnsafeRecovery { - err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer) + err := repo.idx.SaveFallback(ctx, repo, plan.ignorePacks, printer.NewCounter("packs processed")) if err != nil { return errors.Fatalf("%s", err) } } - if err != nil { - return err - } - // drop outdated in-memory index repo.clearIndex() diff --git a/internal/repository/repair_index.go b/internal/repository/repair_index.go index 4ac6cdd3a..e01131923 100644 --- a/internal/repository/repair_index.go +++ b/internal/repository/repair_index.go @@ -62,6 +62,8 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } + oldIndexes := repo.idx.IDs() + printer.P("getting pack files to read...\n") err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error { size, ok := packSizeFromIndex[id] @@ -103,7 +105,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, } } - err = rebuildIndexFiles(ctx, repo, removePacks, obsoleteIndexes, false, printer) + if err := repo.Flush(ctx); err != nil { + return err + } + + err = rewriteIndexFiles(ctx, repo, removePacks, oldIndexes, obsoleteIndexes, printer) if err != nil { return err } @@ -113,11 +119,11 @@ func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, return nil } -func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error { +func rewriteIndexFiles(ctx context.Context, repo *Repository, removePacks restic.IDSet, oldIndexes restic.IDSet, extraObsolete restic.IDs, printer progress.Printer) error { printer.P("rebuilding index\n") - bar := printer.NewCounter("packs processed") - return repo.idx.Save(ctx, repo, removePacks, extraObsolete, index.MasterIndexSaveOpts{ + bar := printer.NewCounter("indexes processed") + return repo.idx.Rewrite(ctx, repo, removePacks, oldIndexes, extraObsolete, index.MasterIndexRewriteOpts{ SaveProgress: bar, DeleteProgress: func() *progress.Counter { return printer.NewCounter("old indexes deleted") @@ -129,6 +135,5 @@ func rebuildIndexFiles(ctx context.Context, repo *Repository, removePacks restic printer.VV("removed index %v\n", id.String()) } }, - SkipDeletion: skipDeletion, }) } diff --git a/internal/repository/repair_index_test.go b/internal/repository/repair_index_test.go index 79922e9ec..ac47d59ff 100644 --- a/internal/repository/repair_index_test.go +++ b/internal/repository/repair_index_test.go @@ -30,10 +30,6 @@ func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, ReadAllPacks: readAllPacks, }, &progress.NoopPrinter{})) - newIndexes := listIndex(t, repo) - old := 
indexes.Intersect(newIndexes) - rtest.Assert(t, len(old) == 0, "expected old indexes to be removed, found %v", old) - checker.TestCheckRepo(t, repo, true) } diff --git a/internal/repository/repair_pack.go b/internal/repository/repair_pack.go index 7cb9d9f3e..811388cc9 100644 --- a/internal/repository/repair_pack.go +++ b/internal/repository/repair_pack.go @@ -56,7 +56,7 @@ func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printe } // remove salvaged packs from index - err = rebuildIndexFiles(ctx, repo, ids, nil, false, printer) + err = rewriteIndexFiles(ctx, repo, ids, nil, nil, printer) if err != nil { return err } From 2ca1c3772152829908e18da8be9ecc3b55128cd5 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 21:08:19 +0200 Subject: [PATCH 20/25] index: additional tests for new index save methods --- internal/index/master_index_test.go | 77 ++++++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 1 deletion(-) diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index b8a29262e..c42484c55 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -355,8 +355,60 @@ func TestIndexSave(t *testing.T) { } func testIndexSave(t *testing.T, version uint) { + for _, test := range []struct { + name string + saver func(idx *index.MasterIndex, repo restic.Repository) error + }{ + {"rewrite no-op", func(idx *index.MasterIndex, repo restic.Repository) error { + return idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{}) + }}, + {"rewrite skip-all", func(idx *index.MasterIndex, repo restic.Repository) error { + return idx.Rewrite(context.TODO(), repo, nil, restic.NewIDSet(), nil, index.MasterIndexRewriteOpts{}) + }}, + {"SaveFallback", func(idx *index.MasterIndex, repo restic.Repository) error { + err := restic.ParallelRemove(context.TODO(), repo, idx.IDs(), restic.IndexFile, nil, nil) + if err != nil { + return nil + } + return idx.SaveFallback(context.TODO(), repo, restic.NewIDSet(), nil) + }}, + } { + t.Run(test.name, func(t *testing.T) { + repo := createFilledRepo(t, 3, version) + + idx := index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + blobs := make(map[restic.PackedBlob]struct{}) + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + blobs[pb] = struct{}{} + })) + + rtest.OK(t, test.saver(idx, repo)) + idx = index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + + rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { + if _, ok := blobs[pb]; ok { + delete(blobs, pb) + } else { + t.Fatalf("unexpected blobs %v", pb) + } + })) + rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") + + checker.TestCheckRepo(t, repo, false) + }) + } +} + +func TestIndexSavePartial(t *testing.T) { + repository.TestAllVersions(t, testIndexSavePartial) +} + +func testIndexSavePartial(t *testing.T, version uint) { repo := createFilledRepo(t, 3, version) + // capture blob list before adding fourth snapshot idx := index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) blobs := make(map[restic.PackedBlob]struct{}) @@ -364,10 +416,21 @@ func testIndexSave(t *testing.T, version uint) { blobs[pb] = struct{}{} })) - rtest.OK(t, idx.Rewrite(context.TODO(), repo, nil, nil, nil, index.MasterIndexRewriteOpts{})) + // add+remove new snapshot and track its pack files + packsBefore := listPacks(t, repo) + sn := restic.TestCreateSnapshot(t, repo, 
snapshotTime.Add(time.Duration(4)*time.Second), depth) + rtest.OK(t, repo.RemoveUnpacked(context.TODO(), restic.SnapshotFile, *sn.ID())) + packsAfter := listPacks(t, repo) + newPacks := packsAfter.Sub(packsBefore) + + // rewrite index and remove pack files of new snapshot idx = index.NewMasterIndex() rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) + rtest.OK(t, idx.Rewrite(context.TODO(), repo, newPacks, nil, nil, index.MasterIndexRewriteOpts{})) + // check blobs + idx = index.NewMasterIndex() + rtest.OK(t, idx.Load(context.TODO(), repo, nil, nil)) rtest.OK(t, idx.Each(context.TODO(), func(pb restic.PackedBlob) { if _, ok := blobs[pb]; ok { delete(blobs, pb) @@ -377,5 +440,17 @@ func testIndexSave(t *testing.T, version uint) { })) rtest.Equals(t, 0, len(blobs), "saved index is missing blobs") + // remove pack files to make check happy + rtest.OK(t, restic.ParallelRemove(context.TODO(), repo, newPacks, restic.PackFile, nil, nil)) + checker.TestCheckRepo(t, repo, false) } + +func listPacks(t testing.TB, repo restic.Lister) restic.IDSet { + s := restic.NewIDSet() + rtest.OK(t, repo.List(context.TODO(), restic.PackFile, func(id restic.ID, _ int64) error { + s.Insert(id) + return nil + })) + return s +} From 57d69aa640799a82fa3f3cb01699e923e58ebc48 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 21:10:25 +0200 Subject: [PATCH 21/25] index: cleanup SaveIndex method --- internal/index/index.go | 19 +++++++++++++++++++ internal/index/master_index.go | 25 +++---------------------- internal/repository/repository_test.go | 2 +- 3 files changed, 23 insertions(+), 23 deletions(-) diff --git a/internal/index/index.go b/internal/index/index.go index e09b683f1..12ef6b18a 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -1,6 +1,7 @@ package index import ( + "bytes" "context" "encoding/json" "fmt" @@ -358,6 +359,24 @@ func (idx *Index) Encode(w io.Writer) error { return enc.Encode(idxJSON) } +// SaveIndex saves an index in the repository. +func (idx *Index) SaveIndex(ctx context.Context, repo restic.SaverUnpacked) (restic.ID, error) { + buf := bytes.NewBuffer(nil) + + err := idx.Encode(buf) + if err != nil { + return restic.ID{}, err + } + + id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes()) + ierr := idx.SetID(id) + if ierr != nil { + // logic bug + panic(ierr) + } + return id, err +} + // Finalize sets the index to final. func (idx *Index) Finalize() { debug.Log("finalizing index") diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 40d1e3446..8e959bed3 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -1,7 +1,6 @@ package index import ( - "bytes" "context" "fmt" "runtime" @@ -461,7 +460,7 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud worker := func() error { for idx := range saveCh { idx.Finalize() - if _, err := SaveIndex(wgCtx, repo, idx); err != nil { + if _, err := idx.SaveIndex(wgCtx, repo); err != nil { return err } } @@ -551,7 +550,7 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove worker := func() error { for idx := range ch { idx.Finalize() - if _, err := SaveIndex(wgCtx, repo, idx); err != nil { + if _, err := idx.SaveIndex(wgCtx, repo); err != nil { return err } } @@ -572,30 +571,12 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove return err } -// SaveIndex saves an index in the repository. 
-func SaveIndex(ctx context.Context, repo restic.SaverUnpacked, index *Index) (restic.ID, error) { - buf := bytes.NewBuffer(nil) - - err := index.Encode(buf) - if err != nil { - return restic.ID{}, err - } - - id, err := repo.SaveUnpacked(ctx, restic.IndexFile, buf.Bytes()) - ierr := index.SetID(id) - if ierr != nil { - // logic bug - panic(ierr) - } - return id, err -} - // saveIndex saves all indexes in the backend. func (mi *MasterIndex) saveIndex(ctx context.Context, r restic.SaverUnpacked, indexes ...*Index) error { for i, idx := range indexes { debug.Log("Saving index %d", i) - sid, err := SaveIndex(ctx, r, idx) + sid, err := idx.SaveIndex(ctx, r) if err != nil { return err } diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index bc950d0b0..05b790e33 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -336,7 +336,7 @@ func benchmarkLoadIndex(b *testing.B, version uint) { } idx.Finalize() - id, err := index.SaveIndex(context.TODO(), repo, idx) + id, err := idx.SaveIndex(context.TODO(), repo) rtest.OK(b, err) b.Logf("index saved as %v", id.Str()) From e52033a8bd1aa0bfb23ccde5805a00e64ea4cfa4 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 22:14:28 +0200 Subject: [PATCH 22/25] index: slightly reduce Rewrite concurrency The index operations are likely CPU-bounded. Thus, reduce the concurrency accordingly. --- internal/index/master_index.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/internal/index/master_index.go b/internal/index/master_index.go index 8e959bed3..f9fc4505b 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -395,8 +395,9 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud } return nil } - // loading an index can take quite some time such that this can be both CPU- or IO-bound - loaderCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // loading an index can take quite some time such that this is probably CPU-bound + // the index files are probably already cached at this point + loaderCount := runtime.GOMAXPROCS(0) // run workers on ch for i := 0; i < loaderCount; i++ { rewriteWg.Add(1) @@ -467,8 +468,9 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud return nil } - // encoding an index can take quite some time such that this can be both CPU- or IO-bound - workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) + // encoding an index can take quite some time such that this can be CPU- or IO-bound + // do not add repo.Connections() here as there are already the loader goroutines. + workerCount := runtime.GOMAXPROCS(0) // run workers on ch for i := 0; i < workerCount; i++ { wg.Go(worker) From 027cc647373b05718710831c76177d17ca6e8a14 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 23:24:18 +0200 Subject: [PATCH 23/25] repository: fix prune heuristic to allow resuming interrupted runs Pack files created by interrupted prune runs, appear to consist only of duplicate blobs on the next run. This caused the previous heuristic to ignore those pack files. Now, a duplicate blob in a specific pack file is also selected if that pack file only contains duplicate blobs. This allows prune to select the already rewritten pack files. 
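A rough standalone sketch of the new selection rule for a single duplicate copy
(the packInfo field names follow the struct touched below; keepDuplicateCopy and
remainingCopies are hypothetical names used only for illustration):

    package main

    import "fmt"

    type packInfo struct {
        usedBlobs, unusedBlobs, duplicateBlobs uint
    }

    // keepDuplicateCopy reports whether the duplicate copy stored in this pack
    // should be switched to "used". remainingCopies counts the copies of the
    // same blob that are still undecided.
    func keepDuplicateCopy(ip packInfo, remainingCopies uint) bool {
        switch {
        case ip.usedBlobs > 0:
            return true // pack already keeps used blobs
        case ip.duplicateBlobs == ip.unusedBlobs:
            return true // pack contains only duplicates, e.g. written by an interrupted prune
        case remainingCopies == 0:
            return true // last copy of the blob must be kept
        default:
            return false
        }
    }

    func main() {
        // a pack left behind by an interrupted prune: every blob is a duplicate
        fmt.Println(keepDuplicateCopy(packInfo{unusedBlobs: 3, duplicateBlobs: 3}, 2)) // true
    }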
---
 internal/repository/prune.go | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/internal/repository/prune.go b/internal/repository/prune.go
index 895b07994..49869fcac 100644
--- a/internal/repository/prune.go
+++ b/internal/repository/prune.go
@@ -72,10 +72,12 @@ type PrunePlan struct {
 }

 type packInfo struct {
- usedBlobs uint
- unusedBlobs uint
- usedSize uint64
- unusedSize uint64
+ usedBlobs uint
+ unusedBlobs uint
+ duplicateBlobs uint
+ usedSize uint64
+ unusedSize uint64
+
 tpe restic.BlobType
 uncompressed bool
 }
@@ -226,6 +228,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re
 // mark as unused for now, we will later on select one copy
 ip.unusedSize += size
 ip.unusedBlobs++
+ ip.duplicateBlobs++

 // count as duplicate, will later on change one copy to be counted as used
 stats.Size.Duplicate += size
@@ -256,6 +259,8 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re
 // if duplicate blobs exist, those will be set to either "used" or "unused":
 // - mark only one occurrence of duplicate blobs as used
 // - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
+ // - if a pack only consists of duplicates (which by definition are used blobs), mark it as "used". This
+ //   ensures that already rewritten packs are kept.
 // - if there are no used blobs in a pack, possibly mark duplicates as "unused"
 if hasDuplicates {
 // iterate again over all blobs in index (this is pretty cheap, all in-mem)
@@ -271,8 +276,10 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re
 ip := indexPack[blob.PackID]
 size := uint64(blob.Length)
 switch {
- case ip.usedBlobs > 0, count == 0:
- // other used blobs in pack or "last" occurrence -> transition to used
+ case ip.usedBlobs > 0, (ip.duplicateBlobs == ip.unusedBlobs), count == 0:
+ // other used blobs in pack, only duplicate blobs or "last" occurrence -> transition to used
+ // a pack file created by an interrupted prune run will consist of only duplicate blobs
+ // thus select such already repacked pack files
 ip.usedSize += size
 ip.usedBlobs++
 ip.unusedSize -= size

From f680a2331d7e5e5131e96fde7c0b5ac534e867d9 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Sun, 19 May 2024 23:50:40 +0200
Subject: [PATCH 24/25] add changelog for streaming index rewrite

---
 changelog/unreleased/issue-3806 | 11 +++++++++++
 1 file changed, 11 insertions(+)
 create mode 100644 changelog/unreleased/issue-3806

diff --git a/changelog/unreleased/issue-3806 b/changelog/unreleased/issue-3806
new file mode 100644
index 000000000..d3ae9b507
--- /dev/null
+++ b/changelog/unreleased/issue-3806
@@ -0,0 +1,11 @@
+Enhancement: Make `prune` command resumable
+
+When `prune` was interrupted, a later `prune` run previously restarted repacking
+the pack files from scratch, as `prune` did not update the index while repacking.
+
+The `prune` command now supports resuming interrupted prune runs. Updating
+the repository index has also been optimized to use less memory and to only
+rewrite the parts of the index that have changed.
+ +https://github.com/restic/restic/issues/3806 +https://github.com/restic/restic/pull/4812 From 860b595a8b1a29adceaf9bb21df5cbdcf7927df6 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 19 May 2024 23:52:21 +0200 Subject: [PATCH 25/25] backend: increase watchdog test timeout for deflaking --- internal/backend/watchdog_roundtriper_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/backend/watchdog_roundtriper_test.go b/internal/backend/watchdog_roundtriper_test.go index a13d670e0..b1f589bc0 100644 --- a/internal/backend/watchdog_roundtriper_test.go +++ b/internal/backend/watchdog_roundtriper_test.go @@ -64,7 +64,7 @@ func TestRoundtrip(t *testing.T) { })) defer srv.Close() - rt := newWatchdogRoundtripper(http.DefaultTransport, 50*time.Millisecond, 2) + rt := newWatchdogRoundtripper(http.DefaultTransport, 100*time.Millisecond, 2) req, err := http.NewRequestWithContext(context.TODO(), "GET", srv.URL, io.NopCloser(newSlowReader(bytes.NewReader(msg), time.Duration(delay)*time.Millisecond))) rtest.OK(t, err)