From 981752ade099f1a2932a8f4419c424ec02a45de8 Mon Sep 17 00:00:00 2001 From: George Armhold Date: Tue, 31 Oct 2017 07:32:30 -0400 Subject: [PATCH 1/5] Azure backend: limit http concurrency in Stat(), Test(), Remove() as per discussion in PR #1399 --- internal/backend/azure/azure.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/internal/backend/azure/azure.go b/internal/backend/azure/azure.go index 44ed20048..3430c69ae 100644 --- a/internal/backend/azure/azure.go +++ b/internal/backend/azure/azure.go @@ -227,13 +227,17 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset } // Stat returns information about a blob. -func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { +func (be *Backend) Stat(ctx context.Context, h restic.Handle) (restic.FileInfo, error) { debug.Log("%v", h) objName := be.Filename(h) blob := be.container.GetBlobReference(objName) - if err := blob.GetProperties(nil); err != nil { + be.sem.GetToken() + err := blob.GetProperties(nil) + be.sem.ReleaseToken() + + if err != nil { debug.Log("blob.GetProperties err %v", err) return restic.FileInfo{}, errors.Wrap(err, "blob.GetProperties") } @@ -244,7 +248,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf // Test returns true if a blob of the given type and name exists in the backend. func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { objName := be.Filename(h) + + be.sem.GetToken() found, err := be.container.GetBlobReference(objName).Exists() + be.sem.ReleaseToken() + if err != nil { return false, err } @@ -254,7 +262,11 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { // Remove removes the blob with the given name and type. func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) + + be.sem.GetToken() _, err := be.container.GetBlobReference(objName).DeleteIfExists(nil) + be.sem.ReleaseToken() + debug.Log("Remove(%v) at %v -> err %v", h, objName, err) return errors.Wrap(err, "client.RemoveObject") } @@ -282,7 +294,10 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { defer close(ch) for { + be.sem.GetToken() obj, err := be.container.ListBlobs(params) + be.sem.ReleaseToken() + if err != nil { return } From d069ee31b2acec66bca1dc29cd626679615c8873 Mon Sep 17 00:00:00 2001 From: George Armhold Date: Tue, 31 Oct 2017 08:01:43 -0400 Subject: [PATCH 2/5] GS backend: limit http concurrency in Save(), Stat(), Test(), Remove(), List() as per discussion in PR #1399 --- internal/backend/gs/gs.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go index 1141497a1..ce02dc691 100644 --- a/internal/backend/gs/gs.go +++ b/internal/backend/gs/gs.go @@ -204,14 +204,15 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err debug.Log("Save %v at %v", h, objName) + be.sem.GetToken() + // Check key does not already exist if _, err := be.service.Objects.Get(be.bucketName, objName).Do(); err == nil { debug.Log("%v already exists", h) + be.sem.ReleaseToken() return errors.New("key already exists") } - be.sem.GetToken() - debug.Log("InsertObject(%v, %v)", be.bucketName, objName) // Set chunk size to zero to disable resumable uploads. @@ -323,7 +324,10 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf objName := be.Filename(h) + be.sem.GetToken() obj, err := be.service.Objects.Get(be.bucketName, objName).Do() + be.sem.ReleaseToken() + if err != nil { debug.Log("GetObject() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") @@ -336,7 +340,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { found := false objName := be.Filename(h) + + be.sem.GetToken() _, err := be.service.Objects.Get(be.bucketName, objName).Do() + be.sem.ReleaseToken() + if err == nil { found = true } @@ -348,7 +356,10 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) + be.sem.GetToken() err := be.service.Objects.Delete(be.bucketName, objName).Do() + be.sem.ReleaseToken() + if er, ok := err.(*googleapi.Error); ok { if er.Code == 404 { err = nil @@ -378,7 +389,10 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { listReq := be.service.Objects.List(be.bucketName).Prefix(prefix).MaxResults(int64(be.listMaxItems)) for { + be.sem.GetToken() obj, err := listReq.Do() + be.sem.ReleaseToken() + if err != nil { fmt.Fprintf(os.Stderr, "error listing %v: %v\n", prefix, err) return From 99ac0da4bc9f35515fe53bab573e60743044fb01 Mon Sep 17 00:00:00 2001 From: George Armhold Date: Wed, 1 Nov 2017 09:40:54 -0400 Subject: [PATCH 3/5] s3 backend: limit http concurrency in Save(), Stat(), Test(), Remove() NB: List() is NOT currently limited, as it would cause deadlock due to be.client.ListObjects() implementation. as per discussion in PR #1399 --- internal/backend/s3/s3.go | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index f21be437c..86fd078a5 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -263,6 +263,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err objName := be.Filename(h) + be.sem.GetToken() + defer be.sem.ReleaseToken() + // Check key does not already exist _, err = be.client.StatObject(be.cfg.Bucket, objName) if err == nil { @@ -282,10 +285,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err debug.Log("reader is %#T, no specific workaround enabled", rd) } - be.sem.GetToken() debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName) n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream") - be.sem.ReleaseToken() debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err) @@ -358,15 +359,18 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf objName := be.Filename(h) var obj *minio.Object + be.sem.GetToken() obj, err = be.client.GetObject(be.cfg.Bucket, objName) if err != nil { debug.Log("GetObject() err %v", err) + be.sem.ReleaseToken() return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") } // make sure that the object is closed properly. defer func() { e := obj.Close() + be.sem.ReleaseToken() if err == nil { err = errors.Wrap(e, "Close") } @@ -385,7 +389,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { found := false objName := be.Filename(h) + + be.sem.GetToken() _, err := be.client.StatObject(be.cfg.Bucket, objName) + be.sem.ReleaseToken() + if err == nil { found = true } @@ -397,7 +405,11 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { // Remove removes the blob with the given name and type. func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) + + be.sem.GetToken() err := be.client.RemoveObject(be.cfg.Bucket, objName) + be.sem.ReleaseToken() + debug.Log("Remove(%v) at %v -> err %v", h, objName, err) if be.IsNotExist(err) { From 8515d093e00a0109621c1248728c44ca5806f3f6 Mon Sep 17 00:00:00 2001 From: George Armhold Date: Thu, 2 Nov 2017 12:38:17 -0400 Subject: [PATCH 4/5] swift backend: fix premature release of semaphore in Load() & document concurrency issue in List(). refactor wrapReader from b2 -> semaphore so it can be used elsewhere. As per discussion in PR #1399. --- internal/backend/b2/b2.go | 43 ++------------------------------- internal/backend/semaphore.go | 42 +++++++++++++++++++++++++++++++- internal/backend/swift/swift.go | 12 ++++----- 3 files changed, 49 insertions(+), 48 deletions(-) diff --git a/internal/backend/b2/b2.go b/internal/backend/b2/b2.go index b115662e9..7e570c6eb 100644 --- a/internal/backend/b2/b2.go +++ b/internal/backend/b2/b2.go @@ -137,31 +137,6 @@ func (be *b2Backend) Location() string { return be.cfg.Bucket } -// wrapReader wraps an io.ReadCloser to run an additional function on Close. -type wrapReader struct { - io.ReadCloser - eofSeen bool - f func() -} - -func (wr *wrapReader) Read(p []byte) (int, error) { - if wr.eofSeen { - return 0, io.EOF - } - - n, err := wr.ReadCloser.Read(p) - if err == io.EOF { - wr.eofSeen = true - } - return n, err -} - -func (wr *wrapReader) Close() error { - err := wr.ReadCloser.Close() - wr.f() - return err -} - // IsNotExist returns true if the error is caused by a non-existing file. func (be *b2Backend) IsNotExist(err error) bool { return b2.IsNotExist(errors.Cause(err)) @@ -192,14 +167,7 @@ func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offs if offset == 0 && length == 0 { rd := obj.NewReader(ctx) - wrapper := &wrapReader{ - ReadCloser: rd, - f: func() { - cancel() - be.sem.ReleaseToken() - }, - } - return wrapper, nil + return be.sem.ReleaseTokenOnClose(rd, cancel), nil } // pass a negative length to NewRangeReader so that the remainder of the @@ -209,14 +177,7 @@ func (be *b2Backend) Load(ctx context.Context, h restic.Handle, length int, offs } rd := obj.NewRangeReader(ctx, offset, int64(length)) - wrapper := &wrapReader{ - ReadCloser: rd, - f: func() { - cancel() - be.sem.ReleaseToken() - }, - } - return wrapper, nil + return be.sem.ReleaseTokenOnClose(rd, cancel), nil } // Save stores data in the backend at the handle. diff --git a/internal/backend/semaphore.go b/internal/backend/semaphore.go index e83191c46..2146db2f3 100644 --- a/internal/backend/semaphore.go +++ b/internal/backend/semaphore.go @@ -1,6 +1,10 @@ package backend -import "github.com/restic/restic/internal/errors" +import ( + "context" + "github.com/restic/restic/internal/errors" + "io" +) // Semaphore limits access to a restricted resource. type Semaphore struct { @@ -26,3 +30,39 @@ func (s *Semaphore) GetToken() { func (s *Semaphore) ReleaseToken() { <-s.ch } + +// ReleaseTokenOnClose wraps an io.ReadCloser to return a token on Close. Before returning the token, +// cancel, if provided, will be run to free up context resources. +func (s *Semaphore) ReleaseTokenOnClose(rc io.ReadCloser, cancel context.CancelFunc) io.ReadCloser { + return &wrapReader{rc, false, func() { + if cancel != nil { + cancel() + } + s.ReleaseToken() + }} +} + +// wrapReader wraps an io.ReadCloser to run an additional function on Close. +type wrapReader struct { + io.ReadCloser + eofSeen bool + f func() +} + +func (wr *wrapReader) Read(p []byte) (int, error) { + if wr.eofSeen { + return 0, io.EOF + } + + n, err := wr.ReadCloser.Read(p) + if err == io.EOF { + wr.eofSeen = true + } + return n, err +} + +func (wr *wrapReader) Close() error { + err := wr.ReadCloser.Close() + wr.f() + return err +} diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 8c69caffa..3497fec1d 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -129,11 +129,6 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset objName := be.Filename(h) - be.sem.GetToken() - defer func() { - be.sem.ReleaseToken() - }() - headers := swift.Headers{} if offset > 0 { headers["Range"] = fmt.Sprintf("bytes=%d-", offset) @@ -147,13 +142,15 @@ func (be *beSwift) Load(ctx context.Context, h restic.Handle, length int, offset debug.Log("Load(%v) send range %v", h, headers["Range"]) } + be.sem.GetToken() obj, _, err := be.conn.ObjectOpen(be.container, objName, false, headers) if err != nil { debug.Log(" err %v", err) + be.sem.ReleaseToken() return nil, errors.Wrap(err, "conn.ObjectOpen") } - return obj, nil + return be.sem.ReleaseTokenOnClose(obj, nil), nil } // Save stores data in the backend at the handle. @@ -243,6 +240,9 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType) <-chan string { go func() { defer close(ch) + // NB: unfortunately we can't protect this with be.sem.GetToken() here. + // Doing so would enable a deadlock situation (PR: gh-1399), as ObjectsWalk() + // starts its own goroutine and returns results via a channel. err := be.conn.ObjectsWalk(be.container, &swift.ObjectsOpts{Prefix: prefix}, func(opts *swift.ObjectsOpts) (interface{}, error) { newObjects, err := be.conn.ObjectNames(be.container, opts) From 0268d0e7d6ae43e2cb46abdecdee8d55439f744e Mon Sep 17 00:00:00 2001 From: George Armhold Date: Thu, 2 Nov 2017 18:29:32 -0400 Subject: [PATCH 5/5] swift backend: limit http concurrency in Save(), Stat(), Test(), Remove(), List(). move comment regarding problematic List() backend api (it's s3's ListObjects that has a problem, NOT swift's ObjectsWalk). As per discussion in PR #1399. --- internal/backend/s3/s3.go | 3 +++ internal/backend/swift/swift.go | 25 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/internal/backend/s3/s3.go b/internal/backend/s3/s3.go index 86fd078a5..844e832d2 100644 --- a/internal/backend/s3/s3.go +++ b/internal/backend/s3/s3.go @@ -433,6 +433,9 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { prefix += "/" } + // NB: unfortunately we can't protect this with be.sem.GetToken() here. + // Doing so would enable a deadlock situation (gh-1399), as ListObjects() + // starts its own goroutine and returns results via a channel. listresp := be.client.ListObjects(be.cfg.Bucket, prefix, true, ctx.Done()) go func() { diff --git a/internal/backend/swift/swift.go b/internal/backend/swift/swift.go index 3497fec1d..5ada1f5d8 100644 --- a/internal/backend/swift/swift.go +++ b/internal/backend/swift/swift.go @@ -163,6 +163,9 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err debug.Log("Save %v at %v", h, objName) + be.sem.GetToken() + defer be.sem.ReleaseToken() + // Check key does not already exist switch _, _, err = be.conn.Object(be.container, objName); err { case nil: @@ -176,11 +179,6 @@ func (be *beSwift) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err return errors.Wrap(err, "conn.Object") } - be.sem.GetToken() - defer func() { - be.sem.ReleaseToken() - }() - encoding := "binary/octet-stream" debug.Log("PutObject(%v, %v, %v)", be.container, objName, encoding) @@ -196,6 +194,9 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf objName := be.Filename(h) + be.sem.GetToken() + defer be.sem.ReleaseToken() + obj, _, err := be.conn.Object(be.container, objName) if err != nil { debug.Log("Object() err %v", err) @@ -208,6 +209,10 @@ func (be *beSwift) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf // Test returns true if a blob of the given type and name exists in the backend. func (be *beSwift) Test(ctx context.Context, h restic.Handle) (bool, error) { objName := be.Filename(h) + + be.sem.GetToken() + defer be.sem.ReleaseToken() + switch _, _, err := be.conn.Object(be.container, objName); err { case nil: return true, nil @@ -223,6 +228,10 @@ func (be *beSwift) Test(ctx context.Context, h restic.Handle) (bool, error) { // Remove removes the blob with the given name and type. func (be *beSwift) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) + + be.sem.GetToken() + defer be.sem.ReleaseToken() + err := be.conn.ObjectDelete(be.container, objName) debug.Log("Remove(%v) -> err %v", h, err) return errors.Wrap(err, "conn.ObjectDelete") @@ -240,12 +249,12 @@ func (be *beSwift) List(ctx context.Context, t restic.FileType) <-chan string { go func() { defer close(ch) - // NB: unfortunately we can't protect this with be.sem.GetToken() here. - // Doing so would enable a deadlock situation (PR: gh-1399), as ObjectsWalk() - // starts its own goroutine and returns results via a channel. err := be.conn.ObjectsWalk(be.container, &swift.ObjectsOpts{Prefix: prefix}, func(opts *swift.ObjectsOpts) (interface{}, error) { + be.sem.GetToken() newObjects, err := be.conn.ObjectNames(be.container, opts) + be.sem.ReleaseToken() + if err != nil { return nil, errors.Wrap(err, "conn.ObjectNames") }