Update goleveldb

This commit is contained in:
Jakob Borg 2014-10-06 22:07:33 +02:00
parent d089436546
commit d819151020
16 changed files with 512 additions and 226 deletions

2
Godeps/Godeps.json generated
View File

@ -48,7 +48,7 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "9bca75c48d6c31becfbb127702b425e7226052e3"
"Rev": "e2fa4e6ac1cc41a73bc9fd467878ecbf65df5cc3"
},
{
"ImportPath": "github.com/vitrun/qart/coding",

View File

@ -7,10 +7,8 @@
package cache
import (
"fmt"
"math/rand"
"runtime"
"strings"
"sync"
"sync/atomic"
"testing"
@ -225,16 +223,16 @@ func TestLRUCache_Purge(t *testing.T) {
}
type testingCacheObjectCounter struct {
created uint32
released uint32
created uint
released uint
}
func (c *testingCacheObjectCounter) createOne() {
atomic.AddUint32(&c.created, 1)
c.created++
}
func (c *testingCacheObjectCounter) releaseOne() {
atomic.AddUint32(&c.released, 1)
c.released++
}
type testingCacheObject struct {
@ -243,17 +241,75 @@ type testingCacheObject struct {
ns, key uint64
releaseCalled uint32
releaseCalled bool
}
func (x *testingCacheObject) Release() {
if atomic.CompareAndSwapUint32(&x.releaseCalled, 0, 1) {
if !x.releaseCalled {
x.releaseCalled = true
x.cnt.releaseOne()
} else {
x.t.Errorf("duplicate setfin NS#%d KEY#%s", x.ns, x.key)
}
}
func TestLRUCache_ConcurrentSetGet(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU())
seed := time.Now().UnixNano()
t.Logf("seed=%d", seed)
const (
N = 2000000
M = 4000
C = 3
)
var set, get uint32
wg := &sync.WaitGroup{}
c := NewLRUCache(M / 4)
for ni := uint64(0); ni < C; ni++ {
r0 := rand.New(rand.NewSource(seed + int64(ni)))
r1 := rand.New(rand.NewSource(seed + int64(ni) + 1))
ns := c.GetNamespace(ni)
wg.Add(2)
go func(ns Namespace, r *rand.Rand) {
for i := 0; i < N; i++ {
x := uint64(r.Int63n(M))
o := ns.Get(x, func() (int, interface{}) {
atomic.AddUint32(&set, 1)
return 1, x
})
if v := o.Value().(uint64); v != x {
t.Errorf("#%d invalid value, got=%d", x, v)
}
o.Release()
}
wg.Done()
}(ns, r0)
go func(ns Namespace, r *rand.Rand) {
for i := 0; i < N; i++ {
x := uint64(r.Int63n(M))
o := ns.Get(x, nil)
if o != nil {
atomic.AddUint32(&get, 1)
if v := o.Value().(uint64); v != x {
t.Errorf("#%d invalid value, got=%d", x, v)
}
o.Release()
}
}
wg.Done()
}(ns, r1)
}
wg.Wait()
t.Logf("set=%d get=%d", set, get)
}
func TestLRUCache_Finalizer(t *testing.T) {
const (
capacity = 100
@ -262,10 +318,6 @@ func TestLRUCache_Finalizer(t *testing.T) {
keymax = 8000
)
runtime.GOMAXPROCS(runtime.NumCPU())
defer runtime.GOMAXPROCS(1)
wg := &sync.WaitGroup{}
cnt := &testingCacheObjectCounter{}
c := NewLRUCache(capacity)
@ -273,38 +325,40 @@ func TestLRUCache_Finalizer(t *testing.T) {
type instance struct {
seed int64
rnd *rand.Rand
ns uint64
effective int32
nsid uint64
ns Namespace
effective int
handles []Handle
handlesMap map[uint64]int
delete bool
purge bool
zap bool
wantDel int32
delfinCalledAll int32
delfinCalledEff int32
purgefinCalled int32
wantDel int
delfinCalled int
delfinCalledAll int
delfinCalledEff int
purgefinCalled int
}
instanceGet := func(p *instance, ns Namespace, key uint64) {
h := ns.Get(key, func() (charge int, value interface{}) {
instanceGet := func(p *instance, key uint64) {
h := p.ns.Get(key, func() (charge int, value interface{}) {
to := &testingCacheObject{
t: t, cnt: cnt,
ns: p.ns,
ns: p.nsid,
key: key,
}
atomic.AddInt32(&p.effective, 1)
p.effective++
cnt.createOne()
return 1, releaserFunc{func() {
to.Release()
atomic.AddInt32(&p.effective, -1)
p.effective--
}, to}
})
p.handles = append(p.handles, h)
p.handlesMap[key] = p.handlesMap[key] + 1
}
instanceRelease := func(p *instance, ns Namespace, i int) {
instanceRelease := func(p *instance, i int) {
h := p.handles[i]
key := h.Value().(releaserFunc).value.(*testingCacheObject).key
if n := p.handlesMap[key]; n == 0 {
@ -319,55 +373,71 @@ func TestLRUCache_Finalizer(t *testing.T) {
p.handles[len(p.handles) : len(p.handles)+1][0] = nil
}
seeds := make([]int64, goroutines)
instances := make([]instance, goroutines)
seed := time.Now().UnixNano()
t.Logf("seed=%d", seed)
instances := make([]*instance, goroutines)
for i := range instances {
p := &instances[i]
p := &instance{}
p.handlesMap = make(map[uint64]int)
if seeds[i] == 0 {
seeds[i] = time.Now().UnixNano()
}
p.seed = seeds[i]
p.seed = seed + int64(i)
p.rnd = rand.New(rand.NewSource(p.seed))
p.ns = uint64(i)
p.nsid = uint64(i)
p.ns = c.GetNamespace(p.nsid)
p.delete = i%6 == 0
p.purge = i%8 == 0
p.zap = i%12 == 0 || i%3 == 0
instances[i] = p
}
seedsStr := make([]string, len(seeds))
for i, seed := range seeds {
seedsStr[i] = fmt.Sprint(seed)
}
t.Logf("seeds := []int64{%s}", strings.Join(seedsStr, ", "))
// Get and release.
for i := range instances {
p := &instances[i]
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
for i := 0; i < iterations; i++ {
if len(p.handles) == 0 || p.rnd.Int()%2 == 0 {
instanceGet(p, ns, uint64(p.rnd.Intn(keymax)))
} else {
instanceRelease(p, ns, p.rnd.Intn(len(p.handles)))
runr := rand.New(rand.NewSource(seed - 1))
run := func(rnd *rand.Rand, x []*instance, init func(p *instance) bool, fn func(p *instance, i int) bool) {
var (
rx []*instance
rn []int
)
if init == nil {
rx = append([]*instance{}, x...)
rn = make([]int, len(x))
} else {
for _, p := range x {
if init(p) {
rx = append(rx, p)
rn = append(rn, 0)
}
}
}(p)
}
for len(rx) > 0 {
i := rand.Intn(len(rx))
if fn(rx[i], rn[i]) {
rn[i]++
} else {
rx = append(rx[:i], rx[i+1:]...)
rn = append(rn[:i], rn[i+1:]...)
}
}
}
wg.Wait()
// Get and release.
run(runr, instances, nil, func(p *instance, i int) bool {
if i < iterations {
if len(p.handles) == 0 || p.rnd.Int()%2 == 0 {
instanceGet(p, uint64(p.rnd.Intn(keymax)))
} else {
instanceRelease(p, p.rnd.Intn(len(p.handles)))
}
return true
} else {
return false
}
})
if used, cap := c.Used(), c.Capacity(); used > cap {
t.Errorf("Used > capacity, used=%d cap=%d", used, cap)
}
// Check effective objects.
for i := range instances {
p := &instances[i]
for i, p := range instances {
if int(p.effective) < len(p.handlesMap) {
t.Errorf("#%d effective objects < acquired handle, eo=%d ah=%d", i, p.effective, len(p.handlesMap))
}
@ -377,103 +447,93 @@ func TestLRUCache_Finalizer(t *testing.T) {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Delete and purge.
for i := range instances {
p := &instances[i]
// First delete.
run(runr, instances, func(p *instance) bool {
p.wantDel = p.effective
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
if p.delete {
for key := uint64(0); key < keymax; key++ {
_, wantExist := p.handlesMap[key]
gotExist := ns.Delete(key, func(exist, pending bool) {
atomic.AddInt32(&p.delfinCalledAll, 1)
if exist {
atomic.AddInt32(&p.delfinCalledEff, 1)
}
})
if !gotExist && wantExist {
t.Errorf("delete on NS#%d KEY#%d not found", p.ns, key)
}
}
var delfinCalled int
for key := uint64(0); key < keymax; key++ {
func(key uint64) {
gotExist := ns.Delete(key, func(exist, pending bool) {
if exist && !pending {
t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.ns, key)
}
delfinCalled++
})
if gotExist {
t.Errorf("delete on NS#%d KEY#%d found", p.ns, key)
}
}(key)
}
if delfinCalled != keymax {
t.Errorf("(2) #%d not all delete fin called, diff=%d", p.ns, keymax-delfinCalled)
return p.delete
}, func(p *instance, i int) bool {
key := uint64(i)
if key < keymax {
_, wantExist := p.handlesMap[key]
gotExist := p.ns.Delete(key, func(exist, pending bool) {
p.delfinCalledAll++
if exist {
p.delfinCalledEff++
}
})
if !gotExist && wantExist {
t.Errorf("delete on NS#%d KEY#%d not found", p.nsid, key)
}
return true
} else {
return false
}
})
if p.purge {
ns.Purge(func(ns, key uint64) {
atomic.AddInt32(&p.purgefinCalled, 1)
})
// Second delete.
run(runr, instances, func(p *instance) bool {
p.delfinCalled = 0
return p.delete
}, func(p *instance, i int) bool {
key := uint64(i)
if key < keymax {
gotExist := p.ns.Delete(key, func(exist, pending bool) {
if exist && !pending {
t.Errorf("delete fin on NS#%d KEY#%d exist and not pending for deletion", p.nsid, key)
}
p.delfinCalled++
})
if gotExist {
t.Errorf("delete on NS#%d KEY#%d found", p.nsid, key)
}
}(p)
}
wg.Wait()
return true
} else {
if p.delfinCalled != keymax {
t.Errorf("(2) #%d not all delete fin called, diff=%d", p.ns, keymax-p.delfinCalled)
}
return false
}
})
// Purge.
run(runr, instances, func(p *instance) bool {
return p.purge
}, func(p *instance, i int) bool {
p.ns.Purge(func(ns, key uint64) {
p.purgefinCalled++
})
return false
})
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Release.
for i := range instances {
p := &instances[i]
if !p.zap {
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
for i := len(p.handles) - 1; i >= 0; i-- {
instanceRelease(p, ns, i)
}
}(p)
run(runr, instances, func(p *instance) bool {
return !p.zap
}, func(p *instance, i int) bool {
if len(p.handles) > 0 {
instanceRelease(p, len(p.handles)-1)
return true
} else {
return false
}
}
wg.Wait()
})
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
}
// Zap.
for i := range instances {
p := &instances[i]
if p.zap {
wg.Add(1)
go func(p *instance) {
defer wg.Done()
ns := c.GetNamespace(p.ns)
ns.Zap()
p.handles = nil
p.handlesMap = nil
}(p)
}
}
wg.Wait()
run(runr, instances, func(p *instance) bool {
return p.zap
}, func(p *instance, i int) bool {
p.ns.Zap()
p.handles = nil
p.handlesMap = nil
return false
})
if want := int(cnt.created - cnt.released); c.Size() != want {
t.Errorf("Invalid cache size, want=%d got=%d", want, c.Size())
@ -485,23 +545,21 @@ func TestLRUCache_Finalizer(t *testing.T) {
c.Purge(nil)
for i := range instances {
p := &instances[i]
for _, p := range instances {
if p.delete {
if p.delfinCalledAll != keymax {
t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.ns, p.purge, p.zap, keymax-p.delfinCalledAll)
t.Errorf("#%d not all delete fin called, purge=%v zap=%v diff=%d", p.nsid, p.purge, p.zap, keymax-p.delfinCalledAll)
}
if p.delfinCalledEff != p.wantDel {
t.Errorf("#%d not all effective delete fin called, diff=%d", p.ns, p.wantDel-p.delfinCalledEff)
t.Errorf("#%d not all effective delete fin called, diff=%d", p.nsid, p.wantDel-p.delfinCalledEff)
}
if p.purge && p.purgefinCalled > 0 {
t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.ns, p.delete, p.zap, p.purgefinCalled)
t.Errorf("#%d some purge fin called, delete=%v zap=%v n=%d", p.nsid, p.delete, p.zap, p.purgefinCalled)
}
} else {
if p.purge {
if p.purgefinCalled != p.wantDel {
t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.ns, p.delete, p.zap, p.wantDel-p.purgefinCalled)
t.Errorf("#%d not all purge fin called, delete=%v zap=%v diff=%d", p.nsid, p.delete, p.zap, p.wantDel-p.purgefinCalled)
}
}
}

View File

@ -124,6 +124,9 @@ func (c *lruCache) Zap() {
func (c *lruCache) evict() {
top := &c.recent
for n := c.recent.rPrev; c.used > c.capacity && n != top; {
if n.state != nodeEffective {
panic("evicting non effective node")
}
n.state = nodeEvicted
n.rRemove()
n.derefNB()
@ -324,6 +327,10 @@ func (ns *lruNs) Get(key uint64, setf SetFunc) Handle {
// Bump to front.
n.rRemove()
n.rInsert(&ns.lru.recent)
case nodeDeleted:
// Do nothing.
default:
panic("invalid state")
}
n.ref++
@ -472,8 +479,10 @@ func (n *lruNode) fin() {
r.Release()
}
if n.purgefin != nil {
if n.delfin != nil {
panic("conflicting delete and purge fin")
}
n.purgefin(n.ns.id, n.key)
n.delfin = nil
n.purgefin = nil
} else if n.delfin != nil {
n.delfin(true, false)

View File

@ -260,7 +260,7 @@ func (db *DB) memCompaction() {
return c.commit(db.journalFile.Num(), db.frozenSeq)
}, nil)
db.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration)
db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration)
for _, r := range c.rec.addedTables {
stats.write += r.size
@ -478,7 +478,7 @@ func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
}, nil)
resultSize := int(stats[1].write)
db.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration)
db.logf("table@compaction committed F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration)
// Save compaction stats
for i := range stats {

View File

@ -321,9 +321,13 @@ func (i *dbIter) Release() {
}
func (i *dbIter) SetReleaser(releaser util.Releaser) {
if i.dir != dirReleased {
i.releaser = releaser
if i.dir == dirReleased {
panic(util.ErrReleased)
}
if i.releaser != nil && releaser != nil {
panic(util.ErrHasReleaser)
}
i.releaser = releaser
}
func (i *dbIter) Error() error {

View File

@ -1577,11 +1577,7 @@ func TestDb_BloomFilter(t *testing.T) {
return fmt.Sprintf("key%06d", i)
}
const (
n = 10000
indexOverhead = 19898
filterOverhead = 19799
)
const n = 10000
// Populate multiple layers
for i := 0; i < n; i++ {
@ -1605,7 +1601,7 @@ func TestDb_BloomFilter(t *testing.T) {
cnt := int(h.stor.ReadCounter())
t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
if min, max := n+indexOverhead+filterOverhead, n+indexOverhead+filterOverhead+2*n/100; cnt < min || cnt > max {
if min, max := n, n+2*n/100; cnt < min || cnt > max {
t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
}
@ -1616,7 +1612,7 @@ func TestDb_BloomFilter(t *testing.T) {
}
cnt = int(h.stor.ReadCounter())
t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
if max := 3*n/100 + indexOverhead + filterOverhead; cnt > max {
if max := 3 * n / 100; cnt > max {
t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
}

View File

@ -40,13 +40,19 @@ type basicArrayIterator struct {
util.BasicReleaser
array BasicArray
pos int
err error
}
func (i *basicArrayIterator) Valid() bool {
return i.pos >= 0 && i.pos < i.array.Len()
return i.pos >= 0 && i.pos < i.array.Len() && !i.Released()
}
func (i *basicArrayIterator) First() bool {
if i.Released() {
i.err = ErrIterReleased
return false
}
if i.array.Len() == 0 {
i.pos = -1
return false
@ -56,6 +62,11 @@ func (i *basicArrayIterator) First() bool {
}
func (i *basicArrayIterator) Last() bool {
if i.Released() {
i.err = ErrIterReleased
return false
}
n := i.array.Len()
if n == 0 {
i.pos = 0
@ -66,6 +77,11 @@ func (i *basicArrayIterator) Last() bool {
}
func (i *basicArrayIterator) Seek(key []byte) bool {
if i.Released() {
i.err = ErrIterReleased
return false
}
n := i.array.Len()
if n == 0 {
i.pos = 0
@ -79,6 +95,11 @@ func (i *basicArrayIterator) Seek(key []byte) bool {
}
func (i *basicArrayIterator) Next() bool {
if i.Released() {
i.err = ErrIterReleased
return false
}
i.pos++
if n := i.array.Len(); i.pos >= n {
i.pos = n
@ -88,6 +109,11 @@ func (i *basicArrayIterator) Next() bool {
}
func (i *basicArrayIterator) Prev() bool {
if i.Released() {
i.err = ErrIterReleased
return false
}
i.pos--
if i.pos < 0 {
i.pos = -1
@ -96,7 +122,7 @@ func (i *basicArrayIterator) Prev() bool {
return true
}
func (i *basicArrayIterator) Error() error { return nil }
func (i *basicArrayIterator) Error() error { return i.err }
type arrayIterator struct {
basicArrayIterator

View File

@ -26,9 +26,10 @@ type indexedIterator struct {
strict bool
strictGet bool
data Iterator
err error
errf func(err error)
data Iterator
err error
errf func(err error)
closed bool
}
func (i *indexedIterator) setData() {
@ -50,6 +51,15 @@ func (i *indexedIterator) clearData() {
i.data = nil
}
func (i *indexedIterator) indexErr() {
if err := i.index.Error(); err != nil {
if i.errf != nil {
i.errf(err)
}
i.err = err
}
}
func (i *indexedIterator) dataErr() bool {
if i.errf != nil {
if err := i.data.Error(); err != nil {
@ -72,9 +82,13 @@ func (i *indexedIterator) Valid() bool {
func (i *indexedIterator) First() bool {
if i.err != nil {
return false
} else if i.Released() {
i.err = ErrIterReleased
return false
}
if !i.index.First() {
i.indexErr()
i.clearData()
return false
}
@ -85,9 +99,13 @@ func (i *indexedIterator) First() bool {
func (i *indexedIterator) Last() bool {
if i.err != nil {
return false
} else if i.Released() {
i.err = ErrIterReleased
return false
}
if !i.index.Last() {
i.indexErr()
i.clearData()
return false
}
@ -105,9 +123,13 @@ func (i *indexedIterator) Last() bool {
func (i *indexedIterator) Seek(key []byte) bool {
if i.err != nil {
return false
} else if i.Released() {
i.err = ErrIterReleased
return false
}
if !i.index.Seek(key) {
i.indexErr()
i.clearData()
return false
}
@ -125,6 +147,9 @@ func (i *indexedIterator) Seek(key []byte) bool {
func (i *indexedIterator) Next() bool {
if i.err != nil {
return false
} else if i.Released() {
i.err = ErrIterReleased
return false
}
switch {
@ -136,6 +161,7 @@ func (i *indexedIterator) Next() bool {
fallthrough
case i.data == nil:
if !i.index.Next() {
i.indexErr()
return false
}
i.setData()
@ -147,6 +173,9 @@ func (i *indexedIterator) Next() bool {
func (i *indexedIterator) Prev() bool {
if i.err != nil {
return false
} else if i.Released() {
i.err = ErrIterReleased
return false
}
switch {
@ -158,6 +187,7 @@ func (i *indexedIterator) Prev() bool {
fallthrough
case i.data == nil:
if !i.index.Prev() {
i.indexErr()
return false
}
i.setData()

View File

@ -14,6 +14,10 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)
var (
ErrIterReleased = errors.New("leveldb/iterator: iterator released")
)
// IteratorSeeker is the interface that wraps the 'seeks method'.
type IteratorSeeker interface {
// First moves the iterator to the first key/value pair. If the iterator
@ -100,28 +104,13 @@ type ErrorCallbackSetter interface {
}
type emptyIterator struct {
releaser util.Releaser
released bool
err error
util.BasicReleaser
err error
}
func (i *emptyIterator) rErr() {
if i.err == nil && i.released {
i.err = errors.New("leveldb/iterator: iterator released")
}
}
func (i *emptyIterator) Release() {
if i.releaser != nil {
i.releaser.Release()
i.releaser = nil
}
i.released = true
}
func (i *emptyIterator) SetReleaser(releaser util.Releaser) {
if !i.released {
i.releaser = releaser
if i.err == nil && i.Released() {
i.err = ErrIterReleased
}
}

View File

@ -7,16 +7,10 @@
package iterator
import (
"errors"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/util"
)
var (
ErrIterReleased = errors.New("leveldb/iterator: iterator released")
)
type dir int
const (
@ -274,9 +268,13 @@ func (i *mergedIterator) Release() {
}
func (i *mergedIterator) SetReleaser(releaser util.Releaser) {
if i.dir != dirReleased {
i.releaser = releaser
if i.dir == dirReleased {
panic(util.ErrReleased)
}
if i.releaser != nil && releaser != nil {
panic(util.ErrHasReleaser)
}
i.releaser = releaser
}
func (i *mergedIterator) Error() error {

View File

@ -8,6 +8,7 @@
package memdb
import (
"errors"
"math/rand"
"sync"
@ -17,7 +18,8 @@ import (
)
var (
ErrNotFound = util.ErrNotFound
ErrNotFound = util.ErrNotFound
ErrIterReleased = errors.New("leveldb/memdb: iterator released")
)
const tMaxHeight = 12
@ -29,6 +31,7 @@ type dbIter struct {
node int
forward bool
key, value []byte
err error
}
func (i *dbIter) fill(checkStart, checkLimit bool) bool {
@ -59,6 +62,11 @@ func (i *dbIter) Valid() bool {
}
func (i *dbIter) First() bool {
if i.Released() {
i.err = ErrIterReleased
return false
}
i.forward = true
i.p.mu.RLock()
defer i.p.mu.RUnlock()
@ -71,9 +79,11 @@ func (i *dbIter) First() bool {
}
func (i *dbIter) Last() bool {
if i.p == nil {
if i.Released() {
i.err = ErrIterReleased
return false
}
i.forward = false
i.p.mu.RLock()
defer i.p.mu.RUnlock()
@ -86,9 +96,11 @@ func (i *dbIter) Last() bool {
}
func (i *dbIter) Seek(key []byte) bool {
if i.p == nil {
if i.Released() {
i.err = ErrIterReleased
return false
}
i.forward = true
i.p.mu.RLock()
defer i.p.mu.RUnlock()
@ -100,9 +112,11 @@ func (i *dbIter) Seek(key []byte) bool {
}
func (i *dbIter) Next() bool {
if i.p == nil {
if i.Released() {
i.err = ErrIterReleased
return false
}
if i.node == 0 {
if !i.forward {
return i.First()
@ -117,9 +131,11 @@ func (i *dbIter) Next() bool {
}
func (i *dbIter) Prev() bool {
if i.p == nil {
if i.Released() {
i.err = ErrIterReleased
return false
}
if i.node == 0 {
if i.forward {
return i.Last()
@ -141,10 +157,10 @@ func (i *dbIter) Value() []byte {
return i.value
}
func (i *dbIter) Error() error { return nil }
func (i *dbIter) Error() error { return i.err }
func (i *dbIter) Release() {
if i.p != nil {
if !i.Released() {
i.p = nil
i.node = 0
i.key = nil

View File

@ -147,7 +147,7 @@ func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
}
}
// Mark if record has been commited, this will update session state;
// Mark if record has been committed, this will update session state;
// need external synchronization.
func (s *session) recordCommited(r *sessionRecord) {
if r.has(recJournalNum) {

View File

@ -7,7 +7,9 @@
package leveldb
import (
"fmt"
"sort"
"sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/cache"
@ -276,6 +278,8 @@ type tOps struct {
cache cache.Cache
cacheNS cache.Namespace
bpool *util.BufferPool
mu sync.Mutex
closed bool
}
// Creates an empty table and returns table writer.
@ -322,9 +326,42 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
return
}
type trWrapper struct {
*table.Reader
t *tOps
ref int
}
func (w *trWrapper) Release() {
if w.ref != 0 && !w.t.closed {
panic(fmt.Sprintf("BUG: invalid ref %d, refer to issue #72", w.ref))
}
w.Reader.Release()
}
type trCacheHandleWrapper struct {
cache.Handle
t *tOps
released bool
}
func (w *trCacheHandleWrapper) Release() {
w.t.mu.Lock()
defer w.t.mu.Unlock()
if !w.released {
w.released = true
w.Value().(*trWrapper).ref--
}
w.Handle.Release()
}
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
t.mu.Lock()
defer t.mu.Unlock()
num := f.file.Num()
ch = t.cacheNS.Get(num, func() (charge int, value interface{}) {
var r storage.Reader
@ -337,11 +374,13 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
if bc := t.s.o.GetBlockCache(); bc != nil {
bcacheNS = bc.GetNamespace(num)
}
return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o)
return 1, &trWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), t, 0}
})
if ch == nil && err == nil {
err = ErrClosed
}
ch.Value().(*trWrapper).ref++
ch = &trCacheHandleWrapper{ch, t, false}
return
}
@ -353,7 +392,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b
return nil, nil, err
}
defer ch.Release()
return ch.Value().(*table.Reader).Find(key, ro)
return ch.Value().(*trWrapper).Find(key, ro)
}
// Returns approximate offset of the given key.
@ -363,7 +402,7 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
return
}
defer ch.Release()
offset_, err := ch.Value().(*table.Reader).OffsetOf(key)
offset_, err := ch.Value().(*trWrapper).OffsetOf(key)
return uint64(offset_), err
}
@ -373,7 +412,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
if err != nil {
return iterator.NewEmptyIterator(err)
}
iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
iter := ch.Value().(*trWrapper).NewIterator(slice, ro)
iter.SetReleaser(ch)
return iter
}
@ -381,6 +420,9 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
// Removes table from persistent storage. It waits until
// no one use the the table.
func (t *tOps) remove(f *tFile) {
t.mu.Lock()
defer t.mu.Unlock()
num := f.file.Num()
t.cacheNS.Delete(num, func(exist, pending bool) {
if !pending {
@ -399,6 +441,10 @@ func (t *tOps) remove(f *tFile) {
// Closes the table ops instance. It will close all tables,
// regadless still used or not.
func (t *tOps) close() {
t.mu.Lock()
defer t.mu.Unlock()
t.closed = true
t.cache.Zap()
t.bpool.Close()
}

View File

@ -13,6 +13,7 @@ import (
"io"
"sort"
"strings"
"sync"
"code.google.com/p/snappy-go/snappy"
@ -25,8 +26,9 @@ import (
)
var (
ErrNotFound = util.ErrNotFound
ErrIterReleased = errors.New("leveldb/table: iterator released")
ErrNotFound = util.ErrNotFound
ErrReaderReleased = errors.New("leveldb/table: reader released")
ErrIterReleased = errors.New("leveldb/table: iterator released")
)
func max(x, y int) int {
@ -134,9 +136,7 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas
}
func (b *block) Release() {
if b.tr.bpool != nil {
b.tr.bpool.Put(b.data)
}
b.tr.bpool.Put(b.data)
b.tr = nil
b.data = nil
}
@ -439,7 +439,7 @@ func (i *blockIter) Value() []byte {
}
func (i *blockIter) Release() {
if i.dir > dirReleased {
if i.dir != dirReleased {
i.block = nil
i.prevNode = nil
i.prevKeys = nil
@ -458,9 +458,13 @@ func (i *blockIter) Release() {
}
func (i *blockIter) SetReleaser(releaser util.Releaser) {
if i.dir > dirReleased {
i.releaser = releaser
if i.dir == dirReleased {
panic(util.ErrReleased)
}
if i.releaser != nil && releaser != nil {
panic(util.ErrHasReleaser)
}
i.releaser = releaser
}
func (i *blockIter) Valid() bool {
@ -495,9 +499,7 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
}
func (b *filterBlock) Release() {
if b.tr.bpool != nil {
b.tr.bpool.Put(b.data)
}
b.tr.bpool.Put(b.data)
b.tr = nil
b.data = nil
}
@ -519,15 +521,17 @@ func (i *indexIter) Get() iterator.Iterator {
if n == 0 {
return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid table (bad data block handle)"))
}
var slice *util.Range
if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
slice = i.slice
}
return i.blockIter.block.tr.getDataIter(dataBH, slice, i.checksum, i.fillCache)
return i.blockIter.block.tr.getDataIterErr(dataBH, slice, i.checksum, i.fillCache)
}
// Reader is a table reader.
type Reader struct {
mu sync.RWMutex
reader io.ReaderAt
cache cache.Namespace
err error
@ -540,6 +544,8 @@ type Reader struct {
dataEnd int64
indexBH, filterBH blockHandle
indexBlock *block
filterBlock *filterBlock
}
func verifyChecksum(data []byte) bool {
@ -688,6 +694,20 @@ func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterB
return b, b, err
}
func (r *Reader) getIndexBlock(fillCache bool) (b *block, rel util.Releaser, err error) {
if r.indexBlock == nil {
return r.readBlockCached(r.indexBH, true, fillCache)
}
return r.indexBlock, util.NoopReleaser{}, nil
}
func (r *Reader) getFilterBlock(fillCache bool) (*filterBlock, util.Releaser, error) {
if r.filterBlock == nil {
return r.readFilterBlockCached(r.filterBH, fillCache)
}
return r.filterBlock, util.NoopReleaser{}, nil
}
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
b, rel, err := r.readBlockCached(dataBH, checksum, fillCache)
if err != nil {
@ -696,6 +716,17 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
return b.newIterator(slice, false, rel)
}
func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
r.mu.RLock()
defer r.mu.RUnlock()
if r.err != nil {
return iterator.NewEmptyIterator(r.err)
}
return r.getDataIter(dataBH, slice, checksum, fillCache)
}
// NewIterator creates an iterator from the table.
//
// Slice allows slicing the iterator to only contains keys in the given
@ -708,22 +739,25 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
//
// Also read Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
r.mu.RLock()
defer r.mu.RUnlock()
if r.err != nil {
return iterator.NewEmptyIterator(r.err)
}
fillCache := !ro.GetDontFillCache()
b, rel, err := r.readBlockCached(r.indexBH, true, fillCache)
indexBlock, rel, err := r.getIndexBlock(fillCache)
if err != nil {
return iterator.NewEmptyIterator(err)
}
index := &indexIter{
blockIter: b.newIterator(slice, true, rel),
blockIter: indexBlock.newIterator(slice, true, rel),
slice: slice,
checksum: ro.GetStrict(opt.StrictBlockChecksum),
fillCache: !ro.GetDontFillCache(),
}
return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false)
return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), true)
}
// Find finds key/value pair whose key is greater than or equal to the
@ -733,12 +767,15 @@ func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
// The caller should not modify the contents of the returned slice, but
// it is safe to modify the contents of the argument after Find returns.
func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.err != nil {
err = r.err
return
}
indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
indexBlock, rel, err := r.getIndexBlock(true)
if err != nil {
return
}
@ -759,7 +796,7 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
return
}
if r.filter != nil {
filterBlock, rel, ferr := r.readFilterBlockCached(r.filterBH, true)
filterBlock, rel, ferr := r.getFilterBlock(true)
if ferr == nil {
if !filterBlock.contains(dataBH.offset, key) {
rel.Release()
@ -779,9 +816,13 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
}
// Don't use block buffer, no need to copy the buffer.
rkey = data.Key()
// Use block buffer, and since the buffer will be recycled, the buffer
// need to be copied.
value = append([]byte{}, data.Value()...)
if r.bpool == nil {
value = data.Value()
} else {
// Use block buffer, and since the buffer will be recycled, the buffer
// need to be copied.
value = append([]byte{}, data.Value()...)
}
return
}
@ -791,6 +832,9 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
// The caller should not modify the contents of the returned slice, but
// it is safe to modify the contents of the argument after Get returns.
func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.err != nil {
err = r.err
return
@ -808,6 +852,9 @@ func (r *Reader) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error)
//
// It is safe to modify the contents of the argument after Get returns.
func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
r.mu.RLock()
defer r.mu.RUnlock()
if r.err != nil {
err = r.err
return
@ -840,12 +887,24 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
// Release implements util.Releaser.
// It also close the file if it is an io.Closer.
func (r *Reader) Release() {
r.mu.Lock()
defer r.mu.Unlock()
if closer, ok := r.reader.(io.Closer); ok {
closer.Close()
}
if r.indexBlock != nil {
r.indexBlock.Release()
r.indexBlock = nil
}
if r.filterBlock != nil {
r.filterBlock.Release()
r.filterBlock = nil
}
r.reader = nil
r.cache = nil
r.bpool = nil
r.err = ErrReaderReleased
}
// NewReader creates a new initialized table reader for the file.
@ -853,9 +912,6 @@ func (r *Reader) Release() {
//
// The returned table reader instance is goroutine-safe.
func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.BufferPool, o *opt.Options) *Reader {
if bpool == nil {
bpool = util.NewBufferPool(o.GetBlockSize() + blockTrailerLen)
}
r := &Reader{
reader: f,
cache: cache,
@ -930,5 +986,22 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf
}
metaIter.Release()
metaBlock.Release()
// Cache index and filter block locally, since we don't have global cache.
if cache == nil {
r.indexBlock, r.err = r.readBlock(r.indexBH, true)
if r.err != nil {
return r
}
if r.filter != nil {
r.filterBlock, err = r.readFilterBlock(r.filterBH)
if err != nil {
// Don't use filter then.
r.filter = nil
r.filterBH = blockHandle{}
}
}
}
return r
}

View File

@ -54,6 +54,10 @@ func (p *BufferPool) poolNum(n int) int {
// Get returns buffer with length of n.
func (p *BufferPool) Get(n int) []byte {
if p == nil {
return make([]byte, n)
}
atomic.AddUint32(&p.get, 1)
poolNum := p.poolNum(n)
@ -145,6 +149,10 @@ func (p *BufferPool) Get(n int) []byte {
// Put adds given buffer to the pool.
func (p *BufferPool) Put(b []byte) {
if p == nil {
return
}
atomic.AddUint32(&p.put, 1)
pool := p.pool[p.poolNum(cap(b))]
@ -156,6 +164,10 @@ func (p *BufferPool) Put(b []byte) {
}
func (p *BufferPool) Close() {
if p == nil {
return
}
select {
case p.close <- struct{}{}:
default:
@ -163,6 +175,10 @@ func (p *BufferPool) Close() {
}
func (p *BufferPool) String() string {
if p == nil {
return "<nil>"
}
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
}

View File

@ -12,7 +12,9 @@ import (
)
var (
ErrNotFound = errors.New("leveldb: not found")
ErrNotFound = errors.New("leveldb: not found")
ErrReleased = errors.New("leveldb: resource already relesed")
ErrHasReleaser = errors.New("leveldb: releaser already defined")
)
// Releaser is the interface that wraps the basic Release method.
@ -27,23 +29,46 @@ type ReleaseSetter interface {
// SetReleaser associates the given releaser to the resources. The
// releaser will be called once coresponding resources released.
// Calling SetReleaser with nil will clear the releaser.
//
// This will panic if a releaser already present or coresponding
// resource is already released. Releaser should be cleared first
// before assigned a new one.
SetReleaser(releaser Releaser)
}
// BasicReleaser provides basic implementation of Releaser and ReleaseSetter.
type BasicReleaser struct {
releaser Releaser
released bool
}
// Released returns whether Release method already called.
func (r *BasicReleaser) Released() bool {
return r.released
}
// Release implements Releaser.Release.
func (r *BasicReleaser) Release() {
if r.releaser != nil {
r.releaser.Release()
r.releaser = nil
if !r.released {
if r.releaser != nil {
r.releaser.Release()
r.releaser = nil
}
r.released = true
}
}
// SetReleaser implements ReleaseSetter.SetReleaser.
func (r *BasicReleaser) SetReleaser(releaser Releaser) {
if r.released {
panic(ErrReleased)
}
if r.releaser != nil && releaser != nil {
panic(ErrHasReleaser)
}
r.releaser = releaser
}
type NoopReleaser struct{}
func (NoopReleaser) Release() {}