Update goleveldb

This commit is contained in:
Jakob Borg 2014-08-29 12:36:45 +02:00
parent b0408ef5c6
commit 435d3958f4
14 changed files with 713 additions and 581 deletions

4
Godeps/Godeps.json generated
View File

@ -41,7 +41,7 @@
},
{
"ImportPath": "github.com/calmh/xdr",
"Rev": "e1714bbe4764b15490fcc8ebd25d4bd9ea50a4b9"
"Rev": "a597b63b87d6140f79084c8aab214b4d533833a1"
},
{
"ImportPath": "github.com/juju/ratelimit",
@ -49,7 +49,7 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "17fd8940e0f778c27793a25bff8c48ddd7bf53ac"
"Rev": "59d87758aeaab5ab6ed289c773349500228a1557"
},
{
"ImportPath": "github.com/vitrun/qart/coding",

View File

@ -22,7 +22,7 @@ func (r *Reader) ReadUint8() uint8 {
}
if debug {
dl.Printf("rd uint8=%d (0x%08x)", r.b[0], r.b[0])
dl.Printf("rd uint8=%d (0x%02x)", r.b[0], r.b[0])
}
return r.b[0]
}
@ -43,7 +43,7 @@ func (r *Reader) ReadUint16() uint16 {
v := uint16(r.b[1]) | uint16(r.b[0])<<8
if debug {
dl.Printf("rd uint16=%d (0x%08x)", v, v)
dl.Printf("rd uint16=%d (0x%04x)", v, v)
}
return v
}

View File

@ -249,7 +249,9 @@ func (p *dbBench) newIter() iterator.Iterator {
}
func (p *dbBench) close() {
p.b.Log(p.db.s.tops.bpool)
if bp, err := p.db.GetProperty("leveldb.blockpool"); err == nil {
p.b.Log("Block pool stats: ", bp)
}
p.db.Close()
p.stor.Close()
os.RemoveAll(benchDB)

View File

@ -11,84 +11,106 @@ import (
"sync/atomic"
)
// SetFunc used by Namespace.Get method to create a cache object. SetFunc
// may return ok false, in that case the cache object will not be created.
type SetFunc func() (ok bool, value interface{}, charge int, fin SetFin)
// SetFunc is the function that will be called by Namespace.Get to create
// a cache object, if charge is less than one than the cache object will
// not be registered to cache tree, if value is nil then the cache object
// will not be created.
type SetFunc func() (charge int, value interface{})
// SetFin will be called when corresponding cache object are released.
type SetFin func()
// DelFin is the function that will be called as the result of a delete operation.
// Exist == true is indication that the object is exist, and pending == true is
// indication of deletion already happen but haven't done yet (wait for all handles
// to be released). And exist == false means the object doesn't exist.
type DelFin func(exist, pending bool)
// DelFin will be called when corresponding cache object are released.
// DelFin will be called after SetFin. The exist is true if the corresponding
// cache object is actually exist in the cache tree.
type DelFin func(exist bool)
// PurgeFin will be called when corresponding cache object are released.
// PurgeFin will be called after SetFin. If PurgeFin present DelFin will
// not be executed but passed to the PurgeFin, it is up to the caller
// to call it or not.
type PurgeFin func(ns, key uint64, delfin DelFin)
// PurgeFin is the function that will be called as the result of a purge operation.
type PurgeFin func(ns, key uint64)
// Cache is a cache tree. A cache instance must be goroutine-safe.
type Cache interface {
// SetCapacity sets cache capacity.
// SetCapacity sets cache tree capacity.
SetCapacity(capacity int)
// GetNamespace gets or creates a cache namespace for the given id.
// Capacity returns cache tree capacity.
Capacity() int
// Used returns used cache tree capacity.
Used() int
// Size returns entire alive cache objects size.
Size() int
// GetNamespace gets cache namespace with the given id.
// GetNamespace is never return nil.
GetNamespace(id uint64) Namespace
// Purge purges all cache namespaces, read Namespace.Purge method documentation.
// Purge purges all cache namespace from this cache tree.
// This is behave the same as calling Namespace.Purge method on all cache namespace.
Purge(fin PurgeFin)
// Zap zaps all cache namespaces, read Namespace.Zap method documentation.
Zap(closed bool)
// Zap detaches all cache namespace from this cache tree.
// This is behave the same as calling Namespace.Zap method on all cache namespace.
Zap()
}
// Namespace is a cache namespace. A namespace instance must be goroutine-safe.
type Namespace interface {
// Get gets cache object for the given key. The given SetFunc (if not nil) will
// be called if the given key does not exist.
// If the given key does not exist, SetFunc is nil or SetFunc return ok false, Get
// will return ok false.
Get(key uint64, setf SetFunc) (obj Object, ok bool)
// Get deletes cache object for the given key. If exist the cache object will
// be deleted later when all of its handles have been released (i.e. no one use
// it anymore) and the given DelFin (if not nil) will finally be executed. If
// such cache object does not exist the given DelFin will be executed anyway.
// Get gets cache object with the given key.
// If cache object is not found and setf is not nil, Get will atomically creates
// the cache object by calling setf. Otherwise Get will returns nil.
//
// Delete returns true if such cache object exist.
// The returned cache handle should be released after use by calling Release
// method.
Get(key uint64, setf SetFunc) Handle
// Delete removes cache object with the given key from cache tree.
// A deleted cache object will be released as soon as all of its handles have
// been released.
// Delete only happen once, subsequent delete will consider cache object doesn't
// exist, even if the cache object ins't released yet.
//
// If not nil, fin will be called if the cache object doesn't exist or when
// finally be released.
//
// Delete returns true if such cache object exist and never been deleted.
Delete(key uint64, fin DelFin) bool
// Purge deletes all cache objects, read Delete method documentation.
// Purge removes all cache objects within this namespace from cache tree.
// This is the same as doing delete on all cache objects.
//
// If not nil, fin will be called on all cache objects when its finally be
// released.
Purge(fin PurgeFin)
// Zap detaches the namespace from the cache tree and delete all its cache
// objects. The cache objects deletion and finalizers execution are happen
// immediately, even if its existing handles haven't yet been released.
// A zapped namespace can't never be filled again.
// If closed is false then the Get function will always call the given SetFunc
// if it is not nil, but resultant of the SetFunc will not be cached.
Zap(closed bool)
// Zap detaches namespace from cache tree and release all its cache objects.
// A zapped namespace can never be filled again.
// Calling Get on zapped namespace will always return nil.
Zap()
}
// Object is a cache object.
type Object interface {
// Release releases the cache object. Other methods should not be called
// after the cache object has been released.
// Handle is a cache handle.
type Handle interface {
// Release releases this cache handle. This method can be safely called mutiple
// times.
Release()
// Value returns value of the cache object.
// Value returns value of this cache handle.
// Value will returns nil after this cache handle have be released.
Value() interface{}
}
const (
DelNotExist = iota
DelExist
DelPendig
)
// Namespace state.
type nsState int
const (
nsEffective nsState = iota
nsZapped
nsClosed
)
// Node state.
@ -97,29 +119,29 @@ type nodeState int
const (
nodeEffective nodeState = iota
nodeEvicted
nodeRemoved
nodeDeleted
)
// Fake object.
type fakeObject struct {
// Fake handle.
type fakeHandle struct {
value interface{}
fin func()
once uint32
}
func (o *fakeObject) Value() interface{} {
if atomic.LoadUint32(&o.once) == 0 {
return o.value
func (h *fakeHandle) Value() interface{} {
if atomic.LoadUint32(&h.once) == 0 {
return h.value
}
return nil
}
func (o *fakeObject) Release() {
if !atomic.CompareAndSwapUint32(&o.once, 0, 1) {
func (h *fakeHandle) Release() {
if !atomic.CompareAndSwapUint32(&h.once, 0, 1) {
return
}
if o.fin != nil {
o.fin()
o.fin = nil
if h.fin != nil {
h.fin()
h.fin = nil
}
}

View File

@ -7,15 +7,35 @@
package cache
import (
"fmt"
"math/rand"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
"time"
)
func set(ns Namespace, key uint64, value interface{}, charge int, fin func()) Object {
obj, _ := ns.Get(key, func() (bool, interface{}, int, SetFin) {
return true, value, charge, fin
type releaserFunc struct {
fn func()
value interface{}
}
func (r releaserFunc) Release() {
if r.fn != nil {
r.fn()
}
}
func set(ns Namespace, key uint64, value interface{}, charge int, relf func()) Handle {
return ns.Get(key, func() (int, interface{}) {
if relf != nil {
return charge, releaserFunc{relf, value}
} else {
return charge, value
}
})
return obj
}
func TestCache_HitMiss(t *testing.T) {
@ -43,29 +63,31 @@ func TestCache_HitMiss(t *testing.T) {
setfin++
}).Release()
for j, y := range cases {
r, ok := ns.Get(y.key, nil)
h := ns.Get(y.key, nil)
if j <= i {
// should hit
if !ok {
if h == nil {
t.Errorf("case '%d' iteration '%d' is miss", i, j)
} else if r.Value().(string) != y.value {
t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, r.Value().(string), y.value)
} else {
if x := h.Value().(releaserFunc).value.(string); x != y.value {
t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, x, y.value)
}
}
} else {
// should miss
if ok {
t.Errorf("case '%d' iteration '%d' is hit , value '%s'", i, j, r.Value().(string))
if h != nil {
t.Errorf("case '%d' iteration '%d' is hit , value '%s'", i, j, h.Value().(releaserFunc).value.(string))
}
}
if ok {
r.Release()
if h != nil {
h.Release()
}
}
}
for i, x := range cases {
finalizerOk := false
ns.Delete(x.key, func(exist bool) {
ns.Delete(x.key, func(exist, pending bool) {
finalizerOk = true
})
@ -74,22 +96,24 @@ func TestCache_HitMiss(t *testing.T) {
}
for j, y := range cases {
r, ok := ns.Get(y.key, nil)
h := ns.Get(y.key, nil)
if j > i {
// should hit
if !ok {
if h == nil {
t.Errorf("case '%d' iteration '%d' is miss", i, j)
} else if r.Value().(string) != y.value {
t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, r.Value().(string), y.value)
} else {
if x := h.Value().(releaserFunc).value.(string); x != y.value {
t.Errorf("case '%d' iteration '%d' has invalid value got '%s', want '%s'", i, j, x, y.value)
}
}
} else {
// should miss
if ok {
t.Errorf("case '%d' iteration '%d' is hit, value '%s'", i, j, r.Value().(string))
if h != nil {
t.Errorf("case '%d' iteration '%d' is hit, value '%s'", i, j, h.Value().(releaserFunc).value.(string))
}
}
if ok {
r.Release()
if h != nil {
h.Release()
}
}
}
@ -107,42 +131,42 @@ func TestLRUCache_Eviction(t *testing.T) {
set(ns, 3, 3, 1, nil).Release()
set(ns, 4, 4, 1, nil).Release()
set(ns, 5, 5, 1, nil).Release()
if r, ok := ns.Get(2, nil); ok { // 1,3,4,5,2
r.Release()
if h := ns.Get(2, nil); h != nil { // 1,3,4,5,2
h.Release()
}
set(ns, 9, 9, 10, nil).Release() // 5,2,9
for _, x := range []uint64{9, 2, 5, 1} {
r, ok := ns.Get(x, nil)
if !ok {
t.Errorf("miss for key '%d'", x)
for _, key := range []uint64{9, 2, 5, 1} {
h := ns.Get(key, nil)
if h == nil {
t.Errorf("miss for key '%d'", key)
} else {
if r.Value().(int) != int(x) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int))
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
r.Release()
h.Release()
}
}
o1.Release()
for _, x := range []uint64{1, 2, 5} {
r, ok := ns.Get(x, nil)
if !ok {
t.Errorf("miss for key '%d'", x)
for _, key := range []uint64{1, 2, 5} {
h := ns.Get(key, nil)
if h == nil {
t.Errorf("miss for key '%d'", key)
} else {
if r.Value().(int) != int(x) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int))
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
r.Release()
h.Release()
}
}
for _, x := range []uint64{3, 4, 9} {
r, ok := ns.Get(x, nil)
if ok {
t.Errorf("hit for key '%d'", x)
if r.Value().(int) != int(x) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int))
for _, key := range []uint64{3, 4, 9} {
h := ns.Get(key, nil)
if h != nil {
t.Errorf("hit for key '%d'", key)
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
r.Release()
h.Release()
}
}
}
@ -153,16 +177,15 @@ func TestLRUCache_SetGet(t *testing.T) {
for i := 0; i < 200; i++ {
n := uint64(rand.Intn(99999) % 20)
set(ns, n, n, 1, nil).Release()
if p, ok := ns.Get(n, nil); ok {
if p.Value() == nil {
if h := ns.Get(n, nil); h != nil {
if h.Value() == nil {
t.Errorf("key '%d' contains nil value", n)
} else {
got := p.Value().(uint64)
if got != n {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", n, n, got)
if x := h.Value().(uint64); x != n {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", n, n, x)
}
}
p.Release()
h.Release()
} else {
t.Errorf("key '%d' doesn't exist", n)
}
@ -176,31 +199,319 @@ func TestLRUCache_Purge(t *testing.T) {
o2 := set(ns1, 2, 2, 1, nil)
ns1.Purge(nil)
set(ns1, 3, 3, 1, nil).Release()
for _, x := range []uint64{1, 2, 3} {
r, ok := ns1.Get(x, nil)
if !ok {
t.Errorf("miss for key '%d'", x)
for _, key := range []uint64{1, 2, 3} {
h := ns1.Get(key, nil)
if h == nil {
t.Errorf("miss for key '%d'", key)
} else {
if r.Value().(int) != int(x) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int))
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
r.Release()
h.Release()
}
}
o1.Release()
o2.Release()
for _, x := range []uint64{1, 2} {
r, ok := ns1.Get(x, nil)
if ok {
t.Errorf("hit for key '%d'", x)
if r.Value().(int) != int(x) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", x, x, r.Value().(int))
for _, key := range []uint64{1, 2} {
h := ns1.Get(key, nil)
if h != nil {
t.Errorf("hit for key '%d'", key)
if x := h.Value().(int); x != int(key) {
t.Errorf("invalid value for key '%d' want '%d', got '%d'", key, key, x)
}
r.Release()
h.Release()
}
}
}
type testingCacheObjectCounter struct {
created uint32
released uint32
}
func (c *testingCacheObjectCounter) createOne() {
atomic.AddUint32(&c.created, 1)
}
func (c *testingCacheObjectCounter) releaseOne() {
atomic.AddUint32(&c.released, 1)
}
type testingCacheObject struct {
t *testing.T
cnt *testingCacheObjectCounter
ns, key uint64
releaseCalled uint32
}
func (x *testingCacheObject) Release() {
if atomic.CompareAndSwapUint32(&x.releaseCalled, 0, 1) {
x.cnt.releaseOne()
} else {
x.t.Errorf("duplicate setfin NS#%d KEY#%s", x.ns, x.key)
}
}
func TestLRUCache_Finalizer(t *testing.T) {
const (
capacity = 100
goroutines = 100
iterations = 10000
keymax = 8000
)
runtime.GOMAXPROCS(runtime.NumCPU())
defer runtime.GOMAXPROCS(1)
wg := &sync.WaitGroup{}
cnt := &testingCacheObjectCounter{}
c := NewLRUCache(capacity)
type instance struct {
seed int64
rnd *rand.Rand
ns uint64
effective int32
handles []Handle
handlesMap map[uint64]int
delete bool
purge bool
zap bool
wantDel int32
delfinCalledAll int32
delfinCalledEff int32
purgefinCalled int32
}
instanceGet := func(p *instance, ns Namespace, key uint64) {
h := ns.Get(key, func() (charge int, value interface{}) {
to := &testingCacheObject{
t: t, cnt: cnt,
ns: p.ns,
key: key,
}
atomic.AddInt32(&p.effective, 1)
cnt.createOne()
return 1, releaserFunc{func() {
to.Release()
atomic.AddInt32(&p.effective, -1)
}, to}
})
p.handles = append(p.handles, h)
p.handlesMap[key] = p.handlesMap[key] + 1
}
instanceRelease := func(p *instance, ns Namespace, i int) {
h := p.handles[i]
key := h.Value().(releaserFunc).value.(*testingCacheObject).key
if n := p.handlesMap[key]; n == 0 {
t.Fatal("key ref == 0")
} else if n > 1 {
p.handlesMap[key] = n - 1
} else {
delete(p.handlesMap, key)
}
h.Release()
p.handles = append(p.handles[:i], p.handles[i+1:]...)
p.handles[len(p.handles) : len(p.handles)+1][0] = nil
}
seeds := make([]int64, goroutines)
instances := make([]instance, goroutines)
for i := range instances {
p := &instances[i]
p.handlesMap = make(map[uint64]int)
if seeds[i] == 0 {
seeds[i] = time.Now().UnixNano()
}
p.seed = seeds[i]
p.rnd = rand.New(rand.NewSource(p.seed))
p.ns = uint64(i)
p.delete = i%6 == 0
p.purge = i%8 == 0
p.zap = i%12 == 0 || i%3 == 0
}
seedsStr := make([]string, len(seeds))
for i, seed := range seeds {
seedsStr[i] = fmt.Sprint(seed)
}
t.Logf("seeds := []int64{%s}", strings.Join(seedsStr, ", "))
// Get and release.
for i := range instances {
p := &instances[i]
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
for i := 0; i < iterations; i++ {
if len(p.handles) == 0 || p.rnd.Int()%2 == 0 {
instanceGet(p, ns, uint64(p.rnd.Intn(keymax)))
} else {
instanceRelease(p, ns, p.rnd.Intn(len(p.handles)))
}
}
}(p)
}
wg.Wait()
if used, cap := c.Used(), c.Capacity(); used > cap {
t.Errorf("Used > capacity, used=%d cap=%d", used, cap)
}
// Check effective objects.
for i := range instances {
p := &instances[i]
if int(p.effective) < len(p.handlesMap) {
t.Errorf("#%d effective objects < acquired handle, eo=%d ah=%d", i, p.effective, len(p.handlesMap))
}
}
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Delete and purge.
for i := range instances {
p := &instances[i]
p.wantDel = p.effective
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
if p.delete {
for key := uint64(0); key < keymax; key++ {
_, wantExist := p.handlesMap[key]
gotExist := ns.Delete(key, func(exist, pending bool) {
atomic.AddInt32(&p.delfinCalledAll, 1)
if exist {
atomic.AddInt32(&p.delfinCalledEff, 1)
}
})
if !gotExist && wantExist {
t.Errorf("delete on NS#%d KEY#%d not found", p.ns, key)
}
}
var delfinCalled int
for key := uint64(0); key < keymax; key++ {
func(key uint64) {
gotExist := ns.Delete(key, func(exist, pending bool) {
if exist && !pending {
t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.ns, key)
}
delfinCalled++
})
if gotExist {
t.Errorf("delete on NS#%d KEY#%d found", p.ns, key)
}
}(key)
}
if delfinCalled != keymax {
t.Errorf("(2) #%d not all delete fin called, diff=%d", p.ns, keymax-delfinCalled)
}
}
if p.purge {
ns.Purge(func(ns, key uint64) {
atomic.AddInt32(&p.purgefinCalled, 1)
})
}
}(p)
}
wg.Wait()
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Release.
for i := range instances {
p := &instances[i]
if !p.zap {
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
for i := len(p.handles) - 1; i >= 0; i-- {
instanceRelease(p, ns, i)
}
}(p)
}
}
wg.Wait()
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Zap.
for i := range instances {
p := &instances[i]
if p.zap {
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
ns.Zap()
p.handles = nil
p.handlesMap = nil
}(p)
}
}
wg.Wait()
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
if notrel, used := int(cnt.created-cnt.released), c.Used(); notrel != used {
t.Errorf("Invalid used value, want=%d got=%d", notrel, used)
}
c.Purge(nil)
for i := range instances {
p := &instances[i]
if p.delete {
if p.delfinCalledAll != keymax {
t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.ns, p.purge, p.zap, keymax-p.delfinCalledAll)
}
if p.delfinCalledEff != p.wantDel {
t.Errorf("#%d not all effective delete fin called, diff=%d", p.ns, p.wantDel-p.delfinCalledEff)
}
if p.purge && p.purgefinCalled > 0 {
t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.ns, p.delete, p.zap, p.purgefinCalled)
}
} else {
if p.purge {
if p.purgefinCalled != p.wantDel {
t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.ns, p.delete, p.zap, p.wantDel-p.purgefinCalled)
}
}
}
}
if cnt.created != cnt.released {
t.Errorf("Some cache object weren't released, created=%d released=%d", cnt.created, cnt.released)
}
}
func BenchmarkLRUCache_SetRelease(b *testing.B) {
capacity := b.N / 100
if capacity <= 0 {

View File

@ -1,246 +0,0 @@
// Copyright (c) 2013, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package cache
import (
"sync"
"sync/atomic"
)
type emptyCache struct {
sync.Mutex
table map[uint64]*emptyNS
}
// NewEmptyCache creates a new initialized empty cache.
func NewEmptyCache() Cache {
return &emptyCache{
table: make(map[uint64]*emptyNS),
}
}
func (c *emptyCache) GetNamespace(id uint64) Namespace {
c.Lock()
defer c.Unlock()
if ns, ok := c.table[id]; ok {
return ns
}
ns := &emptyNS{
cache: c,
id: id,
table: make(map[uint64]*emptyNode),
}
c.table[id] = ns
return ns
}
func (c *emptyCache) Purge(fin PurgeFin) {
c.Lock()
for _, ns := range c.table {
ns.purgeNB(fin)
}
c.Unlock()
}
func (c *emptyCache) Zap(closed bool) {
c.Lock()
for _, ns := range c.table {
ns.zapNB(closed)
}
c.table = make(map[uint64]*emptyNS)
c.Unlock()
}
func (*emptyCache) SetCapacity(capacity int) {}
type emptyNS struct {
cache *emptyCache
id uint64
table map[uint64]*emptyNode
state nsState
}
func (ns *emptyNS) Get(key uint64, setf SetFunc) (o Object, ok bool) {
ns.cache.Lock()
switch ns.state {
case nsZapped:
ns.cache.Unlock()
if setf == nil {
return
}
var value interface{}
var fin func()
ok, value, _, fin = setf()
if ok {
o = &fakeObject{
value: value,
fin: fin,
}
}
return
case nsClosed:
ns.cache.Unlock()
return
}
n, ok := ns.table[key]
if ok {
n.ref++
} else {
if setf == nil {
ns.cache.Unlock()
return
}
var value interface{}
var fin func()
ok, value, _, fin = setf()
if !ok {
ns.cache.Unlock()
return
}
n = &emptyNode{
ns: ns,
key: key,
value: value,
setfin: fin,
ref: 1,
}
ns.table[key] = n
}
ns.cache.Unlock()
o = &emptyObject{node: n}
return
}
func (ns *emptyNS) Delete(key uint64, fin DelFin) bool {
ns.cache.Lock()
if ns.state != nsEffective {
ns.cache.Unlock()
if fin != nil {
fin(false)
}
return false
}
n, ok := ns.table[key]
if !ok {
ns.cache.Unlock()
if fin != nil {
fin(false)
}
return false
}
n.delfin = fin
ns.cache.Unlock()
return true
}
func (ns *emptyNS) purgeNB(fin PurgeFin) {
if ns.state != nsEffective {
return
}
for _, n := range ns.table {
n.purgefin = fin
}
}
func (ns *emptyNS) Purge(fin PurgeFin) {
ns.cache.Lock()
ns.purgeNB(fin)
ns.cache.Unlock()
}
func (ns *emptyNS) zapNB(closed bool) {
if ns.state != nsEffective {
return
}
for _, n := range ns.table {
n.execFin()
}
if closed {
ns.state = nsClosed
} else {
ns.state = nsZapped
}
ns.table = nil
}
func (ns *emptyNS) Zap(closed bool) {
ns.cache.Lock()
ns.zapNB(closed)
delete(ns.cache.table, ns.id)
ns.cache.Unlock()
}
type emptyNode struct {
ns *emptyNS
key uint64
value interface{}
ref int
setfin SetFin
delfin DelFin
purgefin PurgeFin
}
func (n *emptyNode) execFin() {
if n.setfin != nil {
n.setfin()
n.setfin = nil
}
if n.purgefin != nil {
n.purgefin(n.ns.id, n.key, n.delfin)
n.delfin = nil
n.purgefin = nil
} else if n.delfin != nil {
n.delfin(true)
n.delfin = nil
}
}
func (n *emptyNode) evict() {
n.ns.cache.Lock()
n.ref--
if n.ref == 0 {
if n.ns.state == nsEffective {
// Remove elem.
delete(n.ns.table, n.key)
// Execute finalizer.
n.execFin()
}
} else if n.ref < 0 {
panic("leveldb/cache: emptyNode: negative node reference")
}
n.ns.cache.Unlock()
}
type emptyObject struct {
node *emptyNode
once uint32
}
func (o *emptyObject) Value() interface{} {
if atomic.LoadUint32(&o.once) == 0 {
return o.node.value
}
return nil
}
func (o *emptyObject) Release() {
if !atomic.CompareAndSwapUint32(&o.once, 0, 1) {
return
}
o.node.evict()
o.node = nil
}

View File

@ -9,16 +9,17 @@ package cache
import (
"sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/util"
)
// lruCache represent a LRU cache state.
type lruCache struct {
sync.Mutex
mu sync.Mutex
recent lruNode
table map[uint64]*lruNs
capacity int
size int
used, size int
}
// NewLRUCache creates a new initialized LRU cache with the given capacity.
@ -32,57 +33,75 @@ func NewLRUCache(capacity int) Cache {
return c
}
func (c *lruCache) Capacity() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.capacity
}
func (c *lruCache) Used() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.used
}
func (c *lruCache) Size() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.size
}
// SetCapacity set cache capacity.
func (c *lruCache) SetCapacity(capacity int) {
c.Lock()
c.mu.Lock()
c.capacity = capacity
c.evict()
c.Unlock()
c.mu.Unlock()
}
// GetNamespace return namespace object for given id.
func (c *lruCache) GetNamespace(id uint64) Namespace {
c.Lock()
defer c.Unlock()
c.mu.Lock()
defer c.mu.Unlock()
if p, ok := c.table[id]; ok {
return p
if ns, ok := c.table[id]; ok {
return ns
}
p := &lruNs{
ns := &lruNs{
lru: c,
id: id,
table: make(map[uint64]*lruNode),
}
c.table[id] = p
return p
c.table[id] = ns
return ns
}
// Purge purge entire cache.
func (c *lruCache) Purge(fin PurgeFin) {
c.Lock()
c.mu.Lock()
for _, ns := range c.table {
ns.purgeNB(fin)
}
c.Unlock()
c.mu.Unlock()
}
func (c *lruCache) Zap(closed bool) {
c.Lock()
func (c *lruCache) Zap() {
c.mu.Lock()
for _, ns := range c.table {
ns.zapNB(closed)
ns.zapNB()
}
c.table = make(map[uint64]*lruNs)
c.Unlock()
c.mu.Unlock()
}
func (c *lruCache) evict() {
top := &c.recent
for n := c.recent.rPrev; c.size > c.capacity && n != top; {
for n := c.recent.rPrev; c.used > c.capacity && n != top; {
n.state = nodeEvicted
n.rRemove()
n.evictNB()
c.size -= n.charge
n.derefNB()
c.used -= n.charge
n = c.recent.rPrev
}
}
@ -94,170 +113,157 @@ type lruNs struct {
state nsState
}
func (ns *lruNs) Get(key uint64, setf SetFunc) (o Object, ok bool) {
lru := ns.lru
lru.Lock()
func (ns *lruNs) Get(key uint64, setf SetFunc) Handle {
ns.lru.mu.Lock()
switch ns.state {
case nsZapped:
lru.Unlock()
if setf == nil {
return
if ns.state != nsEffective {
ns.lru.mu.Unlock()
return nil
}
var value interface{}
var fin func()
ok, value, _, fin = setf()
node, ok := ns.table[key]
if ok {
o = &fakeObject{
value: value,
fin: fin,
}
}
return
case nsClosed:
lru.Unlock()
return
}
n, ok := ns.table[key]
if ok {
switch n.state {
switch node.state {
case nodeEvicted:
// Insert to recent list.
n.state = nodeEffective
n.ref++
lru.size += n.charge
lru.evict()
node.state = nodeEffective
node.ref++
ns.lru.used += node.charge
ns.lru.evict()
fallthrough
case nodeEffective:
// Bump to front
n.rRemove()
n.rInsert(&lru.recent)
// Bump to front.
node.rRemove()
node.rInsert(&ns.lru.recent)
}
n.ref++
node.ref++
} else {
if setf == nil {
lru.Unlock()
return
ns.lru.mu.Unlock()
return nil
}
var value interface{}
var charge int
var fin func()
ok, value, charge, fin = setf()
if !ok {
lru.Unlock()
return
charge, value := setf()
if value == nil {
ns.lru.mu.Unlock()
return nil
}
n = &lruNode{
node = &lruNode{
ns: ns,
key: key,
value: value,
charge: charge,
setfin: fin,
ref: 2,
ref: 1,
}
ns.table[key] = n
n.rInsert(&lru.recent)
ns.table[key] = node
lru.size += charge
lru.evict()
if charge > 0 {
node.ref++
node.rInsert(&ns.lru.recent)
ns.lru.used += charge
ns.lru.size += charge
ns.lru.evict()
}
}
lru.Unlock()
o = &lruObject{node: n}
return
ns.lru.mu.Unlock()
return &lruHandle{node: node}
}
func (ns *lruNs) Delete(key uint64, fin DelFin) bool {
lru := ns.lru
lru.Lock()
ns.lru.mu.Lock()
if ns.state != nsEffective {
lru.Unlock()
if fin != nil {
fin(false)
fin(false, false)
}
ns.lru.mu.Unlock()
return false
}
n, ok := ns.table[key]
if !ok {
lru.Unlock()
node, exist := ns.table[key]
if !exist {
if fin != nil {
fin(false)
fin(false, false)
}
ns.lru.mu.Unlock()
return false
}
n.delfin = fin
switch n.state {
case nodeRemoved:
lru.Unlock()
switch node.state {
case nodeDeleted:
if fin != nil {
fin(true, true)
}
ns.lru.mu.Unlock()
return false
case nodeEffective:
lru.size -= n.charge
n.rRemove()
n.evictNB()
ns.lru.used -= node.charge
node.state = nodeDeleted
node.delfin = fin
node.rRemove()
node.derefNB()
default:
node.state = nodeDeleted
node.delfin = fin
}
n.state = nodeRemoved
lru.Unlock()
ns.lru.mu.Unlock()
return true
}
func (ns *lruNs) purgeNB(fin PurgeFin) {
lru := ns.lru
if ns.state != nsEffective {
return
}
for _, n := range ns.table {
n.purgefin = fin
if n.state == nodeEffective {
lru.size -= n.charge
n.rRemove()
n.evictNB()
for _, node := range ns.table {
switch node.state {
case nodeDeleted:
case nodeEffective:
ns.lru.used -= node.charge
node.state = nodeDeleted
node.purgefin = fin
node.rRemove()
node.derefNB()
default:
node.state = nodeDeleted
node.purgefin = fin
}
n.state = nodeRemoved
}
}
func (ns *lruNs) Purge(fin PurgeFin) {
ns.lru.Lock()
ns.lru.mu.Lock()
ns.purgeNB(fin)
ns.lru.Unlock()
ns.lru.mu.Unlock()
}
func (ns *lruNs) zapNB(closed bool) {
lru := ns.lru
func (ns *lruNs) zapNB() {
if ns.state != nsEffective {
return
}
if closed {
ns.state = nsClosed
} else {
ns.state = nsZapped
for _, node := range ns.table {
if node.state == nodeEffective {
ns.lru.used -= node.charge
node.rRemove()
}
for _, n := range ns.table {
if n.state == nodeEffective {
lru.size -= n.charge
n.rRemove()
}
n.state = nodeRemoved
n.execFin()
ns.lru.size -= node.charge
node.state = nodeDeleted
node.fin()
}
ns.table = nil
}
func (ns *lruNs) Zap(closed bool) {
ns.lru.Lock()
ns.zapNB(closed)
func (ns *lruNs) Zap() {
ns.lru.mu.Lock()
ns.zapNB()
delete(ns.lru.table, ns.id)
ns.lru.Unlock()
ns.lru.mu.Unlock()
}
type lruNode struct {
@ -270,7 +276,6 @@ type lruNode struct {
charge int
ref int
state nodeState
setfin SetFin
delfin DelFin
purgefin PurgeFin
}
@ -284,7 +289,6 @@ func (n *lruNode) rInsert(at *lruNode) {
}
func (n *lruNode) rRemove() bool {
// only remove if not already removed
if n.rPrev == nil {
return false
}
@ -297,58 +301,56 @@ func (n *lruNode) rRemove() bool {
return true
}
func (n *lruNode) execFin() {
if n.setfin != nil {
n.setfin()
n.setfin = nil
func (n *lruNode) fin() {
if r, ok := n.value.(util.Releaser); ok {
r.Release()
}
if n.purgefin != nil {
n.purgefin(n.ns.id, n.key, n.delfin)
n.purgefin(n.ns.id, n.key)
n.delfin = nil
n.purgefin = nil
} else if n.delfin != nil {
n.delfin(true)
n.delfin(true, false)
n.delfin = nil
}
}
func (n *lruNode) evictNB() {
func (n *lruNode) derefNB() {
n.ref--
if n.ref == 0 {
if n.ns.state == nsEffective {
// remove elem
// Remove elemement.
delete(n.ns.table, n.key)
// execute finalizer
n.execFin()
n.ns.lru.size -= n.charge
n.fin()
}
} else if n.ref < 0 {
panic("leveldb/cache: lruCache: negative node reference")
}
}
func (n *lruNode) evict() {
n.ns.lru.Lock()
n.evictNB()
n.ns.lru.Unlock()
func (n *lruNode) deref() {
n.ns.lru.mu.Lock()
n.derefNB()
n.ns.lru.mu.Unlock()
}
type lruObject struct {
type lruHandle struct {
node *lruNode
once uint32
}
func (o *lruObject) Value() interface{} {
if atomic.LoadUint32(&o.once) == 0 {
return o.node.value
func (h *lruHandle) Value() interface{} {
if atomic.LoadUint32(&h.once) == 0 {
return h.node.value
}
return nil
}
func (o *lruObject) Release() {
if !atomic.CompareAndSwapUint32(&o.once, 0, 1) {
func (h *lruHandle) Release() {
if !atomic.CompareAndSwapUint32(&h.once, 0, 1) {
return
}
o.node.evict()
o.node = nil
h.node.deref()
h.node = nil
}

View File

@ -655,6 +655,8 @@ func (db *DB) GetSnapshot() (*Snapshot, error) {
// Returns statistics of the underlying DB.
// leveldb.sstables
// Returns sstables list for each level.
// leveldb.blockpool
// Returns block pool stats.
func (db *DB) GetProperty(name string) (value string, err error) {
err = db.ok()
if err != nil {
@ -700,6 +702,16 @@ func (db *DB) GetProperty(name string) (value string, err error) {
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
}
}
case p == "blockpool":
value = fmt.Sprintf("%v", db.s.tops.bpool)
case p == "cachedblock":
if bc := db.s.o.GetBlockCache(); bc != nil {
value = fmt.Sprintf("%d", bc.Size())
} else {
value = "<nil>"
}
case p == "openedtables":
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
default:
err = errors.New("leveldb: GetProperty: unknown property: " + name)
}

View File

@ -37,6 +37,16 @@
// err = iter.Error()
// ...
//
// Iterate over subset of database content with a particular prefix:
// iter := db.NewIterator(util.BytesPrefix([]byte("foo-")), nil)
// for iter.Next() {
// // Use key/value.
// ...
// }
// iter.Release()
// err = iter.Error()
// ...
//
// Seek-then-Iterate:
//
// iter := db.NewIterator(nil, nil)

View File

@ -31,9 +31,12 @@ const (
type noCache struct{}
func (noCache) SetCapacity(capacity int) {}
func (noCache) Capacity() int { return 0 }
func (noCache) Used() int { return 0 }
func (noCache) Size() int { return 0 }
func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
func (noCache) Purge(fin cache.PurgeFin) {}
func (noCache) Zap(closed bool) {}
func (noCache) Zap() {}
var NoCache cache.Cache = noCache{}

View File

@ -7,6 +7,7 @@
package leveldb
import (
"io"
"sort"
"sync/atomic"
@ -297,7 +298,7 @@ func (t *tOps) create() (*tWriter, error) {
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
w, err := t.create()
if err != nil {
return f, n, err
return
}
defer func() {
@ -322,33 +323,33 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
return
}
// Opens table. It returns a cache object, which should
type tableWrapper struct {
*table.Reader
closer io.Closer
}
func (tr tableWrapper) Release() {
tr.closer.Close()
}
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (c cache.Object, err error) {
func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
num := f.file.Num()
c, ok := t.cacheNS.Get(num, func() (ok bool, value interface{}, charge int, fin cache.SetFin) {
ch = t.cacheNS.Get(num, func() (charge int, value interface{}) {
var r storage.Reader
r, err = f.file.Open()
if err != nil {
return
return 0, nil
}
o := t.s.o
var cacheNS cache.Namespace
if bc := o.GetBlockCache(); bc != nil {
cacheNS = bc.GetNamespace(num)
var bcacheNS cache.Namespace
if bc := t.s.o.GetBlockCache(); bc != nil {
bcacheNS = bc.GetNamespace(num)
}
ok = true
value = table.NewReader(r, int64(f.size), cacheNS, t.bpool, o)
charge = 1
fin = func() {
r.Close()
}
return
return 1, tableWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), r}
})
if !ok && err == nil {
if ch == nil && err == nil {
err = ErrClosed
}
return
@ -357,34 +358,34 @@ func (t *tOps) open(f *tFile) (c cache.Object, err error) {
// Finds key/value pair whose key is greater than or equal to the
// given key.
func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
c, err := t.open(f)
ch, err := t.open(f)
if err != nil {
return nil, nil, err
}
defer c.Release()
return c.Value().(*table.Reader).Find(key, ro)
defer ch.Release()
return ch.Value().(tableWrapper).Find(key, ro)
}
// Returns approximate offset of the given key.
func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
c, err := t.open(f)
ch, err := t.open(f)
if err != nil {
return
}
_offset, err := c.Value().(*table.Reader).OffsetOf(key)
_offset, err := ch.Value().(tableWrapper).OffsetOf(key)
offset = uint64(_offset)
c.Release()
ch.Release()
return
}
// Creates an iterator from the given table.
func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
c, err := t.open(f)
ch, err := t.open(f)
if err != nil {
return iterator.NewEmptyIterator(err)
}
iter := c.Value().(*table.Reader).NewIterator(slice, ro)
iter.SetReleaser(c)
iter := ch.Value().(tableWrapper).NewIterator(slice, ro)
iter.SetReleaser(ch)
return iter
}
@ -392,14 +393,16 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
// no one use the the table.
func (t *tOps) remove(f *tFile) {
num := f.file.Num()
t.cacheNS.Delete(num, func(exist bool) {
t.cacheNS.Delete(num, func(exist, pending bool) {
if !pending {
if err := f.file.Remove(); err != nil {
t.s.logf("table@remove removing @%d %q", num, err)
} else {
t.s.logf("table@remove removed @%d", num)
}
if bc := t.s.o.GetBlockCache(); bc != nil {
bc.GetNamespace(num).Zap(false)
bc.GetNamespace(num).Zap()
}
}
})
}
@ -407,7 +410,7 @@ func (t *tOps) remove(f *tFile) {
// Closes the table ops instance. It will close all tables,
// regadless still used or not.
func (t *tOps) close() {
t.cache.Zap(true)
t.cache.Zap()
}
// Creates new initialized table ops instance.

View File

@ -37,6 +37,7 @@ func max(x, y int) int {
}
type block struct {
bpool *util.BufferPool
cmp comparer.BasicComparer
data []byte
restartsLen int
@ -139,6 +140,14 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas
return bi
}
func (b *block) Release() {
if b.bpool != nil {
b.bpool.Put(b.data)
b.bpool = nil
}
b.data = nil
}
type dir int
const (
@ -438,6 +447,7 @@ func (i *blockIter) Value() []byte {
func (i *blockIter) Release() {
if i.dir > dirReleased {
i.block = nil
i.prevNode = nil
i.prevKeys = nil
i.key = nil
@ -610,43 +620,25 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
return b, nil
}
type releaseBlock struct {
r *Reader
b *block
}
func (r releaseBlock) Release() {
if r.b.data != nil {
r.r.bpool.Put(r.b.data)
r.b.data = nil
}
}
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
if r.cache != nil {
// Get/set block cache.
var err error
cache, ok := r.cache.Get(dataBH.offset, func() (ok bool, value interface{}, charge int, fin cache.SetFin) {
cache := r.cache.Get(dataBH.offset, func() (charge int, value interface{}) {
if !fillCache {
return
return 0, nil
}
var dataBlock *block
dataBlock, err = r.readBlock(dataBH, checksum)
if err == nil {
ok = true
value = dataBlock
charge = int(dataBH.length)
fin = func() {
r.bpool.Put(dataBlock.data)
dataBlock.data = nil
if err != nil {
return 0, nil
}
}
return
return int(dataBH.length), dataBlock
})
if err != nil {
return iterator.NewEmptyIterator(err)
}
if ok {
if cache != nil {
dataBlock := cache.Value().(*block)
if !dataBlock.checksum && (r.checksum || checksum) {
if !verifyChecksum(dataBlock.data) {
@ -662,7 +654,7 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
if err != nil {
return iterator.NewEmptyIterator(err)
}
iter := dataBlock.newIterator(slice, false, releaseBlock{r, dataBlock})
iter := dataBlock.newIterator(slice, false, dataBlock)
return iter
}

View File

@ -26,6 +26,8 @@ type BufferPool struct {
baseline1 int
baseline2 int
get uint32
put uint32
less uint32
equal uint32
greater uint32
@ -47,6 +49,8 @@ func (p *BufferPool) poolNum(n int) int {
// Get returns buffer with length of n.
func (p *BufferPool) Get(n int) []byte {
atomic.AddUint32(&p.get, 1)
poolNum := p.poolNum(n)
pool := p.pool[poolNum]
if poolNum == 0 {
@ -112,6 +116,8 @@ func (p *BufferPool) Get(n int) []byte {
// Put adds given buffer to the pool.
func (p *BufferPool) Put(b []byte) {
atomic.AddUint32(&p.put, 1)
pool := p.pool[p.poolNum(cap(b))]
select {
case pool <- b:
@ -121,8 +127,8 @@ func (p *BufferPool) Put(b []byte) {
}
func (p *BufferPool) String() string {
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v L·%d E·%d G·%d M·%d}",
p.baseline0, p.size, p.sizeMiss, p.less, p.equal, p.greater, p.miss)
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v G·%d P·%d <·%d =·%d >·%d M·%d}",
p.baseline0, p.size, p.sizeMiss, p.get, p.put, p.less, p.equal, p.greater, p.miss)
}
func (p *BufferPool) drain() {

View File

@ -14,3 +14,18 @@ type Range struct {
// Limit of the key range, not include in the range.
Limit []byte
}
// BytesPrefix returns key range that satisfy the given prefix.
// This only applicable for the standard 'bytes comparer'.
func BytesPrefix(prefix []byte) *Range {
var limit []byte
for i := len(prefix) - 1; i >= 0; i-- {
c := prefix[i]
if c < 0xff {
limit = make([]byte, i+1)
copy(limit, prefix)
limit[i] = c + 1
}
}
return &Range{prefix, limit}
}