diff --git a/src/restic/backend/mem/mem_backend.go b/src/restic/backend/mem/mem_backend.go index cee81799a..abd68fcf6 100644 --- a/src/restic/backend/mem/mem_backend.go +++ b/src/restic/backend/mem/mem_backend.go @@ -8,7 +8,6 @@ import ( "restic" "sync" - "restic/backend" "restic/errors" "restic/debug" @@ -121,7 +120,7 @@ func (be *MemoryBackend) Load(ctx context.Context, h restic.Handle, length int, buf = buf[:length] } - return backend.Closer{Reader: bytes.NewReader(buf)}, nil + return ioutil.NopCloser(bytes.NewReader(buf)), nil } // Stat returns information about a file in the backend. diff --git a/src/restic/backend/rest/rest.go b/src/restic/backend/rest/rest.go index 99dc2ba63..e7976d244 100644 --- a/src/restic/backend/rest/rest.go +++ b/src/restic/backend/rest/rest.go @@ -108,9 +108,8 @@ func (b *restBackend) Save(ctx context.Context, h restic.Handle, rd io.Reader) ( ctx, cancel := context.WithCancel(ctx) defer cancel() - // make sure that client.Post() cannot close the reader by wrapping it in - // backend.Closer, which has a noop method. - rd = backend.Closer{Reader: rd} + // make sure that client.Post() cannot close the reader by wrapping it + rd = ioutil.NopCloser(rd) b.sem.GetToken() resp, err := ctxhttp.Post(ctx, b.client, b.Filename(h), "binary/octet-stream", rd) diff --git a/src/restic/backend/s3/s3.go b/src/restic/backend/s3/s3.go index aa557e7e4..2fb88d2a6 100644 --- a/src/restic/backend/s3/s3.go +++ b/src/restic/backend/s3/s3.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "io" + "io/ioutil" + "net/url" "os" "path" "restic" @@ -14,16 +16,16 @@ import ( "restic/errors" "github.com/minio/minio-go" + "github.com/minio/minio-go/pkg/s3utils" "restic/debug" ) // Backend stores data on an S3 endpoint. type Backend struct { - client *minio.Client - sem *backend.Semaphore - bucketname string - prefix string + client *minio.Client + sem *backend.Semaphore + cfg Config backend.Layout } @@ -48,10 +50,9 @@ func Open(cfg Config) (restic.Backend, error) { } be := &Backend{ - client: client, - sem: sem, - bucketname: cfg.Bucket, - prefix: cfg.Prefix, + client: client, + sem: sem, + cfg: cfg, } client.SetCustomTransport(backend.Transport()) @@ -118,7 +119,7 @@ func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) { done := make(chan struct{}) defer close(done) - for obj := range be.client.ListObjects(be.bucketname, dir, false, done) { + for obj := range be.client.ListObjects(be.cfg.Bucket, dir, false, done) { if obj.Key == "" { continue } @@ -149,12 +150,25 @@ func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) { // Location returns this backend's location (the bucket name). func (be *Backend) Location() string { - return be.Join(be.bucketname, be.prefix) + return be.Join(be.cfg.Bucket, be.cfg.Prefix) } // Path returns the path in the bucket that is used for this backend. func (be *Backend) Path() string { - return be.prefix + return be.cfg.Prefix +} + +func (be *Backend) isGoogleCloudStorage() bool { + scheme := "https://" + if be.cfg.UseHTTP { + scheme = "http://" + } + url, err := url.Parse(scheme + be.cfg.Endpoint) + if err != nil { + panic(err) + } + + return s3utils.IsGoogleEndpoint(*url) } // Save stores data in the backend at the handle. @@ -168,15 +182,20 @@ func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err objName := be.Filename(h) // Check key does not already exist - _, err = be.client.StatObject(be.bucketname, objName) + _, err = be.client.StatObject(be.cfg.Bucket, objName) if err == nil { debug.Log("%v already exists", h) return errors.New("key already exists") } + // prevent GCS from closing the file + if be.isGoogleCloudStorage() { + rd = ioutil.NopCloser(rd) + } + be.sem.GetToken() - debug.Log("PutObject(%v, %v)", be.bucketname, objName) - n, err := be.client.PutObject(be.bucketname, objName, rd, "application/octet-stream") + debug.Log("PutObject(%v, %v)", be.cfg.Bucket, objName) + n, err := be.client.PutObject(be.cfg.Bucket, objName, rd, "application/octet-stream") be.sem.ReleaseToken() debug.Log("%v -> %v bytes, err %#v: %v", objName, n, err, err) @@ -226,7 +245,7 @@ func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset debug.Log("Load(%v) send range %v", h, byteRange) coreClient := minio.Core{Client: be.client} - rd, _, err := coreClient.GetObject(be.bucketname, objName, headers) + rd, _, err := coreClient.GetObject(be.cfg.Bucket, objName, headers) if err != nil { be.sem.ReleaseToken() return nil, err @@ -250,7 +269,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf objName := be.Filename(h) var obj *minio.Object - obj, err = be.client.GetObject(be.bucketname, objName) + obj, err = be.client.GetObject(be.cfg.Bucket, objName) if err != nil { debug.Log("GetObject() err %v", err) return restic.FileInfo{}, errors.Wrap(err, "client.GetObject") @@ -277,7 +296,7 @@ func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInf func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { found := false objName := be.Filename(h) - _, err := be.client.StatObject(be.bucketname, objName) + _, err := be.client.StatObject(be.cfg.Bucket, objName) if err == nil { found = true } @@ -289,8 +308,13 @@ func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { // Remove removes the blob with the given name and type. func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { objName := be.Filename(h) - err := be.client.RemoveObject(be.bucketname, objName) + err := be.client.RemoveObject(be.cfg.Bucket, objName) debug.Log("Remove(%v) at %v -> err %v", h, objName, err) + + if be.IsNotExist(err) { + err = nil + } + return errors.Wrap(err, "client.RemoveObject") } @@ -308,7 +332,7 @@ func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { prefix += "/" } - listresp := be.client.ListObjects(be.bucketname, prefix, true, ctx.Done()) + listresp := be.client.ListObjects(be.cfg.Bucket, prefix, true, ctx.Done()) go func() { defer close(ch) @@ -372,11 +396,11 @@ func (be *Backend) Rename(h restic.Handle, l backend.Layout) error { debug.Log(" %v -> %v", oldname, newname) coreClient := minio.Core{Client: be.client} - err := coreClient.CopyObject(be.bucketname, newname, path.Join(be.bucketname, oldname), minio.CopyConditions{}) + err := coreClient.CopyObject(be.cfg.Bucket, newname, path.Join(be.cfg.Bucket, oldname), minio.CopyConditions{}) if err != nil { debug.Log("copy failed: %v", err) return err } - return be.client.RemoveObject(be.bucketname, oldname) + return be.client.RemoveObject(be.cfg.Bucket, oldname) } diff --git a/src/restic/backend/s3/s3_test.go b/src/restic/backend/s3/s3_test.go index 12ca73a10..7d080063a 100644 --- a/src/restic/backend/s3/s3_test.go +++ b/src/restic/backend/s3/s3_test.go @@ -103,6 +103,21 @@ type MinioTestConfig struct { stopServer func() } +func openS3(t testing.TB, cfg MinioTestConfig) (be restic.Backend, err error) { + for i := 0; i < 10; i++ { + be, err = s3.Open(cfg.Config) + if err != nil { + t.Logf("s3 open: try %d: error %v", i, err) + time.Sleep(500 * time.Millisecond) + continue + } + + break + } + + return be, err +} + func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite { return &test.Suite{ // NewConfig returns a config for a new temporary backend that will be used in tests. @@ -127,7 +142,7 @@ func newMinioTestSuite(ctx context.Context, t testing.TB) *test.Suite { Create: func(config interface{}) (restic.Backend, error) { cfg := config.(MinioTestConfig) - be, err := s3.Open(cfg.Config) + be, err := openS3(t, cfg) if err != nil { return nil, err } diff --git a/src/restic/backend/test/tests.go b/src/restic/backend/test/tests.go index b6da7182d..4447064cf 100644 --- a/src/restic/backend/test/tests.go +++ b/src/restic/backend/test/tests.go @@ -446,11 +446,32 @@ func delayedRemove(b restic.Backend, h restic.Handle) error { found, err := b.Test(context.TODO(), h) for i := 0; found && i < 20; i++ { found, err = b.Test(context.TODO(), h) - if found { + if err != nil { + return err + } + + if !found { + break + } + + time.Sleep(100 * time.Millisecond) + } + return err +} + +func delayedList(t testing.TB, b restic.Backend, tpe restic.FileType, max int) restic.IDs { + list := restic.NewIDSet() + for i := 0; i < max; i++ { + for s := range b.List(context.TODO(), tpe) { + id := restic.TestParseID(s) + list.Insert(id) + } + if len(list) < max { time.Sleep(100 * time.Millisecond) } } - return err + + return list.List() } // TestBackend tests all functions of the backend. @@ -548,12 +569,7 @@ func (s *Suite) TestBackend(t *testing.T) { IDs = append(IDs, id) } - list := restic.IDs{} - - for s := range b.List(context.TODO(), tpe) { - list = append(list, restic.TestParseID(s)) - } - + list := delayedList(t, b, tpe, len(IDs)) if len(IDs) != len(list) { t.Fatalf("wrong number of IDs returned: want %d, got %d", len(IDs), len(list)) } @@ -581,7 +597,7 @@ func (s *Suite) TestBackend(t *testing.T) { found, err = b.Test(context.TODO(), h) test.OK(t, err) - test.Assert(t, !found, fmt.Sprintf("id %q not found after removal", id)) + test.Assert(t, !found, fmt.Sprintf("id %q found after removal", id)) } } } diff --git a/src/restic/backend/utils.go b/src/restic/backend/utils.go index a07c7e86e..76e9de569 100644 --- a/src/restic/backend/utils.go +++ b/src/restic/backend/utils.go @@ -29,16 +29,6 @@ func LoadAll(ctx context.Context, be restic.Backend, h restic.Handle) (buf []byt return ioutil.ReadAll(rd) } -// Closer wraps an io.Reader and adds a Close() method that does nothing. -type Closer struct { - io.Reader -} - -// Close is a no-op. -func (c Closer) Close() error { - return nil -} - // LimitedReadCloser wraps io.LimitedReader and exposes the Close() method. type LimitedReadCloser struct { io.ReadCloser