mirror of
https://github.com/octoleo/restic.git
synced 2024-11-23 05:12:10 +00:00
Merge pull request #1038 from restic/s3-prevent-close
Improve GCS support
This commit is contained in:
commit
eadf5dcb2d
@ -8,7 +8,6 @@ import (
|
|||||||
"restic"
|
"restic"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"restic/backend"
|
|
||||||
"restic/errors"
|
"restic/errors"
|
||||||
|
|
||||||
"restic/debug"
|
"restic/debug"
|
||||||
@ -121,7 +120,7 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int,
|
|||||||
buf = buf[:length]
|
buf = buf[:length]
|
||||||
}
|
}
|
||||||
|
|
||||||
return backend.Closer{Reader: bytes.NewReader(buf)}, nil
|
return ioutil.NopCloser(bytes.NewReader(buf)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stat returns information about a file in the backend.
|
// Stat returns information about a file in the backend.
|
||||||
|
@ -108,9 +108,8 @@ func (b *restBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (
|
|||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
// make sure that client.Post() cannot close the reader by wrapping it in
|
// make sure that client.Post() cannot close the reader by wrapping it
|
||||||
// backend.Closer, which has a noop method.
|
rd = ioutil.NopCloser(rd)
|
||||||
rd = backend.Closer{Reader: rd}
|
|
||||||
|
|
||||||
b.sem.GetToken()
|
b.sem.GetToken()
|
||||||
resp, err := ctxhttp.Post(ctx, b.client, b.Filename(h), "binary/octet-stream", rd)
|
resp, err := ctxhttp.Post(ctx, b.client, b.Filename(h), "binary/octet-stream", rd)
|
||||||
|
@ -4,6 +4,8 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"restic"
|
"restic"
|
||||||
@ -14,16 +16,16 @@ import (
|
|||||||
"restic/errors"
|
"restic/errors"
|
||||||
|
|
||||||
"github.com/minio/minio-go"
|
"github.com/minio/minio-go"
|
||||||
|
"github.com/minio/minio-go/pkg/s3utils"
|
||||||
|
|
||||||
"restic/debug"
|
"restic/debug"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Backend stores data on an S3 endpoint.
|
// Backend stores data on an S3 endpoint.
|
||||||
type Backend struct {
|
type Backend struct {
|
||||||
client *minio.Client
|
client *minio.Client
|
||||||
sem *backend.Semaphore
|
sem *backend.Semaphore
|
||||||
bucketname string
|
cfg Config
|
||||||
prefix string
|
|
||||||
backend.Layout
|
backend.Layout
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,10 +50,9 @@ func Open(cfg Config) (restic.Backend, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
be := &Backend{
|
be := &Backend{
|
||||||
client: client,
|
client: client,
|
||||||
sem: sem,
|
sem: sem,
|
||||||
bucketname: cfg.Bucket,
|
cfg: cfg,
|
||||||
prefix: cfg.Prefix,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
client.SetCustomTransport(backend.Transport())
|
client.SetCustomTransport(backend.Transport())
|
||||||
@ -118,7 +119,7 @@ func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) {
|
|||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
defer close(done)
|
defer close(done)
|
||||||
|
|
||||||
for obj := range be.client.ListObjects(be.bucketname, dir, false, done) {
|
for obj := range be.client.ListObjects(be.cfg.Bucket, dir, false, done) {
|
||||||
if obj.Key == "" {
|
if obj.Key == "" {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -149,12 +150,25 @@ func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) {
|
|||||||
|
|
||||||
// Location returns this backend's location (the bucket name).
|
// Location returns this backend's location (the bucket name).
|
||||||
func (be *Backend) Location() string {
|
func (be *Backend) Location() string {
|
||||||
return be.Join(be.bucketname, be.prefix)
|
return be.Join(be.cfg.Bucket, be.cfg.Prefix)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Path returns the path in the bucket that is used for this backend.
|
// Path returns the path in the bucket that is used for this backend.
|
||||||
func (be *Backend) Path() string {
|
func (be *Backend) Path() string {
|
||||||
return be.prefix
|
return be.cfg.Prefix
|
||||||
|
}
|
||||||
|
|
||||||
|
func (be *Backend) isGoogleCloudStorage() bool {
|
||||||
|
scheme := "https://"
|
||||||
|
if be.cfg.UseHTTP {
|
||||||
|
scheme = "http://"
|
||||||
|
}
|
||||||
|
url, err := url.Parse(scheme + be.cfg.Endpoint)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s3utils.IsGoogleEndpoint(*url)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Save stores data in the backend at the handle.
|
// Save stores data in the backend at the handle.
|
||||||
@ -168,15 +182,20 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err
|
|||||||
objName := be.Filename(h)
|
objName := be.Filename(h)
|
||||||
|
|
||||||
// Check key does not already exist
|
// Check key does not already exist
|
||||||
_, err = be.client.StatObject(be.bucketname, objName)
|
_, err = be.client.StatObject(be.cfg.Bucket, objName)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
debug.Log("%v already exists", h)
|
debug.Log("%v already exists", h)
|
||||||
return errors.New("key already exists")
|
return errors.New("key already exists")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prevent GCS from closing the file
|
||||||
|
if be.isGoogleCloudStorage() {
|
||||||
|
rd = ioutil.NopCloser(rd)
|
||||||
|
}
|
||||||
|
|
||||||
be.sem.GetToken()
|
be.sem.GetToken()
|
||||||
debug.Log("PutObject(%v, %v)", be.bucketname, objName)
|
debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName)
|
||||||
n, err := be.client.PutObject(be.bucketname, objName, rd, "application/octet-stream")
|
n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream")
|
||||||
be.sem.ReleaseToken()
|
be.sem.ReleaseToken()
|
||||||
|
|
||||||
debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err)
|
debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err)
|
||||||
@ -226,7 +245,7 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset
|
|||||||
debug.Log("Load(%v) send range %v", h, byteRange)
|
debug.Log("Load(%v) send range %v", h, byteRange)
|
||||||
|
|
||||||
coreClient := minio.Core{Client: be.client}
|
coreClient := minio.Core{Client: be.client}
|
||||||
rd, _, err := coreClient.GetObject(be.bucketname, objName, headers)
|
rd, _, err := coreClient.GetObject(be.cfg.Bucket, objName, headers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
be.sem.ReleaseToken()
|
be.sem.ReleaseToken()
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -250,7 +269,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
|
|||||||
objName := be.Filename(h)
|
objName := be.Filename(h)
|
||||||
var obj *minio.Object
|
var obj *minio.Object
|
||||||
|
|
||||||
obj, err = be.client.GetObject(be.bucketname, objName)
|
obj, err = be.client.GetObject(be.cfg.Bucket, objName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("GetObject() err %v", err)
|
debug.Log("GetObject() err %v", err)
|
||||||
return restic.FileInfo{}, errors.Wrap(err, "client.GetObject")
|
return restic.FileInfo{}, errors.Wrap(err, "client.GetObject")
|
||||||
@ -277,7 +296,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf
|
|||||||
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
||||||
found := false
|
found := false
|
||||||
objName := be.Filename(h)
|
objName := be.Filename(h)
|
||||||
_, err := be.client.StatObject(be.bucketname, objName)
|
_, err := be.client.StatObject(be.cfg.Bucket, objName)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
found = true
|
found = true
|
||||||
}
|
}
|
||||||
@ -289,8 +308,13 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) {
|
|||||||
// Remove removes the blob with the given name and type.
|
// Remove removes the blob with the given name and type.
|
||||||
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
func (be *Backend) Remove(ctx context.Context, h restic.Handle) error {
|
||||||
objName := be.Filename(h)
|
objName := be.Filename(h)
|
||||||
err := be.client.RemoveObject(be.bucketname, objName)
|
err := be.client.RemoveObject(be.cfg.Bucket, objName)
|
||||||
debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
|
debug.Log("Remove(%v) at %v -> err %v", h, objName, err)
|
||||||
|
|
||||||
|
if be.IsNotExist(err) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
|
|
||||||
return errors.Wrap(err, "client.RemoveObject")
|
return errors.Wrap(err, "client.RemoveObject")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -308,7 +332,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string {
|
|||||||
prefix += "/"
|
prefix += "/"
|
||||||
}
|
}
|
||||||
|
|
||||||
listresp := be.client.ListObjects(be.bucketname, prefix, true, ctx.Done())
|
listresp := be.client.ListObjects(be.cfg.Bucket, prefix, true, ctx.Done())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
@ -372,11 +396,11 @@ func (be *Backend) Rename(h restic.Handle, l backend.Layout) error {
|
|||||||
debug.Log(" %v -> %v", oldname, newname)
|
debug.Log(" %v -> %v", oldname, newname)
|
||||||
|
|
||||||
coreClient := minio.Core{Client: be.client}
|
coreClient := minio.Core{Client: be.client}
|
||||||
err := coreClient.CopyObject(be.bucketname, newname, path.Join(be.bucketname, oldname), minio.CopyConditions{})
|
err := coreClient.CopyObject(be.cfg.Bucket, newname, path.Join(be.cfg.Bucket, oldname), minio.CopyConditions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
debug.Log("copy failed: %v", err)
|
debug.Log("copy failed: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return be.client.RemoveObject(be.bucketname, oldname)
|
return be.client.RemoveObject(be.cfg.Bucket, oldname)
|
||||||
}
|
}
|
||||||
|
@ -103,6 +103,21 @@ type MinioTestConfig struct {
|
|||||||
stopServer func()
|
stopServer func()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func openS3(t testing.TB, cfg MinioTestConfig) (be restic.Backend, err error) {
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
be, err = s3.Open(cfg.Config)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("s3 open: try %d: error %v", i, err)
|
||||||
|
time.Sleep(500 * time.Millisecond)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
return be, err
|
||||||
|
}
|
||||||
|
|
||||||
func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite {
|
func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite {
|
||||||
return &test.Suite{
|
return &test.Suite{
|
||||||
// NewConfig returns a config for a new temporary backend that will be used in tests.
|
// NewConfig returns a config for a new temporary backend that will be used in tests.
|
||||||
@ -127,7 +142,7 @@ func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite {
|
|||||||
Create: func(config interface{}) (restic.Backend, error) {
|
Create: func(config interface{}) (restic.Backend, error) {
|
||||||
cfg := config.(MinioTestConfig)
|
cfg := config.(MinioTestConfig)
|
||||||
|
|
||||||
be, err := s3.Open(cfg.Config)
|
be, err := openS3(t, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -446,11 +446,32 @@ func delayedRemove(b restic.Backend, h restic.Handle) error {
|
|||||||
found, err := b.Test(context.TODO(), h)
|
found, err := b.Test(context.TODO(), h)
|
||||||
for i := 0; found && i < 20; i++ {
|
for i := 0; found && i < 20; i++ {
|
||||||
found, err = b.Test(context.TODO(), h)
|
found, err = b.Test(context.TODO(), h)
|
||||||
if found {
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func delayedList(t testing.TB, b restic.Backend, tpe restic.FileType, max int) restic.IDs {
|
||||||
|
list := restic.NewIDSet()
|
||||||
|
for i := 0; i < max; i++ {
|
||||||
|
for s := range b.List(context.TODO(), tpe) {
|
||||||
|
id := restic.TestParseID(s)
|
||||||
|
list.Insert(id)
|
||||||
|
}
|
||||||
|
if len(list) < max {
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return err
|
|
||||||
|
return list.List()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestBackend tests all functions of the backend.
|
// TestBackend tests all functions of the backend.
|
||||||
@ -548,12 +569,7 @@ func (s *Suite) TestBackend(t *testing.T) {
|
|||||||
IDs = append(IDs, id)
|
IDs = append(IDs, id)
|
||||||
}
|
}
|
||||||
|
|
||||||
list := restic.IDs{}
|
list := delayedList(t, b, tpe, len(IDs))
|
||||||
|
|
||||||
for s := range b.List(context.TODO(), tpe) {
|
|
||||||
list = append(list, restic.TestParseID(s))
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(IDs) != len(list) {
|
if len(IDs) != len(list) {
|
||||||
t.Fatalf("wrong number of IDs returned: want %d, got %d", len(IDs), len(list))
|
t.Fatalf("wrong number of IDs returned: want %d, got %d", len(IDs), len(list))
|
||||||
}
|
}
|
||||||
@ -581,7 +597,7 @@ func (s *Suite) TestBackend(t *testing.T) {
|
|||||||
|
|
||||||
found, err = b.Test(context.TODO(), h)
|
found, err = b.Test(context.TODO(), h)
|
||||||
test.OK(t, err)
|
test.OK(t, err)
|
||||||
test.Assert(t, !found, fmt.Sprintf("id %q not found after removal", id))
|
test.Assert(t, !found, fmt.Sprintf("id %q found after removal", id))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -29,16 +29,6 @@ func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byt
|
|||||||
return ioutil.ReadAll(rd)
|
return ioutil.ReadAll(rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Closer wraps an io.Reader and adds a Close() method that does nothing.
|
|
||||||
type Closer struct {
|
|
||||||
io.Reader
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close is a no-op.
|
|
||||||
func (c Closer) Close() error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
|
// LimitedReadCloser wraps io.LimitedReader and exposes the Close() method.
|
||||||
type LimitedReadCloser struct {
|
type LimitedReadCloser struct {
|
||||||
io.ReadCloser
|
io.ReadCloser
|
||||||
|
Loading…
Reference in New Issue
Block a user