Merge pull request #3899 from MichaelEischer/less-prune-mem

Optimize prune memory usage
Michael Eischer 2022-10-22 18:56:02 +02:00 committed by GitHub
commit b57d42905c
7 changed files with 217 additions and 51 deletions

@@ -0,0 +1,5 @@
Enhancement: Optimize prune memory usage
The prune command requires large amounts of memory to determine what to keep and what to remove. That step has been optimized to use up to 30% less memory.
https://github.com/restic/restic/pull/3899
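
The core of the change, visible in the diffs below, is that prune no longer tracks used blobs in a plain restic.BlobSet plus a separate map for duplicate counts; it now keeps a single restic.CountedBlobSet whose uint8 value doubles as the occurrence counter. A minimal sketch of that idea, using a simplified stand-in for restic's BlobHandle:

package main

import "fmt"

// blobHandle is a stand-in for restic.BlobHandle (assumed layout: 32-byte ID plus a blob type byte).
type blobHandle struct {
	id  [32]byte
	tpe uint8
}

// Before: membership in one map, duplicate counts in a second map keyed by the same handle.
type blobSet map[blobHandle]struct{}

// After: one map whose value serves both as membership marker and counter.
type countedBlobSet map[blobHandle]uint8

func main() {
	s := countedBlobSet{}
	h := blobHandle{tpe: 1}
	s[h] = 0 // referenced by a snapshot: insert with counter 0
	s[h]++   // first copy found in the repository index
	s[h]++   // a duplicate copy found in the index
	fmt.Println(len(s), s[h]) // 1 2
}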

@@ -233,11 +233,11 @@ type pruneStats struct {
}
type prunePlan struct {
removePacksFirst restic.IDSet // packs to remove first (unreferenced packs)
repackPacks restic.IDSet // packs to repack
keepBlobs restic.BlobSet // blobs to keep during repacking
removePacks restic.IDSet // packs to remove
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
removePacksFirst restic.IDSet // packs to remove first (unreferenced packs)
repackPacks restic.IDSet // packs to repack
keepBlobs restic.CountedBlobSet // blobs to keep during repacking
removePacks restic.IDSet // packs to remove
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
}
type packInfo struct {
@@ -277,6 +277,7 @@ func planPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo
}
if len(plan.repackPacks) != 0 {
blobCount := keepBlobs.Len()
// when repacking, we do not want to keep blobs which are
// already contained in kept packs, so delete them from keepBlobs
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
@@ -285,6 +286,11 @@ func planPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo
}
keepBlobs.Delete(blob.BlobHandle)
})
if keepBlobs.Len() < blobCount/2 {
// replace with copy to shrink map to necessary size if there's a chance to benefit
keepBlobs = keepBlobs.Copy()
}
} else {
// keepBlobs is only needed if packs are repacked
keepBlobs = nil
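
The keepBlobs.Copy() above relies on a property of Go maps: deleting entries never releases the bucket memory, so once most entries are gone, copying the survivors into a freshly sized map is the only way to hand that memory back to the garbage collector. A minimal, hedged sketch of the idiom, with int keys instead of restic.BlobHandle:

package main

import "fmt"

// shrink copies the remaining entries into a map sized to fit them,
// so the old, oversized bucket array becomes garbage collectable.
func shrink(m map[int]uint8) map[int]uint8 {
	cp := make(map[int]uint8, len(m))
	for k, v := range m {
		cp[k] = v
	}
	return cp
}

func main() {
	m := make(map[int]uint8, 1_000_000)
	for i := 0; i < 1_000_000; i++ {
		m[i] = 0
	}
	for i := 0; i < 900_000; i++ {
		delete(m, i) // the entries go away, the backing buckets do not
	}
	m = shrink(m) // only worthwhile if enough entries were deleted, hence the Len() check above
	fmt.Println(len(m))
}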
@@ -294,46 +300,53 @@ func planPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo
return plan, stats, nil
}
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.BlobSet, stats *pruneStats) (restic.BlobSet, map[restic.ID]packInfo, error) {
keepBlobs := restic.NewBlobSet()
duplicateBlobs := make(map[restic.BlobHandle]uint8)
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *pruneStats) (restic.CountedBlobSet, map[restic.ID]packInfo, error) {
// iterate over all blobs in index to find out which blobs are duplicates
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
size := uint64(blob.Length)
switch {
case usedBlobs.Has(bh): // used blob, move to keepBlobs
usedBlobs.Delete(bh)
keepBlobs.Insert(bh)
stats.size.used += size
stats.blobs.used++
case keepBlobs.Has(bh): // duplicate blob
count, ok := duplicateBlobs[bh]
if !ok {
count = 2 // this one is already the second blob!
} else if count < math.MaxUint8 {
count, ok := usedBlobs[bh]
if ok {
if count < math.MaxUint8 {
// don't overflow, but saturate count at 255
// this can lead to a non-optimal pack selection, but won't cause
// problems otherwise
count++
}
duplicateBlobs[bh] = count
stats.size.duplicate += size
stats.blobs.duplicate++
default:
if count == 1 {
stats.size.used += size
stats.blobs.used++
} else {
// duplicate if counted more than once
stats.size.duplicate += size
stats.blobs.duplicate++
}
usedBlobs[bh] = count
} else {
stats.size.unused += size
stats.blobs.unused++
}
})
// Check if all used blobs have been found in index
if len(usedBlobs) != 0 {
missingBlobs := restic.NewBlobSet()
for bh, count := range usedBlobs {
if count == 0 {
// blob does not exist in any pack files
missingBlobs.Insert(bh)
}
}
if len(missingBlobs) != 0 {
Warnf("%v not found in the index\n\n"+
"Integrity check failed: Data seems to be missing.\n"+
"Will not start prune to prevent (additional) data loss!\n"+
"Please report this error (along with the output of the 'prune' run) at\n"+
"https://github.com/restic/restic/issues/new/choose\n", usedBlobs)
"https://github.com/restic/restic/issues/new/choose\n", missingBlobs)
return nil, nil, errorIndexIncomplete
}
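
The counter semantics described above (0 = missing from the index, 1 = exactly one copy, >= 2 = duplicates exist) can be illustrated with a tiny, hedged simulation of the counting pass, using string handles instead of restic.BlobHandle:

package main

import (
	"fmt"
	"math"
)

func main() {
	// blobs referenced by snapshots start out with counter 0
	used := map[string]uint8{"a": 0, "b": 0, "c": 0}
	// blobs present in the repository index: "b" is stored twice, "c" is missing
	index := []string{"a", "b", "b", "unreferenced"}

	for _, bh := range index {
		if count, ok := used[bh]; ok {
			if count < math.MaxUint8 {
				count++ // saturate at 255, exactly as in the pass above
			}
			used[bh] = count
		}
	}
	// a:1 (exactly one copy), b:2 (duplicate), c:0 (missing, so prune must abort)
	fmt.Println(used)
}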
@@ -345,6 +358,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
}
hasDuplicates := false
// iterate over all blobs in index to generate packInfo
idx.Each(ctx, func(blob restic.PackedBlob) {
ip := indexPack[blob.PackID]
@@ -361,10 +375,14 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
bh := blob.BlobHandle
size := uint64(blob.Length)
_, isDuplicate := duplicateBlobs[bh]
dupCount := usedBlobs[bh]
switch {
case isDuplicate: // duplicate blobs will be handled later
case keepBlobs.Has(bh): // used blob, not duplicate
case dupCount >= 2:
hasDuplicates = true
// mark as unused for now; we will later select one copy
ip.unusedSize += size
ip.unusedBlobs++
case dupCount == 1: // used blob, not duplicate
ip.usedSize += size
ip.usedBlobs++
default: // unused blob
@@ -382,40 +400,52 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
// - mark only one occurrence of duplicate blobs as used
// - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
if len(duplicateBlobs) > 0 {
if hasDuplicates {
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
idx.Each(ctx, func(blob restic.PackedBlob) {
bh := blob.BlobHandle
count, isDuplicate := duplicateBlobs[bh]
if !isDuplicate {
count, ok := usedBlobs[bh]
// skip non-duplicate, aka. normal blobs
// count == 0 is used to mark that this was a duplicate blob with only a single occurrence remaining
if !ok || count == 1 {
return
}
ip := indexPack[blob.PackID]
size := uint64(blob.Length)
switch {
case count == 0:
// used duplicate exists -> mark as unused
ip.unusedSize += size
ip.unusedBlobs++
case ip.usedBlobs > 0, count == 1:
// other used blobs in pack or "last" occurrence -> mark as used
case ip.usedBlobs > 0, count == 0:
// other used blobs in pack or "last" occurrence -> transition to used
ip.usedSize += size
ip.usedBlobs++
// let other occurrences be marked as unused
duplicateBlobs[bh] = 0
ip.unusedSize -= size
ip.unusedBlobs--
// let other occurrences remain marked as unused
usedBlobs[bh] = 1
default:
// mark as unused and decrease counter
ip.unusedSize += size
ip.unusedBlobs++
duplicateBlobs[bh] = count - 1
// remain unused and decrease counter
count--
if count == 1 {
// setting count to 1 would lead to forgetting that this blob had duplicates
// thus use the special value zero. This will select the last instance of the blob for keeping.
count = 0
}
usedBlobs[bh] = count
}
// update indexPack
indexPack[blob.PackID] = ip
})
}
return keepBlobs, indexPack, nil
// Sanity check. If no duplicates exist, all blobs have value 1. After handling
// duplicates, this also holds for blobs that had duplicates.
for _, count := range usedBlobs {
if count != 1 {
panic("internal error during blob selection")
}
}
return usedBlobs, indexPack, nil
}
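
The second pass above has to select exactly one copy of each duplicated blob to keep, preferably in a pack that already holds other used blobs, and the special counter value 0 guarantees that the very last copy is kept even when no such pack exists. A hedged toy simulation of that selection logic (simplified types; the real code works on restic.PackedBlob and packInfo):

package main

import "fmt"

type entry struct {
	pack string
	blob string
}

func main() {
	// blob "x" is stored three times; only pack "p2" already contains a used blob
	index := []entry{{"p1", "x"}, {"p2", "x"}, {"p3", "x"}}
	usedBlobsInPack := map[string]int{"p2": 1}
	count := map[string]uint8{"x": 3} // result of the counting pass: three copies

	keptIn := ""
	for _, e := range index {
		c := count[e.blob]
		if c == 1 { // already decided for this blob, skip the remaining copies
			continue
		}
		if usedBlobsInPack[e.pack] > 0 || c == 0 {
			// keep this copy: its pack has other used blobs, or it is the last occurrence
			usedBlobsInPack[e.pack]++
			count[e.blob] = 1
			keptIn = e.pack
			continue
		}
		// skip this copy and decrease the counter; 0 is the special
		// "only one occurrence remaining" marker, so never store 1 here
		c--
		if c == 1 {
			c = 0
		}
		count[e.blob] = c
	}
	fmt.Println("kept the copy of x in", keptIn) // p2; with no used packs it would be p3
}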
func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *pruneStats) (prunePlan, error) {
@@ -690,6 +720,9 @@ func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo r
"https://github.com/restic/restic/issues/new/choose\n", plan.keepBlobs)
return errors.Fatal("internal error: blobs were not repacked")
}
// allow GC of the blob set
plan.keepBlobs = nil
}
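
Clearing plan.keepBlobs once repacking is finished drops the last reference to what can be a very large map, so the garbage collector may reclaim it while the rest of the prune run continues. A minimal, hedged illustration of that idiom:

package main

import (
	"fmt"
	"runtime"
)

type toyPlan struct {
	keepBlobs map[int]uint8
}

func main() {
	plan := toyPlan{keepBlobs: make(map[int]uint8, 1_000_000)}
	for i := 0; i < 1_000_000; i++ {
		plan.keepBlobs[i] = 0
	}
	// ... repacking would happen here ...
	plan.keepBlobs = nil // drop the last reference so the GC can reclaim the map
	runtime.GC()
	var m runtime.MemStats
	runtime.ReadMemStats(&m)
	fmt.Printf("heap in use after GC: %d MiB\n", m.HeapInuse/1024/1024)
}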
if len(plan.ignorePacks) == 0 {
@@ -747,7 +780,7 @@ func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Rep
return DeleteFilesChecked(ctx, gopts, repo, obsoleteIndexes, restic.IndexFile)
}
func getUsedBlobs(ctx context.Context, gopts GlobalOptions, repo restic.Repository, ignoreSnapshots restic.IDSet) (usedBlobs restic.BlobSet, err error) {
func getUsedBlobs(ctx context.Context, gopts GlobalOptions, repo restic.Repository, ignoreSnapshots restic.IDSet) (usedBlobs restic.CountedBlobSet, err error) {
var snapshotTrees restic.IDs
Verbosef("loading all snapshots...\n")
err = restic.ForAllSnapshots(ctx, repo.Backend(), repo, ignoreSnapshots,
@@ -766,7 +799,7 @@ func getUsedBlobs(ctx context.Context, gopts GlobalOptions, repo restic.Reposito
Verbosef("finding data that is still in use for %d snapshots\n", len(snapshotTrees))
usedBlobs = restic.NewBlobSet()
usedBlobs = restic.NewCountedBlobSet()
bar := newProgressMax(!gopts.Quiet, uint64(len(snapshotTrees)), "snapshots")
defer bar.Done()

@@ -12,6 +12,12 @@ import (
"golang.org/x/sync/errgroup"
)
type repackBlobSet interface {
Has(bh restic.BlobHandle) bool
Delete(bh restic.BlobHandle)
Len() int
}
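
Declaring the small repackBlobSet interface here lets Repack accept the new restic.CountedBlobSet as well as the existing restic.BlobSet: Go interfaces are satisfied structurally, so neither type needs an explicit declaration. A hedged, self-contained sketch of that mechanism with stand-in types:

package main

import "fmt"

type handle string

// repackBlobSet mirrors the three-method interface declared above.
type repackBlobSet interface {
	Has(h handle) bool
	Delete(h handle)
	Len() int
}

type countedSet map[handle]uint8

func (s countedSet) Has(h handle) bool { _, ok := s[h]; return ok }
func (s countedSet) Delete(h handle)   { delete(s, h) }
func (s countedSet) Len() int          { return len(s) }

// repack only needs the three methods, not a concrete map type.
func repack(keep repackBlobSet) {
	fmt.Println("keeping", keep.Len(), "blobs")
}

func main() {
	repack(countedSet{"a": 0, "b": 1}) // countedSet satisfies repackBlobSet implicitly
}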
// Repack takes a list of packs together with a list of blobs contained in
// these packs. Each pack is loaded and the blobs listed in keepBlobs are saved
// into a new pack. Returned is the list of obsolete packs which can then
@@ -19,8 +25,8 @@ import (
//
// The map keepBlobs is modified by Repack; it is used to keep track of which
// blobs have been processed.
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
debug.Log("repacking %d packs while keeping %d blobs", len(packs), keepBlobs.Len())
if repo == dstRepo && dstRepo.Connections() < 2 {
return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
@@ -41,7 +47,7 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
return obsoletePacks, nil
}
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
wg, wgCtx := errgroup.WithContext(ctx)
var keepMutex sync.Mutex

@@ -31,6 +31,10 @@ func (s BlobSet) Delete(h BlobHandle) {
delete(s, h)
}
// Len returns the number of blobs in the set.
func (s BlobSet) Len() int {
return len(s)
}
// Equals returns true iff s equals other.
func (s BlobSet) Equals(other BlobSet) bool {
if len(s) != len(other) {

@@ -0,0 +1,68 @@
package restic
import "sort"
// CountedBlobSet is a set of blobs. For each blob it also stores a uint8 value
// which can be used to track some information. The CountedBlobSet does not use
// that value in any way. New entries are created with value 0.
type CountedBlobSet map[BlobHandle]uint8
// NewCountedBlobSet returns a new CountedBlobSet, populated with the given handles.
func NewCountedBlobSet(handles ...BlobHandle) CountedBlobSet {
m := make(CountedBlobSet)
for _, h := range handles {
m[h] = 0
}
return m
}
// Has returns true iff id is contained in the set.
func (s CountedBlobSet) Has(h BlobHandle) bool {
_, ok := s[h]
return ok
}
// Insert adds id to the set.
func (s CountedBlobSet) Insert(h BlobHandle) {
s[h] = 0
}
// Delete removes id from the set.
func (s CountedBlobSet) Delete(h BlobHandle) {
delete(s, h)
}
// Len returns the number of blobs in the set.
func (s CountedBlobSet) Len() int {
return len(s)
}
// List returns a sorted slice of all BlobHandle in the set.
func (s CountedBlobSet) List() BlobHandles {
list := make(BlobHandles, 0, len(s))
for h := range s {
list = append(list, h)
}
sort.Sort(list)
return list
}
// String produces a human-readable representation of the set.
func (s CountedBlobSet) String() string {
str := s.List().String()
if len(str) < 2 {
return "{}"
}
return "{" + str[1:len(str)-1] + "}"
}
// Copy returns a copy of the CountedBlobSet.
func (s CountedBlobSet) Copy() CountedBlobSet {
cp := make(CountedBlobSet, len(s))
for k, v := range s {
cp[k] = v
}
return cp
}
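
A rough, hedged back-of-envelope for where the saving comes from: in the removed code, getUsedBlobs filled one BlobSet, packInfoFromIndex then moved every entry into a second BlobSet (keepBlobs) and kept a third map for duplicate counts, and since deleting entries does not shrink a Go map, two full-size maps keyed by the handle coexisted at peak. A single CountedBlobSet spends only one extra byte per entry on the counter. The sizes below assume restic.BlobHandle is a 32-byte ID plus a one-byte blob type:

package main

import (
	"fmt"
	"unsafe"
)

// blobHandle mirrors the assumed layout of restic.BlobHandle.
type blobHandle struct {
	ID   [32]byte
	Type uint8
}

func main() {
	var h blobHandle
	var counter uint8
	fmt.Println("key payload per entry:", unsafe.Sizeof(h), "bytes")    // 33 bytes
	fmt.Println("inline counter adds:", unsafe.Sizeof(counter), "byte") // 1 byte
}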

@@ -0,0 +1,45 @@
package restic_test
import (
"testing"
"github.com/restic/restic/internal/restic"
"github.com/restic/restic/internal/test"
)
func TestCountedBlobSet(t *testing.T) {
bs := restic.NewCountedBlobSet()
test.Equals(t, bs.Len(), 0)
test.Equals(t, bs.List(), restic.BlobHandles{})
bh := restic.NewRandomBlobHandle()
// check non-existent
test.Equals(t, bs.Has(bh), false)
// test insert
bs.Insert(bh)
test.Equals(t, bs.Has(bh), true)
test.Equals(t, bs.Len(), 1)
test.Equals(t, bs.List(), restic.BlobHandles{bh})
// test remove
bs.Delete(bh)
test.Equals(t, bs.Len(), 0)
test.Equals(t, bs.Has(bh), false)
test.Equals(t, bs.List(), restic.BlobHandles{})
bs = restic.NewCountedBlobSet(bh)
test.Equals(t, bs.Len(), 1)
test.Equals(t, bs.List(), restic.BlobHandles{bh})
s := bs.String()
test.Assert(t, len(s) > 10, "invalid string: %v", s)
}
func TestCountedBlobSetCopy(t *testing.T) {
bs := restic.NewCountedBlobSet(restic.NewRandomBlobHandle(), restic.NewRandomBlobHandle(), restic.NewRandomBlobHandle())
test.Equals(t, bs.Len(), 3)
cp := bs.Copy()
test.Equals(t, cp.Len(), 3)
test.Equals(t, bs.List(), cp.List())
}

@@ -15,9 +15,14 @@ type Loader interface {
Connections() uint
}
type findBlobSet interface {
Has(bh BlobHandle) bool
Insert(bh BlobHandle)
}
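
As with repackBlobSet above, findBlobSet is satisfied structurally by both set types, so FindUsedBlobs keeps working for any caller that still passes a plain BlobSet while prune now passes a CountedBlobSet. A hypothetical compile-time check (not part of this diff, and only valid inside the restic package) would look like:

// Both set types provide Has and Insert, so they satisfy findBlobSet.
var (
	_ findBlobSet = BlobSet{}
	_ findBlobSet = CountedBlobSet{}
)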
// FindUsedBlobs traverses the trees identified by treeIDs and adds all seen blobs
// (trees and data blobs) to the set blobs. Already seen tree blobs will not be visited again.
func FindUsedBlobs(ctx context.Context, repo Loader, treeIDs IDs, blobs BlobSet, p *progress.Counter) error {
func FindUsedBlobs(ctx context.Context, repo Loader, treeIDs IDs, blobs findBlobSet, p *progress.Counter) error {
var lock sync.Mutex
wg, ctx := errgroup.WithContext(ctx)