From 122462b9b1c339419fa3e279f9716f0af1fd7df2 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Sun, 28 May 2017 10:19:01 +0200 Subject: [PATCH] Add Backblaze B2 backend This is based on prior work by Joe Turgeon @arithmetric. --- doc/manual.rst | 29 ++ run_integration_tests.go | 7 + src/cmds/restic/global.go | 22 ++ src/restic/backend/b2/b2.go | 371 +++++++++++++++++++ src/restic/backend/b2/b2_test.go | 93 +++++ src/restic/backend/b2/config.go | 93 +++++ src/restic/backend/b2/config_test.go | 92 +++++ src/restic/backend/location/location.go | 2 + src/restic/backend/location/location_test.go | 19 + src/restic/backend/semaphore.go | 23 ++ 10 files changed, 751 insertions(+) create mode 100644 src/restic/backend/b2/b2.go create mode 100644 src/restic/backend/b2/b2_test.go create mode 100644 src/restic/backend/b2/config.go create mode 100644 src/restic/backend/b2/config_test.go create mode 100644 src/restic/backend/semaphore.go diff --git a/doc/manual.rst b/doc/manual.rst index 06950a6a3..e1a8c13ac 100644 --- a/doc/manual.rst +++ b/doc/manual.rst @@ -282,6 +282,35 @@ this command. Please note that knowledge of your password is required to access the repository. Losing your password means that your data is irrecoverably lost. +Backblaze B2 +~~~~~~~~~~~~ + +Restic can backup data to any Backblaze B2 bucket. You need to first setup the +following environment variables with the credentials you obtained when signed +into your B2 account: + +.. code-block:: console + + $ export B2_ACCOUNT_ID= + $ export B2_ACCOUNT_KEY= + +You can then easily initialize a repository stored at Backblaze B2. If the +bucket does not exist yet, it will be created: + +.. code-block:: console + + $ restic -r b2:bucketname:path/to/repo init + enter password for new backend: + enter password again: + created restic backend eefee03bbd at b2:bucketname:path/to/repo + Please note that knowledge of your password is required to access the repository. + Losing your password means that your data is irrecoverably lost. + +The number of concurrent connections to the B2 service can be set with the `-o +b2.connections=10`. By default, at most five parallel connections are +established. + + Password prompt on Windows ~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/run_integration_tests.go b/run_integration_tests.go index eca5ed82f..b1902288e 100644 --- a/run_integration_tests.go +++ b/run_integration_tests.go @@ -164,6 +164,13 @@ func (env *TravisEnvironment) RunTests() error { msg("S3 repository not available\n") } + // if the test b2 repository is available, make sure that the test is not skipped + if os.Getenv("RESTIC_TEST_B2_REPOSITORY") != "" { + ensureTests = append(ensureTests, "restic/backend/b2.TestBackendB2") + } else { + msg("B2 repository not available\n") + } + env.env["RESTIC_TEST_DISALLOW_SKIP"] = strings.Join(ensureTests, ",") if *runCrossCompile { diff --git a/src/cmds/restic/global.go b/src/cmds/restic/global.go index 770a222e8..baa3998a9 100644 --- a/src/cmds/restic/global.go +++ b/src/cmds/restic/global.go @@ -11,6 +11,7 @@ import ( "strings" "syscall" + "restic/backend/b2" "restic/backend/local" "restic/backend/location" "restic/backend/rest" @@ -356,6 +357,23 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro debug.Log("opening s3 repository at %#v", cfg) return cfg, nil + case "b2": + cfg := loc.Config.(b2.Config) + + if cfg.AccountID == "" { + cfg.AccountID = os.Getenv("B2_ACCOUNT_ID") + } + + if cfg.Key == "" { + cfg.Key = os.Getenv("B2_ACCOUNT_KEY") + } + + if err := opts.Apply(loc.Scheme, &cfg); err != nil { + return nil, err + } + + debug.Log("opening b2 repository at %#v", cfg) + return cfg, nil case "rest": cfg := loc.Config.(rest.Config) if err := opts.Apply(loc.Scheme, &cfg); err != nil { @@ -391,6 +409,8 @@ func open(s string, opts options.Options) (restic.Backend, error) { be, err = sftp.Open(cfg.(sftp.Config)) case "s3": be, err = s3.Open(cfg.(s3.Config)) + case "b2": + be, err = b2.Open(cfg.(b2.Config)) case "rest": be, err = rest.Open(cfg.(rest.Config)) @@ -435,6 +455,8 @@ func create(s string, opts options.Options) (restic.Backend, error) { return sftp.Create(cfg.(sftp.Config)) case "s3": return s3.Open(cfg.(s3.Config)) + case "b2": + return b2.Create(cfg.(b2.Config)) case "rest": return rest.Create(cfg.(rest.Config)) } diff --git a/src/restic/backend/b2/b2.go b/src/restic/backend/b2/b2.go new file mode 100644 index 000000000..c209c13ab --- /dev/null +++ b/src/restic/backend/b2/b2.go @@ -0,0 +1,371 @@ +package b2 + +import ( + "context" + "io" + "path" + "restic" + "strings" + + "restic/backend" + "restic/debug" + "restic/errors" + + "github.com/kurin/blazer/b2" +) + +// b2Backend is a backend which stores its data on Backblaze B2. +type b2Backend struct { + client *b2.Client + bucket *b2.Bucket + cfg Config + backend.Layout + sem *backend.Semaphore +} + +func newClient(ctx context.Context, cfg Config) (*b2.Client, error) { + opts := []b2.ClientOption{b2.Transport(backend.Transport())} + + c, err := b2.NewClient(ctx, cfg.AccountID, cfg.Key, opts...) + if err != nil { + return nil, errors.Wrap(err, "b2.NewClient") + } + return c, nil +} + +// Open opens a connection to the B2 service. +func Open(cfg Config) (restic.Backend, error) { + debug.Log("cfg %#v", cfg) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + client, err := newClient(ctx, cfg) + if err != nil { + return nil, err + } + + bucket, err := client.Bucket(ctx, cfg.Bucket) + if err != nil { + return nil, errors.Wrap(err, "Bucket") + } + + be := &b2Backend{ + client: client, + bucket: bucket, + cfg: cfg, + Layout: &backend.DefaultLayout{ + Join: path.Join, + Path: cfg.Prefix, + }, + sem: backend.NewSemaphore(cfg.Connections), + } + + return be, nil +} + +// Create opens a connection to the B2 service. If the bucket does not exist yet, +// it is created. +func Create(cfg Config) (restic.Backend, error) { + debug.Log("cfg %#v", cfg) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + client, err := newClient(ctx, cfg) + if err != nil { + return nil, err + } + + attr := b2.BucketAttrs{ + Type: b2.Private, + } + bucket, err := client.NewBucket(ctx, cfg.Bucket, &attr) + if err != nil { + return nil, errors.Wrap(err, "NewBucket") + } + + be := &b2Backend{ + client: client, + bucket: bucket, + cfg: cfg, + Layout: &backend.DefaultLayout{ + Join: path.Join, + Path: cfg.Prefix, + }, + sem: backend.NewSemaphore(cfg.Connections), + } + + present, err := be.Test(restic.Handle{Type: restic.ConfigFile}) + if err != nil { + return nil, err + } + + if present { + return nil, errors.New("config already exists") + } + + return be, nil +} + +// Location returns the location for the backend. +func (be *b2Backend) Location() string { + return be.cfg.Bucket +} + +// wrapReader wraps an io.ReadCloser to run an additional function on Close. +type wrapReader struct { + io.ReadCloser + eofSeen bool + f func() +} + +func (wr *wrapReader) Read(p []byte) (int, error) { + if wr.eofSeen { + return 0, io.EOF + } + + n, err := wr.ReadCloser.Read(p) + if err == io.EOF { + wr.eofSeen = true + } + return n, err +} + +func (wr *wrapReader) Close() error { + err := wr.ReadCloser.Close() + wr.f() + return err +} + +// Load returns the data stored in the backend for h at the given offset +// and saves it in p. Load has the same semantics as io.ReaderAt. +func (be *b2Backend) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, error) { + debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h)) + if err := h.Valid(); err != nil { + return nil, err + } + + if offset < 0 { + return nil, errors.New("offset is negative") + } + + if length < 0 { + return nil, errors.Errorf("invalid length %d", length) + } + + ctx, cancel := context.WithCancel(context.TODO()) + + be.sem.GetToken() + + name := be.Layout.Filename(h) + obj := be.bucket.Object(name) + + if offset == 0 && length == 0 { + rd := obj.NewReader(ctx) + wrapper := &wrapReader{ + ReadCloser: rd, + f: func() { + cancel() + be.sem.ReleaseToken() + }, + } + return wrapper, nil + } + + // pass a negative length to NewRangeReader so that the remainder of the + // file is read. + if length == 0 { + length = -1 + } + + rd := obj.NewRangeReader(ctx, offset, int64(length)) + wrapper := &wrapReader{ + ReadCloser: rd, + f: func() { + cancel() + be.sem.ReleaseToken() + }, + } + return wrapper, nil +} + +// Save stores data in the backend at the handle. +func (be *b2Backend) Save(h restic.Handle, rd io.Reader) (err error) { + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + if err := h.Valid(); err != nil { + return err + } + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + name := be.Filename(h) + debug.Log("Save %v, name %v", h, name) + obj := be.bucket.Object(name) + + _, err = obj.Attrs(ctx) + if err == nil { + debug.Log(" %v already exists", h) + return errors.New("key already exists") + } + + w := obj.NewWriter(ctx) + n, err := io.Copy(w, rd) + debug.Log(" saved %d bytes, err %v", n, err) + + if err != nil { + _ = w.Close() + return errors.Wrap(err, "Copy") + } + + return errors.Wrap(w.Close(), "Close") +} + +// Stat returns information about a blob. +func (be *b2Backend) Stat(h restic.Handle) (bi restic.FileInfo, err error) { + debug.Log("Stat %v", h) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + name := be.Filename(h) + obj := be.bucket.Object(name) + info, err := obj.Attrs(ctx) + if err != nil { + debug.Log("Attrs() err %v", err) + return restic.FileInfo{}, errors.Wrap(err, "Stat") + } + return restic.FileInfo{Size: info.Size}, nil +} + +// Test returns true if a blob of the given type and name exists in the backend. +func (be *b2Backend) Test(h restic.Handle) (bool, error) { + debug.Log("Test %v", h) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + found := false + name := be.Filename(h) + obj := be.bucket.Object(name) + info, err := obj.Attrs(ctx) + if err == nil && info != nil && info.Status == b2.Uploaded { + found = true + } + return found, nil +} + +// Remove removes the blob with the given name and type. +func (be *b2Backend) Remove(h restic.Handle) error { + debug.Log("Remove %v", h) + + ctx, cancel := context.WithCancel(context.TODO()) + defer cancel() + + be.sem.GetToken() + defer be.sem.ReleaseToken() + + obj := be.bucket.Object(be.Filename(h)) + return errors.Wrap(obj.Delete(ctx), "Delete") +} + +// List returns a channel that yields all names of blobs of type t. A +// goroutine is started for this. If the channel done is closed, sending +// stops. +func (be *b2Backend) List(t restic.FileType, done <-chan struct{}) <-chan string { + debug.Log("List %v", t) + ch := make(chan string) + + ctx, cancel := context.WithCancel(context.TODO()) + + be.sem.GetToken() + + go func() { + defer close(ch) + defer cancel() + defer be.sem.ReleaseToken() + + prefix := be.Dirname(restic.Handle{Type: t}) + cur := &b2.Cursor{Prefix: prefix} + + for { + objs, c, err := be.bucket.ListCurrentObjects(ctx, 1000, cur) + if err != nil && err != io.EOF { + return + } + for _, obj := range objs { + // Skip objects returned that do not have the specified prefix. + if !strings.HasPrefix(obj.Name(), prefix) { + continue + } + + m := path.Base(obj.Name()) + if m == "" { + continue + } + + select { + case ch <- m: + case <-done: + return + } + } + if err == io.EOF { + return + } + cur = c + } + }() + + return ch +} + +// Remove keys for a specified backend type. +func (be *b2Backend) removeKeys(t restic.FileType) error { + debug.Log("removeKeys %v", t) + + done := make(chan struct{}) + defer close(done) + for key := range be.List(t, done) { + err := be.Remove(restic.Handle{Type: t, Name: key}) + if err != nil { + return err + } + } + return nil +} + +// Delete removes all restic keys in the bucket. It will not remove the bucket itself. +func (be *b2Backend) Delete() error { + alltypes := []restic.FileType{ + restic.DataFile, + restic.KeyFile, + restic.LockFile, + restic.SnapshotFile, + restic.IndexFile} + + for _, t := range alltypes { + err := be.removeKeys(t) + if err != nil { + return nil + } + } + err := be.Remove(restic.Handle{Type: restic.ConfigFile}) + if err != nil && b2.IsNotExist(errors.Cause(err)) { + err = nil + } + + return err +} + +// Close does nothing +func (be *b2Backend) Close() error { return nil } diff --git a/src/restic/backend/b2/b2_test.go b/src/restic/backend/b2/b2_test.go new file mode 100644 index 000000000..64c00c9ff --- /dev/null +++ b/src/restic/backend/b2/b2_test.go @@ -0,0 +1,93 @@ +package b2_test + +import ( + "fmt" + "os" + "testing" + "time" + + "restic" + "restic/backend/b2" + "restic/backend/test" + + . "restic/test" +) + +func newB2TestSuite(t testing.TB) *test.Suite { + return &test.Suite{ + // do not use excessive data + MinimalData: true, + + // NewConfig returns a config for a new temporary backend that will be used in tests. + NewConfig: func() (interface{}, error) { + b2cfg, err := b2.ParseConfig(os.Getenv("RESTIC_TEST_B2_REPOSITORY")) + if err != nil { + return nil, err + } + + cfg := b2cfg.(b2.Config) + cfg.AccountID = os.Getenv("RESTIC_TEST_B2_ACCOUNT_ID") + cfg.Key = os.Getenv("RESTIC_TEST_B2_ACCOUNT_KEY") + cfg.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano()) + return cfg, nil + }, + + // CreateFn is a function that creates a temporary repository for the tests. + Create: func(config interface{}) (restic.Backend, error) { + cfg := config.(b2.Config) + return b2.Create(cfg) + }, + + // OpenFn is a function that opens a previously created temporary repository. + Open: func(config interface{}) (restic.Backend, error) { + cfg := config.(b2.Config) + return b2.Open(cfg) + }, + + // CleanupFn removes data created during the tests. + Cleanup: func(config interface{}) error { + cfg := config.(b2.Config) + be, err := b2.Open(cfg) + if err != nil { + return err + } + + if err := be.(restic.Deleter).Delete(); err != nil { + return err + } + + return nil + }, + } +} + +func testVars(t testing.TB) { + vars := []string{ + "RESTIC_TEST_B2_ACCOUNT_ID", + "RESTIC_TEST_B2_ACCOUNT_KEY", + "RESTIC_TEST_B2_REPOSITORY", + } + + for _, v := range vars { + if os.Getenv(v) == "" { + t.Skipf("environment variable %v not set", v) + return + } + } +} + +func TestBackendB2(t *testing.T) { + defer func() { + if t.Skipped() { + SkipDisallowed(t, "restic/backend/b2.TestBackendB2") + } + }() + + testVars(t) + newB2TestSuite(t).RunTests(t) +} + +func BenchmarkBackendb2(t *testing.B) { + testVars(t) + newB2TestSuite(t).RunBenchmarks(t) +} diff --git a/src/restic/backend/b2/config.go b/src/restic/backend/b2/config.go new file mode 100644 index 000000000..221e4ff02 --- /dev/null +++ b/src/restic/backend/b2/config.go @@ -0,0 +1,93 @@ +package b2 + +import ( + "path" + "regexp" + "strings" + + "restic/errors" + "restic/options" +) + +// Config contains all configuration necessary to connect to an b2 compatible +// server. +type Config struct { + AccountID string + Key string + Bucket string + Prefix string + + Connections int `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"` +} + +// NewConfig returns a new config with default options applied. +func NewConfig() Config { + return Config{ + Connections: 5, + } +} + +func init() { + options.Register("b2", Config{}) +} + +var bucketName = regexp.MustCompile("^[a-zA-Z0-9-]+$") + +// checkBucketName tests the bucket name against the rules at +// https://help.backblaze.com/hc/en-us/articles/217666908-What-you-need-to-know-about-B2-Bucket-names +func checkBucketName(name string) error { + if name == "" { + return errors.New("bucket name is empty") + } + + if len(name) < 6 { + return errors.New("bucket name is too short") + } + + if len(name) > 50 { + return errors.New("bucket name is too long") + } + + if !bucketName.MatchString(name) { + return errors.New("bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)") + } + + return nil +} + +// ParseConfig parses the string s and extracts the b2 config. The supported +// configuration format is b2:bucketname/prefix. If no prefix is given the +// prefix "restic" will be used. +func ParseConfig(s string) (interface{}, error) { + if !strings.HasPrefix(s, "b2:") { + return nil, errors.New("invalid format, want: b2:bucket-name[:path]") + } + + s = s[3:] + data := strings.SplitN(s, ":", 2) + if len(data) == 0 || len(data[0]) == 0 { + return nil, errors.New("bucket name not found") + } + + cfg := NewConfig() + cfg.Bucket = data[0] + + if err := checkBucketName(cfg.Bucket); err != nil { + return nil, err + } + + if len(data) == 2 { + p := data[1] + if len(p) > 0 { + p = path.Clean(p) + } + + if len(p) > 0 && path.IsAbs(p) { + p = p[1:] + } + + cfg.Prefix = p + } + + return cfg, nil +} diff --git a/src/restic/backend/b2/config_test.go b/src/restic/backend/b2/config_test.go new file mode 100644 index 000000000..4194cb62c --- /dev/null +++ b/src/restic/backend/b2/config_test.go @@ -0,0 +1,92 @@ +package b2 + +import "testing" + +var configTests = []struct { + s string + cfg Config +}{ + {"b2:bucketname", Config{ + Bucket: "bucketname", + Prefix: "", + Connections: 5, + }}, + {"b2:bucketname:", Config{ + Bucket: "bucketname", + Prefix: "", + Connections: 5, + }}, + {"b2:bucketname:/prefix/directory", Config{ + Bucket: "bucketname", + Prefix: "prefix/directory", + Connections: 5, + }}, + {"b2:foobar", Config{ + Bucket: "foobar", + Prefix: "", + Connections: 5, + }}, + {"b2:foobar:", Config{ + Bucket: "foobar", + Prefix: "", + Connections: 5, + }}, + {"b2:foobar:/", Config{ + Bucket: "foobar", + Prefix: "", + Connections: 5, + }}, +} + +func TestParseConfig(t *testing.T) { + for _, test := range configTests { + t.Run("", func(t *testing.T) { + cfg, err := ParseConfig(test.s) + if err != nil { + t.Fatalf("%s failed: %v", test.s, err) + } + + if cfg != test.cfg { + t.Fatalf("input: %s\n wrong config, want:\n %#v\ngot:\n %#v", + test.s, test.cfg, cfg) + } + }) + } +} + +var invalidConfigTests = []struct { + s string + err string +}{ + { + "b2", + "invalid format, want: b2:bucket-name[:path]", + }, + { + "b2:", + "bucket name not found", + }, + { + "b2:bucket_name", + "bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)", + }, + { + "b2:bucketname/prefix/directory/", + "bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)", + }, +} + +func TestInvalidConfig(t *testing.T) { + for _, test := range invalidConfigTests { + t.Run("", func(t *testing.T) { + cfg, err := ParseConfig(test.s) + if err == nil { + t.Fatalf("expected error not found for invalid config: %v, cfg is:\n%#v", test.s, cfg) + } + + if err.Error() != test.err { + t.Fatalf("unexpected error found, want:\n %v\ngot:\n %v", test.err, err.Error()) + } + }) + } +} diff --git a/src/restic/backend/location/location.go b/src/restic/backend/location/location.go index 23e0af37b..69c22566b 100644 --- a/src/restic/backend/location/location.go +++ b/src/restic/backend/location/location.go @@ -4,6 +4,7 @@ package location import ( "strings" + "restic/backend/b2" "restic/backend/local" "restic/backend/rest" "restic/backend/s3" @@ -25,6 +26,7 @@ type parser struct { // parsers is a list of valid config parsers for the backends. The first parser // is the fallback and should always be set to the local backend. var parsers = []parser{ + {"b2", b2.ParseConfig}, {"local", local.ParseConfig}, {"sftp", sftp.ParseConfig}, {"s3", s3.ParseConfig}, diff --git a/src/restic/backend/location/location_test.go b/src/restic/backend/location/location_test.go index 47260669a..156c86a0c 100644 --- a/src/restic/backend/location/location_test.go +++ b/src/restic/backend/location/location_test.go @@ -5,6 +5,7 @@ import ( "reflect" "testing" + "restic/backend/b2" "restic/backend/local" "restic/backend/rest" "restic/backend/s3" @@ -203,6 +204,24 @@ var parseTests = []struct { }, }, }, + { + "b2:bucketname:/prefix", Location{Scheme: "b2", + Config: b2.Config{ + Bucket: "bucketname", + Prefix: "prefix", + Connections: 5, + }, + }, + }, + { + "b2:bucketname", Location{Scheme: "b2", + Config: b2.Config{ + Bucket: "bucketname", + Prefix: "", + Connections: 5, + }, + }, + }, } func TestParse(t *testing.T) { diff --git a/src/restic/backend/semaphore.go b/src/restic/backend/semaphore.go new file mode 100644 index 000000000..dbbd72966 --- /dev/null +++ b/src/restic/backend/semaphore.go @@ -0,0 +1,23 @@ +package backend + +// Semaphore limits access to a restricted resource. +type Semaphore struct { + ch chan struct{} +} + +// NewSemaphore returns a new semaphore with capacity n. +func NewSemaphore(n int) *Semaphore { + return &Semaphore{ + ch: make(chan struct{}, n), + } +} + +// GetToken blocks until a Token is available. +func (s *Semaphore) GetToken() { + s.ch <- struct{}{} +} + +// ReleaseToken returns a token. +func (s *Semaphore) ReleaseToken() { + <-s.ch +}