diff --git a/backend/mem_backend.go b/backend/mem_backend.go index 9ba17a72f..21d78846b 100644 --- a/backend/mem_backend.go +++ b/backend/mem_backend.go @@ -125,6 +125,11 @@ func memCreate(be *MemoryBackend) (Blob, error) { return blob, nil } +// ReadCloser wraps a reader and adds a noop Close method. +func ReadCloser(rd io.Reader) io.ReadCloser { + return readCloser{rd} +} + // readCloser wraps a reader and adds a noop Close method. type readCloser struct { io.Reader diff --git a/checker/checker.go b/checker/checker.go index 7064e56c9..ecd2ed626 100644 --- a/checker/checker.go +++ b/checker/checker.go @@ -1,13 +1,17 @@ package checker import ( + "bytes" "errors" "fmt" + "io/ioutil" "sync" "github.com/restic/restic" "github.com/restic/restic/backend" + "github.com/restic/restic/crypto" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" "github.com/restic/restic/repository" ) @@ -335,7 +339,7 @@ type TreeError struct { } func (e TreeError) Error() string { - return fmt.Sprintf("%v: %d errors", e.ID.String(), len(e.Errors)) + return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors) } type treeJob struct { @@ -634,3 +638,105 @@ func (c *Checker) UnusedBlobs() (blobs backend.IDs) { func (c *Checker) OrphanedPacks() backend.IDs { return c.orphanedPacks } + +// CountPacks returns the number of packs in the repository. +func (c *Checker) CountPacks() uint64 { + return uint64(len(c.packs)) +} + +// checkPack reads a pack and checks the integrity of all blobs. +func checkPack(r *repository.Repository, id backend.ID) error { + debug.Log("Checker.checkPack", "checking pack %v", id.Str()) + rd, err := r.Backend().Get(backend.Data, id.String()) + if err != nil { + return err + } + + buf, err := ioutil.ReadAll(rd) + if err != nil { + return err + } + + err = rd.Close() + if err != nil { + return err + } + + unpacker, err := pack.NewUnpacker(r.Key(), bytes.NewReader(buf)) + if err != nil { + return err + } + + var errs []error + for i, blob := range unpacker.Entries { + debug.Log("Checker.checkPack", " check blob %d: %v", i, blob.ID.Str()) + + plainBuf := make([]byte, blob.Length) + plainBuf, err = crypto.Decrypt(r.Key(), plainBuf, buf[blob.Offset:blob.Offset+blob.Length]) + if err != nil { + debug.Log("Checker.checkPack", " error decrypting blob %v: %v", blob.ID.Str(), err) + errs = append(errs, fmt.Errorf("blob %v: %v", i, err)) + continue + } + + hash := backend.Hash(plainBuf) + if !hash.Equal(blob.ID) { + debug.Log("Checker.checkPack", " ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()) + errs = append(errs, fmt.Errorf("ID does not match, want %v, got %v", blob.ID.Str(), hash.Str())) + continue + } + } + + if len(errs) > 0 { + return fmt.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs) + } + + return nil +} + +// ReadData loads all data from the repository and checks the integrity. +func (c *Checker) ReadData(p *restic.Progress, errChan chan<- error, done <-chan struct{}) { + defer close(errChan) + + p.Start() + defer p.Done() + + worker := func(wg *sync.WaitGroup, in <-chan backend.ID) { + defer wg.Done() + for { + var id backend.ID + var ok bool + + select { + case <-done: + return + case id, ok = <-in: + if !ok { + return + } + } + + err := checkPack(c.repo, id) + p.Report(restic.Stat{Blobs: 1}) + if err == nil { + continue + } + + select { + case <-done: + return + case errChan <- err: + } + } + } + + ch := c.repo.List(backend.Data, done) + + var wg sync.WaitGroup + for i := 0; i < defaultParallelism; i++ { + wg.Add(1) + go worker(&wg, ch) + } + + wg.Wait() +} diff --git a/checker/checker_test.go b/checker/checker_test.go index 37d7f7a7b..99e89a22a 100644 --- a/checker/checker_test.go +++ b/checker/checker_test.go @@ -1,10 +1,13 @@ package checker_test import ( + "io" + "math/rand" "path/filepath" "sort" "testing" + "github.com/restic/restic" "github.com/restic/restic/backend" "github.com/restic/restic/checker" "github.com/restic/restic/repository" @@ -24,13 +27,13 @@ func list(repo *repository.Repository, t backend.Type) (IDs []string) { return IDs } -func checkPacks(chkr *checker.Checker) (errs []error) { +func collectErrors(f func(chan<- error, <-chan struct{})) (errs []error) { done := make(chan struct{}) defer close(done) errChan := make(chan error) - go chkr.Packs(errChan, done) + go f(errChan, done) for err := range errChan { errs = append(errs, err) @@ -39,19 +42,20 @@ func checkPacks(chkr *checker.Checker) (errs []error) { return errs } -func checkStruct(chkr *checker.Checker) (errs []error) { - done := make(chan struct{}) - defer close(done) +func checkPacks(chkr *checker.Checker) []error { + return collectErrors(chkr.Packs) +} - errChan := make(chan error) +func checkStruct(chkr *checker.Checker) []error { + return collectErrors(chkr.Structure) +} - go chkr.Structure(errChan, done) - - for err := range errChan { - errs = append(errs, err) - } - - return errs +func checkData(chkr *checker.Checker) []error { + return collectErrors( + func(errCh chan<- error, done <-chan struct{}) { + chkr.ReadData(nil, errCh, done) + }, + ) } func TestCheckRepo(t *testing.T) { @@ -204,3 +208,102 @@ func TestDuplicatePacksInIndex(t *testing.T) { }) } + +// errorBackend randomly modifies data after reading. +type errorBackend struct { + backend.Backend +} + +func (b errorBackend) Get(t backend.Type, name string) (io.ReadCloser, error) { + rd, err := b.Backend.Get(t, name) + if err != nil { + return rd, err + } + + if t != backend.Data { + return rd, err + } + + return backend.ReadCloser(faultReader{rd}), nil +} + +func (b errorBackend) GetReader(t backend.Type, name string, offset, length uint) (io.ReadCloser, error) { + rd, err := b.Backend.GetReader(t, name, offset, length) + if err != nil { + return rd, err + } + + if t != backend.Data { + return rd, err + } + + return backend.ReadCloser(faultReader{rd}), nil +} + +// induceError flips a bit in the slice. +func induceError(data []byte) { + if rand.Float32() < 0.8 { + return + } + + pos := rand.Intn(len(data)) + data[pos] ^= 1 +} + +// faultReader wraps a reader and randomly modifies data on read. +type faultReader struct { + rd io.Reader +} + +func (f faultReader) Read(p []byte) (int, error) { + n, err := f.rd.Read(p) + if n > 0 { + induceError(p) + } + + return n, err +} + +func TestCheckerModifiedData(t *testing.T) { + be := backend.NewMemoryBackend() + + repo := repository.New(be) + OK(t, repo.Init(TestPassword)) + + arch := restic.NewArchiver(repo) + _, id, err := arch.Snapshot(nil, []string{"."}, nil) + OK(t, err) + t.Logf("archived as %v", id.Str()) + + checkRepo := repository.New(errorBackend{be}) + OK(t, checkRepo.SearchKey(TestPassword)) + + chkr := checker.New(checkRepo) + + hints, errs := chkr.LoadIndex() + if len(errs) > 0 { + t.Fatalf("expected no errors, got %v: %v", len(errs), errs) + } + + if len(hints) > 0 { + t.Errorf("expected no hints, got %v: %v", len(hints), hints) + } + + errFound := false + for _, err := range checkPacks(chkr) { + t.Logf("pack error: %v", err) + } + + for _, err := range checkStruct(chkr) { + t.Logf("struct error: %v", err) + } + + for _, err := range checkData(chkr) { + t.Logf("struct error: %v", err) + errFound = true + } + + if !errFound { + t.Fatal("no error found, checker is broken") + } +} diff --git a/cmd/restic/cmd_check.go b/cmd/restic/cmd_check.go index c3ba1dd83..8c59c2ffe 100644 --- a/cmd/restic/cmd_check.go +++ b/cmd/restic/cmd_check.go @@ -4,7 +4,11 @@ import ( "errors" "fmt" "os" + "time" + "golang.org/x/crypto/ssh/terminal" + + "github.com/restic/restic" "github.com/restic/restic/checker" ) @@ -29,6 +33,37 @@ func (cmd CmdCheck) Usage() string { return "[check-options]" } +func (cmd CmdCheck) newReadProgress(todo restic.Stat) *restic.Progress { + if !cmd.global.ShowProgress() { + return nil + } + + readProgress := restic.NewProgress(time.Second) + + readProgress.OnUpdate = func(s restic.Stat, d time.Duration, ticker bool) { + status := fmt.Sprintf("[%s] %s %d / %d items", + formatDuration(d), + formatPercent(s.Blobs, todo.Blobs), + s.Blobs, todo.Blobs) + + w, _, err := terminal.GetSize(int(os.Stdout.Fd())) + if err == nil { + if len(status) > w { + max := w - len(status) - 4 + status = status[:max] + "... " + } + } + + fmt.Printf("\x1b[2K%s\r", status) + } + + readProgress.OnDone = func(s restic.Stat, d time.Duration, ticker bool) { + fmt.Printf("\nduration: %s\n", formatDuration(d)) + } + + return readProgress +} + func (cmd CmdCheck) Execute(args []string) error { if len(args) != 0 { return errors.New("check has no arguments") @@ -109,6 +144,20 @@ func (cmd CmdCheck) Execute(args []string) error { } } + if cmd.ReadData { + cmd.global.Verbosef("Read all data\n") + + p := cmd.newReadProgress(restic.Stat{Blobs: chkr.CountPacks()}) + errChan := make(chan error) + + go chkr.ReadData(p, errChan, done) + + for err := range errChan { + errorsFound = true + fmt.Fprintf(os.Stderr, "%v\n", err) + } + } + if errorsFound { return errors.New("repository contains errors") }