2
2
mirror of https://github.com/octoleo/restic.git synced 2024-06-03 17:40:53 +00:00

checker: run Packs() in parallel

This commit is contained in:
Alexander Neumann 2015-07-12 00:25:42 +02:00
parent b4170ff45f
commit 7e6174126f

View File

@ -4,6 +4,7 @@ import (
"encoding/hex"
"errors"
"fmt"
"sync"
"github.com/restic/restic"
"github.com/restic/restic/backend"
@ -63,7 +64,7 @@ func New(repo *repository.Repository) *Checker {
}
}
const loadIndexParallelism = 20
const defaultParallelism = 20
// LoadIndex loads all index files.
func (c *Checker) LoadIndex() error {
@ -94,7 +95,7 @@ func (c *Checker) LoadIndex() error {
go func() {
defer close(indexCh)
debug.Log("LoadIndex", "start loading indexes in parallel")
perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, loadIndexParallelism, worker)
perr = repository.FilesInParallel(c.repo.Backend(), backend.Index, defaultParallelism, worker)
debug.Log("LoadIndex", "loading indexes finished, error: %v", perr)
}()
@ -140,33 +141,95 @@ func (e PackError) Error() string {
return "pack " + e.ID.String() + ": " + e.error.Error()
}
func packIDTester(repo *repository.Repository, inChan <-chan mapID, errChan chan<- error, wg *sync.WaitGroup, done <-chan struct{}) {
debug.Log("Checker.testPackID", "worker start")
defer debug.Log("Checker.testPackID", "worker done")
defer wg.Done()
for id := range inChan {
ok, err := repo.Backend().Test(backend.Data, map2str(id))
if err != nil {
err = PackError{map2id(id), err}
} else {
if !ok {
err = PackError{map2id(id), errors.New("does not exist")}
}
}
if err != nil {
debug.Log("Checker.testPackID", "error checking for pack %s: %v", map2id(id).Str(), err)
select {
case <-done:
return
case errChan <- err:
}
continue
}
debug.Log("Checker.testPackID", "pack %s exists", map2id(id).Str())
}
}
func collectErrors(in <-chan error, out chan<- []error, done <-chan struct{}) {
var errs []error
outer:
for {
select {
case err, ok := <-in:
if !ok {
break outer
}
errs = append(errs, err)
case <-done:
break outer
}
}
out <- errs
}
// Packs checks that all packs referenced in the index are still available and
// there are no packs that aren't in an index.
func (c *Checker) Packs() (errs []error) {
func (c *Checker) Packs() []error {
debug.Log("Checker.Packs", "checking for %d packs", len(c.packs))
seenPacks := make(map[mapID]struct{})
for id := range c.packs {
seenPacks[id] = struct{}{}
ok, err := c.repo.Backend().Test(backend.Data, map2str(id))
if err != nil {
debug.Log("Checker.Packs", "error checking for pack %s", map2id(id).Str())
errs = append(errs, PackError{map2id(id), err})
continue
}
if !ok {
debug.Log("Checker.Packs", "pack %s does not exist", map2id(id).Str())
errs = append(errs, PackError{map2id(id), errors.New("does not exist")})
continue
}
debug.Log("Checker.Packs", "pack %s exists", map2id(id).Str())
}
done := make(chan struct{})
defer close(done)
var workerWG sync.WaitGroup
IDChan := make(chan mapID)
errChan := make(chan error)
for i := 0; i < defaultParallelism; i++ {
workerWG.Add(1)
go packIDTester(c.repo, IDChan, errChan, &workerWG, done)
}
errsChan := make(chan []error, 1)
go collectErrors(errChan, errsChan, done)
for id := range c.packs {
seenPacks[id] = struct{}{}
IDChan <- id
}
close(IDChan)
debug.Log("Checker.Packs", "waiting for %d workers to terminate", defaultParallelism)
workerWG.Wait()
debug.Log("Checker.Packs", "workers terminated")
close(errChan)
errs := <-errsChan
debug.Log("Checker.Packs", "error worker terminated")
for id := range c.repo.List(backend.Data, done) {
debug.Log("Checker.Packs", "check data blob %v", id)
if _, ok := seenPacks[id2map(id)]; !ok {
errs = append(errs, PackError{id, errors.New("not referenced in any index")})
}