From ba75a3884cf97a273fbd07a153ba7e186874834b Mon Sep 17 00:00:00 2001 From: Dipta Das Date: Sat, 8 Jul 2017 06:34:23 -0700 Subject: [PATCH] Add Google Cloud Storage as backend Environment variables: GOOGLE_PROJECT_ID=gcp-project-id GOOGLE_APPLICATION_CREDENTIALS=path-to-json-file Environment variables for test: RESTIC_TEST_GS_PROJECT_ID=gcp-project-id RESTIC_TEST_GS_APPLICATION_CREDENTIALS=path-to-json-file RESTIC_TEST_GS_REPOSITORY=gs:us-central1/test-bucket Init repository: $ restic -r gs:bucket:/[prefix] init --- cmd/restic/global.go | 30 ++ internal/backend/gs/config.go | 57 ++++ internal/backend/gs/config_test.go | 40 +++ internal/backend/gs/gs.go | 404 ++++++++++++++++++++++++++ internal/backend/gs/gs_test.go | 121 ++++++++ internal/backend/location/location.go | 2 + 6 files changed, 654 insertions(+) create mode 100644 internal/backend/gs/config.go create mode 100644 internal/backend/gs/config_test.go create mode 100644 internal/backend/gs/gs.go create mode 100644 internal/backend/gs/gs_test.go diff --git a/cmd/restic/global.go b/cmd/restic/global.go index 874a26408..e8ed6c047 100644 --- a/cmd/restic/global.go +++ b/cmd/restic/global.go @@ -11,6 +11,7 @@ import ( "syscall" "github.com/restic/restic/internal/backend/b2" + "github.com/restic/restic/internal/backend/gs" "github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/location" "github.com/restic/restic/internal/backend/rest" @@ -363,6 +364,31 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro debug.Log("opening s3 repository at %#v", cfg) return cfg, nil + case "gs": + cfg := loc.Config.(gs.Config) + if cfg.ProjectID == "" { + cfg.ProjectID = os.Getenv("GOOGLE_PROJECT_ID") + } + + if cfg.JSONKeyPath == "" { + if path := os.Getenv("GOOGLE_APPLICATION_CREDENTIALS"); path != "" { + // Check read access + if _, err := ioutil.ReadFile(path); err != nil { + return nil, errors.Fatalf("Failed to read google credential from file %v: %v", path, err) + } + cfg.JSONKeyPath = path + } else { + return nil, errors.Fatal("No credential file path is set") + } + } + + if err := opts.Apply(loc.Scheme, &cfg); err != nil { + return nil, err + } + + debug.Log("opening gs repository at %#v", cfg) + return cfg, nil + case "swift": cfg := loc.Config.(swift.Config) @@ -429,6 +455,8 @@ func open(s string, opts options.Options) (restic.Backend, error) { be, err = sftp.Open(cfg.(sftp.Config)) case "s3": be, err = s3.Open(cfg.(s3.Config)) + case "gs": + be, err = gs.Open(cfg.(gs.Config)) case "swift": be, err = swift.Open(cfg.(swift.Config)) case "b2": @@ -477,6 +505,8 @@ func create(s string, opts options.Options) (restic.Backend, error) { return sftp.Create(cfg.(sftp.Config)) case "s3": return s3.Create(cfg.(s3.Config)) + case "gs": + return gs.Create(cfg.(gs.Config)) case "swift": return swift.Open(cfg.(swift.Config)) case "b2": diff --git a/internal/backend/gs/config.go b/internal/backend/gs/config.go new file mode 100644 index 000000000..29ebc1c0c --- /dev/null +++ b/internal/backend/gs/config.go @@ -0,0 +1,57 @@ +package gs + +import ( + "errors" + "path" + "strings" + + "github.com/restic/restic/internal/options" +) + +// Config contains all configuration necessary to connect to an gcs compatible +// server. +type Config struct { + ProjectID string + JSONKeyPath string + Bucket string + Prefix string + + Connections uint `option:"connections" help:"set a limit for the number of concurrent connections (default: 20)"` +} + +// NewConfig returns a new Config with the default values filled in. +func NewConfig() Config { + return Config{ + Connections: 5, + } +} + +func init() { + options.Register("gs", Config{}) +} + +// ParseConfig parses the string s and extracts the gcs config. The +// supported configuration format is gs:bucketName:/[prefix]. +func ParseConfig(s string) (interface{}, error) { + if strings.HasPrefix(s, "gs:") { + s = s[3:] + } else { + return nil, errors.New("gs: invalid format") + } + // use the first entry of the path as the bucket name and the + // remainder as prefix + path := strings.SplitN(s, ":/", 2) + return createConfig(path) +} + +func createConfig(p []string) (interface{}, error) { + if len(p) < 2 { + return nil, errors.New("gs: invalid format, bucket name not found") + } + cfg := NewConfig() + cfg.Bucket = p[0] + if p[1] != "" { + cfg.Prefix = path.Clean(p[1]) + } + return cfg, nil +} diff --git a/internal/backend/gs/config_test.go b/internal/backend/gs/config_test.go new file mode 100644 index 000000000..fb03e3a20 --- /dev/null +++ b/internal/backend/gs/config_test.go @@ -0,0 +1,40 @@ +package gs + +import "testing" + +var configTests = []struct { + s string + cfg Config +}{ + {"gs:bucketname:/", Config{ + Bucket: "bucketname", + Prefix: "", + Connections: 5, + }}, + {"gs:bucketname:/prefix/directory", Config{ + Bucket: "bucketname", + Prefix: "prefix/directory", + Connections: 5, + }}, + {"gs:bucketname:/prefix/directory/", Config{ + Bucket: "bucketname", + Prefix: "prefix/directory", + Connections: 5, + }}, +} + +func TestParseConfig(t *testing.T) { + for i, test := range configTests { + cfg, err := ParseConfig(test.s) + if err != nil { + t.Errorf("test %d:%s failed: %v", i, test.s, err) + continue + } + + if cfg != test.cfg { + t.Errorf("test %d:\ninput:\n %s\n wrong config, want:\n %v\ngot:\n %v", + i, test.s, test.cfg, cfg) + continue + } + } +} diff --git a/internal/backend/gs/gs.go b/internal/backend/gs/gs.go new file mode 100644 index 000000000..7a3796ce9 --- /dev/null +++ b/internal/backend/gs/gs.go @@ -0,0 +1,404 @@ +package gs + +import ( + "context" + "fmt" + "io" + "os" + "path" + "strings" + "time" + + "github.com/pkg/errors" + "github.com/restic/restic/internal/backend" + "github.com/restic/restic/internal/debug" + "github.com/restic/restic/internal/restic" + + "io/ioutil" + + "golang.org/x/oauth2/google" + "google.golang.org/api/googleapi" + storage "google.golang.org/api/storage/v1" +) + +// Backend stores data on an gs endpoint. +type Backend struct { + service *storage.Service + projectID string + sem *backend.Semaphore + bucketName string + prefix string + backend.Layout +} + +// make sure that *Backend implements backend.Backend +var _ restic.Backend = &Backend{} + +func getStorageService(jsonKeyPath string) (*storage.Service, error) { + + raw, err := ioutil.ReadFile(jsonKeyPath) + if err != nil { + return nil, errors.Wrap(err, "ReadFile") + } + + conf, err := google.JWTConfigFromJSON(raw, storage.DevstorageReadWriteScope) + if err != nil { + return nil, err + } + + client := conf.Client(context.TODO()) + + service, err := storage.New(client) + if err != nil { + return nil, err + } + + return service, nil +} + +func open(cfg Config) (*Backend, error) { + debug.Log("open, config %#v", cfg) + + service, err := getStorageService(cfg.JSONKeyPath) + if err != nil { + return nil, errors.Wrap(err, "getStorageService") + } + + sem, err := backend.NewSemaphore(cfg.Connections) + if err != nil { + return nil, err + } + + be := &Backend{ + service: service, + projectID: cfg.ProjectID, + sem: sem, + bucketName: cfg.Bucket, + prefix: cfg.Prefix, + Layout: &backend.DefaultLayout{ + Path: cfg.Prefix, + Join: path.Join, + }, + } + + return be, nil +} + +// Open opens the gs backend at bucket and region. +func Open(cfg Config) (restic.Backend, error) { + return open(cfg) +} + +// Create opens the S3 backend at bucket and region and creates the bucket if +// it does not exist yet. +func Create(cfg Config) (restic.Backend, error) { + be, err := open(cfg) + + if err != nil { + return nil, errors.Wrap(err, "open") + } + + // Create bucket if not exists + if _, err := be.service.Buckets.Get(be.bucketName).Do(); err != nil { + bucket := &storage.Bucket{ + Name: be.bucketName, + } + + if _, err := be.service.Buckets.Insert(be.projectID, bucket).Do(); err != nil { + return nil, errors.Wrap(err, "service.Buckets.Insert") + } + } + + return be, nil +} + +// IsNotExist returns true if the error is caused by a not existing file. +func (be *Backend) IsNotExist(err error) bool { + debug.Log("IsNotExist(%T, %#v)", err, err) + + if os.IsNotExist(err) { + return true + } + + if er, ok := err.(*googleapi.Error); ok { + if er.Code == 404 { + return true + } + } + + return false +} + +// Join combines path components with slashes. +func (be *Backend) Join(p ...string) string { + return path.Join(p...) +} + +type fileInfo struct { + name string + size int64 + mode os.FileMode + modTime time.Time + isDir bool +} + +func (fi fileInfo) Name() string { return fi.name } // base name of the file +func (fi fileInfo) Size() int64 { return fi.size } // length in bytes for regular files; system-dependent for others +func (fi fileInfo) Mode() os.FileMode { return fi.mode } // file mode bits +func (fi fileInfo) ModTime() time.Time { return fi.modTime } // modification time +func (fi fileInfo) IsDir() bool { return fi.isDir } // abbreviation for Mode().IsDir() +func (fi fileInfo) Sys() interface{} { return nil } // underlying data source (can return nil) + +// ReadDir returns the entries for a directory. +func (be *Backend) ReadDir(dir string) (list []os.FileInfo, err error) { + debug.Log("ReadDir(%v)", dir) + + // make sure dir ends with a slash + if dir[len(dir)-1] != '/' { + dir += "/" + } + + obj, err := be.service.Objects.List(be.bucketName).Prefix(dir).Delimiter("/").Do() + if err != nil { + return nil, err + } + + for _, item := range obj.Prefixes { + entry := fileInfo{ + name: strings.TrimPrefix(item, dir), + isDir: true, + mode: os.ModeDir | 0755, + } + list = append(list, entry) + } + for _, item := range obj.Items { + entry := fileInfo{ + name: strings.TrimPrefix(item.Name, dir), + isDir: false, + mode: 0644, + size: int64(item.Size), + //modTime: item.Updated, + } + if entry.name != "" { + list = append(list, entry) + } + } + + return list, nil +} + +// Location returns this backend's location (the bucket name). +func (be *Backend) Location() string { + return be.Join(be.bucketName, be.prefix) +} + +// Path returns the path in the bucket that is used for this backend. +func (be *Backend) Path() string { + return be.prefix +} + +// Save stores data in the backend at the handle. +func (be *Backend) Save(ctx context.Context, h restic.Handle, rd io.Reader) (err error) { + if err := h.Valid(); err != nil { + return err + } + + objName := be.Filename(h) + + debug.Log("Save %v at %v", h, objName) + + // Check key does not already exist + if _, err := be.service.Objects.Get(be.bucketName, objName).Do(); err == nil { + debug.Log("%v already exists", h) + return errors.New("key already exists") + } + + be.sem.GetToken() + + debug.Log("InsertObject(%v, %v)", be.bucketName, objName) + + info, err := be.service.Objects.Insert(be.bucketName, + &storage.Object{ + Name: objName, + }).Media(rd).Do() + + be.sem.ReleaseToken() + debug.Log("%v -> %v bytes, err %#v: %v", objName, info.Size, err, err) + + return errors.Wrap(err, "service.Objects.Insert") +} + +// wrapReader wraps an io.ReadCloser to run an additional function on Close. +type wrapReader struct { + io.ReadCloser + f func() +} + +func (wr wrapReader) Close() error { + err := wr.ReadCloser.Close() + wr.f() + return err +} + +// Load returns a reader that yields the contents of the file at h at the +// given offset. If length is nonzero, only a portion of the file is +// returned. rd must be closed after use. +func (be *Backend) Load(ctx context.Context, h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) + if err := h.Valid(); err != nil { + return nil, err + } + + if offset < 0 { + return nil, errors.New("offset is negative") + } + + if length < 0 { + return nil, errors.Errorf("invalid length %d", length) + } + + objName := be.Filename(h) + + be.sem.GetToken() + + var byteRange string + if length > 0 { + byteRange = fmt.Sprintf("bytes=%d-%d", offset, offset+int64(length-1)) + } else { + byteRange = fmt.Sprintf("bytes=%d-", offset) + } + + req := be.service.Objects.Get(be.bucketName, objName) + // https://cloud.google.com/storage/docs/json_api/v1/parameters#range + req.Header().Set("Range", byteRange) + res, err := req.Download() + if err != nil { + be.sem.ReleaseToken() + return nil, err + } + + closeRd := wrapReader{ + ReadCloser: res.Body, + f: func() { + debug.Log("Close()") + be.sem.ReleaseToken() + }, + } + + return closeRd, err +} + +// Stat returns information about a blob. +func (be *Backend) Stat(ctx context.Context, h restic.Handle) (bi restic.FileInfo, err error) { + debug.Log("%v", h) + + objName := be.Filename(h) + + obj, err := be.service.Objects.Get(be.bucketName, objName).Do() + if err != nil { + debug.Log("GetObject() err %v", err) + return restic.FileInfo{}, errors.Wrap(err, "service.Objects.Get") + } + + return restic.FileInfo{Size: int64(obj.Size)}, nil +} + +// Test returns true if a blob of the given type and name exists in the backend. +func (be *Backend) Test(ctx context.Context, h restic.Handle) (bool, error) { + found := false + objName := be.Filename(h) + _, err := be.service.Objects.Get(be.bucketName, objName).Do() + if err == nil { + found = true + } + // If error, then not found + return found, nil +} + +// Remove removes the blob with the given name and type. +func (be *Backend) Remove(ctx context.Context, h restic.Handle) error { + objName := be.Filename(h) + + err := be.service.Objects.Delete(be.bucketName, objName).Do() + if er, ok := err.(*googleapi.Error); ok { + if er.Code == 404 { + err = nil + } + } + + debug.Log("Remove(%v) at %v -> err %v", h, objName, err) + return errors.Wrap(err, "client.RemoveObject") +} + +// List returns a channel that yields all names of blobs of type t. A +// goroutine is started for this. If the channel done is closed, sending +// stops. +func (be *Backend) List(ctx context.Context, t restic.FileType) <-chan string { + debug.Log("listing %v", t) + ch := make(chan string) + + prefix := be.Dirname(restic.Handle{Type: t}) + + // make sure prefix ends with a slash + if prefix[len(prefix)-1] != '/' { + prefix += "/" + } + + go func() { + defer close(ch) + + obj, err := be.service.Objects.List(be.bucketName).Prefix(prefix).Do() + if err != nil { + return + } + + for _, item := range obj.Items { + m := strings.TrimPrefix(item.Name, prefix) + if m == "" { + continue + } + + select { + case ch <- path.Base(m): + case <-ctx.Done(): + return + } + } + }() + + return ch +} + +// Remove keys for a specified backend type. +func (be *Backend) removeKeys(ctx context.Context, t restic.FileType) error { + for key := range be.List(ctx, restic.DataFile) { + err := be.Remove(ctx, restic.Handle{Type: restic.DataFile, Name: key}) + if err != nil { + return err + } + } + + return nil +} + +// Delete removes all restic keys in the bucket. It will not remove the bucket itself. +func (be *Backend) Delete(ctx context.Context) error { + alltypes := []restic.FileType{ + restic.DataFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} + + for _, t := range alltypes { + err := be.removeKeys(ctx, t) + if err != nil { + return nil + } + } + + return be.Remove(ctx, restic.Handle{Type: restic.ConfigFile}) +} + +// Close does nothing +func (be *Backend) Close() error { return nil } diff --git a/internal/backend/gs/gs_test.go b/internal/backend/gs/gs_test.go new file mode 100644 index 000000000..b93e698e9 --- /dev/null +++ b/internal/backend/gs/gs_test.go @@ -0,0 +1,121 @@ +package gs_test + +import ( + "context" + "errors" + "fmt" + "os" + "testing" + "time" + + "github.com/restic/restic/internal/backend/gs" + "github.com/restic/restic/internal/backend/test" + "github.com/restic/restic/internal/restic" + . "github.com/restic/restic/internal/test" +) + +func newGSTestSuite(t testing.TB) *test.Suite { + return &test.Suite{ + // do not use excessive data + MinimalData: true, + + // NewConfig returns a config for a new temporary backend that will be used in tests. + NewConfig: func() (interface{}, error) { + gscfg, err := gs.ParseConfig(os.Getenv("RESTIC_TEST_GS_REPOSITORY")) + if err != nil { + return nil, err + } + + cfg := gscfg.(gs.Config) + cfg.ProjectID = os.Getenv("RESTIC_TEST_GS_PROJECT_ID") + cfg.JSONKeyPath = os.Getenv("RESTIC_TEST_GS_APPLICATION_CREDENTIALS") + cfg.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano()) + return cfg, nil + }, + + // CreateFn is a function that creates a temporary repository for the tests. + Create: func(config interface{}) (restic.Backend, error) { + cfg := config.(gs.Config) + + be, err := gs.Create(cfg) + if err != nil { + return nil, err + } + + exists, err := be.Test(context.TODO(), restic.Handle{Type: restic.ConfigFile}) + if err != nil { + return nil, err + } + + if exists { + return nil, errors.New("config already exists") + } + + return be, nil + }, + + // OpenFn is a function that opens a previously created temporary repository. + Open: func(config interface{}) (restic.Backend, error) { + cfg := config.(gs.Config) + return gs.Open(cfg) + }, + + // CleanupFn removes data created during the tests. + Cleanup: func(config interface{}) error { + cfg := config.(gs.Config) + + be, err := gs.Open(cfg) + if err != nil { + return err + } + + if err := be.(restic.Deleter).Delete(context.TODO()); err != nil { + return err + } + + return nil + }, + } +} + +func TestBackendGS(t *testing.T) { + defer func() { + if t.Skipped() { + SkipDisallowed(t, "restic/backend/gs.TestBackendGS") + } + }() + + vars := []string{ + "RESTIC_TEST_GS_PROJECT_ID", + "RESTIC_TEST_GS_APPLICATION_CREDENTIALS", + "RESTIC_TEST_GS_REPOSITORY", + } + + for _, v := range vars { + if os.Getenv(v) == "" { + t.Skipf("environment variable %v not set", v) + return + } + } + + t.Logf("run tests") + newGSTestSuite(t).RunTests(t) +} + +func BenchmarkBackendGS(t *testing.B) { + vars := []string{ + "RESTIC_TEST_GS_PROJECT_ID", + "RESTIC_TEST_GS_APPLICATION_CREDENTIALS", + "RESTIC_TEST_GS_REPOSITORY", + } + + for _, v := range vars { + if os.Getenv(v) == "" { + t.Skipf("environment variable %v not set", v) + return + } + } + + t.Logf("run tests") + newGSTestSuite(t).RunBenchmarks(t) +} diff --git a/internal/backend/location/location.go b/internal/backend/location/location.go index 0466325a7..4187de024 100644 --- a/internal/backend/location/location.go +++ b/internal/backend/location/location.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/restic/restic/internal/backend/b2" + "github.com/restic/restic/internal/backend/gs" "github.com/restic/restic/internal/backend/local" "github.com/restic/restic/internal/backend/rest" "github.com/restic/restic/internal/backend/s3" @@ -32,6 +33,7 @@ var parsers = []parser{ {"local", local.ParseConfig}, {"sftp", sftp.ParseConfig}, {"s3", s3.ParseConfig}, + {"gs", gs.ParseConfig}, {"swift", swift.ParseConfig}, {"rest", rest.ParseConfig}, }