diff --git a/changelog/unreleased/pull-4354 b/changelog/unreleased/pull-4354 new file mode 100644 index 000000000..dddbbc765 --- /dev/null +++ b/changelog/unreleased/pull-4354 @@ -0,0 +1,7 @@ +Enhancement: Significantly reduce prune memory usage + +Prune has been optimized to use up to 60% less memory. The memory usage should +now be roughly similar to creating a backup. + +https://github.com/restic/restic/pull/4354 +https://github.com/restic/restic/pull/4812 diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index 7872589be..2637fedc8 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -188,8 +188,8 @@ func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOption RepackUncompressed: opts.RepackUncompressed, } - plan, err := repository.PlanPrune(ctx, popts, repo, func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error) { - return getUsedBlobs(ctx, repo, ignoreSnapshots, printer) + plan, err := repository.PlanPrune(ctx, popts, repo, func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error { + return getUsedBlobs(ctx, repo, usedBlobs, ignoreSnapshots, printer) }, printer) if err != nil { return err @@ -255,10 +255,10 @@ func printPruneStats(printer progress.Printer, stats repository.PruneStats) erro return nil } -func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, printer progress.Printer) (usedBlobs restic.CountedBlobSet, err error) { +func getUsedBlobs(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet, ignoreSnapshots restic.IDSet, printer progress.Printer) error { var snapshotTrees restic.IDs printer.P("loading all snapshots...\n") - err = restic.ForAllSnapshots(ctx, repo, repo, ignoreSnapshots, + err := restic.ForAllSnapshots(ctx, repo, repo, ignoreSnapshots, func(id restic.ID, sn *restic.Snapshot, err error) error { if err != nil { debug.Log("failed to load snapshot %v (error %v)", id, err) @@ -269,20 +269,14 @@ func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots r return nil }) if err != nil { - return nil, errors.Fatalf("failed loading snapshot: %v", err) + return errors.Fatalf("failed loading snapshot: %v", err) } printer.P("finding data that is still in use for %d snapshots\n", len(snapshotTrees)) - usedBlobs = restic.NewCountedBlobSet() - bar := printer.NewCounter("snapshots") bar.SetMax(uint64(len(snapshotTrees))) defer bar.Done() - err = restic.FindUsedBlobs(ctx, repo, snapshotTrees, usedBlobs, bar) - if err != nil { - return nil, err - } - return usedBlobs, nil + return restic.FindUsedBlobs(ctx, repo, snapshotTrees, usedBlobs, bar) } diff --git a/cmd/restic/cmd_repair_index_integration_test.go b/cmd/restic/cmd_repair_index_integration_test.go index e1a3dfe03..e11b2f91b 100644 --- a/cmd/restic/cmd_repair_index_integration_test.go +++ b/cmd/restic/cmd_repair_index_integration_test.go @@ -68,7 +68,7 @@ func TestRebuildIndexAlwaysFull(t *testing.T) { defer func() { index.IndexFull = indexFull }() - index.IndexFull = func(*index.Index, bool) bool { return true } + index.IndexFull = func(*index.Index) bool { return true } testRebuildIndex(t, nil) } diff --git a/internal/index/associated_data.go b/internal/index/associated_data.go new file mode 100644 index 000000000..ee58957e0 --- /dev/null +++ b/internal/index/associated_data.go @@ -0,0 +1,156 @@ +package index + +import ( + "context" + "sort" + + "github.com/restic/restic/internal/restic" +) + +type associatedSetSub[T any] struct { + value []T + isSet []bool +} + +// AssociatedSet is a memory efficient implementation of a BlobSet that can +// store a small data item for each BlobHandle. It relies on a special property +// of our MasterIndex implementation. A BlobHandle can be permanently identified +// using an offset that never changes as MasterIndex entries cannot be modified (only added). +// +// The AssociatedSet thus can use an array with the size of the MasterIndex to store +// its data. Access to an individual entry is possible by looking up the BlobHandle's +// offset from the MasterIndex. +// +// BlobHandles that are not part of the MasterIndex can be stored by placing them in +// an overflow set that is expected to be empty in the normal case. +type AssociatedSet[T any] struct { + byType [restic.NumBlobTypes]associatedSetSub[T] + overflow map[restic.BlobHandle]T + idx *MasterIndex +} + +func NewAssociatedSet[T any](mi *MasterIndex) *AssociatedSet[T] { + a := AssociatedSet[T]{ + overflow: make(map[restic.BlobHandle]T), + idx: mi, + } + + for typ := range a.byType { + if typ == 0 { + continue + } + // index starts counting at 1 + count := mi.stableLen(restic.BlobType(typ)) + 1 + a.byType[typ].value = make([]T, count) + a.byType[typ].isSet = make([]bool, count) + } + + return &a +} + +func (a *AssociatedSet[T]) Get(bh restic.BlobHandle) (T, bool) { + if val, ok := a.overflow[bh]; ok { + return val, true + } + + idx := a.idx.blobIndex(bh) + bt := &a.byType[bh.Type] + if idx >= len(bt.value) || idx == -1 { + var zero T + return zero, false + } + + has := bt.isSet[idx] + if has { + return bt.value[idx], has + } + var zero T + return zero, false +} + +func (a *AssociatedSet[T]) Has(bh restic.BlobHandle) bool { + _, ok := a.Get(bh) + return ok +} + +func (a *AssociatedSet[T]) Set(bh restic.BlobHandle, val T) { + if _, ok := a.overflow[bh]; ok { + a.overflow[bh] = val + return + } + + idx := a.idx.blobIndex(bh) + bt := &a.byType[bh.Type] + if idx >= len(bt.value) || idx == -1 { + a.overflow[bh] = val + } else { + bt.value[idx] = val + bt.isSet[idx] = true + } +} + +func (a *AssociatedSet[T]) Insert(bh restic.BlobHandle) { + var zero T + a.Set(bh, zero) +} + +func (a *AssociatedSet[T]) Delete(bh restic.BlobHandle) { + if _, ok := a.overflow[bh]; ok { + delete(a.overflow, bh) + return + } + + idx := a.idx.blobIndex(bh) + bt := &a.byType[bh.Type] + if idx < len(bt.value) && idx != -1 { + bt.isSet[idx] = false + } +} + +func (a *AssociatedSet[T]) Len() int { + count := 0 + a.For(func(_ restic.BlobHandle, _ T) { + count++ + }) + return count +} + +func (a *AssociatedSet[T]) For(cb func(bh restic.BlobHandle, val T)) { + for k, v := range a.overflow { + cb(k, v) + } + + _ = a.idx.Each(context.Background(), func(pb restic.PackedBlob) { + if _, ok := a.overflow[pb.BlobHandle]; ok { + // already reported via overflow set + return + } + + val, known := a.Get(pb.BlobHandle) + if known { + cb(pb.BlobHandle, val) + } + }) +} + +// List returns a sorted slice of all BlobHandle in the set. +func (a *AssociatedSet[T]) List() restic.BlobHandles { + list := make(restic.BlobHandles, 0) + a.For(func(bh restic.BlobHandle, _ T) { + list = append(list, bh) + }) + + return list +} + +func (a *AssociatedSet[T]) String() string { + list := a.List() + sort.Sort(list) + + str := list.String() + if len(str) < 2 { + return "{}" + } + + return "{" + str[1:len(str)-1] + "}" +} diff --git a/internal/index/associated_data_test.go b/internal/index/associated_data_test.go new file mode 100644 index 000000000..715e3391a --- /dev/null +++ b/internal/index/associated_data_test.go @@ -0,0 +1,154 @@ +package index + +import ( + "context" + "testing" + + "github.com/restic/restic/internal/crypto" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" +) + +type noopSaver struct{} + +func (n *noopSaver) Connections() uint { + return 2 +} +func (n *noopSaver) SaveUnpacked(ctx context.Context, t restic.FileType, buf []byte) (restic.ID, error) { + return restic.Hash(buf), nil +} + +func makeFakePackedBlob() (restic.BlobHandle, restic.PackedBlob) { + bh := restic.NewRandomBlobHandle() + blob := restic.PackedBlob{ + PackID: restic.NewRandomID(), + Blob: restic.Blob{ + BlobHandle: bh, + Length: uint(crypto.CiphertextLength(10)), + Offset: 0, + }, + } + return bh, blob +} + +func TestAssociatedSet(t *testing.T) { + bh, blob := makeFakePackedBlob() + + mi := NewMasterIndex() + mi.StorePack(blob.PackID, []restic.Blob{blob.Blob}) + test.OK(t, mi.SaveIndex(context.TODO(), &noopSaver{})) + + bs := NewAssociatedSet[uint8](mi) + test.Equals(t, bs.Len(), 0) + test.Equals(t, bs.List(), restic.BlobHandles{}) + + // check non existent + test.Equals(t, bs.Has(bh), false) + _, ok := bs.Get(bh) + test.Equals(t, false, ok) + + // test insert + bs.Insert(bh) + test.Equals(t, bs.Has(bh), true) + test.Equals(t, bs.Len(), 1) + test.Equals(t, bs.List(), restic.BlobHandles{bh}) + test.Equals(t, 0, len(bs.overflow)) + + // test set + bs.Set(bh, 42) + test.Equals(t, bs.Has(bh), true) + test.Equals(t, bs.Len(), 1) + val, ok := bs.Get(bh) + test.Equals(t, true, ok) + test.Equals(t, uint8(42), val) + + s := bs.String() + test.Assert(t, len(s) > 10, "invalid string: %v", s) + + // test remove + bs.Delete(bh) + test.Equals(t, bs.Len(), 0) + test.Equals(t, bs.Has(bh), false) + test.Equals(t, bs.List(), restic.BlobHandles{}) + + test.Equals(t, "{}", bs.String()) + + // test set + bs.Set(bh, 43) + test.Equals(t, bs.Has(bh), true) + test.Equals(t, bs.Len(), 1) + val, ok = bs.Get(bh) + test.Equals(t, true, ok) + test.Equals(t, uint8(43), val) + test.Equals(t, 0, len(bs.overflow)) + // test update + bs.Set(bh, 44) + val, ok = bs.Get(bh) + test.Equals(t, true, ok) + test.Equals(t, uint8(44), val) + test.Equals(t, 0, len(bs.overflow)) + + // test overflow blob + of := restic.NewRandomBlobHandle() + test.Equals(t, false, bs.Has(of)) + // set + bs.Set(of, 7) + test.Equals(t, 1, len(bs.overflow)) + test.Equals(t, bs.Len(), 2) + // get + val, ok = bs.Get(of) + test.Equals(t, true, ok) + test.Equals(t, uint8(7), val) + test.Equals(t, bs.List(), restic.BlobHandles{of, bh}) + // update + bs.Set(of, 8) + val, ok = bs.Get(of) + test.Equals(t, true, ok) + test.Equals(t, uint8(8), val) + test.Equals(t, 1, len(bs.overflow)) + // delete + bs.Delete(of) + test.Equals(t, bs.Len(), 1) + test.Equals(t, bs.Has(of), false) + test.Equals(t, bs.List(), restic.BlobHandles{bh}) + test.Equals(t, 0, len(bs.overflow)) +} + +func TestAssociatedSetWithExtendedIndex(t *testing.T) { + _, blob := makeFakePackedBlob() + + mi := NewMasterIndex() + mi.StorePack(blob.PackID, []restic.Blob{blob.Blob}) + test.OK(t, mi.SaveIndex(context.TODO(), &noopSaver{})) + + bs := NewAssociatedSet[uint8](mi) + + // add new blobs to index after building the set + of, blob2 := makeFakePackedBlob() + mi.StorePack(blob2.PackID, []restic.Blob{blob2.Blob}) + test.OK(t, mi.SaveIndex(context.TODO(), &noopSaver{})) + + // non-existant + test.Equals(t, false, bs.Has(of)) + // set + bs.Set(of, 5) + test.Equals(t, 1, len(bs.overflow)) + test.Equals(t, bs.Len(), 1) + // get + val, ok := bs.Get(of) + test.Equals(t, true, ok) + test.Equals(t, uint8(5), val) + test.Equals(t, bs.List(), restic.BlobHandles{of}) + // update + bs.Set(of, 8) + val, ok = bs.Get(of) + test.Equals(t, true, ok) + test.Equals(t, uint8(8), val) + test.Equals(t, 1, len(bs.overflow)) + // delete + bs.Delete(of) + test.Equals(t, bs.Len(), 0) + test.Equals(t, bs.Has(of), false) + test.Equals(t, bs.List(), restic.BlobHandles{}) + test.Equals(t, 0, len(bs.overflow)) +} diff --git a/internal/index/index.go b/internal/index/index.go index 12ef6b18a..36ac2560f 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -47,7 +47,7 @@ import ( // Index holds lookup tables for id -> pack. type Index struct { - m sync.Mutex + m sync.RWMutex byType [restic.NumBlobTypes]indexMap packs restic.IDs @@ -83,22 +83,21 @@ func (idx *Index) store(packIndex int, blob restic.Blob) { // Final returns true iff the index is already written to the repository, it is // finalized. func (idx *Index) Final() bool { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() return idx.final } const ( - indexMaxBlobs = 50000 - indexMaxBlobsCompressed = 3 * indexMaxBlobs - indexMaxAge = 10 * time.Minute + indexMaxBlobs = 50000 + indexMaxAge = 10 * time.Minute ) // IndexFull returns true iff the index is "full enough" to be saved as a preliminary index. -var IndexFull = func(idx *Index, compress bool) bool { - idx.m.Lock() - defer idx.m.Unlock() +var IndexFull = func(idx *Index) bool { + idx.m.RLock() + defer idx.m.RUnlock() debug.Log("checking whether index %p is full", idx) @@ -107,18 +106,12 @@ var IndexFull = func(idx *Index, compress bool) bool { blobs += idx.byType[typ].len() } age := time.Since(idx.created) - var maxBlobs uint - if compress { - maxBlobs = indexMaxBlobsCompressed - } else { - maxBlobs = indexMaxBlobs - } switch { case age >= indexMaxAge: debug.Log("index %p is old enough", idx, age) return true - case blobs >= maxBlobs: + case blobs >= indexMaxBlobs: debug.Log("index %p has %d blobs", idx, blobs) return true } @@ -163,8 +156,8 @@ func (idx *Index) toPackedBlob(e *indexEntry, t restic.BlobType) restic.PackedBl // Lookup queries the index for the blob ID and returns all entries including // duplicates. Adds found entries to blobs and returns the result. func (idx *Index) Lookup(bh restic.BlobHandle, pbs []restic.PackedBlob) []restic.PackedBlob { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() idx.byType[bh.Type].foreachWithID(bh.ID, func(e *indexEntry) { pbs = append(pbs, idx.toPackedBlob(e, bh.Type)) @@ -175,8 +168,8 @@ func (idx *Index) Lookup(bh restic.BlobHandle, pbs []restic.PackedBlob) []restic // Has returns true iff the id is listed in the index. func (idx *Index) Has(bh restic.BlobHandle) bool { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() return idx.byType[bh.Type].get(bh.ID) != nil } @@ -184,8 +177,8 @@ func (idx *Index) Has(bh restic.BlobHandle) bool { // LookupSize returns the length of the plaintext content of the blob with the // given id. func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found bool) { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() e := idx.byType[bh.Type].get(bh.ID) if e == nil { @@ -200,8 +193,8 @@ func (idx *Index) LookupSize(bh restic.BlobHandle) (plaintextLength uint, found // Each passes all blobs known to the index to the callback fn. This blocks any // modification of the index. func (idx *Index) Each(ctx context.Context, fn func(restic.PackedBlob)) error { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() for typ := range idx.byType { m := &idx.byType[typ] @@ -229,12 +222,12 @@ type EachByPackResult struct { // When the context is cancelled, the background goroutine // terminates. This blocks any modification of the index. func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <-chan EachByPackResult { - idx.m.Lock() + idx.m.RLock() ch := make(chan EachByPackResult) go func() { - defer idx.m.Unlock() + defer idx.m.RUnlock() defer close(ch) byPack := make(map[restic.ID][restic.NumBlobTypes][]*indexEntry) @@ -275,8 +268,8 @@ func (idx *Index) EachByPack(ctx context.Context, packBlacklist restic.IDSet) <- // Packs returns all packs in this index func (idx *Index) Packs() restic.IDSet { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() packs := restic.NewIDSet() for _, packID := range idx.packs { @@ -344,8 +337,8 @@ type jsonIndex struct { // Encode writes the JSON serialization of the index to the writer w. func (idx *Index) Encode(w io.Writer) error { debug.Log("encoding index") - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() list, err := idx.generatePackList() if err != nil { @@ -389,8 +382,8 @@ func (idx *Index) Finalize() { // IDs returns the IDs of the index, if available. If the index is not yet // finalized, an error is returned. func (idx *Index) IDs() (restic.IDs, error) { - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() if !idx.final { return nil, errors.New("index not finalized") @@ -422,8 +415,8 @@ func (idx *Index) SetID(id restic.ID) error { // Dump writes the pretty-printed JSON representation of the index to w. func (idx *Index) Dump(w io.Writer) error { debug.Log("dumping index") - idx.m.Lock() - defer idx.m.Unlock() + idx.m.RLock() + defer idx.m.RUnlock() list, err := idx.generatePackList() if err != nil { @@ -579,3 +572,17 @@ func decodeOldIndex(buf []byte) (idx *Index, err error) { debug.Log("done") return idx, nil } + +func (idx *Index) BlobIndex(bh restic.BlobHandle) int { + idx.m.RLock() + defer idx.m.RUnlock() + + return idx.byType[bh.Type].firstIndex(bh.ID) +} + +func (idx *Index) Len(t restic.BlobType) uint { + idx.m.RLock() + defer idx.m.RUnlock() + + return idx.byType[t].len() +} diff --git a/internal/index/index_test.go b/internal/index/index_test.go index 66cec23f6..1a487f82f 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -172,6 +172,9 @@ func TestIndexSize(t *testing.T) { err := idx.Encode(wr) rtest.OK(t, err) + rtest.Equals(t, uint(packs*blobCount), idx.Len(restic.DataBlob)) + rtest.Equals(t, uint(0), idx.Len(restic.TreeBlob)) + t.Logf("Index file size for %d blobs in %d packs is %d", blobCount*packs, packs, wr.Len()) } diff --git a/internal/index/indexmap.go b/internal/index/indexmap.go index 4a78b9f77..6db523633 100644 --- a/internal/index/indexmap.go +++ b/internal/index/indexmap.go @@ -99,6 +99,32 @@ func (m *indexMap) get(id restic.ID) *indexEntry { return nil } +// firstIndex returns the index of the first entry for ID id. +// This index is guaranteed to never change. +func (m *indexMap) firstIndex(id restic.ID) int { + if len(m.buckets) == 0 { + return -1 + } + + idx := -1 + h := m.hash(id) + ei := m.buckets[h] + for ei != 0 { + e := m.resolve(ei) + cur := ei + ei = e.next + if e.id != id { + continue + } + if int(cur) < idx || idx == -1 { + // casting from uint to int is unproblematic as we'd run out of memory + // before this can result in an overflow. + idx = int(cur) + } + } + return idx +} + func (m *indexMap) grow() { m.buckets = make([]uint, growthFactor*len(m.buckets)) @@ -118,9 +144,10 @@ func (m *indexMap) hash(id restic.ID) uint { // While SHA-256 should be collision-resistant, for hash table indices // we use only a few bits of it and finding collisions for those is // much easier than breaking the whole algorithm. - m.mh.Reset() - _, _ = m.mh.Write(id[:]) - h := uint(m.mh.Sum64()) + mh := maphash.Hash{} + mh.SetSeed(m.mh.Seed()) + _, _ = mh.Write(id[:]) + h := uint(mh.Sum64()) return h & uint(len(m.buckets)-1) } diff --git a/internal/index/indexmap_test.go b/internal/index/indexmap_test.go index a16670c7d..e0db9cb40 100644 --- a/internal/index/indexmap_test.go +++ b/internal/index/indexmap_test.go @@ -143,3 +143,45 @@ func BenchmarkIndexMapHash(b *testing.B) { } } } + +func TestIndexMapFirstIndex(t *testing.T) { + t.Parallel() + + var ( + id restic.ID + m indexMap + r = rand.New(rand.NewSource(98765)) + fi = make(map[restic.ID]int) + ) + + for i := 1; i <= 400; i++ { + r.Read(id[:]) + rtest.Equals(t, -1, m.firstIndex(id), "wrong firstIndex for nonexistant id") + + m.add(id, 0, 0, 0, 0) + idx := m.firstIndex(id) + rtest.Equals(t, i, idx, "unexpected index for id") + fi[id] = idx + } + // iterate over blobs, as this is a hashmap the order is effectively random + for id, idx := range fi { + rtest.Equals(t, idx, m.firstIndex(id), "wrong index returned") + } +} + +func TestIndexMapFirstIndexDuplicates(t *testing.T) { + t.Parallel() + + var ( + id restic.ID + m indexMap + r = rand.New(rand.NewSource(98765)) + ) + + r.Read(id[:]) + for i := 1; i <= 10; i++ { + m.add(id, 0, 0, 0, 0) + } + idx := m.firstIndex(id) + rtest.Equals(t, 1, idx, "unexpected index for id") +} diff --git a/internal/index/master_index.go b/internal/index/master_index.go index f9fc4505b..f8e776b23 100644 --- a/internal/index/master_index.go +++ b/internal/index/master_index.go @@ -17,7 +17,6 @@ type MasterIndex struct { idx []*Index pendingBlobs restic.BlobSet idxMutex sync.RWMutex - compress bool } // NewMasterIndex creates a new master index. @@ -33,10 +32,6 @@ func (mi *MasterIndex) clear() { mi.idx[0].Finalize() } -func (mi *MasterIndex) MarkCompressed() { - mi.compress = true -} - // Lookup queries all known Indexes for the ID and returns all matches. func (mi *MasterIndex) Lookup(bh restic.BlobHandle) (pbs []restic.PackedBlob) { mi.idxMutex.RLock() @@ -211,7 +206,7 @@ func (mi *MasterIndex) finalizeFullIndexes() []*Index { continue } - if IndexFull(idx, mi.compress) { + if IndexFull(idx) { debug.Log("index %p is full", idx) idx.Finalize() list = append(list, idx) @@ -417,7 +412,7 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud newIndex := NewIndex() for task := range rewriteCh { // always rewrite indexes using the old format, that include a pack that must be removed or that are not full - if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx, mi.compress) { + if !task.oldFormat && len(task.idx.Packs().Intersect(excludePacks)) == 0 && IndexFull(task.idx) { // make sure that each pack is only stored exactly once in the index excludePacks.Merge(task.idx.Packs()) // index is already up to date @@ -433,7 +428,7 @@ func (mi *MasterIndex) Rewrite(ctx context.Context, repo restic.Unpacked, exclud for pbs := range task.idx.EachByPack(wgCtx, excludePacks) { newIndex.StorePack(pbs.PackID, pbs.Blobs) - if IndexFull(newIndex, mi.compress) { + if IndexFull(newIndex) { select { case saveCh <- newIndex: case <-wgCtx.Done(): @@ -527,7 +522,7 @@ func (mi *MasterIndex) SaveFallback(ctx context.Context, repo restic.SaverRemove for pbs := range idx.EachByPack(wgCtx, excludePacks) { newIndex.StorePack(pbs.PackID, pbs.Blobs) p.Add(1) - if IndexFull(newIndex, mi.compress) { + if IndexFull(newIndex) { select { case ch <- newIndex: case <-wgCtx.Done(): @@ -638,3 +633,21 @@ func (mi *MasterIndex) ListPacks(ctx context.Context, packs restic.IDSet) <-chan }() return out } + +// Only for use by AssociatedSet +func (mi *MasterIndex) blobIndex(h restic.BlobHandle) int { + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + + // other indexes are ignored as their ids can change when merged into the main index + return mi.idx[0].BlobIndex(h) +} + +// Only for use by AssociatedSet +func (mi *MasterIndex) stableLen(t restic.BlobType) uint { + mi.idxMutex.RLock() + defer mi.idxMutex.RUnlock() + + // other indexes are ignored as their ids can change when merged into the main index + return mi.idx[0].Len(t) +} diff --git a/internal/index/master_index_test.go b/internal/index/master_index_test.go index c42484c55..7a2487cd4 100644 --- a/internal/index/master_index_test.go +++ b/internal/index/master_index_test.go @@ -161,9 +161,12 @@ func TestMasterMergeFinalIndexes(t *testing.T) { mIdx.Insert(idx1) mIdx.Insert(idx2) - finalIndexes, idxCount := index.TestMergeIndex(t, mIdx) + rtest.Equals(t, restic.NewIDSet(), mIdx.IDs()) + + finalIndexes, idxCount, ids := index.TestMergeIndex(t, mIdx) rtest.Equals(t, []*index.Index{idx1, idx2}, finalIndexes) rtest.Equals(t, 1, idxCount) + rtest.Equals(t, ids, mIdx.IDs()) blobCount := 0 rtest.OK(t, mIdx.Each(context.TODO(), func(pb restic.PackedBlob) { @@ -186,9 +189,11 @@ func TestMasterMergeFinalIndexes(t *testing.T) { idx3.StorePack(blob2.PackID, []restic.Blob{blob2.Blob}) mIdx.Insert(idx3) - finalIndexes, idxCount = index.TestMergeIndex(t, mIdx) + finalIndexes, idxCount, newIDs := index.TestMergeIndex(t, mIdx) rtest.Equals(t, []*index.Index{idx3}, finalIndexes) rtest.Equals(t, 1, idxCount) + ids.Merge(newIDs) + rtest.Equals(t, ids, mIdx.IDs()) // Index should have same entries as before! blobs = mIdx.Lookup(bhInIdx1) diff --git a/internal/index/testing.go b/internal/index/testing.go index 7c05ac651..0b5084bb0 100644 --- a/internal/index/testing.go +++ b/internal/index/testing.go @@ -7,12 +7,15 @@ import ( "github.com/restic/restic/internal/test" ) -func TestMergeIndex(t testing.TB, mi *MasterIndex) ([]*Index, int) { +func TestMergeIndex(t testing.TB, mi *MasterIndex) ([]*Index, int, restic.IDSet) { finalIndexes := mi.finalizeNotFinalIndexes() + ids := restic.NewIDSet() for _, idx := range finalIndexes { - test.OK(t, idx.SetID(restic.NewRandomID())) + id := restic.NewRandomID() + ids.Insert(id) + test.OK(t, idx.SetID(id)) } test.OK(t, mi.MergeFinalIndexes()) - return finalIndexes, len(mi.idx) + return finalIndexes, len(mi.idx), ids } diff --git a/internal/repository/prune.go b/internal/repository/prune.go index 49869fcac..1dae68c15 100644 --- a/internal/repository/prune.go +++ b/internal/repository/prune.go @@ -7,6 +7,7 @@ import ( "sort" "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/index" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/ui/progress" @@ -60,11 +61,11 @@ type PruneStats struct { } type PrunePlan struct { - removePacksFirst restic.IDSet // packs to remove first (unreferenced packs) - repackPacks restic.IDSet // packs to repack - keepBlobs restic.CountedBlobSet // blobs to keep during repacking - removePacks restic.IDSet // packs to remove - ignorePacks restic.IDSet // packs to ignore when rebuilding the index + removePacksFirst restic.IDSet // packs to remove first (unreferenced packs) + repackPacks restic.IDSet // packs to repack + keepBlobs *index.AssociatedSet[uint8] // blobs to keep during repacking + removePacks restic.IDSet // packs to remove + ignorePacks restic.IDSet // packs to ignore when rebuilding the index repo *Repository stats PruneStats @@ -90,7 +91,7 @@ type packInfoWithID struct { // PlanPrune selects which files to rewrite and which to delete and which blobs to keep. // Also some summary statistics are returned. -func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) { +func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error, printer progress.Printer) (*PrunePlan, error) { var stats PruneStats if opts.UnsafeRecovery { @@ -104,7 +105,8 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsed return nil, fmt.Errorf("compression requires at least repository format version 2") } - usedBlobs, err := getUsedBlobs(ctx, repo) + usedBlobs := index.NewAssociatedSet[uint8](repo.idx) + err := getUsedBlobs(ctx, repo, usedBlobs) if err != nil { return nil, err } @@ -122,7 +124,6 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsed } if len(plan.repackPacks) != 0 { - blobCount := keepBlobs.Len() // when repacking, we do not want to keep blobs which are // already contained in kept packs, so delete them from keepBlobs err := repo.ListBlobs(ctx, func(blob restic.PackedBlob) { @@ -134,11 +135,6 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsed if err != nil { return nil, err } - - if keepBlobs.Len() < blobCount/2 { - // replace with copy to shrink map to necessary size if there's a chance to benefit - keepBlobs = keepBlobs.Copy() - } } else { // keepBlobs is only needed if packs are repacked keepBlobs = nil @@ -152,13 +148,13 @@ func PlanPrune(ctx context.Context, opts PruneOptions, repo *Repository, getUsed return &plan, nil } -func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) { +func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs *index.AssociatedSet[uint8], stats *PruneStats, printer progress.Printer) (*index.AssociatedSet[uint8], map[restic.ID]packInfo, error) { // iterate over all blobs in index to find out which blobs are duplicates // The counter in usedBlobs describes how many instances of the blob exist in the repository index // Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist err := idx.ListBlobs(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle - count, ok := usedBlobs[bh] + count, ok := usedBlobs.Get(bh) if ok { if count < math.MaxUint8 { // don't overflow, but saturate count at 255 @@ -167,7 +163,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re count++ } - usedBlobs[bh] = count + usedBlobs.Set(bh, count) } }) if err != nil { @@ -176,12 +172,12 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re // Check if all used blobs have been found in index missingBlobs := restic.NewBlobSet() - for bh, count := range usedBlobs { + usedBlobs.For(func(bh restic.BlobHandle, count uint8) { if count == 0 { // blob does not exist in any pack files missingBlobs.Insert(bh) } - } + }) if len(missingBlobs) != 0 { printer.E("%v not found in the index\n\n"+ @@ -221,7 +217,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re bh := blob.BlobHandle size := uint64(blob.Length) - dupCount := usedBlobs[bh] + dupCount, _ := usedBlobs.Get(bh) switch { case dupCount >= 2: hasDuplicates = true @@ -266,7 +262,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re // iterate again over all blobs in index (this is pretty cheap, all in-mem) err = idx.ListBlobs(ctx, func(blob restic.PackedBlob) { bh := blob.BlobHandle - count, ok := usedBlobs[bh] + count, ok := usedBlobs.Get(bh) // skip non-duplicate, aka. normal blobs // count == 0 is used to mark that this was a duplicate blob with only a single occurrence remaining if !ok || count == 1 { @@ -290,7 +286,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re stats.Size.Duplicate -= size stats.Blobs.Duplicate-- // let other occurrences remain marked as unused - usedBlobs[bh] = 1 + usedBlobs.Set(bh, 1) default: // remain unused and decrease counter count-- @@ -299,7 +295,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re // thus use the special value zero. This will select the last instance of the blob for keeping. count = 0 } - usedBlobs[bh] = count + usedBlobs.Set(bh, count) } // update indexPack indexPack[blob.PackID] = ip @@ -311,11 +307,11 @@ func packInfoFromIndex(ctx context.Context, idx restic.ListBlobser, usedBlobs re // Sanity check. If no duplicates exist, all blobs have value 1. After handling // duplicates, this also applies to duplicates. - for _, count := range usedBlobs { + usedBlobs.For(func(_ restic.BlobHandle, count uint8) { if count != 1 { panic("internal error during blob selection") } - } + }) return usedBlobs, indexPack, nil } @@ -549,6 +545,8 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er if len(plan.removePacksFirst) != 0 { printer.P("deleting unreferenced packs\n") _ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer) + // forget unused data + plan.removePacksFirst = nil } if ctx.Err() != nil { return ctx.Err() @@ -566,8 +564,10 @@ func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) er // Also remove repacked packs plan.removePacks.Merge(plan.repackPacks) + // forget unused data + plan.repackPacks = nil - if len(plan.keepBlobs) != 0 { + if plan.keepBlobs.Len() != 0 { printer.E("%v was not repacked\n\n"+ "Integrity check failed.\n"+ "Please report this error (along with the output of the 'prune' run) at\n"+ diff --git a/internal/repository/prune_test.go b/internal/repository/prune_test.go index dbf36ffd0..17dfafa4e 100644 --- a/internal/repository/prune_test.go +++ b/internal/repository/prune_test.go @@ -30,8 +30,11 @@ func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) { } rtest.OK(t, repo.Flush(context.TODO())) - plan, err := repository.PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error) { - return restic.NewCountedBlobSet(keep.List()...), nil + plan, err := repository.PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository, usedBlobs restic.FindBlobSet) error { + for blob := range keep { + usedBlobs.Insert(blob) + } + return nil }, &progress.NoopPrinter{}) rtest.OK(t, err) diff --git a/internal/repository/repository.go b/internal/repository/repository.go index f0ef93ecf..34a362c55 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -588,19 +588,11 @@ func (r *Repository) ListPacksFromIndex(ctx context.Context, packs restic.IDSet) // SetIndex instructs the repository to use the given index. func (r *Repository) SetIndex(i restic.MasterIndex) error { r.idx = i.(*index.MasterIndex) - r.configureIndex() return r.prepareCache() } func (r *Repository) clearIndex() { r.idx = index.NewMasterIndex() - r.configureIndex() -} - -func (r *Repository) configureIndex() { - if r.cfg.Version >= 2 { - r.idx.MarkCompressed() - } } // LoadIndex loads all index files from the backend in parallel and stores them diff --git a/internal/repository/repository_test.go b/internal/repository/repository_test.go index 05b790e33..92eb1bbae 100644 --- a/internal/repository/repository_test.go +++ b/internal/repository/repository_test.go @@ -376,7 +376,7 @@ func TestRepositoryIncrementalIndex(t *testing.T) { func testRepositoryIncrementalIndex(t *testing.T, version uint) { repo, _ := repository.TestRepositoryWithVersion(t, version) - index.IndexFull = func(*index.Index, bool) bool { return true } + index.IndexFull = func(*index.Index) bool { return true } // add a few rounds of packs for j := 0; j < 5; j++ { diff --git a/internal/restic/counted_blob_set.go b/internal/restic/counted_blob_set.go deleted file mode 100644 index f965d3129..000000000 --- a/internal/restic/counted_blob_set.go +++ /dev/null @@ -1,68 +0,0 @@ -package restic - -import "sort" - -// CountedBlobSet is a set of blobs. For each blob it also stores a uint8 value -// which can be used to track some information. The CountedBlobSet does not use -// that value in any way. New entries are created with value 0. -type CountedBlobSet map[BlobHandle]uint8 - -// NewCountedBlobSet returns a new CountedBlobSet, populated with ids. -func NewCountedBlobSet(handles ...BlobHandle) CountedBlobSet { - m := make(CountedBlobSet) - for _, h := range handles { - m[h] = 0 - } - - return m -} - -// Has returns true iff id is contained in the set. -func (s CountedBlobSet) Has(h BlobHandle) bool { - _, ok := s[h] - return ok -} - -// Insert adds id to the set. -func (s CountedBlobSet) Insert(h BlobHandle) { - s[h] = 0 -} - -// Delete removes id from the set. -func (s CountedBlobSet) Delete(h BlobHandle) { - delete(s, h) -} - -func (s CountedBlobSet) Len() int { - return len(s) -} - -// List returns a sorted slice of all BlobHandle in the set. -func (s CountedBlobSet) List() BlobHandles { - list := make(BlobHandles, 0, len(s)) - for h := range s { - list = append(list, h) - } - - sort.Sort(list) - - return list -} - -func (s CountedBlobSet) String() string { - str := s.List().String() - if len(str) < 2 { - return "{}" - } - - return "{" + str[1:len(str)-1] + "}" -} - -// Copy returns a copy of the CountedBlobSet. -func (s CountedBlobSet) Copy() CountedBlobSet { - cp := make(CountedBlobSet, len(s)) - for k, v := range s { - cp[k] = v - } - return cp -} diff --git a/internal/restic/counted_blob_set_test.go b/internal/restic/counted_blob_set_test.go deleted file mode 100644 index edd39e65b..000000000 --- a/internal/restic/counted_blob_set_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package restic_test - -import ( - "testing" - - "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/test" -) - -func TestCountedBlobSet(t *testing.T) { - bs := restic.NewCountedBlobSet() - test.Equals(t, bs.Len(), 0) - test.Equals(t, bs.List(), restic.BlobHandles{}) - - bh := restic.NewRandomBlobHandle() - // check non existent - test.Equals(t, bs.Has(bh), false) - - // test insert - bs.Insert(bh) - test.Equals(t, bs.Has(bh), true) - test.Equals(t, bs.Len(), 1) - test.Equals(t, bs.List(), restic.BlobHandles{bh}) - - // test remove - bs.Delete(bh) - test.Equals(t, bs.Len(), 0) - test.Equals(t, bs.Has(bh), false) - test.Equals(t, bs.List(), restic.BlobHandles{}) - - bs = restic.NewCountedBlobSet(bh) - test.Equals(t, bs.Len(), 1) - test.Equals(t, bs.List(), restic.BlobHandles{bh}) - - s := bs.String() - test.Assert(t, len(s) > 10, "invalid string: %v", s) -} - -func TestCountedBlobSetCopy(t *testing.T) { - bs := restic.NewCountedBlobSet(restic.NewRandomBlobHandle(), restic.NewRandomBlobHandle(), restic.NewRandomBlobHandle()) - test.Equals(t, bs.Len(), 3) - cp := bs.Copy() - test.Equals(t, cp.Len(), 3) - test.Equals(t, bs.List(), cp.List()) -} diff --git a/internal/restic/find.go b/internal/restic/find.go index cefef2196..d7b032bf8 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -15,14 +15,14 @@ type Loader interface { Connections() uint } -type findBlobSet interface { +type FindBlobSet interface { Has(bh BlobHandle) bool Insert(bh BlobHandle) } // FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data // blobs) to the set blobs. Already seen tree blobs will not be visited again. -func FindUsedBlobs(ctx context.Context, repo Loader, treeIDs IDs, blobs findBlobSet, p *progress.Counter) error { +func FindUsedBlobs(ctx context.Context, repo Loader, treeIDs IDs, blobs FindBlobSet, p *progress.Counter) error { var lock sync.Mutex wg, ctx := errgroup.WithContext(ctx)