mirror of
https://github.com/octoleo/restic.git
synced 2024-11-24 21:57:41 +00:00
repair pack: extract the repair logic into the repository package
Currently, the cmd/restic package contains a significant amount of code that modifies repository internals. This code should in the mid-term move into the repository package.
This commit is contained in:
parent
d7a50fe739
commit
feeab84204
@ -9,8 +9,8 @@ import (
|
|||||||
"github.com/restic/restic/internal/errors"
|
"github.com/restic/restic/internal/errors"
|
||||||
"github.com/restic/restic/internal/repository"
|
"github.com/restic/restic/internal/repository"
|
||||||
"github.com/restic/restic/internal/restic"
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"github.com/restic/restic/internal/ui/termstatus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var cmdRepairPacks = &cobra.Command{
|
var cmdRepairPacks = &cobra.Command{
|
||||||
@ -29,7 +29,9 @@ Exit status is 0 if the command was successful, and non-zero if there was any er
|
|||||||
`,
|
`,
|
||||||
DisableAutoGenTag: true,
|
DisableAutoGenTag: true,
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
return runRepairPacks(cmd.Context(), globalOptions, args)
|
term, cancel := setupTermstatus()
|
||||||
|
defer cancel()
|
||||||
|
return runRepairPacks(cmd.Context(), globalOptions, term, args)
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -37,7 +39,7 @@ func init() {
|
|||||||
cmdRepair.AddCommand(cmdRepairPacks)
|
cmdRepair.AddCommand(cmdRepairPacks)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) error {
|
func runRepairPacks(ctx context.Context, gopts GlobalOptions, term *termstatus.Terminal, args []string) error {
|
||||||
// FIXME discuss and add proper feature flag mechanism
|
// FIXME discuss and add proper feature flag mechanism
|
||||||
flag, _ := os.LookupEnv("RESTIC_FEATURES")
|
flag, _ := os.LookupEnv("RESTIC_FEATURES")
|
||||||
if flag != "repair-packs-v1" {
|
if flag != "repair-packs-v1" {
|
||||||
@ -68,21 +70,19 @@ func runRepairPacks(ctx context.Context, gopts GlobalOptions, args []string) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return repairPacks(ctx, gopts, repo, ids)
|
|
||||||
}
|
|
||||||
|
|
||||||
func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repository, ids restic.IDSet) error {
|
|
||||||
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
|
bar := newIndexProgress(gopts.Quiet, gopts.JSON)
|
||||||
err := repo.LoadIndex(ctx, bar)
|
err = repo.LoadIndex(ctx, bar)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatalf("%s", err)
|
return errors.Fatalf("%s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
Warnf("saving backup copies of pack files in current folder\n")
|
printer := newTerminalProgressPrinter(gopts.verbosity, term)
|
||||||
|
|
||||||
|
printer.P("saving backup copies of pack files to current folder")
|
||||||
for id := range ids {
|
for id := range ids {
|
||||||
f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666)
|
f, err := os.OpenFile("pack-"+id.String(), os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0o666)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatalf("%s", err)
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error {
|
err = repo.Backend().Load(ctx, backend.Handle{Type: restic.PackFile, Name: id.String()}, 0, 0, func(rd io.Reader) error {
|
||||||
@ -94,66 +94,15 @@ func repairPacks(ctx context.Context, gopts GlobalOptions, repo *repository.Repo
|
|||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatalf("%s", err)
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
wg, wgCtx := errgroup.WithContext(ctx)
|
err = repository.RepairPacks(ctx, repo, ids, printer)
|
||||||
repo.StartPackUploader(wgCtx, wg)
|
|
||||||
repo.DisableAutoIndexUpdate()
|
|
||||||
|
|
||||||
Warnf("salvaging intact data from specified pack files\n")
|
|
||||||
bar = newProgressMax(!gopts.Quiet, uint64(len(ids)), "pack files")
|
|
||||||
defer bar.Done()
|
|
||||||
|
|
||||||
wg.Go(func() error {
|
|
||||||
// examine all data the indexes have for the pack file
|
|
||||||
for b := range repo.Index().ListPacks(wgCtx, ids) {
|
|
||||||
blobs := b.Blobs
|
|
||||||
if len(blobs) == 0 {
|
|
||||||
Warnf("no blobs found for pack %v\n", b.PackID)
|
|
||||||
bar.Add(1)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
err = repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
// Fallback path
|
|
||||||
buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
|
||||||
if err != nil {
|
|
||||||
Warnf("failed to load blob %v: %v\n", blob.ID, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true)
|
|
||||||
if !id.Equal(blob.ID) {
|
|
||||||
panic("pack id mismatch during upload")
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
bar.Add(1)
|
|
||||||
}
|
|
||||||
return repo.Flush(wgCtx)
|
|
||||||
})
|
|
||||||
|
|
||||||
if err := wg.Wait(); err != nil {
|
|
||||||
return errors.Fatalf("%s", err)
|
|
||||||
}
|
|
||||||
bar.Done()
|
|
||||||
|
|
||||||
// remove salvaged packs from index
|
|
||||||
err = rebuildIndexFiles(ctx, gopts, repo, ids, nil, false)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Fatalf("%s", err)
|
return errors.Fatalf("%s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// cleanup
|
|
||||||
Warnf("removing salvaged pack files\n")
|
|
||||||
DeleteFiles(ctx, gopts, repo, ids, restic.PackFile)
|
|
||||||
|
|
||||||
Warnf("\nUse `restic repair snapshots --forget` to remove the corrupted data blobs from all snapshots\n")
|
Warnf("\nUse `restic repair snapshots --forget` to remove the corrupted data blobs from all snapshots\n")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -109,3 +109,21 @@ func newIndexProgress(quiet bool, json bool) *progress.Counter {
|
|||||||
func newIndexTerminalProgress(quiet bool, json bool, term *termstatus.Terminal) *progress.Counter {
|
func newIndexTerminalProgress(quiet bool, json bool, term *termstatus.Terminal) *progress.Counter {
|
||||||
return newTerminalProgressMax(!quiet && !json && stdoutIsTerminal(), 0, "index files loaded", term)
|
return newTerminalProgressMax(!quiet && !json && stdoutIsTerminal(), 0, "index files loaded", term)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type terminalProgressPrinter struct {
|
||||||
|
term *termstatus.Terminal
|
||||||
|
ui.Message
|
||||||
|
show bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *terminalProgressPrinter) NewCounter(description string) *progress.Counter {
|
||||||
|
return newTerminalProgressMax(t.show, 0, description, t.term)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newTerminalProgressPrinter(verbosity uint, term *termstatus.Terminal) progress.Printer {
|
||||||
|
return &terminalProgressPrinter{
|
||||||
|
term: term,
|
||||||
|
Message: *ui.NewMessage(term, verbosity),
|
||||||
|
show: verbosity > 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
88
internal/repository/repair_pack.go
Normal file
88
internal/repository/repair_pack.go
Normal file
@ -0,0 +1,88 @@
|
|||||||
|
package repository
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/restic/restic/internal/restic"
|
||||||
|
"github.com/restic/restic/internal/ui/progress"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
|
)
|
||||||
|
|
||||||
|
func RepairPacks(ctx context.Context, repo *Repository, ids restic.IDSet, printer progress.Printer) error {
|
||||||
|
wg, wgCtx := errgroup.WithContext(ctx)
|
||||||
|
repo.StartPackUploader(wgCtx, wg)
|
||||||
|
repo.DisableAutoIndexUpdate()
|
||||||
|
|
||||||
|
printer.P("salvaging intact data from specified pack files")
|
||||||
|
bar := printer.NewCounter("pack files")
|
||||||
|
bar.SetMax(uint64(len(ids)))
|
||||||
|
defer bar.Done()
|
||||||
|
|
||||||
|
wg.Go(func() error {
|
||||||
|
// examine all data the indexes have for the pack file
|
||||||
|
for b := range repo.Index().ListPacks(wgCtx, ids) {
|
||||||
|
blobs := b.Blobs
|
||||||
|
if len(blobs) == 0 {
|
||||||
|
printer.E("no blobs found for pack %v", b.PackID)
|
||||||
|
bar.Add(1)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
err := repo.LoadBlobsFromPack(wgCtx, b.PackID, blobs, func(blob restic.BlobHandle, buf []byte, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
// Fallback path
|
||||||
|
buf, err = repo.LoadBlob(wgCtx, blob.Type, blob.ID, nil)
|
||||||
|
if err != nil {
|
||||||
|
printer.E("failed to load blob %v: %v", blob.ID, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
id, _, _, err := repo.SaveBlob(wgCtx, blob.Type, buf, restic.ID{}, true)
|
||||||
|
if !id.Equal(blob.ID) {
|
||||||
|
panic("pack id mismatch during upload")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
bar.Add(1)
|
||||||
|
}
|
||||||
|
return repo.Flush(wgCtx)
|
||||||
|
})
|
||||||
|
|
||||||
|
err := wg.Wait()
|
||||||
|
bar.Done()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove salvaged packs from index
|
||||||
|
printer.P("rebuilding index")
|
||||||
|
|
||||||
|
bar = printer.NewCounter("packs processed")
|
||||||
|
err = repo.Index().Save(ctx, repo, ids, nil, restic.MasterIndexSaveOpts{
|
||||||
|
SaveProgress: bar,
|
||||||
|
DeleteProgress: func() *progress.Counter {
|
||||||
|
return printer.NewCounter("old indexes deleted")
|
||||||
|
},
|
||||||
|
DeleteReport: func(id restic.ID, err error) {
|
||||||
|
printer.VV("removed index %v", id.String())
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanup
|
||||||
|
printer.P("removing salvaged pack files")
|
||||||
|
// if we fail to delete the damaged pack files, then prune will remove them later on
|
||||||
|
bar = printer.NewCounter("files deleted")
|
||||||
|
_ = restic.ParallelRemove(ctx, repo, ids, restic.PackFile, nil, bar)
|
||||||
|
bar.Done()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
30
internal/ui/progress/printer.go
Normal file
30
internal/ui/progress/printer.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package progress
|
||||||
|
|
||||||
|
// A Printer can can return a new counter or print messages
|
||||||
|
// at different log levels.
|
||||||
|
// It must be safe to call its methods from concurrent goroutines.
|
||||||
|
type Printer interface {
|
||||||
|
NewCounter(description string) *Counter
|
||||||
|
|
||||||
|
E(msg string, args ...interface{})
|
||||||
|
P(msg string, args ...interface{})
|
||||||
|
V(msg string, args ...interface{})
|
||||||
|
VV(msg string, args ...interface{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// NoopPrinter discards all messages
|
||||||
|
type NoopPrinter struct{}
|
||||||
|
|
||||||
|
var _ Printer = (*NoopPrinter)(nil)
|
||||||
|
|
||||||
|
func (*NoopPrinter) NewCounter(description string) *Counter {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*NoopPrinter) E(msg string, args ...interface{}) {}
|
||||||
|
|
||||||
|
func (*NoopPrinter) P(msg string, args ...interface{}) {}
|
||||||
|
|
||||||
|
func (*NoopPrinter) V(msg string, args ...interface{}) {}
|
||||||
|
|
||||||
|
func (*NoopPrinter) VV(msg string, args ...interface{}) {}
|
Loading…
Reference in New Issue
Block a user