From 23aeca85fff13aacce341d67d2cafc4f0a195214 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 27 Oct 2015 22:44:10 +0100 Subject: [PATCH] load trees in parallel --- walk.go | 162 +++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 143 insertions(+), 19 deletions(-) diff --git a/walk.go b/walk.go index 1e5979328..5cceb98b0 100644 --- a/walk.go +++ b/walk.go @@ -2,11 +2,14 @@ package restic import ( "path/filepath" + "sync" "github.com/restic/restic/backend" "github.com/restic/restic/debug" + "github.com/restic/restic/pack" ) +// WalkTreeJob is a job sent from the tree walker. type WalkTreeJob struct { Path string Error error @@ -15,47 +18,168 @@ type WalkTreeJob struct { Tree *Tree } -func walkTree(repo TreeLoader, path string, treeID backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { - debug.Log("walkTree", "start on %q (%v)", path, treeID.Str()) +// TreeWalker traverses a tree in the repository depth-first and sends a job +// for each item (file or dir) that it encounters. +type TreeWalker struct { + ch chan<- loadTreeJob + out chan<- WalkTreeJob +} - t, err := LoadTree(repo, treeID) - if err != nil { +// NewTreeWalker uses ch to load trees from the repository and sends jobs to +// out. +func NewTreeWalker(ch chan<- loadTreeJob, out chan<- WalkTreeJob) *TreeWalker { + return &TreeWalker{ch: ch, out: out} +} + +// Walk starts walking the tree given by id. When the channel done is closed, +// processing stops. +func (tw *TreeWalker) Walk(path string, id backend.ID, done chan struct{}) { + debug.Log("TreeWalker.Walk", "starting on tree %v for %v", id.Str(), path) + defer debug.Log("TreeWalker.Walk", "done walking tree %v for %v", id.Str(), path) + + resCh := make(chan loadTreeResult, 1) + tw.ch <- loadTreeJob{ + id: id, + res: resCh, + } + + res := <-resCh + if res.err != nil { select { - case jobCh <- WalkTreeJob{Path: path, Error: err}: + case tw.out <- WalkTreeJob{Path: path, Error: res.err}: case <-done: return } return } - for _, node := range t.Nodes { - p := filepath.Join(path, node.Name) + tw.walk(path, res.tree, done) + + select { + case tw.out <- WalkTreeJob{Path: path, Tree: res.tree}: + case <-done: + return + } +} + +func (tw *TreeWalker) walk(path string, tree *Tree, done chan struct{}) { + debug.Log("TreeWalker.walk", "start on %q", path) + defer debug.Log("TreeWalker.walk", "done for %q", path) + + // load all subtrees in parallel + results := make([]<-chan loadTreeResult, len(tree.Nodes)) + for i, node := range tree.Nodes { if node.Type == "dir" { - walkTree(repo, p, *node.Subtree, done, jobCh) + resCh := make(chan loadTreeResult, 1) + tw.ch <- loadTreeJob{ + id: *node.Subtree, + res: resCh, + } + + results[i] = resCh + } + } + + for i, node := range tree.Nodes { + p := filepath.Join(path, node.Name) + var job WalkTreeJob + + if node.Type == "dir" { + if results[i] == nil { + panic("result chan should not be nil") + } + + res := <-results[i] + tw.walk(p, res.tree, done) + + job = WalkTreeJob{Path: p, Tree: res.tree, Error: res.err} } else { + job = WalkTreeJob{Path: p, Node: node} + } + + select { + case tw.out <- job: + case <-done: + return + } + } +} + +type loadTreeResult struct { + tree *Tree + err error +} + +type loadTreeJob struct { + id backend.ID + res chan<- loadTreeResult +} + +type treeLoader func(backend.ID) (*Tree, error) + +func loadTreeWorker(wg *sync.WaitGroup, in <-chan loadTreeJob, load treeLoader, done <-chan struct{}) { + debug.Log("loadTreeWorker", "start") + defer debug.Log("loadTreeWorker", "exit") + defer wg.Done() + + for { + select { + case <-done: + debug.Log("loadTreeWorker", "done channel closed") + return + case job, ok := <-in: + if !ok { + debug.Log("loadTreeWorker", "input channel closed, exiting") + return + } + + debug.Log("loadTreeWorker", "received job to load tree %v", job.id.Str()) + tree, err := load(job.id) + + debug.Log("loadTreeWorker", "tree %v loaded, error %v", job.id.Str(), err) + select { - case jobCh <- WalkTreeJob{Path: p, Node: node}: + case job.res <- loadTreeResult{tree, err}: + debug.Log("loadTreeWorker", "job result sent") case <-done: + debug.Log("loadTreeWorker", "done channel closed before result could be sent") return } } } - - select { - case jobCh <- WalkTreeJob{Path: path, Tree: t}: - case <-done: - return - } - - debug.Log("walkTree", "done for %q (%v)", path, treeID.Str()) } +const loadTreeWorkers = 10 + // WalkTree walks the tree specified by id recursively and sends a job for each // file and directory it finds. When the channel done is closed, processing // stops. func WalkTree(repo TreeLoader, id backend.ID, done chan struct{}, jobCh chan<- WalkTreeJob) { - debug.Log("WalkTree", "start on %v", id.Str()) - walkTree(repo, "", id, done, jobCh) + debug.Log("WalkTree", "start on %v, start workers", id.Str()) + + load := func(id backend.ID) (*Tree, error) { + tree := &Tree{} + err := repo.LoadJSONPack(pack.Tree, id, tree) + if err != nil { + return nil, err + } + return tree, nil + } + + ch := make(chan loadTreeJob) + + var wg sync.WaitGroup + for i := 0; i < loadTreeWorkers; i++ { + wg.Add(1) + go loadTreeWorker(&wg, ch, load, done) + } + + tw := NewTreeWalker(ch, jobCh) + tw.Walk("", id, done) close(jobCh) + + close(ch) + wg.Wait() + debug.Log("WalkTree", "done") }