lock: try refreshing of stale locks
A stale lock may be refreshed if it continues to exist until after a replacement lock has been created. This ensures that the repository was not unlocked in the meantime.
commit bee3231ed4
parent 60d8066568
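For orientation before the diff: the commit message describes a check–create–check protocol. The stale lock is only refreshed if its lock file still exists both before and after a replacement lock file has been written; otherwise a concurrent unlock that removed the lock would be silently undone. Below is a minimal, self-contained sketch of that protocol under assumed names (`lockBackend`, `refreshStale`, and the `wait` parameter are illustrative, not restic's API):

```go
package locksketch

import (
	"context"
	"errors"
	"time"
)

// lockBackend is a hypothetical minimal store for lock files; restic's real
// Backend interface is different.
type lockBackend interface {
	Exists(ctx context.Context, name string) (bool, error)
	Create(ctx context.Context) (string, error)
	Remove(ctx context.Context, name string) error
}

var errRemovedLock = errors.New("lock file was removed in the meantime")

// refreshStale mirrors the protocol from the commit message: a stale lock may
// only be refreshed if it continues to exist until after a replacement lock
// has been created.
func refreshStale(ctx context.Context, b lockBackend, oldLock string, wait time.Duration) (string, error) {
	// initial check: don't create a replacement if the repository was
	// already unlocked in the meantime
	exists, err := b.Exists(ctx, oldLock)
	if err != nil {
		return "", err
	}
	if !exists {
		return "", errRemovedLock
	}

	// write the replacement lock first, then wait for a concurrent unlock
	// to become visible
	replacement, err := b.Create(ctx)
	if err != nil {
		return "", err
	}
	time.Sleep(wait)

	// second check: if the old lock disappeared while the replacement was
	// being created, back out again
	exists, err = b.Exists(ctx, oldLock)
	if err != nil || !exists {
		_ = b.Remove(ctx, replacement)
		if err != nil {
			return "", err
		}
		return "", errRemovedLock
	}

	// success: keep the replacement, drop the old lock file
	return replacement, b.Remove(ctx, oldLock)
}
```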
@@ -108,11 +108,12 @@ retryLoop:
 	}
 	lockInfo.refreshWG.Add(2)
 	refreshChan := make(chan struct{})
+	forcedRefreshChan := make(chan struct{})
 
 	globalLocks.Lock()
 	globalLocks.locks[lock] = lockInfo
-	go refreshLocks(ctx, lock, lockInfo, refreshChan)
-	go monitorLockRefresh(ctx, lockInfo, refreshChan)
+	go refreshLocks(ctx, lock, lockInfo, refreshChan, forcedRefreshChan)
+	go monitorLockRefresh(ctx, lock, lockInfo, refreshChan, forcedRefreshChan)
 	globalLocks.Unlock()
 
 	return lock, ctx, err
@@ -124,7 +125,7 @@ var refreshInterval = 5 * time.Minute
 // the difference allows to compensate for a small time drift between clients.
 var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2
 
-func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext, refreshed chan<- struct{}) {
+func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext, refreshed chan<- struct{}, forcedRefresh <-chan struct{}) {
 	debug.Log("start")
 	ticker := time.NewTicker(refreshInterval)
 	lastRefresh := lock.Time
@@ -149,6 +150,11 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
 		case <-ctx.Done():
 			debug.Log("terminate")
 			return
+
+		case <-forcedRefresh:
+			// update lock refresh time
+			lastRefresh = lock.Time
+
 		case <-ticker.C:
 			if time.Since(lastRefresh) > refreshabilityTimeout {
 				// the lock is too old, wait until the expiry monitor cancels the context
@@ -161,7 +167,7 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
 				Warnf("unable to refresh lock: %v\n", err)
 			} else {
 				lastRefresh = lock.Time
-				// inform monitor gorountine about successful refresh
+				// inform monitor goroutine about successful refresh
 				select {
 				case <-ctx.Done():
 				case refreshed <- struct{}{}:
@@ -171,7 +177,7 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
 	}
 }
 
-func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}) {
+func monitorLockRefresh(ctx context.Context, lock *restic.Lock, lockInfo *lockContext, refreshed <-chan struct{}, forcedRefresh chan<- struct{}) {
 	// time.Now() might use a monotonic timer which is paused during standby
 	// convert to unix time to ensure we compare real time values
 	lastRefresh := time.Now().UnixNano()
@@ -183,9 +189,9 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
 	// timers are paused during standby, which is a problem as the refresh timeout
 	// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
 	// https://github.com/golang/go/issues/35012
-	timer := time.NewTimer(pollDuration)
+	ticker := time.NewTicker(pollDuration)
 	defer func() {
-		timer.Stop()
+		ticker.Stop()
 		lockInfo.cancel()
 		lockInfo.refreshWG.Done()
 	}()
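The switch from a resettable timer to a plain ticker goes together with the wall-clock comparison mentioned in the comments above: timers may be paused while the host is in standby (see golang/go#35012), so the monitor polls periodically and compares `UnixNano()` wall-clock values instead of relying on one long-running timer. A small illustrative helper, not part of the diff, showing that overdue check in isolation:

```go
package locksketch

import "time"

// overdue reports whether the last refresh is older than timeout. It compares
// wall-clock (Unix) values, so hours spent in standby still count towards the
// timeout even though the polling ticker itself may have been paused.
// lastRefreshUnixNano is expected to come from time.Now().UnixNano().
func overdue(lastRefreshUnixNano int64, timeout time.Duration) bool {
	return time.Now().UnixNano()-lastRefreshUnixNano >= timeout.Nanoseconds()
}
```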
@@ -197,10 +203,20 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
 			return
 		case <-refreshed:
 			lastRefresh = time.Now().UnixNano()
-		case <-timer.C:
+		case <-ticker.C:
 			if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() {
-				// restart timer
-				timer.Reset(pollDuration)
+				continue
+			}
+
+			// keep on going if our current lock still exists
+			if tryRefreshStaleLock(ctx, lock) {
+				lastRefresh = time.Now().UnixNano()
+
+				// inform refresh gorountine about forced refresh
+				select {
+				case <-ctx.Done():
+				case forcedRefresh <- struct{}{}:
+				}
 				continue
 			}
 
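When the monitor manages to refresh the stale lock itself, it has to tell the refresh goroutine, so that goroutine does not also conclude the lock is too old. The send is paired with `ctx.Done()` in a `select`, the same pattern already used for the `refreshed` channel, so neither goroutine can block forever once the lock's context has been cancelled. A standalone sketch of that pattern (names are illustrative, not from the diff):

```go
package locksketch

import "context"

// notify delivers a signal on ch unless ctx is cancelled first. Pairing the
// send with ctx.Done() keeps the sender from blocking forever when the
// receiving goroutine has already exited.
func notify(ctx context.Context, ch chan<- struct{}) bool {
	select {
	case <-ctx.Done():
		return false // receiver is (about to be) gone, drop the signal
	case ch <- struct{}{}:
		return true
	}
}
```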
@@ -210,6 +226,16 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
 	}
 }
 
+func tryRefreshStaleLock(ctx context.Context, lock *restic.Lock) bool {
+	err := lock.RefreshStaleLock(ctx)
+	if err != nil {
+		Warnf("failed to refresh stale lock: %v\n", err)
+		return false
+	}
+
+	return true
+}
+
 func unlockRepo(lock *restic.Lock) {
 	if lock == nil {
 		return
@@ -81,6 +81,8 @@ func IsInvalidLock(err error) bool {
 	return errors.As(err, &e)
 }
 
+var ErrRemovedLock = errors.New("lock file was removed in the meantime")
+
 // NewLock returns a new, non-exclusive lock for the repository. If an
 // exclusive lock is already held by another process, it returns an error
 // that satisfies IsAlreadyLocked.
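This hunk and the next one are in the lock implementation itself (presumably internal/restic/lock.go, while the earlier hunks touch the command-level lock handling; the file names are not visible in this extract). `ErrRemovedLock` is a sentinel value, so callers can distinguish "the repository was unlocked in the meantime" from other refresh failures with `errors.Is`. A self-contained illustration, with a hypothetical stand-in for `RefreshStaleLock`:

```go
package main

import (
	"errors"
	"fmt"
)

// mirrors the sentinel introduced in the hunk above
var ErrRemovedLock = errors.New("lock file was removed in the meantime")

// refreshStaleLock is a stand-in for (*restic.Lock).RefreshStaleLock.
func refreshStaleLock() error {
	return fmt.Errorf("refreshing stale lock: %w", ErrRemovedLock)
}

func main() {
	if err := refreshStaleLock(); errors.Is(err, ErrRemovedLock) {
		// the lock file is gone; give up instead of re-creating the lock
		fmt.Println("repository was unlocked in the meantime")
	}
}
```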
@@ -274,6 +276,68 @@ func (l *Lock) Refresh(ctx context.Context) error {
 	return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
 }
 
+// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
+func (l *Lock) RefreshStaleLock(ctx context.Context) error {
+	debug.Log("refreshing stale lock %v", l.lockID)
+	// refreshing a stale lock is possible if it still exists and continues to do
+	// so until after creating a new lock. The initial check avoids creating a new
+	// lock file if this lock was already removed in the meantime.
+	exists, err := l.checkExistence(ctx)
+	if err != nil {
+		return err
+	} else if !exists {
+		return ErrRemovedLock
+	}
+
+	l.lock.Lock()
+	l.Time = time.Now()
+	l.lock.Unlock()
+	id, err := l.createLock(ctx)
+	if err != nil {
+		return err
+	}
+
+	time.Sleep(waitBeforeLockCheck)
+
+	exists, err = l.checkExistence(ctx)
+	if err != nil {
+		// cleanup replacement lock
+		_ = l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: id.String()})
+		return err
+	}
+
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	if !exists {
+		// cleanup replacement lock
+		_ = l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: id.String()})
+		return ErrRemovedLock
+	}
+
+	debug.Log("new lock ID %v", id)
+	oldLockID := l.lockID
+	l.lockID = &id
+
+	return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
+}
+
+func (l *Lock) checkExistence(ctx context.Context) (bool, error) {
+	l.lock.Lock()
+	defer l.lock.Unlock()
+
+	exists := false
+
+	err := l.repo.Backend().List(ctx, LockFile, func(fi FileInfo) error {
+		if fi.Name == l.lockID.String() {
+			exists = true
+		}
+		return nil
+	})
+
+	return exists, err
+}
+
 func (l *Lock) String() string {
 	l.lock.Lock()
 	defer l.lock.Unlock()