2
2
mirror of https://github.com/octoleo/restic.git synced 2024-12-23 03:18:55 +00:00

Merge pull request #3654 from MichaelEischer/limit-huge-tree-streams

Limit number of large tree blobs loaded in parallel by StreamTrees
This commit is contained in:
Alexander Neumann 2022-03-21 11:01:04 +01:00 committed by GitHub
commit 0937008648
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 28 additions and 7 deletions

View File

@ -11,6 +11,7 @@ import (
// TreeLoader loads a tree from a repository. // TreeLoader loads a tree from a repository.
type TreeLoader interface { type TreeLoader interface {
LoadTree(context.Context, ID) (*Tree, error) LoadTree(context.Context, ID) (*Tree, error)
LookupBlobSize(id ID, tpe BlobType) (uint, bool)
} }
// FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data // FindUsedBlobs traverses the tree ID and adds all seen blobs (trees and data

View File

@ -166,6 +166,10 @@ func (r ForbiddenRepo) LoadTree(ctx context.Context, id restic.ID) (*restic.Tree
return nil, errors.New("should not be called") return nil, errors.New("should not be called")
} }
// LookupBlobSize is a stub that reports every blob as not found: it always
// returns a size of 0 and false, regardless of id and tpe.
func (r ForbiddenRepo) LookupBlobSize(id restic.ID, tpe restic.BlobType) (uint, bool) {
return 0, false
}
func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) { func TestFindUsedBlobsSkipsSeenBlobs(t *testing.T) {
repo, cleanup := repository.TestRepository(t) repo, cleanup := repository.TestRepository(t)
defer cleanup() defer cleanup()

View File

@ -10,7 +10,7 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
const streamTreeParallelism = 5 const streamTreeParallelism = 6
// TreeItem is used to return either an error or the tree for a tree id // TreeItem is used to return either an error or the tree for a tree id
type TreeItem struct { type TreeItem struct {
@ -46,7 +46,7 @@ func loadTreeWorker(ctx context.Context, repo TreeLoader,
} }
} }
func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID, func filterTrees(ctx context.Context, repo TreeLoader, trees IDs, loaderChan chan<- trackedID, hugeTreeLoaderChan chan<- trackedID,
in <-chan trackedTreeItem, out chan<- TreeItem, skip func(tree ID) bool, p *progress.Counter) { in <-chan trackedTreeItem, out chan<- TreeItem, skip func(tree ID) bool, p *progress.Counter) {
var ( var (
@ -78,7 +78,12 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID,
continue continue
} }
loadCh = loaderChan treeSize, found := repo.LookupBlobSize(nextTreeID.ID, TreeBlob)
if found && treeSize > 50*1024*1024 {
loadCh = hugeTreeLoaderChan
} else {
loadCh = loaderChan
}
} }
if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 { if loadCh == nil && outCh == nil && outstandingLoadTreeJobs == 0 {
@ -152,16 +157,21 @@ func filterTrees(ctx context.Context, trees IDs, loaderChan chan<- trackedID,
// on the errgroup until all goroutines were stopped. // on the errgroup until all goroutines were stopped.
func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool, p *progress.Counter) <-chan TreeItem { func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees IDs, skip func(tree ID) bool, p *progress.Counter) <-chan TreeItem {
loaderChan := make(chan trackedID) loaderChan := make(chan trackedID)
hugeTreeChan := make(chan trackedID, 10)
loadedTreeChan := make(chan trackedTreeItem) loadedTreeChan := make(chan trackedTreeItem)
treeStream := make(chan TreeItem) treeStream := make(chan TreeItem)
var loadTreeWg sync.WaitGroup var loadTreeWg sync.WaitGroup
for i := 0; i < streamTreeParallelism; i++ { for i := 0; i < streamTreeParallelism; i++ {
workerLoaderChan := loaderChan
if i == 0 {
workerLoaderChan = hugeTreeChan
}
loadTreeWg.Add(1) loadTreeWg.Add(1)
wg.Go(func() error { wg.Go(func() error {
defer loadTreeWg.Done() defer loadTreeWg.Done()
loadTreeWorker(ctx, repo, loaderChan, loadedTreeChan) loadTreeWorker(ctx, repo, workerLoaderChan, loadedTreeChan)
return nil return nil
}) })
} }
@ -175,8 +185,9 @@ func StreamTrees(ctx context.Context, wg *errgroup.Group, repo TreeLoader, trees
wg.Go(func() error { wg.Go(func() error {
defer close(loaderChan) defer close(loaderChan)
defer close(hugeTreeChan)
defer close(treeStream) defer close(treeStream)
filterTrees(ctx, trees, loaderChan, loadedTreeChan, treeStream, skip, p) filterTrees(ctx, repo, trees, loaderChan, hugeTreeChan, loadedTreeChan, treeStream, skip, p)
return nil return nil
}) })
return treeStream return treeStream

View File

@ -10,6 +10,11 @@ import (
"github.com/restic/restic/internal/restic" "github.com/restic/restic/internal/restic"
) )
// TreeLoader loads a tree from a repository.
//
// It is the only repository capability that Walk and walk use: they accept a
// TreeLoader and call LoadTree on it to fetch the tree with the given ID.
type TreeLoader interface {
// LoadTree returns the tree identified by the given ID, or an error.
LoadTree(context.Context, restic.ID) (*restic.Tree, error)
}
// ErrSkipNode is returned by WalkFunc when a dir node should not be walked. // ErrSkipNode is returned by WalkFunc when a dir node should not be walked.
var ErrSkipNode = errors.New("skip this node") var ErrSkipNode = errors.New("skip this node")
@ -33,7 +38,7 @@ type WalkFunc func(parentTreeID restic.ID, path string, node *restic.Node, nodeE
// Walk calls walkFn recursively for each node in root. If walkFn returns an // Walk calls walkFn recursively for each node in root. If walkFn returns an
// error, it is passed up the call stack. The trees in ignoreTrees are not // error, it is passed up the call stack. The trees in ignoreTrees are not
// walked. If walkFn ignores trees, these are added to the set. // walked. If walkFn ignores trees, these are added to the set.
func Walk(ctx context.Context, repo restic.TreeLoader, root restic.ID, ignoreTrees restic.IDSet, walkFn WalkFunc) error { func Walk(ctx context.Context, repo TreeLoader, root restic.ID, ignoreTrees restic.IDSet, walkFn WalkFunc) error {
tree, err := repo.LoadTree(ctx, root) tree, err := repo.LoadTree(ctx, root)
_, err = walkFn(root, "/", nil, err) _, err = walkFn(root, "/", nil, err)
@ -55,7 +60,7 @@ func Walk(ctx context.Context, repo restic.TreeLoader, root restic.ID, ignoreTre
// walk recursively traverses the tree, ignoring subtrees when the ID of the // walk recursively traverses the tree, ignoring subtrees when the ID of the
// subtree is in ignoreTrees. If err is nil and ignore is true, the subtree ID // subtree is in ignoreTrees. If err is nil and ignore is true, the subtree ID
// will be added to ignoreTrees by walk. // will be added to ignoreTrees by walk.
func walk(ctx context.Context, repo restic.TreeLoader, prefix string, parentTreeID restic.ID, tree *restic.Tree, ignoreTrees restic.IDSet, walkFn WalkFunc) (ignore bool, err error) { func walk(ctx context.Context, repo TreeLoader, prefix string, parentTreeID restic.ID, tree *restic.Tree, ignoreTrees restic.IDSet, walkFn WalkFunc) (ignore bool, err error) {
var allNodesIgnored = true var allNodesIgnored = true
if len(tree.Nodes) == 0 { if len(tree.Nodes) == 0 {