mirror of
https://github.com/octoleo/restic.git
synced 2024-12-22 10:58:55 +00:00
lock: freeze backend operations while refreshing stale lock
Freeze new backend operations while trying to refresh a stale lock.
This commit is contained in:
parent
51718ec561
commit
f490288738
@ -115,7 +115,7 @@ retryLoop:
|
||||
globalLocks.Lock()
|
||||
globalLocks.locks[lock] = lockInfo
|
||||
go refreshLocks(ctx, lockInfo, refreshChan, forcedRefreshChan)
|
||||
go monitorLockRefresh(ctx, lockInfo, refreshChan, forcedRefreshChan)
|
||||
go monitorLockRefresh(ctx, repo.Backend(), lockInfo, refreshChan, forcedRefreshChan)
|
||||
globalLocks.Unlock()
|
||||
|
||||
return lock, ctx, err
|
||||
@ -180,7 +180,7 @@ func refreshLocks(ctx context.Context, lockInfo *lockContext, refreshed chan<- s
|
||||
}
|
||||
}
|
||||
|
||||
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forcedRefresh chan<- struct{}) {
|
||||
func monitorLockRefresh(ctx context.Context, backend restic.Backend, lockInfo *lockContext, refreshed <-chan struct{}, forcedRefresh chan<- struct{}) {
|
||||
// time.Now() might use a monotonic timer which is paused during standby
|
||||
// convert to unix time to ensure we compare real time values
|
||||
lastRefresh := time.Now().UnixNano()
|
||||
@ -212,7 +212,7 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
|
||||
}
|
||||
|
||||
// keep on going if our current lock still exists
|
||||
if tryRefreshStaleLock(ctx, lockInfo.lock) {
|
||||
if tryRefreshStaleLock(ctx, backend, lockInfo.lock, lockInfo.cancel) {
|
||||
lastRefresh = time.Now().UnixNano()
|
||||
|
||||
// inform refresh gorountine about forced refresh
|
||||
@ -229,10 +229,19 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
|
||||
}
|
||||
}
|
||||
|
||||
func tryRefreshStaleLock(ctx context.Context, lock *restic.Lock) bool {
|
||||
func tryRefreshStaleLock(ctx context.Context, backend restic.Backend, lock *restic.Lock, cancel context.CancelFunc) bool {
|
||||
freeze := restic.AsBackend[restic.FreezeBackend](backend)
|
||||
if freeze != nil {
|
||||
debug.Log("freezing backend")
|
||||
freeze.Freeze()
|
||||
defer freeze.Unfreeze()
|
||||
}
|
||||
|
||||
err := lock.RefreshStaleLock(ctx)
|
||||
if err != nil {
|
||||
Warnf("failed to refresh stale lock: %v\n", err)
|
||||
// cancel context while the backend is still frozen to prevent accidental modifications
|
||||
cancel()
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package sema
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
@ -15,7 +16,8 @@ var _ restic.Backend = &connectionLimitedBackend{}
|
||||
// connectionLimitedBackend limits the number of concurrent operations.
|
||||
type connectionLimitedBackend struct {
|
||||
restic.Backend
|
||||
sem semaphore
|
||||
sem semaphore
|
||||
freezeLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
|
||||
@ -39,9 +41,23 @@ func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func()
|
||||
return func() {}
|
||||
}
|
||||
be.sem.GetToken()
|
||||
// prevent token usage while the backend is frozen
|
||||
be.freezeLock.Lock()
|
||||
defer be.freezeLock.Unlock()
|
||||
|
||||
return be.sem.ReleaseToken
|
||||
}
|
||||
|
||||
// Freeze blocks all backend operations except those on lock files
|
||||
func (be *connectionLimitedBackend) Freeze() {
|
||||
be.freezeLock.Lock()
|
||||
}
|
||||
|
||||
// Unfreeze allows all backend operations to continue
|
||||
func (be *connectionLimitedBackend) Unfreeze() {
|
||||
be.freezeLock.Unlock()
|
||||
}
|
||||
|
||||
// Save adds new Data to the backend.
|
||||
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
@ -50,6 +66,10 @@ func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, r
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Save(ctx, h, rd)
|
||||
}
|
||||
|
||||
@ -68,6 +88,10 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
|
||||
@ -79,6 +103,10 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return restic.FileInfo{}, ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Stat(ctx, h)
|
||||
}
|
||||
|
||||
@ -90,6 +118,10 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle)
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Remove(ctx, h)
|
||||
}
|
||||
|
||||
|
@ -92,6 +92,14 @@ func AsBackend[B Backend](b Backend) B {
|
||||
return be
|
||||
}
|
||||
|
||||
type FreezeBackend interface {
|
||||
Backend
|
||||
// Freeze blocks all backend operations except those on lock files
|
||||
Freeze()
|
||||
// Unfreeze allows all backend operations to continue
|
||||
Unfreeze()
|
||||
}
|
||||
|
||||
// FileInfo is contains information about a file in the backend.
|
||||
type FileInfo struct {
|
||||
Size int64
|
||||
|
Loading…
Reference in New Issue
Block a user