diff --git a/vendor/manifest b/vendor/manifest index 920c3da86..40f904099 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -34,7 +34,7 @@ { "importpath": "github.com/kurin/blazer", "repository": "https://github.com/kurin/blazer", - "revision": "48de0a1e4d21fba201aff7fefdf3e5e7735b1439", + "revision": "d1b9d31c8641e46f2651fe564ee9ddb857c1ed29", "branch": "master" }, { diff --git a/vendor/src/github.com/kurin/blazer/b2/b2.go b/vendor/src/github.com/kurin/blazer/b2/b2.go index bfde1cee8..66c8c1c68 100644 --- a/vendor/src/github.com/kurin/blazer/b2/b2.go +++ b/vendor/src/github.com/kurin/blazer/b2/b2.go @@ -28,7 +28,6 @@ package b2 import ( - "bytes" "fmt" "io" "net/http" @@ -64,7 +63,10 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) ( } type clientOptions struct { - transport http.RoundTripper + transport http.RoundTripper + failSomeUploads bool + expireTokens bool + capExceeded bool } // A ClientOption allows callers to adjust various per-client settings. @@ -78,6 +80,30 @@ func Transport(rt http.RoundTripper) ClientOption { } } +// FailSomeUploads requests intermittent upload failures from the B2 service. +// This is mostly useful for testing. +func FailSomeUploads() ClientOption { + return func(c *clientOptions) { + c.failSomeUploads = true + } +} + +// ExpireSomeAuthTokens requests intermittent authentication failures from the +// B2 service. +func ExpireSomeAuthTokens() ClientOption { + return func(c *clientOptions) { + c.expireTokens = true + } +} + +// ForceCapExceeded requests a cap limit from the B2 service. This causes all +// uploads to be treated as if they would exceed the configure B2 capacity. +func ForceCapExceeded() ClientOption { + return func(c *clientOptions) { + c.capExceeded = true + } +} + // Bucket is a reference to a B2 bucket. type Bucket struct { b beBucketInterface @@ -306,9 +332,11 @@ func (o *Object) Name() string { // Attrs returns an object's attributes. func (o *Object) Attrs(ctx context.Context) (*Attrs, error) { - if err := o.ensure(ctx); err != nil { + f, err := o.b.b.downloadFileByName(ctx, o.name, 0, 1) + if err != nil { return nil, err } + o.f = o.b.b.file(f.id()) fi, err := o.f.getFileInfo(ctx) if err != nil { return nil, err @@ -403,7 +431,7 @@ func (o *Object) NewRangeReader(ctx context.Context, offset, length int64) *Read cancel: cancel, o: o, name: o.name, - chunks: make(map[int]*bytes.Buffer), + chunks: make(map[int]*rchunk), length: length, offset: offset, } diff --git a/vendor/src/github.com/kurin/blazer/b2/b2_test.go b/vendor/src/github.com/kurin/blazer/b2/b2_test.go index 3f974438d..9748528af 100644 --- a/vendor/src/github.com/kurin/blazer/b2/b2_test.go +++ b/vendor/src/github.com/kurin/blazer/b2/b2_test.go @@ -196,8 +196,18 @@ func (t *testBucket) listFileVersions(ctx context.Context, count int, a, b, c, d } func (t *testBucket) downloadFileByName(_ context.Context, name string, offset, size int64) (b2FileReaderInterface, error) { + f := t.files[name] + end := int(offset + size) + if end >= len(f) { + end = len(f) + } + if int(offset) >= len(f) { + return nil, errNoMoreContent + } return &testFileReader{ - b: ioutil.NopCloser(bytes.NewBufferString(t.files[name][offset : offset+size])), + b: ioutil.NopCloser(bytes.NewBufferString(f[offset:end])), + s: end - int(offset), + n: name, }, nil } @@ -205,7 +215,8 @@ func (t *testBucket) hideFile(context.Context, string) (b2FileInterface, error) func (t *testBucket) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) { return "", nil } -func (t *testBucket) baseURL() string { return "" } +func (t *testBucket) baseURL() string { return "" } +func (t *testBucket) file(id string) b2FileInterface { return nil } type testURL struct { files map[string]string @@ -310,12 +321,14 @@ func (t *testFile) deleteFileVersion(context.Context) error { type testFileReader struct { b io.ReadCloser - s int64 + s int + n string } func (t *testFileReader) Read(p []byte) (int, error) { return t.b.Read(p) } func (t *testFileReader) Close() error { return nil } -func (t *testFileReader) stats() (int, string, string, map[string]string) { return 0, "", "", nil } +func (t *testFileReader) stats() (int, string, string, map[string]string) { return t.s, "", "", nil } +func (t *testFileReader) id() string { return t.n } type zReader struct{} @@ -569,6 +582,46 @@ func TestReadWrite(t *testing.T) { } } +func TestReadRangeReturnsRight(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + + client := &Client{ + backend: &beRoot{ + b2i: &testRoot{ + bucketMap: make(map[string]map[string]string), + errs: &errCont{}, + }, + }, + } + + bucket, err := client.NewBucket(ctx, bucketName, &BucketAttrs{Type: Private}) + if err != nil { + t.Fatal(err) + } + defer func() { + if err := bucket.Delete(ctx); err != nil { + t.Error(err) + } + }() + + obj, _, err := writeFile(ctx, bucket, "file", 1e6+42, 1e8) + if err != nil { + t.Fatal(err) + } + r := obj.NewRangeReader(ctx, 200, 1400) + r.ChunkSize = 1000 + + i, err := io.Copy(ioutil.Discard, r) + if err != nil { + t.Error(err) + } + if i != 1400 { + t.Errorf("NewRangeReader(_, 200, 1400): want 1400, got %d", i) + } +} + func TestWriterReturnsError(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Second) diff --git a/vendor/src/github.com/kurin/blazer/b2/backend.go b/vendor/src/github.com/kurin/blazer/b2/backend.go index 843a997a8..9862988c3 100644 --- a/vendor/src/github.com/kurin/blazer/b2/backend.go +++ b/vendor/src/github.com/kurin/blazer/b2/backend.go @@ -54,6 +54,7 @@ type beBucketInterface interface { hideFile(context.Context, string) (beFileInterface, error) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) baseURL() string + file(string) beFileInterface } type beBucket struct { @@ -110,6 +111,7 @@ type beFileChunk struct { type beFileReaderInterface interface { io.ReadCloser stats() (int, string, string, map[string]string) + id() string } type beFileReader struct { @@ -405,6 +407,13 @@ func (b *beBucket) baseURL() string { return b.b2bucket.baseURL() } +func (b *beBucket) file(id string) beFileInterface { + return &beFile{ + b2file: b.b2bucket.file(id), + ri: b.ri, + } +} + func (b *beURL) uploadFile(ctx context.Context, r io.ReadSeeker, size int, name, ct, sha1 string, info map[string]string) (beFileInterface, error) { var file beFileInterface f := func() error { @@ -602,6 +611,8 @@ func (b *beFileReader) stats() (int, string, string, map[string]string) { return b.b2fileReader.stats() } +func (b *beFileReader) id() string { return b.b2fileReader.id() } + func (b *beFileInfo) stats() (string, string, int64, string, map[string]string, string, time.Time) { return b.name, b.sha, b.size, b.ct, b.info, b.status, b.stamp } diff --git a/vendor/src/github.com/kurin/blazer/b2/baseline.go b/vendor/src/github.com/kurin/blazer/b2/baseline.go index 5a00f526f..adfefcc75 100644 --- a/vendor/src/github.com/kurin/blazer/b2/baseline.go +++ b/vendor/src/github.com/kurin/blazer/b2/baseline.go @@ -16,6 +16,7 @@ package b2 import ( "io" + "net/http" "time" "github.com/kurin/blazer/base" @@ -50,6 +51,7 @@ type b2BucketInterface interface { hideFile(context.Context, string) (b2FileInterface, error) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) baseURL() string + file(string) b2FileInterface } type b2URLInterface interface { @@ -81,6 +83,7 @@ type b2FileChunkInterface interface { type b2FileReaderInterface interface { io.ReadCloser stats() (int, string, string, map[string]string) + id() string } type b2FileInfoInterface interface { @@ -138,6 +141,15 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts if c.transport != nil { aopts = append(aopts, base.Transport(c.transport)) } + if c.failSomeUploads { + aopts = append(aopts, base.FailSomeUploads()) + } + if c.expireTokens { + aopts = append(aopts, base.ExpireSomeAuthTokens()) + } + if c.capExceeded { + aopts = append(aopts, base.ForceCapExceeded()) + } nb, err := base.AuthorizeAccount(ctx, account, key, aopts...) if err != nil { return err @@ -303,6 +315,9 @@ func (b *b2Bucket) listFileVersions(ctx context.Context, count int, nextName, ne func (b *b2Bucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (b2FileReaderInterface, error) { fr, err := b.b.DownloadFileByName(ctx, name, offset, size) if err != nil { + if code, _ := base.Code(err); code == http.StatusRequestedRangeNotSatisfiable { + return nil, errNoMoreContent + } return nil, err } return &b2FileReader{fr}, nil @@ -324,6 +339,8 @@ func (b *b2Bucket) baseURL() string { return b.b.BaseURL() } +func (b *b2Bucket) file(id string) b2FileInterface { return &b2File{b.b.File(id)} } + func (b *b2URL) uploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (b2FileInterface, error) { file, err := b.b.UploadFile(ctx, r, size, name, contentType, sha1, info) if err != nil { @@ -416,6 +433,8 @@ func (b *b2FileReader) stats() (int, string, string, map[string]string) { return b.b.ContentLength, b.b.ContentType, b.b.SHA1, b.b.Info } +func (b *b2FileReader) id() string { return b.b.ID } + func (b *b2FileInfo) stats() (string, string, int64, string, map[string]string, string, time.Time) { return b.b.Name, b.b.SHA1, b.b.Size, b.b.ContentType, b.b.Info, b.b.Status, b.b.Timestamp } diff --git a/vendor/src/github.com/kurin/blazer/b2/integration_test.go b/vendor/src/github.com/kurin/blazer/b2/integration_test.go index c96b4fb33..f24da99b4 100644 --- a/vendor/src/github.com/kurin/blazer/b2/integration_test.go +++ b/vendor/src/github.com/kurin/blazer/b2/integration_test.go @@ -25,8 +25,6 @@ import ( "testing" "time" - "github.com/kurin/blazer/base" - "golang.org/x/net/context" ) @@ -37,16 +35,6 @@ const ( errVar = "B2_TRANSIENT_ERRORS" ) -func init() { - fail := os.Getenv(errVar) - switch fail { - case "", "0", "false": - return - } - base.FailSomeUploads = true - base.ExpireSomeAuthTokens = true -} - func TestReadWriteLive(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) @@ -144,6 +132,21 @@ func TestHideShowLive(t *testing.T) { } } +type cancelReader struct { + r io.Reader + n, l int + c func() +} + +func (c *cancelReader) Read(p []byte) (int, error) { + n, err := c.r.Read(p) + c.n += n + if c.n >= c.l { + c.c() + } + return n, err +} + func TestResumeWriter(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) @@ -151,18 +154,11 @@ func TestResumeWriter(t *testing.T) { w := bucket.Object("foo").NewWriter(ctx) w.ChunkSize = 5e6 - r := io.LimitReader(zReader{}, 15e6) - go func() { - // Cancel the context after the first chunk has been written. - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - defer cancel() - for range ticker.C { - if w.cidx > 1 { - return - } - } - }() + r := &cancelReader{ + r: io.LimitReader(zReader{}, 15e6), + l: 6e6, + c: cancel, + } if _, err := io.Copy(w, r); err != context.Canceled { t.Fatalf("io.Copy: wanted canceled context, got: %v", err) } @@ -392,6 +388,11 @@ func TestRangeReaderLive(t *testing.T) { length: 2e6, size: 1e6, }, + { + offset: 0, + length: 4e6, + size: 3e6, + }, } for _, e := range table { @@ -418,12 +419,12 @@ func TestRangeReaderLive(t *testing.T) { continue } if read != e.size { - t.Errorf("read %d bytes, wanted %d bytes", read, e.size) + t.Errorf("NewRangeReader(_, %d, %d): read %d bytes, wanted %d bytes", e.offset, e.length, read, e.size) } got := fmt.Sprintf("%x", hr.Sum(nil)) want := fmt.Sprintf("%x", hw.Sum(nil)) if got != want { - t.Errorf("bad hash, got %q, want %q", got, want) + t.Errorf("NewRangeReader(_, %d, %d): got %q, want %q", e.offset, e.length, got, want) } } } @@ -661,7 +662,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) { t.Skipf("B2_ACCOUNT_ID or B2_SECRET_KEY unset; skipping integration tests") return nil, nil } - client, err := NewClient(ctx, id, key) + client, err := NewClient(ctx, id, key, FailSomeUploads(), ExpireSomeAuthTokens()) if err != nil { t.Fatal(err) return nil, nil diff --git a/vendor/src/github.com/kurin/blazer/b2/reader.go b/vendor/src/github.com/kurin/blazer/b2/reader.go index cba76b46c..9b293710e 100644 --- a/vendor/src/github.com/kurin/blazer/b2/reader.go +++ b/vendor/src/github.com/kurin/blazer/b2/reader.go @@ -16,6 +16,7 @@ package b2 import ( "bytes" + "errors" "io" "sync" @@ -24,6 +25,8 @@ import ( "golang.org/x/net/context" ) +var errNoMoreContent = errors.New("416: out of content") + // Reader reads files from B2. type Reader struct { // ConcurrentDownloads is the number of simultaneous downloads to pull from @@ -42,16 +45,15 @@ type Reader struct { name string offset int64 // the start of the file length int64 // the length to read, or -1 - size int64 // the end of the file, in absolute terms csize int // chunk size read int // amount read chwid int // chunks written chrid int // chunks read - chbuf chan *bytes.Buffer + chbuf chan *rchunk init sync.Once rmux sync.Mutex // guards rcond rcond *sync.Cond - chunks map[int]*bytes.Buffer + chunks map[int]*rchunk emux sync.RWMutex // guards err, believe it or not err error @@ -60,6 +62,11 @@ type Reader struct { smap map[int]*meteredReader } +type rchunk struct { + bytes.Buffer + final bool +} + // Close frees resources associated with the download. func (r *Reader) Close() error { r.cancel() @@ -93,7 +100,7 @@ func (r *Reader) getErr() error { func (r *Reader) thread() { go func() { for { - var buf *bytes.Buffer + var buf *rchunk select { case b, ok := <-r.chbuf: if !ok { @@ -109,26 +116,31 @@ func (r *Reader) thread() { r.rmux.Unlock() offset := int64(chunkID*r.csize) + r.offset size := int64(r.csize) - if offset >= r.size { - // Send an empty chunk. This is necessary to prevent a deadlock when - // this is the very first chunk. + if r.length > 0 { + if size > r.length { + buf.final = true + size = r.length + } + r.length -= size + } + redo: + fr, err := r.o.b.b.downloadFileByName(r.ctx, r.name, offset, size) + if err == errNoMoreContent { + // this read generated a 416 so we are entirely past the end of the object + buf.final = true r.rmux.Lock() r.chunks[chunkID] = buf r.rmux.Unlock() r.rcond.Broadcast() return } - if offset+size > r.size { - size = r.size - offset - } - redo: - fr, err := r.o.b.b.downloadFileByName(r.ctx, r.name, offset, size) if err != nil { r.setErr(err) r.rcond.Broadcast() return } - mr := &meteredReader{r: &fakeSeeker{fr}, size: int(size)} + rsize, _, _, _ := fr.stats() + mr := &meteredReader{r: &fakeSeeker{fr}, size: int(rsize)} r.smux.Lock() r.smap[chunkID] = mr r.smux.Unlock() @@ -136,9 +148,9 @@ func (r *Reader) thread() { r.smux.Lock() r.smap[chunkID] = nil r.smux.Unlock() - if i < size || err == io.ErrUnexpectedEOF { + if i < int64(rsize) || err == io.ErrUnexpectedEOF { // Probably the network connection was closed early. Retry. - blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying", chunkID, i, size) + blog.V(1).Infof("b2 reader %d: got %dB of %dB; retrying", chunkID, i, rsize) buf.Reset() goto redo } @@ -155,8 +167,8 @@ func (r *Reader) thread() { }() } -func (r *Reader) curChunk() (*bytes.Buffer, error) { - ch := make(chan *bytes.Buffer) +func (r *Reader) curChunk() (*rchunk, error) { + ch := make(chan *rchunk) go func() { r.rmux.Lock() defer r.rmux.Unlock() @@ -185,17 +197,6 @@ func (r *Reader) initFunc() { r.smap = make(map[int]*meteredReader) r.smux.Unlock() r.o.b.c.addReader(r) - if err := r.o.ensure(r.ctx); err != nil { - r.setErr(err) - return - } - r.size = r.o.f.size() - if r.length >= 0 && r.offset+r.length < r.size { - r.size = r.offset + r.length - } - if r.offset > r.size { - r.offset = r.size - } r.rcond = sync.NewCond(&r.rmux) cr := r.ConcurrentDownloads if cr < 1 { @@ -205,10 +206,10 @@ func (r *Reader) initFunc() { r.ChunkSize = 1e7 } r.csize = r.ChunkSize - r.chbuf = make(chan *bytes.Buffer, cr) + r.chbuf = make(chan *rchunk, cr) for i := 0; i < cr; i++ { r.thread() - r.chbuf <- &bytes.Buffer{} + r.chbuf <- &rchunk{} } } @@ -226,7 +227,7 @@ func (r *Reader) Read(p []byte) (int, error) { n, err := chunk.Read(p) r.read += n if err == io.EOF { - if int64(r.read) >= r.size-r.offset { + if chunk.final { close(r.chbuf) r.setErrNoCancel(err) return n, err diff --git a/vendor/src/github.com/kurin/blazer/b2/writer.go b/vendor/src/github.com/kurin/blazer/b2/writer.go index 0605f947b..3026460ea 100644 --- a/vendor/src/github.com/kurin/blazer/b2/writer.go +++ b/vendor/src/github.com/kurin/blazer/b2/writer.go @@ -107,7 +107,7 @@ func (w *Writer) setErr(err error) { w.emux.Lock() defer w.emux.Unlock() if w.err == nil { - blog.V(0).Infof("error writing %s: %v", w.name, err) + blog.V(1).Infof("error writing %s: %v", w.name, err) w.err = err w.cancel() } @@ -134,15 +134,15 @@ func (w *Writer) completeChunk(id int) { var gid int32 func (w *Writer) thread() { + w.wg.Add(1) go func() { + defer w.wg.Done() id := atomic.AddInt32(&gid, 1) fc, err := w.file.getUploadPartURL(w.ctx) if err != nil { w.setErr(err) return } - w.wg.Add(1) - defer w.wg.Done() for { chunk, ok := <-w.ready if !ok { diff --git a/vendor/src/github.com/kurin/blazer/base/base.go b/vendor/src/github.com/kurin/blazer/base/base.go index 7c87be907..9ae118b15 100644 --- a/vendor/src/github.com/kurin/blazer/base/base.go +++ b/vendor/src/github.com/kurin/blazer/base/base.go @@ -208,7 +208,10 @@ func millitime(t int64) time.Time { } type b2Options struct { - transport http.RoundTripper + transport http.RoundTripper + failSomeUploads bool + expireTokens bool + capExceeded bool } func (o *b2Options) getTransport() http.RoundTripper { @@ -272,20 +275,6 @@ func (rb *requestBody) getBody() io.Reader { return rb.body } -var ( - // FailSomeUploads causes B2 to return errors, randomly, to some RPCs. It is - // intended to be used for integration testing. - FailSomeUploads = false - - // ExpireSomeAuthTokens causes B2 to expire auth tokens frequently, testing - // account reauthentication. - ExpireSomeAuthTokens = false - - // ForceCapExceeded causes B2 to reject all uploads with capacity limit - // failures. - ForceCapExceeded = false -) - var reqID int64 func (o *b2Options) makeRequest(ctx context.Context, method, verb, url string, b2req, b2resp interface{}, headers map[string]string, body *requestBody) error { @@ -311,13 +300,13 @@ func (o *b2Options) makeRequest(ctx context.Context, method, verb, url string, b } req.Header.Set("X-Blazer-Request-ID", fmt.Sprintf("%d", atomic.AddInt64(&reqID, 1))) req.Header.Set("X-Blazer-Method", method) - if FailSomeUploads { + if o.failSomeUploads { req.Header.Add("X-Bz-Test-Mode", "fail_some_uploads") } - if ExpireSomeAuthTokens { + if o.expireTokens { req.Header.Add("X-Bz-Test-Mode", "expire_some_account_authorization_tokens") } - if ForceCapExceeded { + if o.capExceeded { req.Header.Add("X-Bz-Test-Mode", "force_cap_exceeded") } cancel := make(chan struct{}) @@ -396,6 +385,30 @@ func Transport(rt http.RoundTripper) AuthOption { } } +// FailSomeUploads requests intermittent upload failures from the B2 service. +// This is mostly useful for testing. +func FailSomeUploads() AuthOption { + return func(o *b2Options) { + o.failSomeUploads = true + } +} + +// ExpireSomeAuthTokens requests intermittent authentication failures from the +// B2 service. +func ExpireSomeAuthTokens() AuthOption { + return func(o *b2Options) { + o.expireTokens = true + } +} + +// ForceCapExceeded requests a cap limit from the B2 service. This causes all +// uploads to be treated as if they would exceed the configure B2 capacity. +func ForceCapExceeded() AuthOption { + return func(o *b2Options) { + o.capExceeded = true + } +} + type LifecycleRule struct { Prefix string DaysNewUntilHidden int @@ -604,6 +617,12 @@ type File struct { b2 *B2 } +// File returns a bare File struct, but with the appropriate id and b2 +// interfaces. +func (b *Bucket) File(id string) *File { + return &File{id: id, b2: b.b2} +} + // UploadFile wraps b2_upload_file. func (url *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) { headers := map[string]string{ @@ -910,6 +929,7 @@ type FileReader struct { ContentLength int ContentType string SHA1 string + ID string Info map[string]string } @@ -971,6 +991,7 @@ func (b *Bucket) DownloadFileByName(ctx context.Context, name string, offset, si return &FileReader{ ReadCloser: reply.resp.Body, SHA1: reply.resp.Header.Get("X-Bz-Content-Sha1"), + ID: reply.resp.Header.Get("X-Bz-File-Id"), ContentType: reply.resp.Header.Get("Content-Type"), ContentLength: int(clen), Info: info,