Update goleveldb

This commit is contained in:
Jakob Borg 2014-08-15 09:16:30 +02:00
parent 1ee3407946
commit 32a9466277
12 changed files with 371 additions and 76 deletions

2
Godeps/Godeps.json generated
View File

@ -49,7 +49,7 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "6f6f5d93f7499d2c505c2839c1d6b28b25a2ce21"
"Rev": "a44c00531ccc005546f20c6e00ab7bb9a8f6b2e0"
},
{
"ImportPath": "github.com/vitrun/qart/coding",

View File

@ -13,7 +13,6 @@ import (
"os"
"path/filepath"
"runtime"
"sync/atomic"
"testing"
"github.com/syndtr/goleveldb/leveldb/iterator"
@ -345,50 +344,6 @@ func BenchmarkDBRead(b *testing.B) {
p.close()
}
func BenchmarkDBReadConcurrent(b *testing.B) {
p := openDBBench(b, false)
p.populate(b.N)
p.fill()
p.gc()
defer p.close()
b.ResetTimer()
b.SetBytes(116)
b.RunParallel(func(pb *testing.PB) {
iter := p.newIter()
defer iter.Release()
for pb.Next() && iter.Next() {
}
})
}
func BenchmarkDBReadConcurrent2(b *testing.B) {
p := openDBBench(b, false)
p.populate(b.N)
p.fill()
p.gc()
defer p.close()
b.ResetTimer()
b.SetBytes(116)
var dir uint32
b.RunParallel(func(pb *testing.PB) {
iter := p.newIter()
defer iter.Release()
if atomic.AddUint32(&dir, 1)%2 == 0 {
for pb.Next() && iter.Next() {
}
} else {
if pb.Next() && iter.Last() {
for pb.Next() && iter.Prev() {
}
}
}
})
}
func BenchmarkDBReadGC(b *testing.B) {
p := openDBBench(b, false)
p.populate(b.N)

View File

@ -35,8 +35,8 @@ type DB struct {
// MemDB.
memMu sync.RWMutex
mem *memdb.DB
frozenMem *memdb.DB
memPool *util.Pool
mem, frozenMem *memDB
journal *journal.Writer
journalWriter storage.Writer
journalFile storage.File
@ -79,6 +79,8 @@ func openDB(s *session) (*DB, error) {
s: s,
// Initial sequence
seq: s.stSeq,
// MemDB
memPool: util.NewPool(1),
// Write
writeC: make(chan *Batch),
writeMergedC: make(chan bool),
@ -560,19 +562,20 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
ikey := newIKey(key, seq, tSeek)
em, fm := db.getMems()
for _, m := range [...]*memdb.DB{em, fm} {
for _, m := range [...]*memDB{em, fm} {
if m == nil {
continue
}
defer m.decref()
mk, mv, me := m.Find(ikey)
mk, mv, me := m.db.Find(ikey)
if me == nil {
ukey, _, t, ok := parseIkey(mk)
if ok && db.s.icmp.uCompare(ukey, key) == 0 {
if t == tDel {
return nil, ErrNotFound
}
return mv, nil
return append([]byte{}, mv...), nil
}
} else if me != ErrNotFound {
return nil, me
@ -592,8 +595,9 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
// Get gets the value for the given key. It returns ErrNotFound if the
// DB does not contain the key.
//
// The caller should not modify the contents of the returned slice, but
// it is safe to modify the contents of the argument after Get returns.
// The returned slice is its own copy, it is safe to modify the contents
// of the returned slice.
// It is safe to modify the contents of the argument after Get returns.
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
err = db.ok()
if err != nil {

View File

@ -216,14 +216,15 @@ func (db *DB) memCompaction() {
if mem == nil {
return
}
defer mem.decref()
c := newCMem(db.s)
stats := new(cStatsStaging)
db.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size()))
db.logf("mem@flush N·%d S·%s", mem.db.Len(), shortenb(mem.db.Size()))
// Don't compact empty memdb.
if mem.Len() == 0 {
if mem.db.Len() == 0 {
db.logf("mem@flush skipping")
// drop frozen mem
db.dropFrozenMem()
@ -241,7 +242,7 @@ func (db *DB) memCompaction() {
db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
return c.flush(mem, -1)
return c.flush(mem.db, -1)
}, func() error {
for _, r := range c.rec.addedTables {
db.logf("mem@flush rollback @%d", r.num)

View File

@ -9,6 +9,7 @@ package leveldb
import (
"errors"
"runtime"
"sync"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@ -19,6 +20,17 @@ var (
errInvalidIkey = errors.New("leveldb: Iterator: invalid internal key")
)
type memdbReleaser struct {
once sync.Once
m *memDB
}
func (mr *memdbReleaser) Release() {
mr.once.Do(func() {
mr.m.decref()
})
}
func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
em, fm := db.getMems()
v := db.s.version()
@ -26,9 +38,13 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
i = append(i, em.NewIterator(slice))
emi := em.db.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi)
if fm != nil {
i = append(i, fm.NewIterator(slice))
fmi := fm.db.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi)
}
i = append(i, ti...)
strict := db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator)

View File

@ -11,8 +11,27 @@ import (
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/util"
)
type memDB struct {
pool *util.Pool
db *memdb.DB
ref int32
}
func (m *memDB) incref() {
atomic.AddInt32(&m.ref, 1)
}
func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
m.pool.Put(m)
} else if ref < 0 {
panic("negative memdb ref")
}
}
// Get latest sequence number.
func (db *DB) getSeq() uint64 {
return atomic.LoadUint64(&db.seq)
@ -25,7 +44,7 @@ func (db *DB) addSeq(delta uint64) {
// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
func (db *DB) newMem(n int) (mem *memdb.DB, err error) {
func (db *DB) newMem(n int) (mem *memDB, err error) {
num := db.s.allocFileNum()
file := db.s.getJournalFile(num)
w, err := file.Create()
@ -37,6 +56,10 @@ func (db *DB) newMem(n int) (mem *memdb.DB, err error) {
db.memMu.Lock()
defer db.memMu.Unlock()
if db.frozenMem != nil {
panic("still has frozen mem")
}
if db.journal == nil {
db.journal = journal.NewWriter(w)
} else {
@ -47,8 +70,19 @@ func (db *DB) newMem(n int) (mem *memdb.DB, err error) {
db.journalWriter = w
db.journalFile = file
db.frozenMem = db.mem
db.mem = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
mem = db.mem
mem, ok := db.memPool.Get().(*memDB)
if ok && mem.db.Capacity() >= n {
mem.db.Reset()
mem.incref()
} else {
mem = &memDB{
pool: db.memPool,
db: memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)),
ref: 1,
}
}
mem.incref()
db.mem = mem
// The seq only incremented by the writer. And whoever called newMem
// should hold write lock, so no need additional synchronization here.
db.frozenSeq = db.seq
@ -56,16 +90,27 @@ func (db *DB) newMem(n int) (mem *memdb.DB, err error) {
}
// Get all memdbs.
func (db *DB) getMems() (e *memdb.DB, f *memdb.DB) {
func (db *DB) getMems() (e, f *memDB) {
db.memMu.RLock()
defer db.memMu.RUnlock()
if db.mem == nil {
panic("nil effective mem")
}
db.mem.incref()
if db.frozenMem != nil {
db.frozenMem.incref()
}
return db.mem, db.frozenMem
}
// Get frozen memdb.
func (db *DB) getEffectiveMem() *memdb.DB {
func (db *DB) getEffectiveMem() *memDB {
db.memMu.RLock()
defer db.memMu.RUnlock()
if db.mem == nil {
panic("nil effective mem")
}
db.mem.incref()
return db.mem
}
@ -77,9 +122,12 @@ func (db *DB) hasFrozenMem() bool {
}
// Get frozen memdb.
func (db *DB) getFrozenMem() *memdb.DB {
func (db *DB) getFrozenMem() *memDB {
db.memMu.RLock()
defer db.memMu.RUnlock()
if db.frozenMem != nil {
db.frozenMem.incref()
}
return db.frozenMem
}
@ -92,6 +140,7 @@ func (db *DB) dropFrozenMem() {
db.logf("journal@remove removed @%d", db.frozenJournalFile.Num())
}
db.frozenJournalFile = nil
db.frozenMem.decref()
db.frozenMem = nil
db.memMu.Unlock()
}

View File

@ -45,7 +45,7 @@ func (db *DB) jWriter() {
}
}
func (db *DB) rotateMem(n int) (mem *memdb.DB, err error) {
func (db *DB) rotateMem(n int) (mem *memDB, err error) {
// Wait for pending memdb compaction.
err = db.compSendIdle(db.mcompCmdC)
if err != nil {
@ -63,13 +63,19 @@ func (db *DB) rotateMem(n int) (mem *memdb.DB, err error) {
return
}
func (db *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
delayed := false
flush := func() bool {
flush := func() (retry bool) {
v := db.s.version()
defer v.release()
mem = db.getEffectiveMem()
nn = mem.Free()
defer func() {
if retry {
mem.decref()
mem = nil
}
}()
nn = mem.db.Free()
switch {
case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed:
delayed = true
@ -84,12 +90,17 @@ func (db *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
}
default:
// Allow memdb to grow if it has no entry.
if mem.Len() == 0 {
if mem.db.Len() == 0 {
nn = n
return false
} else {
mem.decref()
mem, err = db.rotateMem(n)
if err == nil {
nn = mem.db.Free()
} else {
nn = 0
}
}
mem, err = db.rotateMem(n)
nn = mem.Free()
return false
}
return true
@ -140,6 +151,7 @@ retry:
if err != nil {
return
}
defer mem.decref()
// Calculate maximum size of the batch.
m := 1 << 20
@ -178,7 +190,7 @@ drain:
return
case db.journalC <- b:
// Write into memdb
b.memReplay(mem)
b.memReplay(mem.db)
}
// Wait for journal writer
select {
@ -188,7 +200,7 @@ drain:
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
b.revertMemReplay(mem)
b.revertMemReplay(mem.db)
return
}
}
@ -197,7 +209,7 @@ drain:
if err != nil {
return
}
b.memReplay(mem)
b.memReplay(mem.db)
}
// Set last seq number.
@ -258,7 +270,8 @@ func (db *DB) CompactRange(r util.Range) error {
// Check for overlaps in memdb.
mem := db.getEffectiveMem()
if isMemOverlaps(db.s.icmp, mem, r.Start, r.Limit) {
defer mem.decref()
if isMemOverlaps(db.s.icmp, mem.db, r.Start, r.Limit) {
// Memdb compaction.
if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC

View File

@ -0,0 +1,58 @@
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// +build go1.3
package leveldb
import (
"sync/atomic"
"testing"
)
func BenchmarkDBReadConcurrent(b *testing.B) {
p := openDBBench(b, false)
p.populate(b.N)
p.fill()
p.gc()
defer p.close()
b.ResetTimer()
b.SetBytes(116)
b.RunParallel(func(pb *testing.PB) {
iter := p.newIter()
defer iter.Release()
for pb.Next() && iter.Next() {
}
})
}
func BenchmarkDBReadConcurrent2(b *testing.B) {
p := openDBBench(b, false)
p.populate(b.N)
p.fill()
p.gc()
defer p.close()
b.ResetTimer()
b.SetBytes(116)
var dir uint32
b.RunParallel(func(pb *testing.PB) {
iter := p.newIter()
defer iter.Release()
if atomic.AddUint32(&dir, 1)%2 == 0 {
for pb.Next() && iter.Next() {
}
} else {
if pb.Next() && iter.Last() {
for pb.Next() && iter.Prev() {
}
}
}
})
}

View File

@ -4,6 +4,8 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// +build go1.3
package util
import (

View File

@ -0,0 +1,143 @@
// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// +build !go1.3
package util
import (
"fmt"
"sync/atomic"
)
type buffer struct {
b []byte
miss int
}
// BufferPool is a 'buffer pool'.
type BufferPool struct {
pool [4]chan []byte
size [3]uint32
sizeMiss [3]uint32
baseline0 int
baseline1 int
baseline2 int
less uint32
equal uint32
greater uint32
miss uint32
}
func (p *BufferPool) poolNum(n int) int {
switch {
case n <= p.baseline0:
return 0
case n <= p.baseline1:
return 1
case n <= p.baseline2:
return 2
default:
return 3
}
}
// Get returns buffer with length of n.
func (p *BufferPool) Get(n int) []byte {
poolNum := p.poolNum(n)
pool := p.pool[poolNum]
if poolNum == 0 {
// Fast path.
select {
case b := <-pool:
switch {
case cap(b) > n:
atomic.AddUint32(&p.less, 1)
return b[:n]
case cap(b) == n:
atomic.AddUint32(&p.equal, 1)
return b[:n]
default:
panic("not reached")
}
default:
atomic.AddUint32(&p.miss, 1)
}
return make([]byte, n, p.baseline0)
} else {
sizePtr := &p.size[poolNum-1]
select {
case b := <-pool:
switch {
case cap(b) > n:
atomic.AddUint32(&p.less, 1)
return b[:n]
case cap(b) == n:
atomic.AddUint32(&p.equal, 1)
return b[:n]
default:
atomic.AddUint32(&p.greater, 1)
if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
select {
case pool <- b:
default:
}
}
}
default:
atomic.AddUint32(&p.miss, 1)
}
if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
if size == 0 {
atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
} else {
sizeMissPtr := &p.sizeMiss[poolNum-1]
if atomic.AddUint32(sizeMissPtr, 1) == 20 {
atomic.StoreUint32(sizePtr, uint32(n))
atomic.StoreUint32(sizeMissPtr, 0)
}
}
return make([]byte, n)
} else {
return make([]byte, n, size)
}
}
}
// Put adds given buffer to the pool.
func (p *BufferPool) Put(b []byte) {
pool := p.pool[p.poolNum(cap(b))]
select {
case pool <- b:
default:
}
}
func (p *BufferPool) String() string {
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v L·%d E·%d G·%d M·%d}",
p.baseline0, p.size, p.sizeMiss, p.less, p.equal, p.greater, p.miss)
}
// NewBufferPool creates a new initialized 'buffer pool'.
func NewBufferPool(baseline int) *BufferPool {
if baseline <= 0 {
panic("baseline can't be <= 0")
}
p := &BufferPool{
baseline0: baseline,
baseline1: baseline * 2,
baseline2: baseline * 4,
}
for i, cap := range []int{6, 6, 3, 1} {
p.pool[i] = make(chan []byte, cap)
}
return p
}

View File

@ -0,0 +1,21 @@
// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// +build go1.3
package util
import (
"sync"
)
type Pool struct {
sync.Pool
}
func NewPool(cap int) *Pool {
return &Pool{}
}

View File

@ -0,0 +1,33 @@
// Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
// +build !go1.3
package util
type Pool struct {
pool chan interface{}
}
func (p *Pool) Get() interface{} {
select {
case x := <-p.pool:
return x
default:
return nil
}
}
func (p *Pool) Put(x interface{}) {
select {
case p.pool <- x:
default:
}
}
func NewPool(cap int) *Pool {
return &Pool{pool: make(chan interface{}, cap)}
}