2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-11 02:08:44 +00:00

restic: parallelize lock file loading

This commit is contained in:
Michael Eischer 2020-12-18 20:46:16 +01:00
parent 22260d130d
commit 34b6130a0e

View File

@ -12,6 +12,7 @@ import (
"time" "time"
"github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/errors"
"golang.org/x/sync/errgroup"
"github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/debug"
) )
@ -138,12 +139,7 @@ func (l *Lock) fillUserInfo() error {
// non-exclusive lock is to be created, an error is only returned when an // non-exclusive lock is to be created, an error is only returned when an
// exclusive lock is found. // exclusive lock is found.
func (l *Lock) checkForOtherLocks(ctx context.Context) error { func (l *Lock) checkForOtherLocks(ctx context.Context) error {
return l.repo.List(ctx, LockFile, func(id ID, size int64) error { return ForAllLocks(ctx, l.repo, l.lockID, func(id ID, lock *Lock, err error) error {
if l.lockID != nil && id.Equal(*l.lockID) {
return nil
}
lock, err := LoadLock(ctx, l.repo, id)
if err != nil { if err != nil {
// ignore locks that cannot be loaded // ignore locks that cannot be loaded
debug.Log("ignore lock %v: %v", id, err) debug.Log("ignore lock %v: %v", id, err)
@ -275,8 +271,7 @@ func LoadLock(ctx context.Context, repo Repository, id ID) (*Lock, error) {
// RemoveStaleLocks deletes all locks detected as stale from the repository. // RemoveStaleLocks deletes all locks detected as stale from the repository.
func RemoveStaleLocks(ctx context.Context, repo Repository) error { func RemoveStaleLocks(ctx context.Context, repo Repository) error {
return repo.List(ctx, LockFile, func(id ID, size int64) error { return ForAllLocks(ctx, repo, nil, func(id ID, lock *Lock, err error) error {
lock, err := LoadLock(ctx, repo, id)
if err != nil { if err != nil {
// ignore locks that cannot be loaded // ignore locks that cannot be loaded
debug.Log("ignore lock %v: %v", id, err) debug.Log("ignore lock %v: %v", id, err)
@ -297,3 +292,59 @@ func RemoveAllLocks(ctx context.Context, repo Repository) error {
return repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()}) return repo.Backend().Remove(ctx, Handle{Type: LockFile, Name: id.String()})
}) })
} }
const loadLockParallelism = 5
// ForAllLocks reads all locks in parallel and calls the given callback.
// It is guaranteed that the function is not run concurrently. If the
// callback returns an error, this function is cancelled and also returns that error.
// If a lock ID is passed via excludeID, it will be ignored.
func ForAllLocks(ctx context.Context, repo Repository, excludeID *ID, fn func(ID, *Lock, error) error) error {
var m sync.Mutex
// track spawned goroutines using wg, create a new context which is
// cancelled as soon as an error occurs.
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan ID)
// send list of lock files through ch, which is closed afterwards
wg.Go(func() error {
defer close(ch)
return repo.List(ctx, LockFile, func(id ID, size int64) error {
if excludeID != nil && id.Equal(*excludeID) {
return nil
}
select {
case <-ctx.Done():
return nil
case ch <- id:
}
return nil
})
})
// a worker receives an snapshot ID from ch, loads the snapshot
// and runs fn with id, the snapshot and the error
worker := func() error {
for id := range ch {
debug.Log("load lock %v", id)
lock, err := LoadLock(ctx, repo, id)
m.Lock()
err = fn(id, lock, err)
m.Unlock()
if err != nil {
return err
}
}
return nil
}
for i := 0; i < loadLockParallelism; i++ {
wg.Go(worker)
}
return wg.Wait()
}