diff --git a/internal/restic/lock.go b/internal/restic/lock.go index 7047bb52d..e79882f80 100644 --- a/internal/restic/lock.go +++ b/internal/restic/lock.go @@ -12,6 +12,7 @@ import ( "time" "github.com/restic/restic/internal/errors" + "golang.org/x/sync/errgroup" "github.com/restic/restic/internal/debug" ) @@ -138,12 +139,7 @@ func (l *Lock) fillUserInfo() error { // non-exclusive lock is to be created, an error is only returned when an // exclusive lock is found. func (l *Lock) checkForOtherLocks(ctx context.Context) error { - return l.repo.List(ctx, LockFile, func(id ID, size int64) error { - if l.lockID != nil && id.Equal(*l.lockID) { - return nil - } - - lock, err := LoadLock(ctx, l.repo, id) + return ForAllLocks(ctx, l.repo, l.lockID, func(id ID, lock *Lock, err error) error { if err != nil { // ignore locks that cannot be loaded debug.Log("ignore lock %v: %v", id, err) @@ -275,8 +271,7 @@ func LoadLock(ctx context.Context, repo Repository, id ID) (*Lock, error) { // RemoveStaleLocks deletes all locks detected as stale from the repository. func RemoveStaleLocks(ctx context.Context, repo Repository) error { - return repo.List(ctx, LockFile, func(id ID, size int64) error { - lock, err := LoadLock(ctx, repo, id) + return ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error { if err != nil { // ignore locks that cannot be loaded debug.Log("ignore lock %v: %v", id, err) @@ -297,3 +292,59 @@ func RemoveAllLocks(ctx context.Context, repo Repository) error { return repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()}) }) } + +const loadLockParallelism = 5 + +// ForAllLocks reads all locks in parallel and calls the given callback. +// It is guaranteed that the function is not run concurrently. If the +// callback returns an error, this function is cancelled and also returns that error. +// If a lock ID is passed via excludeID, it will be ignored. +func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error { + var m sync.Mutex + + // track spawned goroutines using wg, create a new context which is + // cancelled as soon as an error occurs. + wg, ctx := errgroup.WithContext(ctx) + + ch := make(chan ID) + + // send list of lock files through ch, which is closed afterwards + wg.Go(func() error { + defer close(ch) + return repo.List(ctx, LockFile, func(id ID, size int64) error { + if excludeID != nil && id.Equal(*excludeID) { + return nil + } + + select { + case <-ctx.Done(): + return nil + case ch <- id: + } + return nil + }) + }) + + // a worker receives an snapshot ID from ch, loads the snapshot + // and runs fn with id, the snapshot and the error + worker := func() error { + for id := range ch { + debug.Log("load lock %v", id) + lock, err := LoadLock(ctx, repo, id) + + m.Lock() + err = fn(id, lock, err) + m.Unlock() + if err != nil { + return err + } + } + return nil + } + + for i := 0; i < loadLockParallelism; i++ { + wg.Go(worker) + } + + return wg.Wait() +}