2
2
mirror of https://github.com/octoleo/restic.git synced 2024-05-29 07:00:49 +00:00

checker: check trees and blobs in parallel

This commit is contained in:
Alexander Neumann 2015-07-12 16:42:22 +02:00
parent 5d2d552084
commit 5108d91bc7
2 changed files with 296 additions and 77 deletions

View File

@ -44,8 +44,11 @@ func map2id(id mapID) backend.ID {
type Checker struct {
packs map[mapID]struct{}
blobs map[mapID]struct{}
blobRefs map[mapID]uint
indexes map[mapID]*repository.Index
blobRefs struct {
sync.Mutex
M map[mapID]uint
}
indexes map[mapID]*repository.Index
masterIndex *repository.Index
@ -54,17 +57,20 @@ type Checker struct {
// New returns a new checker which runs on repo.
func New(repo *repository.Repository) *Checker {
return &Checker{
blobRefs: make(map[mapID]uint),
c := &Checker{
packs: make(map[mapID]struct{}),
blobs: make(map[mapID]struct{}),
masterIndex: repository.NewIndex(),
indexes: make(map[mapID]*repository.Index),
repo: repo,
}
c.blobRefs.M = make(map[mapID]uint)
return c
}
const defaultParallelism = 20
const defaultParallelism = 40
// LoadIndex loads all index files.
func (c *Checker) LoadIndex() error {
@ -117,7 +123,7 @@ func (c *Checker) LoadIndex() error {
for blob := range res.Index.Each(done) {
c.packs[id2map(blob.PackID)] = struct{}{}
c.blobs[id2map(blob.ID)] = struct{}{}
c.blobRefs[id2map(blob.ID)] = 0
c.blobRefs.M[id2map(blob.ID)] = 0
cnt++
}
@ -219,7 +225,7 @@ func (c *Checker) Packs(errChan chan<- error, done <-chan struct{}) {
debug.Log("Checker.Packs", "workers terminated")
for id := range c.repo.List(backend.Data, done) {
debug.Log("Checker.Packs", "check data blob %v", id)
debug.Log("Checker.Packs", "check data blob %v", id.Str())
if _, ok := seenPacks[id2map(id)]; !ok {
select {
case <-done:
@ -267,92 +273,279 @@ func loadTreeFromSnapshot(repo *repository.Repository, id backend.ID) (backend.I
return sn.Tree, nil
}
// Structure checks that for all snapshots all referenced blobs are available
// in the index. errChan is closed after all trees have been traversed.
func (c *Checker) Structure(errChan chan<- error, done <-chan struct{}) {
defer close(errChan)
// loadSnapshotTreeIDs loads all snapshots from backend and returns the tree IDs.
func loadSnapshotTreeIDs(repo *repository.Repository) (backend.IDs, []error) {
var trees struct {
IDs backend.IDs
sync.Mutex
}
var todo backend.IDs
var errs struct {
errs []error
sync.Mutex
}
for id := range c.repo.List(backend.Snapshot, done) {
debug.Log("Checker.Snaphots", "check snapshot %v", id.Str())
treeID, err := loadTreeFromSnapshot(c.repo, id)
snapshotWorker := func(strID string, done <-chan struct{}) error {
id, err := backend.ParseID(strID)
if err != nil {
select {
case <-done:
return
case errChan <- err:
}
continue
return err
}
debug.Log("Checker.Snaphots", "load snapshot %v", id.Str())
treeID, err := loadTreeFromSnapshot(repo, id)
if err != nil {
errs.Lock()
errs.errs = append(errs.errs, err)
errs.Unlock()
return nil
}
debug.Log("Checker.Snaphots", "snapshot %v has tree %v", id.Str(), treeID.Str())
todo = append(todo, treeID)
trees.Lock()
trees.IDs = append(trees.IDs, treeID)
trees.Unlock()
return nil
}
for _, err := range c.trees(todo) {
err := repository.FilesInParallel(repo.Backend(), backend.Snapshot, defaultParallelism, snapshotWorker)
if err != nil {
errs.errs = append(errs.errs, err)
}
return trees.IDs, errs.errs
}
// TreeError is returned when loading a tree from the repository failed.
type TreeError struct {
ID backend.ID
Errors []error
}
func (e TreeError) Error() string {
return fmt.Sprintf("%v: %d errors", e.ID.String(), len(e.Errors))
}
type treeJob struct {
backend.ID
error
*restic.Tree
}
// loadTreeWorker loads trees from repo and sends them to out.
func loadTreeWorker(repo *repository.Repository,
in <-chan backend.ID, out chan<- treeJob,
done <-chan struct{}, wg *sync.WaitGroup) {
defer func() {
debug.Log("checker.loadTreeWorker", "exiting")
wg.Done()
}()
var (
inCh = in
outCh = out
job treeJob
)
outCh = nil
for {
select {
case <-done:
return
case treeID, ok := <-inCh:
if !ok {
return
}
debug.Log("checker.loadTreeWorker", "load tree %v", treeID.Str())
tree, err := restic.LoadTree(repo, treeID)
debug.Log("checker.loadTreeWorker", "load tree %v (%v) returned err %v", tree, treeID.Str(), err)
job = treeJob{ID: treeID, error: err, Tree: tree}
outCh = out
inCh = nil
case outCh <- job:
debug.Log("checker.loadTreeWorker", "sent tree %v", job.ID.Str())
outCh = nil
inCh = in
}
}
}
// checkTreeWorker checks the trees received and sends out errors to errChan.
func (c *Checker) checkTreeWorker(in <-chan treeJob, out chan<- TreeError, done <-chan struct{}, wg *sync.WaitGroup) {
defer func() {
debug.Log("checker.checkTreeWorker", "exiting")
wg.Done()
}()
var (
inCh = in
outCh = out
treeError TreeError
)
outCh = nil
for {
select {
case <-done:
return
case job, ok := <-inCh:
if !ok {
return
}
id := id2map(job.ID)
alreadyChecked := false
c.blobRefs.Lock()
if c.blobRefs.M[id] > 0 {
alreadyChecked = true
}
c.blobRefs.M[id]++
debug.Log("checker.checkTreeWorker", "tree %v refcount %d", job.ID.Str(), c.blobRefs.M[id])
c.blobRefs.Unlock()
if alreadyChecked {
continue
}
debug.Log("checker.checkTreeWorker", "load tree %v", job.ID.Str())
errs := c.checkTree(job.ID, job.Tree)
if len(errs) > 0 {
debug.Log("checker.checkTreeWorker", "checked tree %v: %v errors", job.ID.Str(), len(errs))
treeError = TreeError{ID: job.ID, Errors: errs}
outCh = out
inCh = nil
}
case outCh <- treeError:
debug.Log("checker.checkTreeWorker", "tree %v: sent %d errors", treeError.ID, len(treeError.Errors))
outCh = nil
inCh = in
}
}
}
func filterTrees(backlog backend.IDs, loaderChan chan<- backend.ID, in <-chan treeJob, out chan<- treeJob, done <-chan struct{}) {
defer func() {
debug.Log("checker.filterTrees", "closing output channels")
close(loaderChan)
close(out)
}()
var (
inCh = in
outCh = out
loadCh = loaderChan
job treeJob
nextTreeID backend.ID
outstandingLoadTreeJobs = 0
)
outCh = nil
loadCh = nil
for {
if loadCh == nil && len(backlog) > 0 {
loadCh = loaderChan
nextTreeID, backlog = backlog[0], backlog[1:]
}
if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
debug.Log("checker.filterTrees", "backlog is empty, all channels nil, exiting")
return
}
select {
case <-done:
return
case loadCh <- nextTreeID:
outstandingLoadTreeJobs++
loadCh = nil
case j, ok := <-inCh:
if !ok {
debug.Log("checker.filterTrees", "input channel closed")
inCh = nil
in = nil
continue
}
outstandingLoadTreeJobs--
debug.Log("checker.filterTrees", "input job tree %v", j.ID.Str())
backlog = append(backlog, j.Tree.Subtrees()...)
job = j
outCh = out
inCh = nil
case outCh <- job:
outCh = nil
inCh = in
}
}
}
// Structure checks that for all snapshots all referenced data blobs and
// subtrees are available in the index. errChan is closed after all trees have
// been traversed.
func (c *Checker) Structure(errChan chan<- error, done <-chan struct{}) {
defer close(errChan)
trees, errs := loadSnapshotTreeIDs(c.repo)
debug.Log("checker.Structure", "need to check %d trees from snapshots, %d errs returned", len(trees), len(errs))
for _, err := range errs {
select {
case <-done:
return
case errChan <- err:
}
}
}
func (c *Checker) trees(treeIDs backend.IDs) (errs []error) {
treesChecked := make(map[mapID]struct{})
treeIDChan := make(chan backend.ID)
treeJobChan1 := make(chan treeJob)
treeJobChan2 := make(chan treeJob)
treeErrChan := make(chan TreeError)
for len(treeIDs) > 0 {
id := treeIDs[0]
treeIDs = treeIDs[1:]
c.blobRefs[id2map(id)]++
debug.Log("Checker.trees", "tree %v refcount %d", id.Str(), c.blobRefs[id2map(id)])
if _, ok := treesChecked[id2map(id)]; ok {
debug.Log("Checker.trees", "tree %v already checked", id.Str())
continue
}
debug.Log("Checker.trees", "check tree %v", id.Str())
if _, ok := c.blobs[id2map(id)]; !ok {
errs = append(errs, Error{TreeID: id, Err: errors.New("not found in index")})
continue
}
blobs, subtrees, treeErrors := c.tree(id)
if treeErrors != nil {
debug.Log("Checker.trees", "error checking tree %v: %v", id.Str(), treeErrors)
errs = append(errs, treeErrors...)
continue
}
for _, blobID := range blobs {
c.blobRefs[id2map(blobID)]++
debug.Log("Checker.trees", "blob %v refcount %d", blobID.Str(), c.blobRefs[id2map(blobID)])
if _, ok := c.blobs[id2map(blobID)]; !ok {
debug.Log("Checker.trees", "tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())
errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
}
}
treeIDs = append(treeIDs, subtrees...)
treesChecked[id2map(id)] = struct{}{}
var wg sync.WaitGroup
for i := 0; i < defaultParallelism; i++ {
wg.Add(2)
go loadTreeWorker(c.repo, treeIDChan, treeJobChan1, done, &wg)
go c.checkTreeWorker(treeJobChan2, treeErrChan, done, &wg)
}
return errs
filterTrees(trees, treeIDChan, treeJobChan1, treeJobChan2, done)
wg.Wait()
}
func (c *Checker) tree(id backend.ID) (blobs backend.IDs, subtrees backend.IDs, errs []error) {
tree, err := restic.LoadTree(c.repo, id)
if err != nil {
return nil, nil, []error{Error{TreeID: id, Err: err}}
}
func (c *Checker) checkTree(id backend.ID, tree *restic.Tree) (errs []error) {
debug.Log("Checker.checkTree", "checking tree %v", id.Str())
// if _, ok := c.blobs[id2map(id)]; !ok {
// errs = append(errs, Error{TreeID: id, Err: errors.New("not found in index")})
// }
// blobs, subtrees, treeErrors := c.tree(id)
// if treeErrors != nil {
// debug.Log("Checker.trees", "error checking tree %v: %v", id.Str(), treeErrors)
// errs = append(errs, treeErrors...)
// continue
// }
// treeIDs = append(treeIDs, subtrees...)
// treesChecked[id2map(id)] = struct{}{}
var blobs []backend.ID
for i, node := range tree.Nodes {
switch node.Type {
@ -363,19 +556,34 @@ func (c *Checker) tree(id backend.ID) (blobs backend.IDs, subtrees backend.IDs,
errs = append(errs, Error{TreeID: id, Err: fmt.Errorf("node %d is dir but has no subtree", i)})
continue
}
subtrees = append(subtrees, node.Subtree)
}
}
return blobs, subtrees, errs
for _, blobID := range blobs {
mid := id2map(blobID)
c.blobRefs.Lock()
c.blobRefs.M[mid]++
debug.Log("Checker.checkTree", "blob %v refcount %d", blobID.Str(), c.blobRefs.M[mid])
c.blobRefs.Unlock()
if _, ok := c.blobs[id2map(blobID)]; !ok {
debug.Log("Checker.trees", "tree %v references blob %v which isn't contained in index", id.Str(), blobID.Str())
errs = append(errs, Error{TreeID: id, BlobID: blobID, Err: errors.New("not found in index")})
}
}
return errs
}
// UnusedBlobs returns all blobs that have never been referenced.
func (c *Checker) UnusedBlobs() (blobs backend.IDs) {
c.blobRefs.Lock()
defer c.blobRefs.Unlock()
debug.Log("Checker.UnusedBlobs", "checking %d blobs", len(c.blobs))
for id := range c.blobs {
if c.blobRefs[id] == 0 {
if c.blobRefs.M[id] == 0 {
debug.Log("Checker.UnusedBlobs", "blob %v not not referenced", map2id(id).Str())
blobs = append(blobs, map2id(id))
}

11
tree.go
View File

@ -89,3 +89,14 @@ func (t Tree) Find(name string) (*Node, error) {
_, node, err := t.binarySearch(name)
return node, err
}
// Subtrees returns a slice of all subtree IDs of the tree.
func (t Tree) Subtrees() (trees backend.IDs) {
for _, node := range t.Nodes {
if node.Type == "dir" && node.Subtree != nil {
trees = append(trees, node.Subtree)
}
}
return trees
}