From ab8b97c4ba2b8d9dc5cb9cf39ed049d47c9fbcdb Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 8 Mar 2015 20:57:21 +0100 Subject: [PATCH] Remove code duplication The top-level tree is now handled by the DirWorkers --- archiver.go | 46 +++++++++++--------------- pipe/pipe.go | 12 +++++-- pipe/pipe_test.go | 84 +++++++++++++++++++++++++++++++++++++++++++++++ walk.go | 4 +-- 4 files changed, 114 insertions(+), 32 deletions(-) diff --git a/archiver.go b/archiver.go index 69d7d8053..c1b9a3578 100644 --- a/archiver.go +++ b/archiver.go @@ -513,11 +513,21 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } } - node, err := NodeFromFileInfo(dir.Path(), dir.Info()) - if err != nil { - node.Error = err.Error() - dir.Result() <- node - continue + var ( + node *Node + err error + ) + if dir.Path() == "" { + // if this is the top-level dir, only create a stub node + node = &Node{} + } else { + // else create note from path and fi + node, err = NodeFromFileInfo(dir.Path(), dir.Info()) + if err != nil { + node.Error = err.Error() + dir.Result() <- node + continue + } } blob, err := arch.SaveTreeJSON(tree) @@ -775,28 +785,10 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Sn debug.Log("Archiver.Snapshot", "workers terminated") - // add the top-level tree - tree := NewTree() - root := (<-resCh).(pipe.Dir) - for i := 0; i < len(paths); i++ { - node := (<-root.Entries[i]).(*Node) - - debug.Log("Archiver.Snapshot", "got toplevel node %v, %d/%d blobs", node, len(node.Content), len(node.blobs)) - - tree.Insert(node) - for _, blob := range node.blobs { - debug.Log("Archiver.Snapshot", " add toplevel blob %v", blob) - blob = arch.m.Insert(blob) - tree.Map.Insert(blob) - } - } - - tb, err := arch.SaveTreeJSON(tree) - if err != nil { - return nil, nil, err - } - - sn.Tree = tb + // receive the top-level tree + root := (<-resCh).(*Node) + debug.Log("Archiver.Snapshot", "root node received: %#v", root.blobs[0]) + sn.Tree = root.blobs[0] // save snapshot blob, err := arch.s.SaveJSON(backend.Snapshot, sn) diff --git a/pipe/pipe.go b/pipe/pipe.go index a8a96d0e1..4ff380d3e 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -150,7 +150,6 @@ func walk(basedir, path string, done chan struct{}, jobs chan<- Job, res chan<- func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result) error { defer func() { debug.Log("pipe.Walk", "output channel closed") - close(res) close(jobs) }() @@ -165,7 +164,16 @@ func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result } debug.Log("pipe.Walk", "walker for %v done", path) } - res <- Dir{Entries: entries} + + debug.Log("pipe.Walk", "sending root node") + select { + case <-done: + return errCancelled + case jobs <- Dir{Entries: entries, result: res}: + } + + debug.Log("pipe.Walk", "walker done") + return nil } diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 1baf0b34d..d0deffccc 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -54,6 +54,9 @@ func TestPipelineWalkerWithSplit(t *testing.T) { t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath, before.dirs, before.files) + // account for top level dir + before.dirs++ + after := stats{} m := sync.Mutex{} @@ -142,6 +145,9 @@ func TestPipelineWalker(t *testing.T) { t.Logf("walking path %s with %d dirs, %d files", *testWalkerPath, before.dirs, before.files) + // account for top level dir + before.dirs++ + after := stats{} m := sync.Mutex{} @@ -305,3 +311,81 @@ func BenchmarkPipelineWalker(b *testing.B) { b.Logf("max duration for a dir: %v", max) } } + +func TestPipelineWalkerMultiple(t *testing.T) { + if *testWalkerPath == "" { + t.Skipf("walkerpath not set, skipping TestPipelineWalker") + } + + paths, err := filepath.Glob(filepath.Join(*testWalkerPath, "*")) + + before, err := statPath(*testWalkerPath) + ok(t, err) + + t.Logf("walking paths %v with %d dirs, %d files", paths, + before.dirs, before.files) + + after := stats{} + m := sync.Mutex{} + + worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan pipe.Job) { + defer wg.Done() + for { + select { + case job, ok := <-jobs: + if !ok { + // channel is closed + return + } + assert(t, job != nil, "job is nil") + + switch j := job.(type) { + case pipe.Dir: + // wait for all content + for _, ch := range j.Entries { + <-ch + } + + m.Lock() + after.dirs++ + m.Unlock() + + j.Result() <- true + case pipe.Entry: + m.Lock() + after.files++ + m.Unlock() + + j.Result() <- true + } + + case <-done: + // pipeline was cancelled + return + } + } + } + + var wg sync.WaitGroup + done := make(chan struct{}) + jobs := make(chan pipe.Job) + + for i := 0; i < *maxWorkers; i++ { + wg.Add(1) + go worker(&wg, done, jobs) + } + + resCh := make(chan pipe.Result, 1) + err = pipe.Walk(paths, done, jobs, resCh) + ok(t, err) + + // wait for all workers to terminate + wg.Wait() + + // wait for top-level blob + <-resCh + + t.Logf("walked %d paths with %d dirs, %d files", len(paths), after.dirs, after.files) + + assert(t, before == after, "stats do not match, expected %v, got %v", before, after) +} diff --git a/walk.go b/walk.go index 83e2db726..92e2005f5 100644 --- a/walk.go +++ b/walk.go @@ -43,9 +43,7 @@ func walkTree(s Server, path string, id backend.ID, done chan struct{}, jobCh ch } } - if path != "" { - jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t} - } + jobCh <- WalkTreeJob{Path: filepath.Join(path), Tree: t} debug.Log("walkTree", "done for %q (%v)", path, id.Str()) }