From 296769355d266ae69c6c39cc35a88f6695594d97 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Tue, 20 Feb 2018 21:01:21 +0100 Subject: [PATCH] Update github.com/kurin/blazer to 0.3.0 This commit will reduce the number of HTTP requests per file uploaded from two to one. --- Gopkg.lock | 4 +- vendor/github.com/kurin/blazer/b2/b2.go | 89 ++++++++++++++++--- vendor/github.com/kurin/blazer/b2/baseline.go | 4 +- .../kurin/blazer/b2/integration_test.go | 39 ++++---- vendor/github.com/kurin/blazer/b2/monitor.go | 14 ++- vendor/github.com/kurin/blazer/b2/writer.go | 8 +- vendor/github.com/kurin/blazer/base/base.go | 5 +- 7 files changed, 119 insertions(+), 44 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index ed7feabd4..54a3f7a1f 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -88,8 +88,8 @@ [[projects]] name = "github.com/kurin/blazer" packages = ["b2","base","internal/b2types","internal/blog"] - revision = "5b348b2bdb078b06baa46ab7e12cdff12ee028ab" - version = "v0.2.2" + revision = "cd0304efa98725679cf68422cefa328d3d96f2f4" + version = "v0.3.0" [[projects]] name = "github.com/marstr/guid" diff --git a/vendor/github.com/kurin/blazer/b2/b2.go b/vendor/github.com/kurin/blazer/b2/b2.go index 723b0c377..075b259c8 100644 --- a/vendor/github.com/kurin/blazer/b2/b2.go +++ b/vendor/github.com/kurin/blazer/b2/b2.go @@ -45,6 +45,7 @@ type Client struct { slock sync.Mutex sWriters map[string]*Writer sReaders map[string]*Reader + sMethods map[string]int } // NewClient creates and returns a new Client with valid B2 service account @@ -54,7 +55,9 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) ( backend: &beRoot{ b2i: &b2Root{}, }, + sMethods: make(map[string]int), } + opts = append(opts, client(c)) if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil { return nil, err } @@ -62,6 +65,7 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) ( } type clientOptions struct { + client *Client transport http.RoundTripper failSomeUploads bool expireTokens bool @@ -115,13 +119,38 @@ func ForceCapExceeded() ClientOption { } } +func client(cl *Client) ClientOption { + return func(c *clientOptions) { + c.client = cl + } +} + +type clientTransport struct { + client *Client + rt http.RoundTripper +} + +func (ct *clientTransport) RoundTrip(r *http.Request) (*http.Response, error) { + method := r.Header.Get("X-Blazer-Method") + if method != "" && ct.client != nil { + ct.client.slock.Lock() + ct.client.sMethods[method]++ + ct.client.slock.Unlock() + } + t := ct.rt + if t == nil { + t = http.DefaultTransport + } + return t.RoundTrip(r) +} + // Bucket is a reference to a B2 bucket. type Bucket struct { b beBucketInterface r beRootInterface c *Client - urlPool sync.Pool + urlPool *urlPool } type BucketType string @@ -189,6 +218,36 @@ func IsNotExist(err error) bool { return berr.notFoundErr } +const uploadURLPoolSize = 100 + +type urlPool struct { + ch chan beURLInterface +} + +func newURLPool() *urlPool { + return &urlPool{ch: make(chan beURLInterface, uploadURLPoolSize)} +} + +func (p *urlPool) get() beURLInterface { + select { + case ue := <-p.ch: + // if the channel has an upload URL available, use that + return ue + default: + // otherwise return nil, a new upload URL needs to be generated + return nil + } +} + +func (p *urlPool) put(u beURLInterface) { + select { + case p.ch <- u: + // put the URL back if possible + default: + // if the channel is full, throw it away + } +} + // Bucket returns a bucket if it exists. func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) { buckets, err := c.backend.listBuckets(ctx) @@ -198,9 +257,10 @@ func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) { for _, bucket := range buckets { if bucket.name() == name { return &Bucket{ - b: bucket, - r: c.backend, - c: c, + b: bucket, + r: c.backend, + c: c, + urlPool: newURLPool(), }, nil } } @@ -221,9 +281,10 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs) for _, bucket := range buckets { if bucket.name() == name { return &Bucket{ - b: bucket, - r: c.backend, - c: c, + b: bucket, + r: c.backend, + c: c, + urlPool: newURLPool(), }, nil } } @@ -235,9 +296,10 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs) return nil, err } return &Bucket{ - b: b, - r: c.backend, - c: c, + b: b, + r: c.backend, + c: c, + urlPool: newURLPool(), }, err } @@ -250,9 +312,10 @@ func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) { var buckets []*Bucket for _, b := range bs { buckets = append(buckets, &Bucket{ - b: b, - r: c.backend, - c: c, + b: b, + r: c.backend, + c: c, + urlPool: newURLPool(), }) } return buckets, nil diff --git a/vendor/github.com/kurin/blazer/b2/baseline.go b/vendor/github.com/kurin/blazer/b2/baseline.go index c5b14a064..ffbe8d5ec 100644 --- a/vendor/github.com/kurin/blazer/b2/baseline.go +++ b/vendor/github.com/kurin/blazer/b2/baseline.go @@ -138,9 +138,11 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts f(c) } var aopts []base.AuthOption + ct := &clientTransport{client: c.client} if c.transport != nil { - aopts = append(aopts, base.Transport(c.transport)) + ct.rt = c.transport } + aopts = append(aopts, base.Transport(ct)) if c.failSomeUploads { aopts = append(aopts, base.FailSomeUploads()) } diff --git a/vendor/github.com/kurin/blazer/b2/integration_test.go b/vendor/github.com/kurin/blazer/b2/integration_test.go index 138c171b0..e172cce0e 100644 --- a/vendor/github.com/kurin/blazer/b2/integration_test.go +++ b/vendor/github.com/kurin/blazer/b2/integration_test.go @@ -17,7 +17,9 @@ package b2 import ( "bytes" "context" + "crypto/rand" "crypto/sha1" + "encoding/hex" "fmt" "io" "net/http" @@ -735,21 +737,14 @@ func TestWriteEmpty(t *testing.T) { type rtCounter struct { rt http.RoundTripper trips int - api string sync.Mutex } func (rt *rtCounter) RoundTrip(r *http.Request) (*http.Response, error) { rt.Lock() defer rt.Unlock() - resp, err := rt.rt.RoundTrip(r) - if err != nil { - return resp, err - } - if rt.api == "" || r.Header.Get("X-Blazer-Method") == rt.api { - rt.trips++ - } - return resp, nil + rt.trips++ + return rt.rt.RoundTrip(r) } func TestAttrsNoRoundtrip(t *testing.T) { @@ -828,12 +823,6 @@ func TestAttrsNoRoundtrip(t *testing.T) { }*/ func TestSmallUploadsFewRoundtrips(t *testing.T) { - rt := &rtCounter{rt: defaultTransport, api: "b2_get_upload_url"} - defaultTransport = rt - defer func() { - defaultTransport = rt.rt - }() - ctx := context.Background() ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) defer cancel() @@ -847,9 +836,11 @@ func TestSmallUploadsFewRoundtrips(t *testing.T) { t.Fatal(err) } } - if rt.trips > 3 { - // Pool is not guaranteed to be valid, so 3 calls allows some slack. - t.Errorf("too many calls to b2_get_upload_url: got %d, want < 3", rt.trips) + si := bucket.c.Status() + getURL := si.MethodCalls["b2_get_upload_url"] + uploadFile := si.MethodCalls["b2_upload_file"] + if getURL >= uploadFile { + t.Errorf("too many calls to b2_get_upload_url") } } @@ -1001,6 +992,16 @@ func (cc *ccRC) Close() error { return cc.ReadCloser.Close() } +var uniq string + +func init() { + b := make([]byte, 4) + if _, err := rand.Read(b); err != nil { + panic(err) + } + uniq = hex.EncodeToString(b) +} + func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) { id := os.Getenv(apiID) key := os.Getenv(apiKey) @@ -1016,7 +1017,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) { t.Fatal(err) return nil, nil } - bucket, err := client.NewBucket(ctx, id+"-"+bucketName, nil) + bucket, err := client.NewBucket(ctx, fmt.Sprintf("%s-%s-%s", id, bucketName, uniq), nil) if err != nil { t.Fatal(err) return nil, nil diff --git a/vendor/github.com/kurin/blazer/b2/monitor.go b/vendor/github.com/kurin/blazer/b2/monitor.go index ab93de515..71d619b5f 100644 --- a/vendor/github.com/kurin/blazer/b2/monitor.go +++ b/vendor/github.com/kurin/blazer/b2/monitor.go @@ -18,8 +18,9 @@ import "fmt" // StatusInfo reports information about a client. type StatusInfo struct { - Writers map[string]*WriterStatus - Readers map[string]*ReaderStatus + Writers map[string]*WriterStatus + Readers map[string]*ReaderStatus + MethodCalls map[string]int } // WriterStatus reports the status for each writer. @@ -42,8 +43,9 @@ func (c *Client) Status() *StatusInfo { defer c.slock.Unlock() si := &StatusInfo{ - Writers: make(map[string]*WriterStatus), - Readers: make(map[string]*ReaderStatus), + Writers: make(map[string]*WriterStatus), + Readers: make(map[string]*ReaderStatus), + MethodCalls: make(map[string]int), } for name, w := range c.sWriters { @@ -54,6 +56,10 @@ func (c *Client) Status() *StatusInfo { si.Readers[name] = r.status() } + for name, n := range c.sMethods { + si.MethodCalls[name] = n + } + return si } diff --git a/vendor/github.com/kurin/blazer/b2/writer.go b/vendor/github.com/kurin/blazer/b2/writer.go index 9203b6c30..becfc5844 100644 --- a/vendor/github.com/kurin/blazer/b2/writer.go +++ b/vendor/github.com/kurin/blazer/b2/writer.go @@ -246,12 +246,12 @@ func (w *Writer) Write(p []byte) (int, error) { } func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) { - u := w.o.b.urlPool.Get() + u := w.o.b.urlPool.get() if u == nil { return w.o.b.b.getUploadURL(w.ctx) } - ue := u.(beURLInterface) - return ue, nil + + return u, nil } func (w *Writer) simpleWriteFile() error { @@ -261,7 +261,7 @@ func (w *Writer) simpleWriteFile() error { } // This defer needs to be in a func() so that we put whatever the value of ue // is at function exit. - defer func() { w.o.b.urlPool.Put(ue) }() + defer func() { w.o.b.urlPool.put(ue) }() sha1 := w.w.Hash() ctype := w.contentType if ctype == "" { diff --git a/vendor/github.com/kurin/blazer/base/base.go b/vendor/github.com/kurin/blazer/base/base.go index 23d2e7921..9e5205c70 100644 --- a/vendor/github.com/kurin/blazer/base/base.go +++ b/vendor/github.com/kurin/blazer/base/base.go @@ -42,7 +42,7 @@ import ( const ( APIBase = "https://api.backblazeb2.com" - DefaultUserAgent = "blazer/0.2.2" + DefaultUserAgent = "blazer/0.3.0" ) type b2err struct { @@ -903,6 +903,9 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) { } b2resp := &b2types.FinishLargeFileResponse{} for k, v := range l.hashes { + if len(b2req.Hashes) < k { + return nil, fmt.Errorf("b2_finish_large_file: invalid index %d", k) + } b2req.Hashes[k-1] = v } headers := map[string]string{