mirror of
https://github.com/octoleo/restic.git
synced 2024-12-01 17:23:57 +00:00
prune: Use a single CountedBlobSet to track blobs
The set covers necessary, existing and duplicate blobs. This removes the duplicate sets used to track whether all necessary blobs also exist. This reduces the memory usage of prune by about 20-30%.
This commit is contained in:
parent
b21241ec1c
commit
c4fc5c97f9
@ -235,7 +235,7 @@ type pruneStats struct {
|
|||||||
type prunePlan struct {
|
type prunePlan struct {
|
||||||
removePacksFirst restic.IDSet // packs to remove first (unreferenced packs)
|
removePacksFirst restic.IDSet // packs to remove first (unreferenced packs)
|
||||||
repackPacks restic.IDSet // packs to repack
|
repackPacks restic.IDSet // packs to repack
|
||||||
keepBlobs restic.BlobSet // blobs to keep during repacking
|
keepBlobs restic.CountedBlobSet // blobs to keep during repacking
|
||||||
removePacks restic.IDSet // packs to remove
|
removePacks restic.IDSet // packs to remove
|
||||||
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
|
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
|
||||||
}
|
}
|
||||||
@ -294,46 +294,53 @@ func planPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo
|
|||||||
return plan, stats, nil
|
return plan, stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.BlobSet, stats *pruneStats) (restic.BlobSet, map[restic.ID]packInfo, error) {
|
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *pruneStats) (restic.CountedBlobSet, map[restic.ID]packInfo, error) {
|
||||||
keepBlobs := restic.NewBlobSet()
|
|
||||||
duplicateBlobs := make(map[restic.BlobHandle]uint8)
|
|
||||||
|
|
||||||
// iterate over all blobs in index to find out which blobs are duplicates
|
// iterate over all blobs in index to find out which blobs are duplicates
|
||||||
|
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
|
||||||
|
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
|
||||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||||
bh := blob.BlobHandle
|
bh := blob.BlobHandle
|
||||||
size := uint64(blob.Length)
|
size := uint64(blob.Length)
|
||||||
switch {
|
count, ok := usedBlobs[bh]
|
||||||
case usedBlobs.Has(bh): // used blob, move to keepBlobs
|
if ok {
|
||||||
usedBlobs.Delete(bh)
|
if count < math.MaxUint8 {
|
||||||
keepBlobs.Insert(bh)
|
|
||||||
stats.size.used += size
|
|
||||||
stats.blobs.used++
|
|
||||||
case keepBlobs.Has(bh): // duplicate blob
|
|
||||||
count, ok := duplicateBlobs[bh]
|
|
||||||
if !ok {
|
|
||||||
count = 2 // this one is already the second blob!
|
|
||||||
} else if count < math.MaxUint8 {
|
|
||||||
// don't overflow, but saturate count at 255
|
// don't overflow, but saturate count at 255
|
||||||
// this can lead to a non-optimal pack selection, but won't cause
|
// this can lead to a non-optimal pack selection, but won't cause
|
||||||
// problems otherwise
|
// problems otherwise
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
duplicateBlobs[bh] = count
|
|
||||||
|
if count == 1 {
|
||||||
|
stats.size.used += size
|
||||||
|
stats.blobs.used++
|
||||||
|
} else {
|
||||||
|
// duplicate if counted more than once
|
||||||
stats.size.duplicate += size
|
stats.size.duplicate += size
|
||||||
stats.blobs.duplicate++
|
stats.blobs.duplicate++
|
||||||
default:
|
}
|
||||||
|
|
||||||
|
usedBlobs[bh] = count
|
||||||
|
} else {
|
||||||
stats.size.unused += size
|
stats.size.unused += size
|
||||||
stats.blobs.unused++
|
stats.blobs.unused++
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
// Check if all used blobs have been found in index
|
// Check if all used blobs have been found in index
|
||||||
if len(usedBlobs) != 0 {
|
missingBlobs := restic.NewBlobSet()
|
||||||
|
for bh, count := range usedBlobs {
|
||||||
|
if count == 0 {
|
||||||
|
// blob does not exist in any pack files
|
||||||
|
missingBlobs.Insert(bh)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(missingBlobs) != 0 {
|
||||||
Warnf("%v not found in the index\n\n"+
|
Warnf("%v not found in the index\n\n"+
|
||||||
"Integrity check failed: Data seems to be missing.\n"+
|
"Integrity check failed: Data seems to be missing.\n"+
|
||||||
"Will not start prune to prevent (additional) data loss!\n"+
|
"Will not start prune to prevent (additional) data loss!\n"+
|
||||||
"Please report this error (along with the output of the 'prune' run) at\n"+
|
"Please report this error (along with the output of the 'prune' run) at\n"+
|
||||||
"https://github.com/restic/restic/issues/new/choose\n", usedBlobs)
|
"https://github.com/restic/restic/issues/new/choose\n", missingBlobs)
|
||||||
return nil, nil, errorIndexIncomplete
|
return nil, nil, errorIndexIncomplete
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -345,6 +352,7 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
|
|||||||
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
|
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hasDuplicates := false
|
||||||
// iterate over all blobs in index to generate packInfo
|
// iterate over all blobs in index to generate packInfo
|
||||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||||
ip := indexPack[blob.PackID]
|
ip := indexPack[blob.PackID]
|
||||||
@ -361,10 +369,14 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
|
|||||||
|
|
||||||
bh := blob.BlobHandle
|
bh := blob.BlobHandle
|
||||||
size := uint64(blob.Length)
|
size := uint64(blob.Length)
|
||||||
_, isDuplicate := duplicateBlobs[bh]
|
dupCount := usedBlobs[bh]
|
||||||
switch {
|
switch {
|
||||||
case isDuplicate: // duplicate blobs will be handled later
|
case dupCount >= 2:
|
||||||
case keepBlobs.Has(bh): // used blob, not duplicate
|
hasDuplicates = true
|
||||||
|
// mark as unused for now, we will later on select one copy
|
||||||
|
ip.unusedSize += size
|
||||||
|
ip.unusedBlobs++
|
||||||
|
case dupCount == 1: // used blob, not duplicate
|
||||||
ip.usedSize += size
|
ip.usedSize += size
|
||||||
ip.usedBlobs++
|
ip.usedBlobs++
|
||||||
default: // unused blob
|
default: // unused blob
|
||||||
@ -382,40 +394,52 @@ func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs re
|
|||||||
// - mark only one occurence of duplicate blobs as used
|
// - mark only one occurence of duplicate blobs as used
|
||||||
// - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
|
// - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
|
||||||
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
|
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
|
||||||
if len(duplicateBlobs) > 0 {
|
if hasDuplicates {
|
||||||
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
|
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
|
||||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||||
bh := blob.BlobHandle
|
bh := blob.BlobHandle
|
||||||
count, isDuplicate := duplicateBlobs[bh]
|
count, ok := usedBlobs[bh]
|
||||||
if !isDuplicate {
|
// skip non-duplicate, aka. normal blobs
|
||||||
|
// count == 0 is used to mark that this was a duplicate blob with only a single occurence remaining
|
||||||
|
if !ok || count == 1 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
ip := indexPack[blob.PackID]
|
ip := indexPack[blob.PackID]
|
||||||
size := uint64(blob.Length)
|
size := uint64(blob.Length)
|
||||||
switch {
|
switch {
|
||||||
case count == 0:
|
case ip.usedBlobs > 0, count == 0:
|
||||||
// used duplicate exists -> mark as unused
|
// other used blobs in pack or "last" occurence -> transition to used
|
||||||
ip.unusedSize += size
|
|
||||||
ip.unusedBlobs++
|
|
||||||
case ip.usedBlobs > 0, count == 1:
|
|
||||||
// other used blobs in pack or "last" occurency -> mark as used
|
|
||||||
ip.usedSize += size
|
ip.usedSize += size
|
||||||
ip.usedBlobs++
|
ip.usedBlobs++
|
||||||
// let other occurences be marked as unused
|
ip.unusedSize -= size
|
||||||
duplicateBlobs[bh] = 0
|
ip.unusedBlobs--
|
||||||
|
// let other occurences remain marked as unused
|
||||||
|
usedBlobs[bh] = 1
|
||||||
default:
|
default:
|
||||||
// mark as unused and decrease counter
|
// remain unused and decrease counter
|
||||||
ip.unusedSize += size
|
count--
|
||||||
ip.unusedBlobs++
|
if count == 1 {
|
||||||
duplicateBlobs[bh] = count - 1
|
// setting count to 1 would lead to forgetting that this blob had duplicates
|
||||||
|
// thus use the special value zero. This will select the last instance of the blob for keeping.
|
||||||
|
count = 0
|
||||||
|
}
|
||||||
|
usedBlobs[bh] = count
|
||||||
}
|
}
|
||||||
// update indexPack
|
// update indexPack
|
||||||
indexPack[blob.PackID] = ip
|
indexPack[blob.PackID] = ip
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return keepBlobs, indexPack, nil
|
// Sanity check. If no duplicates exist, all blobs have value 1. After handling
|
||||||
|
// duplicates, this also applies to duplicates.
|
||||||
|
for _, count := range usedBlobs {
|
||||||
|
if count != 1 {
|
||||||
|
panic("internal error during blob selection")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return usedBlobs, indexPack, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *pruneStats) (prunePlan, error) {
|
func decidePackAction(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *pruneStats) (prunePlan, error) {
|
||||||
@ -747,7 +771,7 @@ func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Rep
|
|||||||
return DeleteFilesChecked(ctx, gopts, repo, obsoleteIndexes, restic.IndexFile)
|
return DeleteFilesChecked(ctx, gopts, repo, obsoleteIndexes, restic.IndexFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getUsedBlobs(ctx context.Context, gopts GlobalOptions, repo restic.Repository, ignoreSnapshots restic.IDSet) (usedBlobs restic.BlobSet, err error) {
|
func getUsedBlobs(ctx context.Context, gopts GlobalOptions, repo restic.Repository, ignoreSnapshots restic.IDSet) (usedBlobs restic.CountedBlobSet, err error) {
|
||||||
var snapshotTrees restic.IDs
|
var snapshotTrees restic.IDs
|
||||||
Verbosef("loading all snapshots...\n")
|
Verbosef("loading all snapshots...\n")
|
||||||
err = restic.ForAllSnapshots(ctx, repo.Backend(), repo, ignoreSnapshots,
|
err = restic.ForAllSnapshots(ctx, repo.Backend(), repo, ignoreSnapshots,
|
||||||
@ -766,7 +790,7 @@ func getUsedBlobs(ctx context.Context, gopts GlobalOptions, repo restic.Reposito
|
|||||||
|
|
||||||
Verbosef("finding data that is still in use for %d snapshots\n", len(snapshotTrees))
|
Verbosef("finding data that is still in use for %d snapshots\n", len(snapshotTrees))
|
||||||
|
|
||||||
usedBlobs = restic.NewBlobSet()
|
usedBlobs = restic.NewCountedBlobSet()
|
||||||
|
|
||||||
bar := newProgressMax(!gopts.Quiet, uint64(len(snapshotTrees)), "snapshots")
|
bar := newProgressMax(!gopts.Quiet, uint64(len(snapshotTrees)), "snapshots")
|
||||||
defer bar.Done()
|
defer bar.Done()
|
||||||
|
@ -12,6 +12,12 @@ import (
|
|||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type repackBlobSet interface {
|
||||||
|
Has(bh restic.BlobHandle) bool
|
||||||
|
Delete(bh restic.BlobHandle)
|
||||||
|
Len() int
|
||||||
|
}
|
||||||
|
|
||||||
// Repack takes a list of packs together with a list of blobs contained in
|
// Repack takes a list of packs together with a list of blobs contained in
|
||||||
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
|
// these packs. Each pack is loaded and the blobs listed in keepBlobs is saved
|
||||||
// into a new pack. Returned is the list of obsolete packs which can then
|
// into a new pack. Returned is the list of obsolete packs which can then
|
||||||
@ -19,8 +25,8 @@ import (
|
|||||||
//
|
//
|
||||||
// The map keepBlobs is modified by Repack, it is used to keep track of which
|
// The map keepBlobs is modified by Repack, it is used to keep track of which
|
||||||
// blobs have been processed.
|
// blobs have been processed.
|
||||||
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
||||||
debug.Log("repacking %d packs while keeping %d blobs", len(packs), len(keepBlobs))
|
debug.Log("repacking %d packs while keeping %d blobs", len(packs), keepBlobs.Len())
|
||||||
|
|
||||||
if repo == dstRepo && dstRepo.Connections() < 2 {
|
if repo == dstRepo && dstRepo.Connections() < 2 {
|
||||||
return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
|
return nil, errors.Fatal("repack step requires a backend connection limit of at least two")
|
||||||
@ -41,7 +47,7 @@ func Repack(ctx context.Context, repo restic.Repository, dstRepo restic.Reposito
|
|||||||
return obsoletePacks, nil
|
return obsoletePacks, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs restic.BlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
func repack(ctx context.Context, repo restic.Repository, dstRepo restic.Repository, packs restic.IDSet, keepBlobs repackBlobSet, p *progress.Counter) (obsoletePacks restic.IDSet, err error) {
|
||||||
wg, wgCtx := errgroup.WithContext(ctx)
|
wg, wgCtx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
var keepMutex sync.Mutex
|
var keepMutex sync.Mutex
|
||||||
|
@ -31,6 +31,10 @@ func (s BlobSet) Delete(h BlobHandle) {
|
|||||||
delete(s, h)
|
delete(s, h)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s BlobSet) Len() int {
|
||||||
|
return len(s)
|
||||||
|
}
|
||||||
|
|
||||||
// Equals returns true iff s equals other.
|
// Equals returns true iff s equals other.
|
||||||
func (s BlobSet) Equals(other BlobSet) bool {
|
func (s BlobSet) Equals(other BlobSet) bool {
|
||||||
if len(s) != len(other) {
|
if len(s) != len(other) {
|
||||||
|
@ -15,9 +15,14 @@ type Loader interface {
|
|||||||
Connections() uint
|
Connections() uint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type findBlobSet interface {
|
||||||
|
Has(bh BlobHandle) bool
|
||||||
|
Insert(bh BlobHandle)
|
||||||
|
}
|
||||||
|
|
||||||
// FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data
|
// FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data
|
||||||
// blobs) to the set blobs. Already seen tree blobs will not be visited again.
|
// blobs) to the set blobs. Already seen tree blobs will not be visited again.
|
||||||
func FindUsedBlobs(ctx context.Context, repo Loader, treeIDs IDs, blobs BlobSet, p *progress.Counter) error {
|
func FindUsedBlobs(ctx context.Context, repo Loader, treeIDs IDs, blobs findBlobSet, p *progress.Counter) error {
|
||||||
var lock sync.Mutex
|
var lock sync.Mutex
|
||||||
|
|
||||||
wg, ctx := errgroup.WithContext(ctx)
|
wg, ctx := errgroup.WithContext(ctx)
|
||||||
|
Loading…
Reference in New Issue
Block a user