From ba892e1ec29a72f286c038c4dd12870eb95ccff0 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sat, 7 Mar 2015 11:53:32 +0100 Subject: [PATCH] Refactor backup pipeline --- archiver.go | 294 ++++++++++++++++++++++++++++++++++++------- archiver_int_test.go | 121 ++++++++++++++++++ map.go | 18 +++ node.go | 38 +++--- node_linux.go | 37 ++++++ pipe/pipe.go | 88 ++++++++----- pipe/pipe_test.go | 29 +++-- walk.go | 7 +- walk_test.go | 9 +- 9 files changed, 536 insertions(+), 105 deletions(-) create mode 100644 archiver_int_test.go diff --git a/archiver.go b/archiver.go index f525eef11..dfe91b79b 100644 --- a/archiver.go +++ b/archiver.go @@ -417,7 +417,10 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (Blob, error) { } func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) { - defer wg.Done() + defer func() { + debug.Log("Archiver.fileWorker", "done") + wg.Done() + }() for { select { case e, ok := <-entCh: @@ -426,19 +429,51 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st return } - node, err := NodeFromFileInfo(e.Path, e.Info) + debug.Log("Archiver.fileWorker", "got job %v", e) + + node, err := NodeFromFileInfo(e.Fullpath(), e.Info()) if err != nil { panic(err) } - if node.Type == "file" { + // try to use old node, if present + if e.Node != nil { + debug.Log("Archiver.fileWorker", " %v use old data", e.Path()) + + oldNode := e.Node.(*Node) + // check if all content is still available in the repository + contentMissing := false + for _, blob := range oldNode.blobs { + if ok, err := arch.s.Test(backend.Data, blob.Storage); !ok || err != nil { + debug.Log("Archiver.fileWorker", " %v not using old data, %v (%v) is missing", e.Path(), blob.ID.Str(), blob.Storage.Str()) + contentMissing = true + break + } + } + + if !contentMissing { + node.Content = oldNode.Content + node.blobs = oldNode.blobs + debug.Log("Archiver.fileWorker", " %v content is complete", e.Path()) + } + } else { + debug.Log("Archiver.fileWorker", " %v no old data", e.Path()) + } + + // otherwise read file normally + if node.Type == "file" && len(node.Content) == 0 { + debug.Log("Archiver.fileWorker", " read and save %v, content: %v", e.Path(), node.Content) node.blobs, err = arch.SaveFile(p, node) if err != nil { panic(err) } + } else { + // report old data size + p.Report(Stat{Bytes: node.Size}) } - e.Result <- node + debug.Log("Archiver.fileWorker", " processed %v, %d/%d blobs", e.Path(), len(node.Content), len(node.blobs)) + e.Result() <- node p.Report(Stat{Files: 1}) case <-done: // pipeline was cancelled @@ -448,7 +483,10 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st } func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) { - defer wg.Done() + defer func() { + debug.Log("Archiver.dirWorker", "done") + wg.Done() + }() for { select { case dir, ok := <-dirCh: @@ -456,7 +494,7 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str // channel is closed return } - debug.Log("Archiver.DirWorker", "save dir %v\n", dir.Path) + debug.Log("Archiver.dirWorker", "save dir %v\n", dir.Path()) tree := NewTree() @@ -466,7 +504,7 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str tree.Insert(node) if node.Type == "dir" { - debug.Log("Archiver.DirWorker", "got tree node for %s: %v", node.path, node.blobs) + debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs) } for _, blob := range node.blobs { @@ -475,10 +513,10 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } } - node, err := NodeFromFileInfo(dir.Path, dir.Info) + node, err := NodeFromFileInfo(dir.Path(), dir.Info()) if err != nil { node.Error = err.Error() - dir.Result <- node + dir.Result() <- node continue } @@ -486,12 +524,12 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str if err != nil { panic(err) } - debug.Log("Archiver.DirWorker", "save tree for %s: %v", dir.Path, blob) + debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), blob) node.Subtree = blob.ID node.blobs = Blobs{blob} - dir.Result <- node + dir.Result() <- node p.Report(Stat{Dirs: 1}) case <-done: // pipeline was cancelled @@ -526,51 +564,225 @@ func compareWithOldTree(newCh <-chan interface{}, oldCh <-chan WalkTreeJob, outC } } -func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { +type ArchivePipe struct { + Old <-chan WalkTreeJob + New <-chan pipe.Job +} + +func copyJobs(done <-chan struct{}, in <-chan pipe.Job, out chan<- pipe.Job) { + i := in + o := out + + o = nil + + var ( + j pipe.Job + ok bool + ) + for { + select { + case <-done: + return + case j, ok = <-i: + if !ok { + // in ch closed, we're done + debug.Log("copyJobs", "in channel closed, we're done") + return + } + i = nil + o = out + case o <- j: + o = nil + i = in + } + } +} + +type archiveJob struct { + hasOld bool + old WalkTreeJob + new pipe.Job +} + +func (a *ArchivePipe) compare(done <-chan struct{}, out chan<- pipe.Job) { + defer func() { + close(out) + debug.Log("ArchivePipe.compare", "done") + }() + + debug.Log("ArchivePipe.compare", "start") + var ( + loadOld, loadNew bool = true, true + ok bool + oldJob WalkTreeJob + newJob pipe.Job + ) + + for { + if loadOld { + oldJob, ok = <-a.Old + // if the old channel is closed, just pass through the new jobs + if !ok { + debug.Log("ArchivePipe.compare", "old channel is closed, copy from new channel") + + // handle remaining newJob + if !loadNew { + out <- archiveJob{new: newJob}.Copy() + } + + copyJobs(done, a.New, out) + return + } + + loadOld = false + } + + if loadNew { + newJob, ok = <-a.New + // if the new channel is closed, there are no more files in the current snapshot, return + if !ok { + debug.Log("ArchivePipe.compare", "new channel is closed, we're done") + return + } + + loadNew = false + } + + debug.Log("ArchivePipe.compare", "old job: %v", oldJob.Path) + debug.Log("ArchivePipe.compare", "new job: %v", newJob.Path()) + + // at this point we have received an old job as well as a new job, compare paths + file1 := oldJob.Path + file2 := newJob.Path() + + dir1 := filepath.Dir(file1) + dir2 := filepath.Dir(file2) + + if file1 == file2 { + debug.Log("ArchivePipe.compare", " same filename %q", file1) + + // send job + out <- archiveJob{hasOld: true, old: oldJob, new: newJob}.Copy() + loadOld = true + loadNew = true + continue + } else if dir1 < dir2 { + debug.Log("ArchivePipe.compare", " %q < %q, file %q added", dir1, dir2, file2) + // file is new, send new job and load new + loadNew = true + out <- archiveJob{new: newJob}.Copy() + continue + } else if dir1 == dir2 && file1 < file2 { + debug.Log("ArchivePipe.compare", " %q < %q, file %q removed", file1, file2, file1) + // file has been removed, load new old + loadOld = true + continue + } + + debug.Log("ArchivePipe.compare", " %q > %q, file %q removed", file1, file2, file1) + // file has been removed, throw away old job and load new + loadOld = true + } +} + +func (j archiveJob) Copy() pipe.Job { + if !j.hasOld { + return j.new + } + + // handle files + if isFile(j.new.Info()) { + debug.Log("archiveJob.Copy", " job %v is file", j.new.Path()) + + // if type has changed, return new job directly + if j.old.Node == nil { + return j.new + } + + // if file is newer, return the new job + if j.old.Node.isNewer(j.new.Fullpath(), j.new.Info()) { + debug.Log("archiveJob.Copy", " job %v is newer", j.new.Path()) + return j.new + } + + debug.Log("archiveJob.Copy", " job %v add old data", j.new.Path()) + // otherwise annotate job with old data + e := j.new.(pipe.Entry) + e.Node = j.old.Node + return e + } + + // dirs and other types are just returned + return j.new +} + +func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Snapshot, backend.ID, error) { debug.Log("Archiver.Snapshot", "start for %v", paths) debug.Break("Archiver.Snapshot") sort.Strings(paths) + // signal the whole pipeline to stop + done := make(chan struct{}) + var err error + p.Start() defer p.Done() + // create new snapshot sn, err := NewSnapshot(paths) if err != nil { return nil, nil, err } - // load parent snapshot - // var oldRoot backend.ID - // if parentSnapshot != nil { - // sn.Parent = parentSnapshot - // parentSn, err := LoadSnapshot(arch.s, parentSnapshot) - // if err != nil { - // return nil, nil, err - // } - // oldRoot = parentSn.Tree.Storage - // } + jobs := ArchivePipe{} - // signal the whole pipeline to stop - done := make(chan struct{}) + // use parent snapshot (if some was given) + if pid != nil { + sn.Parent = pid - // if we have an old root, start walker and comparer - // oldTreeCh := make(chan WalkTreeJob) - // if oldRoot != nil { - // // start walking the old tree - // debug.Log("Archiver.Snapshot", "start comparer for old root %v", oldRoot.Str()) - // go WalkTree(arch.s, oldRoot, done, oldTreeCh) - // } + // load parent snapshot + parent, err := LoadSnapshot(arch.s, pid) + if err != nil { + return nil, nil, err + } + + // start walker on old tree + ch := make(chan WalkTreeJob) + go WalkTree(arch.s, parent.Tree.Storage, done, ch) + jobs.Old = ch + } else { + // use closed channel + ch := make(chan WalkTreeJob) + close(ch) + jobs.Old = ch + } + + // start walker + pipeCh := make(chan pipe.Job) + resCh := make(chan pipe.Result, 1) + go func() { + err := pipe.Walk(paths, done, pipeCh, resCh) + if err != nil { + debug.Log("Archiver.Snapshot", "pipe.Walk returned error %v", err) + return + } + debug.Log("Archiver.Snapshot", "pipe.Walk done") + }() + jobs.New = pipeCh + + ch := make(chan pipe.Job) + go jobs.compare(done, ch) var wg sync.WaitGroup entCh := make(chan pipe.Entry) dirCh := make(chan pipe.Dir) - jobsCh := make(chan interface{}) // split wg.Add(1) go func() { - pipe.Split(jobsCh, dirCh, entCh) + pipe.Split(ch, dirCh, entCh) + debug.Log("Archiver.Snapshot", "split done") close(dirCh) close(entCh) wg.Done() @@ -583,15 +795,6 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backe go arch.dirWorker(&wg, p, done, dirCh) } - // start walker - resCh, err := pipe.Walk(paths, done, jobsCh) - if err != nil { - close(done) - - debug.Log("Archiver.Snapshot", "pipe.Walke returned error %v", err) - return nil, nil, err - } - // wait for all workers to terminate debug.Log("Archiver.Snapshot", "wait for workers") wg.Wait() @@ -604,10 +807,11 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backe for i := 0; i < len(paths); i++ { node := (<-root.Entries[i]).(*Node) - debug.Log("Archiver.Snapshot", "got toplevel node %v", node) + debug.Log("Archiver.Snapshot", "got toplevel node %v, %d/%d blobs", node, len(node.Content), len(node.blobs)) tree.Insert(node) for _, blob := range node.blobs { + debug.Log("Archiver.Snapshot", " add toplevel blob %v", blob) blob = arch.m.Insert(blob) tree.Map.Insert(blob) } @@ -630,6 +834,10 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backe } func isFile(fi os.FileInfo) bool { + if fi == nil { + return false + } + return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0 } diff --git a/archiver_int_test.go b/archiver_int_test.go new file mode 100644 index 000000000..dbaaa1311 --- /dev/null +++ b/archiver_int_test.go @@ -0,0 +1,121 @@ +package restic + +import ( + "os" + "testing" + + "github.com/restic/restic/pipe" +) + +var treeJobs = []string{ + "foo/baz/subdir", + "foo/bar", + "foo", + "quu/foo/file1", + "quu/foo/file2", + "quu/foo/file3", + "quu/foo", + "quu/fooz", + "quu", + "yy/a", + "yy/b", + "yy", +} + +var pipeJobs = []string{ + "foo/baz/subdir", + "foo/baz/subdir2", // subdir2 added + "foo/bar", + "foo", + "quu/foo/file1", // file2 removed + "quu/foo/file3", + "quu/foo", + "quu", + "quv/file1", // files added and removed + "quv/file2", + "quv", + "zz/file1", // new files removed and added at the end + "zz/file2", + "zz", +} + +var resultJobs = []struct { + path string + hasOld bool +}{ + {"foo/baz/subdir", true}, + {"foo/baz/subdir2", false}, + {"foo/bar", true}, + {"foo", true}, + {"quu/foo/file1", true}, + {"quu/foo/file3", true}, + {"quu/foo", true}, + {"quu", true}, + {"quv/file1", false}, + {"quv/file2", false}, + {"quv", false}, + {"zz/file1", false}, + {"zz/file2", false}, + {"zz", false}, +} + +type testPipeJob struct { + path string + err error + fi os.FileInfo + res chan<- pipe.Result +} + +func (j testPipeJob) Path() string { return j.path } +func (j testPipeJob) Fullpath() string { return j.path } +func (j testPipeJob) Error() error { return j.err } +func (j testPipeJob) Info() os.FileInfo { return j.fi } +func (j testPipeJob) Result() chan<- pipe.Result { return j.res } + +func testTreeWalker(done <-chan struct{}, out chan<- WalkTreeJob) { + for _, e := range treeJobs { + select { + case <-done: + return + case out <- WalkTreeJob{Path: e}: + } + } + + close(out) +} + +func testPipeWalker(done <-chan struct{}, out chan<- pipe.Job) { + for _, e := range pipeJobs { + select { + case <-done: + return + case out <- testPipeJob{path: e}: + } + } + + close(out) +} + +func TestArchivePipe(t *testing.T) { + done := make(chan struct{}) + + treeCh := make(chan WalkTreeJob) + pipeCh := make(chan pipe.Job) + + go testTreeWalker(done, treeCh) + go testPipeWalker(done, pipeCh) + + p := ArchivePipe{Old: treeCh, New: pipeCh} + + ch := make(chan pipe.Job) + + go p.compare(done, ch) + + i := 0 + for job := range ch { + if job.Path() != resultJobs[i].path { + t.Fatalf("wrong job received: wanted %v, got %v", resultJobs[i], job) + } + i++ + } +} diff --git a/map.go b/map.go index 080e46f25..7c1dc4872 100644 --- a/map.go +++ b/map.go @@ -140,6 +140,24 @@ func (bl *Map) Equals(other *Map) bool { return true } +// Select returns a list of of blobs from the plaintext IDs given in list. +func (bl *Map) Select(list backend.IDs) (Blobs, error) { + bl.m.Lock() + defer bl.m.Unlock() + + blobs := make(Blobs, 0, len(list)) + for _, id := range list { + _, blob, err := bl.find(Blob{ID: id}, false) + if err != nil { + return nil, err + } + + blobs = append(blobs, blob) + } + + return blobs, nil +} + // Len returns the number of blobs in the map. func (bl *Map) Len() int { bl.m.Lock() diff --git a/node.go b/node.go index ba787550b..b49554835 100644 --- a/node.go +++ b/node.go @@ -64,28 +64,36 @@ func NodeFromFileInfo(path string, fi os.FileInfo) (*Node, error) { node.Mode = fi.Mode() & os.ModePerm node.ModTime = fi.ModTime() - switch fi.Mode() & (os.ModeType | os.ModeCharDevice) { - case 0: - node.Type = "file" + node.Type = nodeTypeFromFileInfo(path, fi) + if node.Type == "file" { node.Size = uint64(fi.Size()) - case os.ModeDir: - node.Type = "dir" - case os.ModeSymlink: - node.Type = "symlink" - case os.ModeDevice | os.ModeCharDevice: - node.Type = "chardev" - case os.ModeDevice: - node.Type = "dev" - case os.ModeNamedPipe: - node.Type = "fifo" - case os.ModeSocket: - node.Type = "socket" } err := node.fill_extra(path, fi) return node, err } +func nodeTypeFromFileInfo(path string, fi os.FileInfo) string { + switch fi.Mode() & (os.ModeType | os.ModeCharDevice) { + case 0: + return "file" + case os.ModeDir: + return "dir" + case os.ModeSymlink: + return "symlink" + case os.ModeDevice | os.ModeCharDevice: + return "chardev" + case os.ModeDevice: + return "dev" + case os.ModeNamedPipe: + return "fifo" + case os.ModeSocket: + return "socket" + } + + return "" +} + func CreateNodeAt(node *Node, m *Map, s Server, path string) error { switch node.Type { case "dir": diff --git a/node_linux.go b/node_linux.go index fb6fe21c2..3ec6653df 100644 --- a/node_linux.go +++ b/node_linux.go @@ -7,6 +7,8 @@ import ( "strconv" "syscall" "time" + + "github.com/restic/restic/debug" ) func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { @@ -66,3 +68,38 @@ func (node *Node) createCharDevAt(path string) error { func (node *Node) createFifoAt(path string) error { return syscall.Mkfifo(path, 0600) } + +func (node *Node) isNewer(path string, fi os.FileInfo) bool { + // if this node has a type other than "file", treat as if content has changed + if node.Type != "file" { + debug.Log("node.isNewer", "node %v is newer: not file", path) + return true + } + + // if the name or type has changed, this is surely something different + tpe := nodeTypeFromFileInfo(path, fi) + if node.Name != fi.Name() || node.Type != tpe { + debug.Log("node.isNewer", "node %v is newer: name or type changed", path) + return false + } + + // collect extended stat + stat := fi.Sys().(*syscall.Stat_t) + + changeTime := time.Unix(stat.Ctim.Unix()) + inode := stat.Ino + size := uint64(stat.Size) + + // if timestamps or inodes differ, content has changed + if node.ModTime != fi.ModTime() || + node.ChangeTime != changeTime || + node.Inode != inode || + node.Size != size { + debug.Log("node.isNewer", "node %v is newer: timestamp or inode changed", path) + return false + } + + // otherwise the node is assumed to have the same content + debug.Log("node.isNewer", "node %v is not newer", path) + return false +} diff --git a/pipe/pipe.go b/pipe/pipe.go index c3534b7bf..a8a96d0e1 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -10,22 +10,51 @@ import ( "github.com/restic/restic/debug" ) -type Entry struct { - Path string - Info os.FileInfo - Error error - Result chan<- interface{} +type Result interface{} + +type Job interface { + Path() string + Fullpath() string + Error() error + Info() os.FileInfo + + Result() chan<- Result } +type Entry struct { + basedir string + path string + info os.FileInfo + error error + result chan<- Result + + // points to the old node if available, interface{} is used to prevent + // circular import + Node interface{} +} + +func (e Entry) Path() string { return e.path } +func (e Entry) Fullpath() string { return filepath.Join(e.basedir, e.path) } +func (e Entry) Error() error { return e.error } +func (e Entry) Info() os.FileInfo { return e.info } +func (e Entry) Result() chan<- Result { return e.result } + type Dir struct { - Path string - Error error - Info os.FileInfo + basedir string + path string + error error + info os.FileInfo - Entries [](<-chan interface{}) - Result chan<- interface{} + Entries [](<-chan Result) + result chan<- Result } +func (e Dir) Path() string { return e.path } +func (e Dir) Fullpath() string { return filepath.Join(e.basedir, e.path) } +func (e Dir) Error() error { return e.error } +func (e Dir) Info() os.FileInfo { return e.info } +func (e Dir) Result() chan<- Result { return e.result } + // readDirNames reads the directory named by dirname and returns // a sorted list of directory entries. // taken from filepath/path.go @@ -53,15 +82,17 @@ func isFile(fi os.FileInfo) bool { var errCancelled = errors.New("walk cancelled") -func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- interface{}) error { +func walk(basedir, path string, done chan struct{}, jobs chan<- Job, res chan<- Result) error { info, err := os.Lstat(path) if err != nil { return err } + relpath, _ := filepath.Rel(basedir, path) + if !info.IsDir() { select { - case jobs <- Entry{Info: info, Path: path, Result: res}: + case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}: case <-done: return errCancelled } @@ -73,18 +104,18 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i return err } - entries := make([]<-chan interface{}, 0, len(names)) + entries := make([]<-chan Result, 0, len(names)) for _, name := range names { subpath := filepath.Join(path, name) - ch := make(chan interface{}, 1) + ch := make(chan Result, 1) entries = append(entries, ch) fi, err := os.Lstat(subpath) if err != nil { select { - case jobs <- Entry{Info: fi, Error: err, Result: ch}: + case jobs <- Entry{info: fi, error: err, result: ch}: case <-done: return errCancelled } @@ -92,14 +123,14 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i } if isDir(fi) { - err = walk(subpath, done, jobs, ch) + err = walk(basedir, subpath, done, jobs, ch) if err != nil { return err } } else { select { - case jobs <- Entry{Info: fi, Path: subpath, Result: ch}: + case jobs <- Entry{info: fi, basedir: basedir, path: filepath.Join(relpath, name), result: ch}: case <-done: return errCancelled } @@ -107,7 +138,7 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i } select { - case jobs <- Dir{Path: path, Info: info, Entries: entries, Result: res}: + case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}: case <-done: return errCancelled } @@ -116,31 +147,30 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i // Walk sends a Job for each file and directory it finds below the paths. When // the channel done is closed, processing stops. -func Walk(paths []string, done chan struct{}, jobs chan<- interface{}) (<-chan interface{}, error) { - resCh := make(chan interface{}, 1) +func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result) error { defer func() { - close(resCh) - close(jobs) debug.Log("pipe.Walk", "output channel closed") + close(res) + close(jobs) }() - entries := make([]<-chan interface{}, 0, len(paths)) + entries := make([]<-chan Result, 0, len(paths)) for _, path := range paths { debug.Log("pipe.Walk", "start walker for %v", path) - ch := make(chan interface{}, 1) + ch := make(chan Result, 1) entries = append(entries, ch) - err := walk(path, done, jobs, ch) + err := walk(filepath.Dir(path), path, done, jobs, ch) if err != nil { - return nil, err + return err } debug.Log("pipe.Walk", "walker for %v done", path) } - resCh <- Dir{Entries: entries} - return resCh, nil + res <- Dir{Entries: entries} + return nil } // Split feeds all elements read from inChan to dirChan and entChan. -func Split(inChan <-chan interface{}, dirChan chan<- Dir, entChan chan<- Entry) { +func Split(inChan <-chan Job, dirChan chan<- Dir, entChan chan<- Entry) { debug.Log("pipe.Split", "start") defer debug.Log("pipe.Split", "done") diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 979fb85b2..1baf0b34d 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -71,7 +71,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) { after.files++ m.Unlock() - e.Result <- true + e.Result() <- true case dir, ok := <-dirCh: if !ok { @@ -88,7 +88,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) { after.dirs++ m.Unlock() - dir.Result <- true + dir.Result() <- true case <-done: // pipeline was cancelled return @@ -106,7 +106,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) { go worker(&wg, done, entCh, dirCh) } - jobs := make(chan interface{}, 200) + jobs := make(chan pipe.Job, 200) wg.Add(1) go func() { pipe.Split(jobs, dirCh, entCh) @@ -115,7 +115,8 @@ func TestPipelineWalkerWithSplit(t *testing.T) { wg.Done() }() - resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) + resCh := make(chan pipe.Result, 1) + err = pipe.Walk([]string{*testWalkerPath}, done, jobs, resCh) ok(t, err) // wait for all workers to terminate @@ -144,7 +145,7 @@ func TestPipelineWalker(t *testing.T) { after := stats{} m := sync.Mutex{} - worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan interface{}) { + worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan pipe.Job) { defer wg.Done() for { select { @@ -166,13 +167,13 @@ func TestPipelineWalker(t *testing.T) { after.dirs++ m.Unlock() - j.Result <- true + j.Result() <- true case pipe.Entry: m.Lock() after.files++ m.Unlock() - j.Result <- true + j.Result() <- true } case <-done: @@ -184,14 +185,15 @@ func TestPipelineWalker(t *testing.T) { var wg sync.WaitGroup done := make(chan struct{}) - jobs := make(chan interface{}) + jobs := make(chan pipe.Job) for i := 0; i < *maxWorkers; i++ { wg.Add(1) go worker(&wg, done, jobs) } - resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) + resCh := make(chan pipe.Result, 1) + err = pipe.Walk([]string{*testWalkerPath}, done, jobs, resCh) ok(t, err) // wait for all workers to terminate @@ -227,7 +229,7 @@ func BenchmarkPipelineWalker(b *testing.B) { // simulate backup //time.Sleep(10 * time.Millisecond) - e.Result <- true + e.Result() <- true case <-done: // pipeline was cancelled return @@ -259,7 +261,7 @@ func BenchmarkPipelineWalker(b *testing.B) { } m.Unlock() - dir.Result <- true + dir.Result() <- true case <-done: // pipeline was cancelled return @@ -281,7 +283,7 @@ func BenchmarkPipelineWalker(b *testing.B) { go fileWorker(&wg, done, entCh) } - jobs := make(chan interface{}, 200) + jobs := make(chan pipe.Job, 200) wg.Add(1) go func() { pipe.Split(jobs, dirCh, entCh) @@ -290,7 +292,8 @@ func BenchmarkPipelineWalker(b *testing.B) { wg.Done() }() - resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) + resCh := make(chan pipe.Result, 1) + err := pipe.Walk([]string{*testWalkerPath}, done, jobs, resCh) ok(b, err) // wait for all workers to terminate diff --git a/walk.go b/walk.go index d97ab09b6..83e2db726 100644 --- a/walk.go +++ b/walk.go @@ -34,7 +34,12 @@ func walkTree(s Server, path string, id backend.ID, done chan struct{}, jobCh ch } walkTree(s, p, blob.Storage, done, jobCh) } else { - jobCh <- WalkTreeJob{Path: p, Node: node} + // load old blobs + node.blobs, err = t.Map.Select(node.Content) + if err != nil { + debug.Log("walkTree", "unable to load bobs for %q (%v): %v", path, id.Str(), err) + } + jobCh <- WalkTreeJob{Path: p, Node: node, Error: err} } } diff --git a/walk_test.go b/walk_test.go index 0684793e4..4a0d50350 100644 --- a/walk_test.go +++ b/walk_test.go @@ -38,8 +38,9 @@ func TestWalkTree(t *testing.T) { go restic.WalkTree(server, sn.Tree.Storage, done, treeJobs) // start filesystem walker - fsJobs := make(chan interface{}) - go pipe.Walk(dirs, done, fsJobs) + fsJobs := make(chan pipe.Job) + resCh := make(chan pipe.Result, 1) + go pipe.Walk(dirs, done, fsJobs, resCh) for { // receive fs job @@ -51,10 +52,10 @@ func TestWalkTree(t *testing.T) { fsEntries := 1 switch j := fsJob.(type) { case pipe.Dir: - path = j.Path + path = j.Path() fsEntries = len(j.Entries) case pipe.Entry: - path = j.Path + path = j.Path() } // receive tree job