diff --git a/internal/index/index.go b/internal/index/index.go index 0079c59cd..6c6294b21 100644 --- a/internal/index/index.go +++ b/internal/index/index.go @@ -5,14 +5,13 @@ import ( "context" "fmt" "os" + "sync" "github.com/restic/restic/internal/debug" - "github.com/restic/restic/internal/list" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/pack" "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/worker" - - "github.com/restic/restic/internal/errors" + "golang.org/x/sync/errgroup" ) // Pack contains information about the contents of a pack. @@ -35,38 +34,118 @@ func newIndex() *Index { } } +const listPackWorkers = 10 + +// Lister lists files and their contents +type Lister interface { + // List runs fn for all files of type t in the repo. + List(ctx context.Context, t restic.FileType, fn func(restic.ID, int64) error) error + + // ListPack returns the list of blobs saved in the pack id and the length + // of the file as stored in the backend. + ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, int64, error) +} + // New creates a new index for repo from scratch. InvalidFiles contains all IDs // of files that cannot be listed successfully. -func New(ctx context.Context, repo restic.Repository, ignorePacks restic.IDSet, p *restic.Progress) (idx *Index, invalidFiles restic.IDs, err error) { +func New(ctx context.Context, repo Lister, ignorePacks restic.IDSet, p *restic.Progress) (idx *Index, invalidFiles restic.IDs, err error) { p.Start() defer p.Done() - ch := make(chan worker.Job) - go list.AllPacks(ctx, repo, ignorePacks, ch) + type Job struct { + PackID restic.ID + Size int64 + } + + type Result struct { + Error error + PackID restic.ID + Size int64 + Entries []restic.Blob + } + + inputCh := make(chan Job) + outputCh := make(chan Result) + wg, ctx := errgroup.WithContext(ctx) + + // list the files in the repo, send to inputCh + wg.Go(func() error { + defer close(inputCh) + return repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error { + if ignorePacks.Has(id) { + return nil + } + + job := Job{ + PackID: id, + Size: size, + } + + select { + case inputCh <- job: + case <-ctx.Done(): + } + return nil + }) + }) + + // run the workers listing the files, read from inputCh, send to outputCh + var workers sync.WaitGroup + for i := 0; i < listPackWorkers; i++ { + workers.Add(1) + go func() { + defer workers.Done() + for job := range inputCh { + res := Result{PackID: job.PackID} + res.Entries, res.Size, res.Error = repo.ListPack(ctx, job.PackID, job.Size) + + select { + case outputCh <- res: + case <-ctx.Done(): + return + } + } + }() + } + + // wait until all the workers are done, then close outputCh + wg.Go(func() error { + workers.Wait() + close(outputCh) + return nil + }) idx = newIndex() - for job := range ch { + for res := range outputCh { p.Report(restic.Stat{Blobs: 1}) - - j := job.Result.(list.Result) - if job.Error != nil { - cause := errors.Cause(job.Error) + if res.Error != nil { + cause := errors.Cause(res.Error) if _, ok := cause.(pack.InvalidFileError); ok { - invalidFiles = append(invalidFiles, j.PackID()) + invalidFiles = append(invalidFiles, res.PackID) continue } - fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", j.PackID(), job.Error) + fmt.Fprintf(os.Stderr, "pack file cannot be listed %v: %v\n", res.PackID, res.Error) continue } - debug.Log("pack %v contains %d blobs", j.PackID(), len(j.Entries())) + debug.Log("pack %v contains %d blobs", res.PackID, len(res.Entries)) - err := idx.AddPack(j.PackID(), j.Size(), j.Entries()) + err := idx.AddPack(res.PackID, res.Size, res.Entries) if err != nil { return nil, nil, err } + + select { + case <-ctx.Done(): // an error occurred + default: + } + } + + err = wg.Wait() + if err != nil { + return nil, nil, err } return idx, invalidFiles, nil @@ -89,7 +168,13 @@ type indexJSON struct { Packs []packJSON `json:"packs"` } -func loadIndexJSON(ctx context.Context, repo restic.Repository, id restic.ID) (*indexJSON, error) { +// ListLoader allows listing files and their content, in addition to loading and unmarshaling JSON files. +type ListLoader interface { + Lister + LoadJSONUnpacked(context.Context, restic.FileType, restic.ID, interface{}) error +} + +func loadIndexJSON(ctx context.Context, repo ListLoader, id restic.ID) (*indexJSON, error) { debug.Log("process index %v\n", id) var idx indexJSON @@ -102,7 +187,7 @@ func loadIndexJSON(ctx context.Context, repo restic.Repository, id restic.ID) (* } // Load creates an index by loading all index files from the repo. -func Load(ctx context.Context, repo restic.Repository, p *restic.Progress) (*Index, error) { +func Load(ctx context.Context, repo ListLoader, p *restic.Progress) (*Index, error) { debug.Log("loading indexes") p.Start() @@ -259,8 +344,13 @@ func (idx *Index) FindBlob(h restic.BlobHandle) (result []Location, err error) { const maxEntries = 3000 +// Saver saves structures as JSON. +type Saver interface { + SaveJSONUnpacked(ctx context.Context, t restic.FileType, item interface{}) (restic.ID, error) +} + // Save writes the complete index to the repo. -func (idx *Index) Save(ctx context.Context, repo restic.Repository, supersedes restic.IDs) (restic.IDs, error) { +func (idx *Index) Save(ctx context.Context, repo Saver, supersedes restic.IDs) (restic.IDs, error) { debug.Log("pack files: %d\n", len(idx.Packs)) var indexIDs []restic.ID diff --git a/internal/index/index_test.go b/internal/index/index_test.go index e09d18930..43198cf68 100644 --- a/internal/index/index_test.go +++ b/internal/index/index_test.go @@ -2,10 +2,12 @@ package index import ( "context" + "sync" "testing" "time" "github.com/restic/restic/internal/checker" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/test" @@ -48,7 +50,7 @@ func TestIndexNew(t *testing.T) { repo, cleanup := createFilledRepo(t, 3, 0) defer cleanup() - idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) + idx, invalid, err := New(context.TODO(), repo, restic.NewIDSet(), nil) if err != nil { t.Fatalf("New() returned error %v", err) } @@ -57,9 +59,102 @@ func TestIndexNew(t *testing.T) { t.Fatalf("New() returned nil index") } + if len(invalid) > 0 { + t.Fatalf("New() returned invalid files: %v", invalid) + } + validateIndex(t, repo, idx) } +type ErrorRepo struct { + restic.Repository + MaxListFiles int + + MaxPacks int + MaxPacksMutex sync.Mutex +} + +// List returns an error after repo.MaxListFiles files. +func (repo *ErrorRepo) List(ctx context.Context, t restic.FileType, fn func(restic.ID, int64) error) error { + if repo.MaxListFiles == 0 { + return errors.New("test error, max is zero") + } + + max := repo.MaxListFiles + return repo.Repository.List(ctx, t, func(id restic.ID, size int64) error { + if max == 0 { + return errors.New("test error, max reached zero") + } + + max-- + return fn(id, size) + }) +} + +// ListPack returns an error after repo.MaxPacks files. +func (repo *ErrorRepo) ListPack(ctx context.Context, id restic.ID, size int64) ([]restic.Blob, int64, error) { + repo.MaxPacksMutex.Lock() + max := repo.MaxPacks + if max > 0 { + repo.MaxPacks-- + } + repo.MaxPacksMutex.Unlock() + + if max == 0 { + return nil, 0, errors.New("test list pack error") + } + + return repo.Repository.ListPack(ctx, id, size) +} + +func TestIndexNewListErrors(t *testing.T) { + repo, cleanup := createFilledRepo(t, 3, 0) + defer cleanup() + + for _, max := range []int{0, 3, 5} { + errRepo := &ErrorRepo{ + Repository: repo, + MaxListFiles: max, + } + idx, invalid, err := New(context.TODO(), errRepo, restic.NewIDSet(), nil) + if err == nil { + t.Errorf("expected error not found, got nil") + } + + if idx != nil { + t.Errorf("expected nil index, got %v", idx) + } + + if len(invalid) != 0 { + t.Errorf("expected empty invalid list, got %v", invalid) + } + } +} + +func TestIndexNewPackErrors(t *testing.T) { + repo, cleanup := createFilledRepo(t, 3, 0) + defer cleanup() + + for _, max := range []int{0, 3, 5} { + errRepo := &ErrorRepo{ + Repository: repo, + MaxPacks: max, + } + idx, invalid, err := New(context.TODO(), errRepo, restic.NewIDSet(), nil) + if err == nil { + t.Errorf("expected error not found, got nil") + } + + if idx != nil { + t.Errorf("expected nil index, got %v", idx) + } + + if len(invalid) != 0 { + t.Errorf("expected empty invalid list, got %v", invalid) + } + } +} + func TestIndexLoad(t *testing.T) { repo, cleanup := createFilledRepo(t, 3, 0) defer cleanup() @@ -186,7 +281,7 @@ func BenchmarkIndexSave(b *testing.B) { } func TestIndexDuplicateBlobs(t *testing.T) { - repo, cleanup := createFilledRepo(t, 3, 0.01) + repo, cleanup := createFilledRepo(t, 3, 0.05) defer cleanup() idx, _, err := New(context.TODO(), repo, restic.NewIDSet(), nil) @@ -252,6 +347,7 @@ func TestIndexSave(t *testing.T) { } ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() errCh := make(chan error) go checker.Structure(ctx, errCh) diff --git a/internal/list/list.go b/internal/list/list.go deleted file mode 100644 index 9332f520f..000000000 --- a/internal/list/list.go +++ /dev/null @@ -1,79 +0,0 @@ -package list - -import ( - "context" - - "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/worker" -) - -const listPackWorkers = 10 - -// Lister combines lists packs in a repo and blobs in a pack. -type Lister interface { - List(context.Context, restic.FileType, func(restic.ID, int64) error) error - ListPack(context.Context, restic.ID, int64) ([]restic.Blob, int64, error) -} - -// Result is returned in the channel from LoadBlobsFromAllPacks. -type Result struct { - packID restic.ID - size int64 - entries []restic.Blob -} - -// PackID returns the pack ID of this result. -func (l Result) PackID() restic.ID { - return l.packID -} - -// Size returns the size of the pack. -func (l Result) Size() int64 { - return l.size -} - -// Entries returns a list of all blobs saved in the pack. -func (l Result) Entries() []restic.Blob { - return l.entries -} - -// AllPacks sends the contents of all packs to ch. -func AllPacks(ctx context.Context, repo Lister, ignorePacks restic.IDSet, ch chan<- worker.Job) { - type fileInfo struct { - id restic.ID - size int64 - } - - f := func(ctx context.Context, job worker.Job) (interface{}, error) { - packInfo := job.Data.(fileInfo) - entries, size, err := repo.ListPack(ctx, packInfo.id, packInfo.size) - - return Result{ - packID: packInfo.id, - size: size, - entries: entries, - }, err - } - - jobCh := make(chan worker.Job) - wp := worker.New(ctx, listPackWorkers, f, jobCh, ch) - - go func() { - defer close(jobCh) - - _ = repo.List(ctx, restic.DataFile, func(id restic.ID, size int64) error { - if ignorePacks.Has(id) { - return nil - } - - select { - case jobCh <- worker.Job{Data: fileInfo{id: id, size: size}, Result: Result{packID: id}}: - case <-ctx.Done(): - return ctx.Err() - } - return nil - }) - }() - - wp.Wait() -} diff --git a/internal/restic/testing.go b/internal/restic/testing.go index c0d1684f8..eb10919e7 100644 --- a/internal/restic/testing.go +++ b/internal/restic/testing.go @@ -26,6 +26,7 @@ type fakeFileSystem struct { duplication float32 buf []byte chunker *chunker.Chunker + rand *rand.Rand } // saveFile reads from rd and saves the blobs in the repository. The list of @@ -87,7 +88,7 @@ func (fs *fakeFileSystem) treeIsKnown(tree *Tree) (bool, []byte, ID) { } func (fs *fakeFileSystem) blobIsKnown(id ID, t BlobType) bool { - if rand.Float32() < fs.duplication { + if fs.rand.Float32() < fs.duplication { return false } @@ -175,6 +176,7 @@ func TestCreateSnapshot(t testing.TB, repo Repository, at time.Time, depth int, repo: repo, knownBlobs: NewIDSet(), duplication: duplication, + rand: rand.New(rand.NewSource(seed)), } treeID := fs.saveTree(context.TODO(), seed, depth) diff --git a/internal/worker/doc.go b/internal/worker/doc.go deleted file mode 100644 index 602bb5037..000000000 --- a/internal/worker/doc.go +++ /dev/null @@ -1,2 +0,0 @@ -// Package worker implements a worker pool. -package worker diff --git a/internal/worker/pool.go b/internal/worker/pool.go deleted file mode 100644 index 870548378..000000000 --- a/internal/worker/pool.go +++ /dev/null @@ -1,101 +0,0 @@ -package worker - -import "context" - -// Job is one unit of work. It is given to a Func, and the returned result and -// error are stored in Result and Error. -type Job struct { - Data interface{} - Result interface{} - Error error -} - -// Func does the actual work within a Pool. -type Func func(ctx context.Context, job Job) (result interface{}, err error) - -// Pool implements a worker pool. -type Pool struct { - f Func - jobCh <-chan Job - resCh chan<- Job - - numWorkers int - workersExit chan struct{} - allWorkersDone chan struct{} -} - -// New returns a new worker pool with n goroutines, each running the function -// f. The workers are started immediately. -func New(ctx context.Context, n int, f Func, jobChan <-chan Job, resultChan chan<- Job) *Pool { - p := &Pool{ - f: f, - workersExit: make(chan struct{}), - allWorkersDone: make(chan struct{}), - numWorkers: n, - jobCh: jobChan, - resCh: resultChan, - } - - for i := 0; i < n; i++ { - go p.runWorker(ctx, i) - } - - go p.waitForExit() - - return p -} - -// waitForExit receives from p.workersExit until all worker functions have -// exited, then closes the result channel. -func (p *Pool) waitForExit() { - n := p.numWorkers - for n > 0 { - <-p.workersExit - n-- - } - close(p.allWorkersDone) - close(p.resCh) -} - -// runWorker runs a worker function. -func (p *Pool) runWorker(ctx context.Context, numWorker int) { - defer func() { - p.workersExit <- struct{}{} - }() - - var ( - // enable the input channel when starting up a new goroutine - inCh = p.jobCh - // but do not enable the output channel until we have a result - outCh chan<- Job - - job Job - ok bool - ) - - for { - select { - case <-ctx.Done(): - return - - case job, ok = <-inCh: - if !ok { - return - } - - job.Result, job.Error = p.f(ctx, job) - inCh = nil - outCh = p.resCh - - case outCh <- job: - outCh = nil - inCh = p.jobCh - } - } -} - -// Wait waits for all worker goroutines to terminate, afterwards the output -// channel is closed. -func (p *Pool) Wait() { - <-p.allWorkersDone -} diff --git a/internal/worker/pool_test.go b/internal/worker/pool_test.go deleted file mode 100644 index 7c10b7e6a..000000000 --- a/internal/worker/pool_test.go +++ /dev/null @@ -1,92 +0,0 @@ -package worker_test - -import ( - "context" - "testing" - - "github.com/restic/restic/internal/errors" - - "github.com/restic/restic/internal/worker" -) - -const concurrency = 10 - -var errTooLarge = errors.New("too large") - -func square(ctx context.Context, job worker.Job) (interface{}, error) { - n := job.Data.(int) - if n > 2000 { - return nil, errTooLarge - } - return n * n, nil -} - -func newBufferedPool(ctx context.Context, bufsize int, n int, f worker.Func) (chan worker.Job, chan worker.Job, *worker.Pool) { - inCh := make(chan worker.Job, bufsize) - outCh := make(chan worker.Job, bufsize) - - return inCh, outCh, worker.New(ctx, n, f, inCh, outCh) -} - -func TestPool(t *testing.T) { - inCh, outCh, p := newBufferedPool(context.TODO(), 200, concurrency, square) - - for i := 0; i < 150; i++ { - inCh <- worker.Job{Data: i} - } - - close(inCh) - p.Wait() - - for res := range outCh { - if res.Error != nil { - t.Errorf("unexpected error for job %v received: %v", res.Data, res.Error) - continue - } - - n := res.Data.(int) - m := res.Result.(int) - - if m != n*n { - t.Errorf("wrong value for job %d returned: want %d, got %d", n, n*n, m) - } - } -} - -func TestPoolErrors(t *testing.T) { - inCh, outCh, p := newBufferedPool(context.TODO(), 200, concurrency, square) - - for i := 0; i < 150; i++ { - inCh <- worker.Job{Data: i + 1900} - } - - close(inCh) - p.Wait() - - for res := range outCh { - n := res.Data.(int) - - if n > 2000 { - if res.Error == nil { - t.Errorf("expected error not found, result is %v", res) - continue - } - - if res.Error != errTooLarge { - t.Errorf("unexpected error found, result is %v", res) - } - - continue - } else { - if res.Error != nil { - t.Errorf("unexpected error for job %v received: %v", res.Data, res.Error) - continue - } - } - - m := res.Result.(int) - if m != n*n { - t.Errorf("wrong value for job %d returned: want %d, got %d", n, n*n, m) - } - } -}