From 2e0f1f5113477851e2957627b3c9b24fa96d0ceb Mon Sep 17 00:00:00 2001 From: greatroar <61184462+greatroar@users.noreply.github.com> Date: Tue, 10 May 2022 22:17:50 +0200 Subject: [PATCH] repository: Remove RunWorkers, report ctx.Err() This removes RunWorkers, which had become mere overhead by successive refactors. It also ensures that each former user of that function returns any context error that occurs, so failure to complete an operation is always reported as an error. --- internal/repository/index_parallel.go | 8 ++++---- internal/repository/master_index.go | 9 ++++----- internal/repository/repository.go | 8 ++++---- internal/repository/worker_group.go | 18 ------------------ 4 files changed, 12 insertions(+), 31 deletions(-) delete mode 100644 internal/repository/worker_group.go diff --git a/internal/repository/index_parallel.go b/internal/repository/index_parallel.go index 545a05921..47aaba85a 100644 --- a/internal/repository/index_parallel.go +++ b/internal/repository/index_parallel.go @@ -37,7 +37,7 @@ func ForAllIndexes(ctx context.Context, repo restic.Repository, return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { select { case <-ctx.Done(): - return nil + return ctx.Err() case ch <- FileInfo{id, size}: } return nil @@ -69,9 +69,9 @@ func ForAllIndexes(ctx context.Context, repo restic.Repository, } // run workers on ch - wg.Go(func() error { - return RunWorkers(loadIndexParallelism, worker) - }) + for i := 0; i < loadIndexParallelism; i++ { + wg.Go(worker) + } return wg.Wait() } diff --git a/internal/repository/master_index.go b/internal/repository/master_index.go index 20aa26d38..22e763c35 100644 --- a/internal/repository/master_index.go +++ b/internal/repository/master_index.go @@ -365,7 +365,7 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla select { case ch <- newIndex: case <-ctx.Done(): - return nil + return ctx.Err() } newIndex = NewIndex() } @@ -397,10 +397,9 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla } // run workers on ch - wg.Go(func() error { - return RunWorkers(saveIndexParallelism, worker) - }) - + for i := 0; i < saveIndexParallelism; i++ { + wg.Go(worker) + } err = wg.Wait() return obsolete, err diff --git a/internal/repository/repository.go b/internal/repository/repository.go index 435c0c4fe..dc03a3b14 100644 --- a/internal/repository/repository.go +++ b/internal/repository/repository.go @@ -679,7 +679,7 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest for id, size := range packsize { select { case <-ctx.Done(): - return nil + return ctx.Err() case ch <- FileInfo{id, size}: } } @@ -705,9 +705,9 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest } // run workers on ch - wg.Go(func() error { - return RunWorkers(listPackParallelism, worker) - }) + for i := 0; i < listPackParallelism; i++ { + wg.Go(worker) + } err = wg.Wait() if err != nil { diff --git a/internal/repository/worker_group.go b/internal/repository/worker_group.go deleted file mode 100644 index c612d1d22..000000000 --- a/internal/repository/worker_group.go +++ /dev/null @@ -1,18 +0,0 @@ -package repository - -import ( - "golang.org/x/sync/errgroup" -) - -// RunWorkers runs count instances of workerFunc using an errgroup.Group. -// If an error occurs in one of the workers, it is returned. -func RunWorkers(count int, workerFunc func() error) error { - var wg errgroup.Group - - // run workers - for i := 0; i < count; i++ { - wg.Go(workerFunc) - } - - return wg.Wait() -}