From b7bed406b945a16aef6998e4a4f3ecac56a43873 Mon Sep 17 00:00:00 2001 From: Alexander Neumann Date: Fri, 25 May 2018 19:16:26 +0200 Subject: [PATCH] Update github.com/kurin/blazer --- Gopkg.lock | 4 +- vendor/github.com/kurin/blazer/README.md | 17 +- vendor/github.com/kurin/blazer/b2/b2.go | 10 +- .../kurin/blazer/b2/integration_test.go | 147 ++++++-------- vendor/github.com/kurin/blazer/b2/iterator.go | 183 ++++++++++++++++++ vendor/github.com/kurin/blazer/base/base.go | 2 +- .../blazer/internal/bin/cleanup/cleanup.go | 42 ++-- .../blazer/x/consistent/consistent_test.go | 37 +--- .../kurin/blazer/x/window/window.go | 27 ++- .../kurin/blazer/x/window/window_test.go | 15 ++ 10 files changed, 320 insertions(+), 164 deletions(-) create mode 100644 vendor/github.com/kurin/blazer/b2/iterator.go diff --git a/Gopkg.lock b/Gopkg.lock index 2f02be69b..8ff9cc2bf 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -94,8 +94,8 @@ [[projects]] name = "github.com/kurin/blazer" packages = ["b2","base","internal/b2assets","internal/b2types","internal/blog","x/window"] - revision = "b7c9cf27cae3aec98c2caaeb5181608bfe05b17c" - version = "v0.3.1" + revision = "3c18ed98a4120a440c8f45d8fbf41d414612a501" + version = "v0.4.2" [[projects]] name = "github.com/marstr/guid" diff --git a/vendor/github.com/kurin/blazer/README.md b/vendor/github.com/kurin/blazer/README.md index b9b49f379..35095536e 100644 --- a/vendor/github.com/kurin/blazer/README.md +++ b/vendor/github.com/kurin/blazer/README.md @@ -97,20 +97,11 @@ func downloadFile(ctx context.Context, bucket *b2.Bucket, downloads int, src, ds ```go func printObjects(ctx context.Context, bucket *b2.Bucket) error { - var cur *b2.Cursor - for { - objs, c, err := bucket.ListObjects(ctx, 1000, cur) - if err != nil && err != io.EOF { - return err - } - for _, obj := range objs { - fmt.Println(obj) - } - if err == io.EOF { - return - } - cur = c + iterator := bucket.List() + for iterator.Next() { + fmt.Println(itrator.Object()) } + return iterator.Err() } ``` diff --git a/vendor/github.com/kurin/blazer/b2/b2.go b/vendor/github.com/kurin/blazer/b2/b2.go index 8ec5b6d4f..cd83b858e 100644 --- a/vendor/github.com/kurin/blazer/b2/b2.go +++ b/vendor/github.com/kurin/blazer/b2/b2.go @@ -501,7 +501,7 @@ const ( Hider // Folder is a special state given to non-objects that are returned during a - // List*Objects call with a non-empty Delimiter. + // List call with a ListDelimiter option. Folder ) @@ -574,6 +574,8 @@ func (o *Object) Delete(ctx context.Context) error { } // Cursor is passed to ListObjects to return subsequent pages. +// +// DEPRECATED. Will be removed in a future release. type Cursor struct { // Prefix limits the listed objects to those that begin with this string. Prefix string @@ -602,6 +604,8 @@ type Cursor struct { // // ListObjects will return io.EOF when there are no objects left in the bucket, // however it may do so concurrently with the last objects. +// +// DEPRECATED. Will be removed in a future release. func (b *Bucket) ListObjects(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) { if c == nil { c = &Cursor{} @@ -636,6 +640,8 @@ func (b *Bucket) ListObjects(ctx context.Context, count int, c *Cursor) ([]*Obje // ListCurrentObjects is similar to ListObjects, except that it returns only // current, unhidden objects in the bucket. +// +// DEPRECATED. Will be removed in a future release. func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) { if c == nil { c = &Cursor{} @@ -669,6 +675,8 @@ func (b *Bucket) ListCurrentObjects(ctx context.Context, count int, c *Cursor) ( // ListUnfinishedLargeFiles lists any objects that correspond to large file uploads that haven't been completed. // This can happen for example when an upload is interrupted. +// +// DEPRECATED. Will be removed in a future release. func (b *Bucket) ListUnfinishedLargeFiles(ctx context.Context, count int, c *Cursor) ([]*Object, *Cursor, error) { if c == nil { c = &Cursor{} diff --git a/vendor/github.com/kurin/blazer/b2/integration_test.go b/vendor/github.com/kurin/blazer/b2/integration_test.go index 18930e8a1..a428fce74 100644 --- a/vendor/github.com/kurin/blazer/b2/integration_test.go +++ b/vendor/github.com/kurin/blazer/b2/integration_test.go @@ -64,21 +64,14 @@ func TestReadWriteLive(t *testing.T) { t.Error(err) } - var cur *Cursor - for { - objs, c, err := bucket.ListObjects(ctx, 100, cur) - if err != nil && err != io.EOF { - t.Fatal(err) + iter := bucket.List(ListHidden()) + for iter.Next(ctx) { + if err := iter.Object().Delete(ctx); err != nil { + t.Error(err) } - for _, o := range objs { - if err := o.Delete(ctx); err != nil { - t.Error(err) - } - } - if err == io.EOF { - break - } - cur = c + } + if err := iter.Err(); err != nil { + t.Error(err) } } @@ -175,7 +168,7 @@ func TestHideShowLive(t *testing.T) { t.Fatal(err) } - got, err := countObjects(ctx, bucket.ListCurrentObjects) + got, err := countObjects(ctx, bucket.List()) if err != nil { t.Error(err) } @@ -193,7 +186,7 @@ func TestHideShowLive(t *testing.T) { t.Fatal(err) } - got, err = countObjects(ctx, bucket.ListCurrentObjects) + got, err = countObjects(ctx, bucket.List()) if err != nil { t.Error(err) } @@ -207,7 +200,7 @@ func TestHideShowLive(t *testing.T) { } // count see the object again - got, err = countObjects(ctx, bucket.ListCurrentObjects) + got, err = countObjects(ctx, bucket.List()) if err != nil { t.Error(err) } @@ -542,33 +535,37 @@ func TestListObjectsWithPrefix(t *testing.T) { t.Fatal(err) } - // This is kind of a hack, but - type lfun func(context.Context, int, *Cursor) ([]*Object, *Cursor, error) + table := []struct { + opts []ListOption + }{ + { + opts: []ListOption{ + ListPrefix("baz/"), + }, + }, + { + opts: []ListOption{ + ListPrefix("baz/"), + ListHidden(), + }, + }, + } - for _, f := range []lfun{bucket.ListObjects, bucket.ListCurrentObjects} { - c := &Cursor{ - Prefix: "baz/", - } + for _, entry := range table { + iter := bucket.List(entry.opts...) var res []string - for { - objs, cur, err := f(ctx, 10, c) - if err != nil && err != io.EOF { - t.Fatalf("bucket.ListObjects: %v", err) + for iter.Next(ctx) { + o := iter.Object() + attrs, err := o.Attrs(ctx) + if err != nil { + t.Errorf("(%v).Attrs: %v", o, err) + continue } - for _, o := range objs { - attrs, err := o.Attrs(ctx) - if err != nil { - t.Errorf("(%v).Attrs: %v", o, err) - continue - } - res = append(res, attrs.Name) - } - if err == io.EOF { - break - } - c = cur + res = append(res, attrs.Name) + } + if iter.Err() != nil { + t.Errorf("iter.Err(): %v", iter.Err()) } - want := []string{"baz/bar"} if !reflect.DeepEqual(res, want) { t.Errorf("got %v, want %v", res, want) @@ -746,19 +743,15 @@ func TestAttrsNoRoundtrip(t *testing.T) { t.Fatal(err) } - objs, _, err := bucket.ListObjects(ctx, 1, nil) - if err != nil { - t.Fatal(err) - } - if len(objs) != 1 { - t.Fatalf("unexpected objects: got %d, want 1", len(objs)) - } + iter := bucket.List() + iter.Next(ctx) + obj := iter.Object() var trips int for range bucket.c.Status().table()["1m"] { - trips += 1 + trips++ } - attrs, err := objs[0].Attrs(ctx) + attrs, err := obj.Attrs(ctx) if err != nil { t.Fatal(err) } @@ -768,7 +761,7 @@ func TestAttrsNoRoundtrip(t *testing.T) { var newTrips int for range bucket.c.Status().table()["1m"] { - newTrips += 1 + newTrips++ } if trips != newTrips { t.Errorf("Attrs() should not have caused any net traffic, but it did: old %d, new %d", trips, newTrips) @@ -859,13 +852,9 @@ func TestListUnfinishedLargeFiles(t *testing.T) { if _, err := io.Copy(w, io.LimitReader(zReader{}, 1e6)); err != nil { t.Fatal(err) } - // Don't close the writer. - fs, _, err := bucket.ListUnfinishedLargeFiles(ctx, 10, nil) - if err != io.EOF && err != nil { - t.Fatal(err) - } - if len(fs) != 1 { - t.Errorf("ListUnfinishedLargeFiles: got %d, want 1", len(fs)) + iter := bucket.List(ListUnfinished()) + if !iter.Next(ctx) { + t.Errorf("ListUnfinishedLargeFiles: got none, want 1 (error %v)", iter.Err()) } } @@ -905,39 +894,12 @@ type object struct { err error } -func countObjects(ctx context.Context, f func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)) (int, error) { +func countObjects(ctx context.Context, iter *ObjectIterator) (int, error) { var got int - ch := listObjects(ctx, f) - for c := range ch { - if c.err != nil { - return 0, c.err - } + for iter.Next(ctx) { got++ } - return got, nil -} - -func listObjects(ctx context.Context, f func(context.Context, int, *Cursor) ([]*Object, *Cursor, error)) <-chan object { - ch := make(chan object) - go func() { - defer close(ch) - var cur *Cursor - for { - objs, c, err := f(ctx, 100, cur) - if err != nil && err != io.EOF { - ch <- object{err: err} - return - } - for _, o := range objs { - ch <- object{o: o} - } - if err == io.EOF { - return - } - cur = c - } - }() - return ch + return got, iter.Err() } var defaultTransport = http.DefaultTransport @@ -1042,14 +1004,15 @@ func startLiveTest(ctx context.Context, t *testing.T) (*Bucket, func()) { } f := func() { defer ccport.done() - for c := range listObjects(ctx, bucket.ListObjects) { - if c.err != nil { - continue - } - if err := c.o.Delete(ctx); err != nil { + iter := bucket.List(ListHidden()) + for iter.Next(ctx) { + if err := iter.Object().Delete(ctx); err != nil { t.Error(err) } } + if err := iter.Err(); err != nil && !IsNotExist(err) { + t.Errorf("%#v", err) + } if err := bucket.Delete(ctx); err != nil && !IsNotExist(err) { t.Error(err) } diff --git a/vendor/github.com/kurin/blazer/b2/iterator.go b/vendor/github.com/kurin/blazer/b2/iterator.go new file mode 100644 index 000000000..48927e05f --- /dev/null +++ b/vendor/github.com/kurin/blazer/b2/iterator.go @@ -0,0 +1,183 @@ +// Copyright 2018, Google +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package b2 + +import ( + "context" + "io" + "sync" +) + +// List returns an iterator for selecting objects in a bucket. The default +// behavior, with no options, is to list all currently un-hidden objects. +func (b *Bucket) List(opts ...ListOption) *ObjectIterator { + o := &ObjectIterator{ + bucket: b, + } + for _, opt := range opts { + opt(&o.opts) + } + return o +} + +// ObjectIterator abtracts away the tricky bits of iterating over a bucket's +// contents. +// +// It is intended to be called in a loop: +// for iter.Next(ctx) { +// obj := iter.Object() +// // act on obj +// } +// if err := iter.Err(); err != nil { +// // handle err +// } +type ObjectIterator struct { + bucket *Bucket + final bool + err error + idx int + c *Cursor + opts objectIteratorOptions + objs []*Object + init sync.Once + l lister + count int +} + +type lister func(context.Context, int, *Cursor) ([]*Object, *Cursor, error) + +func (o *ObjectIterator) frame(ctx context.Context) error { + objs, c, err := o.l(ctx, o.count, o.c) + if err != nil && err != io.EOF { + if bNotExist.MatchString(err.Error()) { + return b2err{ + err: err, + notFoundErr: true, + } + } + return err + } + o.c = c + o.objs = objs + o.idx = 0 + if err == io.EOF { + o.final = true + } + return nil +} + +// Next advances the iterator to the next object. It should be called before +// any calls to Object(). If Next returns true, then the next call to Object() +// will be valid. Once Next returns false, it is important to check the return +// value of Err(). +func (o *ObjectIterator) Next(ctx context.Context) bool { + o.init.Do(func() { + o.count = 1000 + switch { + case o.opts.unfinished: + o.l = o.bucket.ListUnfinishedLargeFiles + o.count = 100 + case o.opts.hidden: + o.l = o.bucket.ListObjects + default: + o.l = o.bucket.ListCurrentObjects + } + o.c = &Cursor{ + Prefix: o.opts.prefix, + Delimiter: o.opts.delimiter, + } + }) + if o.err != nil { + return false + } + if o.idx >= len(o.objs) { + if o.final { + o.err = io.EOF + return false + } + if err := o.frame(ctx); err != nil { + o.err = err + return false + } + return o.Next(ctx) + } + o.idx++ + return true +} + +// Object returns the current object. +func (o *ObjectIterator) Object() *Object { + return o.objs[o.idx-1] +} + +// Err returns the current error or nil. If Next() returns false and Err() is +// nil, then all objects have been seen. +func (o *ObjectIterator) Err() error { + if o.err == io.EOF { + return nil + } + return o.err +} + +type objectIteratorOptions struct { + hidden bool + unfinished bool + prefix string + delimiter string +} + +// A ListOption alters the default behavor of List. +type ListOption func(*objectIteratorOptions) + +// ListHidden will include hidden objects in the output. +func ListHidden() ListOption { + return func(o *objectIteratorOptions) { + o.hidden = true + } +} + +// ListUnfinished will list unfinished large file operations instead of +// existing objects. +func ListUnfinished() ListOption { + return func(o *objectIteratorOptions) { + o.unfinished = true + } +} + +// ListPrefix will restrict the output to objects whose names begin with +// prefix. +func ListPrefix(pfx string) ListOption { + return func(o *objectIteratorOptions) { + o.prefix = pfx + } +} + +// ListDelimiter denotes the path separator. If set, object listings will be +// truncated at this character. +// +// For example, if the bucket contains objects foo/bar, foo/baz, and foo, +// then a delimiter of "/" will cause the listing to return "foo" and "foo/". +// Otherwise, the listing would have returned all object names. +// +// Note that objects returned that end in the delimiter may not be actual +// objects, e.g. you cannot read from (or write to, or delete) an object +// "foo/", both because no actual object exists and because B2 disallows object +// names that end with "/". If you want to ensure that all objects returned +// are actual objects, leave this unset. +func ListDelimiter(delimiter string) ListOption { + return func(o *objectIteratorOptions) { + o.delimiter = delimiter + } +} diff --git a/vendor/github.com/kurin/blazer/base/base.go b/vendor/github.com/kurin/blazer/base/base.go index a2a1f0ce6..3f610c00f 100644 --- a/vendor/github.com/kurin/blazer/base/base.go +++ b/vendor/github.com/kurin/blazer/base/base.go @@ -42,7 +42,7 @@ import ( const ( APIBase = "https://api.backblazeb2.com" - DefaultUserAgent = "blazer/0.3.1" + DefaultUserAgent = "blazer/0.4.2" ) type b2err struct { diff --git a/vendor/github.com/kurin/blazer/internal/bin/cleanup/cleanup.go b/vendor/github.com/kurin/blazer/internal/bin/cleanup/cleanup.go index ece1b1aca..47f2fce5f 100644 --- a/vendor/github.com/kurin/blazer/internal/bin/cleanup/cleanup.go +++ b/vendor/github.com/kurin/blazer/internal/bin/cleanup/cleanup.go @@ -3,8 +3,8 @@ package main import ( "context" "fmt" - "io" "os" + "strings" "sync" "github.com/kurin/blazer/b2" @@ -24,12 +24,27 @@ func main() { fmt.Println(err) return } + buckets, err := client.ListBuckets(ctx) + if err != nil { + fmt.Println(err) + return + } + var kill []string + for _, bucket := range buckets { + if strings.HasPrefix(bucket.Name(), fmt.Sprintf("%s-b2-tests-", id)) { + kill = append(kill, bucket.Name()) + } + if bucket.Name() == fmt.Sprintf("%s-consistobucket", id) || bucket.Name() == fmt.Sprintf("%s-base-tests", id) { + kill = append(kill, bucket.Name()) + } + } var wg sync.WaitGroup - for _, name := range []string{"consistobucket", "base-tests"} { + for _, name := range kill { wg.Add(1) go func(name string) { defer wg.Done() - if err := killBucket(ctx, client, id, name); err != nil { + fmt.Println("removing", name) + if err := killBucket(ctx, client, name); err != nil { fmt.Println(err) } }(name) @@ -37,8 +52,8 @@ func main() { wg.Wait() } -func killBucket(ctx context.Context, client *b2.Client, id, name string) error { - bucket, err := client.NewBucket(ctx, id+"-"+name, nil) +func killBucket(ctx context.Context, client *b2.Client, name string) error { + bucket, err := client.NewBucket(ctx, name, nil) if b2.IsNotExist(err) { return nil } @@ -46,18 +61,11 @@ func killBucket(ctx context.Context, client *b2.Client, id, name string) error { return err } defer bucket.Delete(ctx) - cur := &b2.Cursor{} - for { - os, c, err := bucket.ListObjects(ctx, 1000, cur) - if err != nil && err != io.EOF { - return err + iter := bucket.List(b2.ListHidden()) + for iter.Next(ctx) { + if err := iter.Object().Delete(ctx); err != nil { + fmt.Println(err) } - for _, o := range os { - o.Delete(ctx) - } - if err == io.EOF { - return nil - } - cur = c } + return iter.Err() } diff --git a/vendor/github.com/kurin/blazer/x/consistent/consistent_test.go b/vendor/github.com/kurin/blazer/x/consistent/consistent_test.go index 0a9b086dc..b9a50b587 100644 --- a/vendor/github.com/kurin/blazer/x/consistent/consistent_test.go +++ b/vendor/github.com/kurin/blazer/x/consistent/consistent_test.go @@ -2,7 +2,6 @@ package consistent import ( "context" - "io" "io/ioutil" "os" "strconv" @@ -66,7 +65,7 @@ func TestOperationLive(t *testing.T) { t.Fatal(err) } if n != 100 { - t.Errorf("result: got %d, want 10", n) + t.Errorf("result: got %d, want 100", n) } } @@ -142,14 +141,15 @@ func startLiveTest(ctx context.Context, t *testing.T) (*b2.Bucket, func()) { return nil, nil } f := func() { - for c := range listObjects(ctx, bucket.ListObjects) { - if c.err != nil { - continue - } - if err := c.o.Delete(ctx); err != nil { + iter := bucket.List(b2.ListHidden()) + for iter.Next(ctx) { + if err := iter.Object().Delete(ctx); err != nil { t.Error(err) } } + if err := iter.Err(); err != nil && !b2.IsNotExist(err) { + t.Error(err) + } if err := bucket.Delete(ctx); err != nil && !b2.IsNotExist(err) { t.Error(err) } @@ -157,29 +157,6 @@ func startLiveTest(ctx context.Context, t *testing.T) (*b2.Bucket, func()) { return bucket, f } -func listObjects(ctx context.Context, f func(context.Context, int, *b2.Cursor) ([]*b2.Object, *b2.Cursor, error)) <-chan object { - ch := make(chan object) - go func() { - defer close(ch) - var cur *b2.Cursor - for { - objs, c, err := f(ctx, 100, cur) - if err != nil && err != io.EOF { - ch <- object{err: err} - return - } - for _, o := range objs { - ch <- object{o: o} - } - if err == io.EOF { - return - } - cur = c - } - }() - return ch -} - type object struct { o *b2.Object err error diff --git a/vendor/github.com/kurin/blazer/x/window/window.go b/vendor/github.com/kurin/blazer/x/window/window.go index 545deacfd..d18a88703 100644 --- a/vendor/github.com/kurin/blazer/x/window/window.go +++ b/vendor/github.com/kurin/blazer/x/window/window.go @@ -24,7 +24,7 @@ import ( // A Window efficiently records events that have occurred over a span of time // extending from some fixed interval ago to now. Events that pass beyond this -// horizon effectively "fall off" the back of the window. +// horizon are discarded. type Window struct { mu sync.Mutex events []interface{} @@ -81,16 +81,27 @@ func (w *Window) sweep(now time.Time) { w.last = now }() - b := w.bucket(now) - p := w.bucket(w.last) + // This compares now and w.last's monotonic clocks. + diff := now.Sub(w.last) + if diff < 0 { + // time went backwards somehow; zero events and return + for i := range w.events { + w.events[i] = nil + } + return + } + last := now.Add(-diff) - if b == p && now.Sub(w.last) <= w.res { + b := w.bucket(now) + p := w.bucket(last) + + if b == p && diff <= w.res { // We're in the same bucket as the previous sweep, so all buckets are // valid. return } - if now.Sub(w.last) > w.res*time.Duration(len(w.events)) { + if diff > w.res*time.Duration(len(w.events)) { // We've gone longer than this window measures since the last sweep, just // zero the thing and have done. for i := range w.events { @@ -102,10 +113,10 @@ func (w *Window) sweep(now time.Time) { // Expire all invalid buckets. This means buckets not seen since the // previous sweep and now, including the current bucket but not including the // previous bucket. - old := int(w.last.UnixNano()) / int(w.res) - new := int(now.UnixNano()) / int(w.res) + old := int64(last.UnixNano()) / int64(w.res) + new := int64(now.UnixNano()) / int64(w.res) for i := old + 1; i <= new; i++ { - b := i % len(w.events) + b := int(i) % len(w.events) w.events[b] = nil } } diff --git a/vendor/github.com/kurin/blazer/x/window/window_test.go b/vendor/github.com/kurin/blazer/x/window/window_test.go index 4b5513916..d0fc91aa6 100644 --- a/vendor/github.com/kurin/blazer/x/window/window_test.go +++ b/vendor/github.com/kurin/blazer/x/window/window_test.go @@ -73,6 +73,21 @@ func TestWindows(t *testing.T) { want: 6, reduce: adder, }, + { // what happens if time goes backwards? + size: time.Minute, + dur: time.Second, + incs: []epair{ + {t: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), e: 1}, + {t: time.Date(2000, 1, 1, 0, 0, 1, 0, time.UTC), e: 1}, + {t: time.Date(2000, 1, 1, 0, 0, 2, 0, time.UTC), e: 1}, + {t: time.Date(2000, 1, 1, 0, 0, 3, 0, time.UTC), e: 1}, + {t: time.Date(2000, 1, 1, 0, 0, 4, 0, time.UTC), e: 1}, + {t: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), e: 1}, + }, + look: time.Date(2000, 1, 1, 0, 0, 30, 0, time.UTC), + want: 1, + reduce: adder, + }, } for _, e := range table {