From b096fc7abf7d8a3ef2ab33b6499005dc1f7cd164 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 28 Oct 2018 11:16:29 +0100 Subject: [PATCH] index: Correctly process errors listing all files This also removes the now unused `list` and `worker` packages. --- internal/index/index.go | 99 ++++++++++++++++++++++++++++------ internal/list/list.go | 79 --------------------------- internal/worker/doc.go | 2 - internal/worker/pool.go | 101 ----------------------------------- internal/worker/pool_test.go | 92 ------------------------------- 5 files changed, 84 insertions(+), 289 deletions(-) delete mode 100644 internal/list/list.go delete mode 100644 internal/worker/doc.go delete mode 100644 internal/worker/pool.go delete mode 100644 internal/worker/pool_test.go diff --git a/internal/index/index.go b/internal/index/index.go index 0079c59cd..b8eb62b95 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -5,14 +5,13 @@ import ( "context" "fmt" "os" + "sync" "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/list" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/worker" - - "github.com/restic/restic/internal/errors" + "golang.org/x/sync/errgroup" ) // Pack contains information about the contents of a pack. @@ -35,38 +34,108 @@ func newIndex() *Index { } } +const listPackWorkers = 10 + // New creates a new index for repo from scratch. InvalidFiles contains all IDs // of files that cannot be listed successfully. func New(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet, p *restic.Progress) (idx *Index, invalidFiles restic.IDs, err error) { p.Start() defer p.Done() - ch := make(chan worker.Job) - go list.AllPacks(ctx, repo, ignorePacks, ch) + type Job struct { + PackID restic.ID + Size int64 + } + + type Result struct { + Error error + PackID restic.ID + Size int64 + Entries []restic.Blob + } + + inputCh := make(chan Job) + outputCh := make(chan Result) + wg, ctx := errgroup.WithContext(ctx) + + // list the files in the repo, send to inputCh + wg.Go(func() error { + defer close(inputCh) + return repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error { + if ignorePacks.Has(id) { + return nil + } + + job := Job{ + PackID: id, + Size: size, + } + + select { + case inputCh <- job: + case <-ctx.Done(): + } + return nil + }) + }) + + // run the workers listing the files, read from inputCh, send to outputCh + var workers sync.WaitGroup + for i := 0; i < listPackWorkers; i++ { + workers.Add(1) + go func() { + defer workers.Done() + for job := range inputCh { + res := Result{PackID: job.PackID} + res.Entries, res.Size, res.Error = repo.ListPack(ctx, job.PackID, job.Size) + + select { + case outputCh <- res: + case <-ctx.Done(): + return + } + } + }() + } + + // wait until all the workers are done, then close outputCh + wg.Go(func() error { + workers.Wait() + close(outputCh) + return nil + }) idx = newIndex() - for job := range ch { + for res := range outputCh { p.Report(restic.Stat{Blobs: 1}) - - j := job.Result.(list.Result) - if job.Error != nil { - cause := errors.Cause(job.Error) + if res.Error != nil { + cause := errors.Cause(res.Error) if _, ok := cause.(pack.InvalidFileError); ok { - invalidFiles = append(invalidFiles, j.PackID()) + invalidFiles = append(invalidFiles, res.PackID) continue } - fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", j.PackID(), job.Error) + fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", res.PackID, res.Error) continue } - debug.Log("pack %v contains %d blobs", j.PackID(), len(j.Entries())) + debug.Log("pack %v contains %d blobs", res.PackID, len(res.Entries)) - err := idx.AddPack(j.PackID(), j.Size(), j.Entries()) + err := idx.AddPack(res.PackID, res.Size, res.Entries) if err != nil { return nil, nil, err } + + select { + case <-ctx.Done(): // an error occurred + default: + } + } + + err = wg.Wait() + if err != nil { + return nil, nil, err } return idx, invalidFiles, nil diff --git a/internal/list/list.go b/internal/list/list.go deleted file mode 100644 index 9332f520f..000000000 --- a/internal/list/list.go +++ /dev/null @@ -1,79 +0,0 @@ -package list - -import ( - "context" - - "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/worker" -) - -const listPackWorkers = 10 - -// Lister combines lists packs in a repo and blobs in a pack. -type Lister interface { - List(context.Context, restic.FileType, func(restic.ID, int64) error) error - ListPack(context.Context, restic.ID, int64) ([]restic.Blob, int64, error) -} - -// Result is returned in the channel from LoadBlobsFromAllPacks. -type Result struct { - packID restic.ID - size int64 - entries []restic.Blob -} - -// PackID returns the pack ID of this result. -func (l Result) PackID() restic.ID { - return l.packID -} - -// Size returns the size of the pack. -func (l Result) Size() int64 { - return l.size -} - -// Entries returns a list of all blobs saved in the pack. -func (l Result) Entries() []restic.Blob { - return l.entries -} - -// AllPacks sends the contents of all packs to ch. -func AllPacks(ctx context.Context, repo Lister, ignorePacks restic.IDSet, ch chan<- worker.Job) { - type fileInfo struct { - id restic.ID - size int64 - } - - f := func(ctx context.Context, job worker.Job) (interface{}, error) { - packInfo := job.Data.(fileInfo) - entries, size, err := repo.ListPack(ctx, packInfo.id, packInfo.size) - - return Result{ - packID: packInfo.id, - size: size, - entries: entries, - }, err - } - - jobCh := make(chan worker.Job) - wp := worker.New(ctx, listPackWorkers, f, jobCh, ch) - - go func() { - defer close(jobCh) - - _ = repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error { - if ignorePacks.Has(id) { - return nil - } - - select { - case jobCh <- worker.Job{Data: fileInfo{id: id, size: size}, Result: Result{packID: id}}: - case <-ctx.Done(): - return ctx.Err() - } - return nil - }) - }() - - wp.Wait() -} diff --git a/internal/worker/doc.go b/internal/worker/doc.go deleted file mode 100644 index 602bb5037..000000000 --- a/internal/worker/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package worker implements a worker pool. -package worker diff --git a/internal/worker/pool.go b/internal/worker/pool.go deleted file mode 100644 index 870548378..000000000 --- a/internal/worker/pool.go +++ /dev/null @@ -1,101 +0,0 @@ -package worker - -import "context" - -// Job is one unit of work. It is given to a Func, and the returned result and -// error are stored in Result and Error. -type Job struct { - Data interface{} - Result interface{} - Error error -} - -// Func does the actual work within a Pool. -type Func func(ctx context.Context, job Job) (result interface{}, err error) - -// Pool implements a worker pool. -type Pool struct { - f Func - jobCh <-chan Job - resCh chan<- Job - - numWorkers int - workersExit chan struct{} - allWorkersDone chan struct{} -} - -// New returns a new worker pool with n goroutines, each running the function -// f. The workers are started immediately. -func New(ctx context.Context, n int, f Func, jobChan <-chan Job, resultChan chan<- Job) *Pool { - p := &Pool{ - f: f, - workersExit: make(chan struct{}), - allWorkersDone: make(chan struct{}), - numWorkers: n, - jobCh: jobChan, - resCh: resultChan, - } - - for i := 0; i < n; i++ { - go p.runWorker(ctx, i) - } - - go p.waitForExit() - - return p -} - -// waitForExit receives from p.workersExit until all worker functions have -// exited, then closes the result channel. -func (p *Pool) waitForExit() { - n := p.numWorkers - for n > 0 { - <-p.workersExit - n-- - } - close(p.allWorkersDone) - close(p.resCh) -} - -// runWorker runs a worker function. -func (p *Pool) runWorker(ctx context.Context, numWorker int) { - defer func() { - p.workersExit <- struct{}{} - }() - - var ( - // enable the input channel when starting up a new goroutine - inCh = p.jobCh - // but do not enable the output channel until we have a result - outCh chan<- Job - - job Job - ok bool - ) - - for { - select { - case <-ctx.Done(): - return - - case job, ok = <-inCh: - if !ok { - return - } - - job.Result, job.Error = p.f(ctx, job) - inCh = nil - outCh = p.resCh - - case outCh <- job: - outCh = nil - inCh = p.jobCh - } - } -} - -// Wait waits for all worker goroutines to terminate, afterwards the output -// channel is closed. -func (p *Pool) Wait() { - <-p.allWorkersDone -} diff --git a/internal/worker/pool_test.go b/internal/worker/pool_test.go deleted file mode 100644 index 7c10b7e6a..000000000 --- a/internal/worker/pool_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package worker_test - -import ( - "context" - "testing" - - "github.com/restic/restic/internal/errors" - - "github.com/restic/restic/internal/worker" -) - -const concurrency = 10 - -var errTooLarge = errors.New("too large") - -func square(ctx context.Context, job worker.Job) (interface{}, error) { - n := job.Data.(int) - if n > 2000 { - return nil, errTooLarge - } - return n * n, nil -} - -func newBufferedPool(ctx context.Context, bufsize int, n int, f worker.Func) (chan worker.Job, chan worker.Job, *worker.Pool) { - inCh := make(chan worker.Job, bufsize) - outCh := make(chan worker.Job, bufsize) - - return inCh, outCh, worker.New(ctx, n, f, inCh, outCh) -} - -func TestPool(t *testing.T) { - inCh, outCh, p := newBufferedPool(context.TODO(), 200, concurrency, square) - - for i := 0; i < 150; i++ { - inCh <- worker.Job{Data: i} - } - - close(inCh) - p.Wait() - - for res := range outCh { - if res.Error != nil { - t.Errorf("unexpected error for job %v received: %v", res.Data, res.Error) - continue - } - - n := res.Data.(int) - m := res.Result.(int) - - if m != n*n { - t.Errorf("wrong value for job %d returned: want %d, got %d", n, n*n, m) - } - } -} - -func TestPoolErrors(t *testing.T) { - inCh, outCh, p := newBufferedPool(context.TODO(), 200, concurrency, square) - - for i := 0; i < 150; i++ { - inCh <- worker.Job{Data: i + 1900} - } - - close(inCh) - p.Wait() - - for res := range outCh { - n := res.Data.(int) - - if n > 2000 { - if res.Error == nil { - t.Errorf("expected error not found, result is %v", res) - continue - } - - if res.Error != errTooLarge { - t.Errorf("unexpected error found, result is %v", res) - } - - continue - } else { - if res.Error != nil { - t.Errorf("unexpected error for job %v received: %v", res.Data, res.Error) - continue - } - } - - m := res.Result.(int) - if m != n*n { - t.Errorf("wrong value for job %d returned: want %d, got %d", n, n*n, m) - } - } -}