mirror of
https://github.com/octoleo/restic.git
synced 2024-12-22 10:58:55 +00:00
Merge pull request #4374 from MichaelEischer/try-refresh-stale-locks
Try to refresh stale locks
This commit is contained in:
commit
bdaec8fdb8
10
changelog/unreleased/issue-4274
Normal file
10
changelog/unreleased/issue-4274
Normal file
@ -0,0 +1,10 @@
|
||||
Bugfix: Improve lock refresh handling when using standby
|
||||
|
||||
If the restic process was stopped or the host running restic entered standby
|
||||
during a long running operation such as a backup, this resulted in the
|
||||
operation failing with `Fatal: failed to refresh lock in time`. We've reworked
|
||||
the lock refresh such that restic first checks whether it is safe to continue
|
||||
the current operation and only throws an error if not.
|
||||
|
||||
https://github.com/restic/restic/issues/4274
|
||||
https://github.com/restic/restic/pull/4374
|
@ -506,10 +506,13 @@ func runBackup(ctx context.Context, opts BackupOptions, gopts GlobalOptions, ter
|
||||
if !gopts.JSON {
|
||||
progressPrinter.V("lock repository")
|
||||
}
|
||||
lock, ctx, err := lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
|
||||
defer unlockRepo(lock)
|
||||
if err != nil {
|
||||
return err
|
||||
if !opts.DryRun {
|
||||
var lock *restic.Lock
|
||||
lock, ctx, err = lockRepo(ctx, repo, gopts.RetryLock, gopts.JSON)
|
||||
defer unlockRepo(lock)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// rejectByNameFuncs collect functions that can reject items from the backup based on path only
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
)
|
||||
|
||||
type lockContext struct {
|
||||
lock *restic.Lock
|
||||
cancel context.CancelFunc
|
||||
refreshWG sync.WaitGroup
|
||||
}
|
||||
@ -104,15 +105,17 @@ retryLoop:
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
lockInfo := &lockContext{
|
||||
lock: lock,
|
||||
cancel: cancel,
|
||||
}
|
||||
lockInfo.refreshWG.Add(2)
|
||||
refreshChan := make(chan struct{})
|
||||
forceRefreshChan := make(chan refreshLockRequest)
|
||||
|
||||
globalLocks.Lock()
|
||||
globalLocks.locks[lock] = lockInfo
|
||||
go refreshLocks(ctx, lock, lockInfo, refreshChan)
|
||||
go monitorLockRefresh(ctx, lockInfo, refreshChan)
|
||||
go refreshLocks(ctx, repo.Backend(), lockInfo, refreshChan, forceRefreshChan)
|
||||
go monitorLockRefresh(ctx, lockInfo, refreshChan, forceRefreshChan)
|
||||
globalLocks.Unlock()
|
||||
|
||||
return lock, ctx, err
|
||||
@ -124,8 +127,13 @@ var refreshInterval = 5 * time.Minute
|
||||
// the difference allows to compensate for a small time drift between clients.
|
||||
var refreshabilityTimeout = restic.StaleLockTimeout - refreshInterval*3/2
|
||||
|
||||
func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext, refreshed chan<- struct{}) {
|
||||
type refreshLockRequest struct {
|
||||
result chan bool
|
||||
}
|
||||
|
||||
func refreshLocks(ctx context.Context, backend restic.Backend, lockInfo *lockContext, refreshed chan<- struct{}, forceRefresh <-chan refreshLockRequest) {
|
||||
debug.Log("start")
|
||||
lock := lockInfo.lock
|
||||
ticker := time.NewTicker(refreshInterval)
|
||||
lastRefresh := lock.Time
|
||||
|
||||
@ -149,6 +157,22 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
|
||||
case <-ctx.Done():
|
||||
debug.Log("terminate")
|
||||
return
|
||||
|
||||
case req := <-forceRefresh:
|
||||
debug.Log("trying to refresh stale lock")
|
||||
// keep on going if our current lock still exists
|
||||
success := tryRefreshStaleLock(ctx, backend, lock, lockInfo.cancel)
|
||||
// inform refresh goroutine about forced refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case req.result <- success:
|
||||
}
|
||||
|
||||
if success {
|
||||
// update lock refresh time
|
||||
lastRefresh = lock.Time
|
||||
}
|
||||
|
||||
case <-ticker.C:
|
||||
if time.Since(lastRefresh) > refreshabilityTimeout {
|
||||
// the lock is too old, wait until the expiry monitor cancels the context
|
||||
@ -161,7 +185,7 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
|
||||
Warnf("unable to refresh lock: %v\n", err)
|
||||
} else {
|
||||
lastRefresh = lock.Time
|
||||
// inform monitor gorountine about successful refresh
|
||||
// inform monitor goroutine about successful refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case refreshed <- struct{}{}:
|
||||
@ -171,7 +195,7 @@ func refreshLocks(ctx context.Context, lock *restic.Lock, lockInfo *lockContext,
|
||||
}
|
||||
}
|
||||
|
||||
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}) {
|
||||
func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-chan struct{}, forceRefresh chan<- refreshLockRequest) {
|
||||
// time.Now() might use a monotonic timer which is paused during standby
|
||||
// convert to unix time to ensure we compare real time values
|
||||
lastRefresh := time.Now().UnixNano()
|
||||
@ -183,24 +207,47 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
|
||||
// timers are paused during standby, which is a problem as the refresh timeout
|
||||
// _must_ expire if the host was too long in standby. Thus fall back to periodic checks
|
||||
// https://github.com/golang/go/issues/35012
|
||||
timer := time.NewTimer(pollDuration)
|
||||
ticker := time.NewTicker(pollDuration)
|
||||
defer func() {
|
||||
timer.Stop()
|
||||
ticker.Stop()
|
||||
lockInfo.cancel()
|
||||
lockInfo.refreshWG.Done()
|
||||
}()
|
||||
|
||||
var refreshStaleLockResult chan bool
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
debug.Log("terminate expiry monitoring")
|
||||
return
|
||||
case <-refreshed:
|
||||
if refreshStaleLockResult != nil {
|
||||
// ignore delayed refresh notifications while the stale lock is refreshed
|
||||
continue
|
||||
}
|
||||
lastRefresh = time.Now().UnixNano()
|
||||
case <-timer.C:
|
||||
if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() {
|
||||
// restart timer
|
||||
timer.Reset(pollDuration)
|
||||
case <-ticker.C:
|
||||
if time.Now().UnixNano()-lastRefresh < refreshabilityTimeout.Nanoseconds() || refreshStaleLockResult != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
debug.Log("trying to refreshStaleLock")
|
||||
// keep on going if our current lock still exists
|
||||
refreshReq := refreshLockRequest{
|
||||
result: make(chan bool),
|
||||
}
|
||||
refreshStaleLockResult = refreshReq.result
|
||||
|
||||
// inform refresh goroutine about forced refresh
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
case forceRefresh <- refreshReq:
|
||||
}
|
||||
case success := <-refreshStaleLockResult:
|
||||
if success {
|
||||
lastRefresh = time.Now().UnixNano()
|
||||
refreshStaleLockResult = nil
|
||||
continue
|
||||
}
|
||||
|
||||
@ -210,6 +257,25 @@ func monitorLockRefresh(ctx context.Context, lockInfo *lockContext, refreshed <-
|
||||
}
|
||||
}
|
||||
|
||||
func tryRefreshStaleLock(ctx context.Context, backend restic.Backend, lock *restic.Lock, cancel context.CancelFunc) bool {
|
||||
freeze := restic.AsBackend[restic.FreezeBackend](backend)
|
||||
if freeze != nil {
|
||||
debug.Log("freezing backend")
|
||||
freeze.Freeze()
|
||||
defer freeze.Unfreeze()
|
||||
}
|
||||
|
||||
err := lock.RefreshStaleLock(ctx)
|
||||
if err != nil {
|
||||
Warnf("failed to refresh stale lock: %v\n", err)
|
||||
// cancel context while the backend is still frozen to prevent accidental modifications
|
||||
cancel()
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func unlockRepo(lock *restic.Lock) {
|
||||
if lock == nil {
|
||||
return
|
||||
|
@ -5,16 +5,26 @@ import (
|
||||
"fmt"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/restic/restic/internal/backend/location"
|
||||
"github.com/restic/restic/internal/backend/mem"
|
||||
"github.com/restic/restic/internal/debug"
|
||||
"github.com/restic/restic/internal/repository"
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func openTestRepo(t *testing.T, wrapper backendWrapper) (*repository.Repository, func(), *testEnvironment) {
|
||||
func openLockTestRepo(t *testing.T, wrapper backendWrapper) (*repository.Repository, func(), *testEnvironment) {
|
||||
env, cleanup := withTestEnvironment(t)
|
||||
|
||||
reg := location.NewRegistry()
|
||||
reg.Register(mem.NewFactory())
|
||||
env.gopts.backends = reg
|
||||
env.gopts.Repo = "mem:"
|
||||
|
||||
if wrapper != nil {
|
||||
env.gopts.backendTestHook = wrapper
|
||||
}
|
||||
@ -36,7 +46,7 @@ func checkedLockRepo(ctx context.Context, t *testing.T, repo restic.Repository,
|
||||
}
|
||||
|
||||
func TestLock(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
|
||||
@ -47,7 +57,7 @@ func TestLock(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLockCancel(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -63,7 +73,7 @@ func TestLockCancel(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLockUnlockAll(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
|
||||
@ -78,7 +88,7 @@ func TestLockUnlockAll(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestLockConflict(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
repo2, err := OpenRepository(context.TODO(), env.gopts)
|
||||
test.OK(t, err)
|
||||
@ -107,7 +117,7 @@ func (b *writeOnceBackend) Save(ctx context.Context, h restic.Handle, rd restic.
|
||||
}
|
||||
|
||||
func TestLockFailedRefresh(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
|
||||
repo, cleanup, env := openLockTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
|
||||
return &writeOnceBackend{Backend: r}, nil
|
||||
})
|
||||
defer cleanup()
|
||||
@ -145,7 +155,7 @@ func (b *loggingBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re
|
||||
}
|
||||
|
||||
func TestLockSuccessfulRefresh(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
|
||||
repo, cleanup, env := openLockTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
|
||||
return &loggingBackend{
|
||||
Backend: r,
|
||||
t: t,
|
||||
@ -182,8 +192,71 @@ func TestLockSuccessfulRefresh(t *testing.T) {
|
||||
unlockRepo(lock)
|
||||
}
|
||||
|
||||
type slowBackend struct {
|
||||
restic.Backend
|
||||
m sync.Mutex
|
||||
sleep time.Duration
|
||||
}
|
||||
|
||||
func (b *slowBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
b.m.Lock()
|
||||
sleep := b.sleep
|
||||
b.m.Unlock()
|
||||
time.Sleep(sleep)
|
||||
return b.Backend.Save(ctx, h, rd)
|
||||
}
|
||||
|
||||
func TestLockSuccessfulStaleRefresh(t *testing.T) {
|
||||
var sb *slowBackend
|
||||
repo, cleanup, env := openLockTestRepo(t, func(r restic.Backend) (restic.Backend, error) {
|
||||
sb = &slowBackend{Backend: r}
|
||||
return sb, nil
|
||||
})
|
||||
defer cleanup()
|
||||
|
||||
t.Logf("test for successful lock refresh %v", time.Now())
|
||||
// reduce locking intervals to be suitable for testing
|
||||
ri, rt := refreshInterval, refreshabilityTimeout
|
||||
refreshInterval = 10 * time.Millisecond
|
||||
refreshabilityTimeout = 50 * time.Millisecond
|
||||
defer func() {
|
||||
refreshInterval, refreshabilityTimeout = ri, rt
|
||||
}()
|
||||
|
||||
lock, wrappedCtx := checkedLockRepo(context.Background(), t, repo, env)
|
||||
// delay lock refreshing long enough that the lock would expire
|
||||
sb.m.Lock()
|
||||
sb.sleep = refreshabilityTimeout + refreshInterval
|
||||
sb.m.Unlock()
|
||||
|
||||
select {
|
||||
case <-wrappedCtx.Done():
|
||||
// don't call t.Fatal to allow the lock to be properly cleaned up
|
||||
t.Error("lock refresh failed", time.Now())
|
||||
|
||||
case <-time.After(refreshabilityTimeout):
|
||||
}
|
||||
// reset slow backend
|
||||
sb.m.Lock()
|
||||
sb.sleep = 0
|
||||
sb.m.Unlock()
|
||||
debug.Log("normal lock period has expired")
|
||||
|
||||
select {
|
||||
case <-wrappedCtx.Done():
|
||||
// don't call t.Fatal to allow the lock to be properly cleaned up
|
||||
t.Error("lock refresh failed", time.Now())
|
||||
|
||||
case <-time.After(3 * refreshabilityTimeout):
|
||||
// expected lock refresh to work
|
||||
}
|
||||
|
||||
// unlockRepo should not crash
|
||||
unlockRepo(lock)
|
||||
}
|
||||
|
||||
func TestLockWaitTimeout(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON)
|
||||
@ -205,8 +278,9 @@ func TestLockWaitTimeout(t *testing.T) {
|
||||
test.OK(t, lock.Unlock())
|
||||
test.OK(t, elock.Unlock())
|
||||
}
|
||||
|
||||
func TestLockWaitCancel(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON)
|
||||
@ -215,10 +289,10 @@ func TestLockWaitCancel(t *testing.T) {
|
||||
retryLock := 200 * time.Millisecond
|
||||
cancelAfter := 40 * time.Millisecond
|
||||
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithCancel(context.TODO())
|
||||
time.AfterFunc(cancelAfter, cancel)
|
||||
|
||||
start := time.Now()
|
||||
lock, _, err := lockRepo(ctx, repo, retryLock, env.gopts.JSON)
|
||||
duration := time.Since(start)
|
||||
|
||||
@ -227,14 +301,14 @@ func TestLockWaitCancel(t *testing.T) {
|
||||
test.Assert(t, strings.Contains(err.Error(), "context canceled"),
|
||||
"create normal lock with exclusively locked repo didn't return the correct error")
|
||||
test.Assert(t, cancelAfter <= duration && duration < retryLock-10*time.Millisecond,
|
||||
"create normal lock with exclusively locked repo didn't return in time")
|
||||
"create normal lock with exclusively locked repo didn't return in time, duration %v", duration)
|
||||
|
||||
test.OK(t, lock.Unlock())
|
||||
test.OK(t, elock.Unlock())
|
||||
}
|
||||
|
||||
func TestLockWaitSuccess(t *testing.T) {
|
||||
repo, cleanup, env := openTestRepo(t, nil)
|
||||
repo, cleanup, env := openLockTestRepo(t, nil)
|
||||
defer cleanup()
|
||||
|
||||
elock, _, err := lockRepoExclusive(context.TODO(), repo, env.gopts.RetryLock, env.gopts.JSON)
|
||||
|
@ -3,6 +3,7 @@ package sema
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/restic/restic/internal/errors"
|
||||
@ -15,7 +16,8 @@ var _ restic.Backend = &connectionLimitedBackend{}
|
||||
// connectionLimitedBackend limits the number of concurrent operations.
|
||||
type connectionLimitedBackend struct {
|
||||
restic.Backend
|
||||
sem semaphore
|
||||
sem semaphore
|
||||
freezeLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewBackend creates a backend that limits the concurrent operations on the underlying backend
|
||||
@ -39,9 +41,23 @@ func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func()
|
||||
return func() {}
|
||||
}
|
||||
be.sem.GetToken()
|
||||
// prevent token usage while the backend is frozen
|
||||
be.freezeLock.Lock()
|
||||
defer be.freezeLock.Unlock()
|
||||
|
||||
return be.sem.ReleaseToken
|
||||
}
|
||||
|
||||
// Freeze blocks all backend operations except those on lock files
|
||||
func (be *connectionLimitedBackend) Freeze() {
|
||||
be.freezeLock.Lock()
|
||||
}
|
||||
|
||||
// Unfreeze allows all backend operations to continue
|
||||
func (be *connectionLimitedBackend) Unfreeze() {
|
||||
be.freezeLock.Unlock()
|
||||
}
|
||||
|
||||
// Save adds new Data to the backend.
|
||||
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
if err := h.Valid(); err != nil {
|
||||
@ -50,6 +66,10 @@ func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, r
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Save(ctx, h, rd)
|
||||
}
|
||||
|
||||
@ -68,6 +88,10 @@ func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, l
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Load(ctx, h, length, offset, fn)
|
||||
}
|
||||
|
||||
@ -79,6 +103,10 @@ func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return restic.FileInfo{}, ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Stat(ctx, h)
|
||||
}
|
||||
|
||||
@ -90,6 +118,10 @@ func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle)
|
||||
|
||||
defer be.typeDependentLimit(h.Type)()
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
return be.Backend.Remove(ctx, h)
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package sema_test
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
@ -197,3 +198,38 @@ func TestConcurrencyUnlimitedLockSave(t *testing.T) {
|
||||
}
|
||||
}, unblock, true)
|
||||
}
|
||||
|
||||
func TestFreeze(t *testing.T) {
|
||||
var counter int64
|
||||
m := mock.NewBackend()
|
||||
m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
|
||||
atomic.AddInt64(&counter, 1)
|
||||
return nil
|
||||
}
|
||||
m.ConnectionsFn = func() uint { return 2 }
|
||||
be := sema.NewBackend(m)
|
||||
fb := be.(restic.FreezeBackend)
|
||||
|
||||
// Freeze backend
|
||||
fb.Freeze()
|
||||
|
||||
// Start Save call that should block
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
h := restic.Handle{Type: restic.PackFile, Name: "foobar"}
|
||||
test.OK(t, be.Save(context.TODO(), h, nil))
|
||||
}()
|
||||
|
||||
// check
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
val := atomic.LoadInt64(&counter)
|
||||
test.Assert(t, val == 0, "save call worked despite frozen backend")
|
||||
|
||||
// unfreeze and check that save did complete
|
||||
fb.Unfreeze()
|
||||
wg.Wait()
|
||||
val = atomic.LoadInt64(&counter)
|
||||
test.Assert(t, val == 1, "save call should have completed")
|
||||
}
|
||||
|
@ -21,26 +21,9 @@ func init() {
|
||||
// "default" layout.
|
||||
type S3Layout struct{}
|
||||
|
||||
func toS3Backend(b restic.Backend) *s3.Backend {
|
||||
for b != nil {
|
||||
if be, ok := b.(*s3.Backend); ok {
|
||||
return be
|
||||
}
|
||||
|
||||
if be, ok := b.(restic.BackendUnwrapper); ok {
|
||||
b = be.Unwrap()
|
||||
} else {
|
||||
// not the backend we're looking for
|
||||
break
|
||||
}
|
||||
}
|
||||
debug.Log("backend is not s3")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check tests whether the migration can be applied.
|
||||
func (m *S3Layout) Check(_ context.Context, repo restic.Repository) (bool, string, error) {
|
||||
be := toS3Backend(repo.Backend())
|
||||
be := restic.AsBackend[*s3.Backend](repo.Backend())
|
||||
if be == nil {
|
||||
debug.Log("backend is not s3")
|
||||
return false, "backend is not s3", nil
|
||||
@ -92,7 +75,7 @@ func (m *S3Layout) moveFiles(ctx context.Context, be *s3.Backend, l layout.Layou
|
||||
|
||||
// Apply runs the migration.
|
||||
func (m *S3Layout) Apply(ctx context.Context, repo restic.Repository) error {
|
||||
be := toS3Backend(repo.Backend())
|
||||
be := restic.AsBackend[*s3.Backend](repo.Backend())
|
||||
if be == nil {
|
||||
debug.Log("backend is not s3")
|
||||
return errors.New("backend is not s3")
|
||||
|
@ -1,27 +0,0 @@
|
||||
package migrations
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/backend/mock"
|
||||
"github.com/restic/restic/internal/backend/s3"
|
||||
"github.com/restic/restic/internal/cache"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
func TestS3UnwrapBackend(t *testing.T) {
|
||||
// toS3Backend(b restic.Backend) *s3.Backend
|
||||
|
||||
m := mock.NewBackend()
|
||||
test.Assert(t, toS3Backend(m) == nil, "mock backend is not an s3 backend")
|
||||
|
||||
// uninitialized fake backend for testing
|
||||
s3 := &s3.Backend{}
|
||||
test.Assert(t, toS3Backend(s3) == s3, "s3 was not returned")
|
||||
|
||||
c := &cache.Backend{Backend: s3}
|
||||
test.Assert(t, toS3Backend(c) == s3, "failed to unwrap s3 backend")
|
||||
|
||||
c.Backend = m
|
||||
test.Assert(t, toS3Backend(c) == nil, "a wrapped mock backend is not an s3 backend")
|
||||
}
|
@ -75,6 +75,31 @@ type BackendUnwrapper interface {
|
||||
Unwrap() Backend
|
||||
}
|
||||
|
||||
func AsBackend[B Backend](b Backend) B {
|
||||
for b != nil {
|
||||
if be, ok := b.(B); ok {
|
||||
return be
|
||||
}
|
||||
|
||||
if be, ok := b.(BackendUnwrapper); ok {
|
||||
b = be.Unwrap()
|
||||
} else {
|
||||
// not the backend we're looking for
|
||||
break
|
||||
}
|
||||
}
|
||||
var be B
|
||||
return be
|
||||
}
|
||||
|
||||
type FreezeBackend interface {
|
||||
Backend
|
||||
// Freeze blocks all backend operations except those on lock files
|
||||
Freeze()
|
||||
// Unfreeze allows all backend operations to continue
|
||||
Unfreeze()
|
||||
}
|
||||
|
||||
// FileInfo is contains information about a file in the backend.
|
||||
type FileInfo struct {
|
||||
Size int64
|
||||
|
38
internal/restic/backend_test.go
Normal file
38
internal/restic/backend_test.go
Normal file
@ -0,0 +1,38 @@
|
||||
package restic_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/restic/restic/internal/restic"
|
||||
"github.com/restic/restic/internal/test"
|
||||
)
|
||||
|
||||
type testBackend struct {
|
||||
restic.Backend
|
||||
}
|
||||
|
||||
func (t *testBackend) Unwrap() restic.Backend {
|
||||
return nil
|
||||
}
|
||||
|
||||
type otherTestBackend struct {
|
||||
restic.Backend
|
||||
}
|
||||
|
||||
func (t *otherTestBackend) Unwrap() restic.Backend {
|
||||
return t.Backend
|
||||
}
|
||||
|
||||
func TestAsBackend(t *testing.T) {
|
||||
other := otherTestBackend{}
|
||||
test.Assert(t, restic.AsBackend[*testBackend](other) == nil, "otherTestBackend is not a testBackend backend")
|
||||
|
||||
testBe := &testBackend{}
|
||||
test.Assert(t, restic.AsBackend[*testBackend](testBe) == testBe, "testBackend was not returned")
|
||||
|
||||
wrapper := &otherTestBackend{Backend: testBe}
|
||||
test.Assert(t, restic.AsBackend[*testBackend](wrapper) == testBe, "failed to unwrap testBackend backend")
|
||||
|
||||
wrapper.Backend = other
|
||||
test.Assert(t, restic.AsBackend[*testBackend](wrapper) == nil, "a wrapped otherTestBackend is not a testBackend")
|
||||
}
|
@ -81,6 +81,8 @@ func IsInvalidLock(err error) bool {
|
||||
return errors.As(err, &e)
|
||||
}
|
||||
|
||||
var ErrRemovedLock = errors.New("lock file was removed in the meantime")
|
||||
|
||||
// NewLock returns a new, non-exclusive lock for the repository. If an
|
||||
// exclusive lock is already held by another process, it returns an error
|
||||
// that satisfies IsAlreadyLocked.
|
||||
@ -274,6 +276,68 @@ func (l *Lock) Refresh(ctx context.Context) error {
|
||||
return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
|
||||
}
|
||||
|
||||
// RefreshStaleLock is an extended variant of Refresh that can also refresh stale lock files.
|
||||
func (l *Lock) RefreshStaleLock(ctx context.Context) error {
|
||||
debug.Log("refreshing stale lock %v", l.lockID)
|
||||
// refreshing a stale lock is possible if it still exists and continues to do
|
||||
// so until after creating a new lock. The initial check avoids creating a new
|
||||
// lock file if this lock was already removed in the meantime.
|
||||
exists, err := l.checkExistence(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
} else if !exists {
|
||||
return ErrRemovedLock
|
||||
}
|
||||
|
||||
l.lock.Lock()
|
||||
l.Time = time.Now()
|
||||
l.lock.Unlock()
|
||||
id, err := l.createLock(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
time.Sleep(waitBeforeLockCheck)
|
||||
|
||||
exists, err = l.checkExistence(ctx)
|
||||
if err != nil {
|
||||
// cleanup replacement lock
|
||||
_ = l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: id.String()})
|
||||
return err
|
||||
}
|
||||
|
||||
if !exists {
|
||||
// cleanup replacement lock
|
||||
_ = l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: id.String()})
|
||||
return ErrRemovedLock
|
||||
}
|
||||
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
debug.Log("new lock ID %v", id)
|
||||
oldLockID := l.lockID
|
||||
l.lockID = &id
|
||||
|
||||
return l.repo.Backend().Remove(context.TODO(), Handle{Type: LockFile, Name: oldLockID.String()})
|
||||
}
|
||||
|
||||
func (l *Lock) checkExistence(ctx context.Context) (bool, error) {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
||||
exists := false
|
||||
|
||||
err := l.repo.Backend().List(ctx, LockFile, func(fi FileInfo) error {
|
||||
if fi.Name == l.lockID.String() {
|
||||
exists = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
return exists, err
|
||||
}
|
||||
|
||||
func (l *Lock) String() string {
|
||||
l.lock.Lock()
|
||||
defer l.lock.Unlock()
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
|
||||
func TestLock(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
lock, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
@ -25,6 +26,7 @@ func TestLock(t *testing.T) {
|
||||
|
||||
func TestDoubleUnlock(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
lock, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
@ -38,6 +40,7 @@ func TestDoubleUnlock(t *testing.T) {
|
||||
|
||||
func TestMultipleLock(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
lock1, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
@ -63,6 +66,7 @@ func (be *failLockLoadingBackend) Load(ctx context.Context, h restic.Handle, len
|
||||
func TestMultipleLockFailure(t *testing.T) {
|
||||
be := &failLockLoadingBackend{Backend: mem.New()}
|
||||
repo := repository.TestRepositoryWithBackend(t, be, 0)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
lock1, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
@ -83,6 +87,7 @@ func TestLockExclusive(t *testing.T) {
|
||||
|
||||
func TestLockOnExclusiveLockedRepo(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
elock, err := restic.NewExclusiveLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
@ -99,6 +104,7 @@ func TestLockOnExclusiveLockedRepo(t *testing.T) {
|
||||
|
||||
func TestExclusiveLockOnLockedRepo(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
elock, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
@ -247,15 +253,10 @@ func TestRemoveAllLocks(t *testing.T) {
|
||||
3, processed)
|
||||
}
|
||||
|
||||
func TestLockRefresh(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
|
||||
lock, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
time0 := lock.Time
|
||||
|
||||
func checkSingleLock(t *testing.T, repo restic.Repository) restic.ID {
|
||||
t.Helper()
|
||||
var lockID *restic.ID
|
||||
err = repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
|
||||
err := repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
|
||||
if lockID != nil {
|
||||
t.Error("more than one lock found")
|
||||
}
|
||||
@ -265,27 +266,59 @@ func TestLockRefresh(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if lockID == nil {
|
||||
t.Fatal("no lock found")
|
||||
}
|
||||
return *lockID
|
||||
}
|
||||
|
||||
func testLockRefresh(t *testing.T, refresh func(lock *restic.Lock) error) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
lock, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
time0 := lock.Time
|
||||
|
||||
lockID := checkSingleLock(t, repo)
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
rtest.OK(t, lock.Refresh(context.TODO()))
|
||||
rtest.OK(t, refresh(lock))
|
||||
|
||||
var lockID2 *restic.ID
|
||||
err = repo.List(context.TODO(), restic.LockFile, func(id restic.ID, size int64) error {
|
||||
if lockID2 != nil {
|
||||
t.Error("more than one lock found")
|
||||
}
|
||||
lockID2 = &id
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
lockID2 := checkSingleLock(t, repo)
|
||||
|
||||
rtest.Assert(t, !lockID.Equal(*lockID2),
|
||||
rtest.Assert(t, !lockID.Equal(lockID2),
|
||||
"expected a new ID after lock refresh, got the same")
|
||||
lock2, err := restic.LoadLock(context.TODO(), repo, *lockID2)
|
||||
lock2, err := restic.LoadLock(context.TODO(), repo, lockID2)
|
||||
rtest.OK(t, err)
|
||||
rtest.Assert(t, lock2.Time.After(time0),
|
||||
"expected a later timestamp after lock refresh")
|
||||
rtest.OK(t, lock.Unlock())
|
||||
}
|
||||
|
||||
func TestLockRefresh(t *testing.T) {
|
||||
testLockRefresh(t, func(lock *restic.Lock) error {
|
||||
return lock.Refresh(context.TODO())
|
||||
})
|
||||
}
|
||||
|
||||
func TestLockRefreshStale(t *testing.T) {
|
||||
testLockRefresh(t, func(lock *restic.Lock) error {
|
||||
return lock.RefreshStaleLock(context.TODO())
|
||||
})
|
||||
}
|
||||
|
||||
func TestLockRefreshStaleMissing(t *testing.T) {
|
||||
repo := repository.TestRepository(t)
|
||||
restic.TestSetLockTimeout(t, 5*time.Millisecond)
|
||||
|
||||
lock, err := restic.NewLock(context.TODO(), repo)
|
||||
rtest.OK(t, err)
|
||||
lockID := checkSingleLock(t, repo)
|
||||
|
||||
// refresh must fail if lock was removed
|
||||
rtest.OK(t, repo.Backend().Remove(context.TODO(), restic.Handle{Type: restic.LockFile, Name: lockID.String()}))
|
||||
time.Sleep(time.Millisecond)
|
||||
err = lock.RefreshStaleLock(context.TODO())
|
||||
rtest.Assert(t, err == restic.ErrRemovedLock, "unexpected error, expected %v, got %v", restic.ErrRemovedLock, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user