2
2
mirror of https://github.com/octoleo/restic.git synced 2024-11-29 08:14:03 +00:00

Merge pull request #978 from restic/add-backblaze-backend

Add Backblaze B2 backend
This commit is contained in:
Alexander Neumann 2017-06-03 14:54:04 +02:00
commit 40a61b82ce
28 changed files with 6547 additions and 0 deletions

View File

@ -9,6 +9,11 @@ Important Changes in 0.X.Y
https://github.com/restic/restic/pull/975 https://github.com/restic/restic/pull/975
https://github.com/restic/restic/pull/648 https://github.com/restic/restic/pull/648
* New "b2" backend: A new backend for Backblaze B2 cloud storage
service has been added, https://www.backblaze.com
https://github.com/restic/restic/issues/512
https://github.com/restic/restic/pull/978
Important Changes in 0.6.1 Important Changes in 0.6.1
========================== ==========================

View File

@ -343,6 +343,35 @@ The policy of new container created by restic can be changed using environment v
$ export SWIFT_DEFAULT_CONTAINER_POLICY=<MY_CONTAINER_POLICY> $ export SWIFT_DEFAULT_CONTAINER_POLICY=<MY_CONTAINER_POLICY>
Backblaze B2
~~~~~~~~~~~~
Restic can back up data to any Backblaze B2 bucket. First, set up the
following environment variables with the credentials you obtained after
signing in to your B2 account:
.. code-block:: console
$ export B2_ACCOUNT_ID=<MY_ACCOUNT_ID>
$ export B2_ACCOUNT_KEY=<MY_SECRET_ACCOUNT_KEY>
You can then easily initialize a repository stored at Backblaze B2. If the
bucket does not exist yet, it will be created:
.. code-block:: console
$ restic -r b2:bucketname:path/to/repo init
enter password for new backend:
enter password again:
created restic backend eefee03bbd at b2:bucketname:path/to/repo
Please note that knowledge of your password is required to access the repository.
Losing your password means that your data is irrecoverably lost.
The number of concurrent connections to the B2 service can be set with the
option `-o b2.connections=10`. By default, at most five parallel connections
are established.
Password prompt on Windows Password prompt on Windows
~~~~~~~~~~~~~~~~~~~~~~~~~~ ~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -171,6 +171,13 @@ func (env *TravisEnvironment) RunTests() error {
msg("Swift service not available\n") msg("Swift service not available\n")
} }
// if the test b2 repository is available, make sure that the test is not skipped
if os.Getenv("RESTIC_TEST_B2_REPOSITORY") != "" {
ensureTests = append(ensureTests, "restic/backend/b2.TestBackendB2")
} else {
msg("B2 repository not available\n")
}
env.env["RESTIC_TEST_DISALLOW_SKIP"] = strings.Join(ensureTests, ",") env.env["RESTIC_TEST_DISALLOW_SKIP"] = strings.Join(ensureTests, ",")
if *runCrossCompile { if *runCrossCompile {

View File

@ -11,6 +11,7 @@ import (
"strings" "strings"
"syscall" "syscall"
"restic/backend/b2"
"restic/backend/local" "restic/backend/local"
"restic/backend/location" "restic/backend/location"
"restic/backend/rest" "restic/backend/rest"
@ -371,6 +372,23 @@ func parseConfig(loc location.Location, opts options.Options) (interface{}, erro
debug.Log("opening swift repository at %#v", cfg) debug.Log("opening swift repository at %#v", cfg)
return cfg, nil return cfg, nil
case "b2":
cfg := loc.Config.(b2.Config)
if cfg.AccountID == "" {
cfg.AccountID = os.Getenv("B2_ACCOUNT_ID")
}
if cfg.Key == "" {
cfg.Key = os.Getenv("B2_ACCOUNT_KEY")
}
if err := opts.Apply(loc.Scheme, &cfg); err != nil {
return nil, err
}
debug.Log("opening b2 repository at %#v", cfg)
return cfg, nil
case "rest": case "rest":
cfg := loc.Config.(rest.Config) cfg := loc.Config.(rest.Config)
if err := opts.Apply(loc.Scheme, &cfg); err != nil { if err := opts.Apply(loc.Scheme, &cfg); err != nil {
@ -408,6 +426,8 @@ func open(s string, opts options.Options) (restic.Backend, error) {
be, err = s3.Open(cfg.(s3.Config)) be, err = s3.Open(cfg.(s3.Config))
case "swift": case "swift":
be, err = swift.Open(cfg.(swift.Config)) be, err = swift.Open(cfg.(swift.Config))
case "b2":
be, err = b2.Open(cfg.(b2.Config))
case "rest": case "rest":
be, err = rest.Open(cfg.(rest.Config)) be, err = rest.Open(cfg.(rest.Config))
@ -454,6 +474,8 @@ func create(s string, opts options.Options) (restic.Backend, error) {
return s3.Open(cfg.(s3.Config)) return s3.Open(cfg.(s3.Config))
case "swift": case "swift":
return swift.Open(cfg.(swift.Config)) return swift.Open(cfg.(swift.Config))
case "b2":
return b2.Create(cfg.(b2.Config))
case "rest": case "rest":
return rest.Create(cfg.(rest.Config)) return rest.Create(cfg.(rest.Config))
} }

371
src/restic/backend/b2/b2.go Normal file
View File

@ -0,0 +1,371 @@
package b2
import (
"context"
"io"
"path"
"restic"
"strings"
"restic/backend"
"restic/debug"
"restic/errors"
"github.com/kurin/blazer/b2"
)
// b2Backend is a backend which stores its data on Backblaze B2.
type b2Backend struct {
	client *b2.Client // authorized B2 API client
	bucket *b2.Bucket // bucket all repository files live in
	cfg    Config     // parsed configuration (account, key, bucket, prefix, connections)
	// Layout maps restic handles to object names below cfg.Prefix.
	backend.Layout
	// sem bounds the number of concurrent requests to cfg.Connections.
	sem *backend.Semaphore
}
// newClient authorizes against the B2 service with the account credentials
// from cfg and returns a ready-to-use client.
func newClient(ctx context.Context, cfg Config) (*b2.Client, error) {
	client, err := b2.NewClient(ctx, cfg.AccountID, cfg.Key, b2.Transport(backend.Transport()))
	if err == nil {
		return client, nil
	}
	return nil, errors.Wrap(err, "b2.NewClient")
}
// Open opens a connection to the B2 service.
func Open(cfg Config) (restic.Backend, error) {
	debug.Log("cfg %#v", cfg)

	// The context only needs to cover the setup calls below; the backend
	// methods create their own per-request contexts.
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	client, err := newClient(ctx, cfg)
	if err != nil {
		return nil, err
	}

	// The bucket must already exist; use Create to make a new one.
	bucket, err := client.Bucket(ctx, cfg.Bucket)
	if err != nil {
		return nil, errors.Wrap(err, "Bucket")
	}

	be := &b2Backend{
		client: client,
		bucket: bucket,
		cfg:    cfg,
		Layout: &backend.DefaultLayout{
			Join: path.Join,
			Path: cfg.Prefix,
		},
		sem: backend.NewSemaphore(cfg.Connections),
	}
	return be, nil
}
// Create opens a connection to the B2 service. If the bucket does not exist yet,
// it is created.
func Create(cfg Config) (restic.Backend, error) {
	debug.Log("cfg %#v", cfg)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	client, err := newClient(ctx, cfg)
	if err != nil {
		return nil, err
	}

	// New buckets are created private; NewBucket returns an existing bucket
	// of the same name unchanged.
	attr := b2.BucketAttrs{
		Type: b2.Private,
	}
	bucket, err := client.NewBucket(ctx, cfg.Bucket, &attr)
	if err != nil {
		return nil, errors.Wrap(err, "NewBucket")
	}

	be := &b2Backend{
		client: client,
		bucket: bucket,
		cfg:    cfg,
		Layout: &backend.DefaultLayout{
			Join: path.Join,
			Path: cfg.Prefix,
		},
		sem: backend.NewSemaphore(cfg.Connections),
	}

	// Refuse to overwrite an existing repository: a config file at the
	// target location means one was initialized here before.
	present, err := be.Test(restic.Handle{Type: restic.ConfigFile})
	if err != nil {
		return nil, err
	}

	if present {
		return nil, errors.New("config already exists")
	}

	return be, nil
}
// Location describes where this backend stores its data: the configured
// B2 bucket name.
func (be *b2Backend) Location() string {
	bucket := be.cfg.Bucket
	return bucket
}
// wrapReader wraps an io.ReadCloser to run an additional function on Close.
type wrapReader struct {
io.ReadCloser
eofSeen bool
f func()
}
func (wr *wrapReader) Read(p []byte) (int, error) {
if wr.eofSeen {
return 0, io.EOF
}
n, err := wr.ReadCloser.Read(p)
if err == io.EOF {
wr.eofSeen = true
}
return n, err
}
func (wr *wrapReader) Close() error {
err := wr.ReadCloser.Close()
wr.f()
return err
}
// Load returns the data stored in the backend for h at the given offset
// and saves it in p. Load has the same semantics as io.ReaderAt.
func (be *b2Backend) Load(h restic.Handle, length int, offset int64) (io.ReadCloser, error) {
	debug.Log("Load %v, length %v, offset %v from %v", h, length, offset, be.Filename(h))
	if err := h.Valid(); err != nil {
		return nil, err
	}

	if offset < 0 {
		return nil, errors.New("offset is negative")
	}

	if length < 0 {
		return nil, errors.Errorf("invalid length %d", length)
	}

	// The context and the connection token stay alive until the caller
	// closes the returned reader; the wrapReader's close callback releases
	// both. They are acquired only after validation, so the early returns
	// above leak neither.
	ctx, cancel := context.WithCancel(context.TODO())

	be.sem.GetToken()

	name := be.Layout.Filename(h)
	obj := be.bucket.Object(name)

	// offset == 0 && length == 0 means "read the whole file".
	if offset == 0 && length == 0 {
		rd := obj.NewReader(ctx)

		wrapper := &wrapReader{
			ReadCloser: rd,
			f: func() {
				cancel()
				be.sem.ReleaseToken()
			},
		}
		return wrapper, nil
	}

	// pass a negative length to NewRangeReader so that the remainder of the
	// file is read.
	if length == 0 {
		length = -1
	}

	rd := obj.NewRangeReader(ctx, offset, int64(length))

	wrapper := &wrapReader{
		ReadCloser: rd,
		f: func() {
			cancel()
			be.sem.ReleaseToken()
		},
	}
	return wrapper, nil
}
// Save stores data in the backend at the handle.
func (be *b2Backend) Save(h restic.Handle, rd io.Reader) (err error) {
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	if err := h.Valid(); err != nil {
		return err
	}

	be.sem.GetToken()
	defer be.sem.ReleaseToken()

	name := be.Filename(h)
	debug.Log("Save %v, name %v", h, name)
	obj := be.bucket.Object(name)

	// Refuse to overwrite: if the object's attributes can be fetched, an
	// object with this name already exists. NOTE(review): any Attrs error
	// (not just "not found") is treated as "does not exist" and the write
	// proceeds — confirm this is intended.
	_, err = obj.Attrs(ctx)
	if err == nil {
		debug.Log(" %v already exists", h)
		return errors.New("key already exists")
	}

	w := obj.NewWriter(ctx)
	n, err := io.Copy(w, rd)
	debug.Log(" saved %d bytes, err %v", n, err)

	// Close the writer in every case; when the copy failed, its error takes
	// precedence over any close error.
	if err != nil {
		_ = w.Close()
		return errors.Wrap(err, "Copy")
	}

	return errors.Wrap(w.Close(), "Close")
}
// Stat returns information about a blob.
func (be *b2Backend) Stat(h restic.Handle) (bi restic.FileInfo, err error) {
	debug.Log("Stat %v", h)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	be.sem.GetToken()
	defer be.sem.ReleaseToken()

	obj := be.bucket.Object(be.Filename(h))
	attrs, err := obj.Attrs(ctx)
	if err != nil {
		debug.Log("Attrs() err %v", err)
		return restic.FileInfo{}, errors.Wrap(err, "Stat")
	}
	return restic.FileInfo{Size: attrs.Size}, nil
}
// Test returns true if a blob of the given type and name exists in the backend.
func (be *b2Backend) Test(h restic.Handle) (bool, error) {
	debug.Log("Test %v", h)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	be.sem.GetToken()
	defer be.sem.ReleaseToken()

	obj := be.bucket.Object(be.Filename(h))
	info, err := obj.Attrs(ctx)

	// Only a fully uploaded object counts as present; lookup errors are
	// reported as "not found" rather than returned.
	exists := err == nil && info != nil && info.Status == b2.Uploaded
	return exists, nil
}
// Remove removes the blob with the given name and type.
func (be *b2Backend) Remove(h restic.Handle) error {
	debug.Log("Remove %v", h)

	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	be.sem.GetToken()
	defer be.sem.ReleaseToken()

	err := be.bucket.Object(be.Filename(h)).Delete(ctx)
	return errors.Wrap(err, "Delete")
}
// List returns a channel that yields all names of blobs of type t. A
// goroutine is started for this. If the channel done is closed, sending
// stops.
func (be *b2Backend) List(t restic.FileType, done <-chan struct{}) <-chan string {
	debug.Log("List %v", t)
	ch := make(chan string)

	// Context and connection token live for the lifetime of the listing
	// goroutine; its deferred calls release both and close the channel.
	ctx, cancel := context.WithCancel(context.TODO())

	be.sem.GetToken()

	go func() {
		defer close(ch)
		defer cancel()
		defer be.sem.ReleaseToken()

		prefix := be.Dirname(restic.Handle{Type: t})
		cur := &b2.Cursor{Prefix: prefix}

		for {
			// Page through the bucket 1000 objects at a time.
			// NOTE(review): a listing error silently ends the iteration —
			// the channel interface offers no way to report it to callers.
			objs, c, err := be.bucket.ListCurrentObjects(ctx, 1000, cur)
			if err != nil && err != io.EOF {
				return
			}
			for _, obj := range objs {
				// Skip objects returned that do not have the specified prefix.
				if !strings.HasPrefix(obj.Name(), prefix) {
					continue
				}

				m := path.Base(obj.Name())
				if m == "" {
					continue
				}

				select {
				case ch <- m:
				case <-done:
					return
				}
			}
			// io.EOF marks the final page of results.
			if err == io.EOF {
				return
			}
			cur = c
		}
	}()

	return ch
}
// removeKeys deletes every blob of the given type, stopping at the first
// failed deletion.
func (be *b2Backend) removeKeys(t restic.FileType) error {
	debug.Log("removeKeys %v", t)

	done := make(chan struct{})
	defer close(done)

	for name := range be.List(t, done) {
		if err := be.Remove(restic.Handle{Type: t, Name: name}); err != nil {
			return err
		}
	}
	return nil
}
// Delete removes all restic keys in the bucket. It will not remove the bucket itself.
func (be *b2Backend) Delete() error {
	alltypes := []restic.FileType{
		restic.DataFile,
		restic.KeyFile,
		restic.LockFile,
		restic.SnapshotFile,
		restic.IndexFile}

	for _, t := range alltypes {
		err := be.removeKeys(t)
		if err != nil {
			// Bug fix: this previously returned nil, silently swallowing
			// removal failures; abort and report the first error instead.
			return err
		}
	}

	// The config file goes last; a missing config is not an error.
	err := be.Remove(restic.Handle{Type: restic.ConfigFile})
	if err != nil && b2.IsNotExist(errors.Cause(err)) {
		err = nil
	}
	return err
}
// Close does nothing: the backend holds no persistent connections, each
// request manages its own context.
func (be *b2Backend) Close() error { return nil }

View File

@ -0,0 +1,93 @@
package b2_test
import (
"fmt"
"os"
"testing"
"time"
"restic"
"restic/backend/b2"
"restic/backend/test"
. "restic/test"
)
// newB2TestSuite returns the generic backend test suite configured to run
// against a real B2 bucket described by the RESTIC_TEST_B2_* environment
// variables.
func newB2TestSuite(t testing.TB) *test.Suite {
	return &test.Suite{
		// do not use excessive data
		MinimalData: true,

		// NewConfig returns a config for a new temporary backend that will be used in tests.
		NewConfig: func() (interface{}, error) {
			b2cfg, err := b2.ParseConfig(os.Getenv("RESTIC_TEST_B2_REPOSITORY"))
			if err != nil {
				return nil, err
			}

			cfg := b2cfg.(b2.Config)
			cfg.AccountID = os.Getenv("RESTIC_TEST_B2_ACCOUNT_ID")
			cfg.Key = os.Getenv("RESTIC_TEST_B2_ACCOUNT_KEY")
			// a unique prefix per run keeps repeated or concurrent test
			// runs from clobbering each other's objects
			cfg.Prefix = fmt.Sprintf("test-%d", time.Now().UnixNano())
			return cfg, nil
		},

		// CreateFn is a function that creates a temporary repository for the tests.
		Create: func(config interface{}) (restic.Backend, error) {
			cfg := config.(b2.Config)
			return b2.Create(cfg)
		},

		// OpenFn is a function that opens a previously created temporary repository.
		Open: func(config interface{}) (restic.Backend, error) {
			cfg := config.(b2.Config)
			return b2.Open(cfg)
		},

		// CleanupFn removes data created during the tests.
		Cleanup: func(config interface{}) error {
			cfg := config.(b2.Config)
			be, err := b2.Open(cfg)
			if err != nil {
				return err
			}
			if err := be.(restic.Deleter).Delete(); err != nil {
				return err
			}
			return nil
		},
	}
}
// testVars skips the calling test unless all B2 test credentials are set in
// the environment.
func testVars(t testing.TB) {
	for _, v := range []string{
		"RESTIC_TEST_B2_ACCOUNT_ID",
		"RESTIC_TEST_B2_ACCOUNT_KEY",
		"RESTIC_TEST_B2_REPOSITORY",
	} {
		if os.Getenv(v) == "" {
			t.Skipf("environment variable %v not set", v)
			return
		}
	}
}
// TestBackendB2 runs the generic backend test suite against B2. The deferred
// check turns a skip into a hard failure when the disallow-skip mechanism
// demands that this test must run.
func TestBackendB2(t *testing.T) {
	defer func() {
		if t.Skipped() {
			SkipDisallowed(t, "restic/backend/b2.TestBackendB2")
		}
	}()

	testVars(t)
	newB2TestSuite(t).RunTests(t)
}
// BenchmarkBackendb2 runs the generic backend benchmarks against B2.
// NOTE(review): conventional Go naming would be BenchmarkBackendB2 (capital
// B2); renaming would change the -bench pattern it matches, so left as-is.
func BenchmarkBackendb2(t *testing.B) {
	testVars(t)
	newB2TestSuite(t).RunBenchmarks(t)
}

View File

@ -0,0 +1,93 @@
package b2
import (
"path"
"regexp"
"strings"
"restic/errors"
"restic/options"
)
// Config contains all configuration necessary to connect to a B2 compatible
// server.
type Config struct {
	AccountID string // B2 account ID
	Key       string // secret key for the account
	Bucket    string // bucket name, validated by checkBucketName
	Prefix    string // object-name prefix within the bucket; may be empty

	Connections int `option:"connections" help:"set a limit for the number of concurrent connections (default: 5)"`
}
// NewConfig returns a new config with default options applied.
func NewConfig() Config {
	cfg := Config{}
	cfg.Connections = 5
	return cfg
}
// register the b2 backend's options under the "b2" namespace so that
// "-o b2.connections=..." is recognized.
func init() {
	options.Register("b2", Config{})
}
var bucketName = regexp.MustCompile("^[a-zA-Z0-9-]+$")
// checkBucketName tests the bucket name against the rules at
// https://help.backblaze.com/hc/en-us/articles/217666908-What-you-need-to-know-about-B2-Bucket-names
func checkBucketName(name string) error {
if name == "" {
return errors.New("bucket name is empty")
}
if len(name) < 6 {
return errors.New("bucket name is too short")
}
if len(name) > 50 {
return errors.New("bucket name is too long")
}
if !bucketName.MatchString(name) {
return errors.New("bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)")
}
return nil
}
// ParseConfig parses the string s and extracts the b2 config. The supported
// configuration format is b2:bucketname:prefix. The prefix is optional; if
// it is absent or empty, Config.Prefix is left empty and objects are stored
// at the bucket root.
func ParseConfig(s string) (interface{}, error) {
	if !strings.HasPrefix(s, "b2:") {
		return nil, errors.New("invalid format, want: b2:bucket-name[:path]")
	}

	// strip the "b2:" scheme, then split off an optional ":prefix" part
	s = s[3:]
	data := strings.SplitN(s, ":", 2)
	if len(data) == 0 || len(data[0]) == 0 {
		return nil, errors.New("bucket name not found")
	}

	cfg := NewConfig()
	cfg.Bucket = data[0]

	if err := checkBucketName(cfg.Bucket); err != nil {
		return nil, err
	}

	if len(data) == 2 {
		p := data[1]
		if len(p) > 0 {
			p = path.Clean(p)
		}

		// normalize: the prefix is always relative to the bucket root, so
		// a leading slash is dropped (and a bare "/" becomes empty)
		if len(p) > 0 && path.IsAbs(p) {
			p = p[1:]
		}

		cfg.Prefix = p
	}

	return cfg, nil
}

View File

@ -0,0 +1,92 @@
package b2
import "testing"
// configTests pairs valid repository location strings with the Config that
// ParseConfig is expected to produce for them.
var configTests = []struct {
	s   string
	cfg Config
}{
	{"b2:bucketname", Config{
		Bucket:      "bucketname",
		Prefix:      "",
		Connections: 5,
	}},
	// a trailing ":" with no prefix is accepted
	{"b2:bucketname:", Config{
		Bucket:      "bucketname",
		Prefix:      "",
		Connections: 5,
	}},
	// a leading slash in the prefix is stripped
	{"b2:bucketname:/prefix/directory", Config{
		Bucket:      "bucketname",
		Prefix:      "prefix/directory",
		Connections: 5,
	}},
	{"b2:foobar", Config{
		Bucket:      "foobar",
		Prefix:      "",
		Connections: 5,
	}},
	{"b2:foobar:", Config{
		Bucket:      "foobar",
		Prefix:      "",
		Connections: 5,
	}},
	// a bare "/" prefix cleans down to the empty prefix
	{"b2:foobar:/", Config{
		Bucket:      "foobar",
		Prefix:      "",
		Connections: 5,
	}},
}
// TestParseConfig checks that every valid location string in configTests
// parses into the expected Config.
func TestParseConfig(t *testing.T) {
	for _, tc := range configTests {
		t.Run("", func(t *testing.T) {
			cfg, err := ParseConfig(tc.s)
			if err != nil {
				t.Fatalf("%s failed: %v", tc.s, err)
			}

			if cfg != tc.cfg {
				t.Fatalf("input: %s\n wrong config, want:\n %#v\ngot:\n %#v",
					tc.s, tc.cfg, cfg)
			}
		})
	}
}
// invalidConfigTests pairs malformed location strings with the exact error
// message ParseConfig must return for them.
var invalidConfigTests = []struct {
	s   string
	err string
}{
	{
		"b2",
		"invalid format, want: b2:bucket-name[:path]",
	},
	{
		"b2:",
		"bucket name not found",
	},
	{
		"b2:bucket_name",
		"bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)",
	},
	{
		// the prefix separator is ':', so the '/' here makes the whole
		// remainder (slashes included) the bucket name, which is invalid
		"b2:bucketname/prefix/directory/",
		"bucket name contains invalid characters, allowed are: a-z, 0-9, dash (-)",
	},
}
// TestInvalidConfig checks that every malformed location string in
// invalidConfigTests is rejected with the expected error message.
func TestInvalidConfig(t *testing.T) {
	for _, tc := range invalidConfigTests {
		t.Run("", func(t *testing.T) {
			cfg, err := ParseConfig(tc.s)
			if err == nil {
				t.Fatalf("expected error not found for invalid config: %v, cfg is:\n%#v", tc.s, cfg)
			}

			if err.Error() != tc.err {
				t.Fatalf("unexpected error found, want:\n %v\ngot:\n %v", tc.err, err.Error())
			}
		})
	}
}

View File

@ -4,6 +4,7 @@ package location
import ( import (
"strings" "strings"
"restic/backend/b2"
"restic/backend/local" "restic/backend/local"
"restic/backend/rest" "restic/backend/rest"
"restic/backend/s3" "restic/backend/s3"
@ -26,6 +27,7 @@ type parser struct {
// parsers is a list of valid config parsers for the backends. The first parser // parsers is a list of valid config parsers for the backends. The first parser
// is the fallback and should always be set to the local backend. // is the fallback and should always be set to the local backend.
var parsers = []parser{ var parsers = []parser{
{"b2", b2.ParseConfig},
{"local", local.ParseConfig}, {"local", local.ParseConfig},
{"sftp", sftp.ParseConfig}, {"sftp", sftp.ParseConfig},
{"s3", s3.ParseConfig}, {"s3", s3.ParseConfig},

View File

@ -5,6 +5,7 @@ import (
"reflect" "reflect"
"testing" "testing"
"restic/backend/b2"
"restic/backend/local" "restic/backend/local"
"restic/backend/rest" "restic/backend/rest"
"restic/backend/s3" "restic/backend/s3"
@ -222,6 +223,24 @@ var parseTests = []struct {
}, },
}, },
}, },
{
"b2:bucketname:/prefix", Location{Scheme: "b2",
Config: b2.Config{
Bucket: "bucketname",
Prefix: "prefix",
Connections: 5,
},
},
},
{
"b2:bucketname", Location{Scheme: "b2",
Config: b2.Config{
Bucket: "bucketname",
Prefix: "",
Connections: 5,
},
},
},
} }
func TestParse(t *testing.T) { func TestParse(t *testing.T) {

View File

@ -0,0 +1,23 @@
package backend
// Semaphore bounds concurrent access to a restricted resource by handing
// out a fixed number of tokens.
type Semaphore struct {
	ch chan struct{} // one buffered slot per token
}

// NewSemaphore returns a semaphore that allows up to n concurrent holders.
func NewSemaphore(n int) *Semaphore {
	return &Semaphore{
		ch: make(chan struct{}, n),
	}
}

// GetToken acquires a token, blocking while all tokens are in use.
func (s *Semaphore) GetToken() { s.ch <- struct{}{} }

// ReleaseToken hands a previously acquired token back.
func (s *Semaphore) ReleaseToken() { <-s.ch }

6
vendor/manifest vendored
View File

@ -31,6 +31,12 @@
"revision": "2788f0dbd16903de03cb8186e5c7d97b69ad387b", "revision": "2788f0dbd16903de03cb8186e5c7d97b69ad387b",
"branch": "master" "branch": "master"
}, },
{
"importpath": "github.com/kurin/blazer",
"repository": "https://github.com/kurin/blazer",
"revision": "48de0a1e4d21fba201aff7fefdf3e5e7735b1439",
"branch": "master"
},
{ {
"importpath": "github.com/minio/go-homedir", "importpath": "github.com/minio/go-homedir",
"repository": "https://github.com/minio/go-homedir", "repository": "https://github.com/minio/go-homedir",

View File

@ -0,0 +1,27 @@
Want to contribute? Great! First, read this page (including the small print at the end).
### Before you contribute
Before we can use your code, you must sign the
[Google Individual Contributor License Agreement]
(https://cla.developers.google.com/about/google-individual)
(CLA), which you can do online. The CLA is necessary mainly because you own the
copyright to your changes, even after your contribution becomes part of our
codebase, so we need your permission to use and distribute your code. We also
need to be sure of various other things—for instance that you'll tell us if you
know that your code infringes on other people's patents. You don't have to sign
the CLA until after you've submitted your code for review and a member has
approved it, but you must do it before we can put your code into our codebase.
Before you start working on a larger contribution, you should get in touch with
us first through the issue tracker with your idea so that we can help out and
possibly guide you. Coordinating up front makes it much easier to avoid
frustration later on.
### Code reviews
All submissions, including submissions by project members, require review. We
use Github pull requests for this purpose.
### The small print
Contributions made by corporations are covered by a different agreement than
the one above, the
[Software Grant and Corporate Contributor License Agreement]
(https://cla.developers.google.com/about/google-corporate).

View File

@ -0,0 +1,13 @@
Copyright 2016, Google
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@ -0,0 +1,142 @@
Blazer
====
[![GoDoc](https://godoc.org/github.com/kurin/blazer/b2?status.svg)](https://godoc.org/github.com/kurin/blazer/b2)
[![Build Status](https://travis-ci.org/kurin/blazer.svg)](https://travis-ci.org/kurin/blazer)
Blazer is a Golang client library for Backblaze's B2 object storage service.
It is designed for simple integration with existing applications that may
already be using S3 and Google Cloud Storage, by exporting only a few standard
Go types.
It implements and satisfies the [B2 integration
checklist](https://www.backblaze.com/b2/docs/integration_checklist.html),
automatically handling error recovery, reauthentication, and other low-level
aspects, making it suitable to upload very large files, or over multi-day time
scales.
```go
import "github.com/kurin/blazer/b2"
```
## Examples
### Copy a file into B2
```go
func copyFile(ctx context.Context, bucket *b2.Bucket, src, dst string) error {
f, err := file.Open(src)
if err != nil {
return err
}
defer f.Close()
obj := bucket.Object(dst)
w := obj.NewWriter(ctx)
if _, err := io.Copy(w, f); err != nil {
w.Close()
return err
}
return w.Close()
}
```
If the file is less than 100MB, Blazer will simply buffer the file and use the
`b2_upload_file` API to send the file to Backblaze. If the file is greater
than 100MB, Blazer will use B2's large file support to upload the file in 100MB
chunks.
### Copy a file into B2, with multiple concurrent uploads
Uploading a large file with multiple HTTP connections is simple:
```go
func copyFile(ctx context.Context, bucket *b2.Bucket, writers int, src, dst string) error {
f, err := file.Open(src)
if err != nil {
return err
}
defer f.Close()
w := bucket.Object(dst).NewWriter(ctx)
w.ConcurrentUploads = writers
if _, err := io.Copy(w, f); err != nil {
w.Close()
return err
}
return w.Close()
}
```
This will automatically split the file into `writers` chunks of 100MB uploads.
Note that 100MB is the smallest chunk size that B2 supports.
### Download a file from B2
Downloading is as simple as uploading:
```go
func downloadFile(ctx context.Context, bucket *b2.Bucket, downloads int, src, dst string) error {
r, err := bucket.Object(src).NewReader(ctx)
if err != nil {
return err
}
defer r.Close()
f, err := file.Create(dst)
if err != nil {
return err
}
r.ConcurrentDownloads = downloads
if _, err := io.Copy(f, r); err != nil {
f.Close()
return err
}
return f.Close()
}
```
### List all objects in a bucket
```go
func printObjects(ctx context.Context, bucket *b2.Bucket) error {
var cur *b2.Cursor
for {
objs, c, err := bucket.ListObjects(ctx, 1000, cur)
if err != nil && err != io.EOF {
return err
}
for _, obj := range objs {
fmt.Println(obj)
}
if err == io.EOF {
return nil
}
cur = c
}
}
```
### Grant temporary auth to a file
Say you have a number of files in a private bucket, and you want to allow other
people to download some files. This is possible to do by issuing a temporary
authorization token for the prefix of the files you want to share.
```go
token, err := bucket.AuthToken(ctx, "photos", time.Hour)
```
If successful, `token` is then an authorization token valid for one hour, which
can be set in HTTP GET requests.
The hostname to use when downloading files via HTTP is account-specific and can
be found via the BaseURL method:
```go
base := bucket.BaseURL()
```
---
This is not an official Google product.

View File

@ -0,0 +1,583 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package b2 provides a high-level interface to Backblaze's B2 cloud storage
// service.
//
// It is specifically designed to abstract away the Backblaze API details by
// providing familiar Go interfaces, specifically an io.Writer for object
// storage, and an io.Reader for object download. Handling of transient
// errors, including network and authentication timeouts, is transparent.
//
// Methods that perform network requests accept a context.Context argument.
// Callers should use the context's cancellation abilities to end requests
// early, or to provide timeout or deadline guarantees.
//
// This package is in development and may make API changes.
package b2
import (
"bytes"
"fmt"
"io"
"net/http"
"regexp"
"strconv"
"sync"
"time"
"golang.org/x/net/context"
)
// Client is a Backblaze B2 client.
type Client struct {
	backend beRootInterface

	// per-name writer/reader registries; slock presumably guards them —
	// the accesses are not visible in this file section, confirm.
	slock    sync.Mutex
	sWriters map[string]*Writer
	sReaders map[string]*Reader
}

// NewClient creates and returns a new Client with valid B2 service account
// tokens.
func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (*Client, error) {
	c := &Client{
		backend: &beRoot{
			b2i: &b2Root{},
		},
	}
	// Authorize eagerly so a bad account/key pair fails here rather than on
	// the first request.
	if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
		return nil, err
	}
	return c, nil
}
type clientOptions struct {
transport http.RoundTripper
}
// A ClientOption allows callers to adjust various per-client settings.
type ClientOption func(*clientOptions)
// Transport sets the underlying HTTP transport mechanism. If unset,
// http.DefaultTransport is used.
func Transport(rt http.RoundTripper) ClientOption {
return func(c *clientOptions) {
c.transport = rt
}
}
// Bucket is a reference to a B2 bucket.
type Bucket struct {
	b beBucketInterface // backend handle for this bucket
	r beRootInterface   // root backend the bucket was obtained from
	c *Client           // owning client; used by Attrs to re-resolve the bucket
}
// BucketType describes a bucket's visibility to unauthenticated users.
type BucketType string

const (
	// UnknownType is the zero value; during a bucket.Update it means
	// "leave the bucket type unchanged".
	UnknownType BucketType = ""
	// Private and Public carry an explicit BucketType so they type-check as
	// BucketType (previously only the first constant in the group was typed,
	// leaving these as untyped string constants).
	Private BucketType = "allPrivate"
	Public  BucketType = "allPublic"
)
// BucketAttrs holds a bucket's metadata attributes.
type BucketAttrs struct {
	// Type lists or sets the new bucket type. If Type is UnknownType during a
	// bucket.Update, the type is not changed.
	Type BucketType

	// Info records user data, limited to ten keys. If nil during a
	// bucket.Update, the existing bucket info is not modified. A bucket's
	// metadata can be removed by updating with an empty map.
	Info map[string]string

	// LifecycleRules reports or sets bucket lifecycle rules. If nil during a
	// bucket.Update, the rules are not modified. A bucket's rules can be
	// removed by updating with an empty slice.
	LifecycleRules []LifecycleRule
}
// A LifecycleRule describes an object's life cycle, namely how many days after
// uploading an object should be hidden, and after how many days hidden an
// object should be deleted. Multiple rules may not apply to the same file or
// set of files. Be careful when using this feature; it can (is designed to)
// delete your data.
type LifecycleRule struct {
	// Prefix specifies all the files in the bucket to which this rule applies.
	Prefix string

	// DaysNewUntilHidden specifies the number of days after which a file
	// will automatically be hidden. 0 means "do not automatically hide new
	// files".
	DaysNewUntilHidden int

	// DaysHiddenUntilDeleted specifies the number of days after which a hidden
	// file is deleted. 0 means "do not automatically delete hidden files".
	DaysHiddenUntilDeleted int
}
// b2err decorates an error with classification flags that the package's
// error predicates inspect.
type b2err struct {
	err              error
	notFoundErr      bool
	isUpdateConflict bool
}

// Error reports the message of the wrapped error.
func (e b2err) Error() string { return e.err.Error() }

// IsNotExist reports whether a given error indicates that an object or bucket
// does not exist.
func IsNotExist(err error) bool {
	if berr, ok := err.(b2err); ok {
		return berr.notFoundErr
	}
	return false
}
// Bucket returns a bucket if it exists.
func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
	// scan the full bucket listing for a name match
	buckets, err := c.backend.listBuckets(ctx)
	if err != nil {
		return nil, err
	}
	for _, bucket := range buckets {
		if bucket.name() == name {
			return &Bucket{
				b: bucket,
				r: c.backend,
				c: c,
			}, nil
		}
	}
	// a missing bucket is reported as a not-found error, detectable with
	// IsNotExist
	return nil, b2err{
		err:         fmt.Errorf("%s: bucket not found", name),
		notFoundErr: true,
	}
}
// NewBucket returns a bucket. The bucket is created with the given attributes
// if it does not already exist. If attrs is nil, it is created as a private
// bucket with no info metadata and no lifecycle rules.
func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs) (*Bucket, error) {
	existing, err := c.backend.listBuckets(ctx)
	if err != nil {
		return nil, err
	}

	// An existing bucket with this name is returned as-is; attrs are not
	// applied to it.
	for _, bucket := range existing {
		if bucket.name() != name {
			continue
		}
		return &Bucket{b: bucket, r: c.backend, c: c}, nil
	}

	if attrs == nil {
		attrs = &BucketAttrs{Type: Private}
	}

	b, err := c.backend.createBucket(ctx, name, string(attrs.Type), attrs.Info, attrs.LifecycleRules)
	if err != nil {
		return nil, err
	}
	return &Bucket{b: b, r: c.backend, c: c}, nil
}
// ListBuckets returns all the available buckets.
func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
	bs, err := c.backend.listBuckets(ctx)
	if err != nil {
		return nil, err
	}
	// wrap each backend bucket handle in a public Bucket value
	var buckets []*Bucket
	for _, b := range bs {
		buckets = append(buckets, &Bucket{
			b: b,
			r: c.backend,
			c: c,
		})
	}
	return buckets, nil
}
// IsUpdateConflict reports whether a given error is the result of a bucket
// update conflict.
func IsUpdateConflict(err error) bool {
	if e, ok := err.(b2err); ok {
		return e.isUpdateConflict
	}
	return false
}
// Update modifies the given bucket with new attributes. It is possible that
// this method could fail with an update conflict, in which case you should
// retrieve the latest bucket attributes with Attrs and try again.
func (b *Bucket) Update(ctx context.Context, attrs *BucketAttrs) error {
	return b.b.updateBucket(ctx, attrs)
}

// Attrs retrieves and returns the current bucket's attributes.
func (b *Bucket) Attrs(ctx context.Context) (*BucketAttrs, error) {
	// re-fetch the bucket by name so the freshest revision is read; the
	// fresh handle replaces the cached one
	bucket, err := b.c.Bucket(ctx, b.Name())
	if err != nil {
		return nil, err
	}
	b.b = bucket.b
	return b.b.attrs(), nil
}
// bNotExist recognizes the error text B2 currently returns for a missing
// bucket.
var bNotExist = regexp.MustCompile("Bucket.*does not exist")

// Delete removes a bucket. The bucket must be empty.
func (b *Bucket) Delete(ctx context.Context) error {
	err := b.b.deleteBucket(ctx)
	if err == nil {
		return nil
	}
	// So, the B2 documentation disagrees with the implementation here, and the
	// error code is not really helpful. If the bucket doesn't exist, the error is
	// 400, not 404, and the string is "Bucket <name> does not exist". However, the
	// documentation says it will be "Bucket id <name> does not exist". In case
	// they update the implementation to match the documentation, we're just going
	// to regexp over the error message and hope it's okay.
	if !bNotExist.MatchString(err.Error()) {
		return err
	}
	return b2err{
		err:         err,
		notFoundErr: true,
	}
}
// BaseURL returns the base URL to use for all files uploaded to this bucket.
func (b *Bucket) BaseURL() string {
	return b.b.baseURL()
}
// Name returns the bucket's name.
func (b *Bucket) Name() string {
	return b.b.name()
}
// Object represents a B2 object.
type Object struct {
	attrs *Attrs
	name string
	// f is the backend file handle; it is populated lazily by ensure.
	f beFileInterface
	b *Bucket
}
// Attrs holds an object's metadata.
type Attrs struct {
	Name string // Not used on upload.
	Size int64 // Not used on upload.
	ContentType string // Used on upload, default is "application/octet-stream".
	Status ObjectState // Not used on upload.
	UploadTimestamp time.Time // Not used on upload.
	SHA1 string // Not used on upload. Can be "none" for large files.
	LastModified time.Time // If present, and there are fewer than 10 keys in the Info field, this is saved on upload.
	Info map[string]string // Save arbitrary metadata on upload, but limited to 10 keys.
}
// Name returns an object's name.
func (o *Object) Name() string {
	return o.name
}
// Attrs returns an object's attributes.
func (o *Object) Attrs(ctx context.Context) (*Attrs, error) {
	if err := o.ensure(ctx); err != nil {
		return nil, err
	}
	fileInfo, err := o.f.getFileInfo(ctx)
	if err != nil {
		return nil, err
	}
	name, sha, size, contentType, meta, action, stamp := fileInfo.stats()
	// Map the raw action string onto an ObjectState; anything
	// unrecognized stays Unknown.
	state := Unknown
	switch action {
	case "upload":
		state = Uploaded
	case "start":
		state = Started
	case "hide":
		state = Hider
	case "folder":
		state = Folder
	}
	// A saved modification time travels as milliseconds since the epoch
	// under a well-known metadata key; lift it out into a time.Time and
	// drop the raw entry from the returned Info map.
	var lastModified time.Time
	if v, ok := meta["src_last_modified_millis"]; ok {
		ms, err := strconv.ParseInt(v, 10, 64)
		if err != nil {
			return nil, err
		}
		lastModified = time.Unix(ms/1e3, (ms%1e3)*1e6)
		delete(meta, "src_last_modified_millis")
	}
	return &Attrs{
		Name:            name,
		Size:            size,
		ContentType:     contentType,
		UploadTimestamp: stamp,
		SHA1:            sha,
		Info:            meta,
		Status:          state,
		LastModified:    lastModified,
	}, nil
}
// ObjectState represents the various states an object can be in.
type ObjectState int
const (
	// Unknown is the zero value, reported when the state string returned
	// by the service is not recognized.
	Unknown ObjectState = iota
	// Started represents a large upload that has been started but not finished
	// or canceled.
	Started
	// Uploaded represents an object that has finished uploading and is complete.
	Uploaded
	// Hider represents an object that exists only to hide another object. It
	// cannot in itself be downloaded and, in particular, is not a hidden object.
	Hider
	// Folder is a special state given to non-objects that are returned during a
	// List*Objects call with a non-empty Delimiter.
	Folder
)
// Object returns a reference to the named object in the bucket. Hidden
// objects cannot be referenced in this manner; they can only be found by
// finding the appropriate reference in ListObjects.
func (b *Bucket) Object(name string) *Object {
	// The backend handle (f) is left nil; it is resolved on first use.
	return &Object{
		name: name,
		b: b,
	}
}
// URL returns the full URL to the given object.
func (o *Object) URL() string {
	return fmt.Sprintf("%s/file/%s/%s", o.b.BaseURL(), o.b.Name(), o.name)
}
// NewWriter returns a new writer for the given object. Objects that are
// overwritten are not deleted, but are "hidden".
//
// Callers must close the writer when finished and check the error status.
func (o *Object) NewWriter(ctx context.Context) *Writer {
	// The writer owns a derived context so it can abort in-flight work.
	ctx, cancel := context.WithCancel(ctx)
	return &Writer{
		o: o,
		name: o.name,
		ctx: ctx,
		cancel: cancel,
	}
}
// NewRangeReader returns a reader for the given object, reading up to length
// bytes. If length is negative, the rest of the object is read.
func (o *Object) NewRangeReader(ctx context.Context, offset, length int64) *Reader {
	ctx, cancel := context.WithCancel(ctx)
	return &Reader{
		ctx: ctx,
		cancel: cancel,
		o: o,
		name: o.name,
		// chunks buffers out-of-order downloaded pieces by index.
		chunks: make(map[int]*bytes.Buffer),
		length: length,
		offset: offset,
	}
}
// NewReader returns a reader for the given object.
func (o *Object) NewReader(ctx context.Context) *Reader {
	return o.NewRangeReader(ctx, 0, -1)
}
// ensure lazily resolves the object's backend file handle, looking the
// object up by name on first use.
func (o *Object) ensure(ctx context.Context) error {
	if o.f == nil {
		f, err := o.b.getObject(ctx, o.name)
		if err != nil {
			return err
		}
		o.f = f.f
	}
	return nil
}
// Delete removes the given object.
func (o *Object) Delete(ctx context.Context) error {
	if err := o.ensure(ctx); err != nil {
		return err
	}
	return o.f.deleteFileVersion(ctx)
}
// Cursor is passed to ListObjects to return subsequent pages.
type Cursor struct {
	// Prefix limits the listed objects to those that begin with this string.
	Prefix string
	// Delimiter denotes the path separator. If set, object listings will be
	// truncated at this character.
	//
	// For example, if the bucket contains objects foo/bar, foo/baz, and foo,
	// then a delimiter of "/" will cause the listing to return "foo" and "foo/".
	// Otherwise, the listing would have returned all object names.
	//
	// Note that objects returned that end in the delimiter may not be actual
	// objects, e.g. you cannot read from (or write to, or delete) an object "foo/",
	// both because no actual object exists and because B2 disallows object names
	// that end with "/". If you want to ensure that all objects returned by
	// ListObjects and ListCurrentObjects are actual objects, leave this unset.
	Delimiter string
	// name and id hold the continuation point returned by the service;
	// they are round-tripped on the next listing call.
	name string
	id string
}
// ListObjects returns all objects in the bucket, including multiple versions
// of the same object. Cursor may be nil; when passed to a subsequent query,
// it will continue the listing.
//
// ListObjects will return io.EOF when there are no objects left in the bucket,
// however it may do so concurrently with the last objects.
func (b *Bucket) ListObjects(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
	if c == nil {
		c = &Cursor{}
	}
	fs, name, id, err := b.b.listFileVersions(ctx, count, c.name, c.id, c.Prefix, c.Delimiter)
	if err != nil {
		return nil, nil, err
	}
	// A continuation cursor exists only if the service returned both a
	// next name and a next id.
	var next *Cursor
	if name != "" && id != "" {
		next = &Cursor{
			Prefix: c.Prefix,
			Delimiter: c.Delimiter,
			name: name,
			id: id,
		}
	}
	var objects []*Object
	for _, f := range fs {
		objects = append(objects, &Object{
			name: f.name(),
			f: f,
			b: b,
		})
	}
	// io.EOF may accompany the final page of results.
	var rtnErr error
	if len(objects) == 0 || next == nil {
		rtnErr = io.EOF
	}
	return objects, next, rtnErr
}
// ListCurrentObjects is similar to ListObjects, except that it returns only
// current, unhidden objects in the bucket.
func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) {
	if c == nil {
		c = &Cursor{}
	}
	fs, name, err := b.b.listFileNames(ctx, count, c.name, c.Prefix, c.Delimiter)
	if err != nil {
		return nil, nil, err
	}
	// Name-based listings continue on name alone; no id is involved.
	var next *Cursor
	if name != "" {
		next = &Cursor{
			Prefix: c.Prefix,
			Delimiter: c.Delimiter,
			name: name,
		}
	}
	var objects []*Object
	for _, f := range fs {
		objects = append(objects, &Object{
			name: f.name(),
			f: f,
			b: b,
		})
	}
	// io.EOF may accompany the final page of results.
	var rtnErr error
	if len(objects) == 0 || next == nil {
		rtnErr = io.EOF
	}
	return objects, next, rtnErr
}
// Hide hides the object from name-based listing.
func (o *Object) Hide(ctx context.Context) error {
	if err := o.ensure(ctx); err != nil {
		return err
	}
	_, err := o.b.b.hideFile(ctx, o.name)
	return err
}
// Reveal unhides (if hidden) the named object. If there are multiple objects
// of a given name, it will reveal the most recent.
func (b *Bucket) Reveal(ctx context.Context, name string) error {
	// Start a version listing at the exact name so the first result is
	// the most recent version of that object, if it exists.
	cur := &Cursor{
		name: name,
	}
	objs, _, err := b.ListObjects(ctx, 1, cur)
	// io.EOF is expected when the listing ends at this object.
	if err != nil && err != io.EOF {
		return err
	}
	if len(objs) < 1 || objs[0].name != name {
		return b2err{err: fmt.Errorf("%s: not found", name), notFoundErr: true}
	}
	obj := objs[0]
	if obj.f.status() != "hide" {
		return nil
	}
	// Deleting the hide marker reveals the previous version.
	return obj.Delete(ctx)
}
// getObject resolves an object name to its backend file handle, returning a
// not-found error when the bucket has no current object of exactly that name.
func (b *Bucket) getObject(ctx context.Context, name string) (*Object, error) {
	// Ask for a single name starting at the target; a miss returns either
	// nothing or the next name in sort order.
	fs, _, err := b.b.listFileNames(ctx, 1, name, "", "")
	if err != nil {
		return nil, err
	}
	if len(fs) < 1 || fs[0].name() != name {
		return nil, b2err{err: fmt.Errorf("%s: not found", name), notFoundErr: true}
	}
	return &Object{
		name: name,
		f:    fs[0],
		b:    b,
	}, nil
}
// AuthToken returns an authorization token that can be used to access objects
// in a private bucket. Only objects that begin with prefix can be accessed.
// The token expires after the given duration.
func (b *Bucket) AuthToken(ctx context.Context, prefix string, valid time.Duration) (string, error) {
	return b.b.getDownloadAuthorization(ctx, prefix, valid)
}

View File

@ -0,0 +1,668 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"bytes"
"crypto/sha1"
"fmt"
"io"
"io/ioutil"
"net/http"
"sort"
"strings"
"sync"
"testing"
"time"
"golang.org/x/net/context"
)
// Names used by the integration-style tests below.
const (
	bucketName = "b2-tests"
	smallFileName = "TeenyTiny"
	largeFileName = "BigBytes"
)
// testError is a synthetic error whose fields drive the retry, backoff,
// reauth, and reupload handling paths in the backend wrappers.
type testError struct {
	retry bool
	backoff time.Duration
	reauth bool
	reupload bool
}
func (t testError) Error() string {
	return fmt.Sprintf("retry %v; backoff %v; reauth %v; reupload %v", t.retry, t.backoff, t.reauth, t.reupload)
}
// errCont hands out scripted errors: errMap[op][i] is the error returned by
// the i-th call to getError(op). Missing entries mean "no error".
type errCont struct {
	errMap map[string]map[int]error
	opMap  map[string]int
}

// getError returns the next scripted error for the named operation and
// advances that operation's call counter.
func (e *errCont) getError(name string) error {
	// No script at all: never error, and avoid allocating counters.
	if e.errMap == nil {
		return nil
	}
	if e.opMap == nil {
		e.opMap = make(map[string]int)
	}
	n := e.opMap[name]
	e.opMap[name] = n + 1
	// A nil map entry or missing index yields nil, i.e. success.
	return e.errMap[name][n]
}
// testRoot is an in-memory fake of the b2RootInterface: buckets are maps of
// file name to contents, and failures are injected via errs.
type testRoot struct {
	errs *errCont
	// auths counts authorizeAccount calls so tests can assert on reauth.
	auths int
	bucketMap map[string]map[string]string
}
func (t *testRoot) authorizeAccount(context.Context, string, string, ...ClientOption) error {
	t.auths++
	return nil
}
// backoff reports the scripted backoff for testError values; foreign errors
// get no backoff.
func (t *testRoot) backoff(err error) time.Duration {
	e, ok := err.(testError)
	if !ok {
		return 0
	}
	return e.backoff
}
func (t *testRoot) reauth(err error) bool {
	e, ok := err.(testError)
	if !ok {
		return false
	}
	return e.reauth
}
func (t *testRoot) reupload(err error) bool {
	e, ok := err.(testError)
	if !ok {
		return false
	}
	return e.reupload
}
// transient treats any retry, reupload, or backoff-carrying testError as
// retryable.
func (t *testRoot) transient(err error) bool {
	e, ok := err.(testError)
	if !ok {
		return false
	}
	return e.retry || e.reupload || e.backoff > 0
}
func (t *testRoot) createBucket(_ context.Context, name, _ string, _ map[string]string, _ []LifecycleRule) (b2BucketInterface, error) {
	if err := t.errs.getError("createBucket"); err != nil {
		return nil, err
	}
	if _, ok := t.bucketMap[name]; ok {
		return nil, fmt.Errorf("%s: bucket exists", name)
	}
	// The bucket shares its file map with the root so mutations are
	// visible through later listBuckets calls.
	m := make(map[string]string)
	t.bucketMap[name] = m
	return &testBucket{
		n: name,
		errs: t.errs,
		files: m,
	}, nil
}
func (t *testRoot) listBuckets(context.Context) ([]b2BucketInterface, error) {
	var b []b2BucketInterface
	for k, v := range t.bucketMap {
		b = append(b, &testBucket{
			n: k,
			errs: t.errs,
			files: v,
		})
	}
	return b, nil
}
// testBucket is an in-memory fake bucket: files maps object name to contents.
type testBucket struct {
	n string
	errs *errCont
	files map[string]string
}
func (t *testBucket) name() string { return t.n }
func (t *testBucket) btype() string { return "allPrivate" }
func (t *testBucket) attrs() *BucketAttrs { return nil }
func (t *testBucket) deleteBucket(context.Context) error { return nil }
func (t *testBucket) updateBucket(context.Context, *BucketAttrs) error { return nil }
func (t *testBucket) getUploadURL(context.Context) (b2URLInterface, error) {
	if err := t.errs.getError("getUploadURL"); err != nil {
		return nil, err
	}
	return &testURL{
		files: t.files,
	}, nil
}
func (t *testBucket) startLargeFile(_ context.Context, name, _ string, _ map[string]string) (b2LargeFileInterface, error) {
	return &testLargeFile{
		name: name,
		parts: make(map[int][]byte),
		files: t.files,
		errs: t.errs,
	}, nil
}
// listFileNames pages through the sorted file names, starting at the
// continuation name cont and returning the next continuation name (empty
// when the listing is exhausted).
func (t *testBucket) listFileNames(ctx context.Context, count int, cont, pfx, del string) ([]b2FileInterface, string, error) {
	// NOTE(review): pfx and del are accepted but ignored by this fake.
	var f []string
	for name := range t.files {
		f = append(f, name)
	}
	sort.Strings(f)
	idx := sort.SearchStrings(f, cont)
	var b []b2FileInterface
	var next string
	for i := idx; i < len(f) && i-idx < count; i++ {
		b = append(b, &testFile{
			n: f[i],
			s: int64(len(t.files[f[i]])),
			files: t.files,
		})
		if i+1 < len(f) {
			next = f[i+1]
		}
		if i+1 == len(f) {
			next = ""
		}
	}
	return b, next, nil
}
// listFileVersions delegates to listFileNames; the id continuation (b) is
// unused because this fake keeps only one version per name.
func (t *testBucket) listFileVersions(ctx context.Context, count int, a, b, c, d string) ([]b2FileInterface, string, string, error) {
	x, y, z := t.listFileNames(ctx, count, a, c, d)
	return x, y, "", z
}
// downloadFileByName serves a byte range of the stored contents. It assumes
// offset+size stays within the stored string; an out-of-range request would
// panic on the slice expression.
func (t *testBucket) downloadFileByName(_ context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
	return &testFileReader{
		b: ioutil.NopCloser(bytes.NewBufferString(t.files[name][offset : offset+size])),
	}, nil
}
func (t *testBucket) hideFile(context.Context, string) (b2FileInterface, error) { return nil, nil }
func (t *testBucket) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) {
	return "", nil
}
func (t *testBucket) baseURL() string { return "" }
// testURL fakes a single-shot upload endpoint writing into the bucket's map.
type testURL struct {
	files map[string]string
}
func (t *testURL) reload(context.Context) error { return nil }
func (t *testURL) uploadFile(_ context.Context, r io.Reader, _ int, name, _, _ string, _ map[string]string) (b2FileInterface, error) {
	buf := &bytes.Buffer{}
	if _, err := io.Copy(buf, r); err != nil {
		return nil, err
	}
	t.files[name] = buf.String()
	return &testFile{
		n: name,
		s: int64(len(t.files[name])),
		files: t.files,
	}, nil
}
// testLargeFile collects uploaded parts (1-indexed) under mux and assembles
// them on finish.
type testLargeFile struct {
	name string
	mux sync.Mutex
	parts map[int][]byte
	files map[string]string
	errs *errCont
}
// finishLargeFile concatenates parts 1..len(parts) in order and stores the
// result under the file's name.
// NOTE(review): parts is read here without holding mux — safe only because
// all chunk uploads have completed by the time finish is called; confirm.
func (t *testLargeFile) finishLargeFile(context.Context) (b2FileInterface, error) {
	var total []byte
	for i := 1; i <= len(t.parts); i++ {
		total = append(total, t.parts[i]...)
	}
	t.files[t.name] = string(total)
	return &testFile{
		n: t.name,
		s: int64(len(total)),
		files: t.files,
	}, nil
}
func (t *testLargeFile) getUploadPartURL(context.Context) (b2FileChunkInterface, error) {
	// Chunks share the large file's mutex and parts map.
	return &testFileChunk{
		parts: t.parts,
		mux: &t.mux,
		errs: t.errs,
	}, nil
}
// testFileChunk writes one part of a large file into the shared parts map.
type testFileChunk struct {
	mux *sync.Mutex
	parts map[int][]byte
	errs *errCont
}
func (t *testFileChunk) reload(context.Context) error { return nil }
func (t *testFileChunk) uploadPart(_ context.Context, r io.Reader, _ string, _, index int) (int, error) {
	if err := t.errs.getError("uploadPart"); err != nil {
		return 0, err
	}
	buf := &bytes.Buffer{}
	i, err := io.Copy(buf, r)
	if err != nil {
		return int(i), err
	}
	// Parts may be uploaded concurrently; serialize map writes.
	t.mux.Lock()
	t.parts[index] = buf.Bytes()
	t.mux.Unlock()
	return int(i), nil
}
// testFile is a fake file handle backed by the bucket's name→contents map.
type testFile struct {
	n string
	s int64
	t time.Time
	a string
	files map[string]string
}
func (t *testFile) name() string { return t.n }
func (t *testFile) size() int64 { return t.s }
func (t *testFile) timestamp() time.Time { return t.t }
func (t *testFile) status() string { return t.a }
func (t *testFile) compileParts(int64, map[int]string) b2LargeFileInterface {
	// Resumption of large files is not exercised by these tests.
	panic("not implemented")
}
func (t *testFile) getFileInfo(context.Context) (b2FileInfoInterface, error) {
	return nil, nil
}
func (t *testFile) listParts(context.Context, int, int) ([]b2FilePartInterface, int, error) {
	return nil, 0, nil
}
func (t *testFile) deleteFileVersion(context.Context) error {
	delete(t.files, t.n)
	return nil
}
// testFileReader wraps a ReadCloser; Close is a no-op so double closes are
// harmless.
type testFileReader struct {
	b io.ReadCloser
	s int64
}
func (t *testFileReader) Read(p []byte) (int, error) { return t.b.Read(p) }
func (t *testFileReader) Close() error { return nil }
func (t *testFileReader) stats() (int, string, string, map[string]string) { return 0, "", "", nil }
// zReader emits an endless, deterministic stream of repetitions of pattern,
// used to generate bulk test data without storing it.
type zReader struct{}

var pattern = []byte{0x02, 0x80, 0xff, 0x1a, 0xcc, 0x63, 0x22}

// Read fills all of p with copies of pattern (the final copy may be
// truncated) and never returns an error.
func (zReader) Read(p []byte) (int, error) {
	// copy stops at the end of p, so the partial tail needs no special
	// case. The previous condition (i+len(pattern) < len(p)) skipped the
	// final chunk entirely, leaving stale bytes at the end of p.
	for i := 0; i < len(p); i += len(pattern) {
		copy(p[i:], pattern)
	}
	return len(p), nil
}
// TestReauth verifies that an error flagged reauth triggers exactly one
// re-authorization before the operation is retried.
func TestReauth(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	root := &testRoot{
		bucketMap: make(map[string]map[string]string),
		errs: &errCont{
			errMap: map[string]map[int]error{
				"createBucket": {0: testError{reauth: true}},
			},
		},
	}
	client := &Client{
		backend: &beRoot{
			b2i: root,
		},
	}
	auths := root.auths
	if _, err := client.NewBucket(ctx, "fun", &BucketAttrs{Type: Private}); err != nil {
		t.Errorf("bucket should not err, got %v", err)
	}
	if root.auths != auths+1 {
		t.Errorf("client should have re-authenticated; did not")
	}
}
// TestBackoff verifies that backoff-carrying and plain-retry errors each
// cause the expected number of waits.
// NOTE(review): this test replaces the package-level `after` hook and does
// not restore it; later tests in this file rely on the replacement being
// harmless — confirm.
func TestBackoff(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	var calls []time.Duration
	// A closed channel makes every wait return immediately while still
	// recording the requested duration.
	ch := make(chan time.Time)
	close(ch)
	after = func(d time.Duration) <-chan time.Time {
		calls = append(calls, d)
		return ch
	}
	table := []struct {
		root *testRoot
		want int
	}{
		{
			root: &testRoot{
				bucketMap: make(map[string]map[string]string),
				errs: &errCont{
					errMap: map[string]map[int]error{
						"createBucket": {
							0: testError{backoff: time.Second},
							1: testError{backoff: 2 * time.Second},
						},
					},
				},
			},
			want: 2,
		},
		{
			root: &testRoot{
				bucketMap: make(map[string]map[string]string),
				errs: &errCont{
					errMap: map[string]map[int]error{
						"getUploadURL": {
							0: testError{retry: true},
						},
					},
				},
			},
			want: 1,
		},
	}
	var total int
	for _, ent := range table {
		client := &Client{
			backend: &beRoot{
				b2i: ent.root,
			},
		}
		b, err := client.NewBucket(ctx, "fun", &BucketAttrs{Type: Private})
		if err != nil {
			t.Fatal(err)
		}
		o := b.Object("foo")
		w := o.NewWriter(ctx)
		if _, err := io.Copy(w, bytes.NewBufferString("foo")); err != nil {
			t.Fatal(err)
		}
		if err := w.Close(); err != nil {
			t.Fatal(err)
		}
		total += ent.want
	}
	if len(calls) != total {
		t.Errorf("got %d calls, wanted %d", len(calls), total)
	}
}
// TestBackoffWithoutRetryAfter verifies that retry-flagged errors without an
// explicit backoff still pass through the wait hook once per retry.
func TestBackoffWithoutRetryAfter(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	var calls []time.Duration
	ch := make(chan time.Time)
	close(ch)
	after = func(d time.Duration) <-chan time.Time {
		calls = append(calls, d)
		return ch
	}
	root := &testRoot{
		bucketMap: make(map[string]map[string]string),
		errs: &errCont{
			errMap: map[string]map[int]error{
				"createBucket": {
					0: testError{retry: true},
					1: testError{retry: true},
				},
			},
		},
	}
	client := &Client{
		backend: &beRoot{
			b2i: root,
		},
	}
	if _, err := client.NewBucket(ctx, "fun", &BucketAttrs{Type: Private}); err != nil {
		t.Errorf("bucket should not err, got %v", err)
	}
	if len(calls) != 2 {
		t.Errorf("wrong number of backoff calls; got %d, want 2", len(calls))
	}
}
// badTransport answers every request with a nonsense 700 status so that
// transport injection can be observed end to end.
type badTransport struct{}
func (badTransport) RoundTrip(r *http.Request) (*http.Response, error) {
	return &http.Response{
		Status: "700 What",
		StatusCode: 700,
		Body: ioutil.NopCloser(bytes.NewBufferString("{}")),
		Request: r,
	}, nil
}
// TestCustomTransport verifies that the Transport client option is actually
// used, by checking that the injected 700 status surfaces as an error.
func TestCustomTransport(t *testing.T) {
	ctx := context.Background()
	// Sorta fragile but...
	_, err := NewClient(ctx, "abcd", "efgh", Transport(badTransport{}))
	if err == nil {
		t.Error("NewClient returned successfully, expected an error")
	}
	if !strings.Contains(err.Error(), "700") {
		t.Errorf("Expected nonsense error code 700, got %v", err)
	}
}
// TestReaderDoubleClose verifies that reading a Reader past EOF a second
// time does not fail.
func TestReaderDoubleClose(t *testing.T) {
	ctx := context.Background()
	client := &Client{
		backend: &beRoot{
			b2i: &testRoot{
				bucketMap: make(map[string]map[string]string),
				errs: &errCont{},
			},
		},
	}
	bucket, err := client.NewBucket(ctx, "bucket", &BucketAttrs{Type: Private})
	if err != nil {
		t.Fatal(err)
	}
	o, _, err := writeFile(ctx, bucket, "file", 10, 10)
	if err != nil {
		t.Fatal(err)
	}
	r := o.NewReader(ctx)
	// Read to EOF, and then read some more.
	if _, err := io.Copy(ioutil.Discard, r); err != nil {
		t.Fatal(err)
	}
	if _, err := io.Copy(ioutil.Discard, r); err != nil {
		t.Fatal(err)
	}
}
// TestReadWrite round-trips a small (single-shot) and a large (chunked)
// object through the in-memory fakes, comparing SHA1s on the way in and out.
func TestReadWrite(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	client := &Client{
		backend: &beRoot{
			b2i: &testRoot{
				bucketMap: make(map[string]map[string]string),
				errs: &errCont{},
			},
		},
	}
	bucket, err := client.NewBucket(ctx, bucketName, &BucketAttrs{Type: Private})
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := bucket.Delete(ctx); err != nil {
			t.Error(err)
		}
	}()
	// Chunk size larger than the payload forces the simple upload path.
	sobj, wsha, err := writeFile(ctx, bucket, smallFileName, 1e6+42, 1e8)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := sobj.Delete(ctx); err != nil {
			t.Error(err)
		}
	}()
	if err := readFile(ctx, sobj, wsha, 1e5, 10); err != nil {
		t.Error(err)
	}
	// Chunk size smaller than the payload forces the large-file path.
	lobj, wshaL, err := writeFile(ctx, bucket, largeFileName, 1e6-1e5, 1e4)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := lobj.Delete(ctx); err != nil {
			t.Error(err)
		}
	}()
	if err := readFile(ctx, lobj, wshaL, 1e7, 10); err != nil {
		t.Error(err)
	}
}
// TestWriterReturnsError verifies that persistent (non-retryable) part
// upload failures eventually surface from io.Copy on the writer.
func TestWriterReturnsError(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	client := &Client{
		backend: &beRoot{
			b2i: &testRoot{
				bucketMap: make(map[string]map[string]string),
				errs: &errCont{
					errMap: map[string]map[int]error{
						"uploadPart": {
							0: testError{},
							1: testError{},
							2: testError{},
							3: testError{},
							4: testError{},
							5: testError{},
							6: testError{},
						},
					},
				},
			},
		},
	}
	bucket, err := client.NewBucket(ctx, bucketName, &BucketAttrs{Type: Private})
	if err != nil {
		t.Fatal(err)
	}
	w := bucket.Object("test").NewWriter(ctx)
	r := io.LimitReader(zReader{}, 1e7)
	w.ChunkSize = 1e4
	w.ConcurrentUploads = 4
	if _, err := io.Copy(w, r); err == nil {
		t.Fatalf("io.Copy: should have returned an error")
	}
}
// TestFileBuffer verifies that the on-disk upload buffer reports the same
// SHA1 it computed while being written as the data it replays on read.
func TestFileBuffer(t *testing.T) {
	r := io.LimitReader(zReader{}, 1e8)
	w, err := newFileBuffer("")
	if err != nil {
		t.Fatal(err)
	}
	defer w.Close()
	if _, err := io.Copy(w, r); err != nil {
		t.Fatal(err)
	}
	bReader, err := w.Reader()
	if err != nil {
		t.Fatal(err)
	}
	hsh := sha1.New()
	if _, err := io.Copy(hsh, bReader); err != nil {
		t.Fatal(err)
	}
	hshText := fmt.Sprintf("%x", hsh.Sum(nil))
	if hshText != w.Hash() {
		t.Errorf("hashes are not equal: bufferWriter is %q, read buffer is %q", w.Hash(), hshText)
	}
}
// writeFile stores size bytes of deterministic data in the named object,
// uploading with chunk size csize, and returns the object plus the hex SHA1
// of everything written.
func writeFile(ctx context.Context, bucket *Bucket, name string, size int64, csize int) (*Object, string, error) {
	obj := bucket.Object(name)
	w := obj.NewWriter(ctx)
	w.ConcurrentUploads = 5
	w.ChunkSize = csize
	hsh := sha1.New()
	// Hash the stream as it is uploaded so the expected digest is known.
	src := io.LimitReader(zReader{}, size)
	if _, err := io.Copy(io.MultiWriter(w, hsh), src); err != nil {
		return nil, "", err
	}
	if err := w.Close(); err != nil {
		return nil, "", err
	}
	return obj, fmt.Sprintf("%x", hsh.Sum(nil)), nil
}
// readFile streams the object back with the given chunk size and download
// concurrency and checks the SHA1 of the data against sha.
func readFile(ctx context.Context, obj *Object, sha string, chunk, concur int) error {
	r := obj.NewReader(ctx)
	r.ChunkSize = chunk
	r.ConcurrentDownloads = concur
	hsh := sha1.New()
	if _, err := io.Copy(hsh, r); err != nil {
		return err
	}
	if err := r.Close(); err != nil {
		return err
	}
	if got := fmt.Sprintf("%x", hsh.Sum(nil)); got != sha {
		return fmt.Errorf("bad hash: got %s, want %s", got, sha)
	}
	return nil
}

View File

@ -0,0 +1,659 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"io"
"math/rand"
"time"
"golang.org/x/net/context"
)
// This file wraps the baseline interfaces with backoff and retry semantics.
// beRootInterface is the retry/backoff-aware view of the account root.
type beRootInterface interface {
	backoff(error) time.Duration
	reauth(error) bool
	transient(error) bool
	reupload(error) bool
	authorizeAccount(context.Context, string, string, ...ClientOption) error
	reauthorizeAccount(context.Context) error
	createBucket(ctx context.Context, name, btype string, info map[string]string, rules []LifecycleRule) (beBucketInterface, error)
	listBuckets(context.Context) ([]beBucketInterface, error)
}
// beRoot wraps a raw b2RootInterface and remembers the credentials so a
// reauth can repeat authorization.
type beRoot struct {
	account, key string
	b2i b2RootInterface
}
// beBucketInterface is the retry-aware view of a bucket.
type beBucketInterface interface {
	name() string
	btype() BucketType
	attrs() *BucketAttrs
	updateBucket(context.Context, *BucketAttrs) error
	deleteBucket(context.Context) error
	getUploadURL(context.Context) (beURLInterface, error)
	startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (beLargeFileInterface, error)
	listFileNames(context.Context, int, string, string, string) ([]beFileInterface, string, error)
	listFileVersions(context.Context, int, string, string, string, string) ([]beFileInterface, string, string, error)
	downloadFileByName(context.Context, string, int64, int64) (beFileReaderInterface, error)
	hideFile(context.Context, string) (beFileInterface, error)
	getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
	baseURL() string
}
// beBucket pairs a raw bucket with the root that supplies retry policy.
type beBucket struct {
	b2bucket b2BucketInterface
	ri beRootInterface
}
// beURLInterface is the retry-aware view of a simple-upload URL.
type beURLInterface interface {
	uploadFile(context.Context, io.ReadSeeker, int, string, string, string, map[string]string) (beFileInterface, error)
}
type beURL struct {
	b2url b2URLInterface
	ri beRootInterface
}
// beFileInterface is the retry-aware view of an uploaded file.
type beFileInterface interface {
	name() string
	size() int64
	timestamp() time.Time
	status() string
	deleteFileVersion(context.Context) error
	getFileInfo(context.Context) (beFileInfoInterface, error)
	listParts(context.Context, int, int) ([]beFilePartInterface, int, error)
	compileParts(int64, map[int]string) beLargeFileInterface
}
type beFile struct {
	b2file b2FileInterface
	url beURLInterface
	ri beRootInterface
}
// beLargeFileInterface is the retry-aware view of an in-progress large file.
type beLargeFileInterface interface {
	finishLargeFile(context.Context) (beFileInterface, error)
	getUploadPartURL(context.Context) (beFileChunkInterface, error)
}
type beLargeFile struct {
	b2largeFile b2LargeFileInterface
	ri beRootInterface
}
// beFileChunkInterface is the retry-aware view of a part-upload endpoint.
type beFileChunkInterface interface {
	reload(context.Context) error
	uploadPart(context.Context, io.ReadSeeker, string, int, int) (int, error)
}
type beFileChunk struct {
	b2fileChunk b2FileChunkInterface
	ri beRootInterface
}
// beFileReaderInterface is the retry-aware view of a download stream.
type beFileReaderInterface interface {
	io.ReadCloser
	stats() (int, string, string, map[string]string)
}
type beFileReader struct {
	b2fileReader b2FileReaderInterface
	ri beRootInterface
}
type beFileInfoInterface interface {
	stats() (string, string, int64, string, map[string]string, string, time.Time)
}
type beFilePartInterface interface {
	number() int
	sha1() string
	size() int64
}
type beFilePart struct {
	b2filePart b2FilePartInterface
	ri beRootInterface
}
// beFileInfo is a plain snapshot of a file's metadata.
type beFileInfo struct {
	name string
	sha string
	size int64
	ct string
	info map[string]string
	status string
	stamp time.Time
}
// The error-classification methods delegate straight to the raw root.
func (r *beRoot) backoff(err error) time.Duration { return r.b2i.backoff(err) }
func (r *beRoot) reauth(err error) bool { return r.b2i.reauth(err) }
func (r *beRoot) reupload(err error) bool { return r.b2i.reupload(err) }
func (r *beRoot) transient(err error) bool { return r.b2i.transient(err) }
// authorizeAccount authorizes with backoff and records the credentials so
// reauthorizeAccount can repeat the call later.
func (r *beRoot) authorizeAccount(ctx context.Context, account, key string, opts ...ClientOption) error {
	f := func() error {
		if err := r.b2i.authorizeAccount(ctx, account, key, opts...); err != nil {
			return err
		}
		// Only remember credentials that actually worked.
		r.account = account
		r.key = key
		return nil
	}
	return withBackoff(ctx, r, f)
}
// reauthorizeAccount repeats authorization with the saved credentials.
// NOTE(review): the original ClientOptions are not replayed here — confirm
// that is intended.
func (r *beRoot) reauthorizeAccount(ctx context.Context) error {
	return r.authorizeAccount(ctx, r.account, r.key)
}
// createBucket wraps the raw call with reauth-then-retry (inner) and
// backoff (outer) semantics, returning a retry-aware bucket on success.
func (r *beRoot) createBucket(ctx context.Context, name, btype string, info map[string]string, rules []LifecycleRule) (beBucketInterface, error) {
	var bi beBucketInterface
	f := func() error {
		g := func() error {
			bucket, err := r.b2i.createBucket(ctx, name, btype, info, rules)
			if err != nil {
				return err
			}
			bi = &beBucket{
				b2bucket: bucket,
				ri: r,
			}
			return nil
		}
		return withReauth(ctx, r, g)
	}
	if err := withBackoff(ctx, r, f); err != nil {
		return nil, err
	}
	return bi, nil
}
// listBuckets wraps the raw listing with the same reauth/backoff pattern.
func (r *beRoot) listBuckets(ctx context.Context) ([]beBucketInterface, error) {
	var buckets []beBucketInterface
	f := func() error {
		g := func() error {
			bs, err := r.b2i.listBuckets(ctx)
			if err != nil {
				return err
			}
			for _, b := range bs {
				buckets = append(buckets, &beBucket{
					b2bucket: b,
					ri: r,
				})
			}
			return nil
		}
		return withReauth(ctx, r, g)
	}
	if err := withBackoff(ctx, r, f); err != nil {
		return nil, err
	}
	return buckets, nil
}
// Simple accessors delegate to the raw bucket without retry.
func (b *beBucket) name() string {
	return b.b2bucket.name()
}
func (b *beBucket) btype() BucketType {
	return BucketType(b.b2bucket.btype())
}
func (b *beBucket) attrs() *BucketAttrs {
	return b.b2bucket.attrs()
}
// updateBucket retries the raw update with reauth and backoff.
func (b *beBucket) updateBucket(ctx context.Context, attrs *BucketAttrs) error {
	f := func() error {
		g := func() error {
			return b.b2bucket.updateBucket(ctx, attrs)
		}
		return withReauth(ctx, b.ri, g)
	}
	return withBackoff(ctx, b.ri, f)
}
// deleteBucket retries the raw delete with reauth and backoff.
func (b *beBucket) deleteBucket(ctx context.Context) error {
	f := func() error {
		g := func() error {
			return b.b2bucket.deleteBucket(ctx)
		}
		return withReauth(ctx, b.ri, g)
	}
	return withBackoff(ctx, b.ri, f)
}
// getUploadURL fetches an upload URL with reauth and backoff, wrapping the
// result so uploads through it are retry-aware too.
func (b *beBucket) getUploadURL(ctx context.Context) (beURLInterface, error) {
	var url beURLInterface
	f := func() error {
		g := func() error {
			u, err := b.b2bucket.getUploadURL(ctx)
			if err != nil {
				return err
			}
			url = &beURL{
				b2url: u,
				ri: b.ri,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return url, nil
}
// startLargeFile begins a large-file upload with reauth and backoff.
func (b *beBucket) startLargeFile(ctx context.Context, name, ct string, info map[string]string) (beLargeFileInterface, error) {
	var file beLargeFileInterface
	f := func() error {
		g := func() error {
			f, err := b.b2bucket.startLargeFile(ctx, name, ct, info)
			if err != nil {
				return err
			}
			file = &beLargeFile{
				b2largeFile: f,
				ri: b.ri,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return file, nil
}
// listFileNames lists current file names with reauth and backoff, passing
// through the continuation token.
func (b *beBucket) listFileNames(ctx context.Context, count int, continuation, prefix, delimiter string) ([]beFileInterface, string, error) {
	var cont string
	var files []beFileInterface
	f := func() error {
		g := func() error {
			fs, c, err := b.b2bucket.listFileNames(ctx, count, continuation, prefix, delimiter)
			if err != nil {
				return err
			}
			cont = c
			for _, f := range fs {
				files = append(files, &beFile{
					b2file: f,
					ri: b.ri,
				})
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, "", err
	}
	return files, cont, nil
}
// listFileVersions lists all file versions with reauth and backoff, passing
// through the (name, id) continuation pair.
func (b *beBucket) listFileVersions(ctx context.Context, count int, nextName, nextID, prefix, delimiter string) ([]beFileInterface, string, string, error) {
	var name, id string
	var files []beFileInterface
	f := func() error {
		g := func() error {
			fs, n, d, err := b.b2bucket.listFileVersions(ctx, count, nextName, nextID, prefix, delimiter)
			if err != nil {
				return err
			}
			name = n
			id = d
			for _, f := range fs {
				files = append(files, &beFile{
					b2file: f,
					ri: b.ri,
				})
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, "", "", err
	}
	return files, name, id, nil
}
// downloadFileByName opens a ranged download with reauth and backoff.
func (b *beBucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (beFileReaderInterface, error) {
	var reader beFileReaderInterface
	f := func() error {
		g := func() error {
			fr, err := b.b2bucket.downloadFileByName(ctx, name, offset, size)
			if err != nil {
				return err
			}
			reader = &beFileReader{
				b2fileReader: fr,
				ri: b.ri,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return reader, nil
}
// hideFile hides the named file with reauth and backoff.
func (b *beBucket) hideFile(ctx context.Context, name string) (beFileInterface, error) {
	var file beFileInterface
	f := func() error {
		g := func() error {
			f, err := b.b2bucket.hideFile(ctx, name)
			if err != nil {
				return err
			}
			file = &beFile{
				b2file: f,
				ri: b.ri,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return file, nil
}
// getDownloadAuthorization fetches a download token with reauth and backoff.
func (b *beBucket) getDownloadAuthorization(ctx context.Context, p string, v time.Duration) (string, error) {
	var tok string
	f := func() error {
		g := func() error {
			t, err := b.b2bucket.getDownloadAuthorization(ctx, p, v)
			if err != nil {
				return err
			}
			tok = t
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return "", err
	}
	return tok, nil
}
func (b *beBucket) baseURL() string {
	return b.b2bucket.baseURL()
}
// uploadFile uploads the contents of r to this upload URL, retrying with
// backoff on transient errors. The reader is rewound to the beginning before
// every attempt so a retry resends the complete payload.
//
// NOTE(review): unlike most wrappers here there is no withReauth layer —
// presumably the upload URL carries its own auth token and the caller is
// expected to fetch a fresh URL on auth failure (matches uploadPart below);
// confirm against the B2 upload semantics.
func (b *beURL) uploadFile(ctx context.Context, r io.ReadSeeker, size int, name, ct, sha1 string, info map[string]string) (beFileInterface, error) {
	var file beFileInterface
	f := func() error {
		// Rewind so a retried attempt uploads the whole body again.
		if _, err := r.Seek(0, io.SeekStart); err != nil {
			return err
		}
		f, err := b.b2url.uploadFile(ctx, r, size, name, ct, sha1, info)
		if err != nil {
			return err
		}
		file = &beFile{
			b2file: f,
			url:    b,
			ri:     b.ri,
		}
		return nil
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return file, nil
}
// deleteFileVersion removes this specific file version, retrying with
// backoff/re-auth.
func (b *beFile) deleteFileVersion(ctx context.Context) error {
	f := func() error {
		g := func() error {
			return b.b2file.deleteFileVersion(ctx)
		}
		return withReauth(ctx, b.ri, g)
	}
	return withBackoff(ctx, b.ri, f)
}

// size returns the file's size in bytes (cached metadata; no network call).
func (b *beFile) size() int64 {
	return b.b2file.size()
}

// name returns the file's name (cached metadata; no network call).
func (b *beFile) name() string {
	return b.b2file.name()
}

// timestamp returns the file's upload timestamp (cached metadata).
func (b *beFile) timestamp() time.Time {
	return b.b2file.timestamp()
}

// status returns the file's status string (cached metadata).
func (b *beFile) status() string {
	return b.b2file.status()
}
// getFileInfo fetches the file's metadata from the service, retrying with
// backoff/re-auth, and snapshots it into a beFileInfo value so later reads
// need no further network calls.
func (b *beFile) getFileInfo(ctx context.Context) (beFileInfoInterface, error) {
	var fileInfo beFileInfoInterface
	f := func() error {
		g := func() error {
			fi, err := b.b2file.getFileInfo(ctx)
			if err != nil {
				return err
			}
			name, sha, size, ct, info, status, stamp := fi.stats()
			fileInfo = &beFileInfo{
				name:   name,
				sha:    sha,
				size:   size,
				ct:     ct,
				info:   info,
				status: status,
				stamp:  stamp,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return fileInfo, nil
}
// listParts lists up to count uploaded parts of a large file starting at
// part number next, retrying with backoff/re-auth. It returns the parts and
// the next part number for pagination.
func (b *beFile) listParts(ctx context.Context, next, count int) ([]beFilePartInterface, int, error) {
	var fpi []beFilePartInterface
	var rnxt int
	f := func() error {
		g := func() error {
			ps, n, err := b.b2file.listParts(ctx, next, count)
			if err != nil {
				return err
			}
			rnxt = n
			for _, p := range ps {
				fpi = append(fpi, &beFilePart{
					b2filePart: p,
					ri:         b.ri,
				})
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, 0, err
	}
	return fpi, rnxt, nil
}

// compileParts reconstitutes an in-progress large file from its total size
// and the map of already-seen part hashes; purely local, no retry needed.
func (b *beFile) compileParts(size int64, seen map[int]string) beLargeFileInterface {
	return &beLargeFile{
		b2largeFile: b.b2file.compileParts(size, seen),
		ri:          b.ri,
	}
}
// getUploadPartURL obtains an upload URL/token pair for the next part of a
// large file, retrying with backoff/re-auth.
func (b *beLargeFile) getUploadPartURL(ctx context.Context) (beFileChunkInterface, error) {
	var chunk beFileChunkInterface
	f := func() error {
		g := func() error {
			fc, err := b.b2largeFile.getUploadPartURL(ctx)
			if err != nil {
				return err
			}
			chunk = &beFileChunk{
				b2fileChunk: fc,
				ri:          b.ri,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return chunk, nil
}

// finishLargeFile finalizes a large-file upload after all parts are in,
// retrying with backoff/re-auth, and returns the completed file.
func (b *beLargeFile) finishLargeFile(ctx context.Context) (beFileInterface, error) {
	var file beFileInterface
	f := func() error {
		g := func() error {
			f, err := b.b2largeFile.finishLargeFile(ctx)
			if err != nil {
				return err
			}
			file = &beFile{
				b2file: f,
				ri:     b.ri,
			}
			return nil
		}
		return withReauth(ctx, b.ri, g)
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return nil, err
	}
	return file, nil
}
// reload refreshes the chunk's upload URL/token, retrying with
// backoff/re-auth.
func (b *beFileChunk) reload(ctx context.Context) error {
	f := func() error {
		g := func() error {
			return b.b2fileChunk.reload(ctx)
		}
		return withReauth(ctx, b.ri, g)
	}
	return withBackoff(ctx, b.ri, f)
}
// uploadPart uploads one part of a large file, retrying with backoff on
// transient errors and rewinding the reader before each attempt. It returns
// the number of bytes written.
func (b *beFileChunk) uploadPart(ctx context.Context, r io.ReadSeeker, sha1 string, size, index int) (int, error) {
	// no re-auth; pass it back up to the caller so they can get a new upload URI and token
	// TODO: we should handle that here probably
	var i int
	f := func() error {
		// Rewind so a retried attempt uploads the whole part again.
		if _, err := r.Seek(0, io.SeekStart); err != nil {
			return err
		}
		j, err := b.b2fileChunk.uploadPart(ctx, r, sha1, size, index)
		if err != nil {
			return err
		}
		i = j
		return nil
	}
	if err := withBackoff(ctx, b.ri, f); err != nil {
		return 0, err
	}
	return i, nil
}
// Read delegates to the wrapped download stream. No retry here: once a
// download stream is open, errors propagate to the caller.
func (b *beFileReader) Read(p []byte) (int, error) {
	return b.b2fileReader.Read(p)
}

// Close closes the wrapped download stream.
func (b *beFileReader) Close() error {
	return b.b2fileReader.Close()
}

// stats returns the download's content length, content type, SHA1, and
// file info headers.
func (b *beFileReader) stats() (int, string, string, map[string]string) {
	return b.b2fileReader.stats()
}

// stats returns the snapshotted file metadata captured by getFileInfo.
func (b *beFileInfo) stats() (string, string, int64, string, map[string]string, string, time.Time) {
	return b.name, b.sha, b.size, b.ct, b.info, b.status, b.stamp
}

// Trivial pass-through accessors for a large-file part.
func (b *beFilePart) number() int  { return b.b2filePart.number() }
func (b *beFilePart) sha1() string { return b.b2filePart.sha1() }
func (b *beFilePart) size() int64  { return b.b2filePart.size() }
func jitter(d time.Duration) time.Duration {
f := float64(d)
f /= 50
f += f * (rand.Float64() - 0.5)
return time.Duration(f)
}
// getBackoff computes the next retry delay from the current one: the delay
// doubles until it exceeds 15 seconds, after which it stays flat, and a
// small jitter is always added.
func getBackoff(d time.Duration) time.Duration {
	next := d
	if d <= 15*time.Second {
		next = 2 * d
	}
	return next + jitter(next)
}
// after is an indirection over time.After — presumably so tests can
// substitute a fake timer; TODO confirm against the package's tests.
var after = time.After

// withBackoff runs f, retrying with exponentially increasing (jittered)
// delays as long as the root reports the error as transient. A non-transient
// error (including nil) is returned immediately. If the service supplied an
// explicit Retry-After-style delay, that delay is used instead of the
// computed one. Context cancellation aborts the wait between attempts.
func withBackoff(ctx context.Context, ri beRootInterface, f func() error) error {
	backoff := 500 * time.Millisecond
	for {
		err := f()
		if !ri.transient(err) {
			return err
		}
		// Prefer the server-suggested delay when there is one.
		bo := ri.backoff(err)
		if bo > 0 {
			backoff = bo
		} else {
			backoff = getBackoff(backoff)
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-after(backoff):
		}
	}
}
// withReauth runs f once; if the error indicates an expired auth token, it
// re-authorizes the account and runs f one more time. Any other error (or a
// failure to re-authorize) is returned as-is.
func withReauth(ctx context.Context, ri beRootInterface, f func() error) error {
	err := f()
	if ri.reauth(err) {
		if err := ri.reauthorizeAccount(ctx); err != nil {
			return err
		}
		err = f()
	}
	return err
}

View File

@ -0,0 +1,425 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"io"
"time"
"github.com/kurin/blazer/base"
"golang.org/x/net/context"
)
// This file wraps the base package in a thin layer, for testing. It should be
// the only file in b2 that imports base.
// b2RootInterface abstracts the account-level operations of the base
// package, plus error classification used by the retry layer.
type b2RootInterface interface {
	authorizeAccount(context.Context, string, string, ...ClientOption) error
	transient(error) bool
	backoff(error) time.Duration
	reauth(error) bool
	reupload(error) bool
	createBucket(context.Context, string, string, map[string]string, []LifecycleRule) (b2BucketInterface, error)
	listBuckets(context.Context) ([]b2BucketInterface, error)
}

// b2BucketInterface abstracts per-bucket operations.
type b2BucketInterface interface {
	name() string
	btype() string
	attrs() *BucketAttrs
	updateBucket(context.Context, *BucketAttrs) error
	deleteBucket(context.Context) error
	getUploadURL(context.Context) (b2URLInterface, error)
	startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (b2LargeFileInterface, error)
	listFileNames(context.Context, int, string, string, string) ([]b2FileInterface, string, error)
	listFileVersions(context.Context, int, string, string, string, string) ([]b2FileInterface, string, string, error)
	downloadFileByName(context.Context, string, int64, int64) (b2FileReaderInterface, error)
	hideFile(context.Context, string) (b2FileInterface, error)
	getDownloadAuthorization(context.Context, string, time.Duration) (string, error)
	baseURL() string
}

// b2URLInterface abstracts a simple-upload URL/token pair.
type b2URLInterface interface {
	reload(context.Context) error
	uploadFile(context.Context, io.Reader, int, string, string, string, map[string]string) (b2FileInterface, error)
}

// b2FileInterface abstracts a single file (or file version).
type b2FileInterface interface {
	name() string
	size() int64
	timestamp() time.Time
	status() string
	deleteFileVersion(context.Context) error
	getFileInfo(context.Context) (b2FileInfoInterface, error)
	listParts(context.Context, int, int) ([]b2FilePartInterface, int, error)
	compileParts(int64, map[int]string) b2LargeFileInterface
}

// b2LargeFileInterface abstracts an in-progress large (multipart) file.
type b2LargeFileInterface interface {
	finishLargeFile(context.Context) (b2FileInterface, error)
	getUploadPartURL(context.Context) (b2FileChunkInterface, error)
}

// b2FileChunkInterface abstracts a part-upload URL/token pair.
type b2FileChunkInterface interface {
	reload(context.Context) error
	uploadPart(context.Context, io.Reader, string, int, int) (int, error)
}

// b2FileReaderInterface abstracts a download stream and its headers.
type b2FileReaderInterface interface {
	io.ReadCloser
	stats() (int, string, string, map[string]string)
}

// b2FileInfoInterface abstracts file metadata.
type b2FileInfoInterface interface {
	stats() (string, string, int64, string, map[string]string, string, time.Time) // bleck
}

// b2FilePartInterface abstracts one uploaded part of a large file.
type b2FilePartInterface interface {
	number() int
	sha1() string
	size() int64
}
// The following types are one-field adapters that satisfy the interfaces
// above by delegating to the corresponding base-package types. They exist
// so the rest of the b2 package never imports base directly and can be
// tested against fakes.
type b2Root struct {
	b *base.B2
}

type b2Bucket struct {
	b *base.Bucket
}

type b2URL struct {
	b *base.URL
}

type b2File struct {
	b *base.File
}

type b2LargeFile struct {
	b *base.LargeFile
}

type b2FileChunk struct {
	b *base.FileChunk
}

type b2FileReader struct {
	b *base.FileReader
}

type b2FileInfo struct {
	b *base.FileInfo
}

type b2FilePart struct {
	b *base.FilePart
}
// authorizeAccount authenticates with B2 using the account ID and key,
// applying any client options (currently only a custom transport). On first
// call it stores the new session; on subsequent calls it updates the
// existing session in place so outstanding references keep working.
func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts ...ClientOption) error {
	c := &clientOptions{}
	for _, f := range opts {
		f(c)
	}
	var aopts []base.AuthOption
	if c.transport != nil {
		aopts = append(aopts, base.Transport(c.transport))
	}
	nb, err := base.AuthorizeAccount(ctx, account, key, aopts...)
	if err != nil {
		return err
	}
	if b.b == nil {
		b.b = nb
		return nil
	}
	// Re-authorization: refresh the existing session rather than replacing
	// it, so other holders of b.b see the new credentials.
	b.b.Update(nb)
	return nil
}
// backoff returns the server-suggested retry delay for err, or 0 when err
// is not retryable (callers then compute their own delay).
func (*b2Root) backoff(err error) time.Duration {
	if base.Action(err) != base.Retry {
		return 0
	}
	return base.Backoff(err)
}

// reauth reports whether err means the auth token expired.
func (*b2Root) reauth(err error) bool {
	return base.Action(err) == base.ReAuthenticate
}

// reupload reports whether err means the upload URL is dead and a new one
// must be obtained.
func (*b2Root) reupload(err error) bool {
	return base.Action(err) == base.AttemptNewUpload
}

// transient reports whether err is worth retrying at all.
func (*b2Root) transient(err error) bool {
	return base.Action(err) == base.Retry
}
// createBucket creates a bucket with the given name, type, info map, and
// lifecycle rules, translating the package-level LifecycleRule values into
// their base-package equivalents.
func (b *b2Root) createBucket(ctx context.Context, name, btype string, info map[string]string, rules []LifecycleRule) (b2BucketInterface, error) {
	var baseRules []base.LifecycleRule
	for _, rule := range rules {
		baseRules = append(baseRules, base.LifecycleRule{
			DaysNewUntilHidden:     rule.DaysNewUntilHidden,
			DaysHiddenUntilDeleted: rule.DaysHiddenUntilDeleted,
			Prefix:                 rule.Prefix,
		})
	}
	bucket, err := b.b.CreateBucket(ctx, name, btype, info, baseRules)
	if err != nil {
		return nil, err
	}
	return &b2Bucket{bucket}, nil
}
// listBuckets retrieves all buckets for the account and wraps each one in a
// b2Bucket adapter.
func (b *b2Root) listBuckets(ctx context.Context) ([]b2BucketInterface, error) {
	buckets, err := b.b.ListBuckets(ctx)
	if err != nil {
		return nil, err
	}
	var rtn []b2BucketInterface
	for _, bucket := range buckets {
		rtn = append(rtn, &b2Bucket{bucket})
	}
	// err is known to be nil at this point; return nil explicitly rather
	// than a variable the reader must trace back.
	return rtn, nil
}
// updateBucket applies the non-zero fields of attrs to the bucket and pushes
// the change to the service. A nil attrs is a no-op. An HTTP 409 from the
// service (bucket revision conflict) is wrapped in b2err with
// isUpdateConflict set so callers can detect it via IsUpdateConflict.
func (b *b2Bucket) updateBucket(ctx context.Context, attrs *BucketAttrs) error {
	if attrs == nil {
		return nil
	}
	if attrs.Type != UnknownType {
		b.b.Type = string(attrs.Type)
	}
	if attrs.Info != nil {
		b.b.Info = attrs.Info
	}
	if attrs.LifecycleRules != nil {
		rules := []base.LifecycleRule{}
		for _, rule := range attrs.LifecycleRules {
			rules = append(rules, base.LifecycleRule{
				DaysNewUntilHidden:     rule.DaysNewUntilHidden,
				DaysHiddenUntilDeleted: rule.DaysHiddenUntilDeleted,
				Prefix:                 rule.Prefix,
			})
		}
		b.b.LifecycleRules = rules
	}
	newBucket, err := b.b.Update(ctx)
	if err == nil {
		// Adopt the server's view of the bucket (including its new revision).
		b.b = newBucket
	}
	code, _ := base.Code(err)
	if code == 409 {
		return b2err{
			err:              err,
			isUpdateConflict: true,
		}
	}
	return err
}
// deleteBucket removes the bucket from the service.
func (b *b2Bucket) deleteBucket(ctx context.Context) error {
	return b.b.DeleteBucket(ctx)
}

// name returns the bucket's name from cached state.
func (b *b2Bucket) name() string {
	return b.b.Name
}

// btype returns the bucket's type string from cached state.
func (b *b2Bucket) btype() string {
	return b.b.Type
}

// attrs converts the cached bucket state into a package-level BucketAttrs
// value, copying the lifecycle rules out of their base representation.
func (b *b2Bucket) attrs() *BucketAttrs {
	var rules []LifecycleRule
	for _, rule := range b.b.LifecycleRules {
		rules = append(rules, LifecycleRule{
			DaysNewUntilHidden:     rule.DaysNewUntilHidden,
			DaysHiddenUntilDeleted: rule.DaysHiddenUntilDeleted,
			Prefix:                 rule.Prefix,
		})
	}
	return &BucketAttrs{
		LifecycleRules: rules,
		Info:           b.b.Info,
		Type:           BucketType(b.b.Type),
	}
}
// getUploadURL obtains a simple-upload URL/token pair for this bucket.
func (b *b2Bucket) getUploadURL(ctx context.Context) (b2URLInterface, error) {
	url, err := b.b.GetUploadURL(ctx)
	if err != nil {
		return nil, err
	}
	return &b2URL{url}, nil
}

// startLargeFile begins a multipart upload for the named file.
func (b *b2Bucket) startLargeFile(ctx context.Context, name, ct string, info map[string]string) (b2LargeFileInterface, error) {
	lf, err := b.b.StartLargeFile(ctx, name, ct, info)
	if err != nil {
		return nil, err
	}
	return &b2LargeFile{lf}, nil
}

// listFileNames lists file names in the bucket, returning the files and the
// continuation token for the next page.
func (b *b2Bucket) listFileNames(ctx context.Context, count int, continuation, prefix, delimiter string) ([]b2FileInterface, string, error) {
	fs, c, err := b.b.ListFileNames(ctx, count, continuation, prefix, delimiter)
	if err != nil {
		return nil, "", err
	}
	var files []b2FileInterface
	for _, f := range fs {
		files = append(files, &b2File{f})
	}
	return files, c, nil
}
// listFileVersions lists file versions, returning the files plus the
// pagination markers (next file name and next file ID).
func (b *b2Bucket) listFileVersions(ctx context.Context, count int, nextName, nextID, prefix, delimiter string) ([]b2FileInterface, string, string, error) {
	fs, name, id, err := b.b.ListFileVersions(ctx, count, nextName, nextID, prefix, delimiter)
	if err != nil {
		return nil, "", "", err
	}
	var files []b2FileInterface
	for _, f := range fs {
		files = append(files, &b2File{f})
	}
	return files, name, id, nil
}

// downloadFileByName opens a ranged download of the named file.
func (b *b2Bucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (b2FileReaderInterface, error) {
	fr, err := b.b.DownloadFileByName(ctx, name, offset, size)
	if err != nil {
		return nil, err
	}
	return &b2FileReader{fr}, nil
}

// hideFile hides the named file (soft delete).
func (b *b2Bucket) hideFile(ctx context.Context, name string) (b2FileInterface, error) {
	f, err := b.b.HideFile(ctx, name)
	if err != nil {
		return nil, err
	}
	return &b2File{f}, nil
}

// getDownloadAuthorization obtains a download token for prefix p valid for v.
func (b *b2Bucket) getDownloadAuthorization(ctx context.Context, p string, v time.Duration) (string, error) {
	return b.b.GetDownloadAuthorization(ctx, p, v)
}

// baseURL returns the bucket's base download URL.
func (b *b2Bucket) baseURL() string {
	return b.b.BaseURL()
}
// uploadFile performs a simple (non-multipart) upload through this URL.
func (b *b2URL) uploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (b2FileInterface, error) {
	file, err := b.b.UploadFile(ctx, r, size, name, contentType, sha1, info)
	if err != nil {
		return nil, err
	}
	return &b2File{file}, nil
}

// reload refreshes this upload URL/token pair.
func (b *b2URL) reload(ctx context.Context) error {
	return b.b.Reload(ctx)
}
// deleteFileVersion removes this specific file version.
func (b *b2File) deleteFileVersion(ctx context.Context) error {
	return b.b.DeleteFileVersion(ctx)
}

// name, size, timestamp, and status expose cached file metadata.
func (b *b2File) name() string {
	return b.b.Name
}

func (b *b2File) size() int64 {
	return b.b.Size
}

func (b *b2File) timestamp() time.Time {
	return b.b.Timestamp
}

func (b *b2File) status() string {
	return b.b.Status
}
// getFileInfo fetches the file's metadata from the service.
func (b *b2File) getFileInfo(ctx context.Context) (b2FileInfoInterface, error) {
	fi, err := b.b.GetFileInfo(ctx)
	if err != nil {
		return nil, err
	}
	return &b2FileInfo{fi}, nil
}

// listParts lists uploaded parts of a large file starting at part number
// next, returning the parts and the next part number.
func (b *b2File) listParts(ctx context.Context, next, count int) ([]b2FilePartInterface, int, error) {
	parts, n, err := b.b.ListParts(ctx, next, count)
	if err != nil {
		return nil, 0, err
	}
	var rtn []b2FilePartInterface
	for _, part := range parts {
		rtn = append(rtn, &b2FilePart{part})
	}
	return rtn, n, nil
}

// compileParts reconstitutes an in-progress large file locally from its
// total size and the map of already-seen part hashes.
func (b *b2File) compileParts(size int64, seen map[int]string) b2LargeFileInterface {
	return &b2LargeFile{b.b.CompileParts(size, seen)}
}
// finishLargeFile finalizes a multipart upload and returns the file.
func (b *b2LargeFile) finishLargeFile(ctx context.Context) (b2FileInterface, error) {
	f, err := b.b.FinishLargeFile(ctx)
	if err != nil {
		return nil, err
	}
	return &b2File{f}, nil
}

// getUploadPartURL obtains an upload URL/token pair for the next part.
func (b *b2LargeFile) getUploadPartURL(ctx context.Context) (b2FileChunkInterface, error) {
	c, err := b.b.GetUploadPartURL(ctx)
	if err != nil {
		return nil, err
	}
	return &b2FileChunk{c}, nil
}

// reload refreshes this part-upload URL/token pair.
func (b *b2FileChunk) reload(ctx context.Context) error {
	return b.b.Reload(ctx)
}

// uploadPart uploads one part of a large file and returns the bytes written.
func (b *b2FileChunk) uploadPart(ctx context.Context, r io.Reader, sha1 string, size, index int) (int, error) {
	return b.b.UploadPart(ctx, r, sha1, size, index)
}
// Read and Close delegate to the underlying download stream.
func (b *b2FileReader) Read(p []byte) (int, error) {
	return b.b.Read(p)
}

func (b *b2FileReader) Close() error {
	return b.b.Close()
}

// stats returns the download's content length, content type, SHA1, and info.
func (b *b2FileReader) stats() (int, string, string, map[string]string) {
	return b.b.ContentLength, b.b.ContentType, b.b.SHA1, b.b.Info
}

// stats returns the file's name, SHA1, size, content type, info, status,
// and timestamp.
func (b *b2FileInfo) stats() (string, string, int64, string, map[string]string, string, time.Time) {
	return b.b.Name, b.b.SHA1, b.b.Size, b.b.ContentType, b.b.Info, b.b.Status, b.b.Timestamp
}

// Trivial pass-through accessors for a large-file part.
func (b *b2FilePart) number() int  { return b.b.Number }
func (b *b2FilePart) sha1() string { return b.b.SHA1 }
func (b *b2FilePart) size() int64  { return b.b.Size }

View File

@ -0,0 +1,128 @@
// Copyright 2017, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"bytes"
"crypto/sha1"
"fmt"
"hash"
"io"
"io/ioutil"
"os"
"sync"
)
// writeBuffer accumulates one chunk of an upload so it can be hashed,
// measured, and replayed (Reader) if the upload must be retried.
type writeBuffer interface {
	io.Writer
	Len() int
	Reader() (io.ReadSeeker, error)
	Hash() string // sha1 or whatever it is
	Close() error
}

// memoryBuffer buffers a chunk in RAM, hashing bytes as they are written.
// The mutex only guards Close (see Close below).
type memoryBuffer struct {
	buf *bytes.Buffer
	hsh hash.Hash
	w   io.Writer
	mux sync.Mutex
}
var bufpool *sync.Pool
func init() {
bufpool = &sync.Pool{}
bufpool.New = func() interface{} { return &bytes.Buffer{} }
}
// newMemoryBuffer returns a memoryBuffer whose writes are teed into both a
// pooled bytes.Buffer (the data) and a SHA1 hash (for the upload checksum).
func newMemoryBuffer() *memoryBuffer {
	mb := &memoryBuffer{
		hsh: sha1.New(),
	}
	mb.buf = bufpool.Get().(*bytes.Buffer)
	mb.w = io.MultiWriter(mb.hsh, mb.buf)
	return mb
}

// NOTE(review): thing appears unused in this file — candidate for removal;
// confirm no other file in the package references it.
type thing struct {
	rs io.ReadSeeker
	t  int
}
// Write tees p into the data buffer and the running SHA1.
func (mb *memoryBuffer) Write(p []byte) (int, error) { return mb.w.Write(p) }

// Len reports the number of buffered bytes.
func (mb *memoryBuffer) Len() int { return mb.buf.Len() }

// Reader returns a seekable view over the buffered bytes so an upload can
// be replayed. NOTE(review): calling Reader after Close would dereference a
// nil buf — callers presumably never do; confirm.
func (mb *memoryBuffer) Reader() (io.ReadSeeker, error) { return bytes.NewReader(mb.buf.Bytes()), nil }

// Hash returns the hex-encoded SHA1 of everything written so far.
func (mb *memoryBuffer) Hash() string { return fmt.Sprintf("%x", mb.hsh.Sum(nil)) }

// Close returns the backing buffer to the pool. The mutex plus the nil
// check make Close idempotent and safe against concurrent double-close.
func (mb *memoryBuffer) Close() error {
	mb.mux.Lock()
	defer mb.mux.Unlock()
	if mb.buf == nil {
		return nil
	}
	mb.buf.Truncate(0)
	bufpool.Put(mb.buf)
	mb.buf = nil
	return nil
}
// fileBuffer buffers a chunk in a temporary file instead of RAM, for
// callers that opt into UseFileBuffer. s tracks the number of bytes written.
type fileBuffer struct {
	f   *os.File
	hsh hash.Hash
	w   io.Writer
	s   int
}

// newFileBuffer creates a temp file (prefix "blazer") in loc and tees all
// writes into both the file and a running SHA1.
func newFileBuffer(loc string) (*fileBuffer, error) {
	f, err := ioutil.TempFile(loc, "blazer")
	if err != nil {
		return nil, err
	}
	fb := &fileBuffer{
		f:   f,
		hsh: sha1.New(),
	}
	fb.w = io.MultiWriter(fb.f, fb.hsh)
	return fb, nil
}
// Write tees p into the temp file and the running SHA1, tracking the count.
func (fb *fileBuffer) Write(p []byte) (int, error) {
	n, err := fb.w.Write(p)
	fb.s += n
	return n, err
}

// Len reports the number of bytes written so far.
func (fb *fileBuffer) Len() int { return fb.s }

// Hash returns the hex-encoded SHA1 of everything written so far.
func (fb *fileBuffer) Hash() string { return fmt.Sprintf("%x", fb.hsh.Sum(nil)) }

// Reader rewinds the temp file and returns it wrapped in fr, which hides
// the Close method from consumers (see fr below).
func (fb *fileBuffer) Reader() (io.ReadSeeker, error) {
	if _, err := fb.f.Seek(0, 0); err != nil {
		return nil, err
	}
	return &fr{f: fb.f}, nil
}
// Close closes the temporary file and removes it from disk. The original
// silently discarded the close error; report it, still attempting the
// removal either way, and fall back to the removal error.
func (fb *fileBuffer) Close() error {
	err := fb.f.Close()
	if rerr := os.Remove(fb.f.Name()); err == nil {
		err = rerr
	}
	return err
}
// fr wraps *os.File so that the http package doesn't see it as an io.Closer
// (net/http would otherwise close the request body file out from under the
// retry logic that may need to re-read it).
type fr struct {
	f *os.File
}

func (r *fr) Read(p []byte) (int, error)         { return r.f.Read(p) }
func (r *fr) Seek(a int64, b int) (int64, error) { return r.f.Seek(a, b) }

View File

@ -0,0 +1,688 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"bytes"
"crypto/sha1"
"fmt"
"io"
"net/http"
"os"
"reflect"
"testing"
"time"
"github.com/kurin/blazer/base"
"golang.org/x/net/context"
)
// Environment variables consulted by the live integration tests.
const (
	apiID  = "B2_ACCOUNT_ID"
	apiKey = "B2_SECRET_KEY"
	errVar = "B2_TRANSIENT_ERRORS"
)

// init turns on artificial upload failures and auth-token expiry in the
// base package when B2_TRANSIENT_ERRORS is set to anything other than "",
// "0", or "false", so the retry paths are exercised during live tests.
func init() {
	fail := os.Getenv(errVar)
	switch fail {
	case "", "0", "false":
		return
	}
	base.FailSomeUploads = true
	base.ExpireSomeAuthTokens = true
}
// TestReadWriteLive writes one small and one large (multipart) object,
// reads both back verifying their SHA1s, then deletes every object version
// in the bucket by paging through ListObjects.
func TestReadWriteLive(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	// 1e6-42 bytes stays under the chunk size (simple upload); 5e6+5e4
	// with a 5e6 chunk size forces the large-file path.
	sobj, wsha, err := writeFile(ctx, bucket, smallFileName, 1e6-42, 1e8)
	if err != nil {
		t.Fatal(err)
	}

	lobj, wshaL, err := writeFile(ctx, bucket, largeFileName, 5e6+5e4, 5e6)
	if err != nil {
		t.Fatal(err)
	}

	if err := readFile(ctx, lobj, wshaL, 1e6, 10); err != nil {
		t.Error(err)
	}

	if err := readFile(ctx, sobj, wsha, 1e5, 10); err != nil {
		t.Error(err)
	}

	// Page through and delete everything that was created.
	var cur *Cursor
	for {
		objs, c, err := bucket.ListObjects(ctx, 100, cur)
		if err != nil && err != io.EOF {
			t.Fatal(err)
		}
		for _, o := range objs {
			if err := o.Delete(ctx); err != nil {
				t.Error(err)
			}
		}
		if err == io.EOF {
			break
		}
		cur = c
	}
}
// TestHideShowLive verifies that hiding an object removes it from the
// current-object listing and that Reveal makes it visible again.
func TestHideShowLive(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	// write a file
	obj, _, err := writeFile(ctx, bucket, smallFileName, 1e6+42, 1e8)
	if err != nil {
		t.Fatal(err)
	}

	got, err := countObjects(ctx, bucket.ListCurrentObjects)
	if err != nil {
		t.Error(err)
	}
	if got != 1 {
		t.Fatalf("got %d objects, wanted 1", got)
	}

	// When the hide marker and the object it's hiding were created within the
	// same second, they can be sorted in the wrong order, causing the object to
	// fail to be hidden.
	time.Sleep(1500 * time.Millisecond)

	// hide the file
	if err := obj.Hide(ctx); err != nil {
		t.Fatal(err)
	}

	got, err = countObjects(ctx, bucket.ListCurrentObjects)
	if err != nil {
		t.Error(err)
	}
	if got != 0 {
		t.Fatalf("got %d objects, wanted 0", got)
	}

	// unhide the file
	if err := bucket.Reveal(ctx, smallFileName); err != nil {
		t.Fatal(err)
	}

	// the object should be visible in the listing again
	got, err = countObjects(ctx, bucket.ListCurrentObjects)
	if err != nil {
		t.Error(err)
	}
	if got != 1 {
		t.Fatalf("got %d objects, wanted 1", got)
	}
}
// TestResumeWriter cancels a multipart upload after its first chunk, then
// starts a second writer with Resume=true and verifies the completed
// object's SHA1 matches the full input.
//
// NOTE(review): the first startLiveTest's cleanup func is deliberately
// discarded — the unfinished upload must survive for the resume to find it.
func TestResumeWriter(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	bucket, _ := startLiveTest(ctx, t)

	w := bucket.Object("foo").NewWriter(ctx)
	w.ChunkSize = 5e6
	r := io.LimitReader(zReader{}, 15e6)
	go func() {
		// Cancel the context after the first chunk has been written.
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		defer cancel()
		for range ticker.C {
			if w.cidx > 1 {
				return
			}
		}
	}()
	if _, err := io.Copy(w, r); err != context.Canceled {
		t.Fatalf("io.Copy: wanted canceled context, got: %v", err)
	}

	// Second session: resume the interrupted upload and finish it.
	ctx2 := context.Background()
	ctx2, cancel2 := context.WithTimeout(ctx2, 10*time.Minute)
	defer cancel2()
	bucket2, done := startLiveTest(ctx2, t)
	defer done()
	w2 := bucket2.Object("foo").NewWriter(ctx2)
	w2.ChunkSize = 5e6
	r2 := io.LimitReader(zReader{}, 15e6)
	h1 := sha1.New()
	tr := io.TeeReader(r2, h1)
	w2.Resume = true
	w2.ConcurrentUploads = 2
	if _, err := io.Copy(w2, tr); err != nil {
		t.Fatal(err)
	}
	if err := w2.Close(); err != nil {
		t.Fatal(err)
	}
	begSHA := fmt.Sprintf("%x", h1.Sum(nil))

	// Read the object back and compare hashes end to end.
	objR := bucket2.Object("foo").NewReader(ctx2)
	objR.ConcurrentDownloads = 3
	h2 := sha1.New()
	if _, err := io.Copy(h2, objR); err != nil {
		t.Fatal(err)
	}
	if err := objR.Close(); err != nil {
		t.Error(err)
	}
	endSHA := fmt.Sprintf("%x", h2.Sum(nil))
	if endSHA != begSHA {
		t.Errorf("got conflicting hashes: got %q, want %q", endSHA, begSHA)
	}
}
// TestAttrs writes small and large objects with various attribute sets and
// verifies that content type, info map, and last-modified time round-trip.
func TestAttrs(t *testing.T) {
	// TODO: test is flaky
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	attrlist := []*Attrs{
		&Attrs{
			ContentType: "jpeg/stream",
			Info: map[string]string{
				"one": "a",
				"two": "b",
			},
		},
		&Attrs{
			ContentType:  "application/MAGICFACE",
			LastModified: time.Unix(1464370149, 142000000),
			Info:         map[string]string{}, // can't be nil
		},
	}

	// 1e3 exercises the simple upload; 5e6+4 exercises the multipart path.
	table := []struct {
		name string
		size int64
	}{
		{
			name: "small",
			size: 1e3,
		},
		{
			name: "large",
			size: 5e6 + 4,
		},
	}

	for _, e := range table {
		for _, attrs := range attrlist {
			o := bucket.Object(e.name)
			w := o.NewWriter(ctx).WithAttrs(attrs)
			if _, err := io.Copy(w, io.LimitReader(zReader{}, e.size)); err != nil {
				t.Error(err)
				continue
			}
			if err := w.Close(); err != nil {
				t.Error(err)
				continue
			}
			gotAttrs, err := bucket.Object(e.name).Attrs(ctx)
			if err != nil {
				t.Error(err)
				continue
			}
			if gotAttrs.ContentType != attrs.ContentType {
				t.Errorf("bad content-type for %s: got %q, want %q", e.name, gotAttrs.ContentType, attrs.ContentType)
			}
			if !reflect.DeepEqual(gotAttrs.Info, attrs.Info) {
				t.Errorf("bad info for %s: got %#v, want %#v", e.name, gotAttrs.Info, attrs.Info)
			}
			if !gotAttrs.LastModified.Equal(attrs.LastModified) {
				t.Errorf("bad lastmodified time for %s: got %v, want %v", e.name, gotAttrs.LastModified, attrs.LastModified)
			}
			if err := o.Delete(ctx); err != nil {
				t.Errorf("Object(%q).Delete: %v", e.name, err)
			}
		}
	}
}
// TestFileBufferLive forces the writer onto the file-backed buffer, records
// the temp file's name, and verifies the temp file is gone after Close.
func TestFileBufferLive(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	r := io.LimitReader(zReader{}, 1e6)
	w := bucket.Object("small").NewWriter(ctx)
	w.UseFileBuffer = true

	// A zero-byte write forces the writer to allocate its buffer so we can
	// inspect which implementation it chose.
	w.Write(nil)
	wb, ok := w.w.(*fileBuffer)
	if !ok {
		t.Fatalf("writer isn't using file buffer: %T", w.w)
	}
	smallTmpName := wb.f.Name()

	if _, err := io.Copy(w, r); err != nil {
		t.Errorf("creating small file: %v", err)
	}

	if err := w.Close(); err != nil {
		t.Errorf("w.Close(): %v", err)
	}

	if _, err := os.Stat(smallTmpName); !os.IsNotExist(err) {
		t.Errorf("tmp file exists (%s) or other error: %v", smallTmpName, err)
	}
}
// TestAuthTokLive checks that a download-authorization token scoped to the
// "foo" prefix grants access to foo/bar (HTTP 200) but not baz/bar (401).
//
// Fixes over the original: io.Copy errors are no longer ignored, and both
// http.Response bodies are closed so connections can be reused.
func TestAuthTokLive(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	foo := "foo/bar"
	baz := "baz/bar"

	fw := bucket.Object(foo).NewWriter(ctx)
	if _, err := io.Copy(fw, io.LimitReader(zReader{}, 1e5)); err != nil {
		t.Fatal(err)
	}
	if err := fw.Close(); err != nil {
		t.Fatal(err)
	}

	bw := bucket.Object(baz).NewWriter(ctx)
	if _, err := io.Copy(bw, io.LimitReader(zReader{}, 1e5)); err != nil {
		t.Fatal(err)
	}
	if err := bw.Close(); err != nil {
		t.Fatal(err)
	}

	tok, err := bucket.AuthToken(ctx, "foo", time.Hour)
	if err != nil {
		t.Fatal(err)
	}

	// The token covers the "foo" prefix, so this fetch must succeed.
	furl := fmt.Sprintf("%s?Authorization=%s", bucket.Object(foo).URL(), tok)
	frsp, err := http.Get(furl)
	if err != nil {
		t.Fatal(err)
	}
	defer frsp.Body.Close()
	if frsp.StatusCode != 200 {
		t.Fatalf("%s: got %s, want 200", furl, frsp.Status)
	}

	// The same token must not grant access outside its prefix.
	burl := fmt.Sprintf("%s?Authorization=%s", bucket.Object(baz).URL(), tok)
	brsp, err := http.Get(burl)
	if err != nil {
		t.Fatal(err)
	}
	defer brsp.Body.Close()
	if brsp.StatusCode != 401 {
		t.Fatalf("%s: got %s, want 401", burl, brsp.Status)
	}
}
// TestRangeReaderLive uploads 3 MB of data and reads various byte ranges
// back, comparing each range's SHA1 (and observed length) against the same
// range taken from the local copy. A negative length means "to the end".
//
// Fix over the original: the range reader is now closed at the end of each
// loop iteration (with its error checked) instead of via defer inside the
// loop, which would have kept every reader open until the test returned.
func TestRangeReaderLive(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	buf := &bytes.Buffer{}
	io.Copy(buf, io.LimitReader(zReader{}, 3e6))
	rs := bytes.NewReader(buf.Bytes())
	w := bucket.Object("foobar").NewWriter(ctx)
	if _, err := io.Copy(w, rs); err != nil {
		t.Fatal(err)
	}
	if err := w.Close(); err != nil {
		t.Fatal(err)
	}

	table := []struct {
		offset, length int64
		size           int64 // expected actual size
	}{
		{
			offset: 1e6 - 50,
			length: 1e6 + 50,
			size:   1e6 + 50,
		},
		{
			offset: 0,
			length: -1,
			size:   3e6,
		},
		{
			offset: 2e6,
			length: -1,
			size:   1e6,
		},
		{
			// Requested range runs past the end; only 1e6 bytes exist.
			offset: 2e6,
			length: 2e6,
			size:   1e6,
		},
	}

	for _, e := range table {
		if _, err := rs.Seek(e.offset, 0); err != nil {
			t.Error(err)
			continue
		}
		// Hash the expected bytes from the local copy.
		hw := sha1.New()
		var lr io.Reader
		lr = rs
		if e.length >= 0 {
			lr = io.LimitReader(rs, e.length)
		}
		if _, err := io.Copy(hw, lr); err != nil {
			t.Error(err)
			continue
		}
		// Hash the same range from the service.
		r := bucket.Object("foobar").NewRangeReader(ctx, e.offset, e.length)
		hr := sha1.New()
		read, err := io.Copy(hr, r)
		if cerr := r.Close(); cerr != nil {
			t.Error(cerr)
		}
		if err != nil {
			t.Error(err)
			continue
		}
		if read != e.size {
			t.Errorf("read %d bytes, wanted %d bytes", read, e.size)
		}
		got := fmt.Sprintf("%x", hr.Sum(nil))
		want := fmt.Sprintf("%x", hw.Sum(nil))
		if got != want {
			t.Errorf("bad hash, got %q, want %q", got, want)
		}
	}
}
// TestListObjectsWithPrefix writes objects under two prefixes and verifies
// that both ListObjects and ListCurrentObjects honor a Cursor prefix.
func TestListObjectsWithPrefix(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()

	foo := "foo/bar"
	baz := "baz/bar"

	fw := bucket.Object(foo).NewWriter(ctx)
	io.Copy(fw, io.LimitReader(zReader{}, 1e5))
	if err := fw.Close(); err != nil {
		t.Fatal(err)
	}

	bw := bucket.Object(baz).NewWriter(ctx)
	io.Copy(bw, io.LimitReader(zReader{}, 1e5))
	if err := bw.Close(); err != nil {
		t.Fatal(err)
	}

	// This is kind of a hack, but
	type lfun func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)

	// Run the same prefix query through both listing functions.
	for _, f := range []lfun{bucket.ListObjects, bucket.ListCurrentObjects} {
		c := &Cursor{
			Prefix: "baz/",
		}
		var res []string
		for {
			objs, cur, err := f(ctx, 10, c)
			if err != nil && err != io.EOF {
				t.Fatalf("bucket.ListObjects: %v", err)
			}
			for _, o := range objs {
				attrs, err := o.Attrs(ctx)
				if err != nil {
					t.Errorf("(%v).Attrs: %v", o, err)
					continue
				}
				res = append(res, attrs.Name)
			}
			if err == io.EOF {
				break
			}
			c = cur
		}

		want := []string{"baz/bar"}
		if !reflect.DeepEqual(res, want) {
			t.Errorf("got %v, want %v", res, want)
		}
	}
}
// compare reports whether two BucketAttrs are equivalent for test purposes.
// nil is treated as the zero value, an unset Type is treated as Private,
// and an empty Info map equals a nil one.
func compare(a, b *BucketAttrs) bool {
	if a == nil {
		a = &BucketAttrs{}
	}
	if b == nil {
		b = &BucketAttrs{}
	}
	typesMatch := a.Type == b.Type ||
		(a.Type == "" && b.Type == Private) ||
		(a.Type == Private && b.Type == "")
	if !typesMatch {
		return false
	}
	infoMatch := reflect.DeepEqual(a.Info, b.Info) ||
		(len(a.Info) == 0 && len(b.Info) == 0)
	if !infoMatch {
		return false
	}
	return reflect.DeepEqual(a.LifecycleRules, b.LifecycleRules)
}
// TestNewBucket creates buckets with no attrs, only lifecycle rules, and
// only an info map, then verifies the attributes round-trip (including
// through an Update(ctx, nil) no-op).
//
// NOTE(review): the bucket.Delete defers accumulate inside the loop and run
// only when the test returns — acceptable for three iterations, but worth
// knowing.
func TestNewBucket(t *testing.T) {
	id := os.Getenv(apiID)
	key := os.Getenv(apiKey)
	if id == "" || key == "" {
		t.Skipf("B2_ACCOUNT_ID or B2_SECRET_KEY unset; skipping integration tests")
	}
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	client, err := NewClient(ctx, id, key)
	if err != nil {
		t.Fatal(err)
	}

	table := []struct {
		name  string
		attrs *BucketAttrs
	}{
		{
			name: "no-attrs",
		},
		{
			name: "only-rules",
			attrs: &BucketAttrs{
				LifecycleRules: []LifecycleRule{
					{
						Prefix:                 "whee/",
						DaysHiddenUntilDeleted: 30,
					},
					{
						Prefix:             "whoa/",
						DaysNewUntilHidden: 1,
					},
				},
			},
		},
		{
			name: "only-info",
			attrs: &BucketAttrs{
				Info: map[string]string{
					"this":  "that",
					"other": "thing",
				},
			},
		},
	}

	for _, ent := range table {
		bucket, err := client.NewBucket(ctx, id+"-"+ent.name, ent.attrs)
		if err != nil {
			t.Errorf("%s: NewBucket(%v): %v", ent.name, ent.attrs, err)
			continue
		}
		defer bucket.Delete(ctx)
		if err := bucket.Update(ctx, nil); err != nil {
			t.Errorf("%s: Update(ctx, nil): %v", ent.name, err)
			continue
		}
		attrs, err := bucket.Attrs(ctx)
		if err != nil {
			t.Errorf("%s: Attrs(ctx): %v", ent.name, err)
			continue
		}
		if !compare(attrs, ent.attrs) {
			t.Errorf("%s: attrs disagree: got %v, want %v", ent.name, attrs, ent.attrs)
		}
	}
}
// TestDuelingBuckets opens two handles on the same bucket and checks that a
// stale-revision Update fails with an update-conflict error, and that
// refreshing the handle (via Attrs then a nil Update) lets it succeed.
func TestDuelingBuckets(t *testing.T) {
	ctx := context.Background()
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()
	bucket, done := startLiveTest(ctx, t)
	defer done()
	bucket2, done2 := startLiveTest(ctx, t)
	defer done2()

	attrs, err := bucket.Attrs(ctx)
	if err != nil {
		t.Fatal(err)
	}
	attrs2, err := bucket2.Attrs(ctx)
	if err != nil {
		t.Fatal(err)
	}

	// First handle wins the race...
	attrs.Info["food"] = "yum"
	if err := bucket.Update(ctx, attrs); err != nil {
		t.Fatal(err)
	}

	// ...so the second handle's update, based on stale attrs, must conflict.
	attrs2.Info["nails"] = "not"
	if err := bucket2.Update(ctx, attrs2); !IsUpdateConflict(err) {
		t.Fatalf("bucket.Update should have failed with IsUpdateConflict; instead failed with %v", err)
	}

	// After refreshing, the second handle's update goes through.
	attrs2, err = bucket2.Attrs(ctx)
	if err != nil {
		t.Fatal(err)
	}
	attrs2.Info["nails"] = "not"
	if err := bucket2.Update(ctx, nil); err != nil {
		t.Fatal(err)
	}
	if err := bucket2.Update(ctx, attrs2); err != nil {
		t.Fatal(err)
	}
}
// object pairs a listed *Object with any error that occurred while listing,
// so both can travel over one channel.
type object struct {
	o   *Object
	err error
}

// countObjects drains the listing produced by f and returns how many
// objects it yielded, stopping at the first error.
func countObjects(ctx context.Context, f func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)) (int, error) {
	var got int
	ch := listObjects(ctx, f)
	for c := range ch {
		if c.err != nil {
			return 0, c.err
		}
		got++
	}
	return got, nil
}

// listObjects pages through the listing function f in a goroutine, sending
// each object (or the terminal error) on the returned channel, which is
// closed when the listing is exhausted.
func listObjects(ctx context.Context, f func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)) <-chan object {
	ch := make(chan object)
	go func() {
		defer close(ch)
		var cur *Cursor
		for {
			objs, c, err := f(ctx, 100, cur)
			if err != nil && err != io.EOF {
				ch <- object{err: err}
				return
			}
			for _, o := range objs {
				ch <- object{o: o}
			}
			if err == io.EOF {
				return
			}
			cur = c
		}
	}()
	return ch
}
// startLiveTest checks for B2 credentials in the environment (skipping
// the test when they are absent), creates a per-account test bucket,
// and returns it together with a cleanup function that deletes every
// object in the bucket and then the bucket itself.
func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
	id := os.Getenv(apiID)
	key := os.Getenv(apiKey)
	if id == "" || key == "" {
		t.Skipf("B2_ACCOUNT_ID or B2_SECRET_KEY unset; skipping integration tests")
		return nil, nil
	}
	client, err := NewClient(ctx, id, key)
	if err != nil {
		t.Fatal(err)
		return nil, nil
	}
	bucket, err := client.NewBucket(ctx, id+"-"+bucketName, nil)
	if err != nil {
		t.Fatal(err)
		return nil, nil
	}
	f := func() {
		// Best-effort cleanup: delete every listed object (ignoring
		// listing errors), then remove the bucket itself.
		for c := range listObjects(ctx, bucket.ListObjects) {
			if c.err != nil {
				continue
			}
			if err := c.o.Delete(ctx); err != nil {
				t.Error(err)
			}
		}
		if err := bucket.Delete(ctx); err != nil && !IsNotExist(err) {
			t.Error(err)
		}
	}
	return bucket, f
}

View File

@ -0,0 +1,102 @@
// Copyright 2017, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import "fmt"
// StatusInfo reports information about a client.
type StatusInfo struct {
	// Writers and Readers map "<bucket>/<object>" keys (as built by
	// addWriter/addReader) to the progress of each registered transfer.
	Writers map[string]*WriterStatus
	Readers map[string]*ReaderStatus
}
// WriterStatus reports the status for each writer.
type WriterStatus struct {
	// Progress is a slice of completion ratios. The index of a ratio is its
	// chunk id less one.
	Progress []float64
}
// ReaderStatus reports the status for each reader.
type ReaderStatus struct {
	// Progress is a slice of completion ratios. The index of a ratio is its
	// chunk id less one.
	Progress []float64
}
// Status returns a snapshot of the client's active transfers: the
// per-chunk progress of every registered writer and reader, keyed by
// "<bucket>/<object>".
func (c *Client) Status() *StatusInfo {
	c.slock.Lock()
	defer c.slock.Unlock()
	info := &StatusInfo{
		Writers: make(map[string]*WriterStatus, len(c.sWriters)),
		Readers: make(map[string]*ReaderStatus, len(c.sReaders)),
	}
	for name, wr := range c.sWriters {
		info.Writers[name] = wr.status()
	}
	for name, rd := range c.sReaders {
		info.Readers[name] = rd.status()
	}
	return info
}
// addWriter registers w for status reporting under "<bucket>/<name>",
// lazily allocating the map on first use.
func (c *Client) addWriter(w *Writer) {
	c.slock.Lock()
	defer c.slock.Unlock()
	key := fmt.Sprintf("%s/%s", w.o.b.Name(), w.name)
	if c.sWriters == nil {
		c.sWriters = make(map[string]*Writer)
	}
	c.sWriters[key] = w
}
// removeWriter deregisters w from status reporting. It is a no-op if
// no writer was ever registered.
func (c *Client) removeWriter(w *Writer) {
	c.slock.Lock()
	defer c.slock.Unlock()
	if c.sWriters != nil {
		delete(c.sWriters, fmt.Sprintf("%s/%s", w.o.b.Name(), w.name))
	}
}
// addReader registers r for status reporting under "<bucket>/<name>",
// lazily allocating the map on first use.
func (c *Client) addReader(r *Reader) {
	c.slock.Lock()
	defer c.slock.Unlock()
	key := fmt.Sprintf("%s/%s", r.o.b.Name(), r.name)
	if c.sReaders == nil {
		c.sReaders = make(map[string]*Reader)
	}
	c.sReaders[key] = r
}
// removeReader deregisters r from status reporting. It is a no-op if
// no reader was ever registered.
func (c *Client) removeReader(r *Reader) {
	c.slock.Lock()
	defer c.slock.Unlock()
	if c.sReaders != nil {
		delete(c.sReaders, fmt.Sprintf("%s/%s", r.o.b.Name(), r.name))
	}
}

View File

@ -0,0 +1,299 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"bytes"
"io"
"sync"
"github.com/kurin/blazer/internal/blog"
"golang.org/x/net/context"
)
// Reader reads files from B2.
//
// The object is downloaded by a pool of worker goroutines, each
// fetching one ChunkSize byte range at a time; Read consumes the
// downloaded chunks strictly in order.
type Reader struct {
	// ConcurrentDownloads is the number of simultaneous downloads to pull from
	// B2. Values greater than one will cause B2 to make multiple HTTP requests
	// for a given file, increasing available bandwidth at the cost of buffering
	// the downloads in memory.
	ConcurrentDownloads int
	// ChunkSize is the size to fetch per ConcurrentDownload. The default is
	// 10MB.
	ChunkSize int
	ctx context.Context
	cancel context.CancelFunc // cancels ctx
	o *Object
	name string
	offset int64 // the start of the file
	length int64 // the length to read, or -1
	size int64 // the end of the file, in absolute terms
	csize int // chunk size
	read int // amount read
	chwid int // chunks written
	chrid int // chunks read
	// chbuf recycles scratch buffers between Read and the workers.
	chbuf chan *bytes.Buffer
	init sync.Once
	rmux sync.Mutex // guards rcond
	rcond *sync.Cond
	// chunks holds downloaded data keyed by zero-based chunk id.
	chunks map[int]*bytes.Buffer
	emux sync.RWMutex // guards err, believe it or not
	err error
	smux sync.Mutex
	// smap tracks per-chunk download progress for status reporting.
	smap map[int]*meteredReader
}
// Close frees resources associated with the download: it cancels any
// outstanding chunk fetches and deregisters the reader from the
// client's status map. It always returns nil.
func (r *Reader) Close() error {
	r.cancel()
	r.o.b.c.removeReader(r)
	return nil
}
// setErr records err as the reader's terminal error and cancels all
// in-flight work. Only the first error is kept; later calls are no-ops.
func (r *Reader) setErr(err error) {
	r.emux.Lock()
	defer r.emux.Unlock()
	if r.err != nil {
		return
	}
	r.err = err
	r.cancel()
}
// setErrNoCancel records err without cancelling the context, so that
// already-buffered data can still be drained. The first error sticks.
func (r *Reader) setErrNoCancel(err error) {
	r.emux.Lock()
	defer r.emux.Unlock()
	if r.err != nil {
		return
	}
	r.err = err
}
// getErr returns the first error recorded on the reader, if any.
func (r *Reader) getErr() error {
	r.emux.RLock()
	defer r.emux.RUnlock()
	return r.err
}
// thread starts one download worker. Each worker takes an empty buffer
// from r.chbuf, claims the next zero-based chunk id, downloads that
// byte range into the buffer, and publishes it in r.chunks for Read to
// consume in order. Short reads are retried indefinitely; the worker
// exits when the buffer channel closes, the context is cancelled, the
// requested range is exhausted, or a download fails outright.
func (r *Reader) thread() {
	go func() {
		for {
			var buf *bytes.Buffer
			select {
			case b, ok := <-r.chbuf:
				if !ok {
					return
				}
				buf = b
			case <-r.ctx.Done():
				return
			}
			// Claim the next chunk id under the lock.
			r.rmux.Lock()
			chunkID := r.chwid
			r.chwid++
			r.rmux.Unlock()
			offset := int64(chunkID*r.csize) + r.offset
			size := int64(r.csize)
			if offset >= r.size {
				// Send an empty chunk. This is necessary to prevent a deadlock when
				// this is the very first chunk.
				r.rmux.Lock()
				r.chunks[chunkID] = buf
				r.rmux.Unlock()
				r.rcond.Broadcast()
				return
			}
			if offset+size > r.size {
				size = r.size - offset
			}
		redo:
			fr, err := r.o.b.b.downloadFileByName(r.ctx, r.name, offset, size)
			if err != nil {
				r.setErr(err)
				r.rcond.Broadcast()
				return
			}
			// Wrap the response body so status() can observe progress.
			mr := &meteredReader{r: &fakeSeeker{fr}, size: int(size)}
			r.smux.Lock()
			r.smap[chunkID] = mr
			r.smux.Unlock()
			i, err := copyContext(r.ctx, buf, mr)
			r.smux.Lock()
			r.smap[chunkID] = nil
			r.smux.Unlock()
			if i < size || err == io.ErrUnexpectedEOF {
				// Probably the network connection was closed early. Retry.
				blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying", chunkID, i, size)
				buf.Reset()
				goto redo
			}
			if err != nil {
				r.setErr(err)
				r.rcond.Broadcast()
				return
			}
			// Publish the finished chunk and wake any waiting Read.
			r.rmux.Lock()
			r.chunks[chunkID] = buf
			r.rmux.Unlock()
			r.rcond.Broadcast()
		}
	}()
}
// curChunk blocks until the chunk the reader currently needs
// (r.chunks[r.chrid]) has been downloaded, then returns it. The inner
// goroutine waits on the condition variable while the outer select
// lets the call abort promptly on context cancellation or a recorded
// error.
func (r *Reader) curChunk() (*bytes.Buffer, error) {
	ch := make(chan *bytes.Buffer)
	go func() {
		r.rmux.Lock()
		defer r.rmux.Unlock()
		for r.chunks[r.chrid] == nil && r.getErr() == nil && r.ctx.Err() == nil {
			r.rcond.Wait()
		}
		select {
		case ch <- r.chunks[r.chrid]:
		case <-r.ctx.Done():
			return
		}
	}()
	select {
	case buf := <-ch:
		return buf, r.getErr()
	case <-r.ctx.Done():
		if r.getErr() != nil {
			return nil, r.getErr()
		}
		return nil, r.ctx.Err()
	}
}
// initFunc performs one-time setup on the first Read: it registers the
// reader for status reporting, resolves the object so its size is
// known, clamps offset/size to the requested range, and starts the
// download workers with one recycled scratch buffer each.
func (r *Reader) initFunc() {
	r.smux.Lock()
	r.smap = make(map[int]*meteredReader)
	r.smux.Unlock()
	r.o.b.c.addReader(r)
	if err := r.o.ensure(r.ctx); err != nil {
		r.setErr(err)
		return
	}
	r.size = r.o.f.size()
	if r.length >= 0 && r.offset+r.length < r.size {
		// Partial read: the effective end of the range is offset+length.
		r.size = r.offset + r.length
	}
	if r.offset > r.size {
		r.offset = r.size
	}
	r.rcond = sync.NewCond(&r.rmux)
	cr := r.ConcurrentDownloads
	if cr < 1 {
		cr = 1
	}
	if r.ChunkSize < 1 {
		r.ChunkSize = 1e7
	}
	r.csize = r.ChunkSize
	r.chbuf = make(chan *bytes.Buffer, cr)
	for i := 0; i < cr; i++ {
		r.thread()
		r.chbuf <- &bytes.Buffer{}
	}
}
// Read satisfies the io.Reader interface. It drains downloaded chunks
// strictly in order, recycling each exhausted buffer back to the
// workers via r.chbuf, and returns io.EOF once the requested range has
// been fully consumed.
func (r *Reader) Read(p []byte) (int, error) {
	if err := r.getErr(); err != nil {
		return 0, err
	}
	// TODO: check the SHA1 hash here and verify it on Close.
	r.init.Do(r.initFunc)
	chunk, err := r.curChunk()
	if err != nil {
		r.setErrNoCancel(err)
		return 0, err
	}
	n, err := chunk.Read(p)
	r.read += n
	if err == io.EOF {
		if int64(r.read) >= r.size-r.offset {
			// The whole range has been read; shut the workers down and
			// latch io.EOF for subsequent calls.
			close(r.chbuf)
			r.setErrNoCancel(err)
			return n, err
		}
		// This chunk is exhausted but more remain: advance and recycle.
		r.chrid++
		chunk.Reset()
		r.chbuf <- chunk
		err = nil
	}
	r.setErrNoCancel(err)
	return n, err
}
// status reports per-chunk download progress.
//
// Reader chunk ids are ZERO-based (thread() assigns chunkID from
// r.chwid, which starts at 0), unlike Writer chunks, which are
// one-based. The previous implementation iterated 1..len(smap),
// silently skipping chunk 0 and reading the nonexistent entry at
// len(smap) (which reported a bogus 100% via done()'s nil-receiver
// check). Index from 0 instead.
func (r *Reader) status() *ReaderStatus {
	r.smux.Lock()
	defer r.smux.Unlock()
	rs := &ReaderStatus{
		Progress: make([]float64, len(r.smap)),
	}
	for i := 0; i < len(r.smap); i++ {
		rs.Progress[i] = r.smap[i].done()
	}
	return rs
}
// copied from io.Copy, basically.
func copyContext(ctx context.Context, dst io.Writer, src io.Reader) (written int64, err error) {
buf := make([]byte, 32*1024)
for {
if ctx.Err() != nil {
err = ctx.Err()
return
}
nr, er := src.Read(buf)
if nr > 0 {
nw, ew := dst.Write(buf[0:nr])
if nw > 0 {
written += int64(nw)
}
if ew != nil {
err = ew
break
}
if nr != nw {
err = io.ErrShortWrite
break
}
}
if er == io.EOF {
break
}
if er != nil {
err = er
break
}
}
return written, err
}
// fakeSeeker exists so that we can wrap the http response body (an io.Reader
// but not an io.Seeker) into a meteredReader, which will allow us to keep tabs
// on how much of the chunk we've read so far.
type fakeSeeker struct {
io.Reader
}
func (fs *fakeSeeker) Seek(int64, int) (int64, error) { return 0, nil }

View File

@ -0,0 +1,441 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package b2
import (
"errors"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/kurin/blazer/internal/blog"
"golang.org/x/net/context"
)
// Writer writes data into Backblaze. It automatically switches to the large
// file API if the file exceeds ChunkSize bytes. Due to that and other
// Backblaze API details, there is a large buffer.
//
// Changes to public Writer attributes must be made before the first call to
// Write.
type Writer struct {
	// ConcurrentUploads is number of different threads sending data concurrently
	// to Backblaze for large files. This can increase performance greatly, as
	// each thread will hit a different endpoint. However, there is a ChunkSize
	// buffer for each thread. Values less than 1 are equivalent to 1.
	ConcurrentUploads int
	// Resume an upload. If true, and the upload is a large file, and a file of
	// the same name was started but not finished, then assume that we are
	// resuming that file, and don't upload duplicate chunks.
	Resume bool
	// ChunkSize is the size, in bytes, of each individual part, when writing
	// large files, and also when determining whether to upload a file normally
	// or when to split it into parts. The default is 100M (1e8) The minimum is
	// 5M (5e6); values less than this are not an error, but will fail. The
	// maximum is 5GB (5e9).
	ChunkSize int
	// UseFileBuffer controls whether to use an in-memory buffer (the default) or
	// scratch space on the file system. If this is true, b2 will save chunks in
	// FileBufferDir.
	UseFileBuffer bool
	// FileBufferDir specifies the directory where scratch files are kept. If
	// blank, os.TempDir() is used.
	FileBufferDir string
	contentType string
	info map[string]string
	// csize is the effective chunk size, defaulted on first Write.
	csize int
	ctx context.Context
	cancel context.CancelFunc
	// ready carries filled chunks to the upload worker goroutines.
	ready chan chunk
	wg sync.WaitGroup
	start sync.Once
	once sync.Once
	done sync.Once
	file beLargeFileInterface
	// seen maps already-uploaded part ids to their SHA1s when resuming.
	seen map[int]string
	o *Object
	name string
	// cidx counts chunks sent; part ids are cidx+1 (one-based).
	cidx int
	w writeBuffer
	emux sync.RWMutex
	err error
	smux sync.RWMutex
	// smap tracks per-chunk upload progress for status reporting.
	smap map[int]*meteredReader
}
// chunk pairs a one-based part id with the buffer holding that part's
// data, for hand-off to the upload workers.
type chunk struct {
	id int
	buf writeBuffer
}
// getBuffer returns a fresh scratch buffer: on disk under
// FileBufferDir when UseFileBuffer is set, otherwise in memory.
func (w *Writer) getBuffer() (writeBuffer, error) {
	if w.UseFileBuffer {
		return newFileBuffer(w.FileBufferDir)
	}
	return newMemoryBuffer(), nil
}
// setErr records the writer's first non-nil error, logs it, and
// cancels outstanding work. nil errors and errors after the first are
// ignored.
func (w *Writer) setErr(err error) {
	if err == nil {
		return
	}
	w.emux.Lock()
	defer w.emux.Unlock()
	if w.err != nil {
		return
	}
	blog.V(0).Infof("error writing %s: %v", w.name, err)
	w.err = err
	w.cancel()
}
// getErr returns the first error recorded on the writer, if any.
func (w *Writer) getErr() error {
	w.emux.RLock()
	defer w.emux.RUnlock()
	return w.err
}
// registerChunk makes the in-flight chunk id visible to status
// reporting.
func (w *Writer) registerChunk(id int, r *meteredReader) {
	w.smux.Lock()
	defer w.smux.Unlock()
	w.smap[id] = r
}
// completeChunk marks chunk id finished; a nil map entry reads as 100%
// done via meteredReader's nil-receiver handling.
func (w *Writer) completeChunk(id int) {
	w.smux.Lock()
	defer w.smux.Unlock()
	w.smap[id] = nil
}
// gid generates unique ids for upload worker goroutines (used only in
// log messages).
var gid int32
// thread starts one upload worker. Each worker acquires its own upload
// URL, then consumes chunks from w.ready and uploads them until the
// channel is closed. When resuming, chunks whose SHA1 was already seen
// are verified and skipped. Retryable failures are retried with
// exponential backoff (capped at 15s) on a freshly acquired URL.
func (w *Writer) thread() {
	go func() {
		id := atomic.AddInt32(&gid, 1)
		fc, err := w.file.getUploadPartURL(w.ctx)
		if err != nil {
			w.setErr(err)
			return
		}
		// NOTE(review): wg.Add runs inside the goroutine, after a network
		// call; if Close reaches wg.Wait before this line executes, the
		// wait could return early — confirm the ready-channel handshake
		// rules that out.
		w.wg.Add(1)
		defer w.wg.Done()
		for {
			chunk, ok := <-w.ready
			if !ok {
				return
			}
			if sha, ok := w.seen[chunk.id]; ok {
				// Resumed upload: this part was uploaded in a previous
				// session; verify its hash and skip re-uploading.
				if sha != chunk.buf.Hash() {
					w.setErr(errors.New("resumable upload was requested, but chunks don't match!"))
					return
				}
				chunk.buf.Close()
				w.completeChunk(chunk.id)
				blog.V(2).Infof("skipping chunk %d", chunk.id)
				continue
			}
			blog.V(2).Infof("thread %d handling chunk %d", id, chunk.id)
			r, err := chunk.buf.Reader()
			if err != nil {
				w.setErr(err)
				return
			}
			mr := &meteredReader{r: r, size: chunk.buf.Len()}
			w.registerChunk(chunk.id, mr)
			sleep := time.Millisecond * 15
		redo:
			n, err := fc.uploadPart(w.ctx, mr, chunk.buf.Hash(), chunk.buf.Len(), chunk.id)
			if n != chunk.buf.Len() || err != nil {
				if w.o.b.r.reupload(err) {
					time.Sleep(sleep)
					sleep *= 2
					if sleep > time.Second*15 {
						sleep = time.Second * 15
					}
					blog.V(1).Infof("b2 writer: wrote %d of %d: error: %v; retrying", n, chunk.buf.Len(), err)
					f, err := w.file.getUploadPartURL(w.ctx)
					if err != nil {
						w.setErr(err)
						w.completeChunk(chunk.id)
						chunk.buf.Close() // TODO: log error
						return
					}
					fc = f
					goto redo
				}
				w.setErr(err)
				w.completeChunk(chunk.id)
				chunk.buf.Close() // TODO: log error
				return
			}
			w.completeChunk(chunk.id)
			chunk.buf.Close() // TODO: log error
			blog.V(2).Infof("chunk %d handled", chunk.id)
		}
	}()
}
// Write satisfies the io.Writer interface. Data accumulates in a
// ChunkSize scratch buffer; when the buffer fills, it is handed to an
// upload worker via sendChunk and the remainder of p is written
// recursively into a fresh buffer.
func (w *Writer) Write(p []byte) (int, error) {
	w.start.Do(func() {
		// First call: set up status tracking, the effective chunk size
		// (default 100MB), and the initial scratch buffer.
		w.smux.Lock()
		w.smap = make(map[int]*meteredReader)
		w.smux.Unlock()
		w.o.b.c.addWriter(w)
		w.csize = w.ChunkSize
		if w.csize == 0 {
			w.csize = 1e8
		}
		v, err := w.getBuffer()
		if err != nil {
			w.setErr(err)
			return
		}
		w.w = v
	})
	if err := w.getErr(); err != nil {
		return 0, err
	}
	left := w.csize - w.w.Len()
	if len(p) < left {
		// p fits entirely within the current buffer.
		return w.w.Write(p)
	}
	i, err := w.w.Write(p[:left])
	if err != nil {
		w.setErr(err)
		return i, err
	}
	if err := w.sendChunk(); err != nil {
		w.setErr(err)
		return i, w.getErr()
	}
	k, err := w.Write(p[left:])
	if err != nil {
		w.setErr(err)
	}
	return i + k, err
}
// simpleWriteFile uploads the buffered data as a single B2 file (the
// non-large-file path, used when everything fit in one chunk),
// retrying retryable errors on a freshly acquired upload URL.
func (w *Writer) simpleWriteFile() error {
	ue, err := w.o.b.b.getUploadURL(w.ctx)
	if err != nil {
		return err
	}
	sha1 := w.w.Hash()
	ctype := w.contentType
	if ctype == "" {
		ctype = "application/octet-stream"
	}
	r, err := w.w.Reader()
	if err != nil {
		return err
	}
	// Track the whole upload as chunk 1 for status reporting.
	mr := &meteredReader{r: r, size: w.w.Len()}
	w.registerChunk(1, mr)
	defer w.completeChunk(1)
redo:
	f, err := ue.uploadFile(w.ctx, mr, int(w.w.Len()), w.name, ctype, sha1, w.info)
	if err != nil {
		if w.o.b.r.reupload(err) {
			blog.V(1).Infof("b2 writer: %v; retrying", err)
			u, err := w.o.b.b.getUploadURL(w.ctx)
			if err != nil {
				return err
			}
			ue = u
			goto redo
		}
		return err
	}
	w.o.f = f
	return nil
}
// getLargeFile starts (or, when Resume is set, reattaches to) a B2
// large-file upload. In resume mode it pages through the unfinished
// file's already-uploaded parts, records their SHA1s in w.seen so the
// workers can skip them, and reconstructs the large-file handle; if no
// matching unfinished file exists it falls back to starting fresh.
func (w *Writer) getLargeFile() (beLargeFileInterface, error) {
	if !w.Resume {
		ctype := w.contentType
		if ctype == "" {
			ctype = "application/octet-stream"
		}
		return w.o.b.b.startLargeFile(w.ctx, w.name, ctype, w.info)
	}
	next := 1
	seen := make(map[int]string)
	var size int64
	var fi beFileInterface
	for {
		cur := &Cursor{name: w.name}
		objs, _, err := w.o.b.ListObjects(w.ctx, 1, cur)
		if err != nil {
			return nil, err
		}
		if len(objs) < 1 || objs[0].name != w.name {
			// Nothing to resume; restart as a normal large-file upload.
			w.Resume = false
			return w.getLargeFile()
		}
		fi = objs[0].f
		parts, n, err := fi.listParts(w.ctx, next, 100)
		if err != nil {
			return nil, err
		}
		next = n
		for _, p := range parts {
			seen[p.number()] = p.sha1()
			size += p.size()
		}
		if len(parts) == 0 {
			break
		}
		if next == 0 {
			break
		}
	}
	w.seen = make(map[int]string) // copy the map
	for id, sha := range seen {
		w.seen[id] = sha
	}
	return fi.compileParts(size, seen), nil
}
// sendChunk hands the current scratch buffer to the upload workers and
// swaps in a fresh one. On the first call it lazily starts the
// large-file upload and spins up the worker pool.
func (w *Writer) sendChunk() error {
	var err error
	w.once.Do(func() {
		lf, e := w.getLargeFile()
		if e != nil {
			err = e
			return
		}
		w.file = lf
		w.ready = make(chan chunk)
		if w.ConcurrentUploads < 1 {
			w.ConcurrentUploads = 1
		}
		for i := 0; i < w.ConcurrentUploads; i++ {
			w.thread()
		}
	})
	if err != nil {
		return err
	}
	// Part ids are one-based, hence cidx+1.
	select {
	case w.ready <- chunk{
		id: w.cidx + 1,
		buf: w.w,
	}:
	case <-w.ctx.Done():
		return w.ctx.Err()
	}
	w.cidx++
	v, err := w.getBuffer()
	if err != nil {
		return err
	}
	w.w = v
	return nil
}
// Close satisfies the io.Closer interface. It is critical to check the return
// value of Close on all writers. It flushes remaining buffered data
// (via the one-shot upload path when no chunk was ever sent), waits for
// the workers to drain, and finishes the large file.
func (w *Writer) Close() error {
	w.done.Do(func() {
		defer w.o.b.c.removeWriter(w)
		defer w.w.Close() // TODO: log error
		if w.cidx == 0 {
			// Everything fit in one buffer: use the simple-file upload.
			w.setErr(w.simpleWriteFile())
			return
		}
		if w.w.Len() > 0 {
			if err := w.sendChunk(); err != nil {
				w.setErr(err)
				return
			}
		}
		close(w.ready)
		w.wg.Wait()
		f, err := w.file.finishLargeFile(w.ctx)
		if err != nil {
			w.setErr(err)
			return
		}
		w.o.f = f
	})
	return w.getErr()
}
// WithAttrs sets the writable attributes of the resulting file to the
// given values and returns the writer for chaining. It must be called
// before the first call to Write.
func (w *Writer) WithAttrs(attrs *Attrs) *Writer {
	w.contentType = attrs.ContentType
	info := make(map[string]string, len(attrs.Info))
	for key, val := range attrs.Info {
		info[key] = val
	}
	w.info = info
	// Record the modification time only if there is room left among the
	// at-most-10 info entries B2 permits.
	if !attrs.LastModified.IsZero() && len(w.info) < 10 {
		w.info["src_last_modified_millis"] = fmt.Sprintf("%d", attrs.LastModified.UnixNano()/1e6)
	}
	return w
}
// status reports per-chunk upload progress. Writer chunk ids are
// one-based, so chunk i lands at Progress[i-1].
func (w *Writer) status() *WriterStatus {
	w.smux.RLock()
	defer w.smux.RUnlock()
	progress := make([]float64, len(w.smap))
	for i := range progress {
		progress[i] = w.smap[i+1].done()
	}
	return &WriterStatus{Progress: progress}
}
type meteredReader struct {
read int64
size int
r io.ReadSeeker
}
func (mr *meteredReader) Read(p []byte) (int, error) {
n, err := mr.r.Read(p)
atomic.AddInt64(&mr.read, int64(n))
return n, err
}
func (mr *meteredReader) Seek(offset int64, whence int) (int64, error) {
atomic.StoreInt64(&mr.read, offset)
return mr.r.Seek(offset, whence)
}
func (mr *meteredReader) done() float64 {
if mr == nil {
return 1
}
read := float64(atomic.LoadInt64(&mr.read))
return read / float64(mr.size)
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,279 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package base
import (
"bytes"
"crypto/sha1"
"fmt"
"io"
"os"
"reflect"
"testing"
"time"
"golang.org/x/net/context"
)
// Environment variables holding the live-test credentials.
const (
	apiID = "B2_ACCOUNT_ID"
	apiKey = "B2_SECRET_KEY"
)
// Names for the temporary bucket and files the integration test
// creates (the bucket name is prefixed with the account id).
const (
	bucketName = "base-tests"
	smallFileName = "TeenyTiny"
	largeFileName = "BigBytes"
)
// zReader is an io.Reader producing an endless stream of zero bytes;
// combined with io.LimitReader it yields deterministic test payloads
// of arbitrary size.
type zReader struct{}

// Read fills p with zeros and reports it full, never returning an
// error. The previous implementation claimed len(p) bytes were read
// while leaving p untouched, so the "zero reader" actually produced
// whatever happened to be in the caller's buffer; explicitly zeroing
// makes the generated data deterministic by construction.
func (zReader) Read(p []byte) (int, error) {
	for i := range p {
		p[i] = 0
	}
	return len(p), nil
}
// TestStorage exercises the full low-level B2 API surface end to end
// against the live service: bucket create/update/list/delete, simple
// and large-file uploads, part uploads, downloads, file listing,
// hiding, and download authorization. It is skipped unless
// B2_ACCOUNT_ID and B2_SECRET_KEY are set in the environment.
func TestStorage(t *testing.T) {
	id := os.Getenv(apiID)
	key := os.Getenv(apiKey)
	if id == "" || key == "" {
		t.Skipf("B2_ACCOUNT_ID or B2_SECRET_KEY unset; skipping integration tests")
	}
	ctx := context.Background()
	// b2_authorize_account
	b2, err := AuthorizeAccount(ctx, id, key)
	if err != nil {
		t.Fatal(err)
	}
	// b2_create_bucket
	infoKey := "key"
	infoVal := "val"
	m := map[string]string{infoKey: infoVal}
	rules := []LifecycleRule{
		{
			Prefix: "what/",
			DaysNewUntilHidden: 5,
		},
	}
	bname := id + "-" + bucketName
	bucket, err := b2.CreateBucket(ctx, bname, "", m, rules)
	if err != nil {
		t.Fatal(err)
	}
	if bucket.Info[infoKey] != infoVal {
		t.Errorf("%s: bucketInfo[%q] got %q, want %q", bucket.Name, infoKey, bucket.Info[infoKey], infoVal)
	}
	if len(bucket.LifecycleRules) != 1 {
		t.Errorf("%s: lifecycle rules: got %d rules, wanted 1", bucket.Name, len(bucket.LifecycleRules))
	}
	defer func() {
		// b2_delete_bucket
		if err := bucket.DeleteBucket(ctx); err != nil {
			t.Error(err)
		}
	}()
	// b2_update_bucket
	bucket.Info["new"] = "yay"
	bucket.LifecycleRules = nil // Unset options should be a noop.
	newBucket, err := bucket.Update(ctx)
	if err != nil {
		t.Errorf("%s: update bucket: %v", bucket.Name, err)
		return
	}
	bucket = newBucket
	if bucket.Info["new"] != "yay" {
		t.Errorf("%s: info key \"new\": got %s, want \"yay\"", bucket.Name, bucket.Info["new"])
	}
	if len(bucket.LifecycleRules) != 1 {
		t.Errorf("%s: lifecycle rules: got %d rules, wanted 1", bucket.Name, len(bucket.LifecycleRules))
	}
	// b2_list_buckets
	buckets, err := b2.ListBuckets(ctx)
	if err != nil {
		t.Fatal(err)
	}
	var found bool
	for _, bucket := range buckets {
		if bucket.Name == bname {
			found = true
			break
		}
	}
	if !found {
		t.Errorf("%s: new bucket not found", bname)
	}
	// b2_get_upload_url
	ue, err := bucket.GetUploadURL(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// b2_upload_file
	smallFile := io.LimitReader(zReader{}, 1024*50) // 50k
	hash := sha1.New()
	buf := &bytes.Buffer{}
	w := io.MultiWriter(hash, buf)
	if _, err := io.Copy(w, smallFile); err != nil {
		t.Error(err)
	}
	smallSHA1 := fmt.Sprintf("%x", hash.Sum(nil))
	smallInfoMap := map[string]string{
		"one": "1",
		"two": "2",
	}
	file, err := ue.UploadFile(ctx, buf, buf.Len(), smallFileName, "application/octet-stream", smallSHA1, smallInfoMap)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		// b2_delete_file_version
		if err := file.DeleteFileVersion(ctx); err != nil {
			t.Error(err)
		}
	}()
	// b2_start_large_file
	largeInfoMap := map[string]string{
		"one_BILLION": "1e9",
		"two_TRILLION": "2eSomething, I guess 2e12",
	}
	lf, err := bucket.StartLargeFile(ctx, largeFileName, "application/octet-stream", largeInfoMap)
	if err != nil {
		t.Fatal(err)
	}
	// b2_get_upload_part_url
	fc, err := lf.GetUploadPartURL(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// b2_upload_part
	largeFile := io.LimitReader(zReader{}, 10e6) // 10M
	for i := 0; i < 2; i++ {
		r := io.LimitReader(largeFile, 5e6) // 5M
		hash := sha1.New()
		buf := &bytes.Buffer{}
		w := io.MultiWriter(hash, buf)
		if _, err := io.Copy(w, r); err != nil {
			t.Error(err)
		}
		if _, err := fc.UploadPart(ctx, buf, fmt.Sprintf("%x", hash.Sum(nil)), buf.Len(), i+1); err != nil {
			t.Error(err)
		}
	}
	// b2_finish_large_file
	lfile, err := lf.FinishLargeFile(ctx)
	if err != nil {
		t.Fatal(err)
	}
	// b2_get_file_info
	smallInfo, err := file.GetFileInfo(ctx)
	if err != nil {
		t.Fatal(err)
	}
	compareFileAndInfo(t, smallInfo, smallFileName, smallSHA1, smallInfoMap)
	largeInfo, err := lfile.GetFileInfo(ctx)
	if err != nil {
		t.Fatal(err)
	}
	compareFileAndInfo(t, largeInfo, largeFileName, "none", largeInfoMap)
	defer func() {
		if err := lfile.DeleteFileVersion(ctx); err != nil {
			t.Error(err)
		}
	}()
	clf, err := bucket.StartLargeFile(ctx, largeFileName, "application/octet-stream", nil)
	if err != nil {
		t.Fatal(err)
	}
	// b2_cancel_large_file
	if err := clf.CancelLargeFile(ctx); err != nil {
		t.Fatal(err)
	}
	// b2_list_file_names
	files, _, err := bucket.ListFileNames(ctx, 100, "", "", "")
	if err != nil {
		t.Fatal(err)
	}
	if len(files) != 2 {
		t.Errorf("expected 2 files, got %d: %v", len(files), files)
	}
	// b2_download_file_by_name
	fr, err := bucket.DownloadFileByName(ctx, smallFileName, 0, 0)
	if err != nil {
		t.Fatal(err)
	}
	if fr.SHA1 != smallSHA1 {
		t.Errorf("small file SHAs don't match: got %q, want %q", fr.SHA1, smallSHA1)
	}
	lbuf := &bytes.Buffer{}
	if _, err := io.Copy(lbuf, fr); err != nil {
		t.Fatal(err)
	}
	if lbuf.Len() != fr.ContentLength {
		t.Errorf("small file retreived lengths don't match: got %d, want %d", lbuf.Len(), fr.ContentLength)
	}
	// b2_hide_file
	hf, err := bucket.HideFile(ctx, smallFileName)
	if err != nil {
		t.Fatal(err)
	}
	defer func() {
		if err := hf.DeleteFileVersion(ctx); err != nil {
			t.Error(err)
		}
	}()
	// b2_list_file_versions
	files, _, _, err = bucket.ListFileVersions(ctx, 100, "", "", "", "")
	if err != nil {
		t.Fatal(err)
	}
	if len(files) != 3 {
		t.Errorf("expected 3 files, got %d: %v", len(files), files)
	}
	// b2_get_download_authorization
	if _, err := bucket.GetDownloadAuthorization(ctx, "foo/", 24*time.Hour); err != nil {
		t.Errorf("failed to get download auth token: %v", err)
	}
}
// compareFileAndInfo asserts that the FileInfo returned by
// b2_get_file_info matches the expected file name, content SHA1, and
// info map, reporting any mismatch on t.
func compareFileAndInfo(t *testing.T, info *FileInfo, name, sha1 string, imap map[string]string) {
	if info.Name != name {
		t.Errorf("got %q, want %q", info.Name, name)
	}
	if info.SHA1 != sha1 {
		t.Errorf("got %q, want %q", info.SHA1, sha1)
	}
	if !reflect.DeepEqual(info.Info, imap) {
		t.Errorf("got %v, want %v", info.Info, imap)
	}
}

View File

@ -0,0 +1,240 @@
// Copyright 2016, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package b2types implements internal types common to the B2 API.
package b2types
// You know what would be amazing? If I could autogen this from like a JSON
// file. Wouldn't that be amazing? That would be amazing.
const (
	// V1api is the URL path prefix for all version-1 B2 API calls.
	V1api = "/b2api/v1/"
)
// ErrorMessage is the JSON error body returned by failed B2 API calls.
type ErrorMessage struct {
	Status int `json:"status"`
	Code string `json:"code"`
	Msg string `json:"message"`
}
// AuthorizeAccountResponse is the response to b2_authorize_account.
type AuthorizeAccountResponse struct {
	AccountID string `json:"accountId"`
	AuthToken string `json:"authorizationToken"`
	URI string `json:"apiUrl"`
	DownloadURI string `json:"downloadUrl"`
	MinPartSize int `json:"minimumPartSize"`
}
// LifecycleRule describes when files under a prefix are automatically
// hidden and later deleted.
type LifecycleRule struct {
	DaysHiddenUntilDeleted int `json:"daysFromHidingToDeleting,omitempty"`
	DaysNewUntilHidden int `json:"daysFromUploadingToHiding,omitempty"`
	Prefix string `json:"fileNamePrefix"`
}
// CreateBucketRequest is the request body for b2_create_bucket.
type CreateBucketRequest struct {
	AccountID string `json:"accountId"`
	Name string `json:"bucketName"`
	Type string `json:"bucketType"`
	Info map[string]string `json:"bucketInfo"`
	LifecycleRules []LifecycleRule `json:"lifecycleRules"`
}
// CreateBucketResponse is the response to b2_create_bucket; it is also
// the per-bucket element of b2_list_buckets and the shape of
// b2_update_bucket's response.
type CreateBucketResponse struct {
	BucketID string `json:"bucketId"`
	Name string `json:"bucketName"`
	Type string `json:"bucketType"`
	Info map[string]string `json:"bucketInfo"`
	LifecycleRules []LifecycleRule `json:"lifecycleRules"`
	Revision int `json:"revision"`
}
// DeleteBucketRequest is the request body for b2_delete_bucket.
type DeleteBucketRequest struct {
	AccountID string `json:"accountId"`
	BucketID string `json:"bucketId"`
}
// ListBucketsRequest is the request body for b2_list_buckets.
type ListBucketsRequest struct {
	AccountID string `json:"accountId"`
}
// ListBucketsResponse is the response to b2_list_buckets.
type ListBucketsResponse struct {
	Buckets []CreateBucketResponse `json:"buckets"`
}
// UpdateBucketRequest is the request body for b2_update_bucket.
type UpdateBucketRequest struct {
	AccountID string `json:"accountId"`
	BucketID string `json:"bucketId"`
	// bucketName is a required field according to
	// https://www.backblaze.com/b2/docs/b2_update_bucket.html.
	//
	// However, actually setting it returns 400: unknown field in
	// com.backblaze.modules.b2.data.UpdateBucketRequest: bucketName
	//
	//Name string `json:"bucketName"`
	Type string `json:"bucketType,omitempty"`
	Info map[string]string `json:"bucketInfo,omitempty"`
	LifecycleRules []LifecycleRule `json:"lifecycleRules,omitempty"`
	IfRevisionIs int `json:"ifRevisionIs,omitempty"`
}
// UpdateBucketResponse is the response to b2_update_bucket.
type UpdateBucketResponse CreateBucketResponse
// GetUploadURLRequest is the request body for b2_get_upload_url.
type GetUploadURLRequest struct {
	BucketID string `json:"bucketId"`
}
// GetUploadURLResponse is the response to b2_get_upload_url.
type GetUploadURLResponse struct {
	URI string `json:"uploadUrl"`
	Token string `json:"authorizationToken"`
}
// UploadFileResponse is the response to b2_upload_file.
type UploadFileResponse struct {
	FileID string `json:"fileId"`
	Timestamp int64 `json:"uploadTimestamp"`
	Action string `json:"action"`
}
// DeleteFileVersionRequest is the request body for
// b2_delete_file_version.
type DeleteFileVersionRequest struct {
	Name string `json:"fileName"`
	FileID string `json:"fileId"`
}
// StartLargeFileRequest is the request body for b2_start_large_file.
type StartLargeFileRequest struct {
	BucketID string `json:"bucketId"`
	Name string `json:"fileName"`
	ContentType string `json:"contentType"`
	Info map[string]string `json:"fileInfo,omitempty"`
}
// StartLargeFileResponse is the response to b2_start_large_file.
type StartLargeFileResponse struct {
	ID string `json:"fileId"`
}
// CancelLargeFileRequest is the request body for b2_cancel_large_file.
type CancelLargeFileRequest struct {
	ID string `json:"fileId"`
}
// ListPartsRequest is the request body for b2_list_parts.
type ListPartsRequest struct {
	ID string `json:"fileId"`
	Start int `json:"startPartNumber"`
	Count int `json:"maxPartCount"`
}
// ListPartsResponse is the response to b2_list_parts.
type ListPartsResponse struct {
	Next int `json:"nextPartNumber"`
	Parts []struct {
		ID string `json:"fileId"`
		Number int `json:"partNumber"`
		SHA1 string `json:"contentSha1"`
		Size int64 `json:"contentLength"`
	} `json:"parts"`
}
// getUploadPartURLRequest is the request body for
// b2_get_upload_part_url.
type getUploadPartURLRequest struct {
	ID string `json:"fileId"`
}
// getUploadPartURLResponse is the response to b2_get_upload_part_url.
type getUploadPartURLResponse struct {
	URL string `json:"uploadUrl"`
	Token string `json:"authorizationToken"`
}
// FinishLargeFileRequest is the request body for b2_finish_large_file.
type FinishLargeFileRequest struct {
	ID string `json:"fileId"`
	Hashes []string `json:"partSha1Array"`
}
// FinishLargeFileResponse is the response to b2_finish_large_file.
type FinishLargeFileResponse struct {
	Name string `json:"fileName"`
	FileID string `json:"fileId"`
	Timestamp int64 `json:"uploadTimestamp"`
	Action string `json:"action"`
}
// ListFileNamesRequest is the request body for b2_list_file_names.
type ListFileNamesRequest struct {
	BucketID string `json:"bucketId"`
	Count int `json:"maxFileCount"`
	Continuation string `json:"startFileName,omitempty"`
	Prefix string `json:"prefix,omitempty"`
	Delimiter string `json:"delimiter,omitempty"`
}
// ListFileNamesResponse is the response to b2_list_file_names.
type ListFileNamesResponse struct {
	Continuation string `json:"nextFileName"`
	Files []struct {
		FileID string `json:"fileId"`
		Name string `json:"fileName"`
		Size int64 `json:"size"`
		Action string `json:"action"`
		Timestamp int64 `json:"uploadTimestamp"`
	} `json:"files"`
}
// ListFileVersionsRequest is the request body for
// b2_list_file_versions.
type ListFileVersionsRequest struct {
	BucketID string `json:"bucketId"`
	Count int `json:"maxFileCount"`
	StartName string `json:"startFileName,omitempty"`
	StartID string `json:"startFileId,omitempty"`
	Prefix string `json:"prefix,omitempty"`
	Delimiter string `json:"delimiter,omitempty"`
}
// ListFileVersionsResponse is the response to b2_list_file_versions.
type ListFileVersionsResponse struct {
	NextName string `json:"nextFileName"`
	NextID string `json:"nextFileId"`
	Files []struct {
		FileID string `json:"fileId"`
		Name string `json:"fileName"`
		Size int64 `json:"size"`
		Action string `json:"action"`
		Timestamp int64 `json:"uploadTimestamp"`
	} `json:"files"`
}
// HideFileRequest is the request body for b2_hide_file.
type HideFileRequest struct {
	BucketID string `json:"bucketId"`
	File string `json:"fileName"`
}
// HideFileResponse is the response to b2_hide_file.
type HideFileResponse struct {
	ID string `json:"fileId"`
	Timestamp int64 `json:"uploadTimestamp"`
	Action string `json:"action"`
}
// GetFileInfoRequest is the request body for b2_get_file_info.
type GetFileInfoRequest struct {
	ID string `json:"fileId"`
}
// GetFileInfoResponse is the response to b2_get_file_info.
type GetFileInfoResponse struct {
	Name string `json:"fileName"`
	SHA1 string `json:"contentSha1"`
	Size int64 `json:"contentLength"`
	ContentType string `json:"contentType"`
	Info map[string]string `json:"fileInfo"`
	Action string `json:"action"`
	Timestamp int64 `json:"uploadTimestamp"`
}
// GetDownloadAuthorizationRequest is the request body for
// b2_get_download_authorization.
type GetDownloadAuthorizationRequest struct {
	BucketID string `json:"bucketId"`
	Prefix string `json:"fileNamePrefix"`
	Valid int `json:"validDurationInSeconds"`
}
// GetDownloadAuthorizationResponse is the response to
// b2_get_download_authorization.
type GetDownloadAuthorizationResponse struct {
	BucketID string `json:"bucketId"`
	Prefix string `json:"fileNamePrefix"`
	Token string `json:"authorizationToken"`
}

View File

@ -0,0 +1,54 @@
// Copyright 2017, Google
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package blog implements a private logger, in the manner of glog, without
// poluting the flag namespace or leaving files all over /tmp.
//
// It has almost no features, and a bunch of global state.
package blog
import (
"log"
"os"
"strconv"
)
// level is the verbosity threshold, configured once at startup from
// the B2_LOG_LEVEL environment variable.
var level int32

// Verbose gates log output: a true value prints, a false value is a
// no-op.
type Verbose bool

// init reads B2_LOG_LEVEL; a missing or malformed value leaves the
// threshold at 0.
func init() {
	raw := os.Getenv("B2_LOG_LEVEL")
	n, err := strconv.ParseInt(raw, 10, 32)
	if err != nil {
		return
	}
	level = int32(n)
}

// Info logs its arguments in the manner of log.Print when v is true.
func (v Verbose) Info(a ...interface{}) {
	if !v {
		return
	}
	log.Print(a...)
}

// Infof logs a formatted message in the manner of log.Printf when v is
// true.
func (v Verbose) Infof(format string, a ...interface{}) {
	if !v {
		return
	}
	log.Printf(format, a...)
}

// V reports whether messages at the given verbosity target should be
// emitted under the configured level.
func V(target int32) Verbose {
	return Verbose(target <= level)
}