diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 08180fba9..065c61690 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -41,7 +41,7 @@ }, { "ImportPath": "github.com/calmh/xdr", - "Rev": "e1714bbe4764b15490fcc8ebd25d4bd9ea50a4b9" + "Rev": "a597b63b87d6140f79084c8aab214b4d533833a1" }, { "ImportPath": "github.com/juju/ratelimit", @@ -49,7 +49,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "17fd8940e0f778c27793a25bff8c48ddd7bf53ac" + "Rev": "59d87758aeaab5ab6ed289c773349500228a1557" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/calmh/xdr/reader_ipdr.go b/Godeps/_workspace/src/github.com/calmh/xdr/reader_ipdr.go index 2fda700b9..28e0b56ad 100644 --- a/Godeps/_workspace/src/github.com/calmh/xdr/reader_ipdr.go +++ b/Godeps/_workspace/src/github.com/calmh/xdr/reader_ipdr.go @@ -22,7 +22,7 @@ func (r *Reader) ReadUint8() uint8 { } if debug { - dl.Printf("rd uint8=%d (0x%08x)", r.b[0], r.b[0]) + dl.Printf("rd uint8=%d (0x%02x)", r.b[0], r.b[0]) } return r.b[0] } @@ -43,7 +43,7 @@ func (r *Reader) ReadUint16() uint16 { v := uint16(r.b[1]) | uint16(r.b[0])<<8 if debug { - dl.Printf("rd uint16=%d (0x%08x)", v, v) + dl.Printf("rd uint16=%d (0x%04x)", v, v) } return v } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go index 295eebaef..91b426709 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go @@ -249,7 +249,9 @@ func (p *dbBench) newIter() iterator.Iterator { } func (p *dbBench) close() { - p.b.Log(p.db.s.tops.bpool) + if bp, err := p.db.GetProperty("leveldb.blockpool"); err == nil { + p.b.Log("Block pool stats: ", bp) + } p.db.Close() p.stor.Close() os.RemoveAll(benchDB) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go index d71fe1505..fe398f03a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -11,84 +11,106 @@ import ( "sync/atomic" ) -// SetFunc used by Namespace.Get method to create a cache object. SetFunc -// may return ok false, in that case the cache object will not be created. -type SetFunc func() (ok bool, value interface{}, charge int, fin SetFin) +// SetFunc is the function that will be called by Namespace.Get to create +// a cache object, if charge is less than one than the cache object will +// not be registered to cache tree, if value is nil then the cache object +// will not be created. +type SetFunc func() (charge int, value interface{}) -// SetFin will be called when corresponding cache object are released. -type SetFin func() +// DelFin is the function that will be called as the result of a delete operation. +// Exist == true is indication that the object is exist, and pending == true is +// indication of deletion already happen but haven't done yet (wait for all handles +// to be released). And exist == false means the object doesn't exist. +type DelFin func(exist, pending bool) -// DelFin will be called when corresponding cache object are released. -// DelFin will be called after SetFin. The exist is true if the corresponding -// cache object is actually exist in the cache tree. -type DelFin func(exist bool) - -// PurgeFin will be called when corresponding cache object are released. -// PurgeFin will be called after SetFin. If PurgeFin present DelFin will -// not be executed but passed to the PurgeFin, it is up to the caller -// to call it or not. -type PurgeFin func(ns, key uint64, delfin DelFin) +// PurgeFin is the function that will be called as the result of a purge operation. +type PurgeFin func(ns, key uint64) // Cache is a cache tree. A cache instance must be goroutine-safe. type Cache interface { - // SetCapacity sets cache capacity. + // SetCapacity sets cache tree capacity. SetCapacity(capacity int) - // GetNamespace gets or creates a cache namespace for the given id. + // Capacity returns cache tree capacity. + Capacity() int + + // Used returns used cache tree capacity. + Used() int + + // Size returns entire alive cache objects size. + Size() int + + // GetNamespace gets cache namespace with the given id. + // GetNamespace is never return nil. GetNamespace(id uint64) Namespace - // Purge purges all cache namespaces, read Namespace.Purge method documentation. + // Purge purges all cache namespace from this cache tree. + // This is behave the same as calling Namespace.Purge method on all cache namespace. Purge(fin PurgeFin) - // Zap zaps all cache namespaces, read Namespace.Zap method documentation. - Zap(closed bool) + // Zap detaches all cache namespace from this cache tree. + // This is behave the same as calling Namespace.Zap method on all cache namespace. + Zap() } // Namespace is a cache namespace. A namespace instance must be goroutine-safe. type Namespace interface { - // Get gets cache object for the given key. The given SetFunc (if not nil) will - // be called if the given key does not exist. - // If the given key does not exist, SetFunc is nil or SetFunc return ok false, Get - // will return ok false. - Get(key uint64, setf SetFunc) (obj Object, ok bool) - - // Get deletes cache object for the given key. If exist the cache object will - // be deleted later when all of its handles have been released (i.e. no one use - // it anymore) and the given DelFin (if not nil) will finally be executed. If - // such cache object does not exist the given DelFin will be executed anyway. + // Get gets cache object with the given key. + // If cache object is not found and setf is not nil, Get will atomically creates + // the cache object by calling setf. Otherwise Get will returns nil. // - // Delete returns true if such cache object exist. + // The returned cache handle should be released after use by calling Release + // method. + Get(key uint64, setf SetFunc) Handle + + // Delete removes cache object with the given key from cache tree. + // A deleted cache object will be released as soon as all of its handles have + // been released. + // Delete only happen once, subsequent delete will consider cache object doesn't + // exist, even if the cache object ins't released yet. + // + // If not nil, fin will be called if the cache object doesn't exist or when + // finally be released. + // + // Delete returns true if such cache object exist and never been deleted. Delete(key uint64, fin DelFin) bool - // Purge deletes all cache objects, read Delete method documentation. + // Purge removes all cache objects within this namespace from cache tree. + // This is the same as doing delete on all cache objects. + // + // If not nil, fin will be called on all cache objects when its finally be + // released. Purge(fin PurgeFin) - // Zap detaches the namespace from the cache tree and delete all its cache - // objects. The cache objects deletion and finalizers execution are happen - // immediately, even if its existing handles haven't yet been released. - // A zapped namespace can't never be filled again. - // If closed is false then the Get function will always call the given SetFunc - // if it is not nil, but resultant of the SetFunc will not be cached. - Zap(closed bool) + // Zap detaches namespace from cache tree and release all its cache objects. + // A zapped namespace can never be filled again. + // Calling Get on zapped namespace will always return nil. + Zap() } -// Object is a cache object. -type Object interface { - // Release releases the cache object. Other methods should not be called - // after the cache object has been released. +// Handle is a cache handle. +type Handle interface { + // Release releases this cache handle. This method can be safely called mutiple + // times. Release() - // Value returns value of the cache object. + // Value returns value of this cache handle. + // Value will returns nil after this cache handle have be released. Value() interface{} } +const ( + DelNotExist = iota + DelExist + DelPendig +) + // Namespace state. type nsState int const ( nsEffective nsState = iota nsZapped - nsClosed ) // Node state. @@ -97,29 +119,29 @@ type nodeState int const ( nodeEffective nodeState = iota nodeEvicted - nodeRemoved + nodeDeleted ) -// Fake object. -type fakeObject struct { +// Fake handle. +type fakeHandle struct { value interface{} fin func() once uint32 } -func (o *fakeObject) Value() interface{} { - if atomic.LoadUint32(&o.once) == 0 { - return o.value +func (h *fakeHandle) Value() interface{} { + if atomic.LoadUint32(&h.once) == 0 { + return h.value } return nil } -func (o *fakeObject) Release() { - if !atomic.CompareAndSwapUint32(&o.once, 0, 1) { +func (h *fakeHandle) Release() { + if !atomic.CompareAndSwapUint32(&h.once, 0, 1) { return } - if o.fin != nil { - o.fin() - o.fin = nil + if h.fin != nil { + h.fin() + h.fin = nil } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go index 07a9939b2..6735e02ef 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache_test.go @@ -7,15 +7,35 @@ package cache import ( + "fmt" "math/rand" + "runtime" + "strings" + "sync" + "sync/atomic" "testing" + "time" ) -func set(ns Namespace, key uint64, value interface{}, charge int, fin func()) Object { - obj, _ := ns.Get(key, func() (bool, interface{}, int, SetFin) { - return true, value, charge, fin +type releaserFunc struct { + fn func() + value interface{} +} + +func (r releaserFunc) Release() { + if r.fn != nil { + r.fn() + } +} + +func set(ns Namespace, key uint64, value interface{}, charge int, relf func()) Handle { + return ns.Get(key, func() (int, interface{}) { + if relf != nil { + return charge, releaserFunc{relf, value} + } else { + return charge, value + } }) - return obj } func TestCache_HitMiss(t *testing.T) { @@ -43,29 +63,31 @@ func TestCache_HitMiss(t *testing.T) { setfin++ }).Release() for j, y := range cases { - r, ok := ns.Get(y.key, nil) + h := ns.Get(y.key, nil) if j <= i { // should hit - if !ok { + if h == nil { t.Errorf("case '%d' iteration '%d' is miss", i, j) - } else if r.Value().(string) != y.value { - t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, r.Value().(string), y.value) + } else { + if x := h.Value().(releaserFunc).value.(string); x != y.value { + t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, x, y.value) + } } } else { // should miss - if ok { - t.Errorf("case '%d' iteration '%d' is hit , value '%s'", i, j, r.Value().(string)) + if h != nil { + t.Errorf("case '%d' iteration '%d' is hit , value '%s'", i, j, h.Value().(releaserFunc).value.(string)) } } - if ok { - r.Release() + if h != nil { + h.Release() } } } for i, x := range cases { finalizerOk := false - ns.Delete(x.key, func(exist bool) { + ns.Delete(x.key, func(exist, pending bool) { finalizerOk = true }) @@ -74,22 +96,24 @@ func TestCache_HitMiss(t *testing.T) { } for j, y := range cases { - r, ok := ns.Get(y.key, nil) + h := ns.Get(y.key, nil) if j > i { // should hit - if !ok { + if h == nil { t.Errorf("case '%d' iteration '%d' is miss", i, j) - } else if r.Value().(string) != y.value { - t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, r.Value().(string), y.value) + } else { + if x := h.Value().(releaserFunc).value.(string); x != y.value { + t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, x, y.value) + } } } else { // should miss - if ok { - t.Errorf("case '%d' iteration '%d' is hit, value '%s'", i, j, r.Value().(string)) + if h != nil { + t.Errorf("case '%d' iteration '%d' is hit, value '%s'", i, j, h.Value().(releaserFunc).value.(string)) } } - if ok { - r.Release() + if h != nil { + h.Release() } } } @@ -107,42 +131,42 @@ func TestLRUCache_Eviction(t *testing.T) { set(ns, 3, 3, 1, nil).Release() set(ns, 4, 4, 1, nil).Release() set(ns, 5, 5, 1, nil).Release() - if r, ok := ns.Get(2, nil); ok { // 1,3,4,5,2 - r.Release() + if h := ns.Get(2, nil); h != nil { // 1,3,4,5,2 + h.Release() } set(ns, 9, 9, 10, nil).Release() // 5,2,9 - for _, x := range []uint64{9, 2, 5, 1} { - r, ok := ns.Get(x, nil) - if !ok { - t.Errorf("miss for key '%d'", x) + for _, key := range []uint64{9, 2, 5, 1} { + h := ns.Get(key, nil) + if h == nil { + t.Errorf("miss for key '%d'", key) } else { - if r.Value().(int) != int(x) { - t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int)) + if x := h.Value().(int); x != int(key) { + t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x) } - r.Release() + h.Release() } } o1.Release() - for _, x := range []uint64{1, 2, 5} { - r, ok := ns.Get(x, nil) - if !ok { - t.Errorf("miss for key '%d'", x) + for _, key := range []uint64{1, 2, 5} { + h := ns.Get(key, nil) + if h == nil { + t.Errorf("miss for key '%d'", key) } else { - if r.Value().(int) != int(x) { - t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int)) + if x := h.Value().(int); x != int(key) { + t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x) } - r.Release() + h.Release() } } - for _, x := range []uint64{3, 4, 9} { - r, ok := ns.Get(x, nil) - if ok { - t.Errorf("hit for key '%d'", x) - if r.Value().(int) != int(x) { - t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int)) + for _, key := range []uint64{3, 4, 9} { + h := ns.Get(key, nil) + if h != nil { + t.Errorf("hit for key '%d'", key) + if x := h.Value().(int); x != int(key) { + t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x) } - r.Release() + h.Release() } } } @@ -153,16 +177,15 @@ func TestLRUCache_SetGet(t *testing.T) { for i := 0; i < 200; i++ { n := uint64(rand.Intn(99999) % 20) set(ns, n, n, 1, nil).Release() - if p, ok := ns.Get(n, nil); ok { - if p.Value() == nil { + if h := ns.Get(n, nil); h != nil { + if h.Value() == nil { t.Errorf("key '%d' contains nil value", n) } else { - got := p.Value().(uint64) - if got != n { - t.Errorf("invalid value for key '%d' want '%d', got '%d'", n, n, got) + if x := h.Value().(uint64); x != n { + t.Errorf("invalid value for key '%d' want '%d', got '%d'", n, n, x) } } - p.Release() + h.Release() } else { t.Errorf("key '%d' doesn't exist", n) } @@ -176,31 +199,319 @@ func TestLRUCache_Purge(t *testing.T) { o2 := set(ns1, 2, 2, 1, nil) ns1.Purge(nil) set(ns1, 3, 3, 1, nil).Release() - for _, x := range []uint64{1, 2, 3} { - r, ok := ns1.Get(x, nil) - if !ok { - t.Errorf("miss for key '%d'", x) + for _, key := range []uint64{1, 2, 3} { + h := ns1.Get(key, nil) + if h == nil { + t.Errorf("miss for key '%d'", key) } else { - if r.Value().(int) != int(x) { - t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int)) + if x := h.Value().(int); x != int(key) { + t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x) } - r.Release() + h.Release() } } o1.Release() o2.Release() - for _, x := range []uint64{1, 2} { - r, ok := ns1.Get(x, nil) - if ok { - t.Errorf("hit for key '%d'", x) - if r.Value().(int) != int(x) { - t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int)) + for _, key := range []uint64{1, 2} { + h := ns1.Get(key, nil) + if h != nil { + t.Errorf("hit for key '%d'", key) + if x := h.Value().(int); x != int(key) { + t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x) } - r.Release() + h.Release() } } } +type testingCacheObjectCounter struct { + created uint32 + released uint32 +} + +func (c *testingCacheObjectCounter) createOne() { + atomic.AddUint32(&c.created, 1) +} + +func (c *testingCacheObjectCounter) releaseOne() { + atomic.AddUint32(&c.released, 1) +} + +type testingCacheObject struct { + t *testing.T + cnt *testingCacheObjectCounter + + ns, key uint64 + + releaseCalled uint32 +} + +func (x *testingCacheObject) Release() { + if atomic.CompareAndSwapUint32(&x.releaseCalled, 0, 1) { + x.cnt.releaseOne() + } else { + x.t.Errorf("duplicate setfin NS#%d KEY#%s", x.ns, x.key) + } +} + +func TestLRUCache_Finalizer(t *testing.T) { + const ( + capacity = 100 + goroutines = 100 + iterations = 10000 + keymax = 8000 + ) + + runtime.GOMAXPROCS(runtime.NumCPU()) + defer runtime.GOMAXPROCS(1) + + wg := &sync.WaitGroup{} + cnt := &testingCacheObjectCounter{} + + c := NewLRUCache(capacity) + + type instance struct { + seed int64 + rnd *rand.Rand + ns uint64 + effective int32 + handles []Handle + handlesMap map[uint64]int + + delete bool + purge bool + zap bool + wantDel int32 + delfinCalledAll int32 + delfinCalledEff int32 + purgefinCalled int32 + } + + instanceGet := func(p *instance, ns Namespace, key uint64) { + h := ns.Get(key, func() (charge int, value interface{}) { + to := &testingCacheObject{ + t: t, cnt: cnt, + ns: p.ns, + key: key, + } + atomic.AddInt32(&p.effective, 1) + cnt.createOne() + return 1, releaserFunc{func() { + to.Release() + atomic.AddInt32(&p.effective, -1) + }, to} + }) + p.handles = append(p.handles, h) + p.handlesMap[key] = p.handlesMap[key] + 1 + } + instanceRelease := func(p *instance, ns Namespace, i int) { + h := p.handles[i] + key := h.Value().(releaserFunc).value.(*testingCacheObject).key + if n := p.handlesMap[key]; n == 0 { + t.Fatal("key ref == 0") + } else if n > 1 { + p.handlesMap[key] = n - 1 + } else { + delete(p.handlesMap, key) + } + h.Release() + p.handles = append(p.handles[:i], p.handles[i+1:]...) + p.handles[len(p.handles) : len(p.handles)+1][0] = nil + } + + seeds := make([]int64, goroutines) + instances := make([]instance, goroutines) + for i := range instances { + p := &instances[i] + p.handlesMap = make(map[uint64]int) + if seeds[i] == 0 { + seeds[i] = time.Now().UnixNano() + } + p.seed = seeds[i] + p.rnd = rand.New(rand.NewSource(p.seed)) + p.ns = uint64(i) + p.delete = i%6 == 0 + p.purge = i%8 == 0 + p.zap = i%12 == 0 || i%3 == 0 + } + + seedsStr := make([]string, len(seeds)) + for i, seed := range seeds { + seedsStr[i] = fmt.Sprint(seed) + } + t.Logf("seeds := []int64{%s}", strings.Join(seedsStr, ", ")) + + // Get and release. + for i := range instances { + p := &instances[i] + + wg.Add(1) + go func(p *instance) { + defer wg.Done() + + ns := c.GetNamespace(p.ns) + for i := 0; i < iterations; i++ { + if len(p.handles) == 0 || p.rnd.Int()%2 == 0 { + instanceGet(p, ns, uint64(p.rnd.Intn(keymax))) + } else { + instanceRelease(p, ns, p.rnd.Intn(len(p.handles))) + } + } + }(p) + } + wg.Wait() + + if used, cap := c.Used(), c.Capacity(); used > cap { + t.Errorf("Used > capacity, used=%d cap=%d", used, cap) + } + + // Check effective objects. + for i := range instances { + p := &instances[i] + if int(p.effective) < len(p.handlesMap) { + t.Errorf("#%d effective objects < acquired handle, eo=%d ah=%d", i, p.effective, len(p.handlesMap)) + } + } + + if want := int(cnt.created - cnt.released); c.Size() != want { + t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) + } + + // Delete and purge. + for i := range instances { + p := &instances[i] + p.wantDel = p.effective + + wg.Add(1) + go func(p *instance) { + defer wg.Done() + + ns := c.GetNamespace(p.ns) + + if p.delete { + for key := uint64(0); key < keymax; key++ { + _, wantExist := p.handlesMap[key] + gotExist := ns.Delete(key, func(exist, pending bool) { + atomic.AddInt32(&p.delfinCalledAll, 1) + if exist { + atomic.AddInt32(&p.delfinCalledEff, 1) + } + }) + if !gotExist && wantExist { + t.Errorf("delete on NS#%d KEY#%d not found", p.ns, key) + } + } + + var delfinCalled int + for key := uint64(0); key < keymax; key++ { + func(key uint64) { + gotExist := ns.Delete(key, func(exist, pending bool) { + if exist && !pending { + t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.ns, key) + } + delfinCalled++ + }) + if gotExist { + t.Errorf("delete on NS#%d KEY#%d found", p.ns, key) + } + }(key) + } + if delfinCalled != keymax { + t.Errorf("(2) #%d not all delete fin called, diff=%d", p.ns, keymax-delfinCalled) + } + } + + if p.purge { + ns.Purge(func(ns, key uint64) { + atomic.AddInt32(&p.purgefinCalled, 1) + }) + } + }(p) + } + wg.Wait() + + if want := int(cnt.created - cnt.released); c.Size() != want { + t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) + } + + // Release. + for i := range instances { + p := &instances[i] + + if !p.zap { + wg.Add(1) + go func(p *instance) { + defer wg.Done() + + ns := c.GetNamespace(p.ns) + for i := len(p.handles) - 1; i >= 0; i-- { + instanceRelease(p, ns, i) + } + }(p) + } + } + wg.Wait() + + if want := int(cnt.created - cnt.released); c.Size() != want { + t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) + } + + // Zap. + for i := range instances { + p := &instances[i] + + if p.zap { + wg.Add(1) + go func(p *instance) { + defer wg.Done() + + ns := c.GetNamespace(p.ns) + ns.Zap() + + p.handles = nil + p.handlesMap = nil + }(p) + } + } + wg.Wait() + + if want := int(cnt.created - cnt.released); c.Size() != want { + t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size()) + } + + if notrel, used := int(cnt.created-cnt.released), c.Used(); notrel != used { + t.Errorf("Invalid used value, want=%d got=%d", notrel, used) + } + + c.Purge(nil) + + for i := range instances { + p := &instances[i] + + if p.delete { + if p.delfinCalledAll != keymax { + t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.ns, p.purge, p.zap, keymax-p.delfinCalledAll) + } + if p.delfinCalledEff != p.wantDel { + t.Errorf("#%d not all effective delete fin called, diff=%d", p.ns, p.wantDel-p.delfinCalledEff) + } + if p.purge && p.purgefinCalled > 0 { + t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.ns, p.delete, p.zap, p.purgefinCalled) + } + } else { + if p.purge { + if p.purgefinCalled != p.wantDel { + t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.ns, p.delete, p.zap, p.wantDel-p.purgefinCalled) + } + } + } + } + + if cnt.created != cnt.released { + t.Errorf("Some cache object weren't released, created=%d released=%d", cnt.created, cnt.released) + } +} + func BenchmarkLRUCache_SetRelease(b *testing.B) { capacity := b.N / 100 if capacity <= 0 { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/empty_cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/empty_cache.go deleted file mode 100644 index 1fbf81459..000000000 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/empty_cache.go +++ /dev/null @@ -1,246 +0,0 @@ -// Copyright (c) 2013, Suryandaru Triandana -// All rights reserved. -// -// Use of this source code is governed by a BSD-style license that can be -// found in the LICENSE file. - -package cache - -import ( - "sync" - "sync/atomic" -) - -type emptyCache struct { - sync.Mutex - table map[uint64]*emptyNS -} - -// NewEmptyCache creates a new initialized empty cache. -func NewEmptyCache() Cache { - return &emptyCache{ - table: make(map[uint64]*emptyNS), - } -} - -func (c *emptyCache) GetNamespace(id uint64) Namespace { - c.Lock() - defer c.Unlock() - - if ns, ok := c.table[id]; ok { - return ns - } - - ns := &emptyNS{ - cache: c, - id: id, - table: make(map[uint64]*emptyNode), - } - c.table[id] = ns - return ns -} - -func (c *emptyCache) Purge(fin PurgeFin) { - c.Lock() - for _, ns := range c.table { - ns.purgeNB(fin) - } - c.Unlock() -} - -func (c *emptyCache) Zap(closed bool) { - c.Lock() - for _, ns := range c.table { - ns.zapNB(closed) - } - c.table = make(map[uint64]*emptyNS) - c.Unlock() -} - -func (*emptyCache) SetCapacity(capacity int) {} - -type emptyNS struct { - cache *emptyCache - id uint64 - table map[uint64]*emptyNode - state nsState -} - -func (ns *emptyNS) Get(key uint64, setf SetFunc) (o Object, ok bool) { - ns.cache.Lock() - - switch ns.state { - case nsZapped: - ns.cache.Unlock() - if setf == nil { - return - } - - var value interface{} - var fin func() - ok, value, _, fin = setf() - if ok { - o = &fakeObject{ - value: value, - fin: fin, - } - } - return - case nsClosed: - ns.cache.Unlock() - return - } - - n, ok := ns.table[key] - if ok { - n.ref++ - } else { - if setf == nil { - ns.cache.Unlock() - return - } - - var value interface{} - var fin func() - ok, value, _, fin = setf() - if !ok { - ns.cache.Unlock() - return - } - - n = &emptyNode{ - ns: ns, - key: key, - value: value, - setfin: fin, - ref: 1, - } - ns.table[key] = n - } - - ns.cache.Unlock() - o = &emptyObject{node: n} - return -} - -func (ns *emptyNS) Delete(key uint64, fin DelFin) bool { - ns.cache.Lock() - - if ns.state != nsEffective { - ns.cache.Unlock() - if fin != nil { - fin(false) - } - return false - } - - n, ok := ns.table[key] - if !ok { - ns.cache.Unlock() - if fin != nil { - fin(false) - } - return false - } - n.delfin = fin - ns.cache.Unlock() - return true -} - -func (ns *emptyNS) purgeNB(fin PurgeFin) { - if ns.state != nsEffective { - return - } - for _, n := range ns.table { - n.purgefin = fin - } -} - -func (ns *emptyNS) Purge(fin PurgeFin) { - ns.cache.Lock() - ns.purgeNB(fin) - ns.cache.Unlock() -} - -func (ns *emptyNS) zapNB(closed bool) { - if ns.state != nsEffective { - return - } - for _, n := range ns.table { - n.execFin() - } - if closed { - ns.state = nsClosed - } else { - ns.state = nsZapped - } - ns.table = nil -} - -func (ns *emptyNS) Zap(closed bool) { - ns.cache.Lock() - ns.zapNB(closed) - delete(ns.cache.table, ns.id) - ns.cache.Unlock() -} - -type emptyNode struct { - ns *emptyNS - key uint64 - value interface{} - ref int - setfin SetFin - delfin DelFin - purgefin PurgeFin -} - -func (n *emptyNode) execFin() { - if n.setfin != nil { - n.setfin() - n.setfin = nil - } - if n.purgefin != nil { - n.purgefin(n.ns.id, n.key, n.delfin) - n.delfin = nil - n.purgefin = nil - } else if n.delfin != nil { - n.delfin(true) - n.delfin = nil - } -} - -func (n *emptyNode) evict() { - n.ns.cache.Lock() - n.ref-- - if n.ref == 0 { - if n.ns.state == nsEffective { - // Remove elem. - delete(n.ns.table, n.key) - // Execute finalizer. - n.execFin() - } - } else if n.ref < 0 { - panic("leveldb/cache: emptyNode: negative node reference") - } - n.ns.cache.Unlock() -} - -type emptyObject struct { - node *emptyNode - once uint32 -} - -func (o *emptyObject) Value() interface{} { - if atomic.LoadUint32(&o.once) == 0 { - return o.node.value - } - return nil -} - -func (o *emptyObject) Release() { - if !atomic.CompareAndSwapUint32(&o.once, 0, 1) { - return - } - o.node.evict() - o.node = nil -} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go index 3c98e076b..a1504b159 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go @@ -9,16 +9,17 @@ package cache import ( "sync" "sync/atomic" + + "github.com/syndtr/goleveldb/leveldb/util" ) // lruCache represent a LRU cache state. type lruCache struct { - sync.Mutex - - recent lruNode - table map[uint64]*lruNs - capacity int - size int + mu sync.Mutex + recent lruNode + table map[uint64]*lruNs + capacity int + used, size int } // NewLRUCache creates a new initialized LRU cache with the given capacity. @@ -32,57 +33,75 @@ func NewLRUCache(capacity int) Cache { return c } +func (c *lruCache) Capacity() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.capacity +} + +func (c *lruCache) Used() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.used +} + +func (c *lruCache) Size() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.size +} + // SetCapacity set cache capacity. func (c *lruCache) SetCapacity(capacity int) { - c.Lock() + c.mu.Lock() c.capacity = capacity c.evict() - c.Unlock() + c.mu.Unlock() } // GetNamespace return namespace object for given id. func (c *lruCache) GetNamespace(id uint64) Namespace { - c.Lock() - defer c.Unlock() + c.mu.Lock() + defer c.mu.Unlock() - if p, ok := c.table[id]; ok { - return p + if ns, ok := c.table[id]; ok { + return ns } - p := &lruNs{ + ns := &lruNs{ lru: c, id: id, table: make(map[uint64]*lruNode), } - c.table[id] = p - return p + c.table[id] = ns + return ns } // Purge purge entire cache. func (c *lruCache) Purge(fin PurgeFin) { - c.Lock() + c.mu.Lock() for _, ns := range c.table { ns.purgeNB(fin) } - c.Unlock() + c.mu.Unlock() } -func (c *lruCache) Zap(closed bool) { - c.Lock() +func (c *lruCache) Zap() { + c.mu.Lock() for _, ns := range c.table { - ns.zapNB(closed) + ns.zapNB() } c.table = make(map[uint64]*lruNs) - c.Unlock() + c.mu.Unlock() } func (c *lruCache) evict() { top := &c.recent - for n := c.recent.rPrev; c.size > c.capacity && n != top; { + for n := c.recent.rPrev; c.used > c.capacity && n != top; { n.state = nodeEvicted n.rRemove() - n.evictNB() - c.size -= n.charge + n.derefNB() + c.used -= n.charge n = c.recent.rPrev } } @@ -94,170 +113,157 @@ type lruNs struct { state nsState } -func (ns *lruNs) Get(key uint64, setf SetFunc) (o Object, ok bool) { - lru := ns.lru - lru.Lock() +func (ns *lruNs) Get(key uint64, setf SetFunc) Handle { + ns.lru.mu.Lock() - switch ns.state { - case nsZapped: - lru.Unlock() - if setf == nil { - return - } - - var value interface{} - var fin func() - ok, value, _, fin = setf() - if ok { - o = &fakeObject{ - value: value, - fin: fin, - } - } - return - case nsClosed: - lru.Unlock() - return + if ns.state != nsEffective { + ns.lru.mu.Unlock() + return nil } - n, ok := ns.table[key] + node, ok := ns.table[key] if ok { - switch n.state { + switch node.state { case nodeEvicted: // Insert to recent list. - n.state = nodeEffective - n.ref++ - lru.size += n.charge - lru.evict() + node.state = nodeEffective + node.ref++ + ns.lru.used += node.charge + ns.lru.evict() fallthrough case nodeEffective: - // Bump to front - n.rRemove() - n.rInsert(&lru.recent) + // Bump to front. + node.rRemove() + node.rInsert(&ns.lru.recent) } - n.ref++ + node.ref++ } else { if setf == nil { - lru.Unlock() - return + ns.lru.mu.Unlock() + return nil } - var value interface{} - var charge int - var fin func() - ok, value, charge, fin = setf() - if !ok { - lru.Unlock() - return + charge, value := setf() + if value == nil { + ns.lru.mu.Unlock() + return nil } - n = &lruNode{ + node = &lruNode{ ns: ns, key: key, value: value, charge: charge, - setfin: fin, - ref: 2, + ref: 1, } - ns.table[key] = n - n.rInsert(&lru.recent) + ns.table[key] = node - lru.size += charge - lru.evict() + if charge > 0 { + node.ref++ + node.rInsert(&ns.lru.recent) + ns.lru.used += charge + ns.lru.size += charge + ns.lru.evict() + } } - lru.Unlock() - o = &lruObject{node: n} - return + ns.lru.mu.Unlock() + return &lruHandle{node: node} } func (ns *lruNs) Delete(key uint64, fin DelFin) bool { - lru := ns.lru - lru.Lock() + ns.lru.mu.Lock() if ns.state != nsEffective { - lru.Unlock() if fin != nil { - fin(false) + fin(false, false) } + ns.lru.mu.Unlock() return false } - n, ok := ns.table[key] - if !ok { - lru.Unlock() + node, exist := ns.table[key] + if !exist { if fin != nil { - fin(false) + fin(false, false) } + ns.lru.mu.Unlock() return false } - n.delfin = fin - switch n.state { - case nodeRemoved: - lru.Unlock() + switch node.state { + case nodeDeleted: + if fin != nil { + fin(true, true) + } + ns.lru.mu.Unlock() return false case nodeEffective: - lru.size -= n.charge - n.rRemove() - n.evictNB() + ns.lru.used -= node.charge + node.state = nodeDeleted + node.delfin = fin + node.rRemove() + node.derefNB() + default: + node.state = nodeDeleted + node.delfin = fin } - n.state = nodeRemoved - lru.Unlock() + ns.lru.mu.Unlock() return true } func (ns *lruNs) purgeNB(fin PurgeFin) { - lru := ns.lru if ns.state != nsEffective { return } - for _, n := range ns.table { - n.purgefin = fin - if n.state == nodeEffective { - lru.size -= n.charge - n.rRemove() - n.evictNB() + for _, node := range ns.table { + switch node.state { + case nodeDeleted: + case nodeEffective: + ns.lru.used -= node.charge + node.state = nodeDeleted + node.purgefin = fin + node.rRemove() + node.derefNB() + default: + node.state = nodeDeleted + node.purgefin = fin } - n.state = nodeRemoved } } func (ns *lruNs) Purge(fin PurgeFin) { - ns.lru.Lock() + ns.lru.mu.Lock() ns.purgeNB(fin) - ns.lru.Unlock() + ns.lru.mu.Unlock() } -func (ns *lruNs) zapNB(closed bool) { - lru := ns.lru +func (ns *lruNs) zapNB() { if ns.state != nsEffective { return } - if closed { - ns.state = nsClosed - } else { - ns.state = nsZapped - } - for _, n := range ns.table { - if n.state == nodeEffective { - lru.size -= n.charge - n.rRemove() + ns.state = nsZapped + + for _, node := range ns.table { + if node.state == nodeEffective { + ns.lru.used -= node.charge + node.rRemove() } - n.state = nodeRemoved - n.execFin() + ns.lru.size -= node.charge + node.state = nodeDeleted + node.fin() } ns.table = nil } -func (ns *lruNs) Zap(closed bool) { - ns.lru.Lock() - ns.zapNB(closed) +func (ns *lruNs) Zap() { + ns.lru.mu.Lock() + ns.zapNB() delete(ns.lru.table, ns.id) - ns.lru.Unlock() + ns.lru.mu.Unlock() } type lruNode struct { @@ -270,7 +276,6 @@ type lruNode struct { charge int ref int state nodeState - setfin SetFin delfin DelFin purgefin PurgeFin } @@ -284,7 +289,6 @@ func (n *lruNode) rInsert(at *lruNode) { } func (n *lruNode) rRemove() bool { - // only remove if not already removed if n.rPrev == nil { return false } @@ -297,58 +301,56 @@ func (n *lruNode) rRemove() bool { return true } -func (n *lruNode) execFin() { - if n.setfin != nil { - n.setfin() - n.setfin = nil +func (n *lruNode) fin() { + if r, ok := n.value.(util.Releaser); ok { + r.Release() } if n.purgefin != nil { - n.purgefin(n.ns.id, n.key, n.delfin) + n.purgefin(n.ns.id, n.key) n.delfin = nil n.purgefin = nil } else if n.delfin != nil { - n.delfin(true) + n.delfin(true, false) n.delfin = nil } } -func (n *lruNode) evictNB() { +func (n *lruNode) derefNB() { n.ref-- if n.ref == 0 { if n.ns.state == nsEffective { - // remove elem + // Remove elemement. delete(n.ns.table, n.key) - // execute finalizer - n.execFin() + n.ns.lru.size -= n.charge + n.fin() } } else if n.ref < 0 { panic("leveldb/cache: lruCache: negative node reference") } } -func (n *lruNode) evict() { - n.ns.lru.Lock() - n.evictNB() - n.ns.lru.Unlock() +func (n *lruNode) deref() { + n.ns.lru.mu.Lock() + n.derefNB() + n.ns.lru.mu.Unlock() } -type lruObject struct { +type lruHandle struct { node *lruNode once uint32 } -func (o *lruObject) Value() interface{} { - if atomic.LoadUint32(&o.once) == 0 { - return o.node.value +func (h *lruHandle) Value() interface{} { + if atomic.LoadUint32(&h.once) == 0 { + return h.node.value } return nil } -func (o *lruObject) Release() { - if !atomic.CompareAndSwapUint32(&o.once, 0, 1) { +func (h *lruHandle) Release() { + if !atomic.CompareAndSwapUint32(&h.once, 0, 1) { return } - - o.node.evict() - o.node = nil + h.node.deref() + h.node = nil } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index 1be8748d4..59b9017d3 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -655,6 +655,8 @@ func (db *DB) GetSnapshot() (*Snapshot, error) { // Returns statistics of the underlying DB. // leveldb.sstables // Returns sstables list for each level. +// leveldb.blockpool +// Returns block pool stats. func (db *DB) GetProperty(name string) (value string, err error) { err = db.ok() if err != nil { @@ -700,6 +702,16 @@ func (db *DB) GetProperty(name string) (value string, err error) { value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax) } } + case p == "blockpool": + value = fmt.Sprintf("%v", db.s.tops.bpool) + case p == "cachedblock": + if bc := db.s.o.GetBlockCache(); bc != nil { + value = fmt.Sprintf("%d", bc.Size()) + } else { + value = "" + } + case p == "openedtables": + value = fmt.Sprintf("%d", db.s.tops.cache.Size()) default: err = errors.New("leveldb: GetProperty: unknown property: " + name) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/doc.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/doc.go index ac9ea3d0c..53f13bb24 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/doc.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/doc.go @@ -37,6 +37,16 @@ // err = iter.Error() // ... // +// Iterate over subset of database content with a particular prefix: +// iter := db.NewIterator(util.BytesPrefix([]byte("foo-")), nil) +// for iter.Next() { +// // Use key/value. +// ... +// } +// iter.Release() +// err = iter.Error() +// ... +// // Seek-then-Iterate: // // iter := db.NewIterator(nil, nil) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go index b940ce427..241184481 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -31,9 +31,12 @@ const ( type noCache struct{} func (noCache) SetCapacity(capacity int) {} +func (noCache) Capacity() int { return 0 } +func (noCache) Used() int { return 0 } +func (noCache) Size() int { return 0 } func (noCache) GetNamespace(id uint64) cache.Namespace { return nil } func (noCache) Purge(fin cache.PurgeFin) {} -func (noCache) Zap(closed bool) {} +func (noCache) Zap() {} var NoCache cache.Cache = noCache{} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index 46dd599be..1c3ff3249 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,6 +7,7 @@ package leveldb import ( + "io" "sort" "sync/atomic" @@ -297,7 +298,7 @@ func (t *tOps) create() (*tWriter, error) { func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { w, err := t.create() if err != nil { - return f, n, err + return } defer func() { @@ -322,33 +323,33 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { return } -// Opens table. It returns a cache object, which should +type tableWrapper struct { + *table.Reader + closer io.Closer +} + +func (tr tableWrapper) Release() { + tr.closer.Close() +} + +// Opens table. It returns a cache handle, which should // be released after use. -func (t *tOps) open(f *tFile) (c cache.Object, err error) { +func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { num := f.file.Num() - c, ok := t.cacheNS.Get(num, func() (ok bool, value interface{}, charge int, fin cache.SetFin) { + ch = t.cacheNS.Get(num, func() (charge int, value interface{}) { var r storage.Reader r, err = f.file.Open() if err != nil { - return + return 0, nil } - o := t.s.o - - var cacheNS cache.Namespace - if bc := o.GetBlockCache(); bc != nil { - cacheNS = bc.GetNamespace(num) + var bcacheNS cache.Namespace + if bc := t.s.o.GetBlockCache(); bc != nil { + bcacheNS = bc.GetNamespace(num) } - - ok = true - value = table.NewReader(r, int64(f.size), cacheNS, t.bpool, o) - charge = 1 - fin = func() { - r.Close() - } - return + return 1, tableWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), r} }) - if !ok && err == nil { + if ch == nil && err == nil { err = ErrClosed } return @@ -357,34 +358,34 @@ func (t *tOps) open(f *tFile) (c cache.Object, err error) { // Finds key/value pair whose key is greater than or equal to the // given key. func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) { - c, err := t.open(f) + ch, err := t.open(f) if err != nil { return nil, nil, err } - defer c.Release() - return c.Value().(*table.Reader).Find(key, ro) + defer ch.Release() + return ch.Value().(tableWrapper).Find(key, ro) } // Returns approximate offset of the given key. func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { - c, err := t.open(f) + ch, err := t.open(f) if err != nil { return } - _offset, err := c.Value().(*table.Reader).OffsetOf(key) + _offset, err := ch.Value().(tableWrapper).OffsetOf(key) offset = uint64(_offset) - c.Release() + ch.Release() return } // Creates an iterator from the given table. func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { - c, err := t.open(f) + ch, err := t.open(f) if err != nil { return iterator.NewEmptyIterator(err) } - iter := c.Value().(*table.Reader).NewIterator(slice, ro) - iter.SetReleaser(c) + iter := ch.Value().(tableWrapper).NewIterator(slice, ro) + iter.SetReleaser(ch) return iter } @@ -392,14 +393,16 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite // no one use the the table. func (t *tOps) remove(f *tFile) { num := f.file.Num() - t.cacheNS.Delete(num, func(exist bool) { - if err := f.file.Remove(); err != nil { - t.s.logf("table@remove removing @%d %q", num, err) - } else { - t.s.logf("table@remove removed @%d", num) - } - if bc := t.s.o.GetBlockCache(); bc != nil { - bc.GetNamespace(num).Zap(false) + t.cacheNS.Delete(num, func(exist, pending bool) { + if !pending { + if err := f.file.Remove(); err != nil { + t.s.logf("table@remove removing @%d %q", num, err) + } else { + t.s.logf("table@remove removed @%d", num) + } + if bc := t.s.o.GetBlockCache(); bc != nil { + bc.GetNamespace(num).Zap() + } } }) } @@ -407,7 +410,7 @@ func (t *tOps) remove(f *tFile) { // Closes the table ops instance. It will close all tables, // regadless still used or not. func (t *tOps) close() { - t.cache.Zap(true) + t.cache.Zap() } // Creates new initialized table ops instance. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index 0c62a3c62..f397ac4f8 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -37,6 +37,7 @@ func max(x, y int) int { } type block struct { + bpool *util.BufferPool cmp comparer.BasicComparer data []byte restartsLen int @@ -139,6 +140,14 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas return bi } +func (b *block) Release() { + if b.bpool != nil { + b.bpool.Put(b.data) + b.bpool = nil + } + b.data = nil +} + type dir int const ( @@ -438,6 +447,7 @@ func (i *blockIter) Value() []byte { func (i *blockIter) Release() { if i.dir > dirReleased { + i.block = nil i.prevNode = nil i.prevKeys = nil i.key = nil @@ -610,43 +620,25 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB return b, nil } -type releaseBlock struct { - r *Reader - b *block -} - -func (r releaseBlock) Release() { - if r.b.data != nil { - r.r.bpool.Put(r.b.data) - r.b.data = nil - } -} - func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { if r.cache != nil { // Get/set block cache. var err error - cache, ok := r.cache.Get(dataBH.offset, func() (ok bool, value interface{}, charge int, fin cache.SetFin) { + cache := r.cache.Get(dataBH.offset, func() (charge int, value interface{}) { if !fillCache { - return + return 0, nil } var dataBlock *block dataBlock, err = r.readBlock(dataBH, checksum) - if err == nil { - ok = true - value = dataBlock - charge = int(dataBH.length) - fin = func() { - r.bpool.Put(dataBlock.data) - dataBlock.data = nil - } + if err != nil { + return 0, nil } - return + return int(dataBH.length), dataBlock }) if err != nil { return iterator.NewEmptyIterator(err) } - if ok { + if cache != nil { dataBlock := cache.Value().(*block) if !dataBlock.checksum && (r.checksum || checksum) { if !verifyChecksum(dataBlock.data) { @@ -662,7 +654,7 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi if err != nil { return iterator.NewEmptyIterator(err) } - iter := dataBlock.newIterator(slice, false, releaseBlock{r, dataBlock}) + iter := dataBlock.newIterator(slice, false, dataBlock) return iter } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go index 7b7331dea..957f953b5 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go @@ -26,6 +26,8 @@ type BufferPool struct { baseline1 int baseline2 int + get uint32 + put uint32 less uint32 equal uint32 greater uint32 @@ -47,6 +49,8 @@ func (p *BufferPool) poolNum(n int) int { // Get returns buffer with length of n. func (p *BufferPool) Get(n int) []byte { + atomic.AddUint32(&p.get, 1) + poolNum := p.poolNum(n) pool := p.pool[poolNum] if poolNum == 0 { @@ -112,6 +116,8 @@ func (p *BufferPool) Get(n int) []byte { // Put adds given buffer to the pool. func (p *BufferPool) Put(b []byte) { + atomic.AddUint32(&p.put, 1) + pool := p.pool[p.poolNum(cap(b))] select { case pool <- b: @@ -121,8 +127,8 @@ func (p *BufferPool) Put(b []byte) { } func (p *BufferPool) String() string { - return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v L·%d E·%d G·%d M·%d}", - p.baseline0, p.size, p.sizeMiss, p.less, p.equal, p.greater, p.miss) + return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v G·%d P·%d <·%d =·%d >·%d M·%d}", + p.baseline0, p.size, p.sizeMiss, p.get, p.put, p.less, p.equal, p.greater, p.miss) } func (p *BufferPool) drain() { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/range.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/range.go index da0583123..24891d529 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/range.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/range.go @@ -14,3 +14,18 @@ type Range struct { // Limit of the key range, not include in the range. Limit []byte } + +// BytesPrefix returns key range that satisfy the given prefix. +// This only applicable for the standard 'bytes comparer'. +func BytesPrefix(prefix []byte) *Range { + var limit []byte + for i := len(prefix) - 1; i >= 0; i-- { + c := prefix[i] + if c < 0xff { + limit = make([]byte, i+1) + copy(limit, prefix) + limit[i] = c + 1 + } + } + return &Range{prefix, limit} +}