diff --git a/internal/index/index_parallel.go b/internal/index/index_parallel.go index 83f9122c6..a76b08a4e 100644 --- a/internal/index/index_parallel.go +++ b/internal/index/index_parallel.go @@ -5,9 +5,7 @@ import ( "runtime" "sync" - "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" - "golang.org/x/sync/errgroup" ) // ForAllIndexes loads all index files in parallel and calls the given callback. @@ -16,68 +14,23 @@ import ( func ForAllIndexes(ctx context.Context, repo restic.Repository, fn func(id restic.ID, index *Index, oldFormat bool, err error) error) error { - debug.Log("Start") - - type FileInfo struct { - restic.ID - Size int64 - } - - var m sync.Mutex - - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. - wg, ctx := errgroup.WithContext(ctx) - - ch := make(chan FileInfo) - // send list of index files through ch, which is closed afterwards - wg.Go(func() error { - defer close(ch) - return repo.List(ctx, restic.IndexFile, func(id restic.ID, size int64) error { - select { - case <-ctx.Done(): - return ctx.Err() - case ch <- FileInfo{id, size}: - } - return nil - }) - }) - - // a worker receives an index ID from ch, loads the index, and sends it to indexCh - worker := func() error { - var buf []byte - for fi := range ch { - debug.Log("worker got file %v", fi.ID.Str()) - var err error - var idx *Index - oldFormat := false - - if cap(buf) < int(fi.Size) { - // overallocate a bit - buf = make([]byte, fi.Size+128*1024) - } - buf, err = repo.LoadUnpacked(ctx, restic.IndexFile, fi.ID, buf[:0]) - if err == nil { - idx, oldFormat, err = DecodeIndex(buf, fi.ID) - } - - m.Lock() - err = fn(fi.ID, idx, oldFormat, err) - m.Unlock() - if err != nil { - return err - } - } - return nil - } - // decoding an index can take quite some time such that this can be both CPU- or IO-bound // as the whole index is kept in memory anyways, a few workers too much don't matter - workerCount := int(repo.Connections()) + runtime.GOMAXPROCS(0) - // run workers on ch - for i := 0; i < workerCount; i++ { - wg.Go(worker) - } + workerCount := repo.Connections() + uint(runtime.GOMAXPROCS(0)) - return wg.Wait() + var m sync.Mutex + return restic.ParallelList(ctx, repo.Backend(), restic.IndexFile, workerCount, func(ctx context.Context, id restic.ID, size int64) error { + var err error + var idx *Index + oldFormat := false + + buf, err := repo.LoadUnpacked(ctx, restic.IndexFile, id, nil) + if err == nil { + idx, oldFormat, err = DecodeIndex(buf, id) + } + + m.Lock() + defer m.Unlock() + return fn(id, idx, oldFormat, err) + }) } diff --git a/internal/restic/lock.go b/internal/restic/lock.go index e27990aed..0f093ae1e 100644 --- a/internal/restic/lock.go +++ b/internal/restic/lock.go @@ -12,7 +12,6 @@ import ( "time" "github.com/restic/restic/internal/errors" - "golang.org/x/sync/errgroup" "github.com/restic/restic/internal/debug" ) @@ -320,50 +319,15 @@ func RemoveAllLocks(ctx context.Context, repo Repository) (uint, error) { func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error { var m sync.Mutex - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. - wg, ctx := errgroup.WithContext(ctx) - - ch := make(chan ID) - - // send list of lock files through ch, which is closed afterwards - wg.Go(func() error { - defer close(ch) - return repo.List(ctx, LockFile, func(id ID, size int64) error { - if excludeID != nil && id.Equal(*excludeID) { - return nil - } - - select { - case <-ctx.Done(): - return nil - case ch <- id: - } - return nil - }) - }) - - // a worker receives an snapshot ID from ch, loads the snapshot - // and runs fn with id, the snapshot and the error - worker := func() error { - for id := range ch { - debug.Log("load lock %v", id) - lock, err := LoadLock(ctx, repo, id) - - m.Lock() - err = fn(id, lock, err) - m.Unlock() - if err != nil { - return err - } - } - return nil - } - // For locks decoding is nearly for free, thus just assume were only limited by IO - for i := 0; i < int(repo.Connections()); i++ { - wg.Go(worker) - } + return ParallelList(ctx, repo.Backend(), LockFile, repo.Connections(), func(ctx context.Context, id ID, size int64) error { + if excludeID != nil && id.Equal(*excludeID) { + return nil + } + lock, err := LoadLock(ctx, repo, id) - return wg.Wait() + m.Lock() + defer m.Unlock() + return fn(id, lock, err) + }) } diff --git a/internal/restic/parallel.go b/internal/restic/parallel.go new file mode 100644 index 000000000..df160f018 --- /dev/null +++ b/internal/restic/parallel.go @@ -0,0 +1,59 @@ +package restic + +import ( + "context" + + "github.com/restic/restic/internal/debug" + "golang.org/x/sync/errgroup" +) + +func ParallelList(ctx context.Context, r Lister, t FileType, parallelism uint, fn func(context.Context, ID, int64) error) error { + + type FileIDInfo struct { + ID + Size int64 + } + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + ch := make(chan FileIDInfo) + // send list of index files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return r.List(ctx, t, func(fi FileInfo) error { + id, err := ParseID(fi.Name) + if err != nil { + debug.Log("unable to parse %v as an ID", fi.Name) + return nil + } + + select { + case <-ctx.Done(): + return nil + case ch <- FileIDInfo{id, fi.Size}: + } + return nil + }) + }) + + // a worker receives an index ID from ch, loads the index, and sends it to indexCh + worker := func() error { + for fi := range ch { + debug.Log("worker got file %v", fi.ID.Str()) + err := fn(ctx, fi.ID, fi.Size) + if err != nil { + return err + } + } + return nil + } + + // run workers on ch + for i := uint(0); i < parallelism; i++ { + wg.Go(worker) + } + + return wg.Wait() +} diff --git a/internal/restic/snapshot.go b/internal/restic/snapshot.go index 7f8d4164b..58d863526 100644 --- a/internal/restic/snapshot.go +++ b/internal/restic/snapshot.go @@ -8,8 +8,6 @@ import ( "sync" "time" - "golang.org/x/sync/errgroup" - "github.com/restic/restic/internal/debug" ) @@ -82,58 +80,17 @@ func SaveSnapshot(ctx context.Context, repo SaverUnpacked, sn *Snapshot) (ID, er func ForAllSnapshots(ctx context.Context, be Lister, loader LoaderUnpacked, excludeIDs IDSet, fn func(ID, *Snapshot, error) error) error { var m sync.Mutex - // track spawned goroutines using wg, create a new context which is - // cancelled as soon as an error occurs. - wg, ctx := errgroup.WithContext(ctx) - - ch := make(chan ID) - - // send list of snapshot files through ch, which is closed afterwards - wg.Go(func() error { - defer close(ch) - return be.List(ctx, SnapshotFile, func(fi FileInfo) error { - id, err := ParseID(fi.Name) - if err != nil { - debug.Log("unable to parse %v as an ID", fi.Name) - return nil - } - - if excludeIDs.Has(id) { - return nil - } - - select { - case <-ctx.Done(): - return nil - case ch <- id: - } - return nil - }) - }) - - // a worker receives an snapshot ID from ch, loads the snapshot - // and runs fn with id, the snapshot and the error - worker := func() error { - for id := range ch { - debug.Log("load snapshot %v", id) - sn, err := LoadSnapshot(ctx, loader, id) - - m.Lock() - err = fn(id, sn, err) - m.Unlock() - if err != nil { - return err - } - } - return nil - } - // For most snapshots decoding is nearly for free, thus just assume were only limited by IO - for i := 0; i < int(loader.Connections()); i++ { - wg.Go(worker) - } + return ParallelList(ctx, be, SnapshotFile, loader.Connections(), func(ctx context.Context, id ID, size int64) error { + if excludeIDs.Has(id) { + return nil + } - return wg.Wait() + sn, err := LoadSnapshot(ctx, loader, id) + m.Lock() + defer m.Unlock() + return fn(id, sn, err) + }) } func (sn Snapshot) String() string {