diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index ff89a6b01..e99680ce3 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -12,6 +12,7 @@ import ( "strings" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -25,7 +26,7 @@ type Backend struct { accountName string container *storage.Container connections uint - sem *backend.Semaphore + sem sema.Semaphore prefix string listMaxItems int backend.Layout @@ -48,7 +49,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { service := client.GetBlobService() - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 7f8019a74..46e01450b 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -8,6 +8,7 @@ import ( "path" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -23,7 +24,7 @@ type b2Backend struct { cfg Config listMaxItems int backend.Layout - sem *backend.Semaphore + sem sema.Semaphore } const defaultListMaxItems = 1000 @@ -58,7 +59,7 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend return nil, errors.Wrap(err, "Bucket") } - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } @@ -99,7 +100,7 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe return nil, errors.Wrap(err, "NewBucket") } - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } @@ -284,11 +285,11 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { } type semLocker struct { - *backend.Semaphore + sema.Semaphore } -func (sm semLocker) Lock() { sm.GetToken() } -func (sm semLocker) Unlock() { sm.ReleaseToken() } +func (sm *semLocker) Lock() { sm.GetToken() } +func (sm *semLocker) Unlock() { sm.ReleaseToken() } // List returns a channel that yields all names of blobs of type t. func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { @@ -298,7 +299,7 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic defer cancel() prefix, _ := be.Basedir(t) - iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(semLocker{be.sem})) + iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem})) for iter.Next() { obj := iter.Object() diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 92de75887..5e464ab32 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -14,6 +14,7 @@ import ( "cloud.google.com/go/storage" "github.com/pkg/errors" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -35,7 +36,7 @@ type Backend struct { gcsClient *storage.Client projectID string connections uint - sem *backend.Semaphore + sem sema.Semaphore bucketName string bucket *storage.BucketHandle prefix string @@ -97,7 +98,7 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.Wrap(err, "getStorageClient") } - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index 22fb8c8e5..bb644c949 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -9,12 +9,12 @@ import ( "path/filepath" "syscall" - "github.com/restic/restic/internal/errors" - "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" + "github.com/restic/restic/internal/restic" "github.com/cenkalti/backoff/v4" ) @@ -22,7 +22,7 @@ import ( // Local is a backend in a local directory. type Local struct { Config - sem *backend.Semaphore + sem sema.Semaphore backend.Layout backend.Modes } @@ -38,7 +38,7 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return nil, err } - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index b14149d52..3880f4404 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -11,6 +11,7 @@ import ( "sync" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -32,12 +33,12 @@ const connectionCount = 2 type MemoryBackend struct { data memMap m sync.Mutex - sem *backend.Semaphore + sem sema.Semaphore } // New returns a new backend that saves all data in a map in memory. func New() *MemoryBackend { - sem, err := backend.NewSemaphore(connectionCount) + sem, err := sema.New(connectionCount) if err != nil { panic(err) } diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index b9824bb53..c39ad3941 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -17,6 +17,7 @@ import ( "golang.org/x/net/context/ctxhttp" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -31,7 +32,7 @@ var _ restic.Backend = &Backend{} type Backend struct { url *url.URL connections uint - sem *backend.Semaphore + sem sema.Semaphore client *http.Client backend.Layout } @@ -46,7 +47,7 @@ const ( func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { client := &http.Client{Transport: rt} - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index ac1a1d5ce..d6978cc28 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -13,6 +13,7 @@ import ( "time" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -25,7 +26,7 @@ import ( // Backend stores data on an S3 endpoint. type Backend struct { client *minio.Client - sem *backend.Semaphore + sem sema.Semaphore cfg Config backend.Layout } @@ -101,7 +102,7 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro return nil, errors.Wrap(err, "minio.New") } - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } diff --git a/internal/backend/sema/semaphore.go b/internal/backend/sema/semaphore.go new file mode 100644 index 000000000..7ee912979 --- /dev/null +++ b/internal/backend/sema/semaphore.go @@ -0,0 +1,65 @@ +// Package sema implements semaphores. +package sema + +import ( + "context" + "io" + + "github.com/restic/restic/internal/errors" +) + +// A Semaphore limits access to a restricted resource. +type Semaphore struct { + ch chan struct{} +} + +// New returns a new semaphore with capacity n. +func New(n uint) (Semaphore, error) { + if n == 0 { + return Semaphore{}, errors.New("capacity must be a positive number") + } + return Semaphore{ + ch: make(chan struct{}, n), + }, nil +} + +// GetToken blocks until a Token is available. +func (s Semaphore) GetToken() { s.ch <- struct{}{} } + +// ReleaseToken returns a token. +func (s Semaphore) ReleaseToken() { <-s.ch } + +// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. +// Before returning the token, cancel, if not nil, will be run +// to free up context resources. +func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { + return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel} +} + +type wrapReader struct { + io.ReadCloser + eofSeen bool + sem Semaphore + cancel context.CancelFunc +} + +func (wr *wrapReader) Read(p []byte) (int, error) { + if wr.eofSeen { // XXX Why do we do this? + return 0, io.EOF + } + + n, err := wr.ReadCloser.Read(p) + if err == io.EOF { + wr.eofSeen = true + } + return n, err +} + +func (wr *wrapReader) Close() error { + err := wr.ReadCloser.Close() + if wr.cancel != nil { + wr.cancel() + } + wr.sem.ReleaseToken() + return err +} diff --git a/internal/backend/semaphore.go b/internal/backend/semaphore.go deleted file mode 100644 index 28b97472b..000000000 --- a/internal/backend/semaphore.go +++ /dev/null @@ -1,69 +0,0 @@ -package backend - -import ( - "context" - "io" - - "github.com/restic/restic/internal/errors" -) - -// Semaphore limits access to a restricted resource. -type Semaphore struct { - ch chan struct{} -} - -// NewSemaphore returns a new semaphore with capacity n. -func NewSemaphore(n uint) (*Semaphore, error) { - if n == 0 { - return nil, errors.New("must be a positive number") - } - return &Semaphore{ - ch: make(chan struct{}, n), - }, nil -} - -// GetToken blocks until a Token is available. -func (s *Semaphore) GetToken() { - s.ch <- struct{}{} -} - -// ReleaseToken returns a token. -func (s *Semaphore) ReleaseToken() { - <-s.ch -} - -// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. Before returning the token, -// cancel, if provided, will be run to free up context resources. -func (s *Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { - return &wrapReader{rc, false, func() { - if cancel != nil { - cancel() - } - s.ReleaseToken() - }} -} - -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - eofSeen bool - f func() -} - -func (wr *wrapReader) Read(p []byte) (int, error) { - if wr.eofSeen { - return 0, io.EOF - } - - n, err := wr.ReadCloser.Read(p) - if err == io.EOF { - wr.eofSeen = true - } - return n, err -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 99eb2c09a..0019e588c 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -13,12 +13,12 @@ import ( "path" "time" + "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" + "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/debug" - "github.com/cenkalti/backoff/v4" "github.com/pkg/sftp" ) @@ -33,7 +33,7 @@ type SFTP struct { posixRename bool - sem *backend.Semaphore + sem sema.Semaphore backend.Layout Config backend.Modes @@ -121,7 +121,7 @@ func (r *SFTP) clientError() error { func Open(ctx context.Context, cfg Config) (*SFTP, error) { debug.Log("open backend with config %#v", cfg) - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index b127cb832..2cab8a3fb 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -14,6 +14,7 @@ import ( "time" "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -26,7 +27,7 @@ import ( type beSwift struct { conn *swift.Connection connections uint - sem *backend.Semaphore + sem sema.Semaphore container string // Container name prefix string // Prefix of object names in the container backend.Layout @@ -40,7 +41,7 @@ var _ restic.Backend = &beSwift{} func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { debug.Log("config %#v", cfg) - sem, err := backend.NewSemaphore(cfg.Connections) + sem, err := sema.New(cfg.Connections) if err != nil { return nil, err }