mirror of
https://github.com/octoleo/restic.git
synced 2024-11-18 02:55:18 +00:00
8b7bf8691d
This is the first commit that removes the (redundant) Get() method of the backend interface. Get(x, y) is equivalent to GetReader(x, y, 0, 0).
743 lines
18 KiB
Go
743 lines
18 KiB
Go
package checker
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io/ioutil"
|
|
"sync"
|
|
|
|
"github.com/restic/restic"
|
|
"github.com/restic/restic/backend"
|
|
"github.com/restic/restic/crypto"
|
|
"github.com/restic/restic/debug"
|
|
"github.com/restic/restic/pack"
|
|
"github.com/restic/restic/repository"
|
|
)
|
|
|
|
// Checker runs various checks on a repository. It is advisable to create an
|
|
// exclusive Lock in the repository before running any checks.
|
|
//
|
|
// A Checker only tests for internal errors within the data structures of the
|
|
// repository (e.g. missing blobs), and needs a valid Repository to work on.
|
|
type Checker struct {
|
|
packs map[backend.ID]struct{}
|
|
blobs map[backend.ID]struct{}
|
|
blobRefs struct {
|
|
sync.Mutex
|
|
M map[backend.ID]uint
|
|
}
|
|
indexes map[backend.ID]*repository.Index
|
|
orphanedPacks backend.IDs
|
|
|
|
masterIndex *repository.MasterIndex
|
|
|
|
repo *repository.Repository
|
|
}
|
|
|
|
// New returns a new checker which runs on repo.
|
|
func New(repo *repository.Repository) *Checker {
|
|
c := &Checker{
|
|
packs: make(map[backend.ID]struct{}),
|
|
blobs: make(map[backend.ID]struct{}),
|
|
masterIndex: repository.NewMasterIndex(),
|
|
indexes: make(map[backend.ID]*repository.Index),
|
|
repo: repo,
|
|
}
|
|
|
|
c.blobRefs.M = make(map[backend.ID]uint)
|
|
|
|
return c
|
|
}
|
|
|
|
const defaultParallelism = 40
|
|
|
|
// ErrDuplicatePacks is returned when a pack is found in more than one index.
|
|
type ErrDuplicatePacks struct {
|
|
PackID backend.ID
|
|
Indexes backend.IDSet
|
|
}
|
|
|
|
func (e ErrDuplicatePacks) Error() string {
|
|
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
|
|
}
|
|
|
|
// ErrOldIndexFormat is returned when an index with the old format is
|
|
// found.
|
|
type ErrOldIndexFormat struct {
|
|
backend.ID
|
|
}
|
|
|
|
func (err ErrOldIndexFormat) Error() string {
|
|
return fmt.Sprintf("index %v has old format", err.ID.Str())
|
|
}
|
|
|
|
// LoadIndex loads all index files.
|
|
func (c *Checker) LoadIndex() (hints []error, errs []error) {
|
|
debug.Log("LoadIndex", "Start")
|
|
type indexRes struct {
|
|
Index *repository.Index
|
|
ID string
|
|
}
|
|
|
|
indexCh := make(chan indexRes)
|
|
|
|
worker := func(id backend.ID, done <-chan struct{}) error {
|
|
debug.Log("LoadIndex", "worker got index %v", id)
|
|
idx, err := repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeIndex)
|
|
if err == repository.ErrOldIndexFormat {
|
|
debug.Log("LoadIndex", "index %v has old format", id.Str())
|
|
hints = append(hints, ErrOldIndexFormat{id})
|
|
|
|
idx, err = repository.LoadIndexWithDecoder(c.repo, id.String(), repository.DecodeOldIndex)
|
|
}
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
select {
|
|
case indexCh <- indexRes{Index: idx, ID: id.String()}:
|
|
case <-done:
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
var perr error
|
|
go func() {
|
|
defer close(indexCh)
|
|
debug.Log("LoadIndex", "start loading indexes in parallel")
|
|
perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism,
|
|
repository.ParallelWorkFuncParseID(worker))
|
|
debug.Log("LoadIndex", "loading indexes finished, error: %v", perr)
|
|
}()
|
|
|
|
done := make(chan struct{})
|
|
defer close(done)
|
|
|
|
if perr != nil {
|
|
errs = append(errs, perr)
|
|
return hints, errs
|
|
}
|
|
|
|
packToIndex := make(map[backend.ID]backend.IDSet)
|
|
|
|
for res := range indexCh {
|
|
debug.Log("LoadIndex", "process index %v", res.ID)
|
|
idxID, err := backend.ParseID(res.ID)
|
|
if err != nil {
|
|
errs = append(errs, fmt.Errorf("unable to parse as index ID: %v", res.ID))
|
|
continue
|
|
}
|
|
|
|
c.indexes[idxID] = res.Index
|
|
c.masterIndex.Insert(res.Index)
|
|
|
|
debug.Log("LoadIndex", "process blobs")
|
|
cnt := 0
|
|
for blob := range res.Index.Each(done) {
|
|
c.packs[blob.PackID] = struct{}{}
|
|
c.blobs[blob.ID] = struct{}{}
|
|
c.blobRefs.M[blob.ID] = 0
|
|
cnt++
|
|
|
|
if _, ok := packToIndex[blob.PackID]; !ok {
|
|
packToIndex[blob.PackID] = backend.NewIDSet()
|
|
}
|
|
packToIndex[blob.PackID].Insert(idxID)
|
|
}
|
|
|
|
debug.Log("LoadIndex", "%d blobs processed", cnt)
|
|
}
|
|
|
|
debug.Log("LoadIndex", "done, error %v", perr)
|
|
|
|
debug.Log("LoadIndex", "checking for duplicate packs")
|
|
for packID := range c.packs {
|
|
debug.Log("LoadIndex", " check pack %v: contained in %d indexes", packID.Str(), len(packToIndex[packID]))
|
|
if len(packToIndex[packID]) > 1 {
|
|
hints = append(hints, ErrDuplicatePacks{
|
|
PackID: packID,
|
|
Indexes: packToIndex[packID],
|
|
})
|
|
}
|
|
}
|
|
|
|
c.repo.SetIndex(c.masterIndex)
|
|
|
|
return hints, errs
|
|
}
|
|
|
|
// PackError describes an error with a specific pack.
|
|
type PackError struct {
|
|
ID backend.ID
|
|
Orphaned bool
|
|
Err error
|
|
}
|
|
|
|
func (e PackError) Error() string {
|
|
return "pack " + e.ID.String() + ": " + e.Err.Error()
|
|
}
|
|
|
|
func packIDTester(repo *repository.Repository, inChan <-chan backend.ID, errChan chan<- error, wg *sync.WaitGroup, done <-chan struct{}) {
|
|
debug.Log("Checker.testPackID", "worker start")
|
|
defer debug.Log("Checker.testPackID", "worker done")
|
|
|
|
defer wg.Done()
|
|
|
|
for id := range inChan {
|
|
ok, err := repo.Backend().Test(backend.Data, id.String())
|
|
if err != nil {
|
|
err = PackError{ID: id, Err: err}
|
|
} else {
|
|
if !ok {
|
|
err = PackError{ID: id, Err: errors.New("does not exist")}
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
debug.Log("Checker.testPackID", "error checking for pack %s: %v", id.Str(), err)
|
|
select {
|
|
case <-done:
|
|
return
|
|
case errChan <- err:
|
|
}
|
|
|
|
continue
|
|
}
|
|
|
|
debug.Log("Checker.testPackID", "pack %s exists", id.Str())
|
|
}
|
|
}
|
|
|
|
// Packs checks that all packs referenced in the index are still available and
|
|
// there are no packs that aren't in an index. errChan is closed after all
|
|
// packs have been checked.
|
|
func (c *Checker) Packs(errChan chan<- error, done <-chan struct{}) {
|
|
defer close(errChan)
|
|
|
|
debug.Log("Checker.Packs", "checking for %d packs", len(c.packs))
|
|
seenPacks := make(map[backend.ID]struct{})
|
|
|
|
var workerWG sync.WaitGroup
|
|
|
|
IDChan := make(chan backend.ID)
|
|
for i := 0; i < defaultParallelism; i++ {
|
|
workerWG.Add(1)
|
|
go packIDTester(c.repo, IDChan, errChan, &workerWG, done)
|
|
}
|
|
|
|
for id := range c.packs {
|
|
seenPacks[id] = struct{}{}
|
|
IDChan <- id
|
|
}
|
|
close(IDChan)
|
|
|
|
debug.Log("Checker.Packs", "waiting for %d workers to terminate", defaultParallelism)
|
|
workerWG.Wait()
|
|
debug.Log("Checker.Packs", "workers terminated")
|
|
|
|
for id := range c.repo.List(backend.Data, done) {
|
|
debug.Log("Checker.Packs", "check data blob %v", id.Str())
|
|
if _, ok := seenPacks[id]; !ok {
|
|
c.orphanedPacks = append(c.orphanedPacks, id)
|
|
select {
|
|
case <-done:
|
|
return
|
|
case errChan <- PackError{ID: id, Orphaned: true, Err: errors.New("not referenced in any index")}:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Error is an error that occurred while checking a repository.
|
|
type Error struct {
|
|
TreeID *backend.ID
|
|
BlobID *backend.ID
|
|
Err error
|
|
}
|
|
|
|
func (e Error) Error() string {
|
|
if e.BlobID != nil && e.TreeID != nil {
|
|
msg := "tree " + e.TreeID.Str()
|
|
msg += ", blob " + e.BlobID.Str()
|
|
msg += ": " + e.Err.Error()
|
|
return msg
|
|
}
|
|
|
|
if e.TreeID != nil {
|
|
return "tree " + e.TreeID.Str() + ": " + e.Err.Error()
|
|
}
|
|
|
|
return e.Err.Error()
|
|
}
|
|
|
|
func loadTreeFromSnapshot(repo *repository.Repository, id backend.ID) (backend.ID, error) {
|
|
sn, err := restic.LoadSnapshot(repo, id)
|
|
if err != nil {
|
|
debug.Log("Checker.loadTreeFromSnapshot", "error loading snapshot %v: %v", id.Str(), err)
|
|
return backend.ID{}, err
|
|
}
|
|
|
|
if sn.Tree == nil {
|
|
debug.Log("Checker.loadTreeFromSnapshot", "snapshot %v has no tree", id.Str())
|
|
return backend.ID{}, fmt.Errorf("snapshot %v has no tree", id)
|
|
}
|
|
|
|
return *sn.Tree, nil
|
|
}
|
|
|
|
// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs.
|
|
func loadSnapshotTreeIDs(repo *repository.Repository) (backend.IDs, []error) {
|
|
var trees struct {
|
|
IDs backend.IDs
|
|
sync.Mutex
|
|
}
|
|
|
|
var errs struct {
|
|
errs []error
|
|
sync.Mutex
|
|
}
|
|
|
|
snapshotWorker := func(strID string, done <-chan struct{}) error {
|
|
id, err := backend.ParseID(strID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
debug.Log("Checker.Snaphots", "load snapshot %v", id.Str())
|
|
|
|
treeID, err := loadTreeFromSnapshot(repo, id)
|
|
if err != nil {
|
|
errs.Lock()
|
|
errs.errs = append(errs.errs, err)
|
|
errs.Unlock()
|
|
return nil
|
|
}
|
|
|
|
debug.Log("Checker.Snaphots", "snapshot %v has tree %v", id.Str(), treeID.Str())
|
|
trees.Lock()
|
|
trees.IDs = append(trees.IDs, treeID)
|
|
trees.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
err := repository.FilesInParallel(repo.Backend(), backend.Snapshot, defaultParallelism, snapshotWorker)
|
|
if err != nil {
|
|
errs.errs = append(errs.errs, err)
|
|
}
|
|
|
|
return trees.IDs, errs.errs
|
|
}
|
|
|
|
// TreeError collects several errors that occurred while processing a tree.
|
|
type TreeError struct {
|
|
ID backend.ID
|
|
Errors []error
|
|
}
|
|
|
|
func (e TreeError) Error() string {
|
|
return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors)
|
|
}
|
|
|
|
type treeJob struct {
|
|
backend.ID
|
|
error
|
|
*restic.Tree
|
|
}
|
|
|
|
// loadTreeWorker loads trees from repo and sends them to out.
|
|
func loadTreeWorker(repo *repository.Repository,
|
|
in <-chan backend.ID, out chan<- treeJob,
|
|
done <-chan struct{}, wg *sync.WaitGroup) {
|
|
|
|
defer func() {
|
|
debug.Log("checker.loadTreeWorker", "exiting")
|
|
wg.Done()
|
|
}()
|
|
|
|
var (
|
|
inCh = in
|
|
outCh = out
|
|
job treeJob
|
|
)
|
|
|
|
outCh = nil
|
|
for {
|
|
select {
|
|
case <-done:
|
|
return
|
|
|
|
case treeID, ok := <-inCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
debug.Log("checker.loadTreeWorker", "load tree %v", treeID.Str())
|
|
|
|
tree, err := restic.LoadTree(repo, treeID)
|
|
debug.Log("checker.loadTreeWorker", "load tree %v (%v) returned err: %v", tree, treeID.Str(), err)
|
|
job = treeJob{ID: treeID, error: err, Tree: tree}
|
|
outCh = out
|
|
inCh = nil
|
|
|
|
case outCh <- job:
|
|
debug.Log("checker.loadTreeWorker", "sent tree %v", job.ID.Str())
|
|
outCh = nil
|
|
inCh = in
|
|
}
|
|
}
|
|
}
|
|
|
|
// checkTreeWorker checks the trees received and sends out errors to errChan.
|
|
func (c *Checker) checkTreeWorker(in <-chan treeJob, out chan<- error, done <-chan struct{}, wg *sync.WaitGroup) {
|
|
defer func() {
|
|
debug.Log("checker.checkTreeWorker", "exiting")
|
|
wg.Done()
|
|
}()
|
|
|
|
var (
|
|
inCh = in
|
|
outCh = out
|
|
treeError TreeError
|
|
)
|
|
|
|
outCh = nil
|
|
for {
|
|
select {
|
|
case <-done:
|
|
debug.Log("checker.checkTreeWorker", "done channel closed, exiting")
|
|
return
|
|
|
|
case job, ok := <-inCh:
|
|
if !ok {
|
|
debug.Log("checker.checkTreeWorker", "input channel closed, exiting")
|
|
return
|
|
}
|
|
|
|
id := job.ID
|
|
alreadyChecked := false
|
|
c.blobRefs.Lock()
|
|
if c.blobRefs.M[id] > 0 {
|
|
alreadyChecked = true
|
|
}
|
|
c.blobRefs.M[id]++
|
|
debug.Log("checker.checkTreeWorker", "tree %v refcount %d", job.ID.Str(), c.blobRefs.M[id])
|
|
c.blobRefs.Unlock()
|
|
|
|
if alreadyChecked {
|
|
continue
|
|
}
|
|
|
|
debug.Log("checker.checkTreeWorker", "check tree %v (tree %v, err %v)", job.ID.Str(), job.Tree, job.error)
|
|
|
|
var errs []error
|
|
if job.error != nil {
|
|
errs = append(errs, job.error)
|
|
} else {
|
|
errs = c.checkTree(job.ID, job.Tree)
|
|
}
|
|
|
|
if len(errs) > 0 {
|
|
debug.Log("checker.checkTreeWorker", "checked tree %v: %v errors", job.ID.Str(), len(errs))
|
|
treeError = TreeError{ID: job.ID, Errors: errs}
|
|
outCh = out
|
|
inCh = nil
|
|
}
|
|
|
|
case outCh <- treeError:
|
|
debug.Log("checker.checkTreeWorker", "tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
|
|
outCh = nil
|
|
inCh = in
|
|
}
|
|
}
|
|
}
|
|
|
|
func filterTrees(backlog backend.IDs, loaderChan chan<- backend.ID, in <-chan treeJob, out chan<- treeJob, done <-chan struct{}) {
|
|
defer func() {
|
|
debug.Log("checker.filterTrees", "closing output channels")
|
|
close(loaderChan)
|
|
close(out)
|
|
}()
|
|
|
|
var (
|
|
inCh = in
|
|
outCh = out
|
|
loadCh = loaderChan
|
|
job treeJob
|
|
nextTreeID backend.ID
|
|
outstandingLoadTreeJobs = 0
|
|
)
|
|
|
|
outCh = nil
|
|
loadCh = nil
|
|
|
|
for {
|
|
if loadCh == nil && len(backlog) > 0 {
|
|
loadCh = loaderChan
|
|
nextTreeID, backlog = backlog[0], backlog[1:]
|
|
}
|
|
|
|
if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
|
|
debug.Log("checker.filterTrees", "backlog is empty, all channels nil, exiting")
|
|
return
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
return
|
|
|
|
case loadCh <- nextTreeID:
|
|
outstandingLoadTreeJobs++
|
|
loadCh = nil
|
|
|
|
case j, ok := <-inCh:
|
|
if !ok {
|
|
debug.Log("checker.filterTrees", "input channel closed")
|
|
inCh = nil
|
|
in = nil
|
|
continue
|
|
}
|
|
|
|
outstandingLoadTreeJobs--
|
|
|
|
debug.Log("checker.filterTrees", "input job tree %v", j.ID.Str())
|
|
|
|
var err error
|
|
|
|
if j.error != nil {
|
|
debug.Log("checker.filterTrees", "received job with error: %v (tree %v, ID %v)", j.error, j.Tree, j.ID.Str())
|
|
} else if j.Tree == nil {
|
|
debug.Log("checker.filterTrees", "received job with nil tree pointer: %v (ID %v)", j.error, j.ID.Str())
|
|
err = errors.New("tree is nil and error is nil")
|
|
} else {
|
|
debug.Log("checker.filterTrees", "subtrees for tree %v: %v", j.ID.Str(), j.Tree.Subtrees())
|
|
for _, id := range j.Tree.Subtrees() {
|
|
if id.IsNull() {
|
|
// We do not need to raise this error here, it is
|
|
// checked when the tree is checked. Just make sure
|
|
// that we do not add any null IDs to the backlog.
|
|
debug.Log("checker.filterTrees", "tree %v has nil subtree", j.ID.Str())
|
|
continue
|
|
}
|
|
backlog = append(backlog, id)
|
|
}
|
|
}
|
|
|
|
if err != nil {
|
|
// send a new job with the new error instead of the old one
|
|
j = treeJob{ID: j.ID, error: err}
|
|
}
|
|
|
|
job = j
|
|
outCh = out
|
|
inCh = nil
|
|
|
|
case outCh <- job:
|
|
outCh = nil
|
|
inCh = in
|
|
}
|
|
}
|
|
}
|
|
|
|
// Structure checks that for all snapshots all referenced data blobs and
|
|
// subtrees are available in the index. errChan is closed after all trees have
|
|
// been traversed.
|
|
func (c *Checker) Structure(errChan chan<- error, done <-chan struct{}) {
|
|
defer close(errChan)
|
|
|
|
trees, errs := loadSnapshotTreeIDs(c.repo)
|
|
debug.Log("checker.Structure", "need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))
|
|
|
|
for _, err := range errs {
|
|
select {
|
|
case <-done:
|
|
return
|
|
case errChan <- err:
|
|
}
|
|
}
|
|
|
|
treeIDChan := make(chan backend.ID)
|
|
treeJobChan1 := make(chan treeJob)
|
|
treeJobChan2 := make(chan treeJob)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < defaultParallelism; i++ {
|
|
wg.Add(2)
|
|
go loadTreeWorker(c.repo, treeIDChan, treeJobChan1, done, &wg)
|
|
go c.checkTreeWorker(treeJobChan2, errChan, done, &wg)
|
|
}
|
|
|
|
filterTrees(trees, treeIDChan, treeJobChan1, treeJobChan2, done)
|
|
|
|
wg.Wait()
|
|
}
|
|
|
|
func (c *Checker) checkTree(id backend.ID, tree *restic.Tree) (errs []error) {
|
|
debug.Log("Checker.checkTree", "checking tree %v", id.Str())
|
|
|
|
var blobs []backend.ID
|
|
|
|
for _, node := range tree.Nodes {
|
|
switch node.Type {
|
|
case "file":
|
|
for b, blobID := range node.Content {
|
|
if blobID.IsNull() {
|
|
errs = append(errs, Error{TreeID: &id, Err: fmt.Errorf("file %q blob %d has null ID", node.Name, b)})
|
|
continue
|
|
}
|
|
blobs = append(blobs, blobID)
|
|
}
|
|
case "dir":
|
|
if node.Subtree == nil {
|
|
errs = append(errs, Error{TreeID: &id, Err: fmt.Errorf("dir node %q has no subtree", node.Name)})
|
|
continue
|
|
}
|
|
|
|
if node.Subtree.IsNull() {
|
|
errs = append(errs, Error{TreeID: &id, Err: fmt.Errorf("dir node %q subtree id is null", node.Name)})
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, blobID := range blobs {
|
|
c.blobRefs.Lock()
|
|
c.blobRefs.M[blobID]++
|
|
debug.Log("Checker.checkTree", "blob %v refcount %d", blobID.Str(), c.blobRefs.M[blobID])
|
|
c.blobRefs.Unlock()
|
|
|
|
if _, ok := c.blobs[blobID]; !ok {
|
|
debug.Log("Checker.trees", "tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())
|
|
|
|
errs = append(errs, Error{TreeID: &id, BlobID: &blobID, Err: errors.New("not found in index")})
|
|
}
|
|
}
|
|
|
|
return errs
|
|
}
|
|
|
|
// UnusedBlobs returns all blobs that have never been referenced.
|
|
func (c *Checker) UnusedBlobs() (blobs backend.IDs) {
|
|
c.blobRefs.Lock()
|
|
defer c.blobRefs.Unlock()
|
|
|
|
debug.Log("Checker.UnusedBlobs", "checking %d blobs", len(c.blobs))
|
|
for id := range c.blobs {
|
|
if c.blobRefs.M[id] == 0 {
|
|
debug.Log("Checker.UnusedBlobs", "blob %v not referenced", id.Str())
|
|
blobs = append(blobs, id)
|
|
}
|
|
}
|
|
|
|
return blobs
|
|
}
|
|
|
|
// OrphanedPacks returns a slice of unused packs (only available after Packs() was run).
|
|
func (c *Checker) OrphanedPacks() backend.IDs {
|
|
return c.orphanedPacks
|
|
}
|
|
|
|
// CountPacks returns the number of packs in the repository.
|
|
func (c *Checker) CountPacks() uint64 {
|
|
return uint64(len(c.packs))
|
|
}
|
|
|
|
// checkPack reads a pack and checks the integrity of all blobs.
|
|
func checkPack(r *repository.Repository, id backend.ID) error {
|
|
debug.Log("Checker.checkPack", "checking pack %v", id.Str())
|
|
rd, err := r.Backend().GetReader(backend.Data, id.String(), 0, 0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
buf, err := ioutil.ReadAll(rd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = rd.Close()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
unpacker, err := pack.NewUnpacker(r.Key(), bytes.NewReader(buf))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var errs []error
|
|
for i, blob := range unpacker.Entries {
|
|
debug.Log("Checker.checkPack", " check blob %d: %v", i, blob.ID.Str())
|
|
|
|
plainBuf := make([]byte, blob.Length)
|
|
plainBuf, err = crypto.Decrypt(r.Key(), plainBuf, buf[blob.Offset:blob.Offset+blob.Length])
|
|
if err != nil {
|
|
debug.Log("Checker.checkPack", " error decrypting blob %v: %v", blob.ID.Str(), err)
|
|
errs = append(errs, fmt.Errorf("blob %v: %v", i, err))
|
|
continue
|
|
}
|
|
|
|
hash := backend.Hash(plainBuf)
|
|
if !hash.Equal(blob.ID) {
|
|
debug.Log("Checker.checkPack", " ID does not match, want %v, got %v", blob.ID.Str(), hash.Str())
|
|
errs = append(errs, fmt.Errorf("ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
|
|
continue
|
|
}
|
|
}
|
|
|
|
if len(errs) > 0 {
|
|
return fmt.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ReadData loads all data from the repository and checks the integrity.
|
|
func (c *Checker) ReadData(p *restic.Progress, errChan chan<- error, done <-chan struct{}) {
|
|
defer close(errChan)
|
|
|
|
p.Start()
|
|
defer p.Done()
|
|
|
|
worker := func(wg *sync.WaitGroup, in <-chan backend.ID) {
|
|
defer wg.Done()
|
|
for {
|
|
var id backend.ID
|
|
var ok bool
|
|
|
|
select {
|
|
case <-done:
|
|
return
|
|
case id, ok = <-in:
|
|
if !ok {
|
|
return
|
|
}
|
|
}
|
|
|
|
err := checkPack(c.repo, id)
|
|
p.Report(restic.Stat{Blobs: 1})
|
|
if err == nil {
|
|
continue
|
|
}
|
|
|
|
select {
|
|
case <-done:
|
|
return
|
|
case errChan <- err:
|
|
}
|
|
}
|
|
}
|
|
|
|
ch := c.repo.List(backend.Data, done)
|
|
|
|
var wg sync.WaitGroup
|
|
for i := 0; i < defaultParallelism; i++ {
|
|
wg.Add(1)
|
|
go worker(&wg, ch)
|
|
}
|
|
|
|
wg.Wait()
|
|
}
|