diff --git a/internal/restic/find.go b/internal/restic/find.go index 4d6433d60..c7406d750 100644 --- a/internal/restic/find.go +++ b/internal/restic/find.go @@ -11,6 +11,7 @@ import ( // TreeLoader loads a tree from a repository. type TreeLoader interface { LoadTree(context.Context, ID) (*Tree, error) + LookupBlobSize(id ID, tpe BlobType) (uint, bool) } // FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data diff --git a/internal/restic/find_test.go b/internal/restic/find_test.go index c599e5fdb..4d9bc5a13 100644 --- a/internal/restic/find_test.go +++ b/internal/restic/find_test.go @@ -166,6 +166,10 @@ func (r ForbiddenRepo) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree return nil, errors.New("should not be called") } +func (r ForbiddenRepo) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) { + return 0, false +} + func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) { repo, cleanup := repository.TestRepository(t) defer cleanup() diff --git a/internal/restic/tree_stream.go b/internal/restic/tree_stream.go index 871ba8998..f6982efc2 100644 --- a/internal/restic/tree_stream.go +++ b/internal/restic/tree_stream.go @@ -10,7 +10,7 @@ import ( "golang.org/x/sync/errgroup" ) -const streamTreeParallelism = 5 +const streamTreeParallelism = 6 // TreeItem is used to return either an error or the tree for a tree id type TreeItem struct { @@ -46,7 +46,7 @@ func loadTreeWorker(ctx context.Context, repo TreeLoader, } } -func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID, +func filterTrees(ctx context.Context, repo TreeLoader, trees IDs, loaderChan chan<- trackedID, hugeTreeLoaderChan chan<- trackedID, in <-chan trackedTreeItem, out chan<- TreeItem, skip func(tree ID) bool, p *progress.Counter) { var ( @@ -78,7 +78,12 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID, continue } - loadCh = loaderChan + treeSize, found := repo.LookupBlobSize(nextTreeID.ID, TreeBlob) + if found && treeSize > 50*1024*1024 { + loadCh = hugeTreeLoaderChan + } else { + loadCh = loaderChan + } } if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 { @@ -152,16 +157,21 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID, // on the errgroup until all goroutines were stopped. func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool, p *progress.Counter) <-chan TreeItem { loaderChan := make(chan trackedID) + hugeTreeChan := make(chan trackedID, 10) loadedTreeChan := make(chan trackedTreeItem) treeStream := make(chan TreeItem) var loadTreeWg sync.WaitGroup for i := 0; i < streamTreeParallelism; i++ { + workerLoaderChan := loaderChan + if i == 0 { + workerLoaderChan = hugeTreeChan + } loadTreeWg.Add(1) wg.Go(func() error { defer loadTreeWg.Done() - loadTreeWorker(ctx, repo, loaderChan, loadedTreeChan) + loadTreeWorker(ctx, repo, workerLoaderChan, loadedTreeChan) return nil }) } @@ -175,8 +185,9 @@ func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees wg.Go(func() error { defer close(loaderChan) + defer close(hugeTreeChan) defer close(treeStream) - filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip, p) + filterTrees(ctx, repo, trees, loaderChan, hugeTreeChan, loadedTreeChan, treeStream, skip, p) return nil }) return treeStream diff --git a/internal/walker/walker.go b/internal/walker/walker.go index e06343ac3..3c8e723a8 100644 --- a/internal/walker/walker.go +++ b/internal/walker/walker.go @@ -10,6 +10,11 @@ import ( "github.com/restic/restic/internal/restic" ) +// TreeLoader loads a tree from a repository. +type TreeLoader interface { + LoadTree(context.Context, restic.ID) (*restic.Tree, error) +} + // ErrSkipNode is returned by WalkFunc when a dir node should not be walked. var ErrSkipNode = errors.New("skip this node") @@ -33,7 +38,7 @@ type WalkFunc func(parentTreeID restic.ID, path string, node *restic.Node, nodeE // Walk calls walkFn recursively for each node in root. If walkFn returns an // error, it is passed up the call stack. The trees in ignoreTrees are not // walked. If walkFn ignores trees, these are added to the set. -func Walk(ctx context.Context, repo restic.TreeLoader, root restic.ID, ignoreTrees restic.IDSet, walkFn WalkFunc) error { +func Walk(ctx context.Context, repo TreeLoader, root restic.ID, ignoreTrees restic.IDSet, walkFn WalkFunc) error { tree, err := repo.LoadTree(ctx, root) _, err = walkFn(root, "/", nil, err) @@ -55,7 +60,7 @@ func Walk(ctx context.Context, repo restic.TreeLoader, root restic.ID, ignoreTre // walk recursively traverses the tree, ignoring subtrees when the ID of the // subtree is in ignoreTrees. If err is nil and ignore is true, the subtree ID // will be added to ignoreTrees by walk. -func walk(ctx context.Context, repo restic.TreeLoader, prefix string, parentTreeID restic.ID, tree *restic.Tree, ignoreTrees restic.IDSet, walkFn WalkFunc) (ignore bool, err error) { +func walk(ctx context.Context, repo TreeLoader, prefix string, parentTreeID restic.ID, tree *restic.Tree, ignoreTrees restic.IDSet, walkFn WalkFunc) (ignore bool, err error) { var allNodesIgnored = true if len(tree.Nodes) == 0 {