2
2
mirror of https://github.com/octoleo/restic.git synced 2024-12-27 04:32:40 +00:00

Merge pull request #1634 from restic/update-blazer

Update github.com/kurin/blazer to 0.3.0
This commit is contained in:
Alexander Neumann 2018-02-20 22:01:30 +01:00
commit 1a5d7a9965
7 changed files with 119 additions and 44 deletions

4
Gopkg.lock generated
View File

@ -88,8 +88,8 @@
[[projects]]
name = "github.com/kurin/blazer"
packages = ["b2","base","internal/b2types","internal/blog"]
revision = "5b348b2bdb078b06baa46ab7e12cdff12ee028ab"
version = "v0.2.2"
revision = "cd0304efa98725679cf68422cefa328d3d96f2f4"
version = "v0.3.0"
[[projects]]
name = "github.com/marstr/guid"

View File

@ -45,6 +45,7 @@ type Client struct {
slock sync.Mutex
sWriters map[string]*Writer
sReaders map[string]*Reader
sMethods map[string]int
}
// NewClient creates and returns a new Client with valid B2 service account
@ -54,7 +55,9 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
backend: &beRoot{
b2i: &b2Root{},
},
sMethods: make(map[string]int),
}
opts = append(opts, client(c))
if err := c.backend.authorizeAccount(ctx, account, key, opts...); err != nil {
return nil, err
}
@ -62,6 +65,7 @@ func NewClient(ctx context.Context, account, key string, opts ...ClientOption) (
}
type clientOptions struct {
client *Client
transport http.RoundTripper
failSomeUploads bool
expireTokens bool
@ -115,13 +119,38 @@ func ForceCapExceeded() ClientOption {
}
}
func client(cl *Client) ClientOption {
return func(c *clientOptions) {
c.client = cl
}
}
type clientTransport struct {
client *Client
rt http.RoundTripper
}
func (ct *clientTransport) RoundTrip(r *http.Request) (*http.Response, error) {
method := r.Header.Get("X-Blazer-Method")
if method != "" && ct.client != nil {
ct.client.slock.Lock()
ct.client.sMethods[method]++
ct.client.slock.Unlock()
}
t := ct.rt
if t == nil {
t = http.DefaultTransport
}
return t.RoundTrip(r)
}
// Bucket is a reference to a B2 bucket.
type Bucket struct {
b beBucketInterface
r beRootInterface
c *Client
urlPool sync.Pool
urlPool *urlPool
}
type BucketType string
@ -189,6 +218,36 @@ func IsNotExist(err error) bool {
return berr.notFoundErr
}
const uploadURLPoolSize = 100
type urlPool struct {
ch chan beURLInterface
}
func newURLPool() *urlPool {
return &urlPool{ch: make(chan beURLInterface, uploadURLPoolSize)}
}
func (p *urlPool) get() beURLInterface {
select {
case ue := <-p.ch:
// if the channel has an upload URL available, use that
return ue
default:
// otherwise return nil, a new upload URL needs to be generated
return nil
}
}
func (p *urlPool) put(u beURLInterface) {
select {
case p.ch <- u:
// put the URL back if possible
default:
// if the channel is full, throw it away
}
}
// Bucket returns a bucket if it exists.
func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
buckets, err := c.backend.listBuckets(ctx)
@ -198,9 +257,10 @@ func (c *Client) Bucket(ctx context.Context, name string) (*Bucket, error) {
for _, bucket := range buckets {
if bucket.name() == name {
return &Bucket{
b: bucket,
r: c.backend,
c: c,
b: bucket,
r: c.backend,
c: c,
urlPool: newURLPool(),
}, nil
}
}
@ -221,9 +281,10 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
for _, bucket := range buckets {
if bucket.name() == name {
return &Bucket{
b: bucket,
r: c.backend,
c: c,
b: bucket,
r: c.backend,
c: c,
urlPool: newURLPool(),
}, nil
}
}
@ -235,9 +296,10 @@ func (c *Client) NewBucket(ctx context.Context, name string, attrs *BucketAttrs)
return nil, err
}
return &Bucket{
b: b,
r: c.backend,
c: c,
b: b,
r: c.backend,
c: c,
urlPool: newURLPool(),
}, err
}
@ -250,9 +312,10 @@ func (c *Client) ListBuckets(ctx context.Context) ([]*Bucket, error) {
var buckets []*Bucket
for _, b := range bs {
buckets = append(buckets, &Bucket{
b: b,
r: c.backend,
c: c,
b: b,
r: c.backend,
c: c,
urlPool: newURLPool(),
})
}
return buckets, nil

View File

@ -138,9 +138,11 @@ func (b *b2Root) authorizeAccount(ctx context.Context, account, key string, opts
f(c)
}
var aopts []base.AuthOption
ct := &clientTransport{client: c.client}
if c.transport != nil {
aopts = append(aopts, base.Transport(c.transport))
ct.rt = c.transport
}
aopts = append(aopts, base.Transport(ct))
if c.failSomeUploads {
aopts = append(aopts, base.FailSomeUploads())
}

View File

@ -17,7 +17,9 @@ package b2
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha1"
"encoding/hex"
"fmt"
"io"
"net/http"
@ -735,21 +737,14 @@ func TestWriteEmpty(t *testing.T) {
type rtCounter struct {
rt http.RoundTripper
trips int
api string
sync.Mutex
}
func (rt *rtCounter) RoundTrip(r *http.Request) (*http.Response, error) {
rt.Lock()
defer rt.Unlock()
resp, err := rt.rt.RoundTrip(r)
if err != nil {
return resp, err
}
if rt.api == "" || r.Header.Get("X-Blazer-Method") == rt.api {
rt.trips++
}
return resp, nil
rt.trips++
return rt.rt.RoundTrip(r)
}
func TestAttrsNoRoundtrip(t *testing.T) {
@ -828,12 +823,6 @@ func TestAttrsNoRoundtrip(t *testing.T) {
}*/
func TestSmallUploadsFewRoundtrips(t *testing.T) {
rt := &rtCounter{rt: defaultTransport, api: "b2_get_upload_url"}
defaultTransport = rt
defer func() {
defaultTransport = rt.rt
}()
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
defer cancel()
@ -847,9 +836,11 @@ func TestSmallUploadsFewRoundtrips(t *testing.T) {
t.Fatal(err)
}
}
if rt.trips > 3 {
// Pool is not guaranteed to be valid, so 3 calls allows some slack.
t.Errorf("too many calls to b2_get_upload_url: got %d, want < 3", rt.trips)
si := bucket.c.Status()
getURL := si.MethodCalls["b2_get_upload_url"]
uploadFile := si.MethodCalls["b2_upload_file"]
if getURL >= uploadFile {
t.Errorf("too many calls to b2_get_upload_url")
}
}
@ -1001,6 +992,16 @@ func (cc *ccRC) Close() error {
return cc.ReadCloser.Close()
}
var uniq string
func init() {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
panic(err)
}
uniq = hex.EncodeToString(b)
}
func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
id := os.Getenv(apiID)
key := os.Getenv(apiKey)
@ -1016,7 +1017,7 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) {
t.Fatal(err)
return nil, nil
}
bucket, err := client.NewBucket(ctx, id+"-"+bucketName, nil)
bucket, err := client.NewBucket(ctx, fmt.Sprintf("%s-%s-%s", id, bucketName, uniq), nil)
if err != nil {
t.Fatal(err)
return nil, nil

View File

@ -18,8 +18,9 @@ import "fmt"
// StatusInfo reports information about a client.
type StatusInfo struct {
Writers map[string]*WriterStatus
Readers map[string]*ReaderStatus
Writers map[string]*WriterStatus
Readers map[string]*ReaderStatus
MethodCalls map[string]int
}
// WriterStatus reports the status for each writer.
@ -42,8 +43,9 @@ func (c *Client) Status() *StatusInfo {
defer c.slock.Unlock()
si := &StatusInfo{
Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus),
Writers: make(map[string]*WriterStatus),
Readers: make(map[string]*ReaderStatus),
MethodCalls: make(map[string]int),
}
for name, w := range c.sWriters {
@ -54,6 +56,10 @@ func (c *Client) Status() *StatusInfo {
si.Readers[name] = r.status()
}
for name, n := range c.sMethods {
si.MethodCalls[name] = n
}
return si
}

View File

@ -246,12 +246,12 @@ func (w *Writer) Write(p []byte) (int, error) {
}
func (w *Writer) getUploadURL(ctx context.Context) (beURLInterface, error) {
u := w.o.b.urlPool.Get()
u := w.o.b.urlPool.get()
if u == nil {
return w.o.b.b.getUploadURL(w.ctx)
}
ue := u.(beURLInterface)
return ue, nil
return u, nil
}
func (w *Writer) simpleWriteFile() error {
@ -261,7 +261,7 @@ func (w *Writer) simpleWriteFile() error {
}
// This defer needs to be in a func() so that we put whatever the value of ue
// is at function exit.
defer func() { w.o.b.urlPool.Put(ue) }()
defer func() { w.o.b.urlPool.put(ue) }()
sha1 := w.w.Hash()
ctype := w.contentType
if ctype == "" {

View File

@ -42,7 +42,7 @@ import (
const (
APIBase = "https://api.backblazeb2.com"
DefaultUserAgent = "blazer/0.2.2"
DefaultUserAgent = "blazer/0.3.0"
)
type b2err struct {
@ -903,6 +903,9 @@ func (l *LargeFile) FinishLargeFile(ctx context.Context) (*File, error) {
}
b2resp := &b2types.FinishLargeFileResponse{}
for k, v := range l.hashes {
if len(b2req.Hashes) < k {
return nil, fmt.Errorf("b2_finish_large_file: invalid index %d", k)
}
b2req.Hashes[k-1] = v
}
headers := map[string]string{