From f8f8107d55430552b40e5899a1d01f863e576849 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 15 Feb 2015 14:44:54 +0100 Subject: [PATCH] wip --- archiver.go | 145 +++++++++++++++++++++++++++++++++------ archiver_test.go | 5 +- cmd/restic/cmd_backup.go | 2 +- node.go | 5 +- pipe/pipe.go | 24 ++++--- pipe/pipe_test.go | 44 +++++++----- testsuite.sh | 2 +- tree.go | 3 +- 8 files changed, 171 insertions(+), 59 deletions(-) diff --git a/archiver.go b/archiver.go index db20195db..b4af3da5b 100644 --- a/archiver.go +++ b/archiver.go @@ -12,11 +12,12 @@ import ( "github.com/restic/restic/backend" "github.com/restic/restic/chunker" "github.com/restic/restic/debug" + "github.com/restic/restic/pipe" ) const ( - maxConcurrentFiles = 16 - maxConcurrentBlobs = 16 + maxConcurrentBlobs = 32 + maxConcurrency = 10 // chunkerBufSize is used in pool.go chunkerBufSize = 512 * chunker.KiB @@ -26,7 +27,6 @@ type Archiver struct { s Server m *Map - fileToken chan struct{} blobToken chan struct{} Error func(dir string, fi os.FileInfo, err error) error @@ -40,15 +40,10 @@ func NewArchiver(s Server, p *Progress) (*Archiver, error) { arch := &Archiver{ s: s, p: p, - fileToken: make(chan struct{}, maxConcurrentFiles), blobToken: make(chan struct{}, maxConcurrentBlobs), } - // fill file and blob token - for i := 0; i < maxConcurrentFiles; i++ { - arch.fileToken <- struct{}{} - } - + // fill blob token for i := 0; i < maxConcurrentBlobs; i++ { arch.blobToken <- struct{}{} } @@ -283,16 +278,10 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { } if len(node.Content) == 0 { - // get token - token := <-arch.fileToken - // start goroutine wg.Add(1) go func(n *Node) { defer wg.Done() - defer func() { - arch.fileToken <- token - }() var blobs Blobs blobs, n.err = arch.SaveFile(n) @@ -354,27 +343,143 @@ func (arch *Archiver) saveTree(t *Tree) (Blob, error) { return blob, nil } -func (arch *Archiver) Snapshot(dir string, t *Tree, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { +func (arch *Archiver) Snapshot(path string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { debug.Break("Archiver.Snapshot") arch.p.Start() defer arch.p.Done() - sn, err := NewSnapshot(dir) + sn, err := NewSnapshot(path) if err != nil { return nil, nil, err } sn.Parent = parentSnapshot - blob, err := arch.saveTree(t) + done := make(chan struct{}) + entCh := make(chan pipe.Entry) + dirCh := make(chan pipe.Dir) + + fileWorker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry) { + defer wg.Done() + for { + select { + case e, ok := <-entCh: + if !ok { + // channel is closed + return + } + + node, err := NodeFromFileInfo(e.Path, e.Info) + if err != nil { + panic(err) + } + + if node.Type == "file" { + node.blobs, err = arch.SaveFile(node) + if err != nil { + panic(err) + } + } + + e.Result <- node + case <-done: + // pipeline was cancelled + return + } + } + } + + dirWorker := func(wg *sync.WaitGroup, done <-chan struct{}, dirCh <-chan pipe.Dir) { + defer wg.Done() + for { + select { + case dir, ok := <-dirCh: + if !ok { + // channel is closed + return + } + + tree := NewTree() + + // wait for all content + for _, ch := range dir.Entries { + node := (<-ch).(*Node) + tree.Insert(node) + + if node.Type == "dir" { + debug.Log("Archiver.DirWorker", "got tree node for %s: %v", node.path, node.blobs) + } + + for _, blob := range node.blobs { + tree.Map.Insert(blob) + arch.m.Insert(blob) + } + } + + node, err := NodeFromFileInfo(dir.Path, dir.Info) + if err != nil { + node.Error = err.Error() + dir.Result <- node + continue + } + + blob, err := arch.SaveTreeJSON(tree) + if err != nil { + panic(err) + } + debug.Log("Archiver.DirWorker", "save tree for %s: %v", dir.Path, blob) + + node.Subtree = blob.ID + node.blobs = Blobs{blob} + + dir.Result <- node + case <-done: + // pipeline was cancelled + return + } + } + } + + var wg sync.WaitGroup + for i := 0; i < maxConcurrency; i++ { + wg.Add(2) + go fileWorker(&wg, done, entCh) + go dirWorker(&wg, done, dirCh) + } + + resCh, err := pipe.Walk(path, done, entCh, dirCh) + if err != nil { + close(done) + } + + // wait for all workers to terminate + wg.Wait() + if err != nil { return nil, nil, err } - sn.Tree = blob + + // wait for top-level node + node := (<-resCh).(*Node) + + // add tree for top-level directory + tree := NewTree() + tree.Insert(node) + for _, blob := range node.blobs { + blob = arch.m.Insert(blob) + tree.Map.Insert(blob) + } + + tb, err := arch.SaveTreeJSON(tree) + if err != nil { + return nil, nil, err + } + + sn.Tree = tb // save snapshot - blob, err = arch.s.SaveJSON(backend.Snapshot, sn) + blob, err := arch.s.SaveJSON(backend.Snapshot, sn) if err != nil { return nil, nil, err } diff --git a/archiver_test.go b/archiver_test.go index daef43da0..043e15cec 100644 --- a/archiver_test.go +++ b/archiver_test.go @@ -140,13 +140,10 @@ func BenchmarkArchiveDirectory(b *testing.B) { key := setupKey(b, be, "geheim") server := restic.NewServerWithKey(be, key) - tree, err := restic.NewScanner(nil).Scan(*benchArchiveDirectory) - ok(b, err) - arch, err := restic.NewArchiver(server, nil) ok(b, err) - _, id, err := arch.Snapshot(*benchArchiveDirectory, tree, nil) + _, id, err := arch.Snapshot(*benchArchiveDirectory, nil) b.Logf("snapshot archived as %v", id) } diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go index f128f487d..dbb631e05 100644 --- a/cmd/restic/cmd_backup.go +++ b/cmd/restic/cmd_backup.go @@ -183,7 +183,7 @@ func (cmd CmdBackup) Execute(args []string) error { return nil } - _, id, err := arch.Snapshot(target, newTree, parentSnapshotID) + _, id, err := arch.Snapshot(target, parentSnapshotID) if err != nil { return err } diff --git a/node.go b/node.go index ff2d8018e..ba787550b 100644 --- a/node.go +++ b/node.go @@ -35,8 +35,9 @@ type Node struct { tree *Tree - path string - err error + path string + err error + blobs Blobs } func (n Node) String() string { diff --git a/pipe/pipe.go b/pipe/pipe.go index b8a08ec1d..c70bfa363 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -17,6 +17,7 @@ type Entry struct { type Dir struct { Path string Error error + Info os.FileInfo Entries [](<-chan interface{}) Result chan<- interface{} @@ -59,7 +60,6 @@ func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, names, err := readDirNames(path) if err != nil { - dirCh <- Dir{Path: path, Error: err} return err } @@ -67,26 +67,28 @@ func walk(path string, done chan struct{}, entCh chan<- Entry, dirCh chan<- Dir, for _, name := range names { subpath := filepath.Join(path, name) + ch := make(chan interface{}, 1) + entries = append(entries, ch) fi, err := os.Lstat(subpath) if err != nil { - entries = append(entries, ch) - entCh <- Entry{Info: fi, Error: err, Result: ch} - continue + // entCh <- Entry{Info: fi, Error: err, Result: ch} + return err } - if isFile(fi) { - ch := make(chan interface{}, 1) + if isDir(fi) { + err = walk(subpath, done, entCh, dirCh, ch) + if err != nil { + return err + } + + } else { entCh <- Entry{Info: fi, Path: subpath, Result: ch} - } else if isDir(fi) { - ch := make(chan interface{}, 1) - entries = append(entries, ch) - walk(subpath, done, entCh, dirCh, ch) } } - dirCh <- Dir{Path: path, Entries: entries, Result: res} + dirCh <- Dir{Path: path, Info: info, Entries: entries, Result: res} return nil } diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 1564b2c4f..8f71f107a 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -33,7 +33,7 @@ func statPath(path string) (stats, error) { if fi.IsDir() { s.dirs++ - } else if isFile(fi) { + } else { s.files++ } @@ -57,8 +57,7 @@ func TestPipelineWalker(t *testing.T) { after := stats{} m := sync.Mutex{} - var wg sync.WaitGroup - worker := func(done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) { + worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) { defer wg.Done() for { select { @@ -97,13 +96,14 @@ func TestPipelineWalker(t *testing.T) { } } + var wg sync.WaitGroup done := make(chan struct{}) entCh := make(chan pipe.Entry) dirCh := make(chan pipe.Dir) for i := 0; i < *maxWorkers; i++ { wg.Add(1) - go worker(done, entCh, dirCh) + go worker(&wg, done, entCh, dirCh) } resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh) @@ -129,24 +129,32 @@ func BenchmarkPipelineWalker(b *testing.B) { var max time.Duration m := sync.Mutex{} - worker := func(wg *sync.WaitGroup, done <-chan struct{}, entCh <-chan pipe.Entry, dirCh <-chan pipe.Dir) { + fileWorker := func(wg *sync.WaitGroup, done <-chan struct{}, ch <-chan pipe.Entry) { defer wg.Done() for { select { - case e, ok := <-entCh: + case e, ok := <-ch: if !ok { // channel is closed return } - // fmt.Printf("file: %v\n", j.Path) - // simulate backup - time.Sleep(10 * time.Millisecond) + //time.Sleep(10 * time.Millisecond) e.Result <- true + case <-done: + // pipeline was cancelled + return + } + } + } - case dir, ok := <-dirCh: + dirWorker := func(wg *sync.WaitGroup, done <-chan struct{}, ch <-chan pipe.Dir) { + defer wg.Done() + for { + select { + case dir, ok := <-ch: if !ok { // channel is closed return @@ -164,8 +172,6 @@ func BenchmarkPipelineWalker(b *testing.B) { if d > max { max = d } - - // fmt.Printf("dir %v: %v\n", d, j.Path) m.Unlock() dir.Result <- true @@ -177,15 +183,17 @@ func BenchmarkPipelineWalker(b *testing.B) { } for i := 0; i < b.N; i++ { + max = 0 done := make(chan struct{}) - entCh := make(chan pipe.Entry, 100) - dirCh := make(chan pipe.Dir, 100) + entCh := make(chan pipe.Entry, 200) + dirCh := make(chan pipe.Dir, 200) var wg sync.WaitGroup b.Logf("starting %d workers", *maxWorkers) for i := 0; i < *maxWorkers; i++ { - wg.Add(1) - go worker(&wg, done, entCh, dirCh) + wg.Add(2) + go dirWorker(&wg, done, dirCh) + go fileWorker(&wg, done, entCh) } resCh, err := pipe.Walk(*testWalkerPath, done, entCh, dirCh) @@ -196,7 +204,7 @@ func BenchmarkPipelineWalker(b *testing.B) { // wait for final result <-resCh - } - b.Logf("max duration for a dir: %v", max) + b.Logf("max duration for a dir: %v", max) + } } diff --git a/testsuite.sh b/testsuite.sh index 79607e51f..986b3eb35 100755 --- a/testsuite.sh +++ b/testsuite.sh @@ -14,4 +14,4 @@ go build -a -tags debug -o "${BINDIR}/restic.debug" ./cmd/restic go build -a -o "${BINDIR}/dirdiff" ./cmd/dirdiff # run tests -testsuite/run.sh +testsuite/run.sh "$@" diff --git a/tree.go b/tree.go index f01370b2e..d98b82bb9 100644 --- a/tree.go +++ b/tree.go @@ -183,8 +183,7 @@ func (t *Tree) Insert(node *Node) error { return ErrNodeAlreadyInTree } - // insert blob - // https://code.google.com/p/go-wiki/wiki/bliceTricks + // https://code.google.com/p/go-wiki/wiki/SliceTricks t.Nodes = append(t.Nodes, &Node{}) copy(t.Nodes[pos+1:], t.Nodes[pos:]) t.Nodes[pos] = node