From 581c62ee72f3aaad3687a5f411943674793b871b Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 21:40:31 +0200
Subject: [PATCH 1/6] archiver: Improve error handling

This commit changes how the worker goroutines for saving e.g. blobs
interact. Before, it was possible to get stuck sending an instruction
to archive a file or dir when no worker goroutines were available any
more. This commit introduces a `done` channel for each of the worker
pools, which is set to the channel returned by `tomb.Dying()`, so it is
closed when the first worker returns an error.
---
 cmd/restic/cmd_backup.go             |   2 +-
 internal/archiver/archiver.go        |   6 +-
 internal/archiver/archiver_test.go   |   4 +
 internal/archiver/blob_saver.go      |  21 +++-
 internal/archiver/blob_saver_test.go | 115 +++++++++++++++++++++++++
 internal/archiver/file_saver.go      |  48 ++++++-----
 internal/archiver/file_saver_test.go |  97 ++++++++++++++++++++++
 internal/archiver/tree_saver.go      |  40 +++++----
 internal/archiver/tree_saver_test.go | 120 +++++++++++++++++++++++++++
 9 files changed, 408 insertions(+), 45 deletions(-)
 create mode 100644 internal/archiver/blob_saver_test.go
 create mode 100644 internal/archiver/file_saver_test.go
 create mode 100644 internal/archiver/tree_saver_test.go

diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go
index 562342000..487d8d587 100644
--- a/cmd/restic/cmd_backup.go
+++ b/cmd/restic/cmd_backup.go
@@ -467,7 +467,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	p.V("start backup on %v", targets)
 	_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
 	if err != nil {
-		return err
+		return errors.Fatalf("unable to save snapshot: %v", err)
 	}
 
 	p.Finish()
diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 3e1a6d0b8..50f49f0f9 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -271,6 +271,7 @@ func (fn *FutureNode) wait(ctx context.Context) {
 	switch {
 	case fn.isFile:
 		// wait for and collect the data for the file
+		fn.file.Wait(ctx)
 		fn.node = fn.file.Node()
 		fn.err = fn.file.Err()
 		fn.stats = fn.file.Stats()
@@ -281,6 +282,7 @@ func (fn *FutureNode) wait(ctx context.Context) {
 
 	case fn.isDir:
 		// wait for and collect the data for the dir
+		fn.dir.Wait(ctx)
 		fn.node = fn.dir.Node()
 		fn.stats = fn.dir.Stats()
 
@@ -713,13 +715,13 @@ func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
 
 	arch.fileSaver = NewFileSaver(ctx, t,
 		arch.FS,
-		arch.blobSaver,
+		arch.blobSaver.Save,
 		arch.Repo.Config().ChunkerPolynomial,
 		arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
 	arch.fileSaver.CompleteBlob = arch.CompleteBlob
 	arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
 
-	arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.error)
+	arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
 }
 
 // Snapshot saves several targets and returns a snapshot.
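The pattern described in the commit message is easiest to see in isolation. The following standalone sketch (illustrative pool type and job values, not restic code; it only assumes gopkg.in/tomb.v2) shows why a submit path that also selects on tomb.Dying() cannot get stuck once a worker has failed:

package main

import (
	"context"
	"errors"
	"fmt"

	tomb "gopkg.in/tomb.v2"
)

// pool mimics the shape of BlobSaver/FileSaver/TreeSaver: a job
// channel plus a done channel fed by the tomb.
type pool struct {
	ch   chan int
	done <-chan struct{} // closed as soon as the tomb starts dying
}

func newPool(t *tomb.Tomb, workers int) *pool {
	p := &pool{ch: make(chan int), done: t.Dying()}
	for i := 0; i < workers; i++ {
		t.Go(func() error {
			for {
				var job int
				select {
				case <-t.Dying():
					return nil
				case job = <-p.ch:
				}
				if job == 3 { // simulated failure
					return errors.New("worker failed")
				}
			}
		})
	}
	return p
}

// submit cannot get stuck: once a worker has returned an error the
// tomb is dying, the done channel is closed, and the second case
// unblocks every pending sender.
func (p *pool) submit(ctx context.Context, job int) {
	select {
	case p.ch <- job:
	case <-p.done:
	case <-ctx.Done():
	}
}

func main() {
	var t tomb.Tomb
	p := newPool(&t, 2)
	for i := 0; i < 10; i++ {
		p.submit(context.Background(), i)
	}
	t.Kill(nil)
	fmt.Println("first worker error:", t.Wait())
}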
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index c24232b50..8b029c780 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -70,6 +70,8 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 	}
 
 	res := arch.fileSaver.Save(ctx, "/", file, fi, start, complete)
+
+	res.Wait(ctx)
 	if res.Err() != nil {
 		t.Fatal(res.Err())
 	}
@@ -620,6 +622,7 @@ func TestArchiverSaveDir(t *testing.T) {
 				t.Fatal(err)
 			}
 
+			ft.Wait(ctx)
 			node, stats := ft.Node(), ft.Stats()
 
 			tmb.Kill(nil)
@@ -701,6 +704,7 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 			t.Fatal(err)
 		}
 
+		ft.Wait(ctx)
 		node, stats := ft.Node(), ft.Stats()
 
 		tmb.Kill(nil)
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index 7ef274058..c20b96919 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -5,8 +5,8 @@ import (
 	"sync"
 
 	"github.com/restic/restic/internal/debug"
-	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
 // Saver allows saving a blob.
@@ -22,22 +22,24 @@ type BlobSaver struct {
 	m          sync.Mutex
 	knownBlobs restic.BlobSet
 
-	ch chan<- saveBlobJob
+	ch   chan<- saveBlobJob
+	done <-chan struct{}
 }
 
 // NewBlobSaver returns a new blob. A worker pool is started, it is stopped
 // when ctx is cancelled.
-func NewBlobSaver(ctx context.Context, g Goer, repo Saver, workers uint) *BlobSaver {
+func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
 	ch := make(chan saveBlobJob)
 	s := &BlobSaver{
 		repo:       repo,
 		knownBlobs: restic.NewBlobSet(),
 		ch:         ch,
+		done:       t.Dying(),
 	}
 
 	for i := uint(0); i < workers; i++ {
-		g.Go(func() error {
-			return s.worker(ctx, ch)
+		t.Go(func() error {
+			return s.worker(t.Context(ctx), ch)
 		})
 	}
 
@@ -51,6 +53,10 @@ func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) Fu
 	ch := make(chan saveBlobResponse, 1)
 	select {
 	case s.ch <- saveBlobJob{BlobType: t, buf: buf, ch: ch}:
+	case <-s.done:
+		debug.Log("not sending job, BlobSaver is done")
+		close(ch)
+		return FutureBlob{ch: ch}
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled")
 		close(ch)
@@ -139,7 +145,7 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
 	// otherwise we're responsible for saving it
 	_, err := s.repo.SaveBlob(ctx, t, buf, id)
 	if err != nil {
-		return saveBlobResponse{}, errors.Fatalf("unable to save data: %v", err)
+		return saveBlobResponse{}, err
 	}
 
 	return saveBlobResponse{
@@ -153,14 +159,13 @@ func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
 		var job saveBlobJob
 		select {
 		case <-ctx.Done():
-			debug.Log("context is cancelled, exiting: %v", ctx.Err())
 			return nil
 		case job = <-jobs:
 		}
 
 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
 		if err != nil {
-			debug.Log("saveBlob returned error: %v", err)
+			debug.Log("saveBlob returned error, exiting: %v", err)
 			close(job.ch)
 			return err
 		}
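Each Save call above returns a future backed by a channel with capacity 1: the worker either sends exactly one response, or the channel is closed without a send when the saver is shutting down. A minimal self-contained sketch of that contract (the future and response types are illustrative, not restic's API):

package main

import (
	"context"
	"fmt"
)

type response struct{ id int }

// future resolves once: Wait caches the response, or keeps the zero
// value if the channel was closed without a send (saver shut down).
type future struct {
	ch  <-chan response
	res response
}

func (f *future) Wait(ctx context.Context) {
	select {
	case res, ok := <-f.ch:
		if ok {
			f.res = res
		}
	case <-ctx.Done():
	}
}

func main() {
	// Normal completion: the worker sends one response; the buffer of
	// one means it never blocks, even if nobody ever calls Wait.
	ch := make(chan response, 1)
	ch <- response{id: 42}
	f := future{ch: ch}
	f.Wait(context.Background())
	fmt.Println(f.res.id) // 42

	// Shutdown: channel closed without a send, Wait returns at once.
	closed := make(chan response, 1)
	close(closed)
	g := future{ch: closed}
	g.Wait(context.Background())
	fmt.Println(g.res.id) // 0 (zero value)
}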
diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go
new file mode 100644
index 000000000..6d7af3fed
--- /dev/null
+++ b/internal/archiver/blob_saver_test.go
@@ -0,0 +1,115 @@
+package archiver
+
+import (
+	"context"
+	"fmt"
+	"runtime"
+	"sync/atomic"
+	"testing"
+
+	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/repository"
+	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
+)
+
+var errTest = errors.New("test error")
+
+type saveFail struct {
+	idx    restic.Index
+	cnt    int32
+	failAt int32
+}
+
+func (b *saveFail) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
+	val := atomic.AddInt32(&b.cnt, 1)
+	if val == b.failAt {
+		return restic.ID{}, errTest
+	}
+
+	return id, nil
+}
+
+func (b *saveFail) Index() restic.Index {
+	return b.idx
+}
+
+func TestBlobSaver(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var tmb tomb.Tomb
+	saver := &saveFail{
+		idx: repository.NewIndex(),
+	}
+
+	b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
+
+	var results []FutureBlob
+
+	for i := 0; i < 20; i++ {
+		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
+		fb := b.Save(ctx, restic.DataBlob, buf)
+		results = append(results, fb)
+	}
+
+	for i, blob := range results {
+		blob.Wait(ctx)
+		if blob.Known() {
+			t.Errorf("blob %v is known, that should not be the case", i)
+		}
+	}
+
+	tmb.Kill(nil)
+
+	err := tmb.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestBlobSaverError(t *testing.T) {
+	var tests = []struct {
+		blobs  int
+		failAt int
+	}{
+		{20, 2},
+		{20, 5},
+		{20, 15},
+		{200, 150},
+	}
+
+	for _, test := range tests {
+		t.Run("", func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			var tmb tomb.Tomb
+			saver := &saveFail{
+				idx:    repository.NewIndex(),
+				failAt: int32(test.failAt),
+			}
+
+			b := NewBlobSaver(ctx, &tmb, saver, uint(runtime.NumCPU()))
+
+			var results []FutureBlob
+
+			for i := 0; i < test.blobs; i++ {
+				buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
+				fb := b.Save(ctx, restic.DataBlob, buf)
+				results = append(results, fb)
+			}
+
+			tmb.Kill(nil)
+
+			err := tmb.Wait()
+			if err == nil {
+				t.Errorf("expected error not found")
+			}
+
+			if err != errTest {
+				t.Fatalf("unexpected error found: %v", err)
+			}
+		})
+	}
+}
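The saveFail stub drives both tests above: an atomic counter makes exactly the n-th call fail, even when calls arrive concurrently from many workers. The same fault-injection shape works for any dependency; a standalone sketch (hypothetical failer type):

package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errInjected = errors.New("injected error")

// failer succeeds until the failAt-th call, which returns errInjected.
// The atomic counter makes the trigger exact under concurrency.
type failer struct {
	cnt    int32
	failAt int32
}

func (f *failer) do() error {
	if atomic.AddInt32(&f.cnt, 1) == f.failAt {
		return errInjected
	}
	return nil
}

func main() {
	f := &failer{failAt: 5}

	var wg sync.WaitGroup
	errs := make(chan error, 20)
	for i := 0; i < 20; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if err := f.do(); err != nil {
				errs <- err
			}
		}()
	}
	wg.Wait()
	close(errs)

	// Exactly one of the 20 calls failed.
	for err := range errs {
		fmt.Println(err)
	}
}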
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 7a2c24adf..1e7b4aea5 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -10,13 +10,9 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
-// Goer starts a function in a goroutine.
-type Goer interface {
-	Go(func() error)
-}
-
 // FutureFile is returned by Save and will return the data once it
 // has been processed.
 type FutureFile struct {
@@ -24,40 +20,47 @@ type FutureFile struct {
 	res saveFileResponse
 }
 
-func (s *FutureFile) wait() {
-	res, ok := <-s.ch
-	if ok {
-		s.res = res
+// Wait blocks until the result of the save operation is received or ctx is
+// cancelled.
+func (s *FutureFile) Wait(ctx context.Context) {
+	select {
+	case res, ok := <-s.ch:
+		if ok {
+			s.res = res
+		}
+	case <-ctx.Done():
+		return
 	}
 }
 
 // Node returns the node once it is available.
 func (s *FutureFile) Node() *restic.Node {
-	s.wait()
 	return s.res.node
 }
 
 // Stats returns the stats for the file once they are available.
 func (s *FutureFile) Stats() ItemStats {
-	s.wait()
 	return s.res.stats
 }
 
 // Err returns the error in case an error occurred.
 func (s *FutureFile) Err() error {
-	s.wait()
 	return s.res.err
 }
 
+// SaveBlobFn saves a blob to a repo.
+type SaveBlobFn func(context.Context, restic.BlobType, *Buffer) FutureBlob
+
 // FileSaver concurrently saves incoming files to the repo.
 type FileSaver struct {
 	fs           fs.FS
-	blobSaver    *BlobSaver
 	saveFilePool *BufferPool
+	saveBlob     SaveBlobFn
 
 	pol chunker.Pol
 
-	ch chan<- saveFileJob
+	ch   chan<- saveFileJob
+	done <-chan struct{}
 
 	CompleteBlob func(filename string, bytes uint64)
@@ -66,7 +69,7 @@ type FileSaver struct {
 
 // NewFileSaver returns a new file saver. A worker pool with fileWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
+func NewFileSaver(ctx context.Context, t *tomb.Tomb, fs fs.FS, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
 	ch := make(chan saveFileJob)
 
 	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
@@ -75,17 +78,18 @@ func NewFileSaver(ctx context.Context, g Goer, fs fs.FS, blobSaver *BlobSaver, p
 
 	s := &FileSaver{
 		fs:           fs,
-		blobSaver:    blobSaver,
+		saveBlob:     save,
 		saveFilePool: NewBufferPool(ctx, int(poolSize), chunker.MaxSize),
 		pol:          pol,
 		ch:           ch,
+		done:         t.Dying(),
 
 		CompleteBlob: func(string, uint64) {},
 	}
 
 	for i := uint(0); i < fileWorkers; i++ {
-		g.Go(func() error {
-			s.worker(ctx, ch)
+		t.Go(func() error {
+			s.worker(t.Context(ctx), ch)
 			return nil
 		})
 	}
@@ -111,8 +115,14 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
 
 	select {
 	case s.ch <- job:
+	case <-s.done:
+		debug.Log("not sending job, FileSaver is done")
+		close(ch)
+		return FutureFile{ch: ch}
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled: %v", ctx.Err())
+		close(ch)
+		return FutureFile{ch: ch}
 	}
 
 	return FutureFile{ch: ch}
@@ -182,7 +192,7 @@ func (s *FileSaver) saveFile(ctx context.Context, chnker *chunker.Chunker, snPat
 			return saveFileResponse{err: ctx.Err()}
 		}
 
-		res := s.blobSaver.Save(ctx, restic.DataBlob, buf)
+		res := s.saveBlob(ctx, restic.DataBlob, buf)
 		results = append(results, res)
 
 		// test if the context has been cancelled, return the error
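Replacing the concrete *BlobSaver field with the SaveBlobFn function type is what makes FileSaver testable in isolation, as the new test file below demonstrates. A sketch of the same seam outside restic (fileProcessor and saveFn are illustrative names):

package main

import (
	"context"
	"fmt"
)

type result struct{ n int }

// saveFn is the injection point, mirroring SaveBlobFn: any function
// with this shape can stand in for the real saver.
type saveFn func(ctx context.Context, data []byte) result

type fileProcessor struct {
	save saveFn
}

func (p *fileProcessor) process(ctx context.Context, chunks [][]byte) int {
	total := 0
	for _, c := range chunks {
		total += p.save(ctx, c).n
	}
	return total
}

func main() {
	// Production code would pass a method value of the real saver
	// (e.g. blobSaver.Save); a test passes a trivial stub instead.
	stub := func(ctx context.Context, data []byte) result {
		return result{n: len(data)}
	}

	p := &fileProcessor{save: stub}
	fmt.Println(p.process(context.Background(), [][]byte{
		[]byte("foo"), []byte("quux"),
	})) // 7
}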
diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go
new file mode 100644
index 000000000..c8cf58735
--- /dev/null
+++ b/internal/archiver/file_saver_test.go
@@ -0,0 +1,97 @@
+package archiver
+
+import (
+	"context"
+	"fmt"
+	"io/ioutil"
+	"path/filepath"
+	"runtime"
+	"testing"
+
+	"github.com/restic/chunker"
+	"github.com/restic/restic/internal/fs"
+	"github.com/restic/restic/internal/restic"
+	"github.com/restic/restic/internal/test"
+	tomb "gopkg.in/tomb.v2"
+)
+
+func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
+	tempdir, cleanup := test.TempDir(t)
+
+	for i := 0; i < num; i++ {
+		filename := fmt.Sprintf("testfile-%d", i)
+		err := ioutil.WriteFile(filepath.Join(tempdir, filename), []byte(filename), 0600)
+		if err != nil {
+			t.Fatal(err)
+		}
+		files = append(files, filepath.Join(tempdir, filename))
+	}
+
+	return files, cleanup
+}
+
+func startFileSaver(ctx context.Context, t testing.TB, fs fs.FS) (*FileSaver, *tomb.Tomb) {
+	var tmb tomb.Tomb
+
+	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
+		ch := make(chan saveBlobResponse)
+		close(ch)
+		return FutureBlob{ch: ch}
+	}
+
+	workers := uint(runtime.NumCPU())
+	pol, err := chunker.RandomPolynomial()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	s := NewFileSaver(ctx, &tmb, fs, saveBlob, pol, workers, workers)
+	s.NodeFromFileInfo = restic.NodeFromFileInfo
+
+	return s, &tmb
+}
+
+func TestFileSaver(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	files, cleanup := createTestFiles(t, 15)
+	defer cleanup()
+
+	startFn := func() {}
+	completeFn := func(*restic.Node, ItemStats) {}
+
+	testFs := fs.Local{}
+	s, tmb := startFileSaver(ctx, t, testFs)
+
+	var results []FutureFile
+
+	for _, filename := range files {
+		f, err := testFs.Open(filename)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		fi, err := f.Stat()
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		ff := s.Save(ctx, filename, f, fi, startFn, completeFn)
+		results = append(results, ff)
+	}
+
+	for _, file := range results {
+		file.Wait(ctx)
+		if file.Err() != nil {
+			t.Errorf("unable to save file: %v", file.Err())
+		}
+	}
+
+	tmb.Kill(nil)
+
+	err := tmb.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
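The test above ends with the same shutdown sequence used throughout these files: Kill(nil) asks the pool to stop without recording an error, and Wait blocks until every goroutine started via Go has returned, yielding the first non-nil error (or nil). Reduced to a sketch that assumes only gopkg.in/tomb.v2:

package main

import (
	"fmt"
	"time"

	tomb "gopkg.in/tomb.v2"
)

func main() {
	var t tomb.Tomb

	// Three idle workers that exit cleanly once the tomb starts dying.
	for i := 0; i < 3; i++ {
		t.Go(func() error {
			select {
			case <-t.Dying():
				return nil
			case <-time.After(time.Minute):
				return fmt.Errorf("worker timed out")
			}
		})
	}

	t.Kill(nil)      // request shutdown without recording an error
	err := t.Wait()  // blocks until all three workers have returned
	fmt.Println(err) // <nil>: clean shutdown, no worker failed
}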
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
index 743ef3b55..29b899e82 100644
--- a/internal/archiver/tree_saver.go
+++ b/internal/archiver/tree_saver.go
@@ -4,8 +4,8 @@ import (
 	"context"
 
 	"github.com/restic/restic/internal/debug"
-	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
 )
 
 // FutureTree is returned by Save and will return the data once it
@@ -15,22 +15,25 @@ type FutureTree struct {
 	res saveTreeResponse
 }
 
-func (s *FutureTree) wait() {
-	res, ok := <-s.ch
-	if ok {
-		s.res = res
+// Wait blocks until the data has been received or ctx is cancelled.
+func (s *FutureTree) Wait(ctx context.Context) {
+	select {
+	case <-ctx.Done():
+		return
+	case res, ok := <-s.ch:
+		if ok {
+			s.res = res
+		}
 	}
 }
 
-// Node returns the node once it is available.
+// Node returns the node.
 func (s *FutureTree) Node() *restic.Node {
-	s.wait()
 	return s.res.node
 }
 
-// Stats returns the stats for the file once they are available.
+// Stats returns the stats for the file.
 func (s *FutureTree) Stats() ItemStats {
-	s.wait()
 	return s.res.stats
 }
 
@@ -39,23 +42,25 @@ type TreeSaver struct {
 	saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error)
 	errFn    ErrorFunc
 
-	ch chan<- saveTreeJob
+	ch   chan<- saveTreeJob
+	done <-chan struct{}
 }
 
 // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewTreeSaver(ctx context.Context, g Goer, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
+func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver {
 	ch := make(chan saveTreeJob)
 
 	s := &TreeSaver{
 		ch:       ch,
+		done:     t.Dying(),
 		saveTree: saveTree,
 		errFn:    errFn,
 	}
 
 	for i := uint(0); i < treeWorkers; i++ {
-		g.Go(func() error {
-			return s.worker(ctx, ch)
+		t.Go(func() error {
+			return s.worker(t.Context(ctx), ch)
 		})
 	}
 
@@ -73,8 +78,12 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node,
 	}
 	select {
 	case s.ch <- job:
+	case <-s.done:
+		debug.Log("not saving tree, TreeSaver is done")
+		close(ch)
+		return FutureTree{ch: ch}
 	case <-ctx.Done():
-		debug.Log("refusing to save job, context is cancelled: %v", ctx.Err())
+		debug.Log("not saving tree, context is cancelled")
 		close(ch)
 		return FutureTree{ch: ch}
 	}
@@ -149,7 +158,8 @@ func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error {
 		node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes)
 		if err != nil {
 			debug.Log("error saving tree blob: %v", err)
-			return errors.Fatalf("unable to save data: %v", err)
+			close(job.ch)
+			return err
 		}
 
 		job.ch <- saveTreeResponse{
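The reworked worker above encodes the invariant that makes early abort safe: before exiting with an error, a worker closes the pending job's response channel, so a caller blocked in Wait sees a closed channel instead of hanging forever. The idea in isolation (illustrative job type, not restic code):

package main

import (
	"errors"
	"fmt"
)

type job struct {
	ch chan string
}

func doWork() error { return errors.New("disk full") }

// worker either answers a job or closes its channel before dying,
// so no requester is left waiting forever.
func worker(jobs <-chan job) error {
	for j := range jobs {
		if err := doWork(); err != nil {
			close(j.ch) // unblock the requester, then report the error
			return err
		}
		j.ch <- "done"
	}
	return nil
}

func main() {
	jobs := make(chan job, 1)
	j := job{ch: make(chan string, 1)}
	jobs <- j
	close(jobs)

	err := worker(jobs)

	// The requester's receive returns immediately with ok == false.
	res, ok := <-j.ch
	fmt.Printf("res=%q ok=%v workerErr=%v\n", res, ok, err)
}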
diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go
new file mode 100644
index 000000000..3f58da222
--- /dev/null
+++ b/internal/archiver/tree_saver_test.go
@@ -0,0 +1,120 @@
+package archiver
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"runtime"
+	"sync/atomic"
+	"testing"
+
+	"github.com/restic/restic/internal/errors"
+	"github.com/restic/restic/internal/restic"
+	tomb "gopkg.in/tomb.v2"
+)
+
+func TestTreeSaver(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	var tmb tomb.Tomb
+
+	saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
+		return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
+	}
+
+	errFn := func(snPath string, fi os.FileInfo, err error) error {
+		return nil
+	}
+
+	b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
+
+	var results []FutureTree
+
+	for i := 0; i < 20; i++ {
+		node := &restic.Node{
+			Name: fmt.Sprintf("file-%d", i),
+		}
+
+		fb := b.Save(ctx, "/", node, nil)
+		results = append(results, fb)
+	}
+
+	for _, tree := range results {
+		tree.Wait(ctx)
+	}
+
+	tmb.Kill(nil)
+
+	err := tmb.Wait()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestTreeSaverError(t *testing.T) {
+	var tests = []struct {
+		trees  int
+		failAt int32
+	}{
+		{1, 1},
+		{20, 2},
+		{20, 5},
+		{20, 15},
+		{200, 150},
+	}
+
+	errTest := errors.New("test error")
+
+	for _, test := range tests {
+		t.Run("", func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			var tmb tomb.Tomb
+
+			var num int32
+			saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) {
+				val := atomic.AddInt32(&num, 1)
+				if val == test.failAt {
+					t.Logf("sending error for request %v\n", test.failAt)
+					return restic.ID{}, ItemStats{}, errTest
+				}
+				return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil
+			}
+
+			errFn := func(snPath string, fi os.FileInfo, err error) error {
+				t.Logf("ignoring error %v\n", err)
+				return nil
+			}
+
+			b := NewTreeSaver(ctx, &tmb, uint(runtime.NumCPU()), saveFn, errFn)
+
+			var results []FutureTree
+
+			for i := 0; i < test.trees; i++ {
+				node := &restic.Node{
+					Name: fmt.Sprintf("file-%d", i),
+				}
+
+				fb := b.Save(ctx, "/", node, nil)
+				results = append(results, fb)
+			}
+
+			for _, tree := range results {
+				tree.Wait(ctx)
+			}
+
+			tmb.Kill(nil)
+
+			err := tmb.Wait()
+			if err == nil {
+				t.Errorf("expected error not found")
+			}
+
+			if err != errTest {
+				t.Fatalf("unexpected error found: %v", err)
+			}
+		})
+	}
+}
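The three new test files share one shape: a table of failure points, subtests with empty names (Go numbers them #00, #01, …), and an assertion that the stub's error comes back unchanged from tomb.Wait(). A trimmed, self-contained version of that shape (run stands in for the pool under test; save as a _test.go file and execute with go test):

package pool

import (
	"errors"
	"testing"
)

var errTest = errors.New("test error")

// run stands in for the pool under test; it fails as soon as the
// failAt-th job is processed, like the saveFail/saveFn stubs above.
func run(jobs, failAt int) error {
	for i := 1; i <= jobs; i++ {
		if i == failAt {
			return errTest
		}
	}
	return nil
}

func TestPoolError(t *testing.T) {
	var tests = []struct {
		jobs   int
		failAt int
	}{
		{20, 2},
		{20, 15},
		{200, 150},
	}

	for _, test := range tests {
		// An empty name lets Go number the cases: #00, #01, ...
		t.Run("", func(t *testing.T) {
			err := run(test.jobs, test.failAt)
			if err == nil {
				t.Fatal("expected error not found")
			}
			if err != errTest {
				t.Fatalf("unexpected error: %v", err)
			}
		})
	}
}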
From 21c83b17257be5c3015a92b9965550dceb3e9cdf Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 21:59:38 +0200
Subject: [PATCH 2/6] archiver: Add high-level documentation

---
 internal/archiver/archiver.go |  3 ++-
 internal/archiver/doc.go      | 12 ++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)
 create mode 100644 internal/archiver/doc.go

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 50f49f0f9..29cf8d332 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -293,7 +293,8 @@ func (fn *FutureNode) wait(ctx context.Context) {
 }
 
 // Save saves a target (file or directory) to the repo. If the item is
-// excluded,this function returns a nil node and error.
+// excluded, this function returns a nil node and error, with excluded set to
+// true.
 //
 // Errors and completion is needs to be handled by the caller.
 //
diff --git a/internal/archiver/doc.go b/internal/archiver/doc.go
new file mode 100644
index 000000000..928145aa2
--- /dev/null
+++ b/internal/archiver/doc.go
@@ -0,0 +1,12 @@
+// Package archiver contains the code which reads files, splits them into
+// chunks and saves the data to the repository.
+//
+// An Archiver has a number of worker goroutines which handle saving the
+// different data structures to the repository; the details are implemented by
+// the FileSaver, BlobSaver, and TreeSaver types.
+//
+// The main goroutine (the one calling Snapshot()) traverses the directory tree
+// and delegates all work to these worker pools. They return a type
+// (FutureFile, FutureBlob, and FutureTree) which can be resolved later, by
+// calling Wait() on it.
+package archiver

From 256104111de9d6965ae64ca785612842ee93c3c5 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 21:59:58 +0200
Subject: [PATCH 3/6] archiver: Clarify names

---
 internal/archiver/archiver.go | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 29cf8d332..5655a47b0 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -263,8 +263,8 @@ type FutureNode struct {
 	isFile bool
 	file   FutureFile
 
-	isDir bool
-	dir   FutureTree
+	isTree bool
+	tree   FutureTree
 }
 
 func (fn *FutureNode) wait(ctx context.Context) {
@@ -280,15 +280,15 @@ func (fn *FutureNode) wait(ctx context.Context) {
 		fn.file = FutureFile{}
 		fn.isFile = false
 
-	case fn.isDir:
+	case fn.isTree:
 		// wait for and collect the data for the dir
-		fn.dir.Wait(ctx)
-		fn.node = fn.dir.Node()
-		fn.stats = fn.dir.Stats()
+		fn.tree.Wait(ctx)
+		fn.node = fn.tree.Node()
+		fn.stats = fn.tree.Stats()
 
 		// ensure the other stuff can be garbage-collected
-		fn.dir = FutureTree{}
-		fn.isDir = false
+		fn.tree = FutureTree{}
+		fn.isTree = false
 	}
 }
 
@@ -393,8 +393,8 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
 		start := time.Now()
 		oldSubtree := arch.loadSubtree(ctx, previous)
 
-		fn.isDir = true
-		fn.dir, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
+		fn.isTree = true
+		fn.tree, err = arch.SaveDir(ctx, snPath, fi, target, oldSubtree)
 		if err == nil {
 			arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
 		} else {

From 526956af3585b23f287e9c910c1f28db5eb00367 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 23:07:16 +0200
Subject: [PATCH 4/6] archiver: Read files/dirs in order

---
 internal/archiver/archiver.go | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 5655a47b0..2d303e2ff 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -480,7 +480,16 @@ func (arch *Archiver) SaveTree(ctx context.Context, snPath string, atree *Tree,
 
 	futureNodes := make(map[string]FutureNode)
 
-	for name, subatree := range atree.Nodes {
+	// iterate over the nodes of atree in lexicographic (=deterministic) order
+	names := make([]string, 0, len(atree.Nodes))
+	for name := range atree.Nodes {
+		names = append(names, name)
+	}
+	sort.Stable(sort.StringSlice(names))
+
+	for _, name := range names {
+		subatree := atree.Nodes[name]
+
 		// test if context has been cancelled
 		if ctx.Err() != nil {
 			return nil, ctx.Err()
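Patch 4 exists because Go deliberately randomizes map iteration order between runs; collecting and sorting the keys first is the standard way to get a deterministic walk. The core of the change, runnable in isolation (sort.Strings is equivalent to the patch's sort.Stable(sort.StringSlice(...)) for plain string keys):

package main

import (
	"fmt"
	"sort"
)

func main() {
	nodes := map[string]string{
		"zsh": "file", "bin": "dir", "etc": "dir", "alpha": "file",
	}

	// Iterating over nodes directly would visit keys in a different
	// (randomized) order on every run. Sort the names first.
	names := make([]string, 0, len(nodes))
	for name := range nodes {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		fmt.Println(name, nodes[name]) // alpha, bin, etc, zsh — always
	}
}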
From c5e75d1c9855322485a372dfa38bb599c9ff93b8 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 23:08:00 +0200
Subject: [PATCH 5/6] archiver: Add test for early abort on unhandled error

---
 internal/archiver/archiver.go      |   1 +
 internal/archiver/archiver_test.go | 140 +++++++++++++++++++++++++++++
 internal/archiver/file_saver.go    |   2 +
 3 files changed, 143 insertions(+)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 2d303e2ff..862c558e8 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -398,6 +398,7 @@ func (arch *Archiver) Save(ctx context.Context, snPath, target string, previous
 		if err == nil {
 			arch.CompleteItem(snItem, previous, fn.node, fn.stats, time.Since(start))
 		} else {
+			debug.Log("SaveDir for %v returned error: %v", snPath, err)
 			return FutureNode{}, false, err
 		}
 
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index 8b029c780..136cf1291 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -8,11 +8,13 @@ import (
 	"runtime"
 	"strings"
 	"sync"
+	"sync/atomic"
 	"syscall"
 	"testing"
 	"time"
 
 	"github.com/restic/restic/internal/checker"
+	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
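The test hunk below builds on Go struct embedding: TrackFS embeds fs.FS and failSaveRepo embeds restic.Repository, each overriding only the methods it wants to observe or sabotage while every other call falls through to the embedded value. The idiom in miniature (hypothetical Store interface, not restic's):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type Store interface {
	Save(data string) error
	Load(id int) (string, error)
}

type memStore struct{}

func (memStore) Save(data string) error      { return nil }
func (memStore) Load(id int) (string, error) { return "data", nil }

// failStore embeds Store: Load falls through untouched, while Save
// starts failing once failAfter calls have been made.
type failStore struct {
	Store
	failAfter int32
	cnt       int32
}

func (f *failStore) Save(data string) error {
	if atomic.AddInt32(&f.cnt, 1) >= f.failAfter {
		return errors.New("injected save failure")
	}
	return f.Store.Save(data)
}

func main() {
	s := &failStore{Store: memStore{}, failAfter: 3}
	for i := 0; i < 4; i++ {
		fmt.Println(s.Save("x"))
	}
	v, _ := s.Load(1) // not overridden: handled by memStore
	fmt.Println(v)
}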
@@ -1598,3 +1600,141 @@ func TestArchiverErrorReporting(t *testing.T) {
 		})
 	}
 }
+
+// TrackFS keeps track of which files are opened. For some files, an error is injected.
+type TrackFS struct {
+	fs.FS
+
+	errorOn map[string]error
+
+	opened map[string]uint
+	m      sync.Mutex
+}
+
+func (m *TrackFS) Open(name string) (fs.File, error) {
+	m.m.Lock()
+	m.opened[name]++
+	m.m.Unlock()
+
+	return m.FS.Open(name)
+}
+
+func (m *TrackFS) OpenFile(name string, flag int, perm os.FileMode) (fs.File, error) {
+	m.m.Lock()
+	m.opened[name]++
+	m.m.Unlock()
+
+	return m.FS.OpenFile(name, flag, perm)
+}
+
+type failSaveRepo struct {
+	restic.Repository
+	failAfter int32
+	cnt       int32
+	err       error
+}
+
+func (f *failSaveRepo) SaveBlob(ctx context.Context, t restic.BlobType, buf []byte, id restic.ID) (restic.ID, error) {
+	val := atomic.AddInt32(&f.cnt, 1)
+	if val >= f.failAfter {
+		return restic.ID{}, f.err
+	}
+
+	return f.Repository.SaveBlob(ctx, t, buf, id)
+}
+
+func TestArchiverAbortEarlyOnError(t *testing.T) {
+	var testErr = errors.New("test error")
+
+	var tests = []struct {
+		src       TestDir
+		wantOpen  map[string]uint
+		failAfter uint // error after so many files have been saved to the repo
+		err       error
+	}{
+		{
+			src: TestDir{
+				"dir": TestDir{
+					"bar": TestFile{Content: "foobar"},
+					"baz": TestFile{Content: "foobar"},
+					"foo": TestFile{Content: "foobar"},
+				},
+			},
+			wantOpen: map[string]uint{
+				filepath.FromSlash("dir/bar"): 1,
+				filepath.FromSlash("dir/baz"): 1,
+				filepath.FromSlash("dir/foo"): 1,
+			},
+		},
+		{
+			src: TestDir{
+				"dir": TestDir{
+					"file1": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file2": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file3": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file4": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file5": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file6": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file7": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file8": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+					"file9": TestFile{Content: string(restictest.Random(3, 4*1024*1024))},
+				},
+			},
+			wantOpen: map[string]uint{
+				filepath.FromSlash("dir/file1"): 1,
+				filepath.FromSlash("dir/file2"): 1,
+				filepath.FromSlash("dir/file3"): 1,
+				filepath.FromSlash("dir/file7"): 0,
+				filepath.FromSlash("dir/file8"): 0,
+				filepath.FromSlash("dir/file9"): 0,
+			},
+			failAfter: 5,
+			err:       testErr,
+		},
+	}
+
+	for _, test := range tests {
+		t.Run("", func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
+			defer cleanup()
+
+			back := fs.TestChdir(t, tempdir)
+			defer back()
+
+			testFS := &TrackFS{
+				FS:     fs.Track{fs.Local{}},
+				opened: make(map[string]uint),
+			}
+
+			if testFS.errorOn == nil {
+				testFS.errorOn = make(map[string]error)
+			}
+
+			testRepo := &failSaveRepo{
+				Repository: repo,
+				failAfter:  int32(test.failAfter),
+				err:        test.err,
+			}
+
+			arch := New(testRepo, testFS, Options{})
+
+			_, _, err := arch.Snapshot(ctx, []string{"."}, SnapshotOptions{Time: time.Now()})
+			if errors.Cause(err) != test.err {
+				t.Errorf("expected error (%v) not found, got %v", test.err, errors.Cause(err))
+			}
+
+			t.Logf("Snapshot returned error: %v", err)
+
+			t.Logf("track fs: %v", testFS.opened)
+
+			for k, v := range test.wantOpen {
+				if testFS.opened[k] != v {
+					t.Errorf("opened %v %d times, want %d", k, testFS.opened[k], v)
+				}
+			}
+		})
+	}
+}
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 1e7b4aea5..66defe358 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -117,10 +117,12 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
 	case s.ch <- job:
 	case <-s.done:
 		debug.Log("not sending job, FileSaver is done")
+		_ = file.Close()
 		close(ch)
 		return FutureFile{ch: ch}
 	case <-ctx.Done():
 		debug.Log("not sending job, context is cancelled: %v", ctx.Err())
+		_ = file.Close()
 		close(ch)
 		return FutureFile{ch: ch}
 	}

From e43c9202a6a901bbd4c96020c48fd0cc6f69c446 Mon Sep 17 00:00:00 2001
From: Alexander Neumann
Date: Sat, 12 May 2018 23:54:20 +0200
Subject: [PATCH 6/6] archiver: Make sure backend error is passed up

---
 internal/archiver/archiver.go | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 862c558e8..c6cae0fa6 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -218,6 +218,7 @@ func (arch *Archiver) SaveDir(ctx context.Context, snPath string, fi os.FileInfo
 	for _, name := range names {
 		// test if context has been cancelled
 		if ctx.Err() != nil {
+			debug.Log("context has been cancelled, aborting")
 			return FutureTree{}, ctx.Err()
 		}
 
@@ -767,7 +768,8 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
 	t.Kill(nil)
 	werr := t.Wait()
 
-	if err != nil && errors.Cause(err) == context.Canceled {
+	debug.Log("err is %v, werr is %v", err, werr)
+	if err == nil || errors.Cause(err) == context.Canceled {
 		err = werr
 	}
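The final hunk is subtle: when a worker fails, the main goroutine usually observes only context.Canceled, because the tomb cancels the shared context, while the tomb itself holds the real cause. A standalone sketch of why preferring the tomb's error recovers the backend error (assumes gopkg.in/tomb.v2; the stdlib errors.Is stands in for restic's errors.Cause comparison):

package main

import (
	"context"
	"errors"
	"fmt"

	tomb "gopkg.in/tomb.v2"
)

var errBackend = errors.New("backend unreachable")

func main() {
	var t tomb.Tomb
	ctx := t.Context(context.Background())

	// A worker fails immediately; the tomb cancels ctx in response.
	t.Go(func() error { return errBackend })

	// The main goroutine typically notices only the cancellation.
	<-ctx.Done()
	err := ctx.Err() // context.Canceled — not the real cause

	t.Kill(nil)      // mirrors the shutdown sequence; already dying here
	werr := t.Wait() // the worker's actual error

	// Mirror of the patched condition: prefer the worker error when
	// the main path saw no error or only the cancellation.
	if err == nil || errors.Is(err, context.Canceled) {
		err = werr
	}
	fmt.Println(err) // backend unreachable
}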