2
2
mirror of https://github.com/octoleo/restic.git synced 2025-01-05 16:12:29 +00:00

s3 backend: limit http concurrency in Save(), Stat(), Test(), Remove()

NB: List() is NOT currently limited, as it would cause deadlock due to
be.client.ListObjects() implementation.

as per discussion in PR #1399
This commit is contained in:
George Armhold 2017-11-01 09:40:54 -04:00
parent d069ee31b2
commit 99ac0da4bc

View File

@ -263,6 +263,9 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
defer be.sem.ReleaseToken()
// Check key does not already exist // Check key does not already exist
_, err = be.client.StatObject(be.cfg.Bucket, objName) _, err = be.client.StatObject(be.cfg.Bucket, objName)
if err == nil { if err == nil {
@ -282,10 +285,8 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
debug.Log("reader is %#T, no specific workaround enabled", rd) debug.Log("reader is %#T, no specific workaround enabled", rd)
} }
be.sem.GetToken()
debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName) debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName)
n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream") n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream")
be.sem.ReleaseToken()
debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err) debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err)
@ -358,15 +359,18 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
objName := be.Filename(h) objName := be.Filename(h)
var obj *minio.Object var obj *minio.Object
be.sem.GetToken()
obj, err = be.client.GetObject(be.cfg.Bucket, objName) obj, err = be.client.GetObject(be.cfg.Bucket, objName)
if err != nil { if err != nil {
debug.Log("GetObject() err %v", err) debug.Log("GetObject() err %v", err)
be.sem.ReleaseToken()
return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") return restic.FileInfo{}, errors.Wrap(err, "client.GetObject")
} }
// make sure that the object is closed properly. // make sure that the object is closed properly.
defer func() { defer func() {
e := obj.Close() e := obj.Close()
be.sem.ReleaseToken()
if err == nil { if err == nil {
err = errors.Wrap(e, "Close") err = errors.Wrap(e, "Close")
} }
@ -385,7 +389,11 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
found := false found := false
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
_, err := be.client.StatObject(be.cfg.Bucket, objName) _, err := be.client.StatObject(be.cfg.Bucket, objName)
be.sem.ReleaseToken()
if err == nil { if err == nil {
found = true found = true
} }
@ -397,7 +405,11 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
// Remove removes the blob with the given name and type. // Remove removes the blob with the given name and type.
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
objName := be.Filename(h) objName := be.Filename(h)
be.sem.GetToken()
err := be.client.RemoveObject(be.cfg.Bucket, objName) err := be.client.RemoveObject(be.cfg.Bucket, objName)
be.sem.ReleaseToken()
debug.Log("Remove(%v) at %v -> err %v", h, objName, err) debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
if be.IsNotExist(err) { if be.IsNotExist(err) {