2
2
mirror of https://github.com/octoleo/restic.git synced 2024-06-06 02:50:50 +00:00

Merge pull request #3748 from greatroar/runworkers

repository: Remove RunWorkers, report ctx.Err()
This commit is contained in:
MichaelEischer 2022-05-11 19:38:46 +02:00 committed by GitHub
commit df554e5f69
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 12 additions and 31 deletions

View File

@ -37,7 +37,7 @@ func ForAllIndexes(ctx context.Context, repo restic.Repository,
return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return ctx.Err()
case ch <- FileInfo{id, size}: case ch <- FileInfo{id, size}:
} }
return nil return nil
@ -69,9 +69,9 @@ func ForAllIndexes(ctx context.Context, repo restic.Repository,
} }
// run workers on ch // run workers on ch
wg.Go(func() error { for i := 0; i < loadIndexParallelism; i++ {
return RunWorkers(loadIndexParallelism, worker) wg.Go(worker)
}) }
return wg.Wait() return wg.Wait()
} }

View File

@ -365,7 +365,7 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla
select { select {
case ch <- newIndex: case ch <- newIndex:
case <-ctx.Done(): case <-ctx.Done():
return nil return ctx.Err()
} }
newIndex = NewIndex() newIndex = NewIndex()
} }
@ -397,10 +397,9 @@ func (mi *MasterIndex) Save(ctx context.Context, repo restic.Repository, packBla
} }
// run workers on ch // run workers on ch
wg.Go(func() error { for i := 0; i < saveIndexParallelism; i++ {
return RunWorkers(saveIndexParallelism, worker) wg.Go(worker)
}) }
err = wg.Wait() err = wg.Wait()
return obsolete, err return obsolete, err

View File

@ -679,7 +679,7 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest
for id, size := range packsize { for id, size := range packsize {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return nil return ctx.Err()
case ch <- FileInfo{id, size}: case ch <- FileInfo{id, size}:
} }
} }
@ -705,9 +705,9 @@ func (r *Repository) CreateIndexFromPacks(ctx context.Context, packsize map[rest
} }
// run workers on ch // run workers on ch
wg.Go(func() error { for i := 0; i < listPackParallelism; i++ {
return RunWorkers(listPackParallelism, worker) wg.Go(worker)
}) }
err = wg.Wait() err = wg.Wait()
if err != nil { if err != nil {

View File

@ -1,18 +0,0 @@
package repository
import (
"golang.org/x/sync/errgroup"
)
// RunWorkers runs count instances of workerFunc using an errgroup.Group.
// If an error occurs in one of the workers, it is returned.
func RunWorkers(count int, workerFunc func() error) error {
var wg errgroup.Group
// run workers
for i := 0; i < count; i++ {
wg.Go(workerFunc)
}
return wg.Wait()
}