Merge pull request #344 from restic/fix-338

pipe: propagate errors properly
This commit is contained in:
Alexander Neumann 2015-11-08 17:58:55 +01:00
commit 19a713970f
5 changed files with 211 additions and 60 deletions

View File

@ -304,6 +304,7 @@ func (arch *Archiver) fileWorker(wg *sync.WaitGroup, p *Progress, done <-chan st
}
func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan struct{}, dirCh <-chan pipe.Dir) {
debug.Log("Archiver.dirWorker", "start")
defer func() {
debug.Log("Archiver.dirWorker", "done")
wg.Done()
@ -315,12 +316,20 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
// channel is closed
return
}
debug.Log("Archiver.dirWorker", "save dir %v\n", dir.Path())
debug.Log("Archiver.dirWorker", "save dir %v (%d entries), error %v\n", dir.Path(), len(dir.Entries), dir.Error())
// ignore dir nodes with errors
if dir.Error() != nil {
dir.Result() <- nil
p.Report(Stat{Errors: 1})
continue
}
tree := NewTree()
// wait for all content
for _, ch := range dir.Entries {
debug.Log("Archiver.dirWorker", "receiving result from %v", ch)
res := <-ch
// if we get a nil pointer here, an error has happened while
@ -342,21 +351,20 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
}
}
var (
node *Node
err error
)
if dir.Path() == "" {
// if this is the top-level dir, only create a stub node
node = &Node{}
} else {
// else create node from path and fi
node, err = NodeFromFileInfo(dir.Path(), dir.Info())
node := &Node{}
if dir.Path() != "" && dir.Info() != nil {
n, err := NodeFromFileInfo(dir.Path(), dir.Info())
if err != nil {
node.Error = err.Error()
dir.Result() <- node
n.Error = err.Error()
dir.Result() <- n
continue
}
node = n
}
if err := dir.Error(); err != nil {
node.Error = err.Error()
}
id, err := arch.SaveTreeJSON(tree)
@ -370,6 +378,8 @@ func (arch *Archiver) dirWorker(wg *sync.WaitGroup, p *Progress, done <-chan str
node.Subtree = &id
debug.Log("Archiver.dirWorker", "sending result to %v", dir.Result())
dir.Result() <- node
if dir.Path() != "" {
p.Report(Stat{Dirs: 1})
@ -615,11 +625,7 @@ func (arch *Archiver) Snapshot(p *Progress, paths []string, parentID *backend.ID
pipeCh := make(chan pipe.Job)
resCh := make(chan pipe.Result, 1)
go func() {
err := pipe.Walk(paths, arch.SelectFilter, done, pipeCh, resCh)
if err != nil {
debug.Log("Archiver.Snapshot", "pipe.Walk returned error %v", err)
return
}
pipe.Walk(paths, arch.SelectFilter, done, pipeCh, resCh)
debug.Log("Archiver.Snapshot", "pipe.Walk done")
}()
jobs.New = pipeCh

View File

@ -291,6 +291,56 @@ func TestBackupMissingFile2(t *testing.T) {
})
}
func TestBackupDirectoryError(t *testing.T) {
withTestEnvironment(t, func(env *testEnvironment, global GlobalOptions) {
datafile := filepath.Join("testdata", "backup-data.tar.gz")
fd, err := os.Open(datafile)
if os.IsNotExist(err) {
t.Skipf("unable to find data file %q, skipping", datafile)
return
}
OK(t, err)
OK(t, fd.Close())
SetupTarTestFixture(t, env.testdata, datafile)
cmdInit(t, global)
global.stderr = ioutil.Discard
ranHook := false
testdir := filepath.Join(env.testdata, "0", "0", "9")
// install hook that removes the dir right before readdirnames()
debug.Hook("pipe.readdirnames", func(context interface{}) {
path := context.(string)
if path != testdir {
return
}
t.Logf("in hook, removing test file %v", testdir)
ranHook = true
OK(t, os.RemoveAll(testdir))
})
cmdBackup(t, global, []string{filepath.Join(env.testdata, "0", "0")}, nil)
cmdCheck(t, global)
Assert(t, ranHook, "hook did not run")
debug.RemoveHook("pipe.walk2")
snapshots := cmdList(t, global, "snapshots")
Assert(t, len(snapshots) > 0,
"no snapshots found in repo (%v)", datafile)
files := cmdLs(t, global, snapshots[0].String())
Assert(t, len(files) > 1, "snapshot is empty")
})
}
func includes(haystack []string, needle string) bool {
for _, s := range haystack {
if s == needle {

View File

@ -10,12 +10,13 @@ import (
"syscall"
"time"
"runtime"
"github.com/juju/errors"
"github.com/restic/restic/backend"
"github.com/restic/restic/debug"
"github.com/restic/restic/pack"
"github.com/restic/restic/repository"
"runtime"
)
// Node is a file, directory or other item in a backup.

View File

@ -82,32 +82,46 @@ var errCancelled = errors.New("walk cancelled")
// dirs). If false is returned, files are ignored and dirs are not even walked.
type SelectFunc func(item string, fi os.FileInfo) bool
func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
func walk(basedir, dir string, selectFunc SelectFunc, done <-chan struct{}, jobs chan<- Job, res chan<- Result) {
debug.Log("pipe.walk", "start on %v", dir)
relpath, err := filepath.Rel(basedir, dir)
if err != nil {
panic(err)
}
info, err := os.Lstat(dir)
if err != nil {
debug.Log("pipe.walk", "error for %v: %v", dir, err)
return err
select {
case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}:
case <-done:
}
return
}
if !selectFunc(dir, info) {
debug.Log("pipe.walk", "file %v excluded by filter", dir)
return nil
return
}
relpath, _ := filepath.Rel(basedir, dir)
if !info.IsDir() {
select {
case jobs <- Entry{info: info, basedir: basedir, path: relpath, result: res}:
case <-done:
return errCancelled
}
return nil
return
}
debug.RunHook("pipe.readdirnames", dir)
names, err := readDirNames(dir)
if err != nil {
return err
debug.Log("pipe.walk", "Readdirnames(%v) returned error: %v", dir, err)
select {
case <-done:
case jobs <- Dir{basedir: basedir, path: relpath, info: info, error: err, result: res}:
}
return
}
// Insert breakpoint to allow testing behaviour with vanishing files
@ -132,7 +146,7 @@ func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs c
select {
case jobs <- Entry{info: fi, error: statErr, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
case <-done:
return errCancelled
return
}
continue
}
@ -141,32 +155,19 @@ func walk(basedir, dir string, selectFunc SelectFunc, done chan struct{}, jobs c
// between walk and open
debug.RunHook("pipe.walk2", filepath.Join(relpath, name))
if isDir(fi) {
err = walk(basedir, subpath, selectFunc, done, jobs, ch)
if err != nil {
return err
}
} else {
select {
case jobs <- Entry{info: fi, basedir: basedir, path: filepath.Join(relpath, name), result: ch}:
case <-done:
return errCancelled
}
}
walk(basedir, subpath, selectFunc, done, jobs, ch)
}
select {
case jobs <- Dir{basedir: basedir, path: relpath, info: info, Entries: entries, result: res}:
case <-done:
return errCancelled
}
return nil
}
// Walk sends a Job for each file and directory it finds below the paths. When
// the channel done is closed, processing stops.
func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) error {
func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<- Job, res chan<- Result) {
debug.Log("pipe.Walk", "start on %v", paths)
defer func() {
debug.Log("pipe.Walk", "output channel closed")
close(jobs)
@ -176,11 +177,7 @@ func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<-
for _, path := range paths {
debug.Log("pipe.Walk", "start walker for %v", path)
ch := make(chan Result, 1)
err := walk(filepath.Dir(path), path, selectFunc, done, jobs, ch)
if err != nil {
debug.Log("pipe.Walk", "error for %v: %v", path, err)
continue
}
walk(filepath.Dir(path), path, selectFunc, done, jobs, ch)
entries = append(entries, ch)
debug.Log("pipe.Walk", "walker for %v done", path)
}
@ -188,13 +185,11 @@ func Walk(paths []string, selectFunc SelectFunc, done chan struct{}, jobs chan<-
debug.Log("pipe.Walk", "sending root node")
select {
case <-done:
return errCancelled
return
case jobs <- Dir{Entries: entries, result: res}:
}
debug.Log("pipe.Walk", "walker done")
return nil
}
// Split feeds all elements read from inChan to dirChan and entChan.

View File

@ -1,12 +1,14 @@
package pipe_test
import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/restic/restic/debug"
"github.com/restic/restic/pipe"
. "github.com/restic/restic/test"
)
@ -122,8 +124,7 @@ func TestPipelineWalkerWithSplit(t *testing.T) {
}()
resCh := make(chan pipe.Result, 1)
err = pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh)
OK(t, err)
pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh)
// wait for all workers to terminate
wg.Wait()
@ -202,8 +203,7 @@ func TestPipelineWalker(t *testing.T) {
}
resCh := make(chan pipe.Result, 1)
err = pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh)
OK(t, err)
pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh)
// wait for all workers to terminate
wg.Wait()
@ -217,6 +217,107 @@ func TestPipelineWalker(t *testing.T) {
Assert(t, before == after, "stats do not match, expected %v, got %v", before, after)
}
func createFile(filename, data string) error {
f, err := os.Create(filename)
if err != nil {
return err
}
defer f.Close()
_, err = f.Write([]byte(data))
if err != nil {
return err
}
return nil
}
func TestPipeWalkerError(t *testing.T) {
dir, err := ioutil.TempDir("", "restic-test-")
OK(t, err)
base := filepath.Base(dir)
var testjobs = []struct {
path []string
err bool
}{
{[]string{base, "a", "file_a"}, false},
{[]string{base, "a"}, false},
{[]string{base, "b"}, true},
{[]string{base, "c", "file_c"}, false},
{[]string{base, "c"}, false},
{[]string{base}, false},
{[]string{}, false},
}
OK(t, os.Mkdir(filepath.Join(dir, "a"), 0755))
OK(t, os.Mkdir(filepath.Join(dir, "b"), 0755))
OK(t, os.Mkdir(filepath.Join(dir, "c"), 0755))
OK(t, createFile(filepath.Join(dir, "a", "file_a"), "file a"))
OK(t, createFile(filepath.Join(dir, "b", "file_b"), "file b"))
OK(t, createFile(filepath.Join(dir, "c", "file_c"), "file c"))
ranHook := false
testdir := filepath.Join(dir, "b")
// install hook that removes the dir right before readdirnames()
debug.Hook("pipe.readdirnames", func(context interface{}) {
path := context.(string)
if path != testdir {
return
}
t.Logf("in hook, removing test file %v", testdir)
ranHook = true
OK(t, os.RemoveAll(testdir))
})
done := make(chan struct{})
ch := make(chan pipe.Job)
resCh := make(chan pipe.Result, 1)
go pipe.Walk([]string{dir}, acceptAll, done, ch, resCh)
i := 0
for job := range ch {
if i == len(testjobs) {
t.Errorf("too many jobs received")
break
}
p := filepath.Join(testjobs[i].path...)
if p != job.Path() {
t.Errorf("job %d has wrong path: expected %q, got %q", i, p, job.Path())
}
if testjobs[i].err {
if job.Error() == nil {
t.Errorf("job %d expected error but got nil", i)
}
} else {
if job.Error() != nil {
t.Errorf("job %d expected no error but got %v", i, job.Error())
}
}
i++
}
if i != len(testjobs) {
t.Errorf("expected %d jobs, got %d", len(testjobs), i)
}
close(done)
Assert(t, ranHook, "hook did not run")
OK(t, os.RemoveAll(dir))
}
func BenchmarkPipelineWalker(b *testing.B) {
if TestWalkerPath == "" {
b.Skipf("walkerpath not set, skipping BenchPipelineWalker")
@ -302,8 +403,7 @@ func BenchmarkPipelineWalker(b *testing.B) {
}()
resCh := make(chan pipe.Result, 1)
err := pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh)
OK(b, err)
pipe.Walk([]string{TestWalkerPath}, acceptAll, done, jobs, resCh)
// wait for all workers to terminate
wg.Wait()
@ -379,8 +479,7 @@ func TestPipelineWalkerMultiple(t *testing.T) {
}
resCh := make(chan pipe.Result, 1)
err = pipe.Walk(paths, acceptAll, done, jobs, resCh)
OK(t, err)
pipe.Walk(paths, acceptAll, done, jobs, resCh)
// wait for all workers to terminate
wg.Wait()