From 408ac1a0c25fd0cb407c8d245fd5d21ad7acf136 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Fri, 27 May 2022 19:08:50 +0200
Subject: [PATCH 1/3] archiver: remove tomb usage

---
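Notes: this patch replaces gopkg.in/tomb.v2 with golang.org/x/sync/errgroup
for the archiver worker pools. The shutdown model changes as well: instead of
t.Kill(nil), each saver now closes its input channel via the new
TriggerShutdown(), the workers return once the channel is drained, and
errgroup's Wait() reports the first worker error, if any. A minimal,
self-contained sketch of that pattern — the job type, worker count, and
process step are illustrative placeholders, not restic types:

	package main

	import (
		"context"
		"fmt"

		"golang.org/x/sync/errgroup"
	)

	func main() {
		wg, wgCtx := errgroup.WithContext(context.Background())
		jobs := make(chan int)

		for i := 0; i < 4; i++ {
			wg.Go(func() error {
				for {
					select {
					case <-wgCtx.Done():
						return nil // cancelled: drop the remaining jobs
					case j, ok := <-jobs:
						if !ok {
							return nil // channel closed: clean shutdown
						}
						fmt.Println("processed", j) // stands in for saveBlob/saveFile/saveTree
					}
				}
			})
		}

		for j := 0; j < 10; j++ {
			jobs <- j
		}
		close(jobs) // this is what TriggerShutdown() does
		if err := wg.Wait(); err != nil {
			panic(err)
		}
	}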
 internal/archiver/archiver.go        | 32 ++++++-----
 internal/archiver/archiver_test.go   | 83 ++++++++++++++--------
 internal/archiver/blob_saver.go      | 18 ++++--
 internal/archiver/blob_saver_test.go | 18 +++---
 internal/archiver/file_saver.go      | 20 ++++---
 internal/archiver/file_saver_test.go | 16 +++---
 internal/archiver/tree_saver.go      | 20 ++++---
 internal/archiver/tree_saver_test.go | 18 +++---
 8 files changed, 123 insertions(+), 102 deletions(-)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index e22c097ec..8b885c0b8 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -13,7 +13,7 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/restic"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 )
 
 // SelectByNameFunc returns true for all items that should be included (files and
@@ -762,17 +762,23 @@ func (arch *Archiver) loadParentTree(ctx context.Context, snapshotID restic.ID)
 }
 
 // runWorkers starts the worker pools, which are stopped when the context is cancelled.
-func (arch *Archiver) runWorkers(ctx context.Context, t *tomb.Tomb) {
-	arch.blobSaver = NewBlobSaver(ctx, t, arch.Repo, arch.Options.SaveBlobConcurrency)
+func (arch *Archiver) runWorkers(ctx context.Context, wg *errgroup.Group) {
+	arch.blobSaver = NewBlobSaver(ctx, wg, arch.Repo, arch.Options.SaveBlobConcurrency)
 
-	arch.fileSaver = NewFileSaver(ctx, t,
+	arch.fileSaver = NewFileSaver(ctx, wg,
 		arch.blobSaver.Save,
 		arch.Repo.Config().ChunkerPolynomial,
 		arch.Options.FileReadConcurrency, arch.Options.SaveBlobConcurrency)
 	arch.fileSaver.CompleteBlob = arch.CompleteBlob
 	arch.fileSaver.NodeFromFileInfo = arch.nodeFromFileInfo
 
-	arch.treeSaver = NewTreeSaver(ctx, t, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
+	arch.treeSaver = NewTreeSaver(ctx, wg, arch.Options.SaveTreeConcurrency, arch.saveTree, arch.Error)
+}
+
+func (arch *Archiver) stopWorkers() {
+	arch.blobSaver.TriggerShutdown()
+	arch.fileSaver.TriggerShutdown()
+	arch.treeSaver.TriggerShutdown()
 }
 
 // Snapshot saves several targets and returns a snapshot.
@@ -787,17 +793,16 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
 		return nil, restic.ID{}, err
 	}
 
-	var t tomb.Tomb
-	wctx := t.Context(ctx)
+	wg, wgCtx := errgroup.WithContext(ctx)
 	start := time.Now()
 
 	var rootTreeID restic.ID
 	var stats ItemStats
-	t.Go(func() error {
-		arch.runWorkers(wctx, &t)
+	wg.Go(func() error {
+		arch.runWorkers(wgCtx, wg)
 
 		debug.Log("starting snapshot")
-		tree, err := arch.SaveTree(wctx, "/", atree, arch.loadParentTree(wctx, opts.ParentSnapshot))
+		tree, err := arch.SaveTree(wgCtx, "/", atree, arch.loadParentTree(wgCtx, opts.ParentSnapshot))
 		if err != nil {
 			return err
 		}
@@ -806,13 +811,12 @@ func (arch *Archiver) Snapshot(ctx context.Context, targets []string, opts Snaps
 			return errors.New("snapshot is empty")
 		}
 
-		rootTreeID, stats, err = arch.saveTree(wctx, tree)
-		// trigger shutdown but don't set an error
-		t.Kill(nil)
+		rootTreeID, stats, err = arch.saveTree(wgCtx, tree)
+		arch.stopWorkers()
 		return err
 	})
 
-	err = t.Wait()
+	err = wg.Wait()
 	debug.Log("err is %v", err)
 
 	if err != nil {
diff --git a/internal/archiver/archiver_test.go b/internal/archiver/archiver_test.go
index 0d6295c39..0d0885a36 100644
--- a/internal/archiver/archiver_test.go
+++ b/internal/archiver/archiver_test.go
@@ -23,7 +23,7 @@ import (
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
 	restictest "github.com/restic/restic/internal/test"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 )
 
 func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo restic.Repository, cleanup func()) {
@@ -41,11 +41,10 @@ func prepareTempdirRepoSrc(t testing.TB, src TestDir) (tempdir string, repo rest
 }
 
 func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem fs.FS) (*restic.Node, ItemStats) {
-	var tmb tomb.Tomb
-	ctx := tmb.Context(context.Background())
+	wg, ctx := errgroup.WithContext(context.TODO())
 
 	arch := New(repo, filesystem, Options{})
-	arch.runWorkers(ctx, &tmb)
+	arch.runWorkers(ctx, wg)
 
 	arch.Error = func(item string, fi os.FileInfo, err error) error {
 		t.Errorf("archiver error for %v: %v", item, err)
@@ -87,14 +86,13 @@ func saveFile(t testing.TB, repo restic.Repository, filename string, filesystem
 		t.Fatal(res.Err())
 	}
 
-	tmb.Kill(nil)
-	err = tmb.Wait()
+	arch.stopWorkers()
+	err = repo.Flush(context.Background())
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	err = repo.Flush(context.Background())
-	if err != nil {
+	if err := wg.Wait(); err != nil {
 		t.Fatal(err)
 	}
 
@@ -214,14 +212,14 @@ func TestArchiverSave(t *testing.T) {
 			tempdir, repo, cleanup := prepareTempdirRepoSrc(t, TestDir{"file": testfile})
 			defer cleanup()
 
-			var tmb tomb.Tomb
+			wg, ctx := errgroup.WithContext(ctx)
 
 			arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
 			arch.Error = func(item string, fi os.FileInfo, err error) error {
 				t.Errorf("archiver error for %v: %v", item, err)
 				return err
 			}
-			arch.runWorkers(tmb.Context(ctx), &tmb)
+			arch.runWorkers(ctx, wg)
 
 			node, excluded, err := arch.Save(ctx, "/", filepath.Join(tempdir, "file"), nil)
 			if err != nil {
@@ -243,6 +241,7 @@ func TestArchiverSave(t *testing.T) {
 
 			stats := node.stats
 
+			arch.stopWorkers()
 			err = repo.Flush(ctx)
 			if err != nil {
 				t.Fatal(err)
@@ -281,6 +280,8 @@ func TestArchiverSaveReaderFS(t *testing.T) {
 			repo, cleanup := repository.TestRepository(t)
 			defer cleanup()
 
+			wg, ctx := errgroup.WithContext(ctx)
+
 			ts := time.Now()
 			filename := "xx"
 			readerFs := &fs.Reader{
@@ -290,14 +291,12 @@ func TestArchiverSaveReaderFS(t *testing.T) {
 				ReadCloser: ioutil.NopCloser(strings.NewReader(test.Data)),
 			}
 
-			var tmb tomb.Tomb
-
 			arch := New(repo, readerFs, Options{})
 			arch.Error = func(item string, fi os.FileInfo, err error) error {
 				t.Errorf("archiver error for %v: %v", item, err)
 				return err
 			}
-			arch.runWorkers(tmb.Context(ctx), &tmb)
+			arch.runWorkers(ctx, wg)
 
 			node, excluded, err := arch.Save(ctx, "/", filename, nil)
 			t.Logf("Save returned %v %v", node, err)
@@ -320,6 +319,7 @@ func TestArchiverSaveReaderFS(t *testing.T) {
 
 			stats := node.stats
 
+			arch.stopWorkers()
 			err = repo.Flush(ctx)
 			if err != nil {
 				t.Fatal(err)
@@ -826,14 +826,13 @@ func TestArchiverSaveDir(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run("", func(t *testing.T) {
-			var tmb tomb.Tomb
-			ctx := tmb.Context(context.Background())
-
 			tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
 			defer cleanup()
 
+			wg, ctx := errgroup.WithContext(context.Background())
+
 			arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
-			arch.runWorkers(ctx, &tmb)
+			arch.runWorkers(ctx, wg)
 
 			chdir := tempdir
 			if test.chdir != "" {
@@ -856,12 +855,6 @@ func TestArchiverSaveDir(t *testing.T) {
 			ft.Wait(ctx)
 			node, stats := ft.Node(), ft.Stats()
 
-			tmb.Kill(nil)
-			err = tmb.Wait()
-			if err != nil {
-				t.Fatal(err)
-			}
-
 			t.Logf("stats: %v", stats)
 			if stats.DataSize != 0 {
 				t.Errorf("wrong stats returned in DataSize, want 0, got %d", stats.DataSize)
@@ -876,24 +869,29 @@ func TestArchiverSaveDir(t *testing.T) {
 				t.Errorf("wrong stats returned in TreeBlobs, want > 0, got %d", stats.TreeBlobs)
 			}
 
-			ctx = context.Background()
 			node.Name = targetNodeName
 			tree := &restic.Tree{Nodes: []*restic.Node{node}}
 			treeID, err := repo.SaveTree(ctx, tree)
 			if err != nil {
 				t.Fatal(err)
 			}
 
+			arch.stopWorkers()
 			err = repo.Flush(ctx)
 			if err != nil {
 				t.Fatal(err)
 			}
 
+			err = wg.Wait()
+			if err != nil {
+				t.Fatal(err)
+			}
+
 			want := test.want
 			if want == nil {
 				want = test.src
 			}
-			TestEnsureTree(ctx, t, "/", repo, treeID, want)
+			TestEnsureTree(context.TODO(), t, "/", repo, treeID, want)
 		})
 	}
 }
@@ -915,11 +913,10 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 	// save the empty directory several times in a row, then have a look if the
 	// archiver did save the same tree several times
 	for i := 0; i < 5; i++ {
-		var tmb tomb.Tomb
-		ctx := tmb.Context(context.Background())
+		wg, ctx := errgroup.WithContext(context.TODO())
 
 		arch := New(repo, fs.Track{FS: fs.Local{}}, Options{})
-		arch.runWorkers(ctx, &tmb)
+		arch.runWorkers(ctx, wg)
 
 		fi, err := fs.Lstat(tempdir)
 		if err != nil {
@@ -934,8 +931,6 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 		ft.Wait(ctx)
 		node, stats := ft.Node(), ft.Stats()
 
-		tmb.Kill(nil)
-		err = tmb.Wait()
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -972,7 +967,12 @@ func TestArchiverSaveDirIncremental(t *testing.T) {
 
 		t.Logf("node subtree %v", node.Subtree)
 
-		err = repo.Flush(context.Background())
+		arch.stopWorkers()
+		err = repo.Flush(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = wg.Wait()
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -1081,9 +1081,6 @@ func TestArchiverSaveTree(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run("", func(t *testing.T) {
-			var tmb tomb.Tomb
-			ctx := tmb.Context(context.Background())
-
 			tempdir, repo, cleanup := prepareTempdirRepoSrc(t, test.src)
 			defer cleanup()
 
@@ -1099,7 +1096,9 @@ func TestArchiverSaveTree(t *testing.T) {
 				stat.Add(s)
 			}
 
-			arch.runWorkers(ctx, &tmb)
+			wg, ctx := errgroup.WithContext(context.TODO())
+
+			arch.runWorkers(ctx, wg)
 
 			back := restictest.Chdir(t, tempdir)
 			defer back()
@@ -1123,14 +1122,12 @@ func TestArchiverSaveTree(t *testing.T) {
 				t.Fatal(err)
 			}
 
-			tmb.Kill(nil)
-			err = tmb.Wait()
+			arch.stopWorkers()
+			err = repo.Flush(ctx)
 			if err != nil {
 				t.Fatal(err)
 			}
-
-			ctx = context.Background()
-			err = repo.Flush(ctx)
+			err = wg.Wait()
 			if err != nil {
 				t.Fatal(err)
 			}
@@ -1139,7 +1136,7 @@ func TestArchiverSaveTree(t *testing.T) {
 			if want == nil {
 				want = test.src
 			}
-			TestEnsureTree(ctx, t, "/", repo, treeID, want)
+			TestEnsureTree(context.TODO(), t, "/", repo, treeID, want)
 			bothZeroOrNeither(t, uint64(test.stat.DataBlobs), uint64(stat.DataBlobs))
 			bothZeroOrNeither(t, uint64(test.stat.TreeBlobs), uint64(stat.TreeBlobs))
 			bothZeroOrNeither(t, test.stat.DataSize, stat.DataSize)
@@ -2240,14 +2237,14 @@ func TestRacyFileSwap(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	var tmb tomb.Tomb
+	wg, ctx := errgroup.WithContext(ctx)
 
 	arch := New(repo, fs.Track{FS: statfs}, Options{})
 	arch.Error = func(item string, fi os.FileInfo, err error) error {
 		t.Logf("archiver error as expected for %v: %v", item, err)
 		return err
 	}
-	arch.runWorkers(tmb.Context(ctx), &tmb)
+	arch.runWorkers(ctx, wg)
 
 	// fs.Track will panic if the file was not closed
 	_, excluded, err := arch.Save(ctx, "/", tempfile, nil)
diff --git a/internal/archiver/blob_saver.go b/internal/archiver/blob_saver.go
index c59f08133..1ee319091 100644
--- a/internal/archiver/blob_saver.go
+++ b/internal/archiver/blob_saver.go
@@ -5,7 +5,7 @@ import (
 
 	"github.com/restic/restic/internal/debug"
 	"github.com/restic/restic/internal/restic"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 )
 
 // Saver allows saving a blob.
@@ -22,7 +22,7 @@ type BlobSaver struct {
 
 // NewBlobSaver returns a new blob. A worker pool is started, it is stopped
 // when ctx is cancelled.
-func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *BlobSaver {
+func NewBlobSaver(ctx context.Context, wg *errgroup.Group, repo Saver, workers uint) *BlobSaver {
 	ch := make(chan saveBlobJob)
 	s := &BlobSaver{
 		repo: repo,
@@ -30,14 +30,18 @@ func NewBlobSaver(ctx context.Context, t *tomb.Tomb, repo Saver, workers uint) *
 	}
 
 	for i := uint(0); i < workers; i++ {
-		t.Go(func() error {
-			return s.worker(t.Context(ctx), ch)
+		wg.Go(func() error {
+			return s.worker(ctx, ch)
 		})
 	}
 
 	return s
 }
 
+func (s *BlobSaver) TriggerShutdown() {
+	close(s.ch)
+}
+
 // Save stores a blob in the repo. It checks the index and the known blobs
 // before saving anything. It takes ownership of the buffer passed in.
 func (s *BlobSaver) Save(ctx context.Context, t restic.BlobType, buf *Buffer) FutureBlob {
@@ -114,10 +118,14 @@ func (s *BlobSaver) saveBlob(ctx context.Context, t restic.BlobType, buf []byte)
 func (s *BlobSaver) worker(ctx context.Context, jobs <-chan saveBlobJob) error {
 	for {
 		var job saveBlobJob
+		var ok bool
 		select {
 		case <-ctx.Done():
 			return nil
-		case job = <-jobs:
+		case job, ok = <-jobs:
+			if !ok {
+				return nil
+			}
 		}
 
 		res, err := s.saveBlob(ctx, job.BlobType, job.buf.Data)
diff --git a/internal/archiver/blob_saver_test.go b/internal/archiver/blob_saver_test.go
index df63d3209..54aa374cf 100644
--- a/internal/archiver/blob_saver_test.go
+++ b/internal/archiver/blob_saver_test.go
@@ -10,7 +10,7 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/repository"
 	"github.com/restic/restic/internal/restic"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 )
 
 var errTest = errors.New("test error")
@@ -38,12 +38,12 @@ func TestBlobSaver(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	tmb, ctx := tomb.WithContext(ctx)
+	wg, ctx := errgroup.WithContext(ctx)
 
 	saver := &saveFail{
 		idx: repository.NewMasterIndex(),
 	}
 
-	b := NewBlobSaver(ctx, tmb, saver, uint(runtime.NumCPU()))
+	b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
 
 	var results []FutureBlob
@@ -60,9 +60,9 @@ func TestBlobSaver(t *testing.T) {
 		}
 	}
 
-	tmb.Kill(nil)
+	b.TriggerShutdown()
 
-	err := tmb.Wait()
+	err := wg.Wait()
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -84,22 +84,22 @@ func TestBlobSaverError(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
-	tmb, ctx := tomb.WithContext(ctx)
+	wg, ctx := errgroup.WithContext(ctx)
 
 	saver := &saveFail{
 		idx:    repository.NewMasterIndex(),
 		failAt: int32(test.failAt),
 	}
 
-	b := NewBlobSaver(ctx, tmb, saver, uint(runtime.NumCPU()))
+	b := NewBlobSaver(ctx, wg, saver, uint(runtime.NumCPU()))
 
 	for i := 0; i < test.blobs; i++ {
 		buf := &Buffer{Data: []byte(fmt.Sprintf("foo%d", i))}
 		b.Save(ctx, restic.DataBlob, buf)
 	}
 
-	tmb.Kill(nil)
+	b.TriggerShutdown()
 
-	err := tmb.Wait()
+	err := wg.Wait()
 	if err == nil {
 		t.Errorf("expected error not found")
 	}
diff --git a/internal/archiver/file_saver.go b/internal/archiver/file_saver.go
index 2c43aefa1..64a46f3bb 100644
--- a/internal/archiver/file_saver.go
+++ b/internal/archiver/file_saver.go
@@ -10,7 +10,7 @@ import (
 	"github.com/restic/restic/internal/errors"
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/restic"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 )
 
 // FutureFile is returned by Save and will return the data once it
@@ -67,7 +67,7 @@ type FileSaver struct {
 
 // NewFileSaver returns a new file saver. A worker pool with fileWorkers is
 // started, it is stopped when ctx is cancelled.
-func NewFileSaver(ctx context.Context, t *tomb.Tomb, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
+func NewFileSaver(ctx context.Context, wg *errgroup.Group, save SaveBlobFn, pol chunker.Pol, fileWorkers, blobWorkers uint) *FileSaver {
 	ch := make(chan saveFileJob)
 
 	debug.Log("new file saver with %v file workers and %v blob workers", fileWorkers, blobWorkers)
@@ -84,8 +84,8 @@ func NewFileSaver(ctx context.Context, t *tomb.Tomb, save SaveBlobFn, pol chunke
 	}
 
 	for i := uint(0); i < fileWorkers; i++ {
-		t.Go(func() error {
-			s.worker(t.Context(ctx), ch)
+		wg.Go(func() error {
+			s.worker(ctx, ch)
 			return nil
 		})
 	}
@@ -93,6 +93,10 @@ func NewFileSaver(ctx context.Context, t *tomb.Tomb, save SaveBlobFn, pol chunke
 	return s
 }
 
+func (s *FileSaver) TriggerShutdown() {
+	close(s.ch)
+}
+
 // CompleteFunc is called when the file has been saved.
 type CompleteFunc func(*restic.Node, ItemStats)
 
@@ -115,7 +119,6 @@ func (s *FileSaver) Save(ctx context.Context, snPath string, file fs.File, fi os
 		debug.Log("not sending job, context is cancelled: %v", ctx.Err())
 		_ = file.Close()
 		close(ch)
-		return FutureFile{ch: ch}
 	}
 
 	return FutureFile{ch: ch}
@@ -226,12 +229,15 @@ func (s *FileSaver) worker(ctx context.Context, jobs <-chan saveFileJob) {
 
 	for {
 		var job saveFileJob
+		var ok bool
 		select {
 		case <-ctx.Done():
 			return
-		case job = <-jobs:
+		case job, ok = <-jobs:
+			if !ok {
+				return
+			}
 		}
-
 		res := s.saveFile(ctx, chnker, job.snPath, job.file, job.fi, job.start)
 		if job.complete != nil {
 			job.complete(res.node, res.stats)
diff --git a/internal/archiver/file_saver_test.go b/internal/archiver/file_saver_test.go
index d4f4fe82b..497882fcb 100644
--- a/internal/archiver/file_saver_test.go
+++ b/internal/archiver/file_saver_test.go
@@ -12,7 +12,7 @@ import (
 	"github.com/restic/restic/internal/fs"
 	"github.com/restic/restic/internal/restic"
 	"github.com/restic/restic/internal/test"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 )
 
 func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
@@ -30,8 +30,8 @@ func createTestFiles(t testing.TB, num int) (files []string, cleanup func()) {
 	return files, cleanup
 }
 
-func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *tomb.Tomb) {
-	tmb, ctx := tomb.WithContext(ctx)
+func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Context, *errgroup.Group) {
+	wg, ctx := errgroup.WithContext(ctx)
 
 	saveBlob := func(ctx context.Context, tpe restic.BlobType, buf *Buffer) FutureBlob {
 		ch := make(chan saveBlobResponse)
@@ -45,10 +45,10 @@ func startFileSaver(ctx context.Context, t testing.TB) (*FileSaver, context.Cont
 		t.Fatal(err)
 	}
 
-	s := NewFileSaver(ctx, tmb, saveBlob, pol, workers, workers)
+	s := NewFileSaver(ctx, wg, saveBlob, pol, workers, workers)
 	s.NodeFromFileInfo = restic.NodeFromFileInfo
 
-	return s, ctx, tmb
+	return s, ctx, wg
 }
 
 func TestFileSaver(t *testing.T) {
@@ -62,7 +62,7 @@ func TestFileSaver(t *testing.T) {
 	completeFn := func(*restic.Node, ItemStats) {}
 
 	testFs := fs.Local{}
-	s, ctx, tmb := startFileSaver(ctx, t)
+	s, ctx, wg := startFileSaver(ctx, t)
 
 	var results []FutureFile
 
@@ -88,9 +88,9 @@ func TestFileSaver(t *testing.T) {
 		}
 	}
 
-	tmb.Kill(nil)
+	s.TriggerShutdown()
 
-	err := tmb.Wait()
+	err := wg.Wait()
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/archiver/tree_saver.go b/internal/archiver/tree_saver.go
index 867bad6aa..afa58be40 100644
--- a/internal/archiver/tree_saver.go
+++ b/internal/archiver/tree_saver.go
@@ -5,7 +5,7 @@ import (
 
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" - tomb "gopkg.in/tomb.v2" + "golang.org/x/sync/errgroup" ) // FutureTree is returned by Save and will return the data once it @@ -47,7 +47,7 @@ type TreeSaver struct { // NewTreeSaver returns a new tree saver. A worker pool with treeWorkers is // started, it is stopped when ctx is cancelled. -func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { +func NewTreeSaver(ctx context.Context, wg *errgroup.Group, treeWorkers uint, saveTree func(context.Context, *restic.Tree) (restic.ID, ItemStats, error), errFn ErrorFunc) *TreeSaver { ch := make(chan saveTreeJob) s := &TreeSaver{ @@ -57,14 +57,18 @@ func NewTreeSaver(ctx context.Context, t *tomb.Tomb, treeWorkers uint, saveTree } for i := uint(0); i < treeWorkers; i++ { - t.Go(func() error { - return s.worker(t.Context(ctx), ch) + wg.Go(func() error { + return s.worker(ctx, ch) }) } return s } +func (s *TreeSaver) TriggerShutdown() { + close(s.ch) +} + // Save stores the dir d and returns the data once it has been completed. func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, nodes []FutureNode, complete CompleteFunc) FutureTree { ch := make(chan saveTreeResponse, 1) @@ -80,7 +84,6 @@ func (s *TreeSaver) Save(ctx context.Context, snPath string, node *restic.Node, case <-ctx.Done(): debug.Log("not saving tree, context is cancelled") close(ch) - return FutureTree{ch: ch} } return FutureTree{ch: ch} @@ -146,12 +149,15 @@ func (s *TreeSaver) save(ctx context.Context, snPath string, node *restic.Node, func (s *TreeSaver) worker(ctx context.Context, jobs <-chan saveTreeJob) error { for { var job saveTreeJob + var ok bool select { case <-ctx.Done(): return nil - case job = <-jobs: + case job, ok = <-jobs: + if !ok { + return nil + } } - node, stats, err := s.save(ctx, job.snPath, job.node, job.nodes) if err != nil { debug.Log("error saving tree blob: %v", err) diff --git a/internal/archiver/tree_saver_test.go b/internal/archiver/tree_saver_test.go index c9c589d1c..8ffafcaad 100644 --- a/internal/archiver/tree_saver_test.go +++ b/internal/archiver/tree_saver_test.go @@ -10,14 +10,14 @@ import ( "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - tomb "gopkg.in/tomb.v2" + "golang.org/x/sync/errgroup" ) func TestTreeSaver(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tmb, ctx := tomb.WithContext(ctx) + wg, ctx := errgroup.WithContext(ctx) saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) { return restic.NewRandomID(), ItemStats{TreeBlobs: 1, TreeSize: 123}, nil @@ -27,7 +27,7 @@ func TestTreeSaver(t *testing.T) { return nil } - b := NewTreeSaver(ctx, tmb, uint(runtime.NumCPU()), saveFn, errFn) + b := NewTreeSaver(ctx, wg, uint(runtime.NumCPU()), saveFn, errFn) var results []FutureTree @@ -44,9 +44,9 @@ func TestTreeSaver(t *testing.T) { tree.Wait(ctx) } - tmb.Kill(nil) + b.TriggerShutdown() - err := tmb.Wait() + err := wg.Wait() if err != nil { t.Fatal(err) } @@ -71,7 +71,7 @@ func TestTreeSaverError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tmb, ctx := tomb.WithContext(ctx) + wg, ctx := errgroup.WithContext(ctx) var num int32 saveFn := func(context.Context, *restic.Tree) (restic.ID, ItemStats, error) { @@ -88,7 +88,7 @@ func TestTreeSaverError(t *testing.T) { return nil } - b 
 internal/archiver/archiver.go | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/internal/archiver/archiver.go b/internal/archiver/archiver.go
index 8b885c0b8..2c7af3ae0 100644
--- a/internal/archiver/archiver.go
+++ b/internal/archiver/archiver.go
@@ -779,6 +779,9 @@ func (arch *Archiver) stopWorkers() {
 	arch.blobSaver.TriggerShutdown()
 	arch.fileSaver.TriggerShutdown()
 	arch.treeSaver.TriggerShutdown()
+	arch.blobSaver = nil
+	arch.fileSaver = nil
+	arch.treeSaver = nil
 }
 
 // Snapshot saves several targets and returns a snapshot.

From 853ceb3bec856c1ac6bee000073750364974add3 Mon Sep 17 00:00:00 2001
From: Michael Eischer
Date: Fri, 27 May 2022 19:27:14 +0200
Subject: [PATCH 3/3] get rid of tomb package

---
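Notes: with tomb gone, cmd_backup keeps two lifetimes apart: a plain
sync.WaitGroup plus a cancellable context keep the termstatus goroutine
running until runBackup has returned, while an errgroup with its own
cancellable sub-context manages the progress reporter and scanner inside
runBackup. A minimal, self-contained sketch of the WaitGroup half, with
placeholder names (run stands in for term.Run and is not restic API):

	package main

	import (
		"context"
		"sync"
		"time"
	)

	// run pretends to render status output until it is cancelled.
	func run(ctx context.Context) {
		<-ctx.Done()
	}

	func main() {
		var wg sync.WaitGroup
		cancelCtx, cancel := context.WithCancel(context.Background())
		defer func() {
			cancel()  // ask the goroutine to stop ...
			wg.Wait() // ... and block until it actually has
		}()

		wg.Add(1)
		go func() {
			defer wg.Done()
			run(cancelCtx)
		}()

		time.Sleep(10 * time.Millisecond) // the real command work happens here
	}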
 cmd/restic/cmd_backup.go | 40 ++++++++++++++++++++++++----------------
 go.mod                   |  1 -
 go.sum                   |  2 --
 3 files changed, 24 insertions(+), 19 deletions(-)

diff --git a/cmd/restic/cmd_backup.go b/cmd/restic/cmd_backup.go
index bf1e17a47..5e9476984 100644
--- a/cmd/restic/cmd_backup.go
+++ b/cmd/restic/cmd_backup.go
@@ -12,10 +12,11 @@ import (
 	"path/filepath"
 	"runtime"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/spf13/cobra"
-	tomb "gopkg.in/tomb.v2"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/restic/restic/internal/archiver"
 	"github.com/restic/restic/internal/debug"
@@ -55,16 +56,22 @@ Exit status is 3 if some source data could not be read (incomplete snapshot crea
 	},
 	DisableAutoGenTag: true,
 	RunE: func(cmd *cobra.Command, args []string) error {
-		var t tomb.Tomb
-		term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
-		t.Go(func() error { term.Run(t.Context(globalOptions.ctx)); return nil })
+		var wg sync.WaitGroup
+		cancelCtx, cancel := context.WithCancel(globalOptions.ctx)
+		defer func() {
+			// shutdown termstatus
+			cancel()
+			wg.Wait()
+		}()
 
-		err := runBackup(backupOptions, globalOptions, term, args)
-		t.Kill(nil)
-		if werr := t.Wait(); werr != nil {
-			panic(fmt.Sprintf("term.Run() returned err: %v", err))
-		}
-		return err
+		term := termstatus.New(globalOptions.stdout, globalOptions.stderr, globalOptions.Quiet)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			term.Run(cancelCtx)
+		}()
+
+		return runBackup(backupOptions, globalOptions, term, args)
 	},
 }
 
@@ -534,8 +541,6 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 		}
 	}
 
-	var t tomb.Tomb
-
 	if gopts.verbosity >= 2 && !gopts.JSON {
 		Verbosef("open repository\n")
 	}
@@ -567,7 +572,10 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 
 	progressReporter.SetMinUpdatePause(calculateProgressInterval(!gopts.Quiet, gopts.JSON))
 
-	t.Go(func() error { return progressReporter.Run(t.Context(gopts.ctx)) })
+	wg, wgCtx := errgroup.WithContext(gopts.ctx)
+	cancelCtx, cancel := context.WithCancel(wgCtx)
+	defer cancel()
+	wg.Go(func() error { return progressReporter.Run(cancelCtx) })
 
 	if !gopts.JSON {
 		progressPrinter.V("lock repository")
@@ -675,7 +683,7 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	if !gopts.JSON {
 		progressPrinter.V("start scan on %v", targets)
 	}
-	t.Go(func() error { return sc.Scan(t.Context(gopts.ctx), targets) })
+	wg.Go(func() error { return sc.Scan(cancelCtx, targets) })
 
 	arch := archiver.New(repo, targetFS, archiver.Options{})
 	arch.SelectByName = selectByNameFilter
@@ -717,10 +725,10 @@ func runBackup(opts BackupOptions, gopts GlobalOptions, term *termstatus.Termina
 	_, id, err := arch.Snapshot(gopts.ctx, targets, snapshotOpts)
 
 	// cleanly shutdown all running goroutines
-	t.Kill(nil)
+	cancel()
 
 	// let's see if one returned an error
-	werr := t.Wait()
+	werr := wg.Wait()
 
 	// return original error
 	if err != nil {
diff --git a/go.mod b/go.mod
index 2813190ce..81d35692f 100644
--- a/go.mod
+++ b/go.mod
@@ -46,7 +46,6 @@ require (
 	google.golang.org/genproto v0.0.0-20220324131243-acbaeb5b85eb // indirect
 	google.golang.org/protobuf v1.28.0 // indirect
 	gopkg.in/ini.v1 v1.66.4 // indirect
-	gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
 	gopkg.in/yaml.v3 v3.0.0 // indirect
 )
 
diff --git a/go.sum b/go.sum
index a176f11be..4f1ff9309 100644
--- a/go.sum
+++ b/go.sum
@@ -750,8 +750,6 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
 gopkg.in/ini.v1 v1.57.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
 gopkg.in/ini.v1 v1.66.4 h1:SsAcf+mM7mRZo2nJNGt8mZCjG8ZRaNGMURJw7BsIST4=
 gopkg.in/ini.v1 v1.66.4/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
-gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs=
-gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=