mirror of
https://github.com/octoleo/restic.git
synced 2025-01-11 10:18:10 +00:00
Merge pull request #4763 from MichaelEischer/refactor-prune
Refactor repair index / prune into the repository package
This commit is contained in:
commit
b15d867414
@ -8,6 +8,7 @@ import (
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
@ -33,7 +34,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
|
||||
`,
|
||||
DisableAutoGenTag: true,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
return runForget(cmd.Context(), forgetOptions, forgetPruneOptions, globalOptions, args)
|
||||
term, cancel := setupTermstatus()
|
||||
defer cancel()
|
||||
return runForget(cmd.Context(), forgetOptions, forgetPruneOptions, globalOptions, term, args)
|
||||
},
|
||||
}
|
||||
|
||||
@ -152,7 +155,7 @@ func verifyForgetOptions(opts *ForgetOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOptions, gopts GlobalOptions, args []string) error {
|
||||
func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOptions, gopts GlobalOptions, term *termstatus.Terminal, args []string) error {
|
||||
err := verifyForgetOptions(&opts)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -173,6 +176,12 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
verbosity := gopts.verbosity
|
||||
if gopts.JSON {
|
||||
verbosity = 0
|
||||
}
|
||||
printer := newTerminalProgressPrinter(verbosity, term)
|
||||
|
||||
var snapshots restic.Snapshots
|
||||
removeSnIDs := restic.NewIDSet()
|
||||
|
||||
@ -210,15 +219,11 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
|
||||
}
|
||||
|
||||
if policy.Empty() && len(args) == 0 {
|
||||
if !gopts.JSON {
|
||||
Verbosef("no policy was specified, no snapshots will be removed\n")
|
||||
}
|
||||
printer.P("no policy was specified, no snapshots will be removed\n")
|
||||
}
|
||||
|
||||
if !policy.Empty() {
|
||||
if !gopts.JSON {
|
||||
Verbosef("Applying Policy: %v\n", policy)
|
||||
}
|
||||
printer.P("Applying Policy: %v\n", policy)
|
||||
|
||||
for k, snapshotGroup := range snapshotGroups {
|
||||
if gopts.Verbose >= 1 && !gopts.JSON {
|
||||
@ -241,16 +246,16 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
|
||||
keep, remove, reasons := restic.ApplyPolicy(snapshotGroup, policy)
|
||||
|
||||
if len(keep) != 0 && !gopts.Quiet && !gopts.JSON {
|
||||
Printf("keep %d snapshots:\n", len(keep))
|
||||
printer.P("keep %d snapshots:\n", len(keep))
|
||||
PrintSnapshots(globalOptions.stdout, keep, reasons, opts.Compact)
|
||||
Printf("\n")
|
||||
printer.P("\n")
|
||||
}
|
||||
fg.Keep = asJSONSnapshots(keep)
|
||||
|
||||
if len(remove) != 0 && !gopts.Quiet && !gopts.JSON {
|
||||
Printf("remove %d snapshots:\n", len(remove))
|
||||
printer.P("remove %d snapshots:\n", len(remove))
|
||||
PrintSnapshots(globalOptions.stdout, remove, nil, opts.Compact)
|
||||
Printf("\n")
|
||||
printer.P("\n")
|
||||
}
|
||||
fg.Remove = asJSONSnapshots(remove)
|
||||
|
||||
@ -267,14 +272,21 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
|
||||
|
||||
if len(removeSnIDs) > 0 {
|
||||
if !opts.DryRun {
|
||||
err := DeleteFilesChecked(ctx, gopts, repo, removeSnIDs, restic.SnapshotFile)
|
||||
bar := printer.NewCounter("files deleted")
|
||||
err := restic.ParallelRemove(ctx, repo, removeSnIDs, restic.SnapshotFile, func(id restic.ID, err error) error {
|
||||
if err != nil {
|
||||
printer.E("unable to remove %v/%v from the repository\n", restic.SnapshotFile, id)
|
||||
} else {
|
||||
printer.VV("removed %v/%v\n", restic.SnapshotFile, id)
|
||||
}
|
||||
return nil
|
||||
}, bar)
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if !gopts.JSON {
|
||||
Printf("Would have removed the following snapshots:\n%v\n\n", removeSnIDs)
|
||||
}
|
||||
printer.P("Would have removed the following snapshots:\n%v\n\n", removeSnIDs)
|
||||
}
|
||||
}
|
||||
|
||||
@ -286,15 +298,13 @@ func runForget(ctx context.Context, opts ForgetOptions, pruneOptions PruneOption
|
||||
}
|
||||
|
||||
if len(removeSnIDs) > 0 && opts.Prune {
|
||||
if !gopts.JSON {
|
||||
if opts.DryRun {
|
||||
Verbosef("%d snapshots would be removed, running prune dry run\n", len(removeSnIDs))
|
||||
} else {
|
||||
Verbosef("%d snapshots have been removed, running prune\n", len(removeSnIDs))
|
||||
}
|
||||
if opts.DryRun {
|
||||
printer.P("%d snapshots would be removed, running prune dry run\n", len(removeSnIDs))
|
||||
} else {
|
||||
printer.P("%d snapshots have been removed, running prune\n", len(removeSnIDs))
|
||||
}
|
||||
pruneOptions.DryRun = opts.DryRun
|
||||
return runPruneWithRepo(ctx, pruneOptions, gopts, repo, removeSnIDs)
|
||||
return runPruneWithRepo(ctx, pruneOptions, gopts, repo, removeSnIDs, term)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"testing"
|
||||
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
)
|
||||
|
||||
func testRunForget(t testing.TB, gopts GlobalOptions, args ...string) {
|
||||
@ -12,5 +13,7 @@ func testRunForget(t testing.TB, gopts GlobalOptions, args ...string) {
|
||||
pruneOpts := PruneOptions{
|
||||
MaxUnused: "5%",
|
||||
}
|
||||
rtest.OK(t, runForget(context.TODO(), opts, pruneOpts, gopts, args))
|
||||
rtest.OK(t, withTermStatus(gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runForget(context.TODO(), opts, pruneOpts, gopts, term, args)
|
||||
}))
|
||||
}
|
||||
|
@ -4,26 +4,20 @@ import (
|
||||
"context"
|
||||
"math"
|
||||
"runtime"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/pack"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
)
|
||||
|
||||
var errorIndexIncomplete = errors.Fatal("index is not complete")
|
||||
var errorPacksMissing = errors.Fatal("packs from index missing in repo")
|
||||
var errorSizeNotMatching = errors.Fatal("pack size does not match calculated size from index")
|
||||
|
||||
var cmdPrune = &cobra.Command{
|
||||
Use: "prune [flags]",
|
||||
Short: "Remove unneeded data from the repository",
|
||||
@ -38,7 +32,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
|
||||
`,
|
||||
DisableAutoGenTag: true,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return runPrune(cmd.Context(), pruneOptions, globalOptions)
|
||||
term, cancel := setupTermstatus()
|
||||
defer cancel()
|
||||
return runPrune(cmd.Context(), pruneOptions, globalOptions, term)
|
||||
},
|
||||
}
|
||||
|
||||
@ -138,7 +134,7 @@ func verifyPruneOptions(opts *PruneOptions) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions) error {
|
||||
func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, term *termstatus.Terminal) error {
|
||||
err := verifyPruneOptions(&opts)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -154,14 +150,6 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions) error
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
if repo.Connections() < 2 {
|
||||
return errors.Fatal("prune requires a backend connection limit of at least two")
|
||||
}
|
||||
|
||||
if repo.Config().Version < 2 && opts.RepackUncompressed {
|
||||
return errors.Fatal("compression requires at least repository format version 2")
|
||||
}
|
||||
|
||||
if opts.UnsafeNoSpaceRecovery != "" {
|
||||
repoID := repo.Config().ID
|
||||
if opts.UnsafeNoSpaceRecovery != repoID {
|
||||
@ -170,10 +158,10 @@ func runPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions) error
|
||||
opts.unsafeRecovery = true
|
||||
}
|
||||
|
||||
return runPruneWithRepo(ctx, opts, gopts, repo, restic.NewIDSet())
|
||||
return runPruneWithRepo(ctx, opts, gopts, repo, restic.NewIDSet(), term)
|
||||
}
|
||||
|
||||
func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet) error {
|
||||
func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo *repository.Repository, ignoreSnapshots restic.IDSet, term *termstatus.Terminal) error {
|
||||
// we do not need index updates while pruning!
|
||||
repo.DisableAutoIndexUpdate()
|
||||
|
||||
@ -181,24 +169,40 @@ func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||
Print("warning: running prune without a cache, this may be very slow!\n")
|
||||
}
|
||||
|
||||
Verbosef("loading indexes...\n")
|
||||
printer := newTerminalProgressPrinter(gopts.verbosity, term)
|
||||
|
||||
printer.P("loading indexes...\n")
|
||||
// loading the index before the snapshots is ok, as we use an exclusive lock here
|
||||
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
|
||||
bar := newIndexTerminalProgress(gopts.Quiet, gopts.JSON, term)
|
||||
err := repo.LoadIndex(ctx, bar)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
plan, stats, err := planPrune(ctx, opts, repo, ignoreSnapshots, gopts.Quiet)
|
||||
popts := repository.PruneOptions{
|
||||
DryRun: opts.DryRun,
|
||||
UnsafeRecovery: opts.unsafeRecovery,
|
||||
|
||||
MaxUnusedBytes: opts.maxUnusedBytes,
|
||||
MaxRepackBytes: opts.MaxRepackBytes,
|
||||
|
||||
RepackCachableOnly: opts.RepackCachableOnly,
|
||||
RepackSmall: opts.RepackSmall,
|
||||
RepackUncompressed: opts.RepackUncompressed,
|
||||
}
|
||||
|
||||
plan, err := repository.PlanPrune(ctx, popts, repo, func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error) {
|
||||
return getUsedBlobs(ctx, repo, ignoreSnapshots, printer)
|
||||
}, printer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if opts.DryRun {
|
||||
Verbosef("\nWould have made the following changes:")
|
||||
if popts.DryRun {
|
||||
printer.P("\nWould have made the following changes:")
|
||||
}
|
||||
|
||||
err = printPruneStats(stats)
|
||||
err = printPruneStats(printer, plan.Stats())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -206,605 +210,54 @@ func runPruneWithRepo(ctx context.Context, opts PruneOptions, gopts GlobalOption
|
||||
// Trigger GC to reset garbage collection threshold
|
||||
runtime.GC()
|
||||
|
||||
return doPrune(ctx, opts, gopts, repo, plan)
|
||||
}
|
||||
|
||||
type pruneStats struct {
|
||||
blobs struct {
|
||||
used uint
|
||||
duplicate uint
|
||||
unused uint
|
||||
remove uint
|
||||
repack uint
|
||||
repackrm uint
|
||||
}
|
||||
size struct {
|
||||
used uint64
|
||||
duplicate uint64
|
||||
unused uint64
|
||||
remove uint64
|
||||
repack uint64
|
||||
repackrm uint64
|
||||
unref uint64
|
||||
uncompressed uint64
|
||||
}
|
||||
packs struct {
|
||||
used uint
|
||||
unused uint
|
||||
partlyUsed uint
|
||||
unref uint
|
||||
keep uint
|
||||
repack uint
|
||||
remove uint
|
||||
}
|
||||
}
|
||||
|
||||
type prunePlan struct {
|
||||
removePacksFirst restic.IDSet // packs to remove first (unreferenced packs)
|
||||
repackPacks restic.IDSet // packs to repack
|
||||
keepBlobs restic.CountedBlobSet // blobs to keep during repacking
|
||||
removePacks restic.IDSet // packs to remove
|
||||
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
|
||||
}
|
||||
|
||||
type packInfo struct {
|
||||
usedBlobs uint
|
||||
unusedBlobs uint
|
||||
usedSize uint64
|
||||
unusedSize uint64
|
||||
tpe restic.BlobType
|
||||
uncompressed bool
|
||||
}
|
||||
|
||||
type packInfoWithID struct {
|
||||
ID restic.ID
|
||||
packInfo
|
||||
mustCompress bool
|
||||
}
|
||||
|
||||
// planPrune selects which files to rewrite and which to delete and which blobs to keep.
|
||||
// Also some summary statistics are returned.
|
||||
func planPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, ignoreSnapshots restic.IDSet, quiet bool) (prunePlan, pruneStats, error) {
|
||||
var stats pruneStats
|
||||
|
||||
usedBlobs, err := getUsedBlobs(ctx, repo, ignoreSnapshots, quiet)
|
||||
if err != nil {
|
||||
return prunePlan{}, stats, err
|
||||
}
|
||||
|
||||
Verbosef("searching used packs...\n")
|
||||
keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo.Index(), usedBlobs, &stats)
|
||||
if err != nil {
|
||||
return prunePlan{}, stats, err
|
||||
}
|
||||
|
||||
Verbosef("collecting packs for deletion and repacking\n")
|
||||
plan, err := decidePackAction(ctx, opts, repo, indexPack, &stats, quiet)
|
||||
if err != nil {
|
||||
return prunePlan{}, stats, err
|
||||
}
|
||||
|
||||
if len(plan.repackPacks) != 0 {
|
||||
blobCount := keepBlobs.Len()
|
||||
// when repacking, we do not want to keep blobs which are
|
||||
// already contained in kept packs, so delete them from keepBlobs
|
||||
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
|
||||
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
|
||||
return
|
||||
}
|
||||
keepBlobs.Delete(blob.BlobHandle)
|
||||
})
|
||||
|
||||
if keepBlobs.Len() < blobCount/2 {
|
||||
// replace with copy to shrink map to necessary size if there's a chance to benefit
|
||||
keepBlobs = keepBlobs.Copy()
|
||||
}
|
||||
} else {
|
||||
// keepBlobs is only needed if packs are repacked
|
||||
keepBlobs = nil
|
||||
}
|
||||
plan.keepBlobs = keepBlobs
|
||||
|
||||
return plan, stats, nil
|
||||
}
|
||||
|
||||
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *pruneStats) (restic.CountedBlobSet, map[restic.ID]packInfo, error) {
|
||||
// iterate over all blobs in index to find out which blobs are duplicates
|
||||
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
|
||||
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
|
||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||
bh := blob.BlobHandle
|
||||
count, ok := usedBlobs[bh]
|
||||
if ok {
|
||||
if count < math.MaxUint8 {
|
||||
// don't overflow, but saturate count at 255
|
||||
// this can lead to a non-optimal pack selection, but won't cause
|
||||
// problems otherwise
|
||||
count++
|
||||
}
|
||||
|
||||
usedBlobs[bh] = count
|
||||
}
|
||||
})
|
||||
|
||||
// Check if all used blobs have been found in index
|
||||
missingBlobs := restic.NewBlobSet()
|
||||
for bh, count := range usedBlobs {
|
||||
if count == 0 {
|
||||
// blob does not exist in any pack files
|
||||
missingBlobs.Insert(bh)
|
||||
}
|
||||
}
|
||||
|
||||
if len(missingBlobs) != 0 {
|
||||
Warnf("%v not found in the index\n\n"+
|
||||
"Integrity check failed: Data seems to be missing.\n"+
|
||||
"Will not start prune to prevent (additional) data loss!\n"+
|
||||
"Please report this error (along with the output of the 'prune' run) at\n"+
|
||||
"https://github.com/restic/restic/issues/new/choose\n", missingBlobs)
|
||||
return nil, nil, errorIndexIncomplete
|
||||
}
|
||||
|
||||
indexPack := make(map[restic.ID]packInfo)
|
||||
|
||||
// save computed pack header size
|
||||
for pid, hdrSize := range pack.Size(ctx, idx, true) {
|
||||
// initialize tpe with NumBlobTypes to indicate it's not set
|
||||
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
|
||||
}
|
||||
|
||||
hasDuplicates := false
|
||||
// iterate over all blobs in index to generate packInfo
|
||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||
ip := indexPack[blob.PackID]
|
||||
|
||||
// Set blob type if not yet set
|
||||
if ip.tpe == restic.NumBlobTypes {
|
||||
ip.tpe = blob.Type
|
||||
}
|
||||
|
||||
// mark mixed packs with "Invalid blob type"
|
||||
if ip.tpe != blob.Type {
|
||||
ip.tpe = restic.InvalidBlob
|
||||
}
|
||||
|
||||
bh := blob.BlobHandle
|
||||
size := uint64(blob.Length)
|
||||
dupCount := usedBlobs[bh]
|
||||
switch {
|
||||
case dupCount >= 2:
|
||||
hasDuplicates = true
|
||||
// mark as unused for now, we will later on select one copy
|
||||
ip.unusedSize += size
|
||||
ip.unusedBlobs++
|
||||
|
||||
// count as duplicate, will later on change one copy to be counted as used
|
||||
stats.size.duplicate += size
|
||||
stats.blobs.duplicate++
|
||||
case dupCount == 1: // used blob, not duplicate
|
||||
ip.usedSize += size
|
||||
ip.usedBlobs++
|
||||
|
||||
stats.size.used += size
|
||||
stats.blobs.used++
|
||||
default: // unused blob
|
||||
ip.unusedSize += size
|
||||
ip.unusedBlobs++
|
||||
|
||||
stats.size.unused += size
|
||||
stats.blobs.unused++
|
||||
}
|
||||
if !blob.IsCompressed() {
|
||||
ip.uncompressed = true
|
||||
}
|
||||
// update indexPack
|
||||
indexPack[blob.PackID] = ip
|
||||
})
|
||||
|
||||
// if duplicate blobs exist, those will be set to either "used" or "unused":
|
||||
// - mark only one occurrence of duplicate blobs as used
|
||||
// - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
|
||||
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
|
||||
if hasDuplicates {
|
||||
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
|
||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||
bh := blob.BlobHandle
|
||||
count, ok := usedBlobs[bh]
|
||||
// skip non-duplicate, aka. normal blobs
|
||||
// count == 0 is used to mark that this was a duplicate blob with only a single occurrence remaining
|
||||
if !ok || count == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
ip := indexPack[blob.PackID]
|
||||
size := uint64(blob.Length)
|
||||
switch {
|
||||
case ip.usedBlobs > 0, count == 0:
|
||||
// other used blobs in pack or "last" occurrence -> transition to used
|
||||
ip.usedSize += size
|
||||
ip.usedBlobs++
|
||||
ip.unusedSize -= size
|
||||
ip.unusedBlobs--
|
||||
// same for the global statistics
|
||||
stats.size.used += size
|
||||
stats.blobs.used++
|
||||
stats.size.duplicate -= size
|
||||
stats.blobs.duplicate--
|
||||
// let other occurrences remain marked as unused
|
||||
usedBlobs[bh] = 1
|
||||
default:
|
||||
// remain unused and decrease counter
|
||||
count--
|
||||
if count == 1 {
|
||||
// setting count to 1 would lead to forgetting that this blob had duplicates
|
||||
// thus use the special value zero. This will select the last instance of the blob for keeping.
|
||||
count = 0
|
||||
}
|
||||
usedBlobs[bh] = count
|
||||
}
|
||||
// update indexPack
|
||||
indexPack[blob.PackID] = ip
|
||||
})
|
||||
}
|
||||
|
||||
// Sanity check. If no duplicates exist, all blobs have value 1. After handling
|
||||
// duplicates, this also applies to duplicates.
|
||||
for _, count := range usedBlobs {
|
||||
if count != 1 {
|
||||
panic("internal error during blob selection")
|
||||
}
|
||||
}
|
||||
|
||||
return usedBlobs, indexPack, nil
|
||||
}
|
||||
|
||||
func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *pruneStats, quiet bool) (prunePlan, error) {
|
||||
removePacksFirst := restic.NewIDSet()
|
||||
removePacks := restic.NewIDSet()
|
||||
repackPacks := restic.NewIDSet()
|
||||
|
||||
var repackCandidates []packInfoWithID
|
||||
var repackSmallCandidates []packInfoWithID
|
||||
repoVersion := repo.Config().Version
|
||||
// only repack very small files by default
|
||||
targetPackSize := repo.PackSize() / 25
|
||||
if opts.RepackSmall {
|
||||
// consider files with at least 80% of the target size as large enough
|
||||
targetPackSize = repo.PackSize() / 5 * 4
|
||||
}
|
||||
|
||||
// loop over all packs and decide what to do
|
||||
bar := newProgressMax(!quiet, uint64(len(indexPack)), "packs processed")
|
||||
err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error {
|
||||
p, ok := indexPack[id]
|
||||
if !ok {
|
||||
// Pack was not referenced in index and is not used => immediately remove!
|
||||
Verboseff("will remove pack %v as it is unused and not indexed\n", id.Str())
|
||||
removePacksFirst.Insert(id)
|
||||
stats.size.unref += uint64(packSize)
|
||||
return nil
|
||||
}
|
||||
|
||||
if p.unusedSize+p.usedSize != uint64(packSize) && p.usedBlobs != 0 {
|
||||
// Pack size does not fit and pack is needed => error
|
||||
// If the pack is not needed, this is no error, the pack can
|
||||
// and will be simply removed, see below.
|
||||
Warnf("pack %s: calculated size %d does not match real size %d\nRun 'restic repair index'.\n",
|
||||
id.Str(), p.unusedSize+p.usedSize, packSize)
|
||||
return errorSizeNotMatching
|
||||
}
|
||||
|
||||
// statistics
|
||||
switch {
|
||||
case p.usedBlobs == 0:
|
||||
stats.packs.unused++
|
||||
case p.unusedBlobs == 0:
|
||||
stats.packs.used++
|
||||
default:
|
||||
stats.packs.partlyUsed++
|
||||
}
|
||||
|
||||
if p.uncompressed {
|
||||
stats.size.uncompressed += p.unusedSize + p.usedSize
|
||||
}
|
||||
mustCompress := false
|
||||
if repoVersion >= 2 {
|
||||
// repo v2: always repack tree blobs if uncompressed
|
||||
// compress data blobs if requested
|
||||
mustCompress = (p.tpe == restic.TreeBlob || opts.RepackUncompressed) && p.uncompressed
|
||||
}
|
||||
|
||||
// decide what to do
|
||||
switch {
|
||||
case p.usedBlobs == 0:
|
||||
// All blobs in pack are no longer used => remove pack!
|
||||
removePacks.Insert(id)
|
||||
stats.blobs.remove += p.unusedBlobs
|
||||
stats.size.remove += p.unusedSize
|
||||
|
||||
case opts.RepackCachableOnly && p.tpe == restic.DataBlob:
|
||||
// if this is a data pack and --repack-cacheable-only is set => keep pack!
|
||||
stats.packs.keep++
|
||||
|
||||
case p.unusedBlobs == 0 && p.tpe != restic.InvalidBlob && !mustCompress:
|
||||
if packSize >= int64(targetPackSize) {
|
||||
// All blobs in pack are used and not mixed => keep pack!
|
||||
stats.packs.keep++
|
||||
} else {
|
||||
repackSmallCandidates = append(repackSmallCandidates, packInfoWithID{ID: id, packInfo: p, mustCompress: mustCompress})
|
||||
}
|
||||
|
||||
default:
|
||||
// all other packs are candidates for repacking
|
||||
repackCandidates = append(repackCandidates, packInfoWithID{ID: id, packInfo: p, mustCompress: mustCompress})
|
||||
}
|
||||
|
||||
delete(indexPack, id)
|
||||
bar.Add(1)
|
||||
return nil
|
||||
})
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return prunePlan{}, err
|
||||
}
|
||||
|
||||
// At this point indexPacks contains only missing packs!
|
||||
|
||||
// missing packs that are not needed can be ignored
|
||||
ignorePacks := restic.NewIDSet()
|
||||
for id, p := range indexPack {
|
||||
if p.usedBlobs == 0 {
|
||||
ignorePacks.Insert(id)
|
||||
stats.blobs.remove += p.unusedBlobs
|
||||
stats.size.remove += p.unusedSize
|
||||
delete(indexPack, id)
|
||||
}
|
||||
}
|
||||
|
||||
if len(indexPack) != 0 {
|
||||
Warnf("The index references %d needed pack files which are missing from the repository:\n", len(indexPack))
|
||||
for id := range indexPack {
|
||||
Warnf(" %v\n", id)
|
||||
}
|
||||
return prunePlan{}, errorPacksMissing
|
||||
}
|
||||
if len(ignorePacks) != 0 {
|
||||
Warnf("Missing but unneeded pack files are referenced in the index, will be repaired\n")
|
||||
for id := range ignorePacks {
|
||||
Warnf("will forget missing pack file %v\n", id)
|
||||
}
|
||||
}
|
||||
|
||||
if len(repackSmallCandidates) < 10 {
|
||||
// too few small files to be worth the trouble, this also prevents endlessly repacking
|
||||
// if there is just a single pack file below the target size
|
||||
stats.packs.keep += uint(len(repackSmallCandidates))
|
||||
} else {
|
||||
repackCandidates = append(repackCandidates, repackSmallCandidates...)
|
||||
}
|
||||
|
||||
// Sort repackCandidates such that packs with highest ratio unused/used space are picked first.
|
||||
// This is equivalent to sorting by unused / total space.
|
||||
// Instead of unused[i] / used[i] > unused[j] / used[j] we use
|
||||
// unused[i] * used[j] > unused[j] * used[i] as uint32*uint32 < uint64
|
||||
// Moreover packs containing trees and too small packs are sorted to the beginning
|
||||
sort.Slice(repackCandidates, func(i, j int) bool {
|
||||
pi := repackCandidates[i].packInfo
|
||||
pj := repackCandidates[j].packInfo
|
||||
switch {
|
||||
case pi.tpe != restic.DataBlob && pj.tpe == restic.DataBlob:
|
||||
return true
|
||||
case pj.tpe != restic.DataBlob && pi.tpe == restic.DataBlob:
|
||||
return false
|
||||
case pi.unusedSize+pi.usedSize < uint64(targetPackSize) && pj.unusedSize+pj.usedSize >= uint64(targetPackSize):
|
||||
return true
|
||||
case pj.unusedSize+pj.usedSize < uint64(targetPackSize) && pi.unusedSize+pi.usedSize >= uint64(targetPackSize):
|
||||
return false
|
||||
}
|
||||
return pi.unusedSize*pj.usedSize > pj.unusedSize*pi.usedSize
|
||||
})
|
||||
|
||||
repack := func(id restic.ID, p packInfo) {
|
||||
repackPacks.Insert(id)
|
||||
stats.blobs.repack += p.unusedBlobs + p.usedBlobs
|
||||
stats.size.repack += p.unusedSize + p.usedSize
|
||||
stats.blobs.repackrm += p.unusedBlobs
|
||||
stats.size.repackrm += p.unusedSize
|
||||
if p.uncompressed {
|
||||
stats.size.uncompressed -= p.unusedSize + p.usedSize
|
||||
}
|
||||
}
|
||||
|
||||
// calculate limit for number of unused bytes in the repo after repacking
|
||||
maxUnusedSizeAfter := opts.maxUnusedBytes(stats.size.used)
|
||||
|
||||
for _, p := range repackCandidates {
|
||||
reachedUnusedSizeAfter := (stats.size.unused-stats.size.remove-stats.size.repackrm < maxUnusedSizeAfter)
|
||||
reachedRepackSize := stats.size.repack+p.unusedSize+p.usedSize >= opts.MaxRepackBytes
|
||||
packIsLargeEnough := p.unusedSize+p.usedSize >= uint64(targetPackSize)
|
||||
|
||||
switch {
|
||||
case reachedRepackSize:
|
||||
stats.packs.keep++
|
||||
|
||||
case p.tpe != restic.DataBlob, p.mustCompress:
|
||||
// repacking non-data packs / uncompressed-trees is only limited by repackSize
|
||||
repack(p.ID, p.packInfo)
|
||||
|
||||
case reachedUnusedSizeAfter && packIsLargeEnough:
|
||||
// for all other packs stop repacking if tolerated unused size is reached.
|
||||
stats.packs.keep++
|
||||
|
||||
default:
|
||||
repack(p.ID, p.packInfo)
|
||||
}
|
||||
}
|
||||
|
||||
stats.packs.unref = uint(len(removePacksFirst))
|
||||
stats.packs.repack = uint(len(repackPacks))
|
||||
stats.packs.remove = uint(len(removePacks))
|
||||
|
||||
if repo.Config().Version < 2 {
|
||||
// compression not supported for repository format version 1
|
||||
stats.size.uncompressed = 0
|
||||
}
|
||||
|
||||
return prunePlan{removePacksFirst: removePacksFirst,
|
||||
removePacks: removePacks,
|
||||
repackPacks: repackPacks,
|
||||
ignorePacks: ignorePacks,
|
||||
}, nil
|
||||
return plan.Execute(ctx, printer)
|
||||
}
|
||||
|
||||
// printPruneStats prints out the statistics
|
||||
func printPruneStats(stats pruneStats) error {
|
||||
Verboseff("\nused: %10d blobs / %s\n", stats.blobs.used, ui.FormatBytes(stats.size.used))
|
||||
if stats.blobs.duplicate > 0 {
|
||||
Verboseff("duplicates: %10d blobs / %s\n", stats.blobs.duplicate, ui.FormatBytes(stats.size.duplicate))
|
||||
func printPruneStats(printer progress.Printer, stats repository.PruneStats) error {
|
||||
printer.V("\nused: %10d blobs / %s\n", stats.Blobs.Used, ui.FormatBytes(stats.Size.Used))
|
||||
if stats.Blobs.Duplicate > 0 {
|
||||
printer.V("duplicates: %10d blobs / %s\n", stats.Blobs.Duplicate, ui.FormatBytes(stats.Size.Duplicate))
|
||||
}
|
||||
Verboseff("unused: %10d blobs / %s\n", stats.blobs.unused, ui.FormatBytes(stats.size.unused))
|
||||
if stats.size.unref > 0 {
|
||||
Verboseff("unreferenced: %s\n", ui.FormatBytes(stats.size.unref))
|
||||
printer.V("unused: %10d blobs / %s\n", stats.Blobs.Unused, ui.FormatBytes(stats.Size.Unused))
|
||||
if stats.Size.Unref > 0 {
|
||||
printer.V("unreferenced: %s\n", ui.FormatBytes(stats.Size.Unref))
|
||||
}
|
||||
totalBlobs := stats.blobs.used + stats.blobs.unused + stats.blobs.duplicate
|
||||
totalSize := stats.size.used + stats.size.duplicate + stats.size.unused + stats.size.unref
|
||||
unusedSize := stats.size.duplicate + stats.size.unused
|
||||
Verboseff("total: %10d blobs / %s\n", totalBlobs, ui.FormatBytes(totalSize))
|
||||
Verboseff("unused size: %s of total size\n", ui.FormatPercent(unusedSize, totalSize))
|
||||
totalBlobs := stats.Blobs.Used + stats.Blobs.Unused + stats.Blobs.Duplicate
|
||||
totalSize := stats.Size.Used + stats.Size.Duplicate + stats.Size.Unused + stats.Size.Unref
|
||||
unusedSize := stats.Size.Duplicate + stats.Size.Unused
|
||||
printer.V("total: %10d blobs / %s\n", totalBlobs, ui.FormatBytes(totalSize))
|
||||
printer.V("unused size: %s of total size\n", ui.FormatPercent(unusedSize, totalSize))
|
||||
|
||||
Verbosef("\nto repack: %10d blobs / %s\n", stats.blobs.repack, ui.FormatBytes(stats.size.repack))
|
||||
Verbosef("this removes: %10d blobs / %s\n", stats.blobs.repackrm, ui.FormatBytes(stats.size.repackrm))
|
||||
Verbosef("to delete: %10d blobs / %s\n", stats.blobs.remove, ui.FormatBytes(stats.size.remove+stats.size.unref))
|
||||
totalPruneSize := stats.size.remove + stats.size.repackrm + stats.size.unref
|
||||
Verbosef("total prune: %10d blobs / %s\n", stats.blobs.remove+stats.blobs.repackrm, ui.FormatBytes(totalPruneSize))
|
||||
if stats.size.uncompressed > 0 {
|
||||
Verbosef("not yet compressed: %s\n", ui.FormatBytes(stats.size.uncompressed))
|
||||
printer.P("\nto repack: %10d blobs / %s\n", stats.Blobs.Repack, ui.FormatBytes(stats.Size.Repack))
|
||||
printer.P("this removes: %10d blobs / %s\n", stats.Blobs.Repackrm, ui.FormatBytes(stats.Size.Repackrm))
|
||||
printer.P("to delete: %10d blobs / %s\n", stats.Blobs.Remove, ui.FormatBytes(stats.Size.Remove+stats.Size.Unref))
|
||||
totalPruneSize := stats.Size.Remove + stats.Size.Repackrm + stats.Size.Unref
|
||||
printer.P("total prune: %10d blobs / %s\n", stats.Blobs.Remove+stats.Blobs.Repackrm, ui.FormatBytes(totalPruneSize))
|
||||
if stats.Size.Uncompressed > 0 {
|
||||
printer.P("not yet compressed: %s\n", ui.FormatBytes(stats.Size.Uncompressed))
|
||||
}
|
||||
Verbosef("remaining: %10d blobs / %s\n", totalBlobs-(stats.blobs.remove+stats.blobs.repackrm), ui.FormatBytes(totalSize-totalPruneSize))
|
||||
unusedAfter := unusedSize - stats.size.remove - stats.size.repackrm
|
||||
Verbosef("unused size after prune: %s (%s of remaining size)\n",
|
||||
printer.P("remaining: %10d blobs / %s\n", totalBlobs-(stats.Blobs.Remove+stats.Blobs.Repackrm), ui.FormatBytes(totalSize-totalPruneSize))
|
||||
unusedAfter := unusedSize - stats.Size.Remove - stats.Size.Repackrm
|
||||
printer.P("unused size after prune: %s (%s of remaining size)\n",
|
||||
ui.FormatBytes(unusedAfter), ui.FormatPercent(unusedAfter, totalSize-totalPruneSize))
|
||||
Verbosef("\n")
|
||||
Verboseff("totally used packs: %10d\n", stats.packs.used)
|
||||
Verboseff("partly used packs: %10d\n", stats.packs.partlyUsed)
|
||||
Verboseff("unused packs: %10d\n\n", stats.packs.unused)
|
||||
printer.P("\n")
|
||||
printer.V("totally used packs: %10d\n", stats.Packs.Used)
|
||||
printer.V("partly used packs: %10d\n", stats.Packs.PartlyUsed)
|
||||
printer.V("unused packs: %10d\n\n", stats.Packs.Unused)
|
||||
|
||||
Verboseff("to keep: %10d packs\n", stats.packs.keep)
|
||||
Verboseff("to repack: %10d packs\n", stats.packs.repack)
|
||||
Verboseff("to delete: %10d packs\n", stats.packs.remove)
|
||||
if stats.packs.unref > 0 {
|
||||
Verboseff("to delete: %10d unreferenced packs\n\n", stats.packs.unref)
|
||||
printer.V("to keep: %10d packs\n", stats.Packs.Keep)
|
||||
printer.V("to repack: %10d packs\n", stats.Packs.Repack)
|
||||
printer.V("to delete: %10d packs\n", stats.Packs.Remove)
|
||||
if stats.Packs.Unref > 0 {
|
||||
printer.V("to delete: %10d unreferenced packs\n\n", stats.Packs.Unref)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// doPrune does the actual pruning:
|
||||
// - remove unreferenced packs first
|
||||
// - repack given pack files while keeping the given blobs
|
||||
// - rebuild the index while ignoring all files that will be deleted
|
||||
// - delete the files
|
||||
// plan.removePacks and plan.ignorePacks are modified in this function.
|
||||
func doPrune(ctx context.Context, opts PruneOptions, gopts GlobalOptions, repo restic.Repository, plan prunePlan) (err error) {
|
||||
if opts.DryRun {
|
||||
if !gopts.JSON && gopts.verbosity >= 2 {
|
||||
Printf("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. This is expected behavior.\n\n")
|
||||
if len(plan.removePacksFirst) > 0 {
|
||||
Printf("Would have removed the following unreferenced packs:\n%v\n\n", plan.removePacksFirst)
|
||||
}
|
||||
Printf("Would have repacked and removed the following packs:\n%v\n\n", plan.repackPacks)
|
||||
Printf("Would have removed the following no longer used packs:\n%v\n\n", plan.removePacks)
|
||||
}
|
||||
// Always quit here if DryRun was set!
|
||||
return nil
|
||||
}
|
||||
|
||||
// unreferenced packs can be safely deleted first
|
||||
if len(plan.removePacksFirst) != 0 {
|
||||
Verbosef("deleting unreferenced packs\n")
|
||||
DeleteFiles(ctx, gopts, repo, plan.removePacksFirst, restic.PackFile)
|
||||
}
|
||||
|
||||
if len(plan.repackPacks) != 0 {
|
||||
Verbosef("repacking packs\n")
|
||||
bar := newProgressMax(!gopts.Quiet, uint64(len(plan.repackPacks)), "packs repacked")
|
||||
_, err := repository.Repack(ctx, repo, repo, plan.repackPacks, plan.keepBlobs, bar)
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return errors.Fatal(err.Error())
|
||||
}
|
||||
|
||||
// Also remove repacked packs
|
||||
plan.removePacks.Merge(plan.repackPacks)
|
||||
|
||||
if len(plan.keepBlobs) != 0 {
|
||||
Warnf("%v was not repacked\n\n"+
|
||||
"Integrity check failed.\n"+
|
||||
"Please report this error (along with the output of the 'prune' run) at\n"+
|
||||
"https://github.com/restic/restic/issues/new/choose\n", plan.keepBlobs)
|
||||
return errors.Fatal("internal error: blobs were not repacked")
|
||||
}
|
||||
|
||||
// allow GC of the blob set
|
||||
plan.keepBlobs = nil
|
||||
}
|
||||
|
||||
if len(plan.ignorePacks) == 0 {
|
||||
plan.ignorePacks = plan.removePacks
|
||||
} else {
|
||||
plan.ignorePacks.Merge(plan.removePacks)
|
||||
}
|
||||
|
||||
if opts.unsafeRecovery {
|
||||
Verbosef("deleting index files\n")
|
||||
indexFiles := repo.Index().(*index.MasterIndex).IDs()
|
||||
err = DeleteFilesChecked(ctx, gopts, repo, indexFiles, restic.IndexFile)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
} else if len(plan.ignorePacks) != 0 {
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, false)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(plan.removePacks) != 0 {
|
||||
Verbosef("removing %d old packs\n", len(plan.removePacks))
|
||||
DeleteFiles(ctx, gopts, repo, plan.removePacks, restic.PackFile)
|
||||
}
|
||||
|
||||
if opts.unsafeRecovery {
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, plan.ignorePacks, nil, true)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
}
|
||||
|
||||
Verbosef("done\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
func rebuildIndexFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool) error {
|
||||
Verbosef("rebuilding index\n")
|
||||
|
||||
bar := newProgressMax(!gopts.Quiet, 0, "packs processed")
|
||||
return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{
|
||||
SaveProgress: bar,
|
||||
DeleteProgress: func() *progress.Counter {
|
||||
return newProgressMax(!gopts.Quiet, 0, "old indexes deleted")
|
||||
},
|
||||
DeleteReport: func(id restic.ID, _ error) {
|
||||
if gopts.verbosity > 2 {
|
||||
Verbosef("removed index %v\n", id.String())
|
||||
}
|
||||
},
|
||||
SkipDeletion: skipDeletion,
|
||||
})
|
||||
}
|
||||
|
||||
func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, quiet bool) (usedBlobs restic.CountedBlobSet, err error) {
|
||||
func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots restic.IDSet, printer progress.Printer) (usedBlobs restic.CountedBlobSet, err error) {
|
||||
var snapshotTrees restic.IDs
|
||||
Verbosef("loading all snapshots...\n")
|
||||
printer.P("loading all snapshots...\n")
|
||||
err = restic.ForAllSnapshots(ctx, repo, repo, ignoreSnapshots,
|
||||
func(id restic.ID, sn *restic.Snapshot, err error) error {
|
||||
if err != nil {
|
||||
@ -819,11 +272,12 @@ func getUsedBlobs(ctx context.Context, repo restic.Repository, ignoreSnapshots r
|
||||
return nil, errors.Fatalf("failed loading snapshot: %v", err)
|
||||
}
|
||||
|
||||
Verbosef("finding data that is still in use for %d snapshots\n", len(snapshotTrees))
|
||||
printer.P("finding data that is still in use for %d snapshots\n", len(snapshotTrees))
|
||||
|
||||
usedBlobs = restic.NewCountedBlobSet()
|
||||
|
||||
bar := newProgressMax(!quiet, uint64(len(snapshotTrees)), "snapshots")
|
||||
bar := printer.NewCounter("snapshots")
|
||||
bar.SetMax(uint64(len(snapshotTrees)))
|
||||
defer bar.Done()
|
||||
|
||||
err = restic.FindUsedBlobs(ctx, repo, snapshotTrees, usedBlobs, bar)
|
||||
|
@ -7,7 +7,9 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
)
|
||||
|
||||
func testRunPrune(t testing.TB, gopts GlobalOptions, opts PruneOptions) {
|
||||
@ -16,7 +18,9 @@ func testRunPrune(t testing.TB, gopts GlobalOptions, opts PruneOptions) {
|
||||
defer func() {
|
||||
gopts.backendTestHook = oldHook
|
||||
}()
|
||||
rtest.OK(t, runPrune(context.TODO(), opts, gopts))
|
||||
rtest.OK(t, withTermStatus(gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runPrune(context.TODO(), opts, gopts, term)
|
||||
}))
|
||||
}
|
||||
|
||||
func TestPrune(t *testing.T) {
|
||||
@ -31,7 +35,7 @@ func testPruneVariants(t *testing.T, unsafeNoSpaceRecovery bool) {
|
||||
}
|
||||
t.Run("0"+suffix, func(t *testing.T) {
|
||||
opts := PruneOptions{MaxUnused: "0%", unsafeRecovery: unsafeNoSpaceRecovery}
|
||||
checkOpts := CheckOptions{ReadData: true, CheckUnused: true}
|
||||
checkOpts := CheckOptions{ReadData: true, CheckUnused: !unsafeNoSpaceRecovery}
|
||||
testPrune(t, opts, checkOpts)
|
||||
})
|
||||
|
||||
@ -84,7 +88,9 @@ func testRunForgetJSON(t testing.TB, gopts GlobalOptions, args ...string) {
|
||||
pruneOpts := PruneOptions{
|
||||
MaxUnused: "5%",
|
||||
}
|
||||
return runForget(context.TODO(), opts, pruneOpts, gopts, args)
|
||||
return withTermStatus(gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runForget(context.TODO(), opts, pruneOpts, gopts, term, args)
|
||||
})
|
||||
})
|
||||
rtest.OK(t, err)
|
||||
|
||||
@ -138,7 +144,9 @@ func TestPruneWithDamagedRepository(t *testing.T) {
|
||||
env.gopts.backendTestHook = oldHook
|
||||
}()
|
||||
// prune should fail
|
||||
rtest.Assert(t, runPrune(context.TODO(), pruneDefaultOptions, env.gopts) == errorPacksMissing,
|
||||
rtest.Assert(t, withTermStatus(env.gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runPrune(context.TODO(), pruneDefaultOptions, env.gopts, term)
|
||||
}) == repository.ErrPacksMissing,
|
||||
"prune should have reported index not complete error")
|
||||
}
|
||||
|
||||
@ -218,7 +226,9 @@ func testEdgeCaseRepo(t *testing.T, tarfile string, optionsCheck CheckOptions, o
|
||||
testRunPrune(t, env.gopts, optionsPrune)
|
||||
testRunCheck(t, env.gopts)
|
||||
} else {
|
||||
rtest.Assert(t, runPrune(context.TODO(), optionsPrune, env.gopts) != nil,
|
||||
rtest.Assert(t, withTermStatus(env.gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runPrune(context.TODO(), optionsPrune, env.gopts, term)
|
||||
}) != nil,
|
||||
"prune should have reported an error")
|
||||
}
|
||||
}
|
||||
|
@ -3,10 +3,8 @@ package main
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/pack"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
@ -25,7 +23,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
|
||||
`,
|
||||
DisableAutoGenTag: true,
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return runRebuildIndex(cmd.Context(), repairIndexOptions, globalOptions)
|
||||
term, cancel := setupTermstatus()
|
||||
defer cancel()
|
||||
return runRebuildIndex(cmd.Context(), repairIndexOptions, globalOptions, term)
|
||||
},
|
||||
}
|
||||
|
||||
@ -55,105 +55,22 @@ func init() {
|
||||
}
|
||||
}
|
||||
|
||||
func runRebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOptions) error {
|
||||
func runRebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOptions, term *termstatus.Terminal) error {
|
||||
ctx, repo, unlock, err := openWithExclusiveLock(ctx, gopts, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
return rebuildIndex(ctx, opts, gopts, repo)
|
||||
}
|
||||
printer := newTerminalProgressPrinter(gopts.verbosity, term)
|
||||
|
||||
func rebuildIndex(ctx context.Context, opts RepairIndexOptions, gopts GlobalOptions, repo *repository.Repository) error {
|
||||
var obsoleteIndexes restic.IDs
|
||||
packSizeFromList := make(map[restic.ID]int64)
|
||||
packSizeFromIndex := make(map[restic.ID]int64)
|
||||
removePacks := restic.NewIDSet()
|
||||
|
||||
if opts.ReadAllPacks {
|
||||
// get list of old index files but start with empty index
|
||||
err := repo.List(ctx, restic.IndexFile, func(id restic.ID, _ int64) error {
|
||||
obsoleteIndexes = append(obsoleteIndexes, id)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
Verbosef("loading indexes...\n")
|
||||
mi := index.NewMasterIndex()
|
||||
err := index.ForAllIndexes(ctx, repo, repo, func(id restic.ID, idx *index.Index, _ bool, err error) error {
|
||||
if err != nil {
|
||||
Warnf("removing invalid index %v: %v\n", id, err)
|
||||
obsoleteIndexes = append(obsoleteIndexes, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
mi.Insert(idx)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = mi.MergeFinalIndexes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = repo.SetIndex(mi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
packSizeFromIndex = pack.Size(ctx, repo.Index(), false)
|
||||
}
|
||||
|
||||
Verbosef("getting pack files to read...\n")
|
||||
err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error {
|
||||
size, ok := packSizeFromIndex[id]
|
||||
if !ok || size != packSize {
|
||||
// Pack was not referenced in index or size does not match
|
||||
packSizeFromList[id] = packSize
|
||||
removePacks.Insert(id)
|
||||
}
|
||||
if !ok {
|
||||
Warnf("adding pack file to index %v\n", id)
|
||||
} else if size != packSize {
|
||||
Warnf("reindexing pack file %v with unexpected size %v instead of %v\n", id, packSize, size)
|
||||
}
|
||||
delete(packSizeFromIndex, id)
|
||||
return nil
|
||||
})
|
||||
err = repository.RepairIndex(ctx, repo, repository.RepairIndexOptions{
|
||||
ReadAllPacks: opts.ReadAllPacks,
|
||||
}, printer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for id := range packSizeFromIndex {
|
||||
// forget pack files that are referenced in the index but do not exist
|
||||
// when rebuilding the index
|
||||
removePacks.Insert(id)
|
||||
Warnf("removing not found pack file %v\n", id)
|
||||
}
|
||||
|
||||
if len(packSizeFromList) > 0 {
|
||||
Verbosef("reading pack files\n")
|
||||
bar := newProgressMax(!gopts.Quiet, uint64(len(packSizeFromList)), "packs")
|
||||
invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar)
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range invalidFiles {
|
||||
Verboseff("skipped incomplete pack file: %v\n", id)
|
||||
}
|
||||
}
|
||||
|
||||
err = rebuildIndexFiles(ctx, gopts, repo, removePacks, obsoleteIndexes, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
Verbosef("done\n")
|
||||
|
||||
printer.P("done\n")
|
||||
return nil
|
||||
}
|
||||
|
@ -13,12 +13,15 @@ import (
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
)
|
||||
|
||||
func testRunRebuildIndex(t testing.TB, gopts GlobalOptions) {
|
||||
rtest.OK(t, withRestoreGlobalOptions(func() error {
|
||||
globalOptions.stdout = io.Discard
|
||||
return runRebuildIndex(context.TODO(), RepairIndexOptions{}, gopts)
|
||||
return withTermStatus(gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
globalOptions.stdout = io.Discard
|
||||
return runRebuildIndex(context.TODO(), RepairIndexOptions{}, gopts, term)
|
||||
})
|
||||
}))
|
||||
}
|
||||
|
||||
@ -126,12 +129,13 @@ func TestRebuildIndexFailsOnAppendOnly(t *testing.T) {
|
||||
rtest.SetupTarTestFixture(t, env.base, datafile)
|
||||
|
||||
err := withRestoreGlobalOptions(func() error {
|
||||
globalOptions.stdout = io.Discard
|
||||
|
||||
env.gopts.backendTestHook = func(r backend.Backend) (backend.Backend, error) {
|
||||
return &appendOnlyBackend{r}, nil
|
||||
}
|
||||
return runRebuildIndex(context.TODO(), RepairIndexOptions{}, env.gopts)
|
||||
return withTermStatus(env.gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
globalOptions.stdout = io.Discard
|
||||
return runRebuildIndex(context.TODO(), RepairIndexOptions{}, env.gopts, term)
|
||||
})
|
||||
})
|
||||
|
||||
if err == nil {
|
||||
|
@ -58,14 +58,14 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.T
|
||||
}
|
||||
defer unlock()
|
||||
|
||||
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
|
||||
printer := newTerminalProgressPrinter(gopts.verbosity, term)
|
||||
|
||||
bar := newIndexTerminalProgress(gopts.Quiet, gopts.JSON, term)
|
||||
err = repo.LoadIndex(ctx, bar)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
|
||||
printer := newTerminalProgressPrinter(gopts.verbosity, term)
|
||||
|
||||
printer.P("saving backup copies of pack files to current folder")
|
||||
for id := range ids {
|
||||
f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666)
|
||||
|
@ -1,41 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
)
|
||||
|
||||
// DeleteFiles deletes the given fileList of fileType in parallel
|
||||
// it will print a warning if there is an error, but continue deleting the remaining files
|
||||
func DeleteFiles(ctx context.Context, gopts GlobalOptions, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) {
|
||||
_ = deleteFiles(ctx, gopts, true, repo, fileList, fileType)
|
||||
}
|
||||
|
||||
// DeleteFilesChecked deletes the given fileList of fileType in parallel
|
||||
// if an error occurs, it will cancel and return this error
|
||||
func DeleteFilesChecked(ctx context.Context, gopts GlobalOptions, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
|
||||
return deleteFiles(ctx, gopts, false, repo, fileList, fileType)
|
||||
}
|
||||
|
||||
// deleteFiles deletes the given fileList of fileType in parallel
|
||||
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
|
||||
func deleteFiles(ctx context.Context, gopts GlobalOptions, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType) error {
|
||||
bar := newProgressMax(!gopts.JSON && !gopts.Quiet, 0, "files deleted")
|
||||
defer bar.Done()
|
||||
|
||||
return restic.ParallelRemove(ctx, repo, fileList, fileType, func(id restic.ID, err error) error {
|
||||
if err != nil {
|
||||
if !gopts.JSON {
|
||||
Warnf("unable to remove %v/%v from the repository\n", fileType, id)
|
||||
}
|
||||
if !ignoreError {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if !gopts.JSON && gopts.verbosity > 2 {
|
||||
Verbosef("removed %v/%v\n", fileType, id)
|
||||
}
|
||||
return nil
|
||||
}, bar)
|
||||
}
|
@ -12,6 +12,7 @@ import (
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/termstatus"
|
||||
)
|
||||
|
||||
func TestCheckRestoreNoLock(t *testing.T) {
|
||||
@ -88,8 +89,12 @@ func TestListOnce(t *testing.T) {
|
||||
testRunPrune(t, env.gopts, pruneOpts)
|
||||
rtest.OK(t, runCheck(context.TODO(), checkOpts, env.gopts, nil))
|
||||
|
||||
rtest.OK(t, runRebuildIndex(context.TODO(), RepairIndexOptions{}, env.gopts))
|
||||
rtest.OK(t, runRebuildIndex(context.TODO(), RepairIndexOptions{ReadAllPacks: true}, env.gopts))
|
||||
rtest.OK(t, withTermStatus(env.gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runRebuildIndex(context.TODO(), RepairIndexOptions{}, env.gopts, term)
|
||||
}))
|
||||
rtest.OK(t, withTermStatus(env.gopts, func(ctx context.Context, term *termstatus.Terminal) error {
|
||||
return runRebuildIndex(context.TODO(), RepairIndexOptions{ReadAllPacks: true}, env.gopts, term)
|
||||
}))
|
||||
}
|
||||
|
||||
type writeToOnly struct {
|
||||
|
@ -1430,7 +1430,7 @@ func TestArchiverSnapshot(t *testing.T) {
|
||||
}
|
||||
TestEnsureSnapshot(t, repo, snapshotID, want)
|
||||
|
||||
checker.TestCheckRepo(t, repo)
|
||||
checker.TestCheckRepo(t, repo, false)
|
||||
|
||||
// check that the snapshot contains the targets with absolute paths
|
||||
for i, target := range sn.Paths {
|
||||
@ -1590,7 +1590,7 @@ func TestArchiverSnapshotSelect(t *testing.T) {
|
||||
}
|
||||
TestEnsureSnapshot(t, repo, snapshotID, want)
|
||||
|
||||
checker.TestCheckRepo(t, repo)
|
||||
checker.TestCheckRepo(t, repo, false)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1794,7 +1794,7 @@ func TestArchiverParent(t *testing.T) {
|
||||
t.Logf("second backup saved as %v", secondSnapshotID.Str())
|
||||
t.Logf("testfs: %v", testFS)
|
||||
|
||||
checker.TestCheckRepo(t, repo)
|
||||
checker.TestCheckRepo(t, repo, false)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -1927,7 +1927,7 @@ func TestArchiverErrorReporting(t *testing.T) {
|
||||
}
|
||||
TestEnsureSnapshot(t, repo, snapshotID, want)
|
||||
|
||||
checker.TestCheckRepo(t, repo)
|
||||
checker.TestCheckRepo(t, repo, false)
|
||||
})
|
||||
}
|
||||
}
|
||||
@ -2288,7 +2288,7 @@ func TestMetadataChanged(t *testing.T) {
|
||||
// make sure the content matches
|
||||
TestEnsureFileContent(context.Background(), t, repo, "testfile", node3, files["testfile"].(TestFile))
|
||||
|
||||
checker.TestCheckRepo(t, repo)
|
||||
checker.TestCheckRepo(t, repo, false)
|
||||
}
|
||||
|
||||
func TestRacyFileSwap(t *testing.T) {
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"hash"
|
||||
"io"
|
||||
"net/http"
|
||||
@ -41,7 +42,7 @@ func NewFactory() location.Factory {
|
||||
)
|
||||
}
|
||||
|
||||
var errNotFound = errors.New("not found")
|
||||
var errNotFound = fmt.Errorf("not found")
|
||||
|
||||
const connectionCount = 2
|
||||
|
||||
|
@ -8,7 +8,7 @@ import (
|
||||
)
|
||||
|
||||
// TestCheckRepo runs the checker on repo.
|
||||
func TestCheckRepo(t testing.TB, repo restic.Repository) {
|
||||
func TestCheckRepo(t testing.TB, repo restic.Repository, skipStructure bool) {
|
||||
chkr := New(repo, true)
|
||||
|
||||
hints, errs := chkr.LoadIndex(context.TODO(), nil)
|
||||
@ -33,18 +33,20 @@ func TestCheckRepo(t testing.TB, repo restic.Repository) {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// structure
|
||||
errChan = make(chan error)
|
||||
go chkr.Structure(context.TODO(), nil, errChan)
|
||||
if !skipStructure {
|
||||
// structure
|
||||
errChan = make(chan error)
|
||||
go chkr.Structure(context.TODO(), nil, errChan)
|
||||
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
}
|
||||
for err := range errChan {
|
||||
t.Error(err)
|
||||
}
|
||||
|
||||
// unused blobs
|
||||
blobs := chkr.UnusedBlobs(context.TODO())
|
||||
if len(blobs) > 0 {
|
||||
t.Errorf("unused blobs found: %v", blobs)
|
||||
// unused blobs
|
||||
blobs := chkr.UnusedBlobs(context.TODO())
|
||||
if len(blobs) > 0 {
|
||||
t.Errorf("unused blobs found: %v", blobs)
|
||||
}
|
||||
}
|
||||
|
||||
// read data
|
||||
|
616
internal/repository/prune.go
Normal file
616
internal/repository/prune.go
Normal file
@ -0,0 +1,616 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math"
|
||||
"sort"
|
||||
|
||||
"github.com/restic/restic/internal/errors"
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/pack"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
)
|
||||
|
||||
var ErrIndexIncomplete = errors.Fatal("index is not complete")
|
||||
var ErrPacksMissing = errors.Fatal("packs from index missing in repo")
|
||||
var ErrSizeNotMatching = errors.Fatal("pack size does not match calculated size from index")
|
||||
|
||||
// PruneOptions collects all options for the cleanup command.
|
||||
type PruneOptions struct {
|
||||
DryRun bool
|
||||
UnsafeRecovery bool
|
||||
|
||||
MaxUnusedBytes func(used uint64) (unused uint64) // calculates the number of unused bytes after repacking, according to MaxUnused
|
||||
MaxRepackBytes uint64
|
||||
|
||||
RepackCachableOnly bool
|
||||
RepackSmall bool
|
||||
RepackUncompressed bool
|
||||
}
|
||||
|
||||
type PruneStats struct {
|
||||
Blobs struct {
|
||||
Used uint
|
||||
Duplicate uint
|
||||
Unused uint
|
||||
Remove uint
|
||||
Repack uint
|
||||
Repackrm uint
|
||||
}
|
||||
Size struct {
|
||||
Used uint64
|
||||
Duplicate uint64
|
||||
Unused uint64
|
||||
Remove uint64
|
||||
Repack uint64
|
||||
Repackrm uint64
|
||||
Unref uint64
|
||||
Uncompressed uint64
|
||||
}
|
||||
Packs struct {
|
||||
Used uint
|
||||
Unused uint
|
||||
PartlyUsed uint
|
||||
Unref uint
|
||||
Keep uint
|
||||
Repack uint
|
||||
Remove uint
|
||||
}
|
||||
}
|
||||
|
||||
type PrunePlan struct {
|
||||
removePacksFirst restic.IDSet // packs to remove first (unreferenced packs)
|
||||
repackPacks restic.IDSet // packs to repack
|
||||
keepBlobs restic.CountedBlobSet // blobs to keep during repacking
|
||||
removePacks restic.IDSet // packs to remove
|
||||
ignorePacks restic.IDSet // packs to ignore when rebuilding the index
|
||||
|
||||
repo restic.Repository
|
||||
stats PruneStats
|
||||
opts PruneOptions
|
||||
}
|
||||
|
||||
type packInfo struct {
|
||||
usedBlobs uint
|
||||
unusedBlobs uint
|
||||
usedSize uint64
|
||||
unusedSize uint64
|
||||
tpe restic.BlobType
|
||||
uncompressed bool
|
||||
}
|
||||
|
||||
type packInfoWithID struct {
|
||||
ID restic.ID
|
||||
packInfo
|
||||
mustCompress bool
|
||||
}
|
||||
|
||||
// PlanPrune selects which files to rewrite and which to delete and which blobs to keep.
|
||||
// Also some summary statistics are returned.
|
||||
func PlanPrune(ctx context.Context, opts PruneOptions, repo restic.Repository, getUsedBlobs func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error), printer progress.Printer) (*PrunePlan, error) {
|
||||
var stats PruneStats
|
||||
|
||||
if opts.UnsafeRecovery {
|
||||
// prevent repacking data to make sure users cannot get stuck.
|
||||
opts.MaxRepackBytes = 0
|
||||
}
|
||||
if repo.Connections() < 2 {
|
||||
return nil, fmt.Errorf("prune requires a backend connection limit of at least two")
|
||||
}
|
||||
if repo.Config().Version < 2 && opts.RepackUncompressed {
|
||||
return nil, fmt.Errorf("compression requires at least repository format version 2")
|
||||
}
|
||||
|
||||
usedBlobs, err := getUsedBlobs(ctx, repo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
printer.P("searching used packs...\n")
|
||||
keepBlobs, indexPack, err := packInfoFromIndex(ctx, repo.Index(), usedBlobs, &stats, printer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
printer.P("collecting packs for deletion and repacking\n")
|
||||
plan, err := decidePackAction(ctx, opts, repo, indexPack, &stats, printer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(plan.repackPacks) != 0 {
|
||||
blobCount := keepBlobs.Len()
|
||||
// when repacking, we do not want to keep blobs which are
|
||||
// already contained in kept packs, so delete them from keepBlobs
|
||||
repo.Index().Each(ctx, func(blob restic.PackedBlob) {
|
||||
if plan.removePacks.Has(blob.PackID) || plan.repackPacks.Has(blob.PackID) {
|
||||
return
|
||||
}
|
||||
keepBlobs.Delete(blob.BlobHandle)
|
||||
})
|
||||
|
||||
if keepBlobs.Len() < blobCount/2 {
|
||||
// replace with copy to shrink map to necessary size if there's a chance to benefit
|
||||
keepBlobs = keepBlobs.Copy()
|
||||
}
|
||||
} else {
|
||||
// keepBlobs is only needed if packs are repacked
|
||||
keepBlobs = nil
|
||||
}
|
||||
plan.keepBlobs = keepBlobs
|
||||
|
||||
plan.repo = repo
|
||||
plan.stats = stats
|
||||
plan.opts = opts
|
||||
|
||||
return &plan, nil
|
||||
}
|
||||
|
||||
func packInfoFromIndex(ctx context.Context, idx restic.MasterIndex, usedBlobs restic.CountedBlobSet, stats *PruneStats, printer progress.Printer) (restic.CountedBlobSet, map[restic.ID]packInfo, error) {
|
||||
// iterate over all blobs in index to find out which blobs are duplicates
|
||||
// The counter in usedBlobs describes how many instances of the blob exist in the repository index
|
||||
// Thus 0 == blob is missing, 1 == blob exists once, >= 2 == duplicates exist
|
||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||
bh := blob.BlobHandle
|
||||
count, ok := usedBlobs[bh]
|
||||
if ok {
|
||||
if count < math.MaxUint8 {
|
||||
// don't overflow, but saturate count at 255
|
||||
// this can lead to a non-optimal pack selection, but won't cause
|
||||
// problems otherwise
|
||||
count++
|
||||
}
|
||||
|
||||
usedBlobs[bh] = count
|
||||
}
|
||||
})
|
||||
|
||||
// Check if all used blobs have been found in index
|
||||
missingBlobs := restic.NewBlobSet()
|
||||
for bh, count := range usedBlobs {
|
||||
if count == 0 {
|
||||
// blob does not exist in any pack files
|
||||
missingBlobs.Insert(bh)
|
||||
}
|
||||
}
|
||||
|
||||
if len(missingBlobs) != 0 {
|
||||
printer.E("%v not found in the index\n\n"+
|
||||
"Integrity check failed: Data seems to be missing.\n"+
|
||||
"Will not start prune to prevent (additional) data loss!\n"+
|
||||
"Please report this error (along with the output of the 'prune' run) at\n"+
|
||||
"https://github.com/restic/restic/issues/new/choose\n", missingBlobs)
|
||||
return nil, nil, ErrIndexIncomplete
|
||||
}
|
||||
|
||||
indexPack := make(map[restic.ID]packInfo)
|
||||
|
||||
// save computed pack header size
|
||||
for pid, hdrSize := range pack.Size(ctx, idx, true) {
|
||||
// initialize tpe with NumBlobTypes to indicate it's not set
|
||||
indexPack[pid] = packInfo{tpe: restic.NumBlobTypes, usedSize: uint64(hdrSize)}
|
||||
}
|
||||
|
||||
hasDuplicates := false
|
||||
// iterate over all blobs in index to generate packInfo
|
||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||
ip := indexPack[blob.PackID]
|
||||
|
||||
// Set blob type if not yet set
|
||||
if ip.tpe == restic.NumBlobTypes {
|
||||
ip.tpe = blob.Type
|
||||
}
|
||||
|
||||
// mark mixed packs with "Invalid blob type"
|
||||
if ip.tpe != blob.Type {
|
||||
ip.tpe = restic.InvalidBlob
|
||||
}
|
||||
|
||||
bh := blob.BlobHandle
|
||||
size := uint64(blob.Length)
|
||||
dupCount := usedBlobs[bh]
|
||||
switch {
|
||||
case dupCount >= 2:
|
||||
hasDuplicates = true
|
||||
// mark as unused for now, we will later on select one copy
|
||||
ip.unusedSize += size
|
||||
ip.unusedBlobs++
|
||||
|
||||
// count as duplicate, will later on change one copy to be counted as used
|
||||
stats.Size.Duplicate += size
|
||||
stats.Blobs.Duplicate++
|
||||
case dupCount == 1: // used blob, not duplicate
|
||||
ip.usedSize += size
|
||||
ip.usedBlobs++
|
||||
|
||||
stats.Size.Used += size
|
||||
stats.Blobs.Used++
|
||||
default: // unused blob
|
||||
ip.unusedSize += size
|
||||
ip.unusedBlobs++
|
||||
|
||||
stats.Size.Unused += size
|
||||
stats.Blobs.Unused++
|
||||
}
|
||||
if !blob.IsCompressed() {
|
||||
ip.uncompressed = true
|
||||
}
|
||||
// update indexPack
|
||||
indexPack[blob.PackID] = ip
|
||||
})
|
||||
|
||||
// if duplicate blobs exist, those will be set to either "used" or "unused":
|
||||
// - mark only one occurrence of duplicate blobs as used
|
||||
// - if there are already some used blobs in a pack, possibly mark duplicates in this pack as "used"
|
||||
// - if there are no used blobs in a pack, possibly mark duplicates as "unused"
|
||||
if hasDuplicates {
|
||||
// iterate again over all blobs in index (this is pretty cheap, all in-mem)
|
||||
idx.Each(ctx, func(blob restic.PackedBlob) {
|
||||
bh := blob.BlobHandle
|
||||
count, ok := usedBlobs[bh]
|
||||
// skip non-duplicate, aka. normal blobs
|
||||
// count == 0 is used to mark that this was a duplicate blob with only a single occurrence remaining
|
||||
if !ok || count == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
ip := indexPack[blob.PackID]
|
||||
size := uint64(blob.Length)
|
||||
switch {
|
||||
case ip.usedBlobs > 0, count == 0:
|
||||
// other used blobs in pack or "last" occurrence -> transition to used
|
||||
ip.usedSize += size
|
||||
ip.usedBlobs++
|
||||
ip.unusedSize -= size
|
||||
ip.unusedBlobs--
|
||||
// same for the global statistics
|
||||
stats.Size.Used += size
|
||||
stats.Blobs.Used++
|
||||
stats.Size.Duplicate -= size
|
||||
stats.Blobs.Duplicate--
|
||||
// let other occurrences remain marked as unused
|
||||
usedBlobs[bh] = 1
|
||||
default:
|
||||
// remain unused and decrease counter
|
||||
count--
|
||||
if count == 1 {
|
||||
// setting count to 1 would lead to forgetting that this blob had duplicates
|
||||
// thus use the special value zero. This will select the last instance of the blob for keeping.
|
||||
count = 0
|
||||
}
|
||||
usedBlobs[bh] = count
|
||||
}
|
||||
// update indexPack
|
||||
indexPack[blob.PackID] = ip
|
||||
})
|
||||
}
|
||||
|
||||
// Sanity check. If no duplicates exist, all blobs have value 1. After handling
|
||||
// duplicates, this also applies to duplicates.
|
||||
for _, count := range usedBlobs {
|
||||
if count != 1 {
|
||||
panic("internal error during blob selection")
|
||||
}
|
||||
}
|
||||
|
||||
return usedBlobs, indexPack, nil
|
||||
}
|
||||
|
||||
func decidePackAction(ctx context.Context, opts PruneOptions, repo restic.Repository, indexPack map[restic.ID]packInfo, stats *PruneStats, printer progress.Printer) (PrunePlan, error) {
|
||||
removePacksFirst := restic.NewIDSet()
|
||||
removePacks := restic.NewIDSet()
|
||||
repackPacks := restic.NewIDSet()
|
||||
|
||||
var repackCandidates []packInfoWithID
|
||||
var repackSmallCandidates []packInfoWithID
|
||||
repoVersion := repo.Config().Version
|
||||
// only repack very small files by default
|
||||
targetPackSize := repo.PackSize() / 25
|
||||
if opts.RepackSmall {
|
||||
// consider files with at least 80% of the target size as large enough
|
||||
targetPackSize = repo.PackSize() / 5 * 4
|
||||
}
|
||||
|
||||
// loop over all packs and decide what to do
|
||||
bar := printer.NewCounter("packs processed")
|
||||
bar.SetMax(uint64(len(indexPack)))
|
||||
err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error {
|
||||
p, ok := indexPack[id]
|
||||
if !ok {
|
||||
// Pack was not referenced in index and is not used => immediately remove!
|
||||
printer.V("will remove pack %v as it is unused and not indexed\n", id.Str())
|
||||
removePacksFirst.Insert(id)
|
||||
stats.Size.Unref += uint64(packSize)
|
||||
return nil
|
||||
}
|
||||
|
||||
if p.unusedSize+p.usedSize != uint64(packSize) && p.usedBlobs != 0 {
|
||||
// Pack size does not fit and pack is needed => error
|
||||
// If the pack is not needed, this is no error, the pack can
|
||||
// and will be simply removed, see below.
|
||||
printer.E("pack %s: calculated size %d does not match real size %d\nRun 'restic repair index'.\n",
|
||||
id.Str(), p.unusedSize+p.usedSize, packSize)
|
||||
return ErrSizeNotMatching
|
||||
}
|
||||
|
||||
// statistics
|
||||
switch {
|
||||
case p.usedBlobs == 0:
|
||||
stats.Packs.Unused++
|
||||
case p.unusedBlobs == 0:
|
||||
stats.Packs.Used++
|
||||
default:
|
||||
stats.Packs.PartlyUsed++
|
||||
}
|
||||
|
||||
if p.uncompressed {
|
||||
stats.Size.Uncompressed += p.unusedSize + p.usedSize
|
||||
}
|
||||
mustCompress := false
|
||||
if repoVersion >= 2 {
|
||||
// repo v2: always repack tree blobs if uncompressed
|
||||
// compress data blobs if requested
|
||||
mustCompress = (p.tpe == restic.TreeBlob || opts.RepackUncompressed) && p.uncompressed
|
||||
}
|
||||
|
||||
// decide what to do
|
||||
switch {
|
||||
case p.usedBlobs == 0:
|
||||
// All blobs in pack are no longer used => remove pack!
|
||||
removePacks.Insert(id)
|
||||
stats.Blobs.Remove += p.unusedBlobs
|
||||
stats.Size.Remove += p.unusedSize
|
||||
|
||||
case opts.RepackCachableOnly && p.tpe == restic.DataBlob:
|
||||
// if this is a data pack and --repack-cacheable-only is set => keep pack!
|
||||
stats.Packs.Keep++
|
||||
|
||||
case p.unusedBlobs == 0 && p.tpe != restic.InvalidBlob && !mustCompress:
|
||||
if packSize >= int64(targetPackSize) {
|
||||
// All blobs in pack are used and not mixed => keep pack!
|
||||
stats.Packs.Keep++
|
||||
} else {
|
||||
repackSmallCandidates = append(repackSmallCandidates, packInfoWithID{ID: id, packInfo: p, mustCompress: mustCompress})
|
||||
}
|
||||
|
||||
default:
|
||||
// all other packs are candidates for repacking
|
||||
repackCandidates = append(repackCandidates, packInfoWithID{ID: id, packInfo: p, mustCompress: mustCompress})
|
||||
}
|
||||
|
||||
delete(indexPack, id)
|
||||
bar.Add(1)
|
||||
return nil
|
||||
})
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return PrunePlan{}, err
|
||||
}
|
||||
|
||||
// At this point indexPacks contains only missing packs!
|
||||
|
||||
// missing packs that are not needed can be ignored
|
||||
ignorePacks := restic.NewIDSet()
|
||||
for id, p := range indexPack {
|
||||
if p.usedBlobs == 0 {
|
||||
ignorePacks.Insert(id)
|
||||
stats.Blobs.Remove += p.unusedBlobs
|
||||
stats.Size.Remove += p.unusedSize
|
||||
delete(indexPack, id)
|
||||
}
|
||||
}
|
||||
|
||||
if len(indexPack) != 0 {
|
||||
printer.E("The index references %d needed pack files which are missing from the repository:\n", len(indexPack))
|
||||
for id := range indexPack {
|
||||
printer.E(" %v\n", id)
|
||||
}
|
||||
return PrunePlan{}, ErrPacksMissing
|
||||
}
|
||||
if len(ignorePacks) != 0 {
|
||||
printer.E("Missing but unneeded pack files are referenced in the index, will be repaired\n")
|
||||
for id := range ignorePacks {
|
||||
printer.E("will forget missing pack file %v\n", id)
|
||||
}
|
||||
}
|
||||
|
||||
if len(repackSmallCandidates) < 10 {
|
||||
// too few small files to be worth the trouble, this also prevents endlessly repacking
|
||||
// if there is just a single pack file below the target size
|
||||
stats.Packs.Keep += uint(len(repackSmallCandidates))
|
||||
} else {
|
||||
repackCandidates = append(repackCandidates, repackSmallCandidates...)
|
||||
}
|
||||
|
||||
// Sort repackCandidates such that packs with highest ratio unused/used space are picked first.
|
||||
// This is equivalent to sorting by unused / total space.
|
||||
// Instead of unused[i] / used[i] > unused[j] / used[j] we use
|
||||
// unused[i] * used[j] > unused[j] * used[i] as uint32*uint32 < uint64
|
||||
// Moreover packs containing trees and too small packs are sorted to the beginning
|
||||
sort.Slice(repackCandidates, func(i, j int) bool {
|
||||
pi := repackCandidates[i].packInfo
|
||||
pj := repackCandidates[j].packInfo
|
||||
switch {
|
||||
case pi.tpe != restic.DataBlob && pj.tpe == restic.DataBlob:
|
||||
return true
|
||||
case pj.tpe != restic.DataBlob && pi.tpe == restic.DataBlob:
|
||||
return false
|
||||
case pi.unusedSize+pi.usedSize < uint64(targetPackSize) && pj.unusedSize+pj.usedSize >= uint64(targetPackSize):
|
||||
return true
|
||||
case pj.unusedSize+pj.usedSize < uint64(targetPackSize) && pi.unusedSize+pi.usedSize >= uint64(targetPackSize):
|
||||
return false
|
||||
}
|
||||
return pi.unusedSize*pj.usedSize > pj.unusedSize*pi.usedSize
|
||||
})
|
||||
|
||||
repack := func(id restic.ID, p packInfo) {
|
||||
repackPacks.Insert(id)
|
||||
stats.Blobs.Repack += p.unusedBlobs + p.usedBlobs
|
||||
stats.Size.Repack += p.unusedSize + p.usedSize
|
||||
stats.Blobs.Repackrm += p.unusedBlobs
|
||||
stats.Size.Repackrm += p.unusedSize
|
||||
if p.uncompressed {
|
||||
stats.Size.Uncompressed -= p.unusedSize + p.usedSize
|
||||
}
|
||||
}
|
||||
|
||||
// calculate limit for number of unused bytes in the repo after repacking
|
||||
maxUnusedSizeAfter := opts.MaxUnusedBytes(stats.Size.Used)
|
||||
|
||||
for _, p := range repackCandidates {
|
||||
reachedUnusedSizeAfter := (stats.Size.Unused-stats.Size.Remove-stats.Size.Repackrm < maxUnusedSizeAfter)
|
||||
reachedRepackSize := stats.Size.Repack+p.unusedSize+p.usedSize >= opts.MaxRepackBytes
|
||||
packIsLargeEnough := p.unusedSize+p.usedSize >= uint64(targetPackSize)
|
||||
|
||||
switch {
|
||||
case reachedRepackSize:
|
||||
stats.Packs.Keep++
|
||||
|
||||
case p.tpe != restic.DataBlob, p.mustCompress:
|
||||
// repacking non-data packs / uncompressed-trees is only limited by repackSize
|
||||
repack(p.ID, p.packInfo)
|
||||
|
||||
case reachedUnusedSizeAfter && packIsLargeEnough:
|
||||
// for all other packs stop repacking if tolerated unused size is reached.
|
||||
stats.Packs.Keep++
|
||||
|
||||
default:
|
||||
repack(p.ID, p.packInfo)
|
||||
}
|
||||
}
|
||||
|
||||
stats.Packs.Unref = uint(len(removePacksFirst))
|
||||
stats.Packs.Repack = uint(len(repackPacks))
|
||||
stats.Packs.Remove = uint(len(removePacks))
|
||||
|
||||
if repo.Config().Version < 2 {
|
||||
// compression not supported for repository format version 1
|
||||
stats.Size.Uncompressed = 0
|
||||
}
|
||||
|
||||
return PrunePlan{removePacksFirst: removePacksFirst,
|
||||
removePacks: removePacks,
|
||||
repackPacks: repackPacks,
|
||||
ignorePacks: ignorePacks,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (plan *PrunePlan) Stats() PruneStats {
|
||||
return plan.stats
|
||||
}
|
||||
|
||||
// Execute does the actual pruning:
|
||||
// - remove unreferenced packs first
|
||||
// - repack given pack files while keeping the given blobs
|
||||
// - rebuild the index while ignoring all files that will be deleted
|
||||
// - delete the files
|
||||
// plan.removePacks and plan.ignorePacks are modified in this function.
|
||||
func (plan *PrunePlan) Execute(ctx context.Context, printer progress.Printer) (err error) {
|
||||
if plan.opts.DryRun {
|
||||
printer.V("Repeated prune dry-runs can report slightly different amounts of data to keep or repack. This is expected behavior.\n\n")
|
||||
if len(plan.removePacksFirst) > 0 {
|
||||
printer.V("Would have removed the following unreferenced packs:\n%v\n\n", plan.removePacksFirst)
|
||||
}
|
||||
printer.V("Would have repacked and removed the following packs:\n%v\n\n", plan.repackPacks)
|
||||
printer.V("Would have removed the following no longer used packs:\n%v\n\n", plan.removePacks)
|
||||
// Always quit here if DryRun was set!
|
||||
return nil
|
||||
}
|
||||
|
||||
repo := plan.repo
|
||||
// make sure the plan can only be used once
|
||||
plan.repo = nil
|
||||
|
||||
// unreferenced packs can be safely deleted first
|
||||
if len(plan.removePacksFirst) != 0 {
|
||||
printer.P("deleting unreferenced packs\n")
|
||||
_ = deleteFiles(ctx, true, repo, plan.removePacksFirst, restic.PackFile, printer)
|
||||
}
|
||||
|
||||
if len(plan.repackPacks) != 0 {
|
||||
printer.P("repacking packs\n")
|
||||
bar := printer.NewCounter("packs repacked")
|
||||
bar.SetMax(uint64(len(plan.repackPacks)))
|
||||
_, err := Repack(ctx, repo, repo, plan.repackPacks, plan.keepBlobs, bar)
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return errors.Fatal(err.Error())
|
||||
}
|
||||
|
||||
// Also remove repacked packs
|
||||
plan.removePacks.Merge(plan.repackPacks)
|
||||
|
||||
if len(plan.keepBlobs) != 0 {
|
||||
printer.E("%v was not repacked\n\n"+
|
||||
"Integrity check failed.\n"+
|
||||
"Please report this error (along with the output of the 'prune' run) at\n"+
|
||||
"https://github.com/restic/restic/issues/new/choose\n", plan.keepBlobs)
|
||||
return errors.Fatal("internal error: blobs were not repacked")
|
||||
}
|
||||
|
||||
// allow GC of the blob set
|
||||
plan.keepBlobs = nil
|
||||
}
|
||||
|
||||
if len(plan.ignorePacks) == 0 {
|
||||
plan.ignorePacks = plan.removePacks
|
||||
} else {
|
||||
plan.ignorePacks.Merge(plan.removePacks)
|
||||
}
|
||||
|
||||
if plan.opts.UnsafeRecovery {
|
||||
printer.P("deleting index files\n")
|
||||
indexFiles := repo.Index().(*index.MasterIndex).IDs()
|
||||
err = deleteFiles(ctx, false, repo, indexFiles, restic.IndexFile, printer)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
} else if len(plan.ignorePacks) != 0 {
|
||||
err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, false, printer)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(plan.removePacks) != 0 {
|
||||
printer.P("removing %d old packs\n", len(plan.removePacks))
|
||||
_ = deleteFiles(ctx, true, repo, plan.removePacks, restic.PackFile, printer)
|
||||
}
|
||||
|
||||
if plan.opts.UnsafeRecovery {
|
||||
err = rebuildIndexFiles(ctx, repo, plan.ignorePacks, nil, true, printer)
|
||||
if err != nil {
|
||||
return errors.Fatalf("%s", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// drop outdated in-memory index
|
||||
repo.ClearIndex()
|
||||
|
||||
printer.P("done\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
// deleteFiles deletes the given fileList of fileType in parallel
|
||||
// if ignoreError=true, it will print a warning if there was an error, else it will abort.
|
||||
func deleteFiles(ctx context.Context, ignoreError bool, repo restic.Repository, fileList restic.IDSet, fileType restic.FileType, printer progress.Printer) error {
|
||||
bar := printer.NewCounter("files deleted")
|
||||
defer bar.Done()
|
||||
|
||||
return restic.ParallelRemove(ctx, repo, fileList, fileType, func(id restic.ID, err error) error {
|
||||
if err != nil {
|
||||
printer.E("unable to remove %v/%v from the repository\n", fileType, id)
|
||||
if !ignoreError {
|
||||
return err
|
||||
}
|
||||
}
|
||||
printer.VV("removed %v/%v\n", fileType, id)
|
||||
return nil
|
||||
}, bar)
|
||||
}
|
105
internal/repository/prune_test.go
Normal file
105
internal/repository/prune_test.go
Normal file
@ -0,0 +1,105 @@
|
||||
package repository_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/checker"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func testPrune(t *testing.T, opts repository.PruneOptions, errOnUnused bool) {
|
||||
repo := repository.TestRepository(t).(*repository.Repository)
|
||||
createRandomBlobs(t, repo, 4, 0.5, true)
|
||||
createRandomBlobs(t, repo, 5, 0.5, true)
|
||||
keep, _ := selectBlobs(t, repo, 0.5)
|
||||
|
||||
var wg errgroup.Group
|
||||
repo.StartPackUploader(context.TODO(), &wg)
|
||||
// duplicate a few blobs to exercise those code paths
|
||||
for blob := range keep {
|
||||
buf, err := repo.LoadBlob(context.TODO(), blob.Type, blob.ID, nil)
|
||||
rtest.OK(t, err)
|
||||
_, _, _, err = repo.SaveBlob(context.TODO(), blob.Type, buf, blob.ID, true)
|
||||
rtest.OK(t, err)
|
||||
}
|
||||
rtest.OK(t, repo.Flush(context.TODO()))
|
||||
|
||||
plan, err := repository.PlanPrune(context.TODO(), opts, repo, func(ctx context.Context, repo restic.Repository) (usedBlobs restic.CountedBlobSet, err error) {
|
||||
return restic.NewCountedBlobSet(keep.List()...), nil
|
||||
}, &progress.NoopPrinter{})
|
||||
rtest.OK(t, err)
|
||||
|
||||
rtest.OK(t, plan.Execute(context.TODO(), &progress.NoopPrinter{}))
|
||||
|
||||
repo = repository.TestOpenBackend(t, repo.Backend()).(*repository.Repository)
|
||||
checker.TestCheckRepo(t, repo, true)
|
||||
|
||||
if errOnUnused {
|
||||
existing := listBlobs(repo)
|
||||
rtest.Assert(t, existing.Equals(keep), "unexpected blobs, wanted %v got %v", keep, existing)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrune(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
opts repository.PruneOptions
|
||||
errOnUnused bool
|
||||
}{
|
||||
{
|
||||
name: "0",
|
||||
opts: repository.PruneOptions{
|
||||
MaxRepackBytes: math.MaxUint64,
|
||||
MaxUnusedBytes: func(used uint64) (unused uint64) { return 0 },
|
||||
},
|
||||
errOnUnused: true,
|
||||
},
|
||||
{
|
||||
name: "50",
|
||||
opts: repository.PruneOptions{
|
||||
MaxRepackBytes: math.MaxUint64,
|
||||
MaxUnusedBytes: func(used uint64) (unused uint64) { return used / 2 },
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unlimited",
|
||||
opts: repository.PruneOptions{
|
||||
MaxRepackBytes: math.MaxUint64,
|
||||
MaxUnusedBytes: func(used uint64) (unused uint64) { return math.MaxUint64 },
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "cachableonly",
|
||||
opts: repository.PruneOptions{
|
||||
MaxRepackBytes: math.MaxUint64,
|
||||
MaxUnusedBytes: func(used uint64) (unused uint64) { return used / 20 },
|
||||
RepackCachableOnly: true,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "small",
|
||||
opts: repository.PruneOptions{
|
||||
MaxRepackBytes: math.MaxUint64,
|
||||
MaxUnusedBytes: func(used uint64) (unused uint64) { return math.MaxUint64 },
|
||||
RepackSmall: true,
|
||||
},
|
||||
errOnUnused: true,
|
||||
},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
testPrune(t, test.opts, test.errOnUnused)
|
||||
})
|
||||
t.Run(test.name+"-recovery", func(t *testing.T) {
|
||||
opts := test.opts
|
||||
opts.UnsafeRecovery = true
|
||||
// unsafeNoSpaceRecovery does not repack partially used pack files
|
||||
testPrune(t, opts, false)
|
||||
})
|
||||
}
|
||||
}
|
@ -18,7 +18,7 @@ func randomSize(min, max int) int {
|
||||
return rand.Intn(max-min) + min
|
||||
}
|
||||
|
||||
func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32) {
|
||||
func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData float32, smallBlobs bool) {
|
||||
var wg errgroup.Group
|
||||
repo.StartPackUploader(context.TODO(), &wg)
|
||||
|
||||
@ -30,7 +30,11 @@ func createRandomBlobs(t testing.TB, repo restic.Repository, blobs int, pData fl
|
||||
|
||||
if rand.Float32() < pData {
|
||||
tpe = restic.DataBlob
|
||||
length = randomSize(10*1024, 1024*1024) // 10KiB to 1MiB of data
|
||||
if smallBlobs {
|
||||
length = randomSize(1*1024, 20*1024) // 1KiB to 20KiB of data
|
||||
} else {
|
||||
length = randomSize(10*1024, 1024*1024) // 10KiB to 1MiB of data
|
||||
}
|
||||
} else {
|
||||
tpe = restic.TreeBlob
|
||||
length = randomSize(1*1024, 20*1024) // 1KiB to 20KiB
|
||||
@ -121,8 +125,12 @@ func selectBlobs(t *testing.T, repo restic.Repository, p float32) (list1, list2
|
||||
}
|
||||
|
||||
func listPacks(t *testing.T, repo restic.Lister) restic.IDSet {
|
||||
return listFiles(t, repo, restic.PackFile)
|
||||
}
|
||||
|
||||
func listFiles(t *testing.T, repo restic.Lister, tpe backend.FileType) restic.IDSet {
|
||||
list := restic.NewIDSet()
|
||||
err := repo.List(context.TODO(), restic.PackFile, func(id restic.ID, size int64) error {
|
||||
err := repo.List(context.TODO(), tpe, func(id restic.ID, size int64) error {
|
||||
list.Insert(id)
|
||||
return nil
|
||||
})
|
||||
@ -166,12 +174,6 @@ func repack(t *testing.T, repo restic.Repository, packs restic.IDSet, blobs rest
|
||||
}
|
||||
}
|
||||
|
||||
func flush(t *testing.T, repo restic.Repository) {
|
||||
if err := repo.Flush(context.TODO()); err != nil {
|
||||
t.Fatalf("repo.SaveIndex() %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func rebuildIndex(t *testing.T, repo restic.Repository) {
|
||||
err := repo.SetIndex(index.NewMasterIndex())
|
||||
rtest.OK(t, err)
|
||||
@ -219,7 +221,9 @@ func testRepack(t *testing.T, version uint) {
|
||||
rand.Seed(seed)
|
||||
t.Logf("rand seed is %v", seed)
|
||||
|
||||
createRandomBlobs(t, repo, 100, 0.7)
|
||||
// add a small amount of blobs twice to create multiple pack files
|
||||
createRandomBlobs(t, repo, 10, 0.7, false)
|
||||
createRandomBlobs(t, repo, 10, 0.7, false)
|
||||
|
||||
packsBefore := listPacks(t, repo)
|
||||
|
||||
@ -233,8 +237,6 @@ func testRepack(t *testing.T, version uint) {
|
||||
packsBefore, packsAfter)
|
||||
}
|
||||
|
||||
flush(t, repo)
|
||||
|
||||
removeBlobs, keepBlobs := selectBlobs(t, repo, 0.2)
|
||||
|
||||
removePacks := findPacksForBlobs(t, repo, removeBlobs)
|
||||
@ -302,8 +304,9 @@ func testRepackCopy(t *testing.T, version uint) {
|
||||
rand.Seed(seed)
|
||||
t.Logf("rand seed is %v", seed)
|
||||
|
||||
createRandomBlobs(t, repo, 100, 0.7)
|
||||
flush(t, repo)
|
||||
// add a small amount of blobs twice to create multiple pack files
|
||||
createRandomBlobs(t, repo, 10, 0.7, false)
|
||||
createRandomBlobs(t, repo, 10, 0.7, false)
|
||||
|
||||
_, keepBlobs := selectBlobs(t, repo, 0.2)
|
||||
copyPacks := findPacksForBlobs(t, repo, keepBlobs)
|
||||
@ -343,7 +346,7 @@ func testRepackWrongBlob(t *testing.T, version uint) {
|
||||
rand.Seed(seed)
|
||||
t.Logf("rand seed is %v", seed)
|
||||
|
||||
createRandomBlobs(t, repo, 5, 0.7)
|
||||
createRandomBlobs(t, repo, 5, 0.7, false)
|
||||
createRandomWrongBlob(t, repo)
|
||||
|
||||
// just keep all blobs, but also rewrite every pack
|
||||
|
129
internal/repository/repair_index.go
Normal file
129
internal/repository/repair_index.go
Normal file
@ -0,0 +1,129 @@
|
||||
package repository
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/restic/restic/internal/index"
|
||||
"github.com/restic/restic/internal/pack"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
)
|
||||
|
||||
type RepairIndexOptions struct {
|
||||
ReadAllPacks bool
|
||||
}
|
||||
|
||||
func RepairIndex(ctx context.Context, repo *Repository, opts RepairIndexOptions, printer progress.Printer) error {
|
||||
var obsoleteIndexes restic.IDs
|
||||
packSizeFromList := make(map[restic.ID]int64)
|
||||
packSizeFromIndex := make(map[restic.ID]int64)
|
||||
removePacks := restic.NewIDSet()
|
||||
|
||||
if opts.ReadAllPacks {
|
||||
// get list of old index files but start with empty index
|
||||
err := repo.List(ctx, restic.IndexFile, func(id restic.ID, _ int64) error {
|
||||
obsoleteIndexes = append(obsoleteIndexes, id)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
printer.P("loading indexes...\n")
|
||||
mi := index.NewMasterIndex()
|
||||
err := index.ForAllIndexes(ctx, repo, repo, func(id restic.ID, idx *index.Index, _ bool, err error) error {
|
||||
if err != nil {
|
||||
printer.E("removing invalid index %v: %v\n", id, err)
|
||||
obsoleteIndexes = append(obsoleteIndexes, id)
|
||||
return nil
|
||||
}
|
||||
|
||||
mi.Insert(idx)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = mi.MergeFinalIndexes()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = repo.SetIndex(mi)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
packSizeFromIndex = pack.Size(ctx, repo.Index(), false)
|
||||
}
|
||||
|
||||
printer.P("getting pack files to read...\n")
|
||||
err := repo.List(ctx, restic.PackFile, func(id restic.ID, packSize int64) error {
|
||||
size, ok := packSizeFromIndex[id]
|
||||
if !ok || size != packSize {
|
||||
// Pack was not referenced in index or size does not match
|
||||
packSizeFromList[id] = packSize
|
||||
removePacks.Insert(id)
|
||||
}
|
||||
if !ok {
|
||||
printer.E("adding pack file to index %v\n", id)
|
||||
} else if size != packSize {
|
||||
printer.E("reindexing pack file %v with unexpected size %v instead of %v\n", id, packSize, size)
|
||||
}
|
||||
delete(packSizeFromIndex, id)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for id := range packSizeFromIndex {
|
||||
// forget pack files that are referenced in the index but do not exist
|
||||
// when rebuilding the index
|
||||
removePacks.Insert(id)
|
||||
printer.E("removing not found pack file %v\n", id)
|
||||
}
|
||||
|
||||
if len(packSizeFromList) > 0 {
|
||||
printer.P("reading pack files\n")
|
||||
bar := printer.NewCounter("packs")
|
||||
bar.SetMax(uint64(len(packSizeFromList)))
|
||||
invalidFiles, err := repo.CreateIndexFromPacks(ctx, packSizeFromList, bar)
|
||||
bar.Done()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, id := range invalidFiles {
|
||||
printer.V("skipped incomplete pack file: %v\n", id)
|
||||
}
|
||||
}
|
||||
|
||||
err = rebuildIndexFiles(ctx, repo, removePacks, obsoleteIndexes, false, printer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// drop outdated in-memory index
|
||||
repo.ClearIndex()
|
||||
return nil
|
||||
}
|
||||
|
||||
func rebuildIndexFiles(ctx context.Context, repo restic.Repository, removePacks restic.IDSet, extraObsolete restic.IDs, skipDeletion bool, printer progress.Printer) error {
|
||||
printer.P("rebuilding index\n")
|
||||
|
||||
bar := printer.NewCounter("packs processed")
|
||||
return repo.Index().Save(ctx, repo, removePacks, extraObsolete, restic.MasterIndexSaveOpts{
|
||||
SaveProgress: bar,
|
||||
DeleteProgress: func() *progress.Counter {
|
||||
return printer.NewCounter("old indexes deleted")
|
||||
},
|
||||
DeleteReport: func(id restic.ID, err error) {
|
||||
if err != nil {
|
||||
printer.VV("failed to remove index %v: %v\n", id.String(), err)
|
||||
} else {
|
||||
printer.VV("removed index %v\n", id.String())
|
||||
}
|
||||
},
|
||||
SkipDeletion: skipDeletion,
|
||||
})
|
||||
}
|
79
internal/repository/repair_index_test.go
Normal file
79
internal/repository/repair_index_test.go
Normal file
@ -0,0 +1,79 @@
|
||||
package repository_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend"
|
||||
"github.com/restic/restic/internal/checker"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
rtest "github.com/restic/restic/internal/test"
|
||||
"github.com/restic/restic/internal/ui/progress"
|
||||
)
|
||||
|
||||
func listIndex(t *testing.T, repo restic.Lister) restic.IDSet {
|
||||
return listFiles(t, repo, restic.IndexFile)
|
||||
}
|
||||
|
||||
func testRebuildIndex(t *testing.T, readAllPacks bool, damage func(t *testing.T, repo *repository.Repository)) {
|
||||
repo := repository.TestRepository(t).(*repository.Repository)
|
||||
createRandomBlobs(t, repo, 4, 0.5, true)
|
||||
createRandomBlobs(t, repo, 5, 0.5, true)
|
||||
indexes := listIndex(t, repo)
|
||||
t.Logf("old indexes %v", indexes)
|
||||
|
||||
damage(t, repo)
|
||||
|
||||
repo = repository.TestOpenBackend(t, repo.Backend()).(*repository.Repository)
|
||||
rtest.OK(t, repository.RepairIndex(context.TODO(), repo, repository.RepairIndexOptions{
|
||||
ReadAllPacks: readAllPacks,
|
||||
}, &progress.NoopPrinter{}))
|
||||
|
||||
newIndexes := listIndex(t, repo)
|
||||
old := indexes.Intersect(newIndexes)
|
||||
rtest.Assert(t, len(old) == 0, "expected old indexes to be removed, found %v", old)
|
||||
|
||||
checker.TestCheckRepo(t, repo, true)
|
||||
}
|
||||
|
||||
func TestRebuildIndex(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
damage func(t *testing.T, repo *repository.Repository)
|
||||
}{
|
||||
{
|
||||
"valid index",
|
||||
func(t *testing.T, repo *repository.Repository) {},
|
||||
},
|
||||
{
|
||||
"damaged index",
|
||||
func(t *testing.T, repo *repository.Repository) {
|
||||
index := listIndex(t, repo).List()[0]
|
||||
replaceFile(t, repo, backend.Handle{Type: restic.IndexFile, Name: index.String()}, func(b []byte) []byte {
|
||||
b[0] ^= 0xff
|
||||
return b
|
||||
})
|
||||
},
|
||||
},
|
||||
{
|
||||
"missing index",
|
||||
func(t *testing.T, repo *repository.Repository) {
|
||||
index := listIndex(t, repo).List()[0]
|
||||
rtest.OK(t, repo.Backend().Remove(context.TODO(), backend.Handle{Type: restic.IndexFile, Name: index.String()}))
|
||||
},
|
||||
},
|
||||
{
|
||||
"missing pack",
|
||||
func(t *testing.T, repo *repository.Repository) {
|
||||
pack := listPacks(t, repo).List()[0]
|
||||
rtest.OK(t, repo.Backend().Remove(context.TODO(), backend.Handle{Type: restic.PackFile, Name: pack.String()}))
|
||||
},
|
||||
},
|
||||
} {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
testRebuildIndex(t, false, test.damage)
|
||||
testRebuildIndex(t, true, test.damage)
|
||||
})
|
||||
}
|
||||
}
|
@ -60,19 +60,7 @@ func RepairPacks(ctx context.Context, repo restic.Repository, ids restic.IDSet,
|
||||
}
|
||||
|
||||
// remove salvaged packs from index
|
||||
printer.P("rebuilding index")
|
||||
|
||||
bar = printer.NewCounter("packs processed")
|
||||
err = repo.Index().Save(ctx, repo, ids, nil, restic.MasterIndexSaveOpts{
|
||||
SaveProgress: bar,
|
||||
DeleteProgress: func() *progress.Counter {
|
||||
return printer.NewCounter("old indexes deleted")
|
||||
},
|
||||
DeleteReport: func(id restic.ID, _ error) {
|
||||
printer.VV("removed index %v", id.String())
|
||||
},
|
||||
})
|
||||
|
||||
err = rebuildIndexFiles(ctx, repo, ids, nil, false, printer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ func testRepairBrokenPack(t *testing.T, version uint) {
|
||||
rand.Seed(seed)
|
||||
t.Logf("rand seed is %v", seed)
|
||||
|
||||
createRandomBlobs(t, repo, 5, 0.7)
|
||||
createRandomBlobs(t, repo, 5, 0.7, true)
|
||||
packsBefore := listPacks(t, repo)
|
||||
blobsBefore := listBlobs(repo)
|
||||
|
||||
|
@ -143,9 +143,6 @@ func (r *Repository) DisableAutoIndexUpdate() {
|
||||
// setConfig assigns the given config and updates the repository parameters accordingly
|
||||
func (r *Repository) setConfig(cfg restic.Config) {
|
||||
r.cfg = cfg
|
||||
if r.cfg.Version >= 2 {
|
||||
r.idx.MarkCompressed()
|
||||
}
|
||||
}
|
||||
|
||||
// Config returns the repository configuration.
|
||||
@ -638,9 +635,21 @@ func (r *Repository) Index() restic.MasterIndex {
|
||||
// SetIndex instructs the repository to use the given index.
|
||||
func (r *Repository) SetIndex(i restic.MasterIndex) error {
|
||||
r.idx = i.(*index.MasterIndex)
|
||||
r.configureIndex()
|
||||
return r.prepareCache()
|
||||
}
|
||||
|
||||
func (r *Repository) ClearIndex() {
|
||||
r.idx = index.NewMasterIndex()
|
||||
r.configureIndex()
|
||||
}
|
||||
|
||||
func (r *Repository) configureIndex() {
|
||||
if r.cfg.Version >= 2 {
|
||||
r.idx.MarkCompressed()
|
||||
}
|
||||
}
|
||||
|
||||
// LoadIndex loads all index files from the backend in parallel and stores them
|
||||
func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
|
||||
debug.Log("Loading index")
|
||||
@ -663,6 +672,9 @@ func (r *Repository) LoadIndex(ctx context.Context, p *progress.Counter) error {
|
||||
defer p.Done()
|
||||
}
|
||||
|
||||
// reset in-memory index before loading it from the repository
|
||||
r.ClearIndex()
|
||||
|
||||
err = index.ForAllIndexes(ctx, indexList, r, func(_ restic.ID, idx *index.Index, _ bool, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -242,8 +242,7 @@ func loadIndex(ctx context.Context, repo restic.LoaderUnpacked, id restic.ID) (*
|
||||
}
|
||||
|
||||
func TestRepositoryLoadUnpackedBroken(t *testing.T) {
|
||||
repo, cleanup := repository.TestFromFixture(t, repoFixture)
|
||||
defer cleanup()
|
||||
repo := repository.TestRepository(t)
|
||||
|
||||
data := rtest.Random(23, 12345)
|
||||
id := restic.Hash(data)
|
||||
@ -252,7 +251,7 @@ func TestRepositoryLoadUnpackedBroken(t *testing.T) {
|
||||
data[0] ^= 0xff
|
||||
|
||||
// store broken file
|
||||
err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, nil))
|
||||
err := repo.Backend().Save(context.TODO(), h, backend.NewByteReader(data, repo.Backend().Hasher()))
|
||||
rtest.OK(t, err)
|
||||
|
||||
// without a retry backend this will just return an error that the file is broken
|
||||
|
@ -60,8 +60,11 @@ func TestRepositoryWithBackend(t testing.TB, be backend.Backend, version uint, o
|
||||
t.Fatalf("TestRepository(): new repo failed: %v", err)
|
||||
}
|
||||
|
||||
cfg := restic.TestCreateConfig(t, testChunkerPol, version)
|
||||
err = repo.init(context.TODO(), test.TestPassword, cfg)
|
||||
if version == 0 {
|
||||
version = restic.StableRepoVersion
|
||||
}
|
||||
pol := testChunkerPol
|
||||
err = repo.Init(context.TODO(), version, test.TestPassword, &pol)
|
||||
if err != nil {
|
||||
t.Fatalf("TestRepository(): initialize repo failed: %v", err)
|
||||
}
|
||||
|
@ -51,22 +51,6 @@ func CreateConfig(version uint) (Config, error) {
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
// TestCreateConfig creates a config for use within tests.
|
||||
func TestCreateConfig(t testing.TB, pol chunker.Pol, version uint) (cfg Config) {
|
||||
cfg.ChunkerPolynomial = pol
|
||||
|
||||
cfg.ID = NewRandomID().String()
|
||||
if version == 0 {
|
||||
version = StableRepoVersion
|
||||
}
|
||||
if version < MinRepoVersion || version > MaxRepoVersion {
|
||||
t.Fatalf("version %d is out of range", version)
|
||||
}
|
||||
cfg.Version = version
|
||||
|
||||
return cfg
|
||||
}
|
||||
|
||||
var checkPolynomial = true
|
||||
var checkPolynomialOnce sync.Once
|
||||
|
||||
|
@ -26,6 +26,7 @@ type Repository interface {
|
||||
|
||||
Index() MasterIndex
|
||||
LoadIndex(context.Context, *progress.Counter) error
|
||||
ClearIndex()
|
||||
SetIndex(MasterIndex) error
|
||||
LookupBlobSize(ID, BlobType) (uint, bool)
|
||||
|
||||
|
@ -45,7 +45,7 @@ func TestCreateSnapshot(t *testing.T) {
|
||||
t.Fatalf("snapshot has zero tree ID")
|
||||
}
|
||||
|
||||
checker.TestCheckRepo(t, repo)
|
||||
checker.TestCheckRepo(t, repo, false)
|
||||
}
|
||||
|
||||
func BenchmarkTestCreateSnapshot(t *testing.B) {
|
||||
|
@ -1,5 +1,7 @@
|
||||
package progress
|
||||
|
||||
import "testing"
|
||||
|
||||
// A Printer can can return a new counter or print messages
|
||||
// at different log levels.
|
||||
// It must be safe to call its methods from concurrent goroutines.
|
||||
@ -28,3 +30,36 @@ func (*NoopPrinter) P(_ string, _ ...interface{}) {}
|
||||
func (*NoopPrinter) V(_ string, _ ...interface{}) {}
|
||||
|
||||
func (*NoopPrinter) VV(_ string, _ ...interface{}) {}
|
||||
|
||||
// TestPrinter prints messages during testing
|
||||
type TestPrinter struct {
|
||||
t testing.TB
|
||||
}
|
||||
|
||||
func NewTestPrinter(t testing.TB) *TestPrinter {
|
||||
return &TestPrinter{
|
||||
t: t,
|
||||
}
|
||||
}
|
||||
|
||||
var _ Printer = (*TestPrinter)(nil)
|
||||
|
||||
func (p *TestPrinter) NewCounter(_ string) *Counter {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *TestPrinter) E(msg string, args ...interface{}) {
|
||||
p.t.Logf("error: "+msg, args...)
|
||||
}
|
||||
|
||||
func (p *TestPrinter) P(msg string, args ...interface{}) {
|
||||
p.t.Logf("print: "+msg, args...)
|
||||
}
|
||||
|
||||
func (p *TestPrinter) V(msg string, args ...interface{}) {
|
||||
p.t.Logf("verbose: "+msg, args...)
|
||||
}
|
||||
|
||||
func (p *TestPrinter) VV(msg string, args ...interface{}) {
|
||||
p.t.Logf("verbose2: "+msg, args...)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user