From 8bfc2519d7ce10133d80923cfe7b6f0f5a3794da Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 21:29:01 +0200 Subject: [PATCH 01/13] backend: Deduplicate sanity checks for parameters of Load() method The check is now handled by backend.DefaultLoad. This also guarantees consistent behavior across all backends. --- internal/backend/azure/azure.go | 11 ----------- internal/backend/b2/b2.go | 11 ----------- internal/backend/gs/gs.go | 10 ---------- internal/backend/local/local.go | 7 ------- internal/backend/mem/mem_backend.go | 3 --- internal/backend/s3/s3.go | 11 ----------- internal/backend/sftp/sftp.go | 7 ------- internal/backend/swift/swift.go | 11 ----------- internal/backend/utils.go | 10 ++++++++++ 9 files changed, 10 insertions(+), 71 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index c92fa3f89..8a252fc81 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -300,17 +300,6 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } objName := be.Filename(h) blockBlobClient := be.container.NewBlobClient(objName) diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 40dbbf893..b30d1eeab 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -207,17 +207,6 @@ func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offs func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } ctx, cancel := context.WithCancel(ctx) diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 77cbcda97..ee3c30e70 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -273,17 +273,7 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, err - } - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } if length == 0 { // negative length indicates read till end to GCS lib length = -1 diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index 1716e0f07..f514647a6 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -218,13 +218,6 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } b.sem.GetToken() f, err := fs.Open(b.Filename(h)) diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 0c46dcd6e..dbdbf1c46 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -114,9 +114,6 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, } func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } be.sem.GetToken() be.m.Lock() diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index ad652a206..91643f909 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -312,17 +312,6 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } objName := be.Filename(h) opts := minio.GetObjectOptions{} diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 514dd58da..5d5aa90d0 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -430,13 +430,6 @@ func (wr *wrapReader) Close() error { func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 764c7bb62..8685b336f 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -144,17 +144,6 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { debug.Log("Load %v, length %v, offset %v", h, length, offset) - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } objName := be.Filename(h) diff --git a/internal/backend/utils.go b/internal/backend/utils.go index d2ac44670..1c1607e04 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -6,6 +6,7 @@ import ( "fmt" "io" + "github.com/cenkalti/backoff/v4" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -62,6 +63,15 @@ func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error), fn func(rd io.Reader) error) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + if offset < 0 { + return errors.New("offset is negative") + } + if length < 0 { + return errors.Errorf("invalid length %d", length) + } rd, err := openReader(ctx, h, length, offset) if err != nil { return err From 4703473ec564ed1130cb87ff4b355fb20babfc99 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 22:01:30 +0200 Subject: [PATCH 02/13] backend: extract most debug logs into logger backend --- cmd/restic/global.go | 33 +++++++---- internal/backend/azure/azure.go | 14 ----- internal/backend/b2/b2.go | 12 ---- internal/backend/dryrun/dry_backend.go | 2 - internal/backend/gs/gs.go | 13 ----- internal/backend/local/local.go | 9 --- internal/backend/logger/log.go | 77 ++++++++++++++++++++++++++ internal/backend/mem/mem_backend.go | 3 - internal/backend/rest/rest.go | 1 - internal/backend/s3/s3.go | 16 ------ internal/backend/sftp/sftp.go | 8 --- internal/backend/swift/swift.go | 11 ---- 12 files changed, 99 insertions(+), 100 deletions(-) create mode 100644 internal/backend/logger/log.go diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 32f18a67f..41f97b5df 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -20,6 +20,7 @@ import ( "github.com/restic/restic/internal/backend/limiter" "github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/location" + "github.com/restic/restic/internal/backend/logger" "github.com/restic/restic/internal/backend/rclone" "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/retry" @@ -743,6 +744,9 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(s), err) } + // wrap with debug logging + be = logger.New(be) + // wrap backend if a test specified an inner hook if gopts.backendInnerTestHook != nil { be, err = gopts.backendInnerTestHook(be) @@ -787,27 +791,34 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend return nil, err } + var be restic.Backend switch loc.Scheme { case "local": - return local.Create(ctx, cfg.(local.Config)) + be, err = local.Create(ctx, cfg.(local.Config)) case "sftp": - return sftp.Create(ctx, cfg.(sftp.Config)) + be, err = sftp.Create(ctx, cfg.(sftp.Config)) case "s3": - return s3.Create(ctx, cfg.(s3.Config), rt) + be, err = s3.Create(ctx, cfg.(s3.Config), rt) case "gs": - return gs.Create(cfg.(gs.Config), rt) + be, err = gs.Create(cfg.(gs.Config), rt) case "azure": - return azure.Create(ctx, cfg.(azure.Config), rt) + be, err = azure.Create(ctx, cfg.(azure.Config), rt) case "swift": - return swift.Open(ctx, cfg.(swift.Config), rt) + be, err = swift.Open(ctx, cfg.(swift.Config), rt) case "b2": - return b2.Create(ctx, cfg.(b2.Config), rt) + be, err = b2.Create(ctx, cfg.(b2.Config), rt) case "rest": - return rest.Create(ctx, cfg.(rest.Config), rt) + be, err = rest.Create(ctx, cfg.(rest.Config), rt) case "rclone": - return rclone.Create(ctx, cfg.(rclone.Config)) + be, err = rclone.Create(ctx, cfg.(rclone.Config)) + default: + debug.Log("invalid repository scheme: %v", s) + return nil, errors.Fatalf("invalid scheme %q", loc.Scheme) } - debug.Log("invalid repository scheme: %v", s) - return nil, errors.Fatalf("invalid scheme %q", loc.Scheme) + if err != nil { + return nil, err + } + + return logger.New(be), nil } diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 8a252fc81..4d7a4a57b 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -152,7 +152,6 @@ func (be *Backend) SetListMaxItems(i int) { // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist(%T, %#v)", err, err) return bloberror.HasCode(err, bloberror.BlobNotFound) } @@ -193,8 +192,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe objName := be.Filename(h) - debug.Log("Save %v at %v", h, objName) - be.sem.GetToken() debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName) @@ -209,8 +206,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe } be.sem.ReleaseToken() - debug.Log("%v, err %#v", objName, err) - return err } @@ -299,8 +294,6 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - objName := be.Filename(h) blockBlobClient := be.container.NewBlobClient(objName) @@ -322,8 +315,6 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("%v", h) - objName := be.Filename(h) blobClient := be.container.NewBlobClient(objName) @@ -332,7 +323,6 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, be.sem.ReleaseToken() if err != nil { - debug.Log("blob.GetProperties err %v", err) return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") } @@ -352,8 +342,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { _, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{}) be.sem.ReleaseToken() - debug.Log("Remove(%v) at %v -> err %v", h, objName, err) - if be.IsNotExist(err) { return nil } @@ -364,8 +352,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, _ := be.Basedir(t) // make sure prefix ends with a slash diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index b30d1eeab..10f1a715b 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -206,8 +206,6 @@ func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offs } func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - ctx, cancel := context.WithCancel(ctx) be.sem.GetToken() @@ -249,7 +247,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind // b2 always requires sha1 checksums for uploaded file parts w := obj.NewWriter(ctx) n, err := io.Copy(w, rd) - debug.Log(" saved %d bytes, err %v", n, err) if err != nil { _ = w.Close() @@ -265,8 +262,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind // Stat returns information about a blob. func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("Stat %v", h) - be.sem.GetToken() defer be.sem.ReleaseToken() @@ -274,7 +269,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI obj := be.bucket.Object(name) info, err := obj.Attrs(ctx) if err != nil { - debug.Log("Attrs() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "Stat") } return restic.FileInfo{Size: info.Size, Name: h.Name}, nil @@ -282,8 +276,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI // Remove removes the blob with the given name and type. func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { - debug.Log("Remove %v", h) - be.sem.GetToken() defer be.sem.ReleaseToken() @@ -330,8 +322,6 @@ func (sm *semLocker) Unlock() { sm.ReleaseToken() } // List returns a channel that yields all names of blobs of type t. func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("List %v", t) - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -356,7 +346,6 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic } } if err := iter.Err(); err != nil { - debug.Log("List: %v", err) return err } return nil @@ -364,7 +353,6 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic // Remove keys for a specified backend type. func (be *b2Backend) removeKeys(ctx context.Context, t restic.FileType) error { - debug.Log("removeKeys %v", t) return be.List(ctx, t, func(fi restic.FileInfo) error { return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) }) diff --git a/internal/backend/dryrun/dry_backend.go b/internal/backend/dryrun/dry_backend.go index 37569c320..1218e9819 100644 --- a/internal/backend/dryrun/dry_backend.go +++ b/internal/backend/dryrun/dry_backend.go @@ -34,8 +34,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe return err } - debug.Log("faked saving %v bytes at %v", rd.Length(), h) - // don't save anything, just return ok return nil } diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index ee3c30e70..faf8b9858 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -169,7 +169,6 @@ func (be *Backend) SetListMaxItems(i int) { // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist(%T, %#v)", err, err) return errors.Is(err, storage.ErrObjectNotExist) } @@ -210,8 +209,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe objName := be.Filename(h) - debug.Log("Save %v at %v", h, objName) - be.sem.GetToken() debug.Log("InsertObject(%v, %v)", be.bucketName, objName) @@ -253,11 +250,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe be.sem.ReleaseToken() if err != nil { - debug.Log("%v: err %#v: %v", objName, err, err) return errors.Wrap(err, "service.Objects.Insert") } - debug.Log("%v -> %v bytes", objName, wbytes) // sanity check if wbytes != rd.Length() { return errors.Errorf("wrote %d bytes instead of the expected %d bytes", wbytes, rd.Length()) @@ -272,8 +267,6 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - if length == 0 { // negative length indicates read till end to GCS lib length = -1 @@ -297,8 +290,6 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("%v", h) - objName := be.Filename(h) be.sem.GetToken() @@ -306,7 +297,6 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf be.sem.ReleaseToken() if err != nil { - debug.Log("GetObjectAttributes() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") } @@ -325,15 +315,12 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { err = nil } - debug.Log("Remove(%v) at %v -> err %v", h, objName, err) return errors.Wrap(err, "client.RemoveObject") } // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, _ := be.Basedir(t) // make sure prefix ends with a slash diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index f514647a6..a1f3c6091 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -114,7 +114,6 @@ func (b *Local) IsNotExist(err error) bool { // Save stores data in the backend at the handle. func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) (err error) { - debug.Log("Save %v", h) if err := h.Valid(); err != nil { return backoff.Permanent(err) } @@ -217,8 +216,6 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in } func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) - b.sem.GetToken() f, err := fs.Open(b.Filename(h)) if err != nil { @@ -246,7 +243,6 @@ func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, off // Stat returns information about a blob. func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("Stat %v", h) if err := h.Valid(); err != nil { return restic.FileInfo{}, backoff.Permanent(err) } @@ -264,7 +260,6 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err // Remove removes the blob with the given name and type. func (b *Local) Remove(ctx context.Context, h restic.Handle) error { - debug.Log("Remove %v", h) fn := b.Filename(h) b.sem.GetToken() @@ -282,8 +277,6 @@ func (b *Local) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (b *Local) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) (err error) { - debug.Log("List %v", t) - basedir, subdirs := b.Basedir(t) if subdirs { err = visitDirs(ctx, basedir, fn) @@ -377,13 +370,11 @@ func visitFiles(ctx context.Context, dir string, fn func(restic.FileInfo) error, // Delete removes the repository and all files. func (b *Local) Delete(ctx context.Context) error { - debug.Log("Delete()") return fs.RemoveAll(b.Path) } // Close closes all open files. func (b *Local) Close() error { - debug.Log("Close()") // this does not need to do anything, all open files are closed within the // same function. return nil diff --git a/internal/backend/logger/log.go b/internal/backend/logger/log.go new file mode 100644 index 000000000..4623d8021 --- /dev/null +++ b/internal/backend/logger/log.go @@ -0,0 +1,77 @@ +package logger + +import ( + "context" + "io" + + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" +) + +type Backend struct { + restic.Backend +} + +// statically ensure that Backend implements restic.Backend. +var _ restic.Backend = &Backend{} + +func New(be restic.Backend) *Backend { + return &Backend{Backend: be} +} + +func (be *Backend) IsNotExist(err error) bool { + isNotExist := be.Backend.IsNotExist(err) + debug.Log("IsNotExist(%T, %#v, %v)", err, err, isNotExist) + return isNotExist +} + +// Save adds new Data to the backend. +func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + debug.Log("Save(%v, %v)", h, rd.Length()) + err := be.Backend.Save(ctx, h, rd) + debug.Log(" save err %v", err) + return err +} + +// Remove deletes a file from the backend. +func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { + debug.Log("Remove(%v)", h) + err := be.Backend.Remove(ctx, h) + debug.Log(" remove err %v", err) + return err +} + +func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(io.Reader) error) error { + debug.Log("Load(%v, length %v, offset %v)", h, length, offset) + err := be.Backend.Load(ctx, h, length, offset, fn) + debug.Log(" load err %v", err) + return err +} + +func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + debug.Log("Stat(%v)", h) + fi, err := be.Backend.Stat(ctx, h) + debug.Log(" stat err %v", err) + return fi, err +} + +func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { + debug.Log("List(%v)", t) + err := be.Backend.List(ctx, t, fn) + debug.Log(" list err %v", err) + return err +} + +func (be *Backend) Delete(ctx context.Context) error { + debug.Log("Delete()") + err := be.Backend.Delete(ctx) + debug.Log(" delete err %v", err) + return err +} + +func (be *Backend) Close() error { + debug.Log("Close()") + err := be.Backend.Close() + debug.Log(" close err %v", err) + return err +} diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index dbdbf1c46..59e89286e 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -102,7 +102,6 @@ func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.Re } be.data[h] = buf - debug.Log("saved %v bytes at %v", len(buf), h) return ctx.Err() } @@ -167,8 +166,6 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File h.Name = "" } - debug.Log("stat %v", h) - e, ok := be.data[h] if !ok { return restic.FileInfo{}, errNotFound diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index f4c2897b9..ad5af1629 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -212,7 +212,6 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) if err := h.Valid(); err != nil { return nil, backoff.Permanent(err) } diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 91643f909..872fb0441 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -169,8 +169,6 @@ func isAccessDenied(err error) bool { // IsNotExist returns true if the error is caused by a not existing file. func (be *Backend) IsNotExist(err error) bool { - debug.Log("IsNotExist(%T, %#v)", err, err) - var e minio.ErrorResponse return errors.As(err, &e) && e.Code == "NoSuchKey" } @@ -273,8 +271,6 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - debug.Log("Save %v", h) - if err := h.Valid(); err != nil { return backoff.Permanent(err) } @@ -294,8 +290,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length()) info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, io.NopCloser(rd), int64(rd.Length()), opts) - debug.Log("%v -> %v bytes, err %#v: %v", objName, info.Size, err, err) - // sanity check if err == nil && info.Size != rd.Length() { return errors.Errorf("wrote %d bytes instead of the expected %d bytes", info.Size, rd.Length()) @@ -311,8 +305,6 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) - objName := be.Filename(h) opts := minio.GetObjectOptions{} @@ -345,8 +337,6 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("%v", h) - objName := be.Filename(h) var obj *minio.Object @@ -355,7 +345,6 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf be.sem.GetToken() obj, err = be.client.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - debug.Log("GetObject() err %v", err) be.sem.ReleaseToken() return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") } @@ -371,7 +360,6 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf fi, err := obj.Stat() if err != nil { - debug.Log("Stat() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "Stat") } @@ -386,8 +374,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { err := be.client.RemoveObject(ctx, be.cfg.Bucket, objName, minio.RemoveObjectOptions{}) be.sem.ReleaseToken() - debug.Log("Remove(%v) at %v -> err %v", h, objName, err) - if be.IsNotExist(err) { err = nil } @@ -398,8 +384,6 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, recursive := be.Basedir(t) // make sure prefix ends with a slash diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index 5d5aa90d0..afe3fc394 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -304,7 +304,6 @@ func tempSuffix() string { // Save stores data in the backend at the handle. func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - debug.Log("Save %v", h) if err := r.clientError(); err != nil { return err } @@ -429,8 +428,6 @@ func (wr *wrapReader) Close() error { } func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) - r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) if err != nil { @@ -467,7 +464,6 @@ func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offs // Stat returns information about a blob. func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - debug.Log("Stat(%v)", h) if err := r.clientError(); err != nil { return restic.FileInfo{}, err } @@ -489,7 +485,6 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro // Remove removes the content stored at name. func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { - debug.Log("Remove(%v)", h) if err := r.clientError(); err != nil { return err } @@ -503,8 +498,6 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("List %v", t) - basedir, subdirs := r.Basedir(t) walker := r.c.Walk(basedir) for { @@ -565,7 +558,6 @@ var closeTimeout = 2 * time.Second // Close closes the sftp connection and terminates the underlying command. func (r *SFTP) Close() error { - debug.Log("Close") if r == nil { return nil } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 8685b336f..99940df5c 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -143,7 +143,6 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset } func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - debug.Log("Load %v, length %v, offset %v", h, length, offset) objName := be.Filename(h) @@ -163,7 +162,6 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, be.sem.GetToken() obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers) if err != nil { - debug.Log(" err %v", err) be.sem.ReleaseToken() return nil, errors.Wrap(err, "conn.ObjectOpen") } @@ -179,8 +177,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe objName := be.Filename(h) - debug.Log("Save %v at %v", h, objName) - be.sem.GetToken() defer be.sem.ReleaseToken() @@ -192,15 +188,12 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe be.container, objName, rd, true, hex.EncodeToString(rd.Hash()), encoding, hdr) // swift does not return the upload length - debug.Log("%v, err %#v", objName, err) return errors.Wrap(err, "client.PutObject") } // Stat returns information about a blob. func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - debug.Log("%v", h) - objName := be.Filename(h) be.sem.GetToken() @@ -208,7 +201,6 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf obj, _, err := be.conn.Object(ctx, be.container, objName) if err != nil { - debug.Log("Object() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "conn.Object") } @@ -223,15 +215,12 @@ func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error { defer be.sem.ReleaseToken() err := be.conn.ObjectDelete(ctx, be.container, objName) - debug.Log("Remove(%v) -> err %v", h, err) return errors.Wrap(err, "conn.ObjectDelete") } // List runs fn for each file in the backend which has the type t. When an // error occurs (or fn returns an error), List stops and returns it. func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { - debug.Log("listing %v", t) - prefix, _ := be.Basedir(t) prefix += "/" From 8b5ab5b59fbb0f17853e1bb303d851ffcdf987ef Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 22:13:32 +0200 Subject: [PATCH 03/13] dryrun: fix outdated comments --- internal/backend/dryrun/dry_backend.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/internal/backend/dryrun/dry_backend.go b/internal/backend/dryrun/dry_backend.go index 1218e9819..487e2bc33 100644 --- a/internal/backend/dryrun/dry_backend.go +++ b/internal/backend/dryrun/dry_backend.go @@ -18,10 +18,9 @@ type Backend struct { b restic.Backend } -// statically ensure that RetryBackend implements restic.Backend. +// statically ensure that Backend implements restic.Backend. var _ restic.Backend = &Backend{} -// New returns a new backend that saves all data in a map in memory. func New(be restic.Backend) *Backend { b := &Backend{b: be} debug.Log("created new dry backend") From 8e1e3844aa6a1559449c5a5b2369b522817eb109 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 23:02:35 +0200 Subject: [PATCH 04/13] backend: factor out connection limiting and parameter validation The SemaphoreBackend now uniformly enforces the limit of concurrent backend operations. In addition, it unifies the parameter validation. The List() methods no longer uses a semaphore. Restic already never runs multiple list operations in parallel. By managing the semaphore in a wrapper backend, the sections that hold a semaphore token grow slightly. However, the main bottleneck is IO, so this shouldn't make much of a difference. The key insight that enables the SemaphoreBackend is that all of the complex semaphore handling in `openReader()` still happens within the original call to `Load()`. Thus, getting and releasing the semaphore tokens can be refactored to happen directly in `Load()`. This eliminates the need for wrapping the reader in `openReader()` to release the token. --- cmd/restic/global.go | 7 ++- internal/backend/azure/azure.go | 26 +-------- internal/backend/b2/b2.go | 51 ++--------------- internal/backend/gs/gs.go | 35 ++---------- internal/backend/local/local.go | 34 +---------- internal/backend/mem/mem_backend.go | 40 +------------ internal/backend/rest/rest.go | 44 --------------- internal/backend/s3/s3.go | 31 ++-------- internal/backend/sema/backend.go | 87 +++++++++++++++++++++++++++++ internal/backend/sema/semaphore.go | 58 ++++--------------- internal/backend/sftp/sftp.go | 57 +------------------ internal/backend/swift/swift.go | 29 +--------- internal/backend/utils.go | 11 +--- 13 files changed, 126 insertions(+), 384 deletions(-) create mode 100644 internal/backend/sema/backend.go diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 41f97b5df..8d34f8ddb 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -25,6 +25,7 @@ import ( "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/retry" "github.com/restic/restic/internal/backend/s3" + "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/backend/sftp" "github.com/restic/restic/internal/backend/swift" "github.com/restic/restic/internal/cache" @@ -744,8 +745,8 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio return nil, errors.Fatalf("unable to open repository at %v: %v", location.StripPassword(s), err) } - // wrap with debug logging - be = logger.New(be) + // wrap with debug logging and connection limiting + be = logger.New(sema.New(be)) // wrap backend if a test specified an inner hook if gopts.backendInnerTestHook != nil { @@ -820,5 +821,5 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend return nil, err } - return logger.New(be), nil + return logger.New(sema.New(be)), nil } diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 4d7a4a57b..82d55960f 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -14,7 +14,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -26,7 +25,6 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" azContainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" - "github.com/cenkalti/backoff/v4" ) // Backend stores data on an azure endpoint. @@ -34,7 +32,6 @@ type Backend struct { cfg Config container *azContainer.Client connections uint - sem sema.Semaphore prefix string listMaxItems int layout.Layout @@ -96,16 +93,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.New("no azure authentication information found") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ container: client, cfg: cfg, connections: cfg.Connections, - sem: sem, Layout: &layout.DefaultLayout{ Path: cfg.Prefix, Join: path.Join, @@ -186,14 +177,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - be.sem.GetToken() - debug.Log("InsertObject(%v, %v)", be.cfg.AccountName, objName) var err error @@ -205,7 +190,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe err = be.saveLarge(ctx, objName, rd) } - be.sem.ReleaseToken() return err } @@ -297,7 +281,6 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, objName := be.Filename(h) blockBlobClient := be.container.NewBlobClient(objName) - be.sem.GetToken() resp, err := blockBlobClient.DownloadStream(ctx, &blob.DownloadStreamOptions{ Range: azblob.HTTPRange{ Offset: offset, @@ -306,11 +289,10 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, }) if err != nil { - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(resp.Body, nil), err + return resp.Body, err } // Stat returns information about a blob. @@ -318,9 +300,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, objName := be.Filename(h) blobClient := be.container.NewBlobClient(objName) - be.sem.GetToken() props, err := blobClient.GetProperties(ctx, nil) - be.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") @@ -338,9 +318,7 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) blob := be.container.NewBlobClient(objName) - be.sem.GetToken() _, err := blob.Delete(ctx, &azblob.DeleteBlobOptions{}) - be.sem.ReleaseToken() if be.IsNotExist(err) { return nil @@ -368,9 +346,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F lister := be.container.NewListBlobsFlatPager(opts) for lister.More() { - be.sem.GetToken() resp, err := lister.NextPage(ctx) - be.sem.ReleaseToken() if err != nil { return err diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 10f1a715b..0827f727b 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -11,12 +11,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/kurin/blazer/b2" "github.com/kurin/blazer/base" ) @@ -28,7 +26,6 @@ type b2Backend struct { cfg Config listMaxItems int layout.Layout - sem sema.Semaphore canDelete bool } @@ -92,11 +89,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend return nil, errors.Wrap(err, "Bucket") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &b2Backend{ client: client, bucket: bucket, @@ -106,7 +98,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Path: cfg.Prefix, }, listMaxItems: defaultListMaxItems, - sem: sem, canDelete: true, } @@ -134,11 +125,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe return nil, errors.Wrap(err, "NewBucket") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &b2Backend{ client: client, bucket: bucket, @@ -148,7 +134,6 @@ func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backe Path: cfg.Prefix, }, listMaxItems: defaultListMaxItems, - sem: sem, } _, err = be.Stat(ctx, restic.Handle{Type: restic.ConfigFile}) @@ -202,20 +187,18 @@ func (be *b2Backend) IsNotExist(err error) bool { // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - ctx, cancel := context.WithCancel(ctx) - - be.sem.GetToken() - name := be.Layout.Filename(h) obj := be.bucket.Object(name) if offset == 0 && length == 0 { - rd := obj.NewReader(ctx) - return be.sem.ReleaseTokenOnClose(rd, cancel), nil + return obj.NewReader(ctx), nil } // pass a negative length to NewRangeReader so that the remainder of the @@ -224,8 +207,7 @@ func (be *b2Backend) openReader(ctx context.Context, h restic.Handle, length int length = -1 } - rd := obj.NewRangeReader(ctx, offset, int64(length)) - return be.sem.ReleaseTokenOnClose(rd, cancel), nil + return obj.NewRangeReader(ctx, offset, int64(length)), nil } // Save stores data in the backend at the handle. @@ -233,15 +215,7 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - name := be.Filename(h) - debug.Log("Save %v, name %v", h, name) obj := be.bucket.Object(name) // b2 always requires sha1 checksums for uploaded file parts @@ -262,9 +236,6 @@ func (be *b2Backend) Save(ctx context.Context, h restic.Handle, rd restic.Rewind // Stat returns information about a blob. func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { - be.sem.GetToken() - defer be.sem.ReleaseToken() - name := be.Filename(h) obj := be.bucket.Object(name) info, err := obj.Attrs(ctx) @@ -276,9 +247,6 @@ func (be *b2Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileI // Remove removes the blob with the given name and type. func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { - be.sem.GetToken() - defer be.sem.ReleaseToken() - // the retry backend will also repeat the remove method up to 10 times for i := 0; i < 3; i++ { obj := be.bucket.Object(be.Filename(h)) @@ -313,20 +281,13 @@ func (be *b2Backend) Remove(ctx context.Context, h restic.Handle) error { return errors.New("failed to delete all file versions") } -type semLocker struct { - sema.Semaphore -} - -func (sm *semLocker) Lock() { sm.GetToken() } -func (sm *semLocker) Unlock() { sm.ReleaseToken() } - // List returns a channel that yields all names of blobs of type t. func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic.FileInfo) error) error { ctx, cancel := context.WithCancel(ctx) defer cancel() prefix, _ := be.Basedir(t) - iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems), b2.ListLocker(&semLocker{be.sem})) + iter := be.bucket.List(ctx, b2.ListPrefix(prefix), b2.ListPageSize(be.listMaxItems)) for iter.Next() { obj := iter.Object() diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index faf8b9858..12458a79c 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -15,7 +15,6 @@ import ( "github.com/pkg/errors" "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/restic" @@ -37,7 +36,6 @@ type Backend struct { gcsClient *storage.Client projectID string connections uint - sem sema.Semaphore bucketName string bucket *storage.BucketHandle prefix string @@ -99,16 +97,10 @@ func open(cfg Config, rt http.RoundTripper) (*Backend, error) { return nil, errors.Wrap(err, "getStorageClient") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ gcsClient: gcsClient, projectID: cfg.ProjectID, connections: cfg.Connections, - sem: sem, bucketName: cfg.Bucket, bucket: gcsClient.Bucket(cfg.Bucket), prefix: cfg.Prefix, @@ -203,16 +195,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return err - } - objName := be.Filename(h) - be.sem.GetToken() - - debug.Log("InsertObject(%v, %v)", be.bucketName, objName) - // Set chunk size to zero to disable resumable uploads. // // With a non-zero chunk size (the default is @@ -247,8 +231,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe err = cerr } - be.sem.ReleaseToken() - if err != nil { return errors.Wrap(err, "service.Objects.Insert") } @@ -263,6 +245,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } @@ -274,27 +259,19 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, objName := be.Filename(h) - be.sem.GetToken() - - ctx, cancel := context.WithCancel(ctx) - r, err := be.bucket.Object(objName).NewRangeReader(ctx, offset, int64(length)) if err != nil { - cancel() - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(r, cancel), err + return r, err } // Stat returns information about a blob. func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { objName := be.Filename(h) - be.sem.GetToken() attr, err := be.bucket.Object(objName).Attrs(ctx) - be.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") @@ -307,9 +284,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() err := be.bucket.Object(objName).Delete(ctx) - be.sem.ReleaseToken() if err == storage.ErrObjectNotExist { err = nil @@ -334,9 +309,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F itr := be.bucket.Objects(ctx, &storage.Query{Prefix: prefix}) for { - be.sem.GetToken() attrs, err := itr.Next() - be.sem.ReleaseToken() if err == iterator.Done { break } diff --git a/internal/backend/local/local.go b/internal/backend/local/local.go index a1f3c6091..ca806f754 100644 --- a/internal/backend/local/local.go +++ b/internal/backend/local/local.go @@ -10,7 +10,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/fs" @@ -22,7 +21,6 @@ import ( // Local is a backend in a local directory. type Local struct { Config - sem sema.Semaphore layout.Layout backend.Modes } @@ -38,11 +36,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return nil, err } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - fi, err := fs.Stat(l.Filename(restic.Handle{Type: restic.ConfigFile})) m := backend.DeriveModesFromFileInfo(fi, err) debug.Log("using (%03O file, %03O dir) permissions", m.File, m.Dir) @@ -50,7 +43,6 @@ func open(ctx context.Context, cfg Config) (*Local, error) { return &Local{ Config: cfg, Layout: l, - sem: sem, Modes: m, }, nil } @@ -114,10 +106,6 @@ func (b *Local) IsNotExist(err error) bool { // Save stores data in the backend at the handle. func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) (err error) { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - finalname := b.Filename(h) dir := filepath.Dir(finalname) @@ -128,9 +116,6 @@ func (b *Local) Save(ctx context.Context, h restic.Handle, rd restic.RewindReade } }() - b.sem.GetToken() - defer b.sem.ReleaseToken() - // Create new file with a temporary name. tmpname := filepath.Base(finalname) + "-tmp-" f, err := tempFile(dir, tmpname) @@ -216,40 +201,28 @@ func (b *Local) Load(ctx context.Context, h restic.Handle, length int, offset in } func (b *Local) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - b.sem.GetToken() f, err := fs.Open(b.Filename(h)) if err != nil { - b.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { - b.sem.ReleaseToken() _ = f.Close() return nil, err } } - r := b.sem.ReleaseTokenOnClose(f, nil) - if length > 0 { - return backend.LimitReadCloser(r, int64(length)), nil + return backend.LimitReadCloser(f, int64(length)), nil } - return r, nil + return f, nil } // Stat returns information about a blob. func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - b.sem.GetToken() - defer b.sem.ReleaseToken() - fi, err := fs.Stat(b.Filename(h)) if err != nil { return restic.FileInfo{}, errors.WithStack(err) @@ -262,9 +235,6 @@ func (b *Local) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, err func (b *Local) Remove(ctx context.Context, h restic.Handle) error { fn := b.Filename(h) - b.sem.GetToken() - defer b.sem.ReleaseToken() - // reset read-only flag err := fs.Chmod(fn, 0666) if err != nil && !os.IsPermission(err) { diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 59e89286e..4db4c9821 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -10,12 +10,9 @@ import ( "github.com/cespare/xxhash/v2" "github.com/restic/restic/internal/backend" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - - "github.com/cenkalti/backoff/v4" ) type memMap map[restic.Handle][]byte @@ -32,19 +29,12 @@ const connectionCount = 2 type MemoryBackend struct { data memMap m sync.Mutex - sem sema.Semaphore } // New returns a new backend that saves all data in a map in memory. func New() *MemoryBackend { - sem, err := sema.New(connectionCount) - if err != nil { - panic(err) - } - be := &MemoryBackend{ data: make(memMap), - sem: sem, } debug.Log("created new memory backend") @@ -59,13 +49,6 @@ func (be *MemoryBackend) IsNotExist(err error) bool { // Save adds new Data to the backend. func (be *MemoryBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() @@ -113,8 +96,6 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, } func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - - be.sem.GetToken() be.m.Lock() defer be.m.Unlock() @@ -123,21 +104,12 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length h.Name = "" } - debug.Log("Load %v offset %v len %v", h, offset, length) - - if offset < 0 { - be.sem.ReleaseToken() - return nil, errors.New("offset is negative") - } - if _, ok := be.data[h]; !ok { - be.sem.ReleaseToken() return nil, errNotFound } buf := be.data[h] if offset > int64(len(buf)) { - be.sem.ReleaseToken() return nil, errors.New("offset beyond end of file") } @@ -146,18 +118,11 @@ func (be *MemoryBackend) openReader(ctx context.Context, h restic.Handle, length buf = buf[:length] } - return be.sem.ReleaseTokenOnClose(io.NopCloser(bytes.NewReader(buf)), nil), ctx.Err() + return io.NopCloser(bytes.NewReader(buf)), ctx.Err() } // Stat returns information about a file in the backend. func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() @@ -176,9 +141,6 @@ func (be *MemoryBackend) Stat(ctx context.Context, h restic.Handle) (restic.File // Remove deletes a file from the backend. func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { - be.sem.GetToken() - defer be.sem.ReleaseToken() - be.m.Lock() defer be.m.Unlock() diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index ad5af1629..a88e26daa 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -12,12 +12,9 @@ import ( "strings" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - - "github.com/cenkalti/backoff/v4" ) // make sure the rest backend implements restic.Backend @@ -27,7 +24,6 @@ var _ restic.Backend = &Backend{} type Backend struct { url *url.URL connections uint - sem sema.Semaphore client http.Client layout.Layout } @@ -40,11 +36,6 @@ const ( // Open opens the REST backend with the given config. func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - // use url without trailing slash for layout url := cfg.URL.String() if url[len(url)-1] == '/' { @@ -56,7 +47,6 @@ func Open(cfg Config, rt http.RoundTripper) (*Backend, error) { client: http.Client{Transport: rt}, Layout: &layout.RESTLayout{URL: url, Join: path.Join}, connections: cfg.Connections, - sem: sem, } return be, nil @@ -123,10 +113,6 @@ func (b *Backend) HasAtomicReplace() bool { // Save stores data in the backend at the handle. func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -143,9 +129,7 @@ func (b *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRea // let's the server know what's coming. req.ContentLength = rd.Length() - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() var cerr error if resp != nil { @@ -212,18 +196,6 @@ func (b *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - if err := h.Valid(); err != nil { - return nil, backoff.Permanent(err) - } - - if offset < 0 { - return nil, errors.New("offset is negative") - } - - if length < 0 { - return nil, errors.Errorf("invalid length %d", length) - } - req, err := http.NewRequestWithContext(ctx, "GET", b.Filename(h), nil) if err != nil { return nil, errors.WithStack(err) @@ -237,9 +209,7 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o req.Header.Set("Accept", ContentTypeV2) debug.Log("Load(%v) send range %v", h, byteRange) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { if resp != nil { @@ -264,19 +234,13 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o // Stat returns information about a blob. func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - req, err := http.NewRequestWithContext(ctx, http.MethodHead, b.Filename(h), nil) if err != nil { return restic.FileInfo{}, errors.WithStack(err) } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return restic.FileInfo{}, errors.WithStack(err) } @@ -309,19 +273,13 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e // Remove removes the blob with the given name and type. func (b *Backend) Remove(ctx context.Context, h restic.Handle) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - req, err := http.NewRequestWithContext(ctx, "DELETE", b.Filename(h), nil) if err != nil { return errors.WithStack(err) } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return errors.Wrap(err, "client.Do") @@ -358,9 +316,7 @@ func (b *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.Fi } req.Header.Set("Accept", ContentTypeV2) - b.sem.GetToken() resp, err := b.client.Do(req) - b.sem.ReleaseToken() if err != nil { return errors.Wrap(err, "List") diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 872fb0441..79c6453b9 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -13,12 +13,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/minio/minio-go/v7" "github.com/minio/minio-go/v7/pkg/credentials" ) @@ -26,7 +24,6 @@ import ( // Backend stores data on an S3 endpoint. type Backend struct { client *minio.Client - sem sema.Semaphore cfg Config layout.Layout } @@ -102,14 +99,8 @@ func open(ctx context.Context, cfg Config, rt http.RoundTripper) (*Backend, erro return nil, errors.Wrap(err, "minio.New") } - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &Backend{ client: client, - sem: sem, cfg: cfg, } @@ -271,15 +262,8 @@ func (be *Backend) Path() string { // Save stores data in the backend at the handle. func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - opts := minio.PutObjectOptions{StorageClass: be.cfg.StorageClass} opts.ContentType = "application/octet-stream" // the only option with the high-level api is to let the library handle the checksum computation @@ -301,6 +285,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // Load runs fn with a reader that yields the contents of the file at h at the // given offset. func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + return backend.DefaultLoad(ctx, h, length, offset, be.openReader, fn) } @@ -321,18 +308,13 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, return nil, errors.Wrap(err, "SetRange") } - be.sem.GetToken() - ctx, cancel := context.WithCancel(ctx) - coreClient := minio.Core{Client: be.client} rd, _, _, err := coreClient.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - cancel() - be.sem.ReleaseToken() return nil, err } - return be.sem.ReleaseTokenOnClose(rd, cancel), err + return rd, err } // Stat returns information about a blob. @@ -342,17 +324,14 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf opts := minio.GetObjectOptions{} - be.sem.GetToken() obj, err = be.client.GetObject(ctx, be.cfg.Bucket, objName, opts) if err != nil { - be.sem.ReleaseToken() return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") } // make sure that the object is closed properly. defer func() { e := obj.Close() - be.sem.ReleaseToken() if err == nil { err = errors.Wrap(e, "Close") } @@ -370,9 +349,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() err := be.client.RemoveObject(ctx, be.cfg.Bucket, objName, minio.RemoveObjectOptions{}) - be.sem.ReleaseToken() if be.IsNotExist(err) { err = nil diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go new file mode 100644 index 000000000..4b6f55b50 --- /dev/null +++ b/internal/backend/sema/backend.go @@ -0,0 +1,87 @@ +package sema + +import ( + "context" + "io" + + "github.com/cenkalti/backoff/v4" + "github.com/restic/restic/internal/errors" + "github.com/restic/restic/internal/restic" +) + +// make sure that SemaphoreBackend implements restic.Backend +var _ restic.Backend = &SemaphoreBackend{} + +// SemaphoreBackend limits the number of concurrent operations. +type SemaphoreBackend struct { + restic.Backend + sem semaphore +} + +// New creates a backend that limits the concurrent operations on the underlying backend +func New(be restic.Backend) *SemaphoreBackend { + sem, err := newSemaphore(be.Connections()) + if err != nil { + panic(err) + } + + return &SemaphoreBackend{ + Backend: be, + sem: sem, + } +} + +// Save adds new Data to the backend. +func (be *SemaphoreBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Save(ctx, h, rd) +} + +// Load runs fn with a reader that yields the contents of the file at h at the +// given offset. +func (be *SemaphoreBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + if offset < 0 { + return backoff.Permanent(errors.New("offset is negative")) + } + if length < 0 { + return backoff.Permanent(errors.Errorf("invalid length %d", length)) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Load(ctx, h, length, offset, fn) +} + +// Stat returns information about a file in the backend. +func (be *SemaphoreBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + if err := h.Valid(); err != nil { + return restic.FileInfo{}, backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Stat(ctx, h) +} + +// Remove deletes a file from the backend. +func (be *SemaphoreBackend) Remove(ctx context.Context, h restic.Handle) error { + if err := h.Valid(); err != nil { + return backoff.Permanent(err) + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + return be.Backend.Remove(ctx, h) +} diff --git a/internal/backend/sema/semaphore.go b/internal/backend/sema/semaphore.go index 7ee912979..c664eef7c 100644 --- a/internal/backend/sema/semaphore.go +++ b/internal/backend/sema/semaphore.go @@ -2,64 +2,30 @@ package sema import ( - "context" - "io" - + "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" ) -// A Semaphore limits access to a restricted resource. -type Semaphore struct { +// A semaphore limits access to a restricted resource. +type semaphore struct { ch chan struct{} } -// New returns a new semaphore with capacity n. -func New(n uint) (Semaphore, error) { +// newSemaphore returns a new semaphore with capacity n. +func newSemaphore(n uint) (semaphore, error) { if n == 0 { - return Semaphore{}, errors.New("capacity must be a positive number") + return semaphore{}, errors.New("capacity must be a positive number") } - return Semaphore{ + return semaphore{ ch: make(chan struct{}, n), }, nil } // GetToken blocks until a Token is available. -func (s Semaphore) GetToken() { s.ch <- struct{}{} } +func (s semaphore) GetToken() { + s.ch <- struct{}{} + debug.Log("acquired token") +} // ReleaseToken returns a token. -func (s Semaphore) ReleaseToken() { <-s.ch } - -// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. -// Before returning the token, cancel, if not nil, will be run -// to free up context resources. -func (s Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { - return &wrapReader{ReadCloser: rc, sem: s, cancel: cancel} -} - -type wrapReader struct { - io.ReadCloser - eofSeen bool - sem Semaphore - cancel context.CancelFunc -} - -func (wr *wrapReader) Read(p []byte) (int, error) { - if wr.eofSeen { // XXX Why do we do this? - return 0, io.EOF - } - - n, err := wr.ReadCloser.Read(p) - if err == io.EOF { - wr.eofSeen = true - } - return n, err -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - if wr.cancel != nil { - wr.cancel() - } - wr.sem.ReleaseToken() - return err -} +func (s semaphore) ReleaseToken() { <-s.ch } diff --git a/internal/backend/sftp/sftp.go b/internal/backend/sftp/sftp.go index afe3fc394..e97a5f9c8 100644 --- a/internal/backend/sftp/sftp.go +++ b/internal/backend/sftp/sftp.go @@ -15,7 +15,6 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -35,7 +34,6 @@ type SFTP struct { posixRename bool - sem sema.Semaphore layout.Layout Config backend.Modes @@ -140,11 +138,7 @@ func Open(ctx context.Context, cfg Config) (*SFTP, error) { } func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) { - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - + var err error sftp.Layout, err = layout.ParseLayout(ctx, sftp, cfg.Layout, defaultLayout, cfg.Path) if err != nil { return nil, err @@ -158,7 +152,6 @@ func open(ctx context.Context, sftp *SFTP, cfg Config) (*SFTP, error) { sftp.Config = cfg sftp.p = cfg.Path - sftp.sem = sem sftp.Modes = m return sftp, nil } @@ -308,17 +301,10 @@ func (r *SFTP) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader return err } - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - filename := r.Filename(h) tmpFilename := filename + "-restic-temp-" + tempSuffix() dirname := r.Dirname(h) - r.sem.GetToken() - defer r.sem.ReleaseToken() - // create new file f, err := r.c.OpenFile(tmpFilename, os.O_CREATE|os.O_EXCL|os.O_WRONLY) @@ -414,52 +400,27 @@ func (r *SFTP) Load(ctx context.Context, h restic.Handle, length int, offset int return backend.DefaultLoad(ctx, h, length, offset, r.openReader, fn) } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - io.WriterTo - f func() -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - func (r *SFTP) openReader(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { - r.sem.GetToken() f, err := r.c.Open(r.Filename(h)) if err != nil { - r.sem.ReleaseToken() return nil, err } if offset > 0 { _, err = f.Seek(offset, 0) if err != nil { - r.sem.ReleaseToken() _ = f.Close() return nil, err } } - // use custom close wrapper to also provide WriteTo() on the wrapper - rd := &wrapReader{ - ReadCloser: f, - WriterTo: f, - f: func() { - r.sem.ReleaseToken() - }, - } - if length > 0 { // unlimited reads usually use io.Copy which needs WriteTo support at the underlying reader // limited reads are usually combined with io.ReadFull which reads all required bytes into a buffer in one go - return backend.LimitReadCloser(rd, int64(length)), nil + return backend.LimitReadCloser(f, int64(length)), nil } - return rd, nil + return f, nil } // Stat returns information about a blob. @@ -468,13 +429,6 @@ func (r *SFTP) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, erro return restic.FileInfo{}, err } - if err := h.Valid(); err != nil { - return restic.FileInfo{}, backoff.Permanent(err) - } - - r.sem.GetToken() - defer r.sem.ReleaseToken() - fi, err := r.c.Lstat(r.Filename(h)) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "Lstat") @@ -489,9 +443,6 @@ func (r *SFTP) Remove(ctx context.Context, h restic.Handle) error { return err } - r.sem.GetToken() - defer r.sem.ReleaseToken() - return r.c.Remove(r.Filename(h)) } @@ -501,9 +452,7 @@ func (r *SFTP) List(ctx context.Context, t restic.FileType, fn func(restic.FileI basedir, subdirs := r.Basedir(t) walker := r.c.Walk(basedir) for { - r.sem.GetToken() ok := walker.Step() - r.sem.ReleaseToken() if !ok { break } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 99940df5c..dbf4ba0d1 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -15,12 +15,10 @@ import ( "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" - "github.com/restic/restic/internal/backend/sema" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" - "github.com/cenkalti/backoff/v4" "github.com/ncw/swift/v2" ) @@ -28,7 +26,6 @@ import ( type beSwift struct { conn *swift.Connection connections uint - sem sema.Semaphore container string // Container name prefix string // Prefix of object names in the container layout.Layout @@ -42,11 +39,6 @@ var _ restic.Backend = &beSwift{} func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { debug.Log("config %#v", cfg) - sem, err := sema.New(cfg.Connections) - if err != nil { - return nil, err - } - be := &beSwift{ conn: &swift.Connection{ UserName: cfg.UserName, @@ -72,7 +64,6 @@ func Open(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend Transport: rt, }, connections: cfg.Connections, - sem: sem, container: cfg.Container, prefix: cfg.Prefix, Layout: &layout.DefaultLayout{ @@ -159,27 +150,17 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, debug.Log("Load(%v) send range %v", h, headers["Range"]) } - be.sem.GetToken() obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers) if err != nil { - be.sem.ReleaseToken() return nil, errors.Wrap(err, "conn.ObjectOpen") } - return be.sem.ReleaseTokenOnClose(obj, nil), nil + return obj, nil } // Save stores data in the backend at the handle. func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - objName := be.Filename(h) - - be.sem.GetToken() - defer be.sem.ReleaseToken() - encoding := "binary/octet-stream" debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding) @@ -196,9 +177,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - obj, _, err := be.conn.Object(ctx, be.container, objName) if err != nil { return restic.FileInfo{}, errors.Wrap(err, "conn.Object") @@ -211,9 +189,6 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - be.sem.GetToken() - defer be.sem.ReleaseToken() - err := be.conn.ObjectDelete(ctx, be.container, objName) return errors.Wrap(err, "conn.ObjectDelete") } @@ -226,9 +201,7 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.F err := be.conn.ObjectsWalk(ctx, be.container, &swift.ObjectsOpts{Prefix: prefix}, func(ctx context.Context, opts *swift.ObjectsOpts) (interface{}, error) { - be.sem.GetToken() newObjects, err := be.conn.Objects(ctx, be.container, opts) - be.sem.ReleaseToken() if err != nil { return nil, errors.Wrap(err, "conn.ObjectNames") diff --git a/internal/backend/utils.go b/internal/backend/utils.go index 1c1607e04..bf8a7ad6d 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -6,7 +6,6 @@ import ( "fmt" "io" - "github.com/cenkalti/backoff/v4" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -63,15 +62,7 @@ func LimitReadCloser(r io.ReadCloser, n int64) *LimitedReadCloser { func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, openReader func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error), fn func(rd io.Reader) error) error { - if err := h.Valid(); err != nil { - return backoff.Permanent(err) - } - if offset < 0 { - return errors.New("offset is negative") - } - if length < 0 { - return errors.Errorf("invalid length %d", length) - } + rd, err := openReader(ctx, h, length, offset) if err != nil { return err From 803640ba4b0befb02aa4aca3700344ce10add88f Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 23:22:00 +0200 Subject: [PATCH 05/13] backend: remove a few unnecessary debug logs --- internal/backend/mem/mem_backend.go | 2 -- internal/backend/rest/rest.go | 1 - internal/backend/s3/s3.go | 3 --- internal/backend/swift/swift.go | 5 ----- 4 files changed, 11 deletions(-) diff --git a/internal/backend/mem/mem_backend.go b/internal/backend/mem/mem_backend.go index 4db4c9821..618ef5752 100644 --- a/internal/backend/mem/mem_backend.go +++ b/internal/backend/mem/mem_backend.go @@ -144,8 +144,6 @@ func (be *MemoryBackend) Remove(ctx context.Context, h restic.Handle) error { be.m.Lock() defer be.m.Unlock() - debug.Log("Remove %v", h) - h.ContainedBlobType = restic.InvalidBlob if _, ok := be.data[h]; !ok { return errNotFound diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index a88e26daa..f9030d076 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -207,7 +207,6 @@ func (b *Backend) openReader(ctx context.Context, h restic.Handle, length int, o } req.Header.Set("Range", byteRange) req.Header.Set("Accept", ContentTypeV2) - debug.Log("Load(%v) send range %v", h, byteRange) resp, err := b.client.Do(req) diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 79c6453b9..591ad2185 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -271,7 +271,6 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe // only use multipart uploads for very large files opts.PartSize = 200 * 1024 * 1024 - debug.Log("PutObject(%v, %v, %v)", be.cfg.Bucket, objName, rd.Length()) info, err := be.client.PutObject(ctx, be.cfg.Bucket, objName, io.NopCloser(rd), int64(rd.Length()), opts) // sanity check @@ -297,10 +296,8 @@ func (be *Backend) openReader(ctx context.Context, h restic.Handle, length int, var err error if length > 0 { - debug.Log("range: %v-%v", offset, offset+int64(length)-1) err = opts.SetRange(offset, offset+int64(length)-1) } else if offset > 0 { - debug.Log("range: %v-", offset) err = opts.SetRange(offset, 0) } diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index dbf4ba0d1..fcdbe6634 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -146,10 +146,6 @@ func (be *beSwift) openReader(ctx context.Context, h restic.Handle, length int, headers["Range"] = fmt.Sprintf("bytes=%d-%d", offset, offset+int64(length)-1) } - if _, ok := headers["Range"]; ok { - debug.Log("Load(%v) send range %v", h, headers["Range"]) - } - obj, _, err := be.conn.ObjectOpen(ctx, be.container, objName, false, headers) if err != nil { return nil, errors.Wrap(err, "conn.ObjectOpen") @@ -163,7 +159,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd restic.RewindRe objName := be.Filename(h) encoding := "binary/octet-stream" - debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding) hdr := swift.Headers{"Content-Length": strconv.FormatInt(rd.Length(), 10)} _, err := be.conn.ObjectPut(ctx, be.container, objName, rd, true, hex.EncodeToString(rd.Hash()), From 45244fdf683d269edb1100d28afd2f817ff5d914 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 23:24:14 +0200 Subject: [PATCH 06/13] backend: remove parameter validation tests These parameter validations have been factored out into SemaphoreBackend. --- internal/backend/dryrun/dry_backend_test.go | 2 -- internal/backend/test/tests.go | 17 +---------------- 2 files changed, 1 insertion(+), 18 deletions(-) diff --git a/internal/backend/dryrun/dry_backend_test.go b/internal/backend/dryrun/dry_backend_test.go index 6b8f74e0f..69716c340 100644 --- a/internal/backend/dryrun/dry_backend_test.go +++ b/internal/backend/dryrun/dry_backend_test.go @@ -40,11 +40,9 @@ func TestDry(t *testing.T) { {d, "delete", "", "", ""}, {d, "stat", "a", "", "not found"}, {d, "list", "", "", ""}, - {d, "save", "", "", "invalid"}, {m, "save", "a", "baz", ""}, // save a directly to the mem backend {d, "save", "b", "foob", ""}, // b is not saved {d, "save", "b", "xxx", ""}, // no error as b is not saved - {d, "stat", "", "", "invalid"}, {d, "stat", "a", "a 3", ""}, {d, "load", "a", "baz", ""}, {d, "load", "b", "", "not found"}, diff --git a/internal/backend/test/tests.go b/internal/backend/test/tests.go index b98af59c3..53a10f446 100644 --- a/internal/backend/test/tests.go +++ b/internal/backend/test/tests.go @@ -124,17 +124,7 @@ func (s *Suite) TestLoad(t *testing.T) { b := s.open(t) defer s.close(t, b) - noop := func(rd io.Reader) error { - return nil - } - - err := b.Load(context.TODO(), restic.Handle{}, 0, 0, noop) - if err == nil { - t.Fatalf("Load() did not return an error for invalid handle") - } - test.Assert(t, !b.IsNotExist(err), "IsNotExist() should not accept an invalid handle error: %v", err) - - err = testLoad(b, restic.Handle{Type: restic.PackFile, Name: "foobar"}, 0, 0) + err := testLoad(b, restic.Handle{Type: restic.PackFile, Name: "foobar"}, 0, 0) if err == nil { t.Fatalf("Load() did not return an error for non-existing blob") } @@ -153,11 +143,6 @@ func (s *Suite) TestLoad(t *testing.T) { t.Logf("saved %d bytes as %v", length, handle) - err = b.Load(context.TODO(), handle, 100, -1, noop) - if err == nil { - t.Fatalf("Load() returned no error for negative offset!") - } - err = b.Load(context.TODO(), handle, 0, 0, func(rd io.Reader) error { _, err := io.Copy(io.Discard, rd) if err != nil { From 05abc6d6f5766d73c079a596db0d04ab8e777942 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 23:56:16 +0200 Subject: [PATCH 07/13] backend: deduplicate implementation of Delete() method --- internal/backend/azure/azure.go | 23 +---------------------- internal/backend/b2/b2.go | 27 +-------------------------- internal/backend/gs/gs.go | 23 +---------------------- internal/backend/rest/rest.go | 28 ++-------------------------- internal/backend/s3/s3.go | 23 +---------------------- internal/backend/swift/swift.go | 28 +--------------------------- internal/backend/utils.go | 25 +++++++++++++++++++++++++ 7 files changed, 32 insertions(+), 145 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 82d55960f..9a3695f0f 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -384,30 +384,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index 0827f727b..738df198d 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -312,34 +312,9 @@ func (be *b2Backend) List(ctx context.Context, t restic.FileType, fn func(restic return nil } -// Remove keys for a specified backend type. -func (be *b2Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *b2Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) - if err != nil && be.IsNotExist(err) { - err = nil - } - - return err + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 12458a79c..de798ac92 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -339,30 +339,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + return backend.DefaultDelete(ctx, be) } // Close does nothing. diff --git a/internal/backend/rest/rest.go b/internal/backend/rest/rest.go index f9030d076..7be5a07c7 100644 --- a/internal/backend/rest/rest.go +++ b/internal/backend/rest/rest.go @@ -11,6 +11,7 @@ import ( "path" "strings" + "github.com/restic/restic/internal/backend" "github.com/restic/restic/internal/backend/layout" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" @@ -411,32 +412,7 @@ func (b *Backend) Close() error { return nil } -// Remove keys for a specified backend type. -func (b *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return b.List(ctx, t, func(fi restic.FileInfo) error { - return b.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all data in the backend. func (b *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := b.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - err := b.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) - if err != nil && b.IsNotExist(err) { - return nil - } - return err + return backend.DefaultDelete(ctx, b) } diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 591ad2185..7b7a761ce 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -411,30 +411,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, restic.PackFile, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // Delete removes all restic keys in the bucket. It will not remove the bucket itself. func (be *Backend) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index fcdbe6634..cfa9ed665 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -231,13 +231,6 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType, fn func(restic.F return ctx.Err() } -// Remove keys for a specified backend type. -func (be *beSwift) removeKeys(ctx context.Context, t restic.FileType) error { - return be.List(ctx, t, func(fi restic.FileInfo) error { - return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) - }) -} - // IsNotExist returns true if the error is caused by a not existing file. func (be *beSwift) IsNotExist(err error) bool { var e *swift.Error @@ -247,26 +240,7 @@ func (be *beSwift) IsNotExist(err error) bool { // Delete removes all restic objects in the container. // It will not remove the container itself. func (be *beSwift) Delete(ctx context.Context) error { - alltypes := []restic.FileType{ - restic.PackFile, - restic.KeyFile, - restic.LockFile, - restic.SnapshotFile, - restic.IndexFile} - - for _, t := range alltypes { - err := be.removeKeys(ctx, t) - if err != nil { - return nil - } - } - - err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) - if err != nil && !be.IsNotExist(err) { - return err - } - - return nil + return backend.DefaultDelete(ctx, be) } // Close does nothing diff --git a/internal/backend/utils.go b/internal/backend/utils.go index bf8a7ad6d..cd6614f34 100644 --- a/internal/backend/utils.go +++ b/internal/backend/utils.go @@ -75,6 +75,31 @@ func DefaultLoad(ctx context.Context, h restic.Handle, length int, offset int64, return rd.Close() } +// DefaultDelete removes all restic keys in the bucket. It will not remove the bucket itself. +func DefaultDelete(ctx context.Context, be restic.Backend) error { + alltypes := []restic.FileType{ + restic.PackFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} + + for _, t := range alltypes { + err := be.List(ctx, t, func(fi restic.FileInfo) error { + return be.Remove(ctx, restic.Handle{Type: t, Name: fi.Name}) + }) + if err != nil { + return nil + } + } + err := be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) + if err != nil && be.IsNotExist(err) { + err = nil + } + + return err +} + type memorizedLister struct { fileInfos []restic.FileInfo tpe restic.FileType From 616926d2c19387889bbf00f765f5c131a521545b Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 7 Apr 2023 23:16:08 +0200 Subject: [PATCH 08/13] gs: use IsNotExist to check error --- internal/backend/gs/gs.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index de798ac92..f310fbef5 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -286,7 +286,7 @@ func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { err := be.bucket.Object(objName).Delete(ctx) - if err == storage.ErrObjectNotExist { + if be.IsNotExist(err) { err = nil } From c934c99d4124d4033b7cbd99b79fc8ebe377d804 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 8 Apr 2023 11:59:44 +0200 Subject: [PATCH 09/13] gs: replace usage of context.Background() --- cmd/restic/global.go | 2 +- internal/backend/gs/gs.go | 3 +-- internal/backend/gs/gs_test.go | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 8d34f8ddb..537fe02fe 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -801,7 +801,7 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend case "s3": be, err = s3.Create(ctx, cfg.(s3.Config), rt) case "gs": - be, err = gs.Create(cfg.(gs.Config), rt) + be, err = gs.Create(ctx, cfg.(gs.Config), rt) case "azure": be, err = azure.Create(ctx, cfg.(azure.Config), rt) case "swift": diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index f310fbef5..62e5c4954 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -124,14 +124,13 @@ func Open(cfg Config, rt http.RoundTripper) (restic.Backend, error) { // // The service account must have the "storage.buckets.create" permission to // create a bucket the does not yet exist. -func Create(cfg Config, rt http.RoundTripper) (restic.Backend, error) { +func Create(ctx context.Context, cfg Config, rt http.RoundTripper) (restic.Backend, error) { be, err := open(cfg, rt) if err != nil { return nil, errors.Wrap(err, "open") } // Try to determine if the bucket exists. If it does not, try to create it. - ctx := context.Background() exists, err := be.bucketExists(ctx, be.bucket) if err != nil { if e, ok := err.(*googleapi.Error); ok && e.Code == http.StatusForbidden { diff --git a/internal/backend/gs/gs_test.go b/internal/backend/gs/gs_test.go index 77f8986f1..19ae8b829 100644 --- a/internal/backend/gs/gs_test.go +++ b/internal/backend/gs/gs_test.go @@ -42,7 +42,7 @@ func newGSTestSuite(t testing.TB) *test.Suite { Create: func(config interface{}) (restic.Backend, error) { cfg := config.(gs.Config) - be, err := gs.Create(cfg, tr) + be, err := gs.Create(context.Background(), cfg, tr) if err != nil { return nil, err } From 6042df075fba7d8113586181cd0ab332b5304aaa Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 8 Apr 2023 12:53:43 +0200 Subject: [PATCH 10/13] migrations: Fix S3 backend detection --- internal/backend/limiter/limiter_backend.go | 2 ++ internal/backend/logger/log.go | 2 ++ internal/backend/retry/backend_retry.go | 4 +++ internal/backend/sema/backend.go | 4 +++ internal/cache/backend.go | 4 +++ internal/migrations/s3_layout.go | 29 +++++++++++---------- internal/migrations/s3_layout_test.go | 27 +++++++++++++++++++ internal/restic/backend.go | 5 ++++ 8 files changed, 63 insertions(+), 14 deletions(-) create mode 100644 internal/migrations/s3_layout_test.go diff --git a/internal/backend/limiter/limiter_backend.go b/internal/backend/limiter/limiter_backend.go index f1b508327..7fcca59cc 100644 --- a/internal/backend/limiter/limiter_backend.go +++ b/internal/backend/limiter/limiter_backend.go @@ -46,6 +46,8 @@ func (r rateLimitedBackend) Load(ctx context.Context, h restic.Handle, length in }) } +func (r rateLimitedBackend) Unwrap() restic.Backend { return r.Backend } + type limitedReader struct { io.Reader writerTo io.WriterTo diff --git a/internal/backend/logger/log.go b/internal/backend/logger/log.go index 4623d8021..6c860cfae 100644 --- a/internal/backend/logger/log.go +++ b/internal/backend/logger/log.go @@ -75,3 +75,5 @@ func (be *Backend) Close() error { debug.Log(" close err %v", err) return err } + +func (be *Backend) Unwrap() restic.Backend { return be.Backend } diff --git a/internal/backend/retry/backend_retry.go b/internal/backend/retry/backend_retry.go index b5f2706f4..9c51efedc 100644 --- a/internal/backend/retry/backend_retry.go +++ b/internal/backend/retry/backend_retry.go @@ -191,3 +191,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType, fn func(restic.F return err } + +func (be *Backend) Unwrap() restic.Backend { + return be.Backend +} diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go index 4b6f55b50..9f6dfbadd 100644 --- a/internal/backend/sema/backend.go +++ b/internal/backend/sema/backend.go @@ -85,3 +85,7 @@ func (be *SemaphoreBackend) Remove(ctx context.Context, h restic.Handle) error { return be.Backend.Remove(ctx, h) } + +func (be *SemaphoreBackend) Unwrap() restic.Backend { + return be.Backend +} diff --git a/internal/cache/backend.go b/internal/cache/backend.go index a707f8243..08ec1facd 100644 --- a/internal/cache/backend.go +++ b/internal/cache/backend.go @@ -211,3 +211,7 @@ func (b *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, e func (b *Backend) IsNotExist(err error) bool { return b.Backend.IsNotExist(err) } + +func (b *Backend) Unwrap() restic.Backend { + return b.Backend +} diff --git a/internal/migrations/s3_layout.go b/internal/migrations/s3_layout.go index d42b94bf8..a5293ef16 100644 --- a/internal/migrations/s3_layout.go +++ b/internal/migrations/s3_layout.go @@ -8,7 +8,6 @@ import ( "github.com/restic/restic/internal/backend/layout" "github.com/restic/restic/internal/backend/s3" - "github.com/restic/restic/internal/cache" "github.com/restic/restic/internal/debug" "github.com/restic/restic/internal/errors" "github.com/restic/restic/internal/restic" @@ -22,24 +21,26 @@ func init() { // "default" layout. type S3Layout struct{} -func toS3Backend(repo restic.Repository) *s3.Backend { - b := repo.Backend() - // unwrap cache - if be, ok := b.(*cache.Backend); ok { - b = be.Backend - } +func toS3Backend(b restic.Backend) *s3.Backend { + for b != nil { + if be, ok := b.(*s3.Backend); ok { + return be + } - be, ok := b.(*s3.Backend) - if !ok { - debug.Log("backend is not s3") - return nil + if be, ok := b.(restic.BackendUnwrapper); ok { + b = be.Unwrap() + } else { + // not the backend we're looking for + break + } } - return be + debug.Log("backend is not s3") + return nil } // Check tests whether the migration can be applied. func (m *S3Layout) Check(ctx context.Context, repo restic.Repository) (bool, string, error) { - be := toS3Backend(repo) + be := toS3Backend(repo.Backend()) if be == nil { debug.Log("backend is not s3") return false, "backend is not s3", nil @@ -91,7 +92,7 @@ func (m *S3Layout) moveFiles(ctx context.Context, be *s3.Backend, l layout.Layou // Apply runs the migration. func (m *S3Layout) Apply(ctx context.Context, repo restic.Repository) error { - be := toS3Backend(repo) + be := toS3Backend(repo.Backend()) if be == nil { debug.Log("backend is not s3") return errors.New("backend is not s3") diff --git a/internal/migrations/s3_layout_test.go b/internal/migrations/s3_layout_test.go new file mode 100644 index 000000000..ad0eedea6 --- /dev/null +++ b/internal/migrations/s3_layout_test.go @@ -0,0 +1,27 @@ +package migrations + +import ( + "testing" + + "github.com/restic/restic/internal/backend/mock" + "github.com/restic/restic/internal/backend/s3" + "github.com/restic/restic/internal/cache" + "github.com/restic/restic/internal/test" +) + +func TestS3UnwrapBackend(t *testing.T) { + // toS3Backend(b restic.Backend) *s3.Backend + + m := mock.NewBackend() + test.Assert(t, toS3Backend(m) == nil, "mock backend is not an s3 backend") + + // uninitialized fake backend for testing + s3 := &s3.Backend{} + test.Assert(t, toS3Backend(s3) == s3, "s3 was not returned") + + c := &cache.Backend{Backend: s3} + test.Assert(t, toS3Backend(c) == s3, "failed to unwrap s3 backend") + + c.Backend = m + test.Assert(t, toS3Backend(c) == nil, "a wrapped mock backend is not an s3 backend") +} diff --git a/internal/restic/backend.go b/internal/restic/backend.go index bc139fc8b..b01071132 100644 --- a/internal/restic/backend.go +++ b/internal/restic/backend.go @@ -70,6 +70,11 @@ type Backend interface { Delete(ctx context.Context) error } +type BackendUnwrapper interface { + // Unwrap returns the underlying backend or nil if there is none. + Unwrap() Backend +} + // FileInfo is contains information about a file in the backend. type FileInfo struct { Size int64 From f27750e27004108d64fc6e313e7824b59823d0d5 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Fri, 21 Apr 2023 22:44:45 +0200 Subject: [PATCH 11/13] backend/sema: rename type to connectionLimitedBackend --- internal/backend/sema/backend.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go index 9f6dfbadd..2294ef0b2 100644 --- a/internal/backend/sema/backend.go +++ b/internal/backend/sema/backend.go @@ -9,30 +9,30 @@ import ( "github.com/restic/restic/internal/restic" ) -// make sure that SemaphoreBackend implements restic.Backend -var _ restic.Backend = &SemaphoreBackend{} +// make sure that connectionLimitedBackend implements restic.Backend +var _ restic.Backend = &connectionLimitedBackend{} -// SemaphoreBackend limits the number of concurrent operations. -type SemaphoreBackend struct { +// connectionLimitedBackend limits the number of concurrent operations. +type connectionLimitedBackend struct { restic.Backend sem semaphore } // New creates a backend that limits the concurrent operations on the underlying backend -func New(be restic.Backend) *SemaphoreBackend { +func New(be restic.Backend) restic.Backend { sem, err := newSemaphore(be.Connections()) if err != nil { panic(err) } - return &SemaphoreBackend{ + return &connectionLimitedBackend{ Backend: be, sem: sem, } } // Save adds new Data to the backend. -func (be *SemaphoreBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { +func (be *connectionLimitedBackend) Save(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { if err := h.Valid(); err != nil { return backoff.Permanent(err) } @@ -45,7 +45,7 @@ func (be *SemaphoreBackend) Save(ctx context.Context, h restic.Handle, rd restic // Load runs fn with a reader that yields the contents of the file at h at the // given offset. -func (be *SemaphoreBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { +func (be *connectionLimitedBackend) Load(ctx context.Context, h restic.Handle, length int, offset int64, fn func(rd io.Reader) error) error { if err := h.Valid(); err != nil { return backoff.Permanent(err) } @@ -63,7 +63,7 @@ func (be *SemaphoreBackend) Load(ctx context.Context, h restic.Handle, length in } // Stat returns information about a file in the backend. -func (be *SemaphoreBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { +func (be *connectionLimitedBackend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { if err := h.Valid(); err != nil { return restic.FileInfo{}, backoff.Permanent(err) } @@ -75,7 +75,7 @@ func (be *SemaphoreBackend) Stat(ctx context.Context, h restic.Handle) (restic.F } // Remove deletes a file from the backend. -func (be *SemaphoreBackend) Remove(ctx context.Context, h restic.Handle) error { +func (be *connectionLimitedBackend) Remove(ctx context.Context, h restic.Handle) error { if err := h.Valid(); err != nil { return backoff.Permanent(err) } @@ -86,6 +86,6 @@ func (be *SemaphoreBackend) Remove(ctx context.Context, h restic.Handle) error { return be.Backend.Remove(ctx, h) } -func (be *SemaphoreBackend) Unwrap() restic.Backend { +func (be *connectionLimitedBackend) Unwrap() restic.Backend { return be.Backend } From ebba233a3a86259e368f9826af269c70d8a27462 Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 22 Apr 2023 12:32:57 +0200 Subject: [PATCH 12/13] backend/sema: rename constructor to NewBackend --- cmd/restic/global.go | 4 ++-- internal/backend/sema/backend.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 537fe02fe..43b8eb217 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -746,7 +746,7 @@ func open(ctx context.Context, s string, gopts GlobalOptions, opts options.Optio } // wrap with debug logging and connection limiting - be = logger.New(sema.New(be)) + be = logger.New(sema.NewBackend(be)) // wrap backend if a test specified an inner hook if gopts.backendInnerTestHook != nil { @@ -821,5 +821,5 @@ func create(ctx context.Context, s string, opts options.Options) (restic.Backend return nil, err } - return logger.New(sema.New(be)), nil + return logger.New(sema.NewBackend(be)), nil } diff --git a/internal/backend/sema/backend.go b/internal/backend/sema/backend.go index 2294ef0b2..fc4a9dde5 100644 --- a/internal/backend/sema/backend.go +++ b/internal/backend/sema/backend.go @@ -18,8 +18,8 @@ type connectionLimitedBackend struct { sem semaphore } -// New creates a backend that limits the concurrent operations on the underlying backend -func New(be restic.Backend) restic.Backend { +// NewBackend creates a backend that limits the concurrent operations on the underlying backend +func NewBackend(be restic.Backend) restic.Backend { sem, err := newSemaphore(be.Connections()) if err != nil { panic(err) From 831f593b875f922692cf1d5af6a120d9b89c4d0c Mon Sep 17 00:00:00 2001 From: Michael Eischer Date: Sat, 22 Apr 2023 12:33:06 +0200 Subject: [PATCH 13/13] backend/sema: Add tests --- internal/backend/sema/backend_test.go | 180 ++++++++++++++++++++++++++ 1 file changed, 180 insertions(+) create mode 100644 internal/backend/sema/backend_test.go diff --git a/internal/backend/sema/backend_test.go b/internal/backend/sema/backend_test.go new file mode 100644 index 000000000..db9559840 --- /dev/null +++ b/internal/backend/sema/backend_test.go @@ -0,0 +1,180 @@ +package sema_test + +import ( + "context" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/restic/restic/internal/backend/mock" + "github.com/restic/restic/internal/backend/sema" + "github.com/restic/restic/internal/restic" + "github.com/restic/restic/internal/test" + "golang.org/x/sync/errgroup" +) + +func TestParameterValidationSave(t *testing.T) { + m := mock.NewBackend() + m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + return nil + } + be := sema.NewBackend(m) + + err := be.Save(context.TODO(), restic.Handle{}, nil) + test.Assert(t, err != nil, "Save() with invalid handle did not return an error") +} + +func TestParameterValidationLoad(t *testing.T) { + m := mock.NewBackend() + m.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + return io.NopCloser(nil), nil + } + + be := sema.NewBackend(m) + nilCb := func(rd io.Reader) error { return nil } + + err := be.Load(context.TODO(), restic.Handle{}, 10, 0, nilCb) + test.Assert(t, err != nil, "Load() with invalid handle did not return an error") + + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + err = be.Load(context.TODO(), h, 10, -1, nilCb) + test.Assert(t, err != nil, "Save() with negative offset did not return an error") + err = be.Load(context.TODO(), h, -1, 0, nilCb) + test.Assert(t, err != nil, "Save() with negative length did not return an error") +} + +func TestParameterValidationStat(t *testing.T) { + m := mock.NewBackend() + m.StatFn = func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + return restic.FileInfo{}, nil + } + be := sema.NewBackend(m) + + _, err := be.Stat(context.TODO(), restic.Handle{}) + test.Assert(t, err != nil, "Stat() with invalid handle did not return an error") +} + +func TestParameterValidationRemove(t *testing.T) { + m := mock.NewBackend() + m.RemoveFn = func(ctx context.Context, h restic.Handle) error { + return nil + } + be := sema.NewBackend(m) + + err := be.Remove(context.TODO(), restic.Handle{}) + test.Assert(t, err != nil, "Remove() with invalid handle did not return an error") +} + +func TestUnwrap(t *testing.T) { + m := mock.NewBackend() + be := sema.NewBackend(m) + + unwrapper := be.(restic.BackendUnwrapper) + test.Assert(t, unwrapper.Unwrap() == m, "Unwrap() returned wrong backend") +} + +func countingBlocker() (func(), func(int) int) { + ctr := int64(0) + blocker := make(chan struct{}) + + wait := func() { + // count how many goroutines were allowed by the semaphore + atomic.AddInt64(&ctr, 1) + // block until the test can retrieve the counter + <-blocker + } + + unblock := func(expected int) int { + // give goroutines enough time to block + var blocked int64 + for i := 0; i < 100 && blocked != int64(expected); i++ { + time.Sleep(100 * time.Microsecond) + blocked = atomic.LoadInt64(&ctr) + } + close(blocker) + return int(blocked) + } + + return wait, unblock +} + +func concurrencyTester(t *testing.T, setup func(m *mock.Backend), handler func(be restic.Backend) func() error, unblock func(int) int) { + expectBlocked := int(2) + + m := mock.NewBackend() + setup(m) + m.ConnectionsFn = func() uint { return uint(expectBlocked) } + be := sema.NewBackend(m) + + var wg errgroup.Group + for i := 0; i < int(expectBlocked+1); i++ { + wg.Go(handler(be)) + } + + blocked := unblock(expectBlocked) + test.Assert(t, blocked == expectBlocked, "Unexpected number of goroutines blocked: %v", blocked) + test.OK(t, wg.Wait()) +} + +func TestConcurrencyLimitSave(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.SaveFn = func(ctx context.Context, h restic.Handle, rd restic.RewindReader) error { + wait() + return nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + return be.Save(context.TODO(), h, nil) + } + }, unblock) +} + +func TestConcurrencyLimitLoad(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.OpenReaderFn = func(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + wait() + return io.NopCloser(nil), nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + nilCb := func(rd io.Reader) error { return nil } + return be.Load(context.TODO(), h, 10, 0, nilCb) + } + }, unblock) +} + +func TestConcurrencyLimitStat(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.StatFn = func(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { + wait() + return restic.FileInfo{}, nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + _, err := be.Stat(context.TODO(), h) + return err + } + }, unblock) +} + +func TestConcurrencyLimitDelete(t *testing.T) { + wait, unblock := countingBlocker() + concurrencyTester(t, func(m *mock.Backend) { + m.RemoveFn = func(ctx context.Context, h restic.Handle) error { + wait() + return nil + } + }, func(be restic.Backend) func() error { + return func() error { + h := restic.Handle{Type: restic.PackFile, Name: "foobar"} + return be.Remove(context.TODO(), h) + } + }, unblock) +}