From 1d7bb01a6b606d7d44627f5fcf2d3d33a7976f1f Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 00:23:45 +0100 Subject: [PATCH 01/13] check: Cleanup tree loading and switch to use errgroup The helper methods are now wired up in the Structure method. --- internal/checker/checker.go | 145 ++++++++++++++---------------------- 1 file changed, 54 insertions(+), 91 deletions(-) diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 23564ea45..9c6922a36 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -316,115 +316,56 @@ type treeJob struct { // loadTreeWorker loads trees from repo and sends them to out. func loadTreeWorker(ctx context.Context, repo restic.Repository, - in <-chan restic.ID, out chan<- treeJob, - wg *sync.WaitGroup) { + in <-chan restic.ID, out chan<- treeJob) { - defer func() { - debug.Log("exiting") - wg.Done() - }() + for treeID := range in { + tree, err := repo.LoadTree(ctx, treeID) + debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) + job := treeJob{ID: treeID, error: err, Tree: tree} - var ( - inCh = in - outCh = out - job treeJob - ) - - outCh = nil - for { select { case <-ctx.Done(): return - - case treeID, ok := <-inCh: - if !ok { - return - } - debug.Log("load tree %v", treeID) - - tree, err := repo.LoadTree(ctx, treeID) - debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) - job = treeJob{ID: treeID, error: err, Tree: tree} - outCh = out - inCh = nil - - case outCh <- job: - debug.Log("sent tree %v", job.ID) - outCh = nil - inCh = in + case out <- job: } } } // checkTreeWorker checks the trees received and sends out errors to errChan. -func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error, wg *sync.WaitGroup) { - defer func() { - debug.Log("exiting") - wg.Done() - }() +func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error) { + for job := range in { + debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error) - var ( - inCh = in - outCh = out - treeError TreeError - ) + var errs []error + if job.error != nil { + errs = append(errs, job.error) + } else { + errs = c.checkTree(job.ID, job.Tree) + } - outCh = nil - for { + if len(errs) == 0 { + continue + } + treeError := TreeError{ID: job.ID, Errors: errs} select { case <-ctx.Done(): - debug.Log("done channel closed, exiting") return - - case job, ok := <-inCh: - if !ok { - debug.Log("input channel closed, exiting") - return - } - - debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error) - - var errs []error - if job.error != nil { - errs = append(errs, job.error) - } else { - errs = c.checkTree(job.ID, job.Tree) - } - - if len(errs) > 0 { - debug.Log("checked tree %v: %v errors", job.ID, len(errs)) - treeError = TreeError{ID: job.ID, Errors: errs} - outCh = out - inCh = nil - } - - case outCh <- treeError: + case out <- treeError: debug.Log("tree %v: sent %d errors", treeError.ID, len(treeError.Errors)) - outCh = nil - inCh = in } } } func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) { - defer func() { - debug.Log("closing output channels") - close(loaderChan) - close(out) - }() - var ( inCh = in - outCh = out - loadCh = loaderChan + outCh chan<- treeJob + loadCh chan<- restic.ID job treeJob nextTreeID restic.ID outstandingLoadTreeJobs = 0 ) - outCh = nil - loadCh = nil - for { if loadCh == nil && len(backlog) > 0 { // process 
last added ids first, that is traverse the tree in depth-first order @@ -528,8 +469,6 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids resti // subtrees are available in the index. errChan is closed after all trees have // been traversed. func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { - defer close(errChan) - trees, errs := loadSnapshotTreeIDs(ctx, c.repo) debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs)) @@ -541,18 +480,42 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { } } - treeIDChan := make(chan restic.ID) - treeJobChan1 := make(chan treeJob) - treeJobChan2 := make(chan treeJob) + loaderChan := make(chan restic.ID) + loadedTreeChan := make(chan treeJob) + treeStream := make(chan treeJob) + + wg, ctx := errgroup.WithContext(ctx) + var loadTreeWg sync.WaitGroup - var wg sync.WaitGroup for i := 0; i < defaultParallelism; i++ { - wg.Add(2) - go loadTreeWorker(ctx, c.repo, treeIDChan, treeJobChan1, &wg) - go c.checkTreeWorker(ctx, treeJobChan2, errChan, &wg) + loadTreeWg.Add(1) + wg.Go(func() error { + defer loadTreeWg.Done() + loadTreeWorker(ctx, c.repo, loaderChan, loadedTreeChan) + return nil + }) + } + // close once all loadTreeWorkers have completed + wg.Go(func() error { + loadTreeWg.Wait() + close(loadedTreeChan) + return nil + }) + + defer close(errChan) + for i := 0; i < defaultParallelism; i++ { + wg.Go(func() error { + c.checkTreeWorker(ctx, treeStream, errChan) + return nil + }) } - c.filterTrees(ctx, trees, treeIDChan, treeJobChan1, treeJobChan2) + wg.Go(func() error { + defer close(loaderChan) + defer close(treeStream) + c.filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream) + return nil + }) wg.Wait() } From 6e03f80ca26285d865e8d63cf65ad1e6e6a6f2ff Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 00:39:40 +0100 Subject: [PATCH 02/13] check: Split the parallelized tree loader into a reusable component The actual code change is minimal --- internal/checker/checker.go | 159 +++------------------------------ internal/restic/tree_stream.go | 155 ++++++++++++++++++++++++++++++++ 2 files changed, 169 insertions(+), 145 deletions(-) create mode 100644 internal/restic/tree_stream.go diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 9c6922a36..4a1c5cb4b 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -308,37 +308,14 @@ func (e TreeError) Error() string { return fmt.Sprintf("tree %v: %v", e.ID.Str(), e.Errors) } -type treeJob struct { - restic.ID - error - *restic.Tree -} - -// loadTreeWorker loads trees from repo and sends them to out. -func loadTreeWorker(ctx context.Context, repo restic.Repository, - in <-chan restic.ID, out chan<- treeJob) { - - for treeID := range in { - tree, err := repo.LoadTree(ctx, treeID) - debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) - job := treeJob{ID: treeID, error: err, Tree: tree} - - select { - case <-ctx.Done(): - return - case out <- job: - } - } -} - // checkTreeWorker checks the trees received and sends out errors to errChan. 
-func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out chan<- error) { - for job := range in { - debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.error) +func (c *Checker) checkTreeWorker(ctx context.Context, trees <-chan restic.TreeItem, out chan<- error) { + for job := range trees { + debug.Log("check tree %v (tree %v, err %v)", job.ID, job.Tree, job.Error) var errs []error - if job.error != nil { - errs = append(errs, job.error) + if job.Error != nil { + errs = append(errs, job.Error) } else { errs = c.checkTree(job.ID, job.Tree) } @@ -356,97 +333,6 @@ func (c *Checker) checkTreeWorker(ctx context.Context, in <-chan treeJob, out ch } } -func (c *Checker) filterTrees(ctx context.Context, backlog restic.IDs, loaderChan chan<- restic.ID, in <-chan treeJob, out chan<- treeJob) { - var ( - inCh = in - outCh chan<- treeJob - loadCh chan<- restic.ID - job treeJob - nextTreeID restic.ID - outstandingLoadTreeJobs = 0 - ) - - for { - if loadCh == nil && len(backlog) > 0 { - // process last added ids first, that is traverse the tree in depth-first order - ln := len(backlog) - 1 - nextTreeID, backlog = backlog[ln], backlog[:ln] - - // use a separate flag for processed trees to ensure that check still processes trees - // even when a file references a tree blob - c.blobRefs.Lock() - h := restic.BlobHandle{ID: nextTreeID, Type: restic.TreeBlob} - blobReferenced := c.blobRefs.M.Has(h) - // noop if already referenced - c.blobRefs.M.Insert(h) - c.blobRefs.Unlock() - if blobReferenced { - continue - } - - loadCh = loaderChan - } - - if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 { - debug.Log("backlog is empty, all channels nil, exiting") - return - } - - select { - case <-ctx.Done(): - return - - case loadCh <- nextTreeID: - outstandingLoadTreeJobs++ - loadCh = nil - - case j, ok := <-inCh: - if !ok { - debug.Log("input channel closed") - inCh = nil - in = nil - continue - } - - outstandingLoadTreeJobs-- - - debug.Log("input job tree %v", j.ID) - - if j.error != nil { - debug.Log("received job with error: %v (tree %v, ID %v)", j.error, j.Tree, j.ID) - } else if j.Tree == nil { - debug.Log("received job with nil tree pointer: %v (ID %v)", j.error, j.ID) - // send a new job with the new error instead of the old one - j = treeJob{ID: j.ID, error: errors.New("tree is nil and error is nil")} - } else { - subtrees := j.Tree.Subtrees() - debug.Log("subtrees for tree %v: %v", j.ID, subtrees) - // iterate backwards over subtree to compensate backwards traversal order of nextTreeID selection - for i := len(subtrees) - 1; i >= 0; i-- { - id := subtrees[i] - if id.IsNull() { - // We do not need to raise this error here, it is - // checked when the tree is checked. Just make sure - // that we do not add any null IDs to the backlog. 
- debug.Log("tree %v has nil subtree", j.ID) - continue - } - backlog = append(backlog, id) - } - } - - job = j - outCh = out - inCh = nil - - case outCh <- job: - debug.Log("tree sent to check: %v", job.ID) - outCh = nil - inCh = in - } - } -} - func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids restic.IDs, errs []error) { err := restic.ForAllSnapshots(ctx, repo, nil, func(id restic.ID, sn *restic.Snapshot, err error) error { if err != nil { @@ -480,26 +366,16 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { } } - loaderChan := make(chan restic.ID) - loadedTreeChan := make(chan treeJob) - treeStream := make(chan treeJob) - wg, ctx := errgroup.WithContext(ctx) - var loadTreeWg sync.WaitGroup - - for i := 0; i < defaultParallelism; i++ { - loadTreeWg.Add(1) - wg.Go(func() error { - defer loadTreeWg.Done() - loadTreeWorker(ctx, c.repo, loaderChan, loadedTreeChan) - return nil - }) - } - // close once all loadTreeWorkers have completed - wg.Go(func() error { - loadTreeWg.Wait() - close(loadedTreeChan) - return nil + treeStream := restic.StreamTrees(ctx, wg, c.repo, trees, func(treeID restic.ID) bool { + // blobRefs may be accessed in parallel by checkTree + c.blobRefs.Lock() + h := restic.BlobHandle{ID: treeID, Type: restic.TreeBlob} + blobReferenced := c.blobRefs.M.Has(h) + // noop if already referenced + c.blobRefs.M.Insert(h) + c.blobRefs.Unlock() + return blobReferenced }) defer close(errChan) @@ -510,13 +386,6 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { }) } - wg.Go(func() error { - defer close(loaderChan) - defer close(treeStream) - c.filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream) - return nil - }) - wg.Wait() } diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go new file mode 100644 index 000000000..b71f4aa18 --- /dev/null +++ b/internal/restic/tree_stream.go @@ -0,0 +1,155 @@ +package restic + +import ( + "context" + "errors" + "sync" + + "github.com/restic/restic/internal/debug" + "golang.org/x/sync/errgroup" +) + +const streamTreeParallelism = 5 + +// TreeItem is used to return either an error or the tree for a tree id +type TreeItem struct { + ID + Error error + *Tree +} + +// loadTreeWorker loads trees from repo and sends them to out. 
+func loadTreeWorker(ctx context.Context, repo TreeLoader, + in <-chan ID, out chan<- TreeItem) { + + for treeID := range in { + tree, err := repo.LoadTree(ctx, treeID) + debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) + job := TreeItem{ID: treeID, Error: err, Tree: tree} + + select { + case <-ctx.Done(): + return + case out <- job: + } + } +} + +func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, + in <-chan TreeItem, out chan<- TreeItem, skip func(tree ID) bool) { + + var ( + inCh = in + outCh chan<- TreeItem + loadCh chan<- ID + job TreeItem + nextTreeID ID + outstandingLoadTreeJobs = 0 + ) + + for { + if loadCh == nil && len(backlog) > 0 { + // process last added ids first, that is traverse the tree in depth-first order + ln := len(backlog) - 1 + nextTreeID, backlog = backlog[ln], backlog[:ln] + + if skip(nextTreeID) { + continue + } + + loadCh = loaderChan + } + + if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 { + debug.Log("backlog is empty, all channels nil, exiting") + return + } + + select { + case <-ctx.Done(): + return + + case loadCh <- nextTreeID: + outstandingLoadTreeJobs++ + loadCh = nil + + case j, ok := <-inCh: + if !ok { + debug.Log("input channel closed") + inCh = nil + in = nil + continue + } + + outstandingLoadTreeJobs-- + + debug.Log("input job tree %v", j.ID) + + if j.Error != nil { + debug.Log("received job with error: %v (tree %v, ID %v)", j.Error, j.Tree, j.ID) + } else if j.Tree == nil { + debug.Log("received job with nil tree pointer: %v (ID %v)", j.Error, j.ID) + // send a new job with the new error instead of the old one + j = TreeItem{ID: j.ID, Error: errors.New("tree is nil and error is nil")} + } else { + subtrees := j.Tree.Subtrees() + debug.Log("subtrees for tree %v: %v", j.ID, subtrees) + // iterate backwards over subtree to compensate backwards traversal order of nextTreeID selection + for i := len(subtrees) - 1; i >= 0; i-- { + id := subtrees[i] + if id.IsNull() { + // We do not need to raise this error here, it is + // checked when the tree is checked. Just make sure + // that we do not add any null IDs to the backlog. + debug.Log("tree %v has nil subtree", j.ID) + continue + } + backlog = append(backlog, id) + } + } + + job = j + outCh = out + inCh = nil + + case outCh <- job: + debug.Log("tree sent to process: %v", job.ID) + outCh = nil + inCh = in + } + } +} + +// StreamTrees iteratively loads the given trees and their subtrees. The skip method +// is guaranteed to always be called from the same goroutine. 
+func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool) <-chan TreeItem { + loaderChan := make(chan ID) + loadedTreeChan := make(chan TreeItem) + treeStream := make(chan TreeItem) + + var loadTreeWg sync.WaitGroup + + for i := 0; i < streamTreeParallelism; i++ { + loadTreeWg.Add(1) + wg.Go(func() error { + defer loadTreeWg.Done() + loadTreeWorker(ctx, repo, loaderChan, loadedTreeChan) + return nil + }) + } + + // close once all loadTreeWorkers have completed + wg.Go(func() error { + loadTreeWg.Wait() + close(loadedTreeChan) + return nil + }) + + wg.Go(func() error { + defer close(loaderChan) + defer close(treeStream) + filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip) + return nil + }) + return treeStream +} From f2a1b125cb6859161b20957b6ec083ff0229424d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 01:12:07 +0100 Subject: [PATCH 03/13] restic: Actually parallelize FindUsedBlobs --- internal/restic/find.go | 57 +++++++++++++++++++++++++---------------- 1 file changed, 35 insertions(+), 22 deletions(-) diff --git a/internal/restic/find.go b/internal/restic/find.go index b5bef0720..b797cac6b 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -1,6 +1,11 @@ package restic -import "context" +import ( + "context" + "sync" + + "golang.org/x/sync/errgroup" +) // TreeLoader loads a tree from a repository. type TreeLoader interface { @@ -10,30 +15,38 @@ type TreeLoader interface { // FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data // blobs) to the set blobs. Already seen tree blobs will not be visited again. func FindUsedBlobs(ctx context.Context, repo TreeLoader, treeID ID, blobs BlobSet) error { - h := BlobHandle{ID: treeID, Type: TreeBlob} - if blobs.Has(h) { - return nil - } - blobs.Insert(h) + var lock sync.Mutex - tree, err := repo.LoadTree(ctx, treeID) - if err != nil { - return err - } + wg, ctx := errgroup.WithContext(ctx) + treeStream := StreamTrees(ctx, wg, repo, IDs{treeID}, func(treeID ID) bool { + // locking is necessary the goroutine below concurrently adds data blobs + lock.Lock() + h := BlobHandle{ID: treeID, Type: TreeBlob} + blobReferenced := blobs.Has(h) + // noop if already referenced + blobs.Insert(h) + lock.Unlock() + return blobReferenced + }) - for _, node := range tree.Nodes { - switch node.Type { - case "file": - for _, blob := range node.Content { - blobs.Insert(BlobHandle{ID: blob, Type: DataBlob}) + wg.Go(func() error { + for tree := range treeStream { + if tree.Error != nil { + return tree.Error } - case "dir": - err := FindUsedBlobs(ctx, repo, *node.Subtree, blobs) - if err != nil { - return err + + lock.Lock() + for _, node := range tree.Nodes { + switch node.Type { + case "file": + for _, blob := range node.Content { + blobs.Insert(BlobHandle{ID: blob, Type: DataBlob}) + } + } } + lock.Unlock() } - } - - return nil + return nil + }) + return wg.Wait() } From 0caad1e890bd016cfac2e76a39c0de1840520579 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 01:41:22 +0100 Subject: [PATCH 04/13] copy: parallelize tree walk --- cmd/restic/cmd_copy.go | 107 +++++++++++++++++++++-------------------- 1 file changed, 55 insertions(+), 52 deletions(-) diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 216dab836..37cf45ec3 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -6,6 +6,7 @@ import ( "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" + 
"golang.org/x/sync/errgroup" "github.com/spf13/cobra" ) @@ -184,57 +185,59 @@ type treeCloner struct { buf []byte } -func (t *treeCloner) copyTree(ctx context.Context, treeID restic.ID) error { - // We have already processed this tree - if t.visitedTrees.Has(treeID) { +func (t *treeCloner) copyTree(ctx context.Context, rootTreeID restic.ID) error { + wg, ctx := errgroup.WithContext(ctx) + + treeStream := restic.StreamTrees(ctx, wg, t.srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { + visited := t.visitedTrees.Has(treeID) + t.visitedTrees.Insert(treeID) + return visited + }) + + wg.Go(func() error { + for tree := range treeStream { + if tree.Error != nil { + return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) + } + + // Do we already have this tree blob? + if !t.dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) { + newTreeID, err := t.dstRepo.SaveTree(ctx, tree.Tree) + if err != nil { + return fmt.Errorf("SaveTree(%v) returned error %v", tree.ID.Str(), err) + } + // Assurance only. + if newTreeID != tree.ID { + return fmt.Errorf("SaveTree(%v) returned unexpected id %s", tree.ID.Str(), newTreeID.Str()) + } + } + + // TODO: parallelize blob down/upload + + for _, entry := range tree.Nodes { + // Recursion into directories is handled by StreamTrees + // Copy the blobs for this file. + for _, blobID := range entry.Content { + // Do we already have this data blob? + if t.dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { + continue + } + debug.Log("Copying blob %s\n", blobID.Str()) + var err error + t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf) + if err != nil { + return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) + } + + _, _, err = t.dstRepo.SaveBlob(ctx, restic.DataBlob, t.buf, blobID, false) + if err != nil { + return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) + } + } + } + + } return nil - } - - tree, err := t.srcRepo.LoadTree(ctx, treeID) - if err != nil { - return fmt.Errorf("LoadTree(%v) returned error %v", treeID.Str(), err) - } - t.visitedTrees.Insert(treeID) - - // Do we already have this tree blob? - if !t.dstRepo.Index().Has(restic.BlobHandle{ID: treeID, Type: restic.TreeBlob}) { - newTreeID, err := t.dstRepo.SaveTree(ctx, tree) - if err != nil { - return fmt.Errorf("SaveTree(%v) returned error %v", treeID.Str(), err) - } - // Assurance only. - if newTreeID != treeID { - return fmt.Errorf("SaveTree(%v) returned unexpected id %s", treeID.Str(), newTreeID.Str()) - } - } - - // TODO: parellize this stuff, likely only needed inside a tree. - - for _, entry := range tree.Nodes { - // If it is a directory, recurse - if entry.Type == "dir" && entry.Subtree != nil { - if err := t.copyTree(ctx, *entry.Subtree); err != nil { - return err - } - } - // Copy the blobs for this file. - for _, blobID := range entry.Content { - // Do we already have this data blob? 
- if t.dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { - continue - } - debug.Log("Copying blob %s\n", blobID.Str()) - t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf) - if err != nil { - return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) - } - - _, _, err = t.dstRepo.SaveBlob(ctx, restic.DataBlob, t.buf, blobID, false) - if err != nil { - return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) - } - } - } - - return nil + }) + return wg.Wait() } From 3d6a3e2555b9635558014114c93fee895f076839 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Wed, 16 Dec 2020 22:55:07 +0100 Subject: [PATCH 05/13] copy: Remove treeCloner struct --- cmd/restic/cmd_copy.go | 38 ++++++++++++++++---------------------- 1 file changed, 16 insertions(+), 22 deletions(-) diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index 37cf45ec3..b11c5e1e8 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -104,12 +104,8 @@ func runCopy(opts CopyOptions, gopts GlobalOptions, args []string) error { dstSnapshotByOriginal[*sn.ID()] = append(dstSnapshotByOriginal[*sn.ID()], sn) } - cloner := &treeCloner{ - srcRepo: srcRepo, - dstRepo: dstRepo, - visitedTrees: restic.NewIDSet(), - buf: nil, - } + // remember already processed trees across all snapshots + visitedTrees := restic.NewIDSet() for sn := range FindFilteredSnapshots(ctx, srcRepo, opts.Hosts, opts.Tags, opts.Paths, args) { Verbosef("\nsnapshot %s of %v at %s)\n", sn.ID().Str(), sn.Paths, sn.Time) @@ -134,7 +130,7 @@ func runCopy(opts CopyOptions, gopts GlobalOptions, args []string) error { } Verbosef(" copy started, this may take a while...\n") - if err := cloner.copyTree(ctx, *sn.Tree); err != nil { + if err := copyTree(ctx, srcRepo, dstRepo, visitedTrees, *sn.Tree); err != nil { return err } debug.Log("tree copied") @@ -178,31 +174,29 @@ func similarSnapshots(sna *restic.Snapshot, snb *restic.Snapshot) bool { return true } -type treeCloner struct { - srcRepo restic.Repository - dstRepo restic.Repository - visitedTrees restic.IDSet - buf []byte -} +func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Repository, + visitedTrees restic.IDSet, rootTreeID restic.ID) error { -func (t *treeCloner) copyTree(ctx context.Context, rootTreeID restic.ID) error { wg, ctx := errgroup.WithContext(ctx) - treeStream := restic.StreamTrees(ctx, wg, t.srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { - visited := t.visitedTrees.Has(treeID) - t.visitedTrees.Insert(treeID) + treeStream := restic.StreamTrees(ctx, wg, srcRepo, restic.IDs{rootTreeID}, func(treeID restic.ID) bool { + visited := visitedTrees.Has(treeID) + visitedTrees.Insert(treeID) return visited }) wg.Go(func() error { + // reused buffer + var buf []byte + for tree := range treeStream { if tree.Error != nil { return fmt.Errorf("LoadTree(%v) returned error %v", tree.ID.Str(), tree.Error) } // Do we already have this tree blob? - if !t.dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) { - newTreeID, err := t.dstRepo.SaveTree(ctx, tree.Tree) + if !dstRepo.Index().Has(restic.BlobHandle{ID: tree.ID, Type: restic.TreeBlob}) { + newTreeID, err := dstRepo.SaveTree(ctx, tree.Tree) if err != nil { return fmt.Errorf("SaveTree(%v) returned error %v", tree.ID.Str(), err) } @@ -219,17 +213,17 @@ func (t *treeCloner) copyTree(ctx context.Context, rootTreeID restic.ID) error { // Copy the blobs for this file. for _, blobID := range entry.Content { // Do we already have this data blob? 
- if t.dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { + if dstRepo.Index().Has(restic.BlobHandle{ID: blobID, Type: restic.DataBlob}) { continue } debug.Log("Copying blob %s\n", blobID.Str()) var err error - t.buf, err = t.srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, t.buf) + buf, err = srcRepo.LoadBlob(ctx, restic.DataBlob, blobID, buf) if err != nil { return fmt.Errorf("LoadBlob(%v) returned error %v", blobID, err) } - _, _, err = t.dstRepo.SaveBlob(ctx, restic.DataBlob, t.buf, blobID, false) + _, _, err = dstRepo.SaveBlob(ctx, restic.DataBlob, buf, blobID, false) if err != nil { return fmt.Errorf("SaveBlob(%v) returned error %v", blobID, err) } From 258ce0c1e5cc488c0267fbf024b969041db43e5d Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Tue, 17 Nov 2020 22:37:57 +0100 Subject: [PATCH 06/13] parallel: report progress for StreamTrees This assigns an id to each tree root and then keeps track of how many tree loads (i.e. trees referenced for the first time) are pending per tree root. Once a tree root and its subtrees were fully processed there are no more pending tree loads and the tree root is reported as processed. --- cmd/restic/cmd_copy.go | 2 +- internal/checker/checker.go | 2 +- internal/restic/find.go | 2 +- internal/restic/tree_stream.go | 56 +++++++++++++++++++++++++--------- 4 files changed, 44 insertions(+), 18 deletions(-) diff --git a/cmd/restic/cmd_copy.go b/cmd/restic/cmd_copy.go index b11c5e1e8..cb8296d4b 100644 --- a/cmd/restic/cmd_copy.go +++ b/cmd/restic/cmd_copy.go @@ -183,7 +183,7 @@ func copyTree(ctx context.Context, srcRepo restic.Repository, dstRepo restic.Rep visited := visitedTrees.Has(treeID) visitedTrees.Insert(treeID) return visited - }) + }, nil) wg.Go(func() error { // reused buffer diff --git a/internal/checker/checker.go b/internal/checker/checker.go index 4a1c5cb4b..e41c1f1b5 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -376,7 +376,7 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { c.blobRefs.M.Insert(h) c.blobRefs.Unlock() return blobReferenced - }) + }, nil) defer close(errChan) for i := 0; i < defaultParallelism; i++ { diff --git a/internal/restic/find.go b/internal/restic/find.go index b797cac6b..4c72766c6 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -27,7 +27,7 @@ func FindUsedBlobs(ctx context.Context, repo TreeLoader, treeID ID, blobs BlobSe blobs.Insert(h) lock.Unlock() return blobReferenced - }) + }, nil) wg.Go(func() error { for tree := range treeStream { diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go index b71f4aa18..0c2a96810 100644 --- a/internal/restic/tree_stream.go +++ b/internal/restic/tree_stream.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" ) @@ -18,14 +19,24 @@ type TreeItem struct { *Tree } +type trackedTreeItem struct { + TreeItem + rootIdx int +} + +type trackedID struct { + ID + rootIdx int +} + // loadTreeWorker loads trees from repo and sends them to out. 
func loadTreeWorker(ctx context.Context, repo TreeLoader, - in <-chan ID, out chan<- TreeItem) { + in <-chan trackedID, out chan<- trackedTreeItem) { for treeID := range in { - tree, err := repo.LoadTree(ctx, treeID) + tree, err := repo.LoadTree(ctx, treeID.ID) debug.Log("load tree %v (%v) returned err: %v", tree, treeID, err) - job := TreeItem{ID: treeID, Error: err, Tree: tree} + job := trackedTreeItem{TreeItem: TreeItem{ID: treeID.ID, Error: err, Tree: tree}, rootIdx: treeID.rootIdx} select { case <-ctx.Done(): @@ -35,17 +46,23 @@ func loadTreeWorker(ctx context.Context, repo TreeLoader, } } -func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, - in <-chan TreeItem, out chan<- TreeItem, skip func(tree ID) bool) { +func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID, + in <-chan trackedTreeItem, out chan<- TreeItem, skip func(tree ID) bool, p *progress.Counter) { var ( inCh = in outCh chan<- TreeItem - loadCh chan<- ID + loadCh chan<- trackedID job TreeItem - nextTreeID ID + nextTreeID trackedID outstandingLoadTreeJobs = 0 ) + rootCounter := make([]int, len(trees)) + backlog := make([]trackedID, 0, len(trees)) + for idx, id := range trees { + backlog = append(backlog, trackedID{ID: id, rootIdx: idx}) + rootCounter[idx] = 1 + } for { if loadCh == nil && len(backlog) > 0 { @@ -53,7 +70,11 @@ func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, ln := len(backlog) - 1 nextTreeID, backlog = backlog[ln], backlog[:ln] - if skip(nextTreeID) { + if skip(nextTreeID.ID) { + rootCounter[nextTreeID.rootIdx]-- + if p != nil && rootCounter[nextTreeID.rootIdx] == 0 { + p.Add(1) + } continue } @@ -82,6 +103,7 @@ func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, } outstandingLoadTreeJobs-- + rootCounter[j.rootIdx]-- debug.Log("input job tree %v", j.ID) @@ -90,7 +112,7 @@ func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, } else if j.Tree == nil { debug.Log("received job with nil tree pointer: %v (ID %v)", j.Error, j.ID) // send a new job with the new error instead of the old one - j = TreeItem{ID: j.ID, Error: errors.New("tree is nil and error is nil")} + j = trackedTreeItem{TreeItem: TreeItem{ID: j.ID, Error: errors.New("tree is nil and error is nil")}, rootIdx: j.rootIdx} } else { subtrees := j.Tree.Subtrees() debug.Log("subtrees for tree %v: %v", j.ID, subtrees) @@ -104,11 +126,15 @@ func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, debug.Log("tree %v has nil subtree", j.ID) continue } - backlog = append(backlog, id) + backlog = append(backlog, trackedID{ID: id, rootIdx: j.rootIdx}) + rootCounter[j.rootIdx]++ } } + if p != nil && rootCounter[j.rootIdx] == 0 { + p.Add(1) + } - job = j + job = j.TreeItem outCh = out inCh = nil @@ -122,9 +148,9 @@ func filterTrees(ctx context.Context, backlog IDs, loaderChan chan<- ID, // StreamTrees iteratively loads the given trees and their subtrees. The skip method // is guaranteed to always be called from the same goroutine. 
-func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool) <-chan TreeItem { - loaderChan := make(chan ID) - loadedTreeChan := make(chan TreeItem) +func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool, p *progress.Counter) <-chan TreeItem { + loaderChan := make(chan trackedID) + loadedTreeChan := make(chan trackedTreeItem) treeStream := make(chan TreeItem) var loadTreeWg sync.WaitGroup @@ -148,7 +174,7 @@ func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees wg.Go(func() error { defer close(loaderChan) defer close(treeStream) - filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip) + filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip, p) return nil }) return treeStream From eda8c67616f30e385a7003a5607ffcd7dde05131 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 7 Nov 2020 14:16:04 +0100 Subject: [PATCH 07/13] restic: let FindUsedBlobs handle multiple snapshots at once --- cmd/restic/cmd_prune.go | 16 +++++----------- cmd/restic/cmd_stats.go | 2 +- internal/restic/find.go | 7 ++++--- internal/restic/find_test.go | 8 ++++---- 4 files changed, 14 insertions(+), 19 deletions(-) diff --git a/cmd/restic/cmd_prune.go b/cmd/restic/cmd_prune.go index bdad4efd9..90fe7693d 100644 --- a/cmd/restic/cmd_prune.go +++ b/cmd/restic/cmd_prune.go @@ -574,20 +574,14 @@ func getUsedBlobs(gopts GlobalOptions, repo restic.Repository, ignoreSnapshots r bar := newProgressMax(!gopts.Quiet, uint64(len(snapshotTrees)), "snapshots") defer bar.Done() - for _, tree := range snapshotTrees { - debug.Log("process tree %v", tree) - err = restic.FindUsedBlobs(ctx, repo, tree, usedBlobs) - if err != nil { - if repo.Backend().IsNotExist(err) { - return nil, errors.Fatal("unable to load a tree from the repo: " + err.Error()) - } - - return nil, err + err = restic.FindUsedBlobs(ctx, repo, snapshotTrees, usedBlobs, bar) + if err != nil { + if repo.Backend().IsNotExist(err) { + return nil, errors.Fatal("unable to load a tree from the repo: " + err.Error()) } - debug.Log("processed tree %v", tree) - bar.Add(1) + return nil, err } return usedBlobs, nil } diff --git a/cmd/restic/cmd_stats.go b/cmd/restic/cmd_stats.go index 81ec66843..deb649e26 100644 --- a/cmd/restic/cmd_stats.go +++ b/cmd/restic/cmd_stats.go @@ -166,7 +166,7 @@ func statsWalkSnapshot(ctx context.Context, snapshot *restic.Snapshot, repo rest if statsOptions.countMode == countModeRawData { // count just the sizes of unique blobs; we don't need to walk the tree // ourselves in this case, since a nifty function does it for us - return restic.FindUsedBlobs(ctx, repo, *snapshot.Tree, stats.blobs) + return restic.FindUsedBlobs(ctx, repo, restic.IDs{*snapshot.Tree}, stats.blobs, nil) } err := walker.Walk(ctx, repo, *snapshot.Tree, restic.NewIDSet(), statsWalkTree(repo, stats)) diff --git a/internal/restic/find.go b/internal/restic/find.go index 4c72766c6..4d6433d60 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/restic/restic/internal/ui/progress" "golang.org/x/sync/errgroup" ) @@ -14,11 +15,11 @@ type TreeLoader interface { // FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data // blobs) to the set blobs. Already seen tree blobs will not be visited again. 
-func FindUsedBlobs(ctx context.Context, repo TreeLoader, treeID ID, blobs BlobSet) error { +func FindUsedBlobs(ctx context.Context, repo TreeLoader, treeIDs IDs, blobs BlobSet, p *progress.Counter) error { var lock sync.Mutex wg, ctx := errgroup.WithContext(ctx) - treeStream := StreamTrees(ctx, wg, repo, IDs{treeID}, func(treeID ID) bool { + treeStream := StreamTrees(ctx, wg, repo, treeIDs, func(treeID ID) bool { // locking is necessary the goroutine below concurrently adds data blobs lock.Lock() h := BlobHandle{ID: treeID, Type: TreeBlob} @@ -27,7 +28,7 @@ func FindUsedBlobs(ctx context.Context, repo TreeLoader, treeID ID, blobs BlobSe blobs.Insert(h) lock.Unlock() return blobReferenced - }, nil) + }, p) wg.Go(func() error { for tree := range treeStream { diff --git a/internal/restic/find_test.go b/internal/restic/find_test.go index 635421d8b..278a7471a 100644 --- a/internal/restic/find_test.go +++ b/internal/restic/find_test.go @@ -94,7 +94,7 @@ func TestFindUsedBlobs(t *testing.T) { for i, sn := range snapshots { usedBlobs := restic.NewBlobSet() - err := restic.FindUsedBlobs(context.TODO(), repo, *sn.Tree, usedBlobs) + err := restic.FindUsedBlobs(context.TODO(), repo, restic.IDs{*sn.Tree}, usedBlobs, nil) if err != nil { t.Errorf("FindUsedBlobs returned error: %v", err) continue @@ -133,12 +133,12 @@ func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) { t.Logf("snapshot %v saved, tree %v", snapshot.ID().Str(), snapshot.Tree.Str()) usedBlobs := restic.NewBlobSet() - err := restic.FindUsedBlobs(context.TODO(), repo, *snapshot.Tree, usedBlobs) + err := restic.FindUsedBlobs(context.TODO(), repo, restic.IDs{*snapshot.Tree}, usedBlobs, nil) if err != nil { t.Fatalf("FindUsedBlobs returned error: %v", err) } - err = restic.FindUsedBlobs(context.TODO(), ForbiddenRepo{}, *snapshot.Tree, usedBlobs) + err = restic.FindUsedBlobs(context.TODO(), ForbiddenRepo{}, restic.IDs{*snapshot.Tree}, usedBlobs, nil) if err != nil { t.Fatalf("FindUsedBlobs returned error: %v", err) } @@ -154,7 +154,7 @@ func BenchmarkFindUsedBlobs(b *testing.B) { for i := 0; i < b.N; i++ { blobs := restic.NewBlobSet() - err := restic.FindUsedBlobs(context.TODO(), repo, *sn.Tree, blobs) + err := restic.FindUsedBlobs(context.TODO(), repo, restic.IDs{*sn.Tree}, blobs, nil) if err != nil { b.Error(err) } From 505f8a22298ab0c1fe93aa1a1ce475e340d24228 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 5 Dec 2020 23:57:06 +0100 Subject: [PATCH 08/13] progress/counter: Support updating the progress bar maximum --- cmd/restic/progress.go | 13 ++++++++----- internal/ui/progress/counter.go | 29 ++++++++++++++++++++++++---- internal/ui/progress/counter_test.go | 18 +++++++++++++---- 3 files changed, 47 insertions(+), 13 deletions(-) diff --git a/cmd/restic/progress.go b/cmd/restic/progress.go index 2fbd97c6c..c0b6c56fb 100644 --- a/cmd/restic/progress.go +++ b/cmd/restic/progress.go @@ -33,11 +33,14 @@ func newProgressMax(show bool, max uint64, description string) *progress.Counter } interval := calculateProgressInterval() - return progress.New(interval, func(v uint64, d time.Duration, final bool) { - status := fmt.Sprintf("[%s] %s %d / %d %s", - formatDuration(d), - formatPercent(v, max), - v, max, description) + return progress.New(interval, max, func(v uint64, max uint64, d time.Duration, final bool) { + var status string + if max == 0 { + status = fmt.Sprintf("[%s] %d %s", formatDuration(d), v, description) + } else { + status = fmt.Sprintf("[%s] %s %d / %d %s", + formatDuration(d), formatPercent(v, max), v, max, description) + 
} if w := stdoutTerminalWidth(); w > 0 { status = shortenStatus(w, status) diff --git a/internal/ui/progress/counter.go b/internal/ui/progress/counter.go index bf4906978..82d5e39ba 100644 --- a/internal/ui/progress/counter.go +++ b/internal/ui/progress/counter.go @@ -12,7 +12,7 @@ import ( // // The final argument is true if Counter.Done has been called, // which means that the current call will be the last. -type Func func(value uint64, runtime time.Duration, final bool) +type Func func(value uint64, total uint64, runtime time.Duration, final bool) // A Counter tracks a running count and controls a goroutine that passes its // value periodically to a Func. @@ -27,16 +27,19 @@ type Counter struct { valueMutex sync.Mutex value uint64 + max uint64 } // New starts a new Counter. -func New(interval time.Duration, report Func) *Counter { +func New(interval time.Duration, total uint64, report Func) *Counter { c := &Counter{ report: report, start: time.Now(), stopped: make(chan struct{}), stop: make(chan struct{}), + max: total, } + if interval > 0 { c.tick = time.NewTicker(interval) } @@ -56,6 +59,16 @@ func (c *Counter) Add(v uint64) { c.valueMutex.Unlock() } +// SetMax sets the maximum expected counter value. This method is concurrency-safe. +func (c *Counter) SetMax(max uint64) { + if c == nil { + return + } + c.valueMutex.Lock() + c.max = max + c.valueMutex.Unlock() +} + // Done tells a Counter to stop and waits for it to report its final value. func (c *Counter) Done() { if c == nil { @@ -77,11 +90,19 @@ func (c *Counter) get() uint64 { return v } +func (c *Counter) getMax() uint64 { + c.valueMutex.Lock() + max := c.max + c.valueMutex.Unlock() + + return max +} + func (c *Counter) run() { defer close(c.stopped) defer func() { // Must be a func so that time.Since isn't called at defer time. 
- c.report(c.get(), time.Since(c.start), true) + c.report(c.get(), c.getMax(), time.Since(c.start), true) }() var tick <-chan time.Time @@ -101,6 +122,6 @@ func (c *Counter) run() { return } - c.report(c.get(), now.Sub(c.start), false) + c.report(c.get(), c.getMax(), now.Sub(c.start), false) } } diff --git a/internal/ui/progress/counter_test.go b/internal/ui/progress/counter_test.go index 9a76d9cbf..84d1e5a64 100644 --- a/internal/ui/progress/counter_test.go +++ b/internal/ui/progress/counter_test.go @@ -10,23 +10,30 @@ import ( func TestCounter(t *testing.T) { const N = 100 + const startTotal = uint64(12345) var ( finalSeen = false increasing = true last uint64 + lastTotal = startTotal ncalls int + nmaxChange int ) - report := func(value uint64, d time.Duration, final bool) { + report := func(value uint64, total uint64, d time.Duration, final bool) { finalSeen = true if value < last { increasing = false } last = value + if total != lastTotal { + nmaxChange++ + } + lastTotal = total ncalls++ } - c := progress.New(10*time.Millisecond, report) + c := progress.New(10*time.Millisecond, startTotal, report) done := make(chan struct{}) go func() { @@ -35,6 +42,7 @@ func TestCounter(t *testing.T) { time.Sleep(time.Millisecond) c.Add(1) } + c.SetMax(42) }() <-done @@ -43,6 +51,8 @@ func TestCounter(t *testing.T) { test.Assert(t, finalSeen, "final call did not happen") test.Assert(t, increasing, "values not increasing") test.Equals(t, uint64(N), last) + test.Equals(t, uint64(42), lastTotal) + test.Equals(t, int(1), nmaxChange) t.Log("number of calls:", ncalls) } @@ -58,14 +68,14 @@ func TestCounterNoTick(t *testing.T) { finalSeen := false otherSeen := false - report := func(value uint64, d time.Duration, final bool) { + report := func(value, total uint64, d time.Duration, final bool) { if final { finalSeen = true } else { otherSeen = true } } - c := progress.New(0, report) + c := progress.New(0, 1, report) time.Sleep(time.Millisecond) c.Done() From e2b007244138140ce8cdeaf22740fc134d8ddeed Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 6 Dec 2020 00:07:45 +0100 Subject: [PATCH 09/13] check: add progress bar to the tree structure check --- cmd/restic/cmd_check.go | 6 +++++- internal/checker/checker.go | 5 +++-- internal/checker/checker_test.go | 4 +++- internal/checker/testing.go | 2 +- internal/repository/master_index_test.go | 2 +- 5 files changed, 13 insertions(+), 6 deletions(-) diff --git a/cmd/restic/cmd_check.go b/cmd/restic/cmd_check.go index 774879490..c1a6e5464 100644 --- a/cmd/restic/cmd_check.go +++ b/cmd/restic/cmd_check.go @@ -240,7 +240,11 @@ func runCheck(opts CheckOptions, gopts GlobalOptions, args []string) error { Verbosef("check snapshots, trees and blobs\n") errChan = make(chan error) - go chkr.Structure(gopts.ctx, errChan) + go func() { + bar := newProgressMax(!gopts.Quiet, 0, "snapshots") + defer bar.Done() + chkr.Structure(gopts.ctx, bar, errChan) + }() for err := range errChan { errorsFound = true diff --git a/internal/checker/checker.go b/internal/checker/checker.go index e41c1f1b5..1ed470e99 100644 --- a/internal/checker/checker.go +++ b/internal/checker/checker.go @@ -354,8 +354,9 @@ func loadSnapshotTreeIDs(ctx context.Context, repo restic.Repository) (ids resti // Structure checks that for all snapshots all referenced data blobs and // subtrees are available in the index. errChan is closed after all trees have // been traversed. 
-func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { +func (c *Checker) Structure(ctx context.Context, p *progress.Counter, errChan chan<- error) { trees, errs := loadSnapshotTreeIDs(ctx, c.repo) + p.SetMax(uint64(len(trees))) debug.Log("need to check %d trees from snapshots, %d errs returned", len(trees), len(errs)) for _, err := range errs { @@ -376,7 +377,7 @@ func (c *Checker) Structure(ctx context.Context, errChan chan<- error) { c.blobRefs.M.Insert(h) c.blobRefs.Unlock() return blobReferenced - }, nil) + }, p) defer close(errChan) for i := 0; i < defaultParallelism; i++ { diff --git a/internal/checker/checker_test.go b/internal/checker/checker_test.go index f8efd05e8..ad1b15f1a 100644 --- a/internal/checker/checker_test.go +++ b/internal/checker/checker_test.go @@ -43,7 +43,9 @@ func checkPacks(chkr *checker.Checker) []error { } func checkStruct(chkr *checker.Checker) []error { - return collectErrors(context.TODO(), chkr.Structure) + return collectErrors(context.TODO(), func(ctx context.Context, errChan chan<- error) { + chkr.Structure(ctx, nil, errChan) + }) } func checkData(chkr *checker.Checker) []error { diff --git a/internal/checker/testing.go b/internal/checker/testing.go index 6c5be84e2..d672911b1 100644 --- a/internal/checker/testing.go +++ b/internal/checker/testing.go @@ -30,7 +30,7 @@ func TestCheckRepo(t testing.TB, repo restic.Repository) { // structure errChan = make(chan error) - go chkr.Structure(context.TODO(), errChan) + go chkr.Structure(context.TODO(), nil, errChan) for err := range errChan { t.Error(err) diff --git a/internal/repository/master_index_test.go b/internal/repository/master_index_test.go index 9ccf0e59e..3c279696e 100644 --- a/internal/repository/master_index_test.go +++ b/internal/repository/master_index_test.go @@ -368,7 +368,7 @@ func TestIndexSave(t *testing.T) { defer cancel() errCh := make(chan error) - go checker.Structure(ctx, errCh) + go checker.Structure(ctx, nil, errCh) i := 0 for err := range errCh { t.Errorf("checker returned error: %v", err) From 313ad0e32fcb7d85ed1a380afd027e7f42223638 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 6 Dec 2020 00:18:50 +0100 Subject: [PATCH 10/13] progress/counter: Fix test for final report call --- internal/ui/progress/counter_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/ui/progress/counter_test.go b/internal/ui/progress/counter_test.go index 84d1e5a64..49a99f7ee 100644 --- a/internal/ui/progress/counter_test.go +++ b/internal/ui/progress/counter_test.go @@ -22,7 +22,9 @@ func TestCounter(t *testing.T) { ) report := func(value uint64, total uint64, d time.Duration, final bool) { - finalSeen = true + if final { + finalSeen = true + } if value < last { increasing = false } From ddb7697d29c23293442b4653f14602f4a20bf3a6 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 6 Dec 2020 00:59:24 +0100 Subject: [PATCH 11/13] restic: Test progress reporting of StreamTrees --- internal/restic/find_test.go | 43 ++++++++++++++++++++++++++++++++- internal/ui/progress/counter.go | 7 +++--- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/internal/restic/find_test.go b/internal/restic/find_test.go index 278a7471a..c599e5fdb 100644 --- a/internal/restic/find_test.go +++ b/internal/restic/find_test.go @@ -15,6 +15,8 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/repository" "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" + 
"github.com/restic/restic/internal/ui/progress" ) func loadIDSet(t testing.TB, filename string) restic.BlobSet { @@ -92,9 +94,12 @@ func TestFindUsedBlobs(t *testing.T) { snapshots = append(snapshots, sn) } + p := progress.New(time.Second, findTestSnapshots, func(value uint64, total uint64, runtime time.Duration, final bool) {}) + defer p.Done() + for i, sn := range snapshots { usedBlobs := restic.NewBlobSet() - err := restic.FindUsedBlobs(context.TODO(), repo, restic.IDs{*sn.Tree}, usedBlobs, nil) + err := restic.FindUsedBlobs(context.TODO(), repo, restic.IDs{*sn.Tree}, usedBlobs, p) if err != nil { t.Errorf("FindUsedBlobs returned error: %v", err) continue @@ -105,6 +110,8 @@ func TestFindUsedBlobs(t *testing.T) { continue } + test.Equals(t, p.Get(), uint64(i+1)) + goldenFilename := filepath.Join("testdata", fmt.Sprintf("used_blobs_snapshot%d", i)) want := loadIDSet(t, goldenFilename) @@ -119,6 +126,40 @@ func TestFindUsedBlobs(t *testing.T) { } } +func TestMultiFindUsedBlobs(t *testing.T) { + repo, cleanup := repository.TestRepository(t) + defer cleanup() + + var snapshotTrees restic.IDs + for i := 0; i < findTestSnapshots; i++ { + sn := restic.TestCreateSnapshot(t, repo, findTestTime.Add(time.Duration(i)*time.Second), findTestDepth, 0) + t.Logf("snapshot %v saved, tree %v", sn.ID().Str(), sn.Tree.Str()) + snapshotTrees = append(snapshotTrees, *sn.Tree) + } + + want := restic.NewBlobSet() + for i := range snapshotTrees { + goldenFilename := filepath.Join("testdata", fmt.Sprintf("used_blobs_snapshot%d", i)) + want.Merge(loadIDSet(t, goldenFilename)) + } + + p := progress.New(time.Second, findTestSnapshots, func(value uint64, total uint64, runtime time.Duration, final bool) {}) + defer p.Done() + + // run twice to check progress bar handling of duplicate tree roots + usedBlobs := restic.NewBlobSet() + for i := 1; i < 3; i++ { + err := restic.FindUsedBlobs(context.TODO(), repo, snapshotTrees, usedBlobs, p) + test.OK(t, err) + test.Equals(t, p.Get(), uint64(i*len(snapshotTrees))) + + if !want.Equals(usedBlobs) { + t.Errorf("wrong list of blobs returned:\n missing blobs: %v\n extra blobs: %v", + want.Sub(usedBlobs), usedBlobs.Sub(want)) + } + } +} + type ForbiddenRepo struct{} func (r ForbiddenRepo) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree, error) { diff --git a/internal/ui/progress/counter.go b/internal/ui/progress/counter.go index 82d5e39ba..d2f75c9bf 100644 --- a/internal/ui/progress/counter.go +++ b/internal/ui/progress/counter.go @@ -82,7 +82,8 @@ func (c *Counter) Done() { *c = Counter{} // Prevent reuse. } -func (c *Counter) get() uint64 { +// Get the current Counter value. This method is concurrency-safe. +func (c *Counter) Get() uint64 { c.valueMutex.Lock() v := c.value c.valueMutex.Unlock() @@ -102,7 +103,7 @@ func (c *Counter) run() { defer close(c.stopped) defer func() { // Must be a func so that time.Since isn't called at defer time. 
- c.report(c.get(), c.getMax(), time.Since(c.start), true) + c.report(c.Get(), c.getMax(), time.Since(c.start), true) }() var tick <-chan time.Time @@ -122,6 +123,6 @@ func (c *Counter) run() { return } - c.report(c.get(), c.getMax(), now.Sub(c.start), false) + c.report(c.Get(), c.getMax(), now.Sub(c.start), false) } } From 1e306be00033a64de8af0155648892c25a03395a Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sun, 6 Dec 2020 01:08:07 +0100 Subject: [PATCH 12/13] Add changelog entry --- changelog/unreleased/pull-3106 | 10 ++++++++++ 1 file changed, 10 insertions(+) create mode 100644 changelog/unreleased/pull-3106 diff --git a/changelog/unreleased/pull-3106 b/changelog/unreleased/pull-3106 new file mode 100644 index 000000000..67d80b4d0 --- /dev/null +++ b/changelog/unreleased/pull-3106 @@ -0,0 +1,10 @@ +Enhancement: Parallelize scan of snapshot content in copy and prune + +The copy and the prune commands used to traverse the directories of +snapshots one by one to find used data. This snapshot traversal is +now parallelized, which can speed up this step several times. + +In addition, the check command now reports how many snapshots have +already been processed. + +https://github.com/restic/restic/pull/3106 From 68608a89adb94354e533682c4f2034929dadc264 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Tue, 29 Dec 2020 17:29:00 +0100 Subject: [PATCH 13/13] restic: add comment about StreamTrees shutdown --- internal/restic/tree_stream.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go index 0c2a96810..871ba8998 100644 --- a/internal/restic/tree_stream.go +++ b/internal/restic/tree_stream.go @@ -147,7 +147,9 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID, } // StreamTrees iteratively loads the given trees and their subtrees. The skip method -// is guaranteed to always be called from the same goroutine. +// is guaranteed to always be called from the same goroutine. To shut down the started +// goroutines, either read all items from the channel or cancel the context. Then `Wait()` +// on the errgroup until all goroutines were stopped. func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool, p *progress.Counter) <-chan TreeItem { loaderChan := make(chan trackedID) loadedTreeChan := make(chan trackedTreeItem)
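For reference, below is a minimal sketch of a caller that follows the StreamTrees contract documented in patch 13 (drain the returned channel or cancel the context, then Wait() on the errgroup). It mirrors the FindUsedBlobs and copyTree call sites from the earlier patches, but the walkAllTrees function and its skip logic are illustrative placeholders rather than code from these commits.

package example

import (
	"context"

	"github.com/restic/restic/internal/restic"
	"golang.org/x/sync/errgroup"
)

// walkAllTrees visits every tree reachable from the given roots exactly once.
func walkAllTrees(ctx context.Context, repo restic.TreeLoader, roots restic.IDs) error {
	visited := restic.NewIDSet()

	// StreamTrees starts its loader goroutines on this errgroup.
	wg, ctx := errgroup.WithContext(ctx)

	// The skip callback is guaranteed to be called from a single goroutine,
	// so the IDSet needs no additional locking here.
	treeStream := restic.StreamTrees(ctx, wg, repo, roots, func(id restic.ID) bool {
		if visited.Has(id) {
			return true // already seen: do not load or descend into it again
		}
		visited.Insert(id)
		return false
	}, nil) // nil *progress.Counter: no progress reporting

	wg.Go(func() error {
		// Drain the channel completely (or cancel ctx) so the workers shut down.
		for item := range treeStream {
			if item.Error != nil {
				return item.Error // fails the errgroup, cancelling ctx and stopping the stream
			}
			_ = item.Tree // ... process the loaded tree here ...
		}
		return nil
	})

	// Wait until the consumer and all StreamTrees goroutines have finished.
	return wg.Wait()
}

A caller that also wants per-root progress reporting, as the check command does after patch 09, would pass a progress.Counter instead of nil and set its maximum to the number of root trees.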