From 581c62ee72f3aaad3687a5f411943674793b871b Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 21:40:31 +0200
Subject: [PATCH] archiver: Improve error handling

This commit changes how the worker goroutines for saving e.g. blobs
interact. Before, it was possible to get stuck sending an instruction
to archive a file or dir when no worker goroutines were available any
more. This commit introduces a `done` channel for each of the worker
pools, which is set to the channel returned by `tomb.Dying()`, so it is
closed as soon as the first worker returns an error.
---
 cmd/restic/cmd_backup.go             |   2 +-
 internal/archiver/archiver.go        |   6 +-
 internal/archiver/archiver_test.go   |   4 +
 internal/archiver/blob_saver.go      |  21 +++--
 internal/archiver/blob_saver_test.go | 115 +++++++++++++++++++++++++
 internal/archiver/file_saver.go      |  48 ++++++-----
 internal/archiver/file_saver_test.go |  97 ++++++++++++++++++++++
 internal/archiver/tree_saver.go      |  40 +++++----
 internal/archiver/tree_saver_test.go | 120 +++++++++++++++++++++++++++
 9 files changed, 408 insertions(+), 45 deletions(-)
 create mode 100644 internal/archiver/blob_saver_test.go
 create mode 100644 internal/archiver/file_saver_test.go
 create mode 100644 internal/archiver/tree_saver_test.go

diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go
index 562342000..487d8d587 100644
--- a/cmd/restic/cmd_backup.go
+++ b/cmd/restic/cmd_backup.go
@@ -467,7 +467,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	p.V("start backup on %v", targets)
 	_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
 	if err != nil {
-		return err
+		return errors.Fatalf("unable to save snapshot: %v", err)
 	}
 
 	p.Finish()
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 3e1a6d0b8..50f49f0f9 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -271,6 +271,7 @@ func (fn *FutureNode) wait(ctx context.Context) {
 	switch {
 	case fn.isFile:
 		// wait for and collect the data for the file
+		fn.file.Wait(ctx)
 		fn.node = fn.file.Node()
 		fn.err = fn.file.Err()
 		fn.stats = fn.file.Stats()
@@ -281,6 +282,7 @@
 
 	case fn.isDir:
 		// wait for and collect the data for the dir
+		fn.dir.Wait(ctx)
 		fn.node = fn.dir.Node()
 		fn.stats = fn.dir.Stats()
 
@@ -713,13 +715,13 @@ func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
 	arch.fileSaver = NewFileSaver(ctx, t,
 		arch.FS,
-		arch.blobSaver,
+		arch.blobSaver.Save,
 		arch.Repo.Config().ChunkerPolynomial,
 		arch.Options.FileReadConcurrency,
 		arch.Options.SaveBlobConcurrency)
 	arch.fileSaver.CompleteBlob = arch.CompleteBlob
 	arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
 
-	arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
+	arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
 }
 
 // Snapshot saves several targets and returns a snapshot.
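Reviewer note: the `done` channel mechanism described in the commit message is easiest to see in isolation. The following self-contained sketch is illustrative only; the names `pool`, `job` and `newPool` are invented for the example and are not part of this patch, and nothing beyond gopkg.in/tomb.v2 is assumed. It shows why sending a job can no longer get stuck: every send selects on the tomb's Dying() channel in addition to the job channel and the caller's context.

package main

import (
	"context"
	"errors"
	"fmt"

	tomb "gopkg.in/tomb.v2"
)

// job carries a payload and a channel for the result.
type job struct {
	payload int
	ch      chan error
}

// pool mirrors the BlobSaver/FileSaver/TreeSaver structure: a job
// channel plus a done channel fed by tomb.Dying().
type pool struct {
	ch   chan<- job
	done <-chan struct{}
}

func newPool(ctx context.Context, t *tomb.Tomb, workers int) *pool {
	ch := make(chan job)
	p := &pool{ch: ch, done: t.Dying()}

	for i := 0; i < workers; i++ {
		t.Go(func() error {
			for {
				select {
				case <-t.Context(ctx).Done():
					return nil
				case j := <-ch:
					if j.payload == 13 {
						// Simulate a failing save: closing j.ch unblocks the
						// caller, and returning the error kills the tomb, which
						// closes Dying() for everyone.
						close(j.ch)
						return errors.New("unlucky payload")
					}
					j.ch <- nil
				}
			}
		})
	}

	return p
}

// save is the analogue of BlobSaver.Save: without the done case it
// would block forever once all workers have exited.
func (p *pool) save(ctx context.Context, payload int) <-chan error {
	ch := make(chan error, 1)
	select {
	case p.ch <- job{payload: payload, ch: ch}:
	case <-p.done:
		close(ch) // workers are gone, deliver a zero result instead of blocking
	case <-ctx.Done():
		close(ch)
	}
	return ch
}

func main() {
	var tmb tomb.Tomb
	ctx := context.Background()

	p := newPool(ctx, &tmb, 2)
	for i := 0; i < 100; i++ {
		<-p.save(ctx, i) // does not deadlock, even after a worker has failed
	}

	tmb.Kill(nil)
	fmt.Println("worker pool stopped:", tmb.Wait())
}

The same three-way select appears below in BlobSaver.Save, FileSaver.Save and TreeSaver.Save.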
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index c24232b50..8b029c780 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -70,6 +70,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 	}
 
 	res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete)
+
+	res.Wait(ctx)
 	if res.Err() != nil {
 		t.Fatal(res.Err())
 	}
@@ -620,6 +622,7 @@ func TestArchiverSaveDir(t *testing.T) {
 				t.Fatal(err)
 			}
 
+			ft.Wait(ctx)
 			node, stats := ft.Node(), ft.Stats()
 
 			tmb.Kill(nil)
@@ -701,6 +704,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 			t.Fatal(err)
 		}
 
+		ft.Wait(ctx)
 		node, stats := ft.Node(), ft.Stats()
 
 		tmb.Kill(nil)
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index 7ef274058..c20b96919 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -5,8 +5,8 @@ import (
 	"sync"
 
 	"github.com/restic/restic/internal/debug"
-	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
 // Saver allows saving a blob.
@@ -22,22 +22,24 @@ type BlobSaver struct {
 	m          sync.Mutex
 	knownBlobs restic.BlobSet
 
-	ch chan<- saveBlobJob
+	ch   chan<- saveBlobJob
+	done <-chan struct{}
 }
 
 // NewBlobSaver returns a new blob saver. A worker pool is started, it is
 // stopped when ctx is cancelled.
-func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver {
+func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
 	ch := make(chan saveBlobJob)
 	s := &BlobSaver{
 		repo:       repo,
 		knownBlobs: restic.NewBlobSet(),
 		ch:         ch,
+		done:       t.Dying(),
 	}
 
 	for i := uint(0); i < workers; i++ {
-		g.Go(func() error {
-			return s.worker(ctx, ch)
+		t.Go(func() error {
+			return s.worker(t.Context(ctx), ch)
 		})
 	}
 
@@ -51,6 +53,10 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
 	ch := make(chan saveBlobResponse, 1)
 	select {
 	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
+	case <-s.done:
+		debug.Log("not sending job, BlobSaver is done")
+		close(ch)
+		return FutureBlob{ch: ch}
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled")
 		close(ch)
@@ -139,7 +145,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
 	// otherwise we're responsible for saving it
 	_, err := s.repo.SaveBlob(ctx, t, buf, id)
 	if err != nil {
-		return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err)
+		return saveBlobResponse{}, err
 	}
 
 	return saveBlobResponse{
@@ -153,14 +159,13 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
 		var job saveBlobJob
 		select {
 		case <-ctx.Done():
-			debug.Log("context is cancelled, exiting: %v", ctx.Err())
 			return nil
 		case job = <-jobs:
 		}
 
 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
 		if err != nil {
-			debug.Log("saveBlob returned error: %v", err)
+			debug.Log("saveBlob returned error, exiting: %v", err)
 			close(job.ch)
 			return err
 		}
diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go
new file mode 100644
index 000000000..6d7af3fed
--- /dev/null
+++ b/internal/archiver/blob_saver_test.go
@@ -0,0 +1,115 @@
+package archiver
+
+import (
+	"context"
+	"fmt"
+	"runtime"
+	"sync/atomic"
+	"testing"
+
+	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/repository"
+	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
+)
+
+var errTest = errors.New("test error")
+
+type saveFail struct {
+	idx    restic.Index
+	cnt    int32
+	failAt int32
+}
+
+func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
+	val := atomic.AddInt32(&b.cnt, 1)
+	if val == b.failAt {
+		return restic.ID{}, errTest
+	}
+
+	return id, nil
+}
+
+func (b *saveFail) Index() restic.Index {
+	return b.idx
+}
+
+func TestBlobSaver(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var tmb tomb.Tomb
+	saver := &saveFail{
+		idx: repository.NewIndex(),
+	}
+
+	b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
+
+	var results []FutureBlob
+
+	for i := 0; i < 20; i++ {
+		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
+		fb := b.Save(ctx, restic.DataBlob, buf)
+		results = append(results, fb)
+	}
+
+	for i, blob := range results {
+		blob.Wait(ctx)
+		if blob.Known() {
+			t.Errorf("blob %v is known, that should not be the case", i)
+		}
+	}
+
+	tmb.Kill(nil)
+
+	err := tmb.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestBlobSaverError(t *testing.T) {
+	var tests = []struct {
+		blobs  int
+		failAt int
+	}{
+		{20, 2},
+		{20, 5},
+		{20, 15},
+		{200, 150},
+	}
+
+	for _, test := range tests {
+		t.Run("", func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			var tmb tomb.Tomb
+			saver := &saveFail{
+				idx:    repository.NewIndex(),
+				failAt: int32(test.failAt),
+			}
+
+			b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
+
+			var results []FutureBlob
+
+			for i := 0; i < test.blobs; i++ {
+				buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
+				fb := b.Save(ctx, restic.DataBlob, buf)
+				results = append(results, fb)
+			}
+
+			tmb.Kill(nil)
+
+			err := tmb.Wait()
+			if err == nil {
+				t.Errorf("expected error not found")
+			}
+
+			if err != errTest {
+				t.Fatalf("unexpected error found: %v", err)
+			}
+		})
+	}
+}
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 7a2c24adf..1e7b4aea5 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -10,13 +10,9 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
-// Goer starts a function in a goroutine.
-type Goer interface {
-	Go(func() error)
-}
-
 // FutureFile is returned by Save and will return the data once it
 // has been processed.
 type FutureFile struct {
@@ -24,40 +20,47 @@ type FutureFile struct {
 	res saveFileResponse
 }
 
-func (s *FutureFile) wait() {
-	res, ok := <-s.ch
-	if ok {
-		s.res = res
+// Wait blocks until the result of the save operation is received or ctx is
+// cancelled.
+func (s *FutureFile) Wait(ctx context.Context) {
+	select {
+	case res, ok := <-s.ch:
+		if ok {
+			s.res = res
+		}
+	case <-ctx.Done():
+		return
 	}
 }
 
 // Node returns the node once it is available.
 func (s *FutureFile) Node() *restic.Node {
-	s.wait()
 	return s.res.node
 }
 
 // Stats returns the stats for the file once they are available.
 func (s *FutureFile) Stats() ItemStats {
-	s.wait()
 	return s.res.stats
 }
 
 // Err returns the error in case an error occurred.
 func (s *FutureFile) Err() error {
-	s.wait()
 	return s.res.err
 }
 
+// SaveBlobFn saves a blob to a repo.
+type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
+
 // FileSaver concurrently saves incoming files to the repo.
 type FileSaver struct {
 	fs           fs.FS
-	blobSaver    *BlobSaver
 	saveFilePool *BufferPool
+	saveBlob     SaveBlobFn
 
 	pol chunker.Pol
 
-	ch chan<- saveFileJob
+	ch   chan<- saveFileJob
+	done <-chan struct{}
 
 	CompleteBlob func(filename string, bytes uint64)
@@ -66,7 +69,7 @@ type FileSaver struct {
 
 // NewFileSaver returns a new file saver. A worker pool with fileWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
+func NewFileSaver(ctx context.Context, t *tomb.Tomb, fs fs.FS, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
 	ch := make(chan saveFileJob)
 
 	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
@@ -75,17 +78,18 @@ func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, p
 
 	s := &FileSaver{
 		fs:           fs,
-		blobSaver:    blobSaver,
+		saveBlob:     save,
 		saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
 		pol:          pol,
 		ch:           ch,
+		done:         t.Dying(),
 
 		CompleteBlob: func(string, uint64) {},
 	}
 
 	for i := uint(0); i < fileWorkers; i++ {
-		g.Go(func() error {
-			s.worker(ctx, ch)
+		t.Go(func() error {
+			s.worker(t.Context(ctx), ch)
 			return nil
 		})
 	}
@@ -111,8 +115,14 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
 
 	select {
 	case s.ch <- job:
+	case <-s.done:
+		debug.Log("not sending job, FileSaver is done")
+		close(ch)
+		return FutureFile{ch: ch}
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled: %v", ctx.Err())
+		close(ch)
+		return FutureFile{ch: ch}
 	}
 
 	return FutureFile{ch: ch}
@@ -182,7 +192,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 			return saveFileResponse{err: ctx.Err()}
 		}
 
-		res := s.blobSaver.Save(ctx, restic.DataBlob, buf)
+		res := s.saveBlob(ctx, restic.DataBlob, buf)
 		results = append(results, res)
 
 		// test if the context has been cancelled, return the error
diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go
new file mode 100644
index 000000000..c8cf58735
--- /dev/null
+++ b/internal/archiver/file_saver_test.go
@@ -0,0 +1,97 @@
+package archiver
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"path/filepath"
+	"runtime"
+	"testing"
+
+	"github.com/restic/chunker"
+	"github.com/restic/restic/internal/fs"
+	"github.com/restic/restic/internal/restic"
+	"github.com/restic/restic/internal/test"
+	tomb "gopkg.in/tomb.v2"
+)
+
+func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
+	tempdir, cleanup := test.TempDir(t)
+
+	for i := 0; i < num; i++ {
+		filename := fmt.Sprintf("testfile-%d", i)
+		err := ioutil.WriteFile(filepath.Join(tempdir, filename), []byte(filename), 0600)
+		if err != nil {
+			t.Fatal(err)
+		}
+		files = append(files, filepath.Join(tempdir, filename))
+	}
+
+	return files, cleanup
+}
+
+func startFileSaver(ctx context.Context, t testing.TB, fs fs.FS) (*FileSaver, *tomb.Tomb) {
+	var tmb tomb.Tomb
+
+	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
+		ch := make(chan saveBlobResponse)
+		close(ch)
+		return FutureBlob{ch: ch}
+	}
+
+	workers := uint(runtime.NumCPU())
+	pol, err := chunker.RandomPolynomial()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s := NewFileSaver(ctx, &tmb, fs, saveBlob, pol, workers, workers)
+	s.NodeFromFileInfo = restic.NodeFromFileInfo
+
+	return s, &tmb
+}
+
+func TestFileSaver(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	files, cleanup := createTestFiles(t, 15)
+	defer cleanup()
+
+	startFn := func() {}
+	completeFn := func(*restic.Node, ItemStats) {}
+
+	testFs := fs.Local{}
+	s, tmb := startFileSaver(ctx, t, testFs)
+
+	var results []FutureFile
+
+	for _, filename := range files {
+		f, err := testFs.Open(filename)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		fi, err := f.Stat()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		ff := s.Save(ctx, filename, f, fi, startFn, completeFn)
+		results = append(results, ff)
+	}
+
+	for _, file := range results {
+		file.Wait(ctx)
+		if file.Err() != nil {
+			t.Errorf("unable to save file: %v", file.Err())
+		}
+	}
+
+	tmb.Kill(nil)
+
+	err := tmb.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
index 743ef3b55..29b899e82 100644
--- a/internal/archiver/tree_saver.go
+++ b/internal/archiver/tree_saver.go
@@ -4,8 +4,8 @@ import (
 	"context"
 
 	"github.com/restic/restic/internal/debug"
-	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
 // FutureTree is returned by Save and will return the data once it
@@ -15,22 +15,25 @@ type FutureTree struct {
 	res saveTreeResponse
 }
 
-func (s *FutureTree) wait() {
-	res, ok := <-s.ch
-	if ok {
-		s.res = res
+// Wait blocks until the data has been received or ctx is cancelled.
+func (s *FutureTree) Wait(ctx context.Context) {
+	select {
+	case <-ctx.Done():
+		return
+	case res, ok := <-s.ch:
+		if ok {
+			s.res = res
+		}
 	}
 }
 
-// Node returns the node once it is available.
+// Node returns the node.
 func (s *FutureTree) Node() *restic.Node {
-	s.wait()
 	return s.res.node
 }
 
-// Stats returns the stats for the file once they are available.
+// Stats returns the stats for the file.
 func (s *FutureTree) Stats() ItemStats {
-	s.wait()
 	return s.res.stats
 }
 
@@ -39,23 +42,25 @@ type TreeSaver struct {
 	saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
 	errFn    ErrorFunc
 
-	ch chan<- saveTreeJob
+	ch   chan<- saveTreeJob
+	done <-chan struct{}
 }
 
 // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
+func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &TreeSaver{
 		ch:       ch,
+		done:     t.Dying(),
 		saveTree: saveTree,
 		errFn:    errFn,
 	}
 
 	for i := uint(0); i < treeWorkers; i++ {
-		g.Go(func() error {
-			return s.worker(ctx, ch)
+		t.Go(func() error {
+			return s.worker(t.Context(ctx), ch)
 		})
 	}
 
@@ -73,8 +78,12 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node,
 	}
 	select {
 	case s.ch <- job:
+	case <-s.done:
+		debug.Log("not saving tree, TreeSaver is done")
+		close(ch)
+		return FutureTree{ch: ch}
 	case <-ctx.Done():
-		debug.Log("refusing to save job, context is cancelled: %v", ctx.Err())
+		debug.Log("not saving tree, context is cancelled")
 		close(ch)
 		return FutureTree{ch: ch}
 	}
@@ -149,7 +158,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
 		node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
 		if err != nil {
 			debug.Log("error saving tree blob: %v", err)
-			return errors.Fatalf("unable to save data: %v", err)
+			close(job.ch)
+			return err
 		}
 
 		job.ch <- saveTreeResponse{
diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go
new file mode 100644
index 000000000..3f58da222
--- /dev/null
+++ b/internal/archiver/tree_saver_test.go
@@ -0,0 +1,120 @@
+package archiver
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"runtime"
+	"sync/atomic"
+	"testing"
+
+	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
+)
+
+func TestTreeSaver(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var tmb tomb.Tomb
+
+	saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
+		return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
+	}
+
+	errFn := func(snPath string, fi os.FileInfo, err error) error {
+		return nil
+	}
+
+	b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
+
+	var results []FutureTree
+
+	for i := 0; i < 20; i++ {
+		node := &restic.Node{
+			Name: fmt.Sprintf("file-%d", i),
+		}
+
+		fb := b.Save(ctx, "/", node, nil)
+		results = append(results, fb)
+	}
+
+	for _, tree := range results {
+		tree.Wait(ctx)
+	}
+
+	tmb.Kill(nil)
+
+	err := tmb.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestTreeSaverError(t *testing.T) {
+	var tests = []struct {
+		trees  int
+		failAt int32
+	}{
+		{1, 1},
+		{20, 2},
+		{20, 5},
+		{20, 15},
+		{200, 150},
+	}
+
+	errTest := errors.New("test error")
+
+	for _, test := range tests {
+		t.Run("", func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			var tmb tomb.Tomb
+
+			var num int32
+			saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
+				val := atomic.AddInt32(&num, 1)
+				if val == test.failAt {
+					t.Logf("sending error for request %v\n", test.failAt)
+					return restic.ID{}, ItemStats{}, errTest
+				}
+				return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
+			}
+
+			errFn := func(snPath string, fi os.FileInfo, err error) error {
+				t.Logf("ignoring error %v\n", err)
+				return nil
+			}
+
+			b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
+
+			var results []FutureTree
+
+			for i := 0; i < test.trees; i++ {
+				node := &restic.Node{
+					Name: fmt.Sprintf("file-%d", i),
+				}
+
+				fb := b.Save(ctx, "/", node, nil)
+				results = append(results, fb)
+			}
+
+			for _, tree := range results {
+				tree.Wait(ctx)
+			}
+
+			tmb.Kill(nil)
+
+			err := tmb.Wait()
+			if err == nil {
+				t.Errorf("expected error not found")
+			}
+
+			if err != errTest {
+				t.Fatalf("unexpected error found: %v", err)
+			}
+		})
+	}
+}
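Reviewer note: a side effect of this patch is that the Future accessors (Node, Stats, Err) no longer block; callers must call Wait(ctx) explicitly first, as the updated call sites in archiver.go and the tests above show. The following minimal sketch of that contract is self-contained and illustrative only; the names future, result and sumAsync are invented for the example and are not part of the patch.

package main

import (
	"context"
	"fmt"
)

// result is a stand-in for saveFileResponse/saveTreeResponse.
type result struct {
	sum int
	err error
}

// future is a stand-in for FutureFile/FutureTree after this patch.
type future struct {
	ch  <-chan result
	res result
}

// Wait stores the result as soon as it arrives, the channel is closed
// (worker pool shut down), or ctx is cancelled.
func (f *future) Wait(ctx context.Context) {
	select {
	case res, ok := <-f.ch:
		if ok {
			f.res = res
		}
	case <-ctx.Done():
	}
}

// Sum and Err only read the stored result; unlike the old accessors
// they never block, so Wait must have been called first.
func (f *future) Sum() int   { return f.res.sum }
func (f *future) Err() error { return f.res.err }

// sumAsync plays the role of Save: it hands work to a goroutine and
// returns a future for the result.
func sumAsync(xs []int) *future {
	ch := make(chan result, 1)
	go func() {
		s := 0
		for _, x := range xs {
			s += x
		}
		ch <- result{sum: s}
	}()
	return &future{ch: ch}
}

func main() {
	f := sumAsync([]int{1, 2, 3})
	f.Wait(context.Background()) // required before Sum()/Err()
	fmt.Println(f.Sum(), f.Err())
}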