diff --git a/Gopkg.lock b/Gopkg.lock index a2858f831..ed7feabd4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -88,8 +88,8 @@ [[projects]] name = "github.com/kurin/blazer" packages = ["b2","base","internal/b2types","internal/blog"] - revision = "e269a1a17bb6aec278c06a57cb7e8f8d0d333e04" - version = "v0.2.1" + revision = "5b348b2bdb078b06baa46ab7e12cdff12ee028ab" + version = "v0.2.2" [[projects]] name = "github.com/marstr/guid" diff --git a/vendor/github.com/kurin/blazer/b2/b2.go b/vendor/github.com/kurin/blazer/b2/b2.go index 875c5310e..723b0c377 100644 --- a/vendor/github.com/kurin/blazer/b2/b2.go +++ b/vendor/github.com/kurin/blazer/b2/b2.go @@ -120,7 +120,8 @@ type Bucket struct { b beBucketInterface r beRootInterface - c *Client + c *Client + urlPool sync.Pool } type BucketType string @@ -240,7 +241,7 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs) }, err } -// ListBucket returns all the available buckets. +// ListBuckets returns all the available buckets. func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) { bs, err := c.backend.listBuckets(ctx) if err != nil { @@ -565,6 +566,37 @@ func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) ( return objects, next, rtnErr } +// ListUnfinishedLargeFiles lists any objects that correspond to large file uploads that haven't been completed. +// This can happen for example when an upload is interrupted. +func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) { + if c == nil { + c = &Cursor{} + } + fs, name, err := b.b.listUnfinishedLargeFiles(ctx, count, c.name) + if err != nil { + return nil, nil, err + } + var next *Cursor + if name != "" { + next = &Cursor{ + name: name, + } + } + var objects []*Object + for _, f := range fs { + objects = append(objects, &Object{ + name: f.name(), + f: f, + b: b, + }) + } + var rtnErr error + if len(objects) == 0 || next == nil { + rtnErr = io.EOF + } + return objects, next, rtnErr +} + // Hide hides the object from name-based listing. func (o *Object) Hide(ctx context.Context) error { if err := o.ensure(ctx); err != nil { diff --git a/vendor/github.com/kurin/blazer/b2/b2_test.go b/vendor/github.com/kurin/blazer/b2/b2_test.go index e728f9bb7..add7d7567 100644 --- a/vendor/github.com/kurin/blazer/b2/b2_test.go +++ b/vendor/github.com/kurin/blazer/b2/b2_test.go @@ -198,6 +198,10 @@ func (t *testBucket) listFileVersions(ctx context.Context, count int, a, b, c, d return x, y, "", z } +func (t *testBucket) listUnfinishedLargeFiles(ctx context.Context, count int, cont string) ([]b2FileInterface, string, error) { + return nil, "", fmt.Errorf("testBucket.listUnfinishedLargeFiles(ctx, %d, %q): not implemented", count, cont) +} + func (t *testBucket) downloadFileByName(_ context.Context, name string, offset, size int64) (b2FileReaderInterface, error) { gmux.Lock() defer gmux.Unlock() diff --git a/vendor/github.com/kurin/blazer/b2/backend.go b/vendor/github.com/kurin/blazer/b2/backend.go index 392dbb3cf..c983c3b50 100644 --- a/vendor/github.com/kurin/blazer/b2/backend.go +++ b/vendor/github.com/kurin/blazer/b2/backend.go @@ -49,6 +49,7 @@ type beBucketInterface interface { startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (beLargeFileInterface, error) listFileNames(context.Context, int, string, string, string) ([]beFileInterface, string, error) listFileVersions(context.Context, int, string, string, string, string) ([]beFileInterface, string, string, error) + listUnfinishedLargeFiles(context.Context, int, string) ([]beFileInterface, string, error) downloadFileByName(context.Context, string, int64, int64) (beFileReaderInterface, error) hideFile(context.Context, string) (beFileInterface, error) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) @@ -339,6 +340,32 @@ func (b *beBucket) listFileVersions(ctx context.Context, count int, nextName, ne return files, name, id, nil } +func (b *beBucket) listUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]beFileInterface, string, error) { + var cont string + var files []beFileInterface + f := func() error { + g := func() error { + fs, c, err := b.b2bucket.listUnfinishedLargeFiles(ctx, count, continuation) + if err != nil { + return err + } + cont = c + for _, f := range fs { + files = append(files, &beFile{ + b2file: f, + ri: b.ri, + }) + } + return nil + } + return withReauth(ctx, b.ri, g) + } + if err := withBackoff(ctx, b.ri, f); err != nil { + return nil, "", err + } + return files, cont, nil +} + func (b *beBucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (beFileReaderInterface, error) { var reader beFileReaderInterface f := func() error { diff --git a/vendor/github.com/kurin/blazer/b2/baseline.go b/vendor/github.com/kurin/blazer/b2/baseline.go index 4a813b8a1..c5b14a064 100644 --- a/vendor/github.com/kurin/blazer/b2/baseline.go +++ b/vendor/github.com/kurin/blazer/b2/baseline.go @@ -46,6 +46,7 @@ type b2BucketInterface interface { startLargeFile(ctx context.Context, name, contentType string, info map[string]string) (b2LargeFileInterface, error) listFileNames(context.Context, int, string, string, string) ([]b2FileInterface, string, error) listFileVersions(context.Context, int, string, string, string, string) ([]b2FileInterface, string, string, error) + listUnfinishedLargeFiles(context.Context, int, string) ([]b2FileInterface, string, error) downloadFileByName(context.Context, string, int64, int64) (b2FileReaderInterface, error) hideFile(context.Context, string) (b2FileInterface, error) getDownloadAuthorization(context.Context, string, time.Duration) (string, error) @@ -314,6 +315,18 @@ func (b *b2Bucket) listFileVersions(ctx context.Context, count int, nextName, ne return files, name, id, nil } +func (b *b2Bucket) listUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]b2FileInterface, string, error) { + fs, cont, err := b.b.ListUnfinishedLargeFiles(ctx, count, continuation) + if err != nil { + return nil, "", err + } + var files []b2FileInterface + for _, f := range fs { + files = append(files, &b2File{f}) + } + return files, cont, nil +} + func (b *b2Bucket) downloadFileByName(ctx context.Context, name string, offset, size int64) (b2FileReaderInterface, error) { fr, err := b.b.DownloadFileByName(ctx, name, offset, size) if err != nil { diff --git a/vendor/github.com/kurin/blazer/b2/integration_test.go b/vendor/github.com/kurin/blazer/b2/integration_test.go index 2321e3e90..138c171b0 100644 --- a/vendor/github.com/kurin/blazer/b2/integration_test.go +++ b/vendor/github.com/kurin/blazer/b2/integration_test.go @@ -735,14 +735,21 @@ func TestWriteEmpty(t *testing.T) { type rtCounter struct { rt http.RoundTripper trips int + api string sync.Mutex } func (rt *rtCounter) RoundTrip(r *http.Request) (*http.Response, error) { rt.Lock() defer rt.Unlock() - rt.trips++ - return rt.rt.RoundTrip(r) + resp, err := rt.rt.RoundTrip(r) + if err != nil { + return resp, err + } + if rt.api == "" || r.Header.Get("X-Blazer-Method") == rt.api { + rt.trips++ + } + return resp, nil } func TestAttrsNoRoundtrip(t *testing.T) { @@ -786,6 +793,66 @@ func TestAttrsNoRoundtrip(t *testing.T) { } } +/*func TestAttrsFewRoundtrips(t *testing.T) { + rt := &rtCounter{rt: defaultTransport} + defaultTransport = rt + defer func() { + defaultTransport = rt.rt + }() + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + + bucket, done := startLiveTest(ctx, t) + defer done() + + _, _, err := writeFile(ctx, bucket, smallFileName, 42, 1e8) + if err != nil { + t.Fatal(err) + } + + o := bucket.Object(smallFileName) + trips := rt.trips + attrs, err := o.Attrs(ctx) + if err != nil { + t.Fatal(err) + } + if attrs.Name != smallFileName { + t.Errorf("got the wrong object: got %q, want %q", attrs.Name, smallFileName) + } + + if trips != rt.trips { + t.Errorf("Attrs(): too many round trips, got %d, want 1", rt.trips-trips) + } +}*/ + +func TestSmallUploadsFewRoundtrips(t *testing.T) { + rt := &rtCounter{rt: defaultTransport, api: "b2_get_upload_url"} + defaultTransport = rt + defer func() { + defaultTransport = rt.rt + }() + + ctx := context.Background() + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + + bucket, done := startLiveTest(ctx, t) + defer done() + + for i := 0; i < 10; i++ { + _, _, err := writeFile(ctx, bucket, fmt.Sprintf("%s.%d", smallFileName, i), 42, 1e8) + if err != nil { + t.Fatal(err) + } + } + if rt.trips > 3 { + // Pool is not guaranteed to be valid, so 3 calls allows some slack. + t.Errorf("too many calls to b2_get_upload_url: got %d, want < 3", rt.trips) + } +} + func TestDeleteWithoutName(t *testing.T) { ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) @@ -804,6 +871,26 @@ func TestDeleteWithoutName(t *testing.T) { } } +func TestListUnfinishedLargeFiles(t *testing.T) { + ctx := context.Background() + bucket, done := startLiveTest(ctx, t) + defer done() + + w := bucket.Object(largeFileName).NewWriter(ctx) + w.ChunkSize = 1e5 + if _, err := io.Copy(w, io.LimitReader(zReader{}, 1e6)); err != nil { + t.Fatal(err) + } + // Don't close the writer. + fs, _, err := bucket.ListUnfinishedLargeFiles(ctx, 10, nil) + if err != io.EOF && err != nil { + t.Fatal(err) + } + if len(fs) != 1 { + t.Errorf("ListUnfinishedLargeFiles: got %d, want 1", len(fs)) + } +} + type object struct { o *Object err error diff --git a/vendor/github.com/kurin/blazer/b2/readerat.go b/vendor/github.com/kurin/blazer/b2/readerat.go index 5f2d153a0..7ef135232 100644 --- a/vendor/github.com/kurin/blazer/b2/readerat.go +++ b/vendor/github.com/kurin/blazer/b2/readerat.go @@ -33,7 +33,7 @@ func (r *readerAt) ReadAt(p []byte, off int64) (int, error) { if err != nil { return 0, err } - defer func() { r.rs.Seek(cur, io.SeekStart) }() + defer r.rs.Seek(cur, io.SeekStart) if _, err := r.rs.Seek(off, io.SeekStart); err != nil { return 0, err diff --git a/vendor/github.com/kurin/blazer/b2/writer.go b/vendor/github.com/kurin/blazer/b2/writer.go index 5e5c29ed4..9203b6c30 100644 --- a/vendor/github.com/kurin/blazer/b2/writer.go +++ b/vendor/github.com/kurin/blazer/b2/writer.go @@ -144,7 +144,7 @@ func (w *Writer) thread() { } if sha, ok := w.seen[chunk.id]; ok { if sha != chunk.buf.Hash() { - w.setErr(errors.New("resumable upload was requested, but chunks don't match!")) + w.setErr(errors.New("resumable upload was requested, but chunks don't match")) return } chunk.buf.Close() @@ -245,11 +245,23 @@ func (w *Writer) Write(p []byte) (int, error) { return i + k, err } +func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) { + u := w.o.b.urlPool.Get() + if u == nil { + return w.o.b.b.getUploadURL(w.ctx) + } + ue := u.(beURLInterface) + return ue, nil +} + func (w *Writer) simpleWriteFile() error { - ue, err := w.o.b.b.getUploadURL(w.ctx) + ue, err := w.getUploadURL(w.ctx) if err != nil { return err } + // This defer needs to be in a func() so that we put whatever the value of ue + // is at function exit. + defer func() { w.o.b.urlPool.Put(ue) }() sha1 := w.w.Hash() ctype := w.contentType if ctype == "" { diff --git a/vendor/github.com/kurin/blazer/base/base.go b/vendor/github.com/kurin/blazer/base/base.go index 726ea8e37..23d2e7921 100644 --- a/vendor/github.com/kurin/blazer/base/base.go +++ b/vendor/github.com/kurin/blazer/base/base.go @@ -18,7 +18,6 @@ // It currently lacks support for the following APIs: // // b2_download_file_by_id -// b2_list_unfinished_large_files package base import ( @@ -43,7 +42,7 @@ import ( const ( APIBase = "https://api.backblazeb2.com" - DefaultUserAgent = "blazer/0.2.1" + DefaultUserAgent = "blazer/0.2.2" ) type b2err struct { @@ -69,17 +68,15 @@ func Action(err error) ErrAction { if e.retry > 0 { return Retry } - if e.code >= 500 && e.code < 600 { - if e.method == "b2_upload_file" || e.method == "b2_upload_part" { - return AttemptNewUpload - } + if e.code >= 500 && e.code < 600 && (e.method == "b2_upload_file" || e.method == "b2_upload_part") { + return AttemptNewUpload } switch e.code { case 401: - if e.method == "b2_authorize_account" { + switch e.method { + case "b2_authorize_account": return Punt - } - if e.method == "b2_upload_file" || e.method == "b2_upload_part" { + case "b2_upload_file", "b2_upload_part": return AttemptNewUpload } return ReAuthenticate @@ -698,9 +695,9 @@ func (b *Bucket) File(id, name string) *File { } // UploadFile wraps b2_upload_file. -func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) { +func (url *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, contentType, sha1 string, info map[string]string) (*File, error) { headers := map[string]string{ - "Authorization": u.token, + "Authorization": url.token, "X-Bz-File-Name": name, "Content-Type": contentType, "Content-Length": fmt.Sprintf("%d", size), @@ -710,7 +707,7 @@ func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, conte headers[fmt.Sprintf("X-Bz-Info-%s", k)] = v } b2resp := &b2types.UploadFileResponse{} - if err := u.b2.opts.makeRequest(ctx, "b2_upload_file", "POST", u.uri, nil, b2resp, headers, &requestBody{body: r, size: int64(size)}); err != nil { + if err := url.b2.opts.makeRequest(ctx, "b2_upload_file", "POST", url.uri, nil, b2resp, headers, &requestBody{body: r, size: int64(size)}); err != nil { return nil, err } return &File{ @@ -719,7 +716,7 @@ func (u *URL) UploadFile(ctx context.Context, r io.Reader, size int, name, conte Timestamp: millitime(b2resp.Timestamp), Status: b2resp.Action, id: b2resp.FileID, - b2: u.b2, + b2: url.b2, }, nil } @@ -924,6 +921,39 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) { }, nil } +// ListUnfinishedLargeFiles wraps b2_list_unfinished_large_files. +func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, continuation string) ([]*File, string, error) { + b2req := &b2types.ListUnfinishedLargeFilesRequest{ + BucketID: b.id, + Continuation: continuation, + Count: count, + } + b2resp := &b2types.ListUnfinishedLargeFilesResponse{} + headers := map[string]string{ + "Authorization": b.b2.authToken, + } + if err := b.b2.opts.makeRequest(ctx, "b2_list_unfinished_large_files", "POST", b.b2.apiURI+b2types.V1api+"b2_list_unfinished_large_files", b2req, b2resp, headers, nil); err != nil { + return nil, "", err + } + cont := b2resp.Continuation + var files []*File + for _, f := range b2resp.Files { + files = append(files, &File{ + Name: f.Name, + Timestamp: millitime(f.Timestamp), + b2: b.b2, + id: f.FileID, + Info: &FileInfo{ + Name: f.Name, + ContentType: f.ContentType, + Info: f.Info, + Timestamp: millitime(f.Timestamp), + }, + }) + } + return files, cont, nil +} + // ListFileNames wraps b2_list_file_names. func (b *Bucket) ListFileNames(ctx context.Context, count int, continuation, prefix, delimiter string) ([]*File, string, error) { b2req := &b2types.ListFileNamesRequest{ diff --git a/vendor/github.com/kurin/blazer/base/integration_test.go b/vendor/github.com/kurin/blazer/base/integration_test.go index ad648f1c4..7e615bca9 100644 --- a/vendor/github.com/kurin/blazer/base/integration_test.go +++ b/vendor/github.com/kurin/blazer/base/integration_test.go @@ -156,8 +156,8 @@ func TestStorage(t *testing.T) { // b2_start_large_file largeInfoMap := map[string]string{ - "one_BILLION": "1e9", - "two_TRILLION": "2eSomething, I guess 2e12", + "one_billion": "1e9", + "two_trillion": "2eSomething, I guess 2e12", } lf, err := bucket.StartLargeFile(ctx, largeFileName, "application/octet-stream", largeInfoMap) if err != nil { diff --git a/vendor/github.com/kurin/blazer/internal/b2types/b2types.go b/vendor/github.com/kurin/blazer/internal/b2types/b2types.go index df34feeda..4730110b3 100644 --- a/vendor/github.com/kurin/blazer/internal/b2types/b2types.go +++ b/vendor/github.com/kurin/blazer/internal/b2types/b2types.go @@ -227,3 +227,14 @@ type GetDownloadAuthorizationResponse struct { Prefix string `json:"fileNamePrefix"` Token string `json:"authorizationToken"` } + +type ListUnfinishedLargeFilesRequest struct { + BucketID string `json:"bucketId"` + Continuation string `json:"startFileId,omitempty"` + Count int `json:"maxFileCount,omitempty"` +} + +type ListUnfinishedLargeFilesResponse struct { + Files []GetFileInfoResponse `json:"files"` + Continuation string `json:"nextFileId"` +}