2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-29 16:23:59 +00:00

Refactor backup pipeline

This commit is contained in:
Alexander Neumann 2015-03-07 11:53:32 +01:00
parent 1516cb5996
commit ba892e1ec2
9 changed files with 536 additions and 105 deletions

View File

@ -417,7 +417,10 @@ func (arch *Archiver) saveTree(p *Progress, t *Tree) (Blob, error) {
} }
func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) { func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, entCh <-chan pipe.Entry) {
defer wg.Done() defer func() {
debug.Log("Archiver.fileWorker", "done")
wg.Done()
}()
for { for {
select { select {
case e, ok := <-entCh: case e, ok := <-entCh:
@ -426,19 +429,51 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st
return return
} }
node, err := NodeFromFileInfo(e.Path, e.Info) debug.Log("Archiver.fileWorker", "got job %v", e)
node, err := NodeFromFileInfo(e.Fullpath(), e.Info())
if err != nil { if err != nil {
panic(err) panic(err)
} }
if node.Type == "file" { // try to use old node, if present
if e.Node != nil {
debug.Log("Archiver.fileWorker", " %v use old data", e.Path())
oldNode := e.Node.(*Node)
// check if all content is still available in the repository
contentMissing := false
for _, blob := range oldNode.blobs {
if ok, err := arch.s.Test(backend.Data, blob.Storage); !ok || err != nil {
debug.Log("Archiver.fileWorker", " %v not using old data, %v (%v) is missing", e.Path(), blob.ID.Str(), blob.Storage.Str())
contentMissing = true
break
}
}
if !contentMissing {
node.Content = oldNode.Content
node.blobs = oldNode.blobs
debug.Log("Archiver.fileWorker", " %v content is complete", e.Path())
}
} else {
debug.Log("Archiver.fileWorker", " %v no old data", e.Path())
}
// otherwise read file normally
if node.Type == "file" && len(node.Content) == 0 {
debug.Log("Archiver.fileWorker", " read and save %v, content: %v", e.Path(), node.Content)
node.blobs, err = arch.SaveFile(p, node) node.blobs, err = arch.SaveFile(p, node)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} else {
// report old data size
p.Report(Stat{Bytes: node.Size})
} }
e.Result <- node debug.Log("Archiver.fileWorker", " processed %v, %d/%d blobs", e.Path(), len(node.Content), len(node.blobs))
e.Result() <- node
p.Report(Stat{Files: 1}) p.Report(Stat{Files: 1})
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
@ -448,7 +483,10 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st
} }
func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) { func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) {
defer wg.Done() defer func() {
debug.Log("Archiver.dirWorker", "done")
wg.Done()
}()
for { for {
select { select {
case dir, ok := <-dirCh: case dir, ok := <-dirCh:
@ -456,7 +494,7 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
// channel is closed // channel is closed
return return
} }
debug.Log("Archiver.DirWorker", "save dir %v\n", dir.Path) debug.Log("Archiver.dirWorker", "save dir %v\n", dir.Path())
tree := NewTree() tree := NewTree()
@ -466,7 +504,7 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
tree.Insert(node) tree.Insert(node)
if node.Type == "dir" { if node.Type == "dir" {
debug.Log("Archiver.DirWorker", "got tree node for %s: %v", node.path, node.blobs) debug.Log("Archiver.dirWorker", "got tree node for %s: %v", node.path, node.blobs)
} }
for _, blob := range node.blobs { for _, blob := range node.blobs {
@ -475,10 +513,10 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
} }
} }
node, err := NodeFromFileInfo(dir.Path, dir.Info) node, err := NodeFromFileInfo(dir.Path(), dir.Info())
if err != nil { if err != nil {
node.Error = err.Error() node.Error = err.Error()
dir.Result <- node dir.Result() <- node
continue continue
} }
@ -486,12 +524,12 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
if err != nil { if err != nil {
panic(err) panic(err)
} }
debug.Log("Archiver.DirWorker", "save tree for %s: %v", dir.Path, blob) debug.Log("Archiver.dirWorker", "save tree for %s: %v", dir.Path(), blob)
node.Subtree = blob.ID node.Subtree = blob.ID
node.blobs = Blobs{blob} node.blobs = Blobs{blob}
dir.Result <- node dir.Result() <- node
p.Report(Stat{Dirs: 1}) p.Report(Stat{Dirs: 1})
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
@ -526,51 +564,225 @@ func compareWithOldTree(newCh <-chan interface{}, oldCh <-chan WalkTreeJob, outC
} }
} }
func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backend.ID) (*Snapshot, backend.ID, error) { type ArchivePipe struct {
Old <-chan WalkTreeJob
New <-chan pipe.Job
}
func copyJobs(done <-chan struct{}, in <-chan pipe.Job, out chan<- pipe.Job) {
i := in
o := out
o = nil
var (
j pipe.Job
ok bool
)
for {
select {
case <-done:
return
case j, ok = <-i:
if !ok {
// in ch closed, we're done
debug.Log("copyJobs", "in channel closed, we're done")
return
}
i = nil
o = out
case o <- j:
o = nil
i = in
}
}
}
type archiveJob struct {
hasOld bool
old WalkTreeJob
new pipe.Job
}
func (a *ArchivePipe) compare(done <-chan struct{}, out chan<- pipe.Job) {
defer func() {
close(out)
debug.Log("ArchivePipe.compare", "done")
}()
debug.Log("ArchivePipe.compare", "start")
var (
loadOld, loadNew bool = true, true
ok bool
oldJob WalkTreeJob
newJob pipe.Job
)
for {
if loadOld {
oldJob, ok = <-a.Old
// if the old channel is closed, just pass through the new jobs
if !ok {
debug.Log("ArchivePipe.compare", "old channel is closed, copy from new channel")
// handle remaining newJob
if !loadNew {
out <- archiveJob{new: newJob}.Copy()
}
copyJobs(done, a.New, out)
return
}
loadOld = false
}
if loadNew {
newJob, ok = <-a.New
// if the new channel is closed, there are no more files in the current snapshot, return
if !ok {
debug.Log("ArchivePipe.compare", "new channel is closed, we're done")
return
}
loadNew = false
}
debug.Log("ArchivePipe.compare", "old job: %v", oldJob.Path)
debug.Log("ArchivePipe.compare", "new job: %v", newJob.Path())
// at this point we have received an old job as well as a new job, compare paths
file1 := oldJob.Path
file2 := newJob.Path()
dir1 := filepath.Dir(file1)
dir2 := filepath.Dir(file2)
if file1 == file2 {
debug.Log("ArchivePipe.compare", " same filename %q", file1)
// send job
out <- archiveJob{hasOld: true, old: oldJob, new: newJob}.Copy()
loadOld = true
loadNew = true
continue
} else if dir1 < dir2 {
debug.Log("ArchivePipe.compare", " %q < %q, file %q added", dir1, dir2, file2)
// file is new, send new job and load new
loadNew = true
out <- archiveJob{new: newJob}.Copy()
continue
} else if dir1 == dir2 && file1 < file2 {
debug.Log("ArchivePipe.compare", " %q < %q, file %q removed", file1, file2, file1)
// file has been removed, load new old
loadOld = true
continue
}
debug.Log("ArchivePipe.compare", " %q > %q, file %q removed", file1, file2, file1)
// file has been removed, throw away old job and load new
loadOld = true
}
}
func (j archiveJob) Copy() pipe.Job {
if !j.hasOld {
return j.new
}
// handle files
if isFile(j.new.Info()) {
debug.Log("archiveJob.Copy", " job %v is file", j.new.Path())
// if type has changed, return new job directly
if j.old.Node == nil {
return j.new
}
// if file is newer, return the new job
if j.old.Node.isNewer(j.new.Fullpath(), j.new.Info()) {
debug.Log("archiveJob.Copy", " job %v is newer", j.new.Path())
return j.new
}
debug.Log("archiveJob.Copy", " job %v add old data", j.new.Path())
// otherwise annotate job with old data
e := j.new.(pipe.Entry)
e.Node = j.old.Node
return e
}
// dirs and other types are just returned
return j.new
}
func (arch *Archiver) Snapshot(p *Progress, paths []string, pid backend.ID) (*Snapshot, backend.ID, error) {
debug.Log("Archiver.Snapshot", "start for %v", paths) debug.Log("Archiver.Snapshot", "start for %v", paths)
debug.Break("Archiver.Snapshot") debug.Break("Archiver.Snapshot")
sort.Strings(paths) sort.Strings(paths)
// signal the whole pipeline to stop
done := make(chan struct{})
var err error
p.Start() p.Start()
defer p.Done() defer p.Done()
// create new snapshot
sn, err := NewSnapshot(paths) sn, err := NewSnapshot(paths)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
jobs := ArchivePipe{}
// use parent snapshot (if some was given)
if pid != nil {
sn.Parent = pid
// load parent snapshot // load parent snapshot
// var oldRoot backend.ID parent, err := LoadSnapshot(arch.s, pid)
// if parentSnapshot != nil { if err != nil {
// sn.Parent = parentSnapshot return nil, nil, err
// parentSn, err := LoadSnapshot(arch.s, parentSnapshot) }
// if err != nil {
// return nil, nil, err
// }
// oldRoot = parentSn.Tree.Storage
// }
// signal the whole pipeline to stop // start walker on old tree
done := make(chan struct{}) ch := make(chan WalkTreeJob)
go WalkTree(arch.s, parent.Tree.Storage, done, ch)
jobs.Old = ch
} else {
// use closed channel
ch := make(chan WalkTreeJob)
close(ch)
jobs.Old = ch
}
// if we have an old root, start walker and comparer // start walker
// oldTreeCh := make(chan WalkTreeJob) pipeCh := make(chan pipe.Job)
// if oldRoot != nil { resCh := make(chan pipe.Result, 1)
// // start walking the old tree go func() {
// debug.Log("Archiver.Snapshot", "start comparer for old root %v", oldRoot.Str()) err := pipe.Walk(paths, done, pipeCh, resCh)
// go WalkTree(arch.s, oldRoot, done, oldTreeCh) if err != nil {
// } debug.Log("Archiver.Snapshot", "pipe.Walk returned error %v", err)
return
}
debug.Log("Archiver.Snapshot", "pipe.Walk done")
}()
jobs.New = pipeCh
ch := make(chan pipe.Job)
go jobs.compare(done, ch)
var wg sync.WaitGroup var wg sync.WaitGroup
entCh := make(chan pipe.Entry) entCh := make(chan pipe.Entry)
dirCh := make(chan pipe.Dir) dirCh := make(chan pipe.Dir)
jobsCh := make(chan interface{})
// split // split
wg.Add(1) wg.Add(1)
go func() { go func() {
pipe.Split(jobsCh, dirCh, entCh) pipe.Split(ch, dirCh, entCh)
debug.Log("Archiver.Snapshot", "split done")
close(dirCh) close(dirCh)
close(entCh) close(entCh)
wg.Done() wg.Done()
@ -583,15 +795,6 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backe
go arch.dirWorker(&wg, p, done, dirCh) go arch.dirWorker(&wg, p, done, dirCh)
} }
// start walker
resCh, err := pipe.Walk(paths, done, jobsCh)
if err != nil {
close(done)
debug.Log("Archiver.Snapshot", "pipe.Walke returned error %v", err)
return nil, nil, err
}
// wait for all workers to terminate // wait for all workers to terminate
debug.Log("Archiver.Snapshot", "wait for workers") debug.Log("Archiver.Snapshot", "wait for workers")
wg.Wait() wg.Wait()
@ -604,10 +807,11 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backe
for i := 0; i < len(paths); i++ { for i := 0; i < len(paths); i++ {
node := (<-root.Entries[i]).(*Node) node := (<-root.Entries[i]).(*Node)
debug.Log("Archiver.Snapshot", "got toplevel node %v", node) debug.Log("Archiver.Snapshot", "got toplevel node %v, %d/%d blobs", node, len(node.Content), len(node.blobs))
tree.Insert(node) tree.Insert(node)
for _, blob := range node.blobs { for _, blob := range node.blobs {
debug.Log("Archiver.Snapshot", " add toplevel blob %v", blob)
blob = arch.m.Insert(blob) blob = arch.m.Insert(blob)
tree.Map.Insert(blob) tree.Map.Insert(blob)
} }
@ -630,6 +834,10 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentSnapshot backe
} }
func isFile(fi os.FileInfo) bool { func isFile(fi os.FileInfo) bool {
if fi == nil {
return false
}
return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0 return fi.Mode()&(os.ModeType|os.ModeCharDevice) == 0
} }

121
archiver_int_test.go Normal file
View File

@ -0,0 +1,121 @@
package restic
import (
"os"
"testing"
"github.com/restic/restic/pipe"
)
var treeJobs = []string{
"foo/baz/subdir",
"foo/bar",
"foo",
"quu/foo/file1",
"quu/foo/file2",
"quu/foo/file3",
"quu/foo",
"quu/fooz",
"quu",
"yy/a",
"yy/b",
"yy",
}
var pipeJobs = []string{
"foo/baz/subdir",
"foo/baz/subdir2", // subdir2 added
"foo/bar",
"foo",
"quu/foo/file1", // file2 removed
"quu/foo/file3",
"quu/foo",
"quu",
"quv/file1", // files added and removed
"quv/file2",
"quv",
"zz/file1", // new files removed and added at the end
"zz/file2",
"zz",
}
var resultJobs = []struct {
path string
hasOld bool
}{
{"foo/baz/subdir", true},
{"foo/baz/subdir2", false},
{"foo/bar", true},
{"foo", true},
{"quu/foo/file1", true},
{"quu/foo/file3", true},
{"quu/foo", true},
{"quu", true},
{"quv/file1", false},
{"quv/file2", false},
{"quv", false},
{"zz/file1", false},
{"zz/file2", false},
{"zz", false},
}
type testPipeJob struct {
path string
err error
fi os.FileInfo
res chan<- pipe.Result
}
func (j testPipeJob) Path() string { return j.path }
func (j testPipeJob) Fullpath() string { return j.path }
func (j testPipeJob) Error() error { return j.err }
func (j testPipeJob) Info() os.FileInfo { return j.fi }
func (j testPipeJob) Result() chan<- pipe.Result { return j.res }
func testTreeWalker(done <-chan struct{}, out chan<- WalkTreeJob) {
for _, e := range treeJobs {
select {
case <-done:
return
case out <- WalkTreeJob{Path: e}:
}
}
close(out)
}
func testPipeWalker(done <-chan struct{}, out chan<- pipe.Job) {
for _, e := range pipeJobs {
select {
case <-done:
return
case out <- testPipeJob{path: e}:
}
}
close(out)
}
func TestArchivePipe(t *testing.T) {
done := make(chan struct{})
treeCh := make(chan WalkTreeJob)
pipeCh := make(chan pipe.Job)
go testTreeWalker(done, treeCh)
go testPipeWalker(done, pipeCh)
p := ArchivePipe{Old: treeCh, New: pipeCh}
ch := make(chan pipe.Job)
go p.compare(done, ch)
i := 0
for job := range ch {
if job.Path() != resultJobs[i].path {
t.Fatalf("wrong job received: wanted %v, got %v", resultJobs[i], job)
}
i++
}
}

18
map.go
View File

@ -140,6 +140,24 @@ func (bl *Map) Equals(other *Map) bool {
return true return true
} }
// Select returns a list of of blobs from the plaintext IDs given in list.
func (bl *Map) Select(list backend.IDs) (Blobs, error) {
bl.m.Lock()
defer bl.m.Unlock()
blobs := make(Blobs, 0, len(list))
for _, id := range list {
_, blob, err := bl.find(Blob{ID: id}, false)
if err != nil {
return nil, err
}
blobs = append(blobs, blob)
}
return blobs, nil
}
// Len returns the number of blobs in the map. // Len returns the number of blobs in the map.
func (bl *Map) Len() int { func (bl *Map) Len() int {
bl.m.Lock() bl.m.Lock()

38
node.go
View File

@ -64,28 +64,36 @@ func NodeFromFileInfo(path string, fi os.FileInfo) (*Node, error) {
node.Mode = fi.Mode() & os.ModePerm node.Mode = fi.Mode() & os.ModePerm
node.ModTime = fi.ModTime() node.ModTime = fi.ModTime()
switch fi.Mode() & (os.ModeType | os.ModeCharDevice) { node.Type = nodeTypeFromFileInfo(path, fi)
case 0: if node.Type == "file" {
node.Type = "file"
node.Size = uint64(fi.Size()) node.Size = uint64(fi.Size())
case os.ModeDir:
node.Type = "dir"
case os.ModeSymlink:
node.Type = "symlink"
case os.ModeDevice | os.ModeCharDevice:
node.Type = "chardev"
case os.ModeDevice:
node.Type = "dev"
case os.ModeNamedPipe:
node.Type = "fifo"
case os.ModeSocket:
node.Type = "socket"
} }
err := node.fill_extra(path, fi) err := node.fill_extra(path, fi)
return node, err return node, err
} }
func nodeTypeFromFileInfo(path string, fi os.FileInfo) string {
switch fi.Mode() & (os.ModeType | os.ModeCharDevice) {
case 0:
return "file"
case os.ModeDir:
return "dir"
case os.ModeSymlink:
return "symlink"
case os.ModeDevice | os.ModeCharDevice:
return "chardev"
case os.ModeDevice:
return "dev"
case os.ModeNamedPipe:
return "fifo"
case os.ModeSocket:
return "socket"
}
return ""
}
func CreateNodeAt(node *Node, m *Map, s Server, path string) error { func CreateNodeAt(node *Node, m *Map, s Server, path string) error {
switch node.Type { switch node.Type {
case "dir": case "dir":

View File

@ -7,6 +7,8 @@ import (
"strconv" "strconv"
"syscall" "syscall"
"time" "time"
"github.com/restic/restic/debug"
) )
func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) { func (node *Node) fill_extra(path string, fi os.FileInfo) (err error) {
@ -66,3 +68,38 @@ func (node *Node) createCharDevAt(path string) error {
func (node *Node) createFifoAt(path string) error { func (node *Node) createFifoAt(path string) error {
return syscall.Mkfifo(path, 0600) return syscall.Mkfifo(path, 0600)
} }
func (node *Node) isNewer(path string, fi os.FileInfo) bool {
// if this node has a type other than "file", treat as if content has changed
if node.Type != "file" {
debug.Log("node.isNewer", "node %v is newer: not file", path)
return true
}
// if the name or type has changed, this is surely something different
tpe := nodeTypeFromFileInfo(path, fi)
if node.Name != fi.Name() || node.Type != tpe {
debug.Log("node.isNewer", "node %v is newer: name or type changed", path)
return false
}
// collect extended stat
stat := fi.Sys().(*syscall.Stat_t)
changeTime := time.Unix(stat.Ctim.Unix())
inode := stat.Ino
size := uint64(stat.Size)
// if timestamps or inodes differ, content has changed
if node.ModTime != fi.ModTime() ||
node.ChangeTime != changeTime ||
node.Inode != inode ||
node.Size != size {
debug.Log("node.isNewer", "node %v is newer: timestamp or inode changed", path)
return false
}
// otherwise the node is assumed to have the same content
debug.Log("node.isNewer", "node %v is not newer", path)
return false
}

View File

@ -10,22 +10,51 @@ import (
"github.com/restic/restic/debug" "github.com/restic/restic/debug"
) )
type Entry struct { type Result interface{}
Path string
Info os.FileInfo type Job interface {
Error error Path() string
Result chan<- interface{} Fullpath() string
Error() error
Info() os.FileInfo
Result() chan<- Result
} }
type Entry struct {
basedir string
path string
info os.FileInfo
error error
result chan<- Result
// points to the old node if available, interface{} is used to prevent
// circular import
Node interface{}
}
func (e Entry) Path() string { return e.path }
func (e Entry) Fullpath() string { return filepath.Join(e.basedir, e.path) }
func (e Entry) Error() error { return e.error }
func (e Entry) Info() os.FileInfo { return e.info }
func (e Entry) Result() chan<- Result { return e.result }
type Dir struct { type Dir struct {
Path string basedir string
Error error path string
Info os.FileInfo error error
info os.FileInfo
Entries [](<-chan interface{}) Entries [](<-chan Result)
Result chan<- interface{} result chan<- Result
} }
func (e Dir) Path() string { return e.path }
func (e Dir) Fullpath() string { return filepath.Join(e.basedir, e.path) }
func (e Dir) Error() error { return e.error }
func (e Dir) Info() os.FileInfo { return e.info }
func (e Dir) Result() chan<- Result { return e.result }
// readDirNames reads the directory named by dirname and returns // readDirNames reads the directory named by dirname and returns
// a sorted list of directory entries. // a sorted list of directory entries.
// taken from filepath/path.go // taken from filepath/path.go
@ -53,15 +82,17 @@ func isFile(fi os.FileInfo) bool {
var errCancelled = errors.New("walk cancelled") var errCancelled = errors.New("walk cancelled")
func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- interface{}) error { func walk(basedir, path string, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
info, err := os.Lstat(path) info, err := os.Lstat(path)
if err != nil { if err != nil {
return err return err
} }
relpath, _ := filepath.Rel(basedir, path)
if !info.IsDir() { if !info.IsDir() {
select { select {
case jobs <- Entry{Info: info, Path: path, Result: res}: case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}:
case <-done: case <-done:
return errCancelled return errCancelled
} }
@ -73,18 +104,18 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i
return err return err
} }
entries := make([]<-chan interface{}, 0, len(names)) entries := make([]<-chan Result, 0, len(names))
for _, name := range names { for _, name := range names {
subpath := filepath.Join(path, name) subpath := filepath.Join(path, name)
ch := make(chan interface{}, 1) ch := make(chan Result, 1)
entries = append(entries, ch) entries = append(entries, ch)
fi, err := os.Lstat(subpath) fi, err := os.Lstat(subpath)
if err != nil { if err != nil {
select { select {
case jobs <- Entry{Info: fi, Error: err, Result: ch}: case jobs <- Entry{info: fi, error: err, result: ch}:
case <-done: case <-done:
return errCancelled return errCancelled
} }
@ -92,14 +123,14 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i
} }
if isDir(fi) { if isDir(fi) {
err = walk(subpath, done, jobs, ch) err = walk(basedir, subpath, done, jobs, ch)
if err != nil { if err != nil {
return err return err
} }
} else { } else {
select { select {
case jobs <- Entry{Info: fi, Path: subpath, Result: ch}: case jobs <- Entry{info: fi, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
case <-done: case <-done:
return errCancelled return errCancelled
} }
@ -107,7 +138,7 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i
} }
select { select {
case jobs <- Dir{Path: path, Info: info, Entries: entries, Result: res}: case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}:
case <-done: case <-done:
return errCancelled return errCancelled
} }
@ -116,31 +147,30 @@ func walk(path string, done chan struct{}, jobs chan<- interface{}, res chan<- i
// Walk sends a Job for each file and directory it finds below the paths. When // Walk sends a Job for each file and directory it finds below the paths. When
// the channel done is closed, processing stops. // the channel done is closed, processing stops.
func Walk(paths []string, done chan struct{}, jobs chan<- interface{}) (<-chan interface{}, error) { func Walk(paths []string, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
resCh := make(chan interface{}, 1)
defer func() { defer func() {
close(resCh)
close(jobs)
debug.Log("pipe.Walk", "output channel closed") debug.Log("pipe.Walk", "output channel closed")
close(res)
close(jobs)
}() }()
entries := make([]<-chan interface{}, 0, len(paths)) entries := make([]<-chan Result, 0, len(paths))
for _, path := range paths { for _, path := range paths {
debug.Log("pipe.Walk", "start walker for %v", path) debug.Log("pipe.Walk", "start walker for %v", path)
ch := make(chan interface{}, 1) ch := make(chan Result, 1)
entries = append(entries, ch) entries = append(entries, ch)
err := walk(path, done, jobs, ch) err := walk(filepath.Dir(path), path, done, jobs, ch)
if err != nil { if err != nil {
return nil, err return err
} }
debug.Log("pipe.Walk", "walker for %v done", path) debug.Log("pipe.Walk", "walker for %v done", path)
} }
resCh <- Dir{Entries: entries} res <- Dir{Entries: entries}
return resCh, nil return nil
} }
// Split feeds all elements read from inChan to dirChan and entChan. // Split feeds all elements read from inChan to dirChan and entChan.
func Split(inChan <-chan interface{}, dirChan chan<- Dir, entChan chan<- Entry) { func Split(inChan <-chan Job, dirChan chan<- Dir, entChan chan<- Entry) {
debug.Log("pipe.Split", "start") debug.Log("pipe.Split", "start")
defer debug.Log("pipe.Split", "done") defer debug.Log("pipe.Split", "done")

View File

@ -71,7 +71,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) {
after.files++ after.files++
m.Unlock() m.Unlock()
e.Result <- true e.Result() <- true
case dir, ok := <-dirCh: case dir, ok := <-dirCh:
if !ok { if !ok {
@ -88,7 +88,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) {
after.dirs++ after.dirs++
m.Unlock() m.Unlock()
dir.Result <- true dir.Result() <- true
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
return return
@ -106,7 +106,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) {
go worker(&wg, done, entCh, dirCh) go worker(&wg, done, entCh, dirCh)
} }
jobs := make(chan interface{}, 200) jobs := make(chan pipe.Job, 200)
wg.Add(1) wg.Add(1)
go func() { go func() {
pipe.Split(jobs, dirCh, entCh) pipe.Split(jobs, dirCh, entCh)
@ -115,7 +115,8 @@ func TestPipelineWalkerWithSplit(t *testing.T) {
wg.Done() wg.Done()
}() }()
resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) resCh := make(chan pipe.Result, 1)
err = pipe.Walk([]string{*testWalkerPath}, done, jobs, resCh)
ok(t, err) ok(t, err)
// wait for all workers to terminate // wait for all workers to terminate
@ -144,7 +145,7 @@ func TestPipelineWalker(t *testing.T) {
after := stats{} after := stats{}
m := sync.Mutex{} m := sync.Mutex{}
worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan interface{}) { worker := func(wg *sync.WaitGroup, done <-chan struct{}, jobs <-chan pipe.Job) {
defer wg.Done() defer wg.Done()
for { for {
select { select {
@ -166,13 +167,13 @@ func TestPipelineWalker(t *testing.T) {
after.dirs++ after.dirs++
m.Unlock() m.Unlock()
j.Result <- true j.Result() <- true
case pipe.Entry: case pipe.Entry:
m.Lock() m.Lock()
after.files++ after.files++
m.Unlock() m.Unlock()
j.Result <- true j.Result() <- true
} }
case <-done: case <-done:
@ -184,14 +185,15 @@ func TestPipelineWalker(t *testing.T) {
var wg sync.WaitGroup var wg sync.WaitGroup
done := make(chan struct{}) done := make(chan struct{})
jobs := make(chan interface{}) jobs := make(chan pipe.Job)
for i := 0; i < *maxWorkers; i++ { for i := 0; i < *maxWorkers; i++ {
wg.Add(1) wg.Add(1)
go worker(&wg, done, jobs) go worker(&wg, done, jobs)
} }
resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) resCh := make(chan pipe.Result, 1)
err = pipe.Walk([]string{*testWalkerPath}, done, jobs, resCh)
ok(t, err) ok(t, err)
// wait for all workers to terminate // wait for all workers to terminate
@ -227,7 +229,7 @@ func BenchmarkPipelineWalker(b *testing.B) {
// simulate backup // simulate backup
//time.Sleep(10 * time.Millisecond) //time.Sleep(10 * time.Millisecond)
e.Result <- true e.Result() <- true
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
return return
@ -259,7 +261,7 @@ func BenchmarkPipelineWalker(b *testing.B) {
} }
m.Unlock() m.Unlock()
dir.Result <- true dir.Result() <- true
case <-done: case <-done:
// pipeline was cancelled // pipeline was cancelled
return return
@ -281,7 +283,7 @@ func BenchmarkPipelineWalker(b *testing.B) {
go fileWorker(&wg, done, entCh) go fileWorker(&wg, done, entCh)
} }
jobs := make(chan interface{}, 200) jobs := make(chan pipe.Job, 200)
wg.Add(1) wg.Add(1)
go func() { go func() {
pipe.Split(jobs, dirCh, entCh) pipe.Split(jobs, dirCh, entCh)
@ -290,7 +292,8 @@ func BenchmarkPipelineWalker(b *testing.B) {
wg.Done() wg.Done()
}() }()
resCh, err := pipe.Walk([]string{*testWalkerPath}, done, jobs) resCh := make(chan pipe.Result, 1)
err := pipe.Walk([]string{*testWalkerPath}, done, jobs, resCh)
ok(b, err) ok(b, err)
// wait for all workers to terminate // wait for all workers to terminate

View File

@ -34,7 +34,12 @@ func walkTree(s Server, path string, id backend.ID, done chan struct{}, jobCh ch
} }
walkTree(s, p, blob.Storage, done, jobCh) walkTree(s, p, blob.Storage, done, jobCh)
} else { } else {
jobCh <- WalkTreeJob{Path: p, Node: node} // load old blobs
node.blobs, err = t.Map.Select(node.Content)
if err != nil {
debug.Log("walkTree", "unable to load bobs for %q (%v): %v", path, id.Str(), err)
}
jobCh <- WalkTreeJob{Path: p, Node: node, Error: err}
} }
} }

View File

@ -38,8 +38,9 @@ func TestWalkTree(t *testing.T) {
go restic.WalkTree(server, sn.Tree.Storage, done, treeJobs) go restic.WalkTree(server, sn.Tree.Storage, done, treeJobs)
// start filesystem walker // start filesystem walker
fsJobs := make(chan interface{}) fsJobs := make(chan pipe.Job)
go pipe.Walk(dirs, done, fsJobs) resCh := make(chan pipe.Result, 1)
go pipe.Walk(dirs, done, fsJobs, resCh)
for { for {
// receive fs job // receive fs job
@ -51,10 +52,10 @@ func TestWalkTree(t *testing.T) {
fsEntries := 1 fsEntries := 1
switch j := fsJob.(type) { switch j := fsJob.(type) {
case pipe.Dir: case pipe.Dir:
path = j.Path path = j.Path()
fsEntries = len(j.Entries) fsEntries = len(j.Entries)
case pipe.Entry: case pipe.Entry:
path = j.Path path = j.Path()
} }
// receive tree job // receive tree job