
lock: Do not limit backend concurrency for lock files

restic must be able to refresh lock files in time. However, large
uploads over slow connections can cause the lock refresh to be stuck
behind the large uploads and thus time out.
Michael Eischer 2023-04-23 12:16:54 +02:00
parent 41cc320145
commit d05f6211d1
3 changed files with 46 additions and 15 deletions
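
To make the commit message concrete: when every backend operation must take a token from a bounded connection semaphore, a time-critical lock refresh can end up queued behind long-running pack uploads. The following is a minimal, self-contained Go sketch of that starvation effect; it is illustrative only (the channel-based semaphore, the two-connection limit, and the timings are assumptions, not restic code):

```go
// Illustration only: a bounded "connection" semaphore shared by all operations.
package main

import (
	"fmt"
	"time"
)

func main() {
	sem := make(chan struct{}, 2) // pretend the backend allows two concurrent connections

	op := func(d time.Duration) {
		sem <- struct{}{}        // acquire a connection token
		defer func() { <-sem }() // release it when the operation finishes
		time.Sleep(d)            // simulate the transfer
	}

	// Two slow pack uploads grab both tokens.
	go op(2 * time.Second)
	go op(2 * time.Second)
	time.Sleep(50 * time.Millisecond) // give the uploads time to start

	// The lock refresh now queues behind the uploads and can miss its deadline.
	start := time.Now()
	op(10 * time.Millisecond)
	fmt.Printf("lock refresh finished after %v, although the refresh itself took ~10ms\n", time.Since(start))
}
```

The change below removes exactly this queueing for lock files: operations on `restic.LockFile` handles skip the semaphore, so a refresh never has to wait for pack uploads to finish.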

View File

@@ -0,0 +1,5 @@
+Bugfix: Avoid lock refresh issues with slow network connections
+
+On network connections with a low upload speed, restic could often fail backups and other operations with `Fatal: failed to refresh lock in time`. We've reworked the lock refresh to avoid this error.
+
+https://github.com/restic/restic/pull/4304

View File

@@ -31,14 +31,24 @@ func NewBackend(be restic.Backend) restic.Backend {
 	}
 }
 
+// typeDependentLimit acquire a token unless the FileType is a lock file. The returned function
+// must be called to release the token.
+func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func() {
+	// allow concurrent lock file operations to ensure that the lock refresh is always possible
+	if t == restic.LockFile {
+		return func() {}
+	}
+	be.sem.GetToken()
+	return be.sem.ReleaseToken
+}
+
 // Save adds new Data to the backend.
 func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
 	if err := h.Valid(); err != nil {
 		return backoff.Permanent(err)
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Save(ctx, h, rd)
 }
@@ -56,8 +66,7 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l
 		return backoff.Permanent(errors.Errorf("invalid length %d", length))
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Load(ctx, h, length, offset, fn)
 }
@@ -68,8 +77,7 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (
 		return restic.FileInfo{}, backoff.Permanent(err)
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Stat(ctx, h)
 }
@@ -80,8 +88,7 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle)
 		return backoff.Permanent(err)
 	}
 
-	be.sem.GetToken()
-	defer be.sem.ReleaseToken()
+	defer be.typeDependentLimit(h.Type)()
 
 	return be.Backend.Remove(ctx, h)
 }
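
One detail worth noting in the hunks above: `defer be.typeDependentLimit(h.Type)()` calls `typeDependentLimit` immediately (acquiring a token, or doing nothing for lock files) and defers only the function it returns, which releases the token when the method returns. A standalone sketch of that `defer f(x)()` pattern, unrelated to the restic code:

```go
package main

import "fmt"

// acquire grabs a (pretend) resource now and returns the matching release function.
func acquire(name string) func() {
	fmt.Println("acquired", name)
	return func() { fmt.Println("released", name) }
}

func work() {
	// acquire("token") runs immediately; only the returned closure is deferred.
	defer acquire("token")()
	fmt.Println("working")
}

func main() {
	work()
	// Output:
	// acquired token
	// working
	// released token
}
```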

View File

@@ -88,7 +88,7 @@ func countingBlocker() (func(), func(int) int) {
 	unblock := func(expected int) int {
 		// give goroutines enough time to block
 		var blocked int64
-		for i := 0; i < 100 && blocked != int64(expected); i++ {
+		for i := 0; i < 100 && blocked < int64(expected); i++ {
 			time.Sleep(100 * time.Microsecond)
 			blocked = atomic.LoadInt64(&ctr)
 		}
@@ -99,8 +99,9 @@ func countingBlocker() (func(), func(int) int) {
 	return wait, unblock
 }
 
-func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) {
+func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int, isUnlimited bool) {
 	expectBlocked := int(2)
+	workerCount := expectBlocked + 1
 
 	m := mock.NewBackend()
 	setup(m)
@@ -108,10 +109,13 @@ func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(b
 	be := sema.NewBackend(m)
 
 	var wg errgroup.Group
-	for i := 0; i < int(expectBlocked+1); i++ {
+	for i := 0; i < workerCount; i++ {
 		wg.Go(handler(be))
 	}
 
+	if isUnlimited {
+		expectBlocked = workerCount
+	}
 	blocked := unblock(expectBlocked)
 	test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked)
 	test.OK(t, wg.Wait())
@@ -129,7 +133,7 @@ func TestConcurrencyLimitSave(t *testing.T) {
 			h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
 			return be.Save(context.TODO(), h, nil)
 		}
-	}, unblock)
+	}, unblock, false)
 }
 
 func TestConcurrencyLimitLoad(t *testing.T) {
@@ -145,7 +149,7 @@ func TestConcurrencyLimitLoad(t *testing.T) {
 			nilCb := func(rd io.Reader) error { return nil }
 			return be.Load(context.TODO(), h, 10, 0, nilCb)
 		}
-	}, unblock)
+	}, unblock, false)
 }
 
 func TestConcurrencyLimitStat(t *testing.T) {
@@ -161,7 +165,7 @@ func TestConcurrencyLimitStat(t *testing.T) {
 			_, err := be.Stat(context.TODO(), h)
 			return err
 		}
-	}, unblock)
+	}, unblock, false)
 }
 
 func TestConcurrencyLimitDelete(t *testing.T) {
@@ -176,5 +180,20 @@ func TestConcurrencyLimitDelete(t *testing.T) {
 			h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
 			return be.Remove(context.TODO(), h)
 		}
-	}, unblock)
+	}, unblock, false)
+}
+
+func TestConcurrencyUnlimitedLockSave(t *testing.T) {
+	wait, unblock := countingBlocker()
+	concurrencyTester(t, func(m *mock.Backend) {
+		m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
+			wait()
+			return nil
+		}
+	}, func(be restic.Backend) func() error {
+		return func() error {
+			h := restic.Handle{Type: restic.LockFile, Name: "foobar"}
+			return be.Save(context.TODO(), h, nil)
+		}
+	}, unblock, true)
 }
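
The tests above funnel all mocked backend calls through a `countingBlocker` helper whose full body lies outside the shown hunks; only its polling loop is visible. As a rough, hypothetical reconstruction (the real helper may differ in details), it counts how many goroutines have entered the backend call and then releases them all at once:

```go
// Hypothetical stand-in for the test helper; not the actual restic test code.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func countingBlocker() (wait func(), unblock func(expected int) int) {
	var ctr int64
	release := make(chan struct{})

	wait = func() {
		atomic.AddInt64(&ctr, 1) // one more goroutine has reached the backend call
		<-release                // block until the test releases everyone
	}

	unblock = func(expected int) int {
		// Poll until at least `expected` goroutines are blocked (or give up),
		// mirroring the `blocked < int64(expected)` loop in the diff above.
		var blocked int64
		for i := 0; i < 100 && blocked < int64(expected); i++ {
			time.Sleep(100 * time.Microsecond)
			blocked = atomic.LoadInt64(&ctr)
		}
		close(release) // let all blocked goroutines continue
		return int(blocked)
	}
	return wait, unblock
}

func main() {
	wait, unblock := countingBlocker()
	for i := 0; i < 3; i++ {
		go wait()
	}
	fmt.Println("blocked goroutines:", unblock(3))
}
```

With this structure, the limited-type tests expect only as many goroutines to block inside the backend call as the semaphore allows, while `TestConcurrencyUnlimitedLockSave` passes `isUnlimited` so that every worker is expected to get through.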