2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-01 06:21:50 +00:00
restic/internal/backend/sema/backend.go
Michael Eischer d05f6211d1 lock: Do not limit backend concurrency for lock files
restic must be able to refresh lock files in time. However, large
uploads over slow connections can cause the lock refresh to be stuck
behind the large uploads and thus time out.
2023-05-08 19:04:46 +02:00

99 lines
2.6 KiB
Go

package sema
import (
"context"
"io"
"github.com/cenkalti/backoff/v4"
"github.com/restic/restic/internal/errors"
"github.com/restic/restic/internal/restic"
)
// Compile-time check that *connectionLimitedBackend satisfies restic.Backend.
var _ restic.Backend = (*connectionLimitedBackend)(nil)
// connectionLimitedBackend wraps a restic.Backend and limits the number of
// concurrent operations issued to it. Lock file operations are exempt from
// the limit (see typeDependentLimit).
type connectionLimitedBackend struct {
// Backend is the wrapped backend. Methods that are not redefined on
// connectionLimitedBackend are promoted from it unchanged.
restic.Backend
// sem hands out the tokens that Save, Load, Stat and Remove hold while
// they call into the wrapped backend.
sem semaphore
}
// NewBackend wraps be in a backend that limits the number of concurrent
// operations on it. The limiting semaphore is created from be.Connections().
//
// NewBackend panics if the semaphore cannot be created.
func NewBackend(be restic.Backend) restic.Backend {
	limiter, err := newSemaphore(be.Connections())
	if err != nil {
		panic(err)
	}

	wrapped := new(connectionLimitedBackend)
	wrapped.Backend = be
	wrapped.sem = limiter
	return wrapped
}
// typeDependentLimit acquires a semaphore token for every file type except
// lock files and returns the function that gives the token back. The caller
// must invoke the returned function once the operation has finished; for lock
// files it is a no-op.
func (be *connectionLimitedBackend) typeDependentLimit(t restic.FileType) func() {
	if t != restic.LockFile {
		be.sem.GetToken()
		return be.sem.ReleaseToken
	}

	// Lock file operations bypass the limit so that a lock refresh is never
	// queued behind other operations.
	return func() {}
}
// Save adds new data, read from rd, to the backend under the handle h.
//
// An invalid handle is rejected with a permanent backoff error before any
// token is requested. Unless h refers to a lock file, a semaphore token is
// held for the duration of the wrapped backend's Save call.
//
// Acquiring the token may block, so the context is checked again afterwards:
// if it has been cancelled in the meantime, ctx.Err() is returned and the
// wrapped backend is not called.
func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error {
	if err := h.Valid(); err != nil {
		return backoff.Permanent(err)
	}

	defer be.typeDependentLimit(h.Type)()
	// Do not start an operation whose context ended while waiting for a token.
	if err := ctx.Err(); err != nil {
		return err
	}

	return be.Backend.Save(ctx, h, rd)
}
// Load runs fn with a reader that yields the contents of the file at h at the
// given offset.
//
// The arguments are validated before any token is requested: an invalid
// handle, a negative offset or a negative length each yield a permanent
// backoff error. Unless h refers to a lock file, a semaphore token is held for
// the duration of the wrapped backend's Load call.
//
// Acquiring the token may block, so the context is checked again afterwards:
// if it has been cancelled in the meantime, ctx.Err() is returned and neither
// the wrapped backend nor fn is called.
func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error {
	if err := h.Valid(); err != nil {
		return backoff.Permanent(err)
	}
	if offset < 0 {
		return backoff.Permanent(errors.New("offset is negative"))
	}
	if length < 0 {
		return backoff.Permanent(errors.Errorf("invalid length %d", length))
	}

	defer be.typeDependentLimit(h.Type)()
	// Do not start an operation whose context ended while waiting for a token.
	if err := ctx.Err(); err != nil {
		return err
	}

	return be.Backend.Load(ctx, h, length, offset, fn)
}
// Stat returns information about the file at h in the backend.
//
// An invalid handle is rejected with a permanent backoff error before any
// token is requested. Unless h refers to a lock file, a semaphore token is
// held for the duration of the wrapped backend's Stat call.
//
// Acquiring the token may block, so the context is checked again afterwards:
// if it has been cancelled in the meantime, a zero FileInfo and ctx.Err() are
// returned and the wrapped backend is not called.
func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) {
	if err := h.Valid(); err != nil {
		return restic.FileInfo{}, backoff.Permanent(err)
	}

	defer be.typeDependentLimit(h.Type)()
	// Do not start an operation whose context ended while waiting for a token.
	if err := ctx.Err(); err != nil {
		return restic.FileInfo{}, err
	}

	return be.Backend.Stat(ctx, h)
}
// Remove deletes the file at h from the backend.
//
// An invalid handle is rejected with a permanent backoff error before any
// token is requested. Unless h refers to a lock file, a semaphore token is
// held for the duration of the wrapped backend's Remove call.
//
// Acquiring the token may block, so the context is checked again afterwards:
// if it has been cancelled in the meantime, ctx.Err() is returned and the
// wrapped backend is not called.
func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle) error {
	if err := h.Valid(); err != nil {
		return backoff.Permanent(err)
	}

	defer be.typeDependentLimit(h.Type)()
	// Do not start an operation whose context ended while waiting for a token.
	if err := ctx.Err(); err != nil {
		return err
	}

	return be.Backend.Remove(ctx, h)
}
// Unwrap returns the backend that this connectionLimitedBackend wraps.
func (be *connectionLimitedBackend) Unwrap() restic.Backend {
	inner := be.Backend
	return inner
}