Update goleveldb

This commit is contained in:
Jakob Borg 2014-09-02 09:43:42 +02:00
parent de0b91d157
commit f633bdddf0
16 changed files with 384 additions and 179 deletions

2
Godeps/Godeps.json generated
View File

@ -49,7 +49,7 @@
},
{
"ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "59d87758aeaab5ab6ed289c773349500228a1557"
"Rev": "2b99e8d4757bf06eeab1b0485d80b8ae1c088874"
},
{
"ImportPath": "github.com/vitrun/qart/coding",

View File

@ -40,10 +40,21 @@ type Cache interface {
// Size returns entire alive cache objects size.
Size() int
// NumObjects returns number of alive objects.
NumObjects() int
// GetNamespace gets cache namespace with the given id.
// GetNamespace is never return nil.
GetNamespace(id uint64) Namespace
// PurgeNamespace purges cache namespace with the given id from this cache tree.
// Also read Namespace.Purge.
PurgeNamespace(id uint64, fin PurgeFin)
// ZapNamespace detaches cache namespace with the given id from this cache tree.
// Also read Namespace.Zap.
ZapNamespace(id uint64)
// Purge purges all cache namespace from this cache tree.
// This is behave the same as calling Namespace.Purge method on all cache namespace.
Purge(fin PurgeFin)

View File

@ -15,11 +15,11 @@ import (
// lruCache represent a LRU cache state.
type lruCache struct {
mu sync.Mutex
recent lruNode
table map[uint64]*lruNs
capacity int
used, size int
mu sync.Mutex
recent lruNode
table map[uint64]*lruNs
capacity int
used, size, alive int
}
// NewLRUCache creates a new initialized LRU cache with the given capacity.
@ -51,6 +51,12 @@ func (c *lruCache) Size() int {
return c.size
}
func (c *lruCache) NumObjects() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.alive
}
// SetCapacity set cache capacity.
func (c *lruCache) SetCapacity(capacity int) {
c.mu.Lock()
@ -77,6 +83,23 @@ func (c *lruCache) GetNamespace(id uint64) Namespace {
return ns
}
func (c *lruCache) ZapNamespace(id uint64) {
c.mu.Lock()
if ns, exist := c.table[id]; exist {
ns.zapNB()
delete(c.table, id)
}
c.mu.Unlock()
}
func (c *lruCache) PurgeNamespace(id uint64, fin PurgeFin) {
c.mu.Lock()
if ns, exist := c.table[id]; exist {
ns.purgeNB(fin)
}
c.mu.Unlock()
}
// Purge purge entire cache.
func (c *lruCache) Purge(fin PurgeFin) {
c.mu.Lock()
@ -158,11 +181,12 @@ func (ns *lruNs) Get(key uint64, setf SetFunc) Handle {
}
ns.table[key] = node
ns.lru.size += charge
ns.lru.alive++
if charge > 0 {
node.ref++
node.rInsert(&ns.lru.recent)
ns.lru.used += charge
ns.lru.size += charge
ns.lru.evict()
}
}
@ -322,8 +346,10 @@ func (n *lruNode) derefNB() {
// Remove elemement.
delete(n.ns.table, n.key)
n.ns.lru.size -= n.charge
n.ns.lru.alive--
n.fin()
}
n.value = nil
} else if n.ref < 0 {
panic("leveldb/cache: lruCache: negative node reference")
}

View File

@ -14,6 +14,7 @@ import (
"runtime"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/iterator"
@ -35,7 +36,7 @@ type DB struct {
// MemDB.
memMu sync.RWMutex
memPool *util.Pool
memPool chan *memdb.DB
mem, frozenMem *memDB
journal *journal.Writer
journalWriter storage.Writer
@ -47,6 +48,9 @@ type DB struct {
snapsMu sync.Mutex
snapsRoot snapshotElement
// Stats.
aliveSnaps, aliveIters int32
// Write.
writeC chan *Batch
writeMergedC chan bool
@ -80,7 +84,7 @@ func openDB(s *session) (*DB, error) {
// Initial sequence
seq: s.stSeq,
// MemDB
memPool: util.NewPool(1),
memPool: make(chan *memdb.DB, 1),
// Write
writeC: make(chan *Batch),
writeMergedC: make(chan bool),
@ -122,6 +126,7 @@ func openDB(s *session) (*DB, error) {
go db.tCompaction()
go db.mCompaction()
go db.jWriter()
go db.mpoolDrain()
s.logf("db@open done T·%v", time.Since(start))
@ -568,7 +573,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
}
defer m.decref()
mk, mv, me := m.db.Find(ikey)
mk, mv, me := m.mdb.Find(ikey)
if me == nil {
ukey, _, t, ok := parseIkey(mk)
if ok && db.s.icmp.uCompare(ukey, key) == 0 {
@ -657,6 +662,14 @@ func (db *DB) GetSnapshot() (*Snapshot, error) {
// Returns sstables list for each level.
// leveldb.blockpool
// Returns block pool stats.
// leveldb.cachedblock
// Returns size of cached block.
// leveldb.openedtables
// Returns number of opened tables.
// leveldb.alivesnaps
// Returns number of alive snapshots.
// leveldb.aliveiters
// Returns number of alive iterators.
func (db *DB) GetProperty(name string) (value string, err error) {
err = db.ok()
if err != nil {
@ -712,6 +725,10 @@ func (db *DB) GetProperty(name string) (value string, err error) {
}
case p == "openedtables":
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
case p == "alivesnaps":
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
case p == "aliveiters":
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
default:
err = errors.New("leveldb: GetProperty: unknown property: " + name)
}

View File

@ -221,10 +221,10 @@ func (db *DB) memCompaction() {
c := newCMem(db.s)
stats := new(cStatsStaging)
db.logf("mem@flush N·%d S·%s", mem.db.Len(), shortenb(mem.db.Size()))
db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
// Don't compact empty memdb.
if mem.db.Len() == 0 {
if mem.mdb.Len() == 0 {
db.logf("mem@flush skipping")
// drop frozen mem
db.dropFrozenMem()
@ -242,7 +242,7 @@ func (db *DB) memCompaction() {
db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer()
defer stats.stopTimer()
return c.flush(mem.db, -1)
return c.flush(mem.mdb, -1)
}, func() error {
for _, r := range c.rec.addedTables {
db.logf("mem@flush rollback @%d", r.num)

View File

@ -10,6 +10,7 @@ import (
"errors"
"runtime"
"sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@ -38,11 +39,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
ti := v.getIterators(slice, ro)
n := len(ti) + 2
i := make([]iterator.Iterator, 0, n)
emi := em.db.NewIterator(slice)
emi := em.mdb.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi)
if fm != nil {
fmi := fm.db.NewIterator(slice)
fmi := fm.mdb.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi)
}
@ -66,6 +67,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
}
rawIter := db.newRawIterator(islice, ro)
iter := &dbIter{
db: db,
icmp: db.s.icmp,
iter: rawIter,
seq: seq,
@ -73,6 +75,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
key: make([]byte, 0),
value: make([]byte, 0),
}
atomic.AddInt32(&db.aliveIters, 1)
runtime.SetFinalizer(iter, (*dbIter).Release)
return iter
}
@ -89,6 +92,7 @@ const (
// dbIter represent an interator states over a database session.
type dbIter struct {
db *DB
icmp *iComparer
iter iterator.Iterator
seq uint64
@ -303,6 +307,7 @@ func (i *dbIter) Release() {
if i.releaser != nil {
i.releaser.Release()
i.releaser = nil
}
i.dir = dirReleased
@ -310,6 +315,8 @@ func (i *dbIter) Release() {
i.value = nil
i.iter.Release()
i.iter = nil
atomic.AddInt32(&i.db.aliveIters, -1)
i.db = nil
}
}

View File

@ -9,6 +9,7 @@ package leveldb
import (
"runtime"
"sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"
@ -81,7 +82,7 @@ func (db *DB) minSeq() uint64 {
type Snapshot struct {
db *DB
elem *snapshotElement
mu sync.Mutex
mu sync.RWMutex
released bool
}
@ -91,6 +92,7 @@ func (db *DB) newSnapshot() *Snapshot {
db: db,
elem: db.acquireSnapshot(),
}
atomic.AddInt32(&db.aliveSnaps, 1)
runtime.SetFinalizer(snap, (*Snapshot).Release)
return snap
}
@ -105,8 +107,8 @@ func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err er
if err != nil {
return
}
snap.mu.Lock()
defer snap.mu.Unlock()
snap.mu.RLock()
defer snap.mu.RUnlock()
if snap.released {
err = ErrSnapshotReleased
return
@ -160,6 +162,7 @@ func (snap *Snapshot) Release() {
snap.released = true
snap.db.releaseSnapshot(snap.elem)
atomic.AddInt32(&snap.db.aliveSnaps, -1)
snap.db = nil
snap.elem = nil
}

View File

@ -8,16 +8,16 @@ package leveldb
import (
"sync/atomic"
"time"
"github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/util"
)
type memDB struct {
pool *util.Pool
db *memdb.DB
ref int32
db *DB
mdb *memdb.DB
ref int32
}
func (m *memDB) incref() {
@ -26,7 +26,13 @@ func (m *memDB) incref() {
func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
m.pool.Put(m)
// Only put back memdb with std capacity.
if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() {
m.mdb.Reset()
m.db.mpoolPut(m.mdb)
}
m.db = nil
m.mdb = nil
} else if ref < 0 {
panic("negative memdb ref")
}
@ -42,6 +48,41 @@ func (db *DB) addSeq(delta uint64) {
atomic.AddUint64(&db.seq, delta)
}
func (db *DB) mpoolPut(mem *memdb.DB) {
defer func() {
recover()
}()
select {
case db.memPool <- mem:
default:
}
}
func (db *DB) mpoolGet() *memdb.DB {
select {
case mem := <-db.memPool:
return mem
default:
return nil
}
}
func (db *DB) mpoolDrain() {
ticker := time.NewTicker(30 * time.Second)
for {
select {
case <-ticker.C:
select {
case <-db.memPool:
default:
}
case _, _ = <-db.closeC:
close(db.memPool)
return
}
}
}
// Create new memdb and froze the old one; need external synchronization.
// newMem only called synchronously by the writer.
func (db *DB) newMem(n int) (mem *memDB, err error) {
@ -70,18 +111,15 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
db.journalWriter = w
db.journalFile = file
db.frozenMem = db.mem
mem, ok := db.memPool.Get().(*memDB)
if ok && mem.db.Capacity() >= n {
mem.db.Reset()
mem.incref()
} else {
mem = &memDB{
pool: db.memPool,
db: memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)),
ref: 1,
}
mdb := db.mpoolGet()
if mdb == nil || mdb.Capacity() < n {
mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
}
mem = &memDB{
db: db,
mdb: mdb,
ref: 2,
}
mem.incref()
db.mem = mem
// The seq only incremented by the writer. And whoever called newMem
// should hold write lock, so no need additional synchronization here.

View File

@ -1577,7 +1577,11 @@ func TestDb_BloomFilter(t *testing.T) {
return fmt.Sprintf("key%06d", i)
}
n := 10000
const (
n = 10000
indexOverheat = 19898
filterOverheat = 19799
)
// Populate multiple layers
for i := 0; i < n; i++ {
@ -1601,7 +1605,7 @@ func TestDb_BloomFilter(t *testing.T) {
cnt := int(h.stor.ReadCounter())
t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt)
if min, max := n, n+2*n/100; cnt < min || cnt > max {
if min, max := n+indexOverheat+filterOverheat, n+indexOverheat+filterOverheat+2*n/100; cnt < min || cnt > max {
t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt)
}
@ -1612,7 +1616,7 @@ func TestDb_BloomFilter(t *testing.T) {
}
cnt = int(h.stor.ReadCounter())
t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt)
if max := 3 * n / 100; cnt > max {
if max := 3*n/100 + indexOverheat + filterOverheat; cnt > max {
t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt)
}

View File

@ -75,7 +75,7 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
mem = nil
}
}()
nn = mem.db.Free()
nn = mem.mdb.Free()
switch {
case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed:
delayed = true
@ -90,13 +90,13 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
}
default:
// Allow memdb to grow if it has no entry.
if mem.db.Len() == 0 {
if mem.mdb.Len() == 0 {
nn = n
} else {
mem.decref()
mem, err = db.rotateMem(n)
if err == nil {
nn = mem.db.Free()
nn = mem.mdb.Free()
} else {
nn = 0
}
@ -190,7 +190,7 @@ drain:
return
case db.journalC <- b:
// Write into memdb
b.memReplay(mem.db)
b.memReplay(mem.mdb)
}
// Wait for journal writer
select {
@ -200,7 +200,7 @@ drain:
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
b.revertMemReplay(mem.db)
b.revertMemReplay(mem.mdb)
return
}
}
@ -209,7 +209,7 @@ drain:
if err != nil {
return
}
b.memReplay(mem.db)
b.memReplay(mem.mdb)
}
// Set last seq number.
@ -271,7 +271,7 @@ func (db *DB) CompactRange(r util.Range) error {
// Check for overlaps in memdb.
mem := db.getEffectiveMem()
defer mem.decref()
if isMemOverlaps(db.s.icmp, mem.db, r.Start, r.Limit) {
if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) {
// Memdb compaction.
if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC

View File

@ -30,13 +30,16 @@ const (
type noCache struct{}
func (noCache) SetCapacity(capacity int) {}
func (noCache) Capacity() int { return 0 }
func (noCache) Used() int { return 0 }
func (noCache) Size() int { return 0 }
func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
func (noCache) Purge(fin cache.PurgeFin) {}
func (noCache) Zap() {}
func (noCache) SetCapacity(capacity int) {}
func (noCache) Capacity() int { return 0 }
func (noCache) Used() int { return 0 }
func (noCache) Size() int { return 0 }
func (noCache) NumObjects() int { return 0 }
func (noCache) GetNamespace(id uint64) cache.Namespace { return nil }
func (noCache) PurgeNamespace(id uint64, fin cache.PurgeFin) {}
func (noCache) ZapNamespace(id uint64) {}
func (noCache) Purge(fin cache.PurgeFin) {}
func (noCache) Zap() {}
var NoCache cache.Cache = noCache{}

View File

@ -7,7 +7,6 @@
package leveldb
import (
"io"
"sort"
"sync/atomic"
@ -323,15 +322,6 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
return
}
type tableWrapper struct {
*table.Reader
closer io.Closer
}
func (tr tableWrapper) Release() {
tr.closer.Close()
}
// Opens table. It returns a cache handle, which should
// be released after use.
func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
@ -347,7 +337,7 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) {
if bc := t.s.o.GetBlockCache(); bc != nil {
bcacheNS = bc.GetNamespace(num)
}
return 1, tableWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), r}
return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o)
})
if ch == nil && err == nil {
err = ErrClosed
@ -363,7 +353,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b
return nil, nil, err
}
defer ch.Release()
return ch.Value().(tableWrapper).Find(key, ro)
return ch.Value().(*table.Reader).Find(key, ro)
}
// Returns approximate offset of the given key.
@ -372,10 +362,9 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
if err != nil {
return
}
_offset, err := ch.Value().(tableWrapper).OffsetOf(key)
offset = uint64(_offset)
ch.Release()
return
defer ch.Release()
offset_, err := ch.Value().(*table.Reader).OffsetOf(key)
return uint64(offset_), err
}
// Creates an iterator from the given table.
@ -384,7 +373,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
if err != nil {
return iterator.NewEmptyIterator(err)
}
iter := ch.Value().(tableWrapper).NewIterator(slice, ro)
iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
iter.SetReleaser(ch)
return iter
}
@ -401,7 +390,7 @@ func (t *tOps) remove(f *tFile) {
t.s.logf("table@remove removed @%d", num)
}
if bc := t.s.o.GetBlockCache(); bc != nil {
bc.GetNamespace(num).Zap()
bc.ZapNamespace(num)
}
}
})
@ -411,6 +400,7 @@ func (t *tOps) remove(f *tFile) {
// regadless still used or not.
func (t *tOps) close() {
t.cache.Zap()
t.bpool.Close()
}
// Creates new initialized table ops instance.

View File

@ -40,7 +40,7 @@ var _ = testutil.Defer(func() {
data := bw.buf.Bytes()
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
return &block{
cmp: comparer.DefaultComparer,
tr: &Reader{cmp: comparer.DefaultComparer},
data: data,
restartsLen: restartsLen,
restartsOffset: len(data) - (restartsLen+1)*4,

View File

@ -37,8 +37,7 @@ func max(x, y int) int {
}
type block struct {
bpool *util.BufferPool
cmp comparer.BasicComparer
tr *Reader
data []byte
restartsLen int
restartsOffset int
@ -47,31 +46,25 @@ type block struct {
}
func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) {
n := b.restartsOffset
data := b.data
cmp := b.cmp
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):]))
offset += 1 // shared always zero, since this is a restart point
v1, n1 := binary.Uvarint(data[offset:]) // key length
_, n2 := binary.Uvarint(data[offset+n1:]) // value length
offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
offset += 1 // shared always zero, since this is a restart point
v1, n1 := binary.Uvarint(b.data[offset:]) // key length
_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2
return cmp.Compare(data[m:m+int(v1)], key) > 0
return b.tr.cmp.Compare(b.data[m:m+int(v1)], key) > 0
}) + rstart - 1
if index < rstart {
// The smallest key is greater-than key sought.
index = rstart
}
offset = int(binary.LittleEndian.Uint32(data[n+4*index:]))
offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:]))
return
}
func (b *block) restartIndex(rstart, rlimit, offset int) int {
n := b.restartsOffset
data := b.data
return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset
return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset
}) + rstart - 1
}
@ -141,10 +134,10 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas
}
func (b *block) Release() {
if b.bpool != nil {
b.bpool.Put(b.data)
b.bpool = nil
if b.tr.bpool != nil {
b.tr.bpool.Put(b.data)
}
b.tr = nil
b.data = nil
}
@ -270,7 +263,7 @@ func (i *blockIter) Seek(key []byte) bool {
i.dir = dirForward
}
for i.Next() {
if i.block.cmp.Compare(i.key, key) >= 0 {
if i.block.tr.cmp.Compare(i.key, key) >= 0 {
return true
}
}
@ -479,7 +472,7 @@ func (i *blockIter) Error() error {
}
type filterBlock struct {
filter filter.Filter
tr *Reader
data []byte
oOffset int
baseLg uint
@ -493,7 +486,7 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
n := int(binary.LittleEndian.Uint32(o))
m := int(binary.LittleEndian.Uint32(o[4:]))
if n < m && m <= b.oOffset {
return b.filter.Contains(b.data[n:m], key)
return b.tr.filter.Contains(b.data[n:m], key)
} else if n == m {
return false
}
@ -501,10 +494,17 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool {
return true
}
func (b *filterBlock) Release() {
if b.tr.bpool != nil {
b.tr.bpool.Put(b.data)
}
b.tr = nil
b.data = nil
}
type indexIter struct {
blockIter
tableReader *Reader
slice *util.Range
*blockIter
slice *util.Range
// Options
checksum bool
fillCache bool
@ -523,7 +523,7 @@ func (i *indexIter) Get() iterator.Iterator {
if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) {
slice = i.slice
}
return i.tableReader.getDataIter(dataBH, slice, i.checksum, i.fillCache)
return i.blockIter.block.tr.getDataIter(dataBH, slice, i.checksum, i.fillCache)
}
// Reader is a table reader.
@ -538,9 +538,8 @@ type Reader struct {
checksum bool
strictIter bool
dataEnd int64
indexBlock *block
filterBlock *filterBlock
dataEnd int64
indexBH, filterBH blockHandle
}
func verifyChecksum(data []byte) bool {
@ -557,6 +556,7 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
}
if checksum || r.checksum {
if !verifyChecksum(data) {
r.bpool.Put(data)
return nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")
}
}
@ -575,6 +575,7 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) {
return nil, err
}
default:
r.bpool.Put(data)
return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length])
}
return data, nil
@ -587,7 +588,7 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) {
}
restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:]))
b := &block{
cmp: r.cmp,
tr: r,
data: data,
restartsLen: restartsLen,
restartsOffset: len(data) - (restartsLen+1)*4,
@ -596,7 +597,44 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) {
return b, nil
}
func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterBlock, error) {
func (r *Reader) readBlockCached(bh blockHandle, checksum, fillCache bool) (*block, util.Releaser, error) {
if r.cache != nil {
var err error
ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) {
if !fillCache {
return 0, nil
}
var b *block
b, err = r.readBlock(bh, checksum)
if err != nil {
return 0, nil
}
return cap(b.data), b
})
if ch != nil {
b, ok := ch.Value().(*block)
if !ok {
ch.Release()
return nil, nil, errors.New("leveldb/table: Reader: inconsistent block type")
}
if !b.checksum && (r.checksum || checksum) {
if !verifyChecksum(b.data) {
ch.Release()
return nil, nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")
}
b.checksum = true
}
return b, ch, err
} else if err != nil {
return nil, nil, err
}
}
b, err := r.readBlock(bh, checksum)
return b, b, err
}
func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) {
data, err := r.readRawBlock(bh, true)
if err != nil {
return nil, err
@ -611,7 +649,7 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)")
}
b := &filterBlock{
filter: filter,
tr: r,
data: data,
oOffset: oOffset,
baseLg: uint(data[n-1]),
@ -620,42 +658,42 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB
return b, nil
}
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) {
if r.cache != nil {
// Get/set block cache.
var err error
cache := r.cache.Get(dataBH.offset, func() (charge int, value interface{}) {
ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) {
if !fillCache {
return 0, nil
}
var dataBlock *block
dataBlock, err = r.readBlock(dataBH, checksum)
var b *filterBlock
b, err = r.readFilterBlock(bh)
if err != nil {
return 0, nil
}
return int(dataBH.length), dataBlock
return cap(b.data), b
})
if err != nil {
return iterator.NewEmptyIterator(err)
}
if cache != nil {
dataBlock := cache.Value().(*block)
if !dataBlock.checksum && (r.checksum || checksum) {
if !verifyChecksum(dataBlock.data) {
return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid block (checksum mismatch)"))
}
dataBlock.checksum = true
if ch != nil {
b, ok := ch.Value().(*filterBlock)
if !ok {
ch.Release()
return nil, nil, errors.New("leveldb/table: Reader: inconsistent block type")
}
iter := dataBlock.newIterator(slice, false, cache)
return iter
return b, ch, err
} else if err != nil {
return nil, nil, err
}
}
dataBlock, err := r.readBlock(dataBH, checksum)
b, err := r.readFilterBlock(bh)
return b, b, err
}
func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator {
b, rel, err := r.readBlockCached(dataBH, checksum, fillCache)
if err != nil {
return iterator.NewEmptyIterator(err)
}
iter := dataBlock.newIterator(slice, false, dataBlock)
return iter
return b.newIterator(slice, false, rel)
}
// NewIterator creates an iterator from the table.
@ -669,18 +707,21 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi
// when not used.
//
// Also read Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
if r.err != nil {
return iterator.NewEmptyIterator(r.err)
}
fillCache := !ro.GetDontFillCache()
b, rel, err := r.readBlockCached(r.indexBH, true, fillCache)
if err != nil {
return iterator.NewEmptyIterator(err)
}
index := &indexIter{
blockIter: *r.indexBlock.newIterator(slice, true, nil),
tableReader: r,
slice: slice,
checksum: ro.GetStrict(opt.StrictBlockChecksum),
fillCache: !ro.GetDontFillCache(),
blockIter: b.newIterator(slice, true, rel),
slice: slice,
checksum: ro.GetStrict(opt.StrictBlockChecksum),
fillCache: !ro.GetDontFillCache(),
}
return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false)
}
@ -697,7 +738,13 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
return
}
index := r.indexBlock.newIterator(nil, true, nil)
indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
if err != nil {
return
}
defer rel.Release()
index := indexBlock.newIterator(nil, true, nil)
defer index.Release()
if !index.Seek(key) {
err = index.Error()
@ -711,9 +758,15 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err
err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)")
return
}
if r.filterBlock != nil && !r.filterBlock.contains(dataBH.offset, key) {
err = ErrNotFound
return
if r.filter != nil {
filterBlock, rel, ferr := r.readFilterBlockCached(r.filterBH, true)
if ferr == nil {
if !filterBlock.contains(dataBH.offset, key) {
rel.Release()
return nil, nil, ErrNotFound
}
rel.Release()
}
}
data := r.getDataIter(dataBH, nil, ro.GetStrict(opt.StrictBlockChecksum), !ro.GetDontFillCache())
defer data.Release()
@ -760,7 +813,13 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
return
}
index := r.indexBlock.newIterator(nil, true, nil)
indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true)
if err != nil {
return
}
defer rel.Release()
index := indexBlock.newIterator(nil, true, nil)
defer index.Release()
if index.Seek(key) {
dataBH, n := decodeBlockHandle(index.Value())
@ -778,6 +837,17 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
return
}
// Release implements util.Releaser.
// It also close the file if it is an io.Closer.
func (r *Reader) Release() {
if closer, ok := r.reader.(io.Closer); ok {
closer.Close()
}
r.reader = nil
r.cache = nil
r.bpool = nil
}
// NewReader creates a new initialized table reader for the file.
// The cache and bpool is optional and can be nil.
//
@ -817,16 +887,11 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf
return r
}
// Decode the index block handle.
indexBH, n := decodeBlockHandle(footer[n:])
r.indexBH, n = decodeBlockHandle(footer[n:])
if n == 0 {
r.err = errors.New("leveldb/table: Reader: invalid table (bad index block handle)")
return r
}
// Read index block.
r.indexBlock, r.err = r.readBlock(indexBH, true)
if r.err != nil {
return r
}
// Read metaindex block.
metaBlock, err := r.readBlock(metaBH, true)
if err != nil {
@ -842,32 +907,28 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf
continue
}
fn := key[7:]
var filter filter.Filter
if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn {
filter = f0
r.filter = f0
} else {
for _, f0 := range o.GetAltFilters() {
if f0.Name() == fn {
filter = f0
r.filter = f0
break
}
}
}
if filter != nil {
if r.filter != nil {
filterBH, n := decodeBlockHandle(metaIter.Value())
if n == 0 {
continue
}
r.filterBH = filterBH
// Update data end.
r.dataEnd = int64(filterBH.offset)
filterBlock, err := r.readFilterBlock(filterBH, filter)
if err != nil {
continue
}
r.filterBlock = filterBlock
break
}
}
metaIter.Release()
metaBlock.Release()
return r
}

View File

@ -111,7 +111,9 @@ var _ = testutil.Defer(func() {
testutil.AllKeyValueTesting(nil, Build)
Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 10, 512, 512), func(r *Reader) {
It("should have correct blocks number", func() {
Expect(r.indexBlock.restartsLen).Should(Equal(9))
indexBlock, err := r.readBlock(r.indexBH, true)
Expect(err).To(BeNil())
Expect(indexBlock.restartsLen).Should(Equal(9))
})
}))
})

View File

@ -19,15 +19,21 @@ type buffer struct {
// BufferPool is a 'buffer pool'.
type BufferPool struct {
pool [4]chan []byte
size [3]uint32
sizeMiss [3]uint32
baseline0 int
baseline1 int
baseline2 int
pool [6]chan []byte
size [5]uint32
sizeMiss [5]uint32
sizeHalf [5]uint32
baseline [4]int
baselinex0 int
baselinex1 int
baseline0 int
baseline1 int
baseline2 int
close chan struct{}
get uint32
put uint32
half uint32
less uint32
equal uint32
greater uint32
@ -35,16 +41,15 @@ type BufferPool struct {
}
func (p *BufferPool) poolNum(n int) int {
switch {
case n <= p.baseline0:
if n <= p.baseline0 && n > p.baseline0/2 {
return 0
case n <= p.baseline1:
return 1
case n <= p.baseline2:
return 2
default:
return 3
}
for i, x := range p.baseline {
if n <= x {
return i + 1
}
}
return len(p.baseline) + 1
}
// Get returns buffer with length of n.
@ -59,13 +64,22 @@ func (p *BufferPool) Get(n int) []byte {
case b := <-pool:
switch {
case cap(b) > n:
atomic.AddUint32(&p.less, 1)
return b[:n]
if cap(b)-n >= n {
atomic.AddUint32(&p.half, 1)
select {
case pool <- b:
default:
}
return make([]byte, n)
} else {
atomic.AddUint32(&p.less, 1)
return b[:n]
}
case cap(b) == n:
atomic.AddUint32(&p.equal, 1)
return b[:n]
default:
panic("not reached")
atomic.AddUint32(&p.greater, 1)
}
default:
atomic.AddUint32(&p.miss, 1)
@ -79,8 +93,23 @@ func (p *BufferPool) Get(n int) []byte {
case b := <-pool:
switch {
case cap(b) > n:
atomic.AddUint32(&p.less, 1)
return b[:n]
if cap(b)-n >= n {
atomic.AddUint32(&p.half, 1)
sizeHalfPtr := &p.sizeHalf[poolNum-1]
if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
atomic.StoreUint32(sizeHalfPtr, 0)
} else {
select {
case pool <- b:
default:
}
}
return make([]byte, n)
} else {
atomic.AddUint32(&p.less, 1)
return b[:n]
}
case cap(b) == n:
atomic.AddUint32(&p.equal, 1)
return b[:n]
@ -126,20 +155,34 @@ func (p *BufferPool) Put(b []byte) {
}
func (p *BufferPool) Close() {
select {
case p.close <- struct{}{}:
default:
}
}
func (p *BufferPool) String() string {
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v G·%d P·%d <·%d =·%d >·%d M·%d}",
p.baseline0, p.size, p.sizeMiss, p.get, p.put, p.less, p.equal, p.greater, p.miss)
return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
}
func (p *BufferPool) drain() {
ticker := time.NewTicker(2 * time.Second)
for {
time.Sleep(1 * time.Second)
select {
case <-p.pool[0]:
case <-p.pool[1]:
case <-p.pool[2]:
case <-p.pool[3]:
default:
case <-ticker.C:
for _, ch := range p.pool {
select {
case <-ch:
default:
}
}
case <-p.close:
for _, ch := range p.pool {
close(ch)
}
return
}
}
}
@ -151,10 +194,10 @@ func NewBufferPool(baseline int) *BufferPool {
}
p := &BufferPool{
baseline0: baseline,
baseline1: baseline * 2,
baseline2: baseline * 4,
baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
close: make(chan struct{}, 1),
}
for i, cap := range []int{6, 6, 3, 1} {
for i, cap := range []int{2, 2, 4, 4, 2, 1} {
p.pool[i] = make(chan []byte, cap)
}
go p.drain()