2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-12 18:31:10 +00:00
restic/internal/checker/checker.go

796 lines
18 KiB
Go
Raw Normal View History

package checker
import (
2017-06-04 09:16:55 +00:00
"context"
"crypto/sha256"
2015-07-11 14:00:49 +00:00
"fmt"
"io"
"os"
2015-07-11 22:25:42 +00:00
"sync"
2017-07-23 12:21:03 +00:00
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/fs"
"github.com/restic/restic/internal/hashing"
2017-07-24 15:42:25 +00:00
"github.com/restic/restic/internal/restic"
2017-07-23 12:21:03 +00:00
"github.com/restic/restic/internal/debug"
"github.com/restic/restic/internal/pack"
"github.com/restic/restic/internal/repository"
)
// Checker runs various checks on a repository. It is advisable to create an
// exclusive Lock in the repository before running any checks.
//
// A Checker only tests for internal errors within the data structures of the
// repository (e.g. missing blobs), and needs a valid Repository to work on.
type Checker struct {
2016-08-31 20:39:36 +00:00
packs restic.IDSet
blobs restic.IDSet
blobRefs struct {
sync.Mutex
2016-08-31 20:39:36 +00:00
M map[restic.ID]uint
}
2016-08-31 20:39:36 +00:00
indexes map[restic.ID]*repository.Index
orphanedPacks restic.IDs
2015-10-12 20:34:12 +00:00
masterIndex *repository.MasterIndex
2016-08-31 21:07:50 +00:00
repo restic.Repository
}
// New returns a new checker which runs on repo.
2016-08-31 21:07:50 +00:00
func New(repo restic.Repository) *Checker {
c := &Checker{
2016-08-31 20:39:36 +00:00
packs: restic.NewIDSet(),
blobs: restic.NewIDSet(),
2015-10-12 20:34:12 +00:00
masterIndex: repository.NewMasterIndex(),
2016-08-31 20:39:36 +00:00
indexes: make(map[restic.ID]*repository.Index),
repo: repo,
}
2016-08-31 20:39:36 +00:00
c.blobRefs.M = make(map[restic.ID]uint)
return c
}
const defaultParallelism = 40
// ErrDuplicatePacks is returned when a pack is found in more than one index.
type ErrDuplicatePacks struct {
2016-08-31 20:39:36 +00:00
PackID restic.ID
Indexes restic.IDSet
}
func (e ErrDuplicatePacks) Error() string {
return fmt.Sprintf("pack %v contained in several indexes: %v", e.PackID.Str(), e.Indexes)
}
// ErrOldIndexFormat is returned when an index with the old format is
// found.
type ErrOldIndexFormat struct {
2016-08-31 20:39:36 +00:00
restic.ID
}
func (err ErrOldIndexFormat) Error() string {
return fmt.Sprintf("index %v has old format", err.ID.Str())
}
// LoadIndex loads all index files.
2017-06-04 09:16:55 +00:00
func (c *Checker) LoadIndex(ctx context.Context) (hints []error, errs []error) {
2016-09-27 20:35:08 +00:00
debug.Log("Start")
type indexRes struct {
Index *repository.Index
err error
ID string
}
indexCh := make(chan indexRes)
2017-06-04 09:16:55 +00:00
worker := func(ctx context.Context, id restic.ID) error {
2016-09-27 20:35:08 +00:00
debug.Log("worker got index %v", id)
2017-06-04 09:16:55 +00:00
idx, err := repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeIndex)
if errors.Cause(err) == repository.ErrOldIndexFormat {
2016-09-27 20:35:08 +00:00
debug.Log("index %v has old format", id.Str())
hints = append(hints, ErrOldIndexFormat{id})
2017-06-04 09:16:55 +00:00
idx, err = repository.LoadIndexWithDecoder(ctx, c.repo, id, repository.DecodeOldIndex)
}
err = errors.Wrapf(err, "error loading index %v", id.Str())
select {
case indexCh <- indexRes{Index: idx, ID: id.String(), err: err}:
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
}
return nil
}
go func() {
defer close(indexCh)
2016-09-27 20:35:08 +00:00
debug.Log("start loading indexes in parallel")
2017-06-04 09:16:55 +00:00
err := repository.FilesInParallel(ctx, c.repo.Backend(), restic.IndexFile, defaultParallelism,
repository.ParallelWorkFuncParseID(worker))
debug.Log("loading indexes finished, error: %v", err)
if err != nil {
panic(err)
}
}()
done := make(chan struct{})
defer close(done)
2016-08-31 20:39:36 +00:00
packToIndex := make(map[restic.ID]restic.IDSet)
for res := range indexCh {
debug.Log("process index %v, err %v", res.ID, res.err)
if res.err != nil {
errs = append(errs, res.err)
continue
}
2016-08-31 20:39:36 +00:00
idxID, err := restic.ParseID(res.ID)
if err != nil {
errs = append(errs, errors.Errorf("unable to parse as index ID: %v", res.ID))
continue
}
c.indexes[idxID] = res.Index
2015-10-12 20:34:12 +00:00
c.masterIndex.Insert(res.Index)
2016-09-27 20:35:08 +00:00
debug.Log("process blobs")
cnt := 0
2017-06-18 12:45:02 +00:00
for blob := range res.Index.Each(ctx) {
c.packs.Insert(blob.PackID)
c.blobs.Insert(blob.ID)
c.blobRefs.M[blob.ID] = 0
cnt++
if _, ok := packToIndex[blob.PackID]; !ok {
2016-08-31 20:39:36 +00:00
packToIndex[blob.PackID] = restic.NewIDSet()
}
packToIndex[blob.PackID].Insert(idxID)
}
2016-09-27 20:35:08 +00:00
debug.Log("%d blobs processed", cnt)
}
2016-09-27 20:35:08 +00:00
debug.Log("checking for duplicate packs")
for packID := range c.packs {
2016-09-27 20:35:08 +00:00
debug.Log(" check pack %v: contained in %d indexes", packID.Str(), len(packToIndex[packID]))
if len(packToIndex[packID]) > 1 {
hints = append(hints, ErrDuplicatePacks{
PackID: packID,
Indexes: packToIndex[packID],
})
}
}
2015-07-11 14:00:49 +00:00
c.repo.SetIndex(c.masterIndex)
return hints, errs
}
2015-07-11 14:00:49 +00:00
// PackError describes an error with a specific pack.
type PackError struct {
2016-08-31 20:39:36 +00:00
ID restic.ID
Orphaned bool
Err error
2015-07-11 14:00:49 +00:00
}
func (e PackError) Error() string {
return "pack " + e.ID.String() + ": " + e.Err.Error()
2015-07-11 14:00:49 +00:00
}
2017-06-04 09:16:55 +00:00
func packIDTester(ctx context.Context, repo restic.Repository, inChan <-chan restic.ID, errChan chan<- error, wg *sync.WaitGroup) {
2016-09-27 20:35:08 +00:00
debug.Log("worker start")
defer debug.Log("worker done")
2015-07-11 14:00:49 +00:00
2015-07-11 22:25:42 +00:00
defer wg.Done()
for id := range inChan {
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
2017-06-04 09:16:55 +00:00
ok, err := repo.Backend().Test(ctx, h)
2015-07-11 14:00:49 +00:00
if err != nil {
err = PackError{ID: id, Err: err}
2015-07-11 22:25:42 +00:00
} else {
if !ok {
err = PackError{ID: id, Err: errors.New("does not exist")}
2015-07-11 22:25:42 +00:00
}
2015-07-11 14:00:49 +00:00
}
2015-07-11 22:25:42 +00:00
if err != nil {
2016-09-27 20:35:08 +00:00
debug.Log("error checking for pack %s: %v", id.Str(), err)
2015-07-11 22:25:42 +00:00
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
2015-07-11 22:25:42 +00:00
return
case errChan <- err:
}
2015-07-11 14:00:49 +00:00
continue
}
2015-07-11 22:25:42 +00:00
2016-09-27 20:35:08 +00:00
debug.Log("pack %s exists", id.Str())
2015-07-11 14:00:49 +00:00
}
2015-07-11 22:25:42 +00:00
}
// Packs checks that all packs referenced in the index are still available and
// there are no packs that aren't in an index. errChan is closed after all
// packs have been checked.
2017-06-04 09:16:55 +00:00
func (c *Checker) Packs(ctx context.Context, errChan chan<- error) {
defer close(errChan)
2016-09-27 20:35:08 +00:00
debug.Log("checking for %d packs", len(c.packs))
2016-08-31 20:39:36 +00:00
seenPacks := restic.NewIDSet()
2015-07-11 14:00:49 +00:00
2015-07-11 22:25:42 +00:00
var workerWG sync.WaitGroup
2016-08-31 20:39:36 +00:00
IDChan := make(chan restic.ID)
2015-07-11 22:25:42 +00:00
for i := 0; i < defaultParallelism; i++ {
workerWG.Add(1)
2017-06-04 09:16:55 +00:00
go packIDTester(ctx, c.repo, IDChan, errChan, &workerWG)
2015-07-11 22:25:42 +00:00
}
for id := range c.packs {
seenPacks.Insert(id)
2015-07-11 22:25:42 +00:00
IDChan <- id
}
close(IDChan)
2016-09-27 20:35:08 +00:00
debug.Log("waiting for %d workers to terminate", defaultParallelism)
2015-07-11 22:25:42 +00:00
workerWG.Wait()
2016-09-27 20:35:08 +00:00
debug.Log("workers terminated")
2015-07-11 22:25:42 +00:00
2017-06-04 09:16:55 +00:00
for id := range c.repo.List(ctx, restic.DataFile) {
2016-09-27 20:35:08 +00:00
debug.Log("check data blob %v", id.Str())
if !seenPacks.Has(id) {
c.orphanedPacks = append(c.orphanedPacks, id)
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
return
case errChan <- PackError{ID: id, Orphaned: true, Err: errors.New("not referenced in any index")}:
}
2015-07-11 14:00:49 +00:00
}
}
}
// Error is an error that occurred while checking a repository.
type Error struct {
2016-08-31 20:39:36 +00:00
TreeID restic.ID
BlobID restic.ID
2015-07-11 14:00:49 +00:00
Err error
}
func (e Error) Error() string {
if !e.BlobID.IsNull() && !e.TreeID.IsNull() {
2015-10-11 17:25:02 +00:00
msg := "tree " + e.TreeID.Str()
msg += ", blob " + e.BlobID.Str()
2015-07-11 14:00:49 +00:00
msg += ": " + e.Err.Error()
return msg
}
if !e.TreeID.IsNull() {
2015-10-11 17:25:02 +00:00
return "tree " + e.TreeID.Str() + ": " + e.Err.Error()
2015-07-11 14:00:49 +00:00
}
return e.Err.Error()
}
2017-06-04 09:16:55 +00:00
func loadTreeFromSnapshot(ctx context.Context, repo restic.Repository, id restic.ID) (restic.ID, error) {
sn, err := restic.LoadSnapshot(ctx, repo, id)
2015-07-11 14:00:49 +00:00
if err != nil {
2016-09-27 20:35:08 +00:00
debug.Log("error loading snapshot %v: %v", id.Str(), err)
2016-08-31 20:39:36 +00:00
return restic.ID{}, err
2015-07-11 14:00:49 +00:00
}
if sn.Tree == nil {
2016-09-27 20:35:08 +00:00
debug.Log("snapshot %v has no tree", id.Str())
2016-08-31 20:39:36 +00:00
return restic.ID{}, errors.Errorf("snapshot %v has no tree", id)
2015-07-11 14:00:49 +00:00
}
return *sn.Tree, nil
2015-07-11 14:00:49 +00:00
}
// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs.
2017-06-04 09:16:55 +00:00
func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (restic.IDs, []error) {
var trees struct {
2016-08-31 20:39:36 +00:00
IDs restic.IDs
sync.Mutex
}
var errs struct {
errs []error
sync.Mutex
}
2015-07-11 14:00:49 +00:00
2017-06-04 09:16:55 +00:00
snapshotWorker := func(ctx context.Context, strID string) error {
2016-08-31 20:39:36 +00:00
id, err := restic.ParseID(strID)
if err != nil {
return err
}
2015-07-11 14:00:49 +00:00
2016-09-27 20:35:08 +00:00
debug.Log("load snapshot %v", id.Str())
2015-07-11 14:00:49 +00:00
2017-06-04 09:16:55 +00:00
treeID, err := loadTreeFromSnapshot(ctx, repo, id)
2015-07-11 14:00:49 +00:00
if err != nil {
errs.Lock()
errs.errs = append(errs.errs, err)
errs.Unlock()
return nil
2015-07-11 14:00:49 +00:00
}
2016-09-27 20:35:08 +00:00
debug.Log("snapshot %v has tree %v", id.Str(), treeID.Str())
trees.Lock()
trees.IDs = append(trees.IDs, treeID)
trees.Unlock()
return nil
2015-07-11 14:00:49 +00:00
}
2017-06-04 09:16:55 +00:00
err := repository.FilesInParallel(ctx, repo.Backend(), restic.SnapshotFile, defaultParallelism, snapshotWorker)
if err != nil {
errs.errs = append(errs.errs, err)
}
return trees.IDs, errs.errs
}
2015-10-11 17:13:35 +00:00
// TreeError collects several errors that occurred while processing a tree.
type TreeError struct {
2016-08-31 20:39:36 +00:00
ID restic.ID
Errors []error
}
func (e TreeError) Error() string {
return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors)
}
type treeJob struct {
2016-08-31 20:39:36 +00:00
restic.ID
error
*restic.Tree
}
// loadTreeWorker loads trees from repo and sends them to out.
2017-06-04 09:16:55 +00:00
func loadTreeWorker(ctx context.Context, repo restic.Repository,
2016-08-31 20:39:36 +00:00
in <-chan restic.ID, out chan<- treeJob,
2017-06-04 09:16:55 +00:00
wg *sync.WaitGroup) {
defer func() {
2016-09-27 20:35:08 +00:00
debug.Log("exiting")
wg.Done()
}()
var (
inCh = in
outCh = out
job treeJob
)
outCh = nil
for {
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
return
case treeID, ok := <-inCh:
if !ok {
return
}
2016-09-27 20:35:08 +00:00
debug.Log("load tree %v", treeID.Str())
2017-06-04 09:16:55 +00:00
tree, err := repo.LoadTree(ctx, treeID)
2016-09-27 20:35:08 +00:00
debug.Log("load tree %v (%v) returned err: %v", tree, treeID.Str(), err)
job = treeJob{ID: treeID, error: err, Tree: tree}
outCh = out
inCh = nil
case outCh <- job:
2016-09-27 20:35:08 +00:00
debug.Log("sent tree %v", job.ID.Str())
outCh = nil
inCh = in
}
}
2015-07-11 14:00:49 +00:00
}
// checkTreeWorker checks the trees received and sends out errors to errChan.
2017-06-04 09:16:55 +00:00
func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error, wg *sync.WaitGroup) {
defer func() {
2016-09-27 20:35:08 +00:00
debug.Log("exiting")
wg.Done()
}()
2015-07-11 14:00:49 +00:00
var (
inCh = in
outCh = out
treeError TreeError
)
2015-07-11 14:00:49 +00:00
outCh = nil
for {
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
2016-09-27 20:35:08 +00:00
debug.Log("done channel closed, exiting")
return
2015-07-11 14:00:49 +00:00
case job, ok := <-inCh:
if !ok {
2016-09-27 20:35:08 +00:00
debug.Log("input channel closed, exiting")
return
}
id := job.ID
alreadyChecked := false
c.blobRefs.Lock()
if c.blobRefs.M[id] > 0 {
alreadyChecked = true
}
c.blobRefs.M[id]++
2016-09-27 20:35:08 +00:00
debug.Log("tree %v refcount %d", job.ID.Str(), c.blobRefs.M[id])
c.blobRefs.Unlock()
if alreadyChecked {
continue
}
2016-09-27 20:35:08 +00:00
debug.Log("check tree %v (tree %v, err %v)", job.ID.Str(), job.Tree, job.error)
2015-10-11 17:13:35 +00:00
var errs []error
if job.error != nil {
errs = append(errs, job.error)
} else {
errs = c.checkTree(job.ID, job.Tree)
}
if len(errs) > 0 {
2016-09-27 20:35:08 +00:00
debug.Log("checked tree %v: %v errors", job.ID.Str(), len(errs))
treeError = TreeError{ID: job.ID, Errors: errs}
outCh = out
inCh = nil
}
case outCh <- treeError:
2016-09-27 20:35:08 +00:00
debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
outCh = nil
inCh = in
2015-07-11 14:00:49 +00:00
}
}
}
2017-06-04 09:16:55 +00:00
func filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) {
defer func() {
2016-09-27 20:35:08 +00:00
debug.Log("closing output channels")
close(loaderChan)
close(out)
}()
2015-07-11 14:00:49 +00:00
var (
inCh = in
outCh = out
loadCh = loaderChan
job treeJob
2016-08-31 20:39:36 +00:00
nextTreeID restic.ID
outstandingLoadTreeJobs = 0
)
2015-07-11 14:00:49 +00:00
outCh = nil
loadCh = nil
for {
if loadCh == nil && len(backlog) > 0 {
loadCh = loaderChan
nextTreeID, backlog = backlog[0], backlog[1:]
2015-07-11 14:00:49 +00:00
}
if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
2016-09-27 20:35:08 +00:00
debug.Log("backlog is empty, all channels nil, exiting")
return
2015-07-11 14:00:49 +00:00
}
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
return
2015-07-11 14:00:49 +00:00
case loadCh <- nextTreeID:
outstandingLoadTreeJobs++
loadCh = nil
2015-07-11 14:00:49 +00:00
case j, ok := <-inCh:
if !ok {
2016-09-27 20:35:08 +00:00
debug.Log("input channel closed")
inCh = nil
in = nil
continue
2015-07-11 14:00:49 +00:00
}
outstandingLoadTreeJobs--
2016-09-27 20:35:08 +00:00
debug.Log("input job tree %v", j.ID.Str())
2015-10-11 16:45:16 +00:00
var err error
if j.error != nil {
2016-09-27 20:35:08 +00:00
debug.Log("received job with error: %v (tree %v, ID %v)", j.error, j.Tree, j.ID.Str())
2015-10-11 16:45:16 +00:00
} else if j.Tree == nil {
2016-09-27 20:35:08 +00:00
debug.Log("received job with nil tree pointer: %v (ID %v)", j.error, j.ID.Str())
2015-10-11 16:45:16 +00:00
err = errors.New("tree is nil and error is nil")
} else {
2016-09-27 20:35:08 +00:00
debug.Log("subtrees for tree %v: %v", j.ID.Str(), j.Tree.Subtrees())
2015-10-11 16:45:16 +00:00
for _, id := range j.Tree.Subtrees() {
if id.IsNull() {
2015-10-11 17:13:35 +00:00
// We do not need to raise this error here, it is
// checked when the tree is checked. Just make sure
// that we do not add any null IDs to the backlog.
2016-09-27 20:35:08 +00:00
debug.Log("tree %v has nil subtree", j.ID.Str())
2015-10-11 16:45:16 +00:00
continue
}
backlog = append(backlog, id)
}
}
if err != nil {
// send a new job with the new error instead of the old one
j = treeJob{ID: j.ID, error: err}
}
job = j
outCh = out
inCh = nil
case outCh <- job:
2016-09-27 20:35:08 +00:00
debug.Log("tree sent to check: %v", job.ID.Str())
outCh = nil
inCh = in
2015-07-11 14:00:49 +00:00
}
}
}
2015-07-11 14:00:49 +00:00
// Structure checks that for all snapshots all referenced data blobs and
// subtrees are available in the index. errChan is closed after all trees have
// been traversed.
2017-06-04 09:16:55 +00:00
func (c *Checker) Structure(ctx context.Context, errChan chan<- error) {
defer close(errChan)
2017-06-04 09:16:55 +00:00
trees, errs := loadSnapshotTreeIDs(ctx, c.repo)
2016-09-27 20:35:08 +00:00
debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))
2015-07-11 14:00:49 +00:00
for _, err := range errs {
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
return
case errChan <- err:
}
2015-07-11 14:00:49 +00:00
}
2016-08-31 20:39:36 +00:00
treeIDChan := make(chan restic.ID)
treeJobChan1 := make(chan treeJob)
treeJobChan2 := make(chan treeJob)
2015-07-11 14:00:49 +00:00
var wg sync.WaitGroup
for i := 0; i < defaultParallelism; i++ {
wg.Add(2)
2017-06-04 09:16:55 +00:00
go loadTreeWorker(ctx, c.repo, treeIDChan, treeJobChan1, &wg)
go c.checkTreeWorker(ctx, treeJobChan2, errChan, &wg)
2015-07-11 14:00:49 +00:00
}
2017-06-04 09:16:55 +00:00
filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2)
wg.Wait()
}
2016-08-31 20:39:36 +00:00
func (c *Checker) checkTree(id restic.ID, tree *restic.Tree) (errs []error) {
2016-09-27 20:35:08 +00:00
debug.Log("checking tree %v", id.Str())
2016-08-31 20:39:36 +00:00
var blobs []restic.ID
for _, node := range tree.Nodes {
2016-09-01 19:20:03 +00:00
switch node.Type {
2015-07-11 14:00:49 +00:00
case "file":
2016-04-10 14:51:16 +00:00
if node.Content == nil {
errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q has nil blob list", node.Name)})
2016-04-10 14:51:16 +00:00
}
for b, blobID := range node.Content {
if blobID.IsNull() {
errs = append(errs, Error{TreeID: id, Err: errors.Errorf("file %q blob %d has null ID", node.Name, b)})
continue
}
blobs = append(blobs, blobID)
}
2015-07-11 14:00:49 +00:00
case "dir":
if node.Subtree == nil {
errs = append(errs, Error{TreeID: id, Err: errors.Errorf("dir node %q has no subtree", node.Name)})
2015-07-11 14:00:49 +00:00
continue
}
2015-10-11 16:46:26 +00:00
if node.Subtree.IsNull() {
errs = append(errs, Error{TreeID: id, Err: errors.Errorf("dir node %q subtree id is null", node.Name)})
2015-10-11 16:46:26 +00:00
continue
}
2016-04-10 14:51:16 +00:00
case "symlink", "socket", "chardev", "dev", "fifo":
2016-05-08 21:16:01 +00:00
// nothing to check
2016-04-10 14:51:16 +00:00
default:
2016-09-01 19:20:03 +00:00
errs = append(errs, Error{TreeID: id, Err: errors.Errorf("node %q with invalid type %q", node.Name, node.Type)})
2016-04-10 14:51:16 +00:00
}
if node.Name == "" {
errs = append(errs, Error{TreeID: id, Err: errors.New("node with empty name")})
}
}
for _, blobID := range blobs {
c.blobRefs.Lock()
c.blobRefs.M[blobID]++
2016-09-27 20:35:08 +00:00
debug.Log("blob %v refcount %d", blobID.Str(), c.blobRefs.M[blobID])
c.blobRefs.Unlock()
2015-07-11 14:00:49 +00:00
if !c.blobs.Has(blobID) {
2016-09-27 20:35:08 +00:00
debug.Log("tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())
errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
2015-07-11 14:00:49 +00:00
}
}
return errs
2015-07-11 14:00:49 +00:00
}
// UnusedBlobs returns all blobs that have never been referenced.
2016-08-31 20:39:36 +00:00
func (c *Checker) UnusedBlobs() (blobs restic.IDs) {
c.blobRefs.Lock()
defer c.blobRefs.Unlock()
2016-09-27 20:35:08 +00:00
debug.Log("checking %d blobs", len(c.blobs))
2015-07-11 14:00:49 +00:00
for id := range c.blobs {
if c.blobRefs.M[id] == 0 {
2016-09-27 20:35:08 +00:00
debug.Log("blob %v not referenced", id.Str())
blobs = append(blobs, id)
2015-07-11 14:00:49 +00:00
}
}
return blobs
}
2015-12-06 16:29:31 +00:00
// CountPacks returns the number of packs in the repository.
func (c *Checker) CountPacks() uint64 {
return uint64(len(c.packs))
}
// checkPack reads a pack and checks the integrity of all blobs.
2017-06-04 09:16:55 +00:00
func checkPack(ctx context.Context, r restic.Repository, id restic.ID) error {
2016-09-27 20:35:08 +00:00
debug.Log("checking pack %v", id.Str())
2016-09-01 19:19:30 +00:00
h := restic.Handle{Type: restic.DataFile, Name: id.String()}
2017-06-04 09:16:55 +00:00
rd, err := r.Backend().Load(ctx, h, 0, 0)
if err != nil {
return err
}
2017-05-10 17:48:22 +00:00
packfile, err := fs.TempFile("", "restic-temp-check-")
if err != nil {
return errors.Wrap(err, "TempFile")
}
defer func() {
packfile.Close()
os.Remove(packfile.Name())
}()
hrd := hashing.NewReader(rd, sha256.New())
size, err := io.Copy(packfile, hrd)
if err != nil {
return errors.Wrap(err, "Copy")
}
if err = rd.Close(); err != nil {
return err
}
hash := restic.IDFromHash(hrd.Sum(nil))
debug.Log("hash for pack %v is %v", id.Str(), hash.Str())
if !hash.Equal(id) {
2016-09-27 20:35:08 +00:00
debug.Log("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
return errors.Errorf("Pack ID does not match, want %v, got %v", id.Str(), hash.Str())
}
blobs, err := pack.List(r.Key(), packfile, size)
if err != nil {
return err
}
var errs []error
var buf []byte
2016-08-25 19:08:16 +00:00
for i, blob := range blobs {
debug.Log(" check blob %d: %v", i, blob)
buf = buf[:cap(buf)]
if uint(len(buf)) < blob.Length {
buf = make([]byte, blob.Length)
}
buf = buf[:blob.Length]
_, err := packfile.Seek(int64(blob.Offset), 0)
if err != nil {
return errors.Errorf("Seek(%v): %v", blob.Offset, err)
}
_, err = io.ReadFull(packfile, buf)
if err != nil {
debug.Log(" error loading blob %v: %v", blob.ID.Str(), err)
errs = append(errs, errors.Errorf("blob %v: %v", i, err))
continue
}
n, err := r.Key().Decrypt(buf, buf)
if err != nil {
2016-09-27 20:35:08 +00:00
debug.Log(" error decrypting blob %v: %v", blob.ID.Str(), err)
errs = append(errs, errors.Errorf("blob %v: %v", i, err))
continue
}
buf = buf[:n]
hash := restic.Hash(buf)
if !hash.Equal(blob.ID) {
2016-09-27 20:35:08 +00:00
debug.Log(" Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str())
errs = append(errs, errors.Errorf("Blob ID does not match, want %v, got %v", blob.ID.Str(), hash.Str()))
continue
}
}
if len(errs) > 0 {
return errors.Errorf("pack %v contains %v errors: %v", id.Str(), len(errs), errs)
}
return nil
}
// ReadData loads all data from the repository and checks the integrity.
2017-06-04 09:16:55 +00:00
func (c *Checker) ReadData(ctx context.Context, p *restic.Progress, errChan chan<- error) {
defer close(errChan)
2015-12-06 16:29:31 +00:00
p.Start()
defer p.Done()
2016-08-31 20:39:36 +00:00
worker := func(wg *sync.WaitGroup, in <-chan restic.ID) {
2015-12-06 16:09:06 +00:00
defer wg.Done()
for {
2016-08-31 20:39:36 +00:00
var id restic.ID
2015-12-06 16:09:06 +00:00
var ok bool
2015-12-06 16:09:06 +00:00
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
2015-12-06 16:09:06 +00:00
return
case id, ok = <-in:
if !ok {
return
}
}
2017-06-04 09:16:55 +00:00
err := checkPack(ctx, c.repo, id)
2015-12-06 16:29:31 +00:00
p.Report(restic.Stat{Blobs: 1})
2015-12-06 16:09:06 +00:00
if err == nil {
continue
}
select {
2017-06-04 09:16:55 +00:00
case <-ctx.Done():
2015-12-06 16:09:06 +00:00
return
case errChan <- err:
}
}
}
2015-12-06 16:09:06 +00:00
2017-06-04 09:16:55 +00:00
ch := c.repo.List(ctx, restic.DataFile)
2015-12-06 16:09:06 +00:00
var wg sync.WaitGroup
for i := 0; i < defaultParallelism; i++ {
wg.Add(1)
go worker(&wg, ch)
}
wg.Wait()
}