From 1569176e481ba13d1ae4bc164fb7a79feff38e1c Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 6 Nov 2015 19:41:57 +0100 Subject: [PATCH 1/4] pipe: propagate errors properly --- archiver.go | 35 ++++++++-------- node.go | 3 +- pipe/pipe.go | 60 ++++++++++++--------------- pipe/pipe_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++---- 4 files changed, 140 insertions(+), 60 deletions(-) diff --git a/archiver.go b/archiver.go index a26dc3e7f..61d2ea8f2 100644 --- a/archiver.go +++ b/archiver.go @@ -304,6 +304,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st } func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) { + debug.Log("Archiver.dirWorker", "start") defer func() { debug.Log("Archiver.dirWorker", "done") wg.Done() @@ -315,12 +316,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str // channel is closed return } - debug.Log("Archiver.dirWorker", "save dir %v\n", dir.Path()) + debug.Log("Archiver.dirWorker", "save dir %v (%d entries), error %v\n", dir.Path(), len(dir.Entries), dir.Error()) tree := NewTree() // wait for all content for _, ch := range dir.Entries { + debug.Log("Archiver.dirWorker", "receiving result from %v", ch) res := <-ch // if we get a nil pointer here, an error has happened while @@ -342,21 +344,20 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } } - var ( - node *Node - err error - ) - if dir.Path() == "" { - // if this is the top-level dir, only create a stub node - node = &Node{} - } else { - // else create node from path and fi - node, err = NodeFromFileInfo(dir.Path(), dir.Info()) + node := &Node{} + + if dir.Path() != "" && dir.Info() != nil { + n, err := NodeFromFileInfo(dir.Path(), dir.Info()) if err != nil { - node.Error = err.Error() - dir.Result() <- node + n.Error = err.Error() + dir.Result() <- n continue } + node = n + } + + if err := dir.Error(); err != nil { + node.Error = err.Error() } id, err := arch.SaveTreeJSON(tree) @@ -370,6 +371,8 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str node.Subtree = &id + debug.Log("Archiver.dirWorker", "sending result to %v", dir.Result()) + dir.Result() <- node if dir.Path() != "" { p.Report(Stat{Dirs: 1}) @@ -615,11 +618,7 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID pipeCh := make(chan pipe.Job) resCh := make(chan pipe.Result, 1) go func() { - err := pipe.Walk(paths, arch.SelectFilter, done, pipeCh, resCh) - if err != nil { - debug.Log("Archiver.Snapshot", "pipe.Walk returned error %v", err) - return - } + pipe.Walk(paths, arch.SelectFilter, done, pipeCh, resCh) debug.Log("Archiver.Snapshot", "pipe.Walk done") }() jobs.New = pipeCh diff --git a/node.go b/node.go index eb1ca5f02..4690f1333 100644 --- a/node.go +++ b/node.go @@ -10,12 +10,13 @@ import ( "syscall" "time" + "runtime" + "github.com/juju/errors" "github.com/restic/restic/backend" "github.com/restic/restic/debug" "github.com/restic/restic/pack" "github.com/restic/restic/repository" - "runtime" ) // Node is a file, directory or other item in a backup. diff --git a/pipe/pipe.go b/pipe/pipe.go index 83b3353f6..d911f6087 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -82,32 +82,45 @@ var errCancelled = errors.New("walk cancelled") // dirs). If false is returned, files are ignored and dirs are not even walked. type SelectFunc func(item string, fi os.FileInfo) bool -func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) error { +func walk(basedir, dir string, selectFunc SelectFunc, done <-chan struct{}, jobs chan<- Job, res chan<- Result) { + debug.Log("pipe.walk", "start on %v", dir) + + relpath, err := filepath.Rel(basedir, dir) + if err != nil { + panic(err) + } + info, err := os.Lstat(dir) if err != nil { debug.Log("pipe.walk", "error for %v: %v", dir, err) - return err + select { + case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}: + case <-done: + } + return } if !selectFunc(dir, info) { debug.Log("pipe.walk", "file %v excluded by filter", dir) - return nil + return } - relpath, _ := filepath.Rel(basedir, dir) - if !info.IsDir() { select { case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}: case <-done: - return errCancelled } - return nil + return } names, err := readDirNames(dir) if err != nil { - return err + debug.Log("pipe.walk", "Readdirnames(%v) returned error: %v", dir, err) + select { + case <-done: + case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}: + } + return } // Insert breakpoint to allow testing behaviour with vanishing files @@ -132,7 +145,7 @@ func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs c select { case jobs <- Entry{info: fi, error: statErr, basedir: basedir, path: filepath.Join(relpath, name), result: ch}: case <-done: - return errCancelled + return } continue } @@ -141,32 +154,19 @@ func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs c // between walk and open debug.RunHook("pipe.walk2", filepath.Join(relpath, name)) - if isDir(fi) { - err = walk(basedir, subpath, selectFunc, done, jobs, ch) - if err != nil { - return err - } - - } else { - select { - case jobs <- Entry{info: fi, basedir: basedir, path: filepath.Join(relpath, name), result: ch}: - case <-done: - return errCancelled - } - } + walk(basedir, subpath, selectFunc, done, jobs, ch) } select { case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}: case <-done: - return errCancelled } - return nil } // Walk sends a Job for each file and directory it finds below the paths. When // the channel done is closed, processing stops. -func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) error { +func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) { + debug.Log("pipe.Walk", "start on %v", paths) defer func() { debug.Log("pipe.Walk", "output channel closed") close(jobs) @@ -176,11 +176,7 @@ func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- for _, path := range paths { debug.Log("pipe.Walk", "start walker for %v", path) ch := make(chan Result, 1) - err := walk(filepath.Dir(path), path, selectFunc, done, jobs, ch) - if err != nil { - debug.Log("pipe.Walk", "error for %v: %v", path, err) - continue - } + walk(filepath.Dir(path), path, selectFunc, done, jobs, ch) entries = append(entries, ch) debug.Log("pipe.Walk", "walker for %v done", path) } @@ -188,13 +184,11 @@ func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- debug.Log("pipe.Walk", "sending root node") select { case <-done: - return errCancelled + return case jobs <- Dir{Entries: entries, result: res}: } debug.Log("pipe.Walk", "walker done") - - return nil } // Split feeds all elements read from inChan to dirChan and entChan. diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 001015938..95bbf7db8 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -1,6 +1,8 @@ package pipe_test import ( + "fmt" + "io/ioutil" "os" "path/filepath" "sync" @@ -122,8 +124,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) { }() resCh := make(chan pipe.Result, 1) - err = pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) - OK(t, err) + pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() @@ -202,8 +203,7 @@ func TestPipelineWalker(t *testing.T) { } resCh := make(chan pipe.Result, 1) - err = pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) - OK(t, err) + pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() @@ -217,6 +217,94 @@ func TestPipelineWalker(t *testing.T) { Assert(t, before == after, "stats do not match, expected %v, got %v", before, after) } +func createFile(filename, data string) error { + f, err := os.Create(filename) + if err != nil { + return err + } + + defer f.Close() + + _, err = f.Write([]byte(data)) + if err != nil { + return err + } + + return nil +} + +func TestPipeWalkerError(t *testing.T) { + dir, err := ioutil.TempDir("", "restic-test-") + OK(t, err) + + base := filepath.Base(dir) + + var testjobs = []struct { + path []string + err bool + }{ + {[]string{base, "a", "file_a"}, false}, + {[]string{base, "a"}, false}, + {[]string{base, "b"}, true}, + {[]string{base, "c", "file_c"}, false}, + {[]string{base, "c"}, false}, + {[]string{base}, false}, + {[]string{}, false}, + } + + OK(t, os.Mkdir(filepath.Join(dir, "a"), 0755)) + OK(t, os.Mkdir(filepath.Join(dir, "b"), 0755)) + OK(t, os.Mkdir(filepath.Join(dir, "c"), 0755)) + + OK(t, createFile(filepath.Join(dir, "a", "file_a"), "file a")) + OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b")) + OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c")) + + OK(t, os.Chmod(filepath.Join(dir, "b"), 0)) + + done := make(chan struct{}) + ch := make(chan pipe.Job) + resCh := make(chan pipe.Result, 1) + + go pipe.Walk([]string{dir}, acceptAll, done, ch, resCh) + + i := 0 + for job := range ch { + if i == len(testjobs) { + t.Errorf("too many jobs received") + break + } + + fmt.Printf("job %+v: %+v\n", job.Path(), job) + + p := filepath.Join(testjobs[i].path...) + if p != job.Path() { + t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path()) + } + + if testjobs[i].err { + if job.Error() == nil { + t.Errorf("job %d expected error but got nil", i) + } + } else { + if job.Error() != nil { + t.Errorf("job %d expected no error but got %v", i, job.Error()) + } + } + + i++ + } + + if i != len(testjobs) { + t.Errorf("expected %d jobs, got %d", len(testjobs), i) + } + + close(done) + + OK(t, os.Chmod(filepath.Join(dir, "b"), 0755)) + OK(t, os.RemoveAll(dir)) +} + func BenchmarkPipelineWalker(b *testing.B) { if TestWalkerPath == "" { b.Skipf("walkerpath not set, skipping BenchPipelineWalker") @@ -302,8 +390,7 @@ func BenchmarkPipelineWalker(b *testing.B) { }() resCh := make(chan pipe.Result, 1) - err := pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) - OK(b, err) + pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() @@ -379,8 +466,7 @@ func TestPipelineWalkerMultiple(t *testing.T) { } resCh := make(chan pipe.Result, 1) - err = pipe.Walk(paths, acceptAll, done, jobs, resCh) - OK(t, err) + pipe.Walk(paths, acceptAll, done, jobs, resCh) // wait for all workers to terminate wg.Wait() From 005c13ff05ecd7be82d8c86c4c555e5b8e6f9429 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 6 Nov 2015 22:38:34 +0100 Subject: [PATCH 2/4] pipe: make test platform-independent --- pipe/pipe.go | 1 + pipe/pipe_test.go | 23 ++++++++++++++++++----- 2 files changed, 19 insertions(+), 5 deletions(-) diff --git a/pipe/pipe.go b/pipe/pipe.go index d911f6087..1599efa19 100644 --- a/pipe/pipe.go +++ b/pipe/pipe.go @@ -113,6 +113,7 @@ func walk(basedir, dir string, selectFunc SelectFunc, done <-chan struct{}, jobs return } + debug.RunHook("pipe.readdirnames", dir) names, err := readDirNames(dir) if err != nil { debug.Log("pipe.walk", "Readdirnames(%v) returned error: %v", dir, err) diff --git a/pipe/pipe_test.go b/pipe/pipe_test.go index 95bbf7db8..40ae8c0ce 100644 --- a/pipe/pipe_test.go +++ b/pipe/pipe_test.go @@ -1,7 +1,6 @@ package pipe_test import ( - "fmt" "io/ioutil" "os" "path/filepath" @@ -9,6 +8,7 @@ import ( "testing" "time" + "github.com/restic/restic/debug" "github.com/restic/restic/pipe" . "github.com/restic/restic/test" ) @@ -260,7 +260,22 @@ func TestPipeWalkerError(t *testing.T) { OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b")) OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c")) - OK(t, os.Chmod(filepath.Join(dir, "b"), 0)) + ranHook := false + testdir := filepath.Join(dir, "b") + + // install hook that removes the dir right before readdirnames() + debug.Hook("pipe.readdirnames", func(context interface{}) { + path := context.(string) + + if path != testdir { + return + } + + t.Logf("in hook, removing test file %v", testdir) + ranHook = true + + OK(t, os.RemoveAll(testdir)) + }) done := make(chan struct{}) ch := make(chan pipe.Job) @@ -275,8 +290,6 @@ func TestPipeWalkerError(t *testing.T) { break } - fmt.Printf("job %+v: %+v\n", job.Path(), job) - p := filepath.Join(testjobs[i].path...) if p != job.Path() { t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path()) @@ -301,7 +314,7 @@ func TestPipeWalkerError(t *testing.T) { close(done) - OK(t, os.Chmod(filepath.Join(dir, "b"), 0755)) + Assert(t, ranHook, "hook did not run") OK(t, os.RemoveAll(dir)) } From ea41a1045f6acbfb1396facc3f7e9d27037eaab7 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 6 Nov 2015 23:19:56 +0100 Subject: [PATCH 3/4] Add integration test for error on readdirnames --- cmd/restic/integration_test.go | 50 ++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/cmd/restic/integration_test.go b/cmd/restic/integration_test.go index fa95eca92..9880ea164 100644 --- a/cmd/restic/integration_test.go +++ b/cmd/restic/integration_test.go @@ -291,6 +291,56 @@ func TestBackupMissingFile2(t *testing.T) { }) } +func TestBackupDirectoryError(t *testing.T) { + withTestEnvironment(t, func(env *testEnvironment, global GlobalOptions) { + datafile := filepath.Join("testdata", "backup-data.tar.gz") + fd, err := os.Open(datafile) + if os.IsNotExist(err) { + t.Skipf("unable to find data file %q, skipping", datafile) + return + } + OK(t, err) + OK(t, fd.Close()) + + SetupTarTestFixture(t, env.testdata, datafile) + + cmdInit(t, global) + + global.stderr = ioutil.Discard + ranHook := false + + testdir := filepath.Join(env.testdata, "0", "0", "9") + + // install hook that removes the dir right before readdirnames() + debug.Hook("pipe.readdirnames", func(context interface{}) { + path := context.(string) + + if path != testdir { + return + } + + t.Logf("in hook, removing test file %v", testdir) + ranHook = true + + OK(t, os.RemoveAll(testdir)) + }) + + cmdBackup(t, global, []string{filepath.Join(env.testdata, "0", "0")}, nil) + cmdCheck(t, global) + + Assert(t, ranHook, "hook did not run") + debug.RemoveHook("pipe.walk2") + + snapshots := cmdList(t, global, "snapshots") + Assert(t, len(snapshots) > 0, + "no snapshots found in repo (%v)", datafile) + + files := cmdLs(t, global, snapshots[0].String()) + + Assert(t, len(files) > 1, "snapshot is empty") + }) +} + func includes(haystack []string, needle string) bool { for _, s := range haystack { if s == needle { From 4484a3ea0d878b5143b44cb119e9552c1a5c4c6e Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sat, 7 Nov 2015 11:42:28 +0100 Subject: [PATCH 4/4] archiver: ignore dir nodes with errors --- archiver.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/archiver.go b/archiver.go index 61d2ea8f2..529dd7cdb 100644 --- a/archiver.go +++ b/archiver.go @@ -318,6 +318,13 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str } debug.Log("Archiver.dirWorker", "save dir %v (%d entries), error %v\n", dir.Path(), len(dir.Entries), dir.Error()) + // ignore dir nodes with errors + if dir.Error() != nil { + dir.Result() <- nil + p.Report(Stat{Errors: 1}) + continue + } + tree := NewTree() // wait for all content