From f633bdddf0c5ffbfe954b2d409fd121051c80eb7 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 2 Sep 2014 09:43:42 +0200 Subject: [PATCH] Update goleveldb --- Godeps/Godeps.json | 2 +- .../syndtr/goleveldb/leveldb/cache/cache.go | 11 + .../goleveldb/leveldb/cache/lru_cache.go | 38 ++- .../github.com/syndtr/goleveldb/leveldb/db.go | 23 +- .../syndtr/goleveldb/leveldb/db_compaction.go | 6 +- .../syndtr/goleveldb/leveldb/db_iter.go | 11 +- .../syndtr/goleveldb/leveldb/db_snapshot.go | 9 +- .../syndtr/goleveldb/leveldb/db_state.go | 70 ++++-- .../syndtr/goleveldb/leveldb/db_test.go | 10 +- .../syndtr/goleveldb/leveldb/db_write.go | 14 +- .../syndtr/goleveldb/leveldb/opt/options.go | 17 +- .../syndtr/goleveldb/leveldb/table.go | 26 +-- .../goleveldb/leveldb/table/block_test.go | 2 +- .../syndtr/goleveldb/leveldb/table/reader.go | 217 +++++++++++------- .../goleveldb/leveldb/table/table_test.go | 4 +- .../{buffer_pool_legacy.go => buffer_pool.go} | 103 ++++++--- 16 files changed, 384 insertions(+), 179 deletions(-) rename Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/{buffer_pool_legacy.go => buffer_pool.go} (57%) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 065c61690..f8368e416 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -49,7 +49,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "59d87758aeaab5ab6ed289c773349500228a1557" + "Rev": "2b99e8d4757bf06eeab1b0485d80b8ae1c088874" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go index fe398f03a..49f82f0fb 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -40,10 +40,21 @@ type Cache interface { // Size returns entire alive cache objects size. Size() int + // NumObjects returns number of alive objects. + NumObjects() int + // GetNamespace gets cache namespace with the given id. // GetNamespace is never return nil. GetNamespace(id uint64) Namespace + // PurgeNamespace purges cache namespace with the given id from this cache tree. + // Also read Namespace.Purge. + PurgeNamespace(id uint64, fin PurgeFin) + + // ZapNamespace detaches cache namespace with the given id from this cache tree. + // Also read Namespace.Zap. + ZapNamespace(id uint64) + // Purge purges all cache namespace from this cache tree. // This is behave the same as calling Namespace.Purge method on all cache namespace. Purge(fin PurgeFin) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go index a1504b159..a29b5d088 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/lru_cache.go @@ -15,11 +15,11 @@ import ( // lruCache represent a LRU cache state. type lruCache struct { - mu sync.Mutex - recent lruNode - table map[uint64]*lruNs - capacity int - used, size int + mu sync.Mutex + recent lruNode + table map[uint64]*lruNs + capacity int + used, size, alive int } // NewLRUCache creates a new initialized LRU cache with the given capacity. @@ -51,6 +51,12 @@ func (c *lruCache) Size() int { return c.size } +func (c *lruCache) NumObjects() int { + c.mu.Lock() + defer c.mu.Unlock() + return c.alive +} + // SetCapacity set cache capacity. func (c *lruCache) SetCapacity(capacity int) { c.mu.Lock() @@ -77,6 +83,23 @@ func (c *lruCache) GetNamespace(id uint64) Namespace { return ns } +func (c *lruCache) ZapNamespace(id uint64) { + c.mu.Lock() + if ns, exist := c.table[id]; exist { + ns.zapNB() + delete(c.table, id) + } + c.mu.Unlock() +} + +func (c *lruCache) PurgeNamespace(id uint64, fin PurgeFin) { + c.mu.Lock() + if ns, exist := c.table[id]; exist { + ns.purgeNB(fin) + } + c.mu.Unlock() +} + // Purge purge entire cache. func (c *lruCache) Purge(fin PurgeFin) { c.mu.Lock() @@ -158,11 +181,12 @@ func (ns *lruNs) Get(key uint64, setf SetFunc) Handle { } ns.table[key] = node + ns.lru.size += charge + ns.lru.alive++ if charge > 0 { node.ref++ node.rInsert(&ns.lru.recent) ns.lru.used += charge - ns.lru.size += charge ns.lru.evict() } } @@ -322,8 +346,10 @@ func (n *lruNode) derefNB() { // Remove elemement. delete(n.ns.table, n.key) n.ns.lru.size -= n.charge + n.ns.lru.alive-- n.fin() } + n.value = nil } else if n.ref < 0 { panic("leveldb/cache: lruCache: negative node reference") } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index 59b9017d3..979d0ac4a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -14,6 +14,7 @@ import ( "runtime" "strings" "sync" + "sync/atomic" "time" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -35,7 +36,7 @@ type DB struct { // MemDB. memMu sync.RWMutex - memPool *util.Pool + memPool chan *memdb.DB mem, frozenMem *memDB journal *journal.Writer journalWriter storage.Writer @@ -47,6 +48,9 @@ type DB struct { snapsMu sync.Mutex snapsRoot snapshotElement + // Stats. + aliveSnaps, aliveIters int32 + // Write. writeC chan *Batch writeMergedC chan bool @@ -80,7 +84,7 @@ func openDB(s *session) (*DB, error) { // Initial sequence seq: s.stSeq, // MemDB - memPool: util.NewPool(1), + memPool: make(chan *memdb.DB, 1), // Write writeC: make(chan *Batch), writeMergedC: make(chan bool), @@ -122,6 +126,7 @@ func openDB(s *session) (*DB, error) { go db.tCompaction() go db.mCompaction() go db.jWriter() + go db.mpoolDrain() s.logf("db@open done T·%v", time.Since(start)) @@ -568,7 +573,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er } defer m.decref() - mk, mv, me := m.db.Find(ikey) + mk, mv, me := m.mdb.Find(ikey) if me == nil { ukey, _, t, ok := parseIkey(mk) if ok && db.s.icmp.uCompare(ukey, key) == 0 { @@ -657,6 +662,14 @@ func (db *DB) GetSnapshot() (*Snapshot, error) { // Returns sstables list for each level. // leveldb.blockpool // Returns block pool stats. +// leveldb.cachedblock +// Returns size of cached block. +// leveldb.openedtables +// Returns number of opened tables. +// leveldb.alivesnaps +// Returns number of alive snapshots. +// leveldb.aliveiters +// Returns number of alive iterators. func (db *DB) GetProperty(name string) (value string, err error) { err = db.ok() if err != nil { @@ -712,6 +725,10 @@ func (db *DB) GetProperty(name string) (value string, err error) { } case p == "openedtables": value = fmt.Sprintf("%d", db.s.tops.cache.Size()) + case p == "alivesnaps": + value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps)) + case p == "aliveiters": + value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters)) default: err = errors.New("leveldb: GetProperty: unknown property: " + name) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index e5e74d7e1..b38cdedc5 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -221,10 +221,10 @@ func (db *DB) memCompaction() { c := newCMem(db.s) stats := new(cStatsStaging) - db.logf("mem@flush N·%d S·%s", mem.db.Len(), shortenb(mem.db.Size())) + db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size())) // Don't compact empty memdb. - if mem.db.Len() == 0 { + if mem.mdb.Len() == 0 { db.logf("mem@flush skipping") // drop frozen mem db.dropFrozenMem() @@ -242,7 +242,7 @@ func (db *DB) memCompaction() { db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() defer stats.stopTimer() - return c.flush(mem.db, -1) + return c.flush(mem.mdb, -1) }, func() error { for _, r := range c.rec.addedTables { db.logf("mem@flush rollback @%d", r.num) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index d028768d5..c34c7abae 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -10,6 +10,7 @@ import ( "errors" "runtime" "sync" + "sync/atomic" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -38,11 +39,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It ti := v.getIterators(slice, ro) n := len(ti) + 2 i := make([]iterator.Iterator, 0, n) - emi := em.db.NewIterator(slice) + emi := em.mdb.NewIterator(slice) emi.SetReleaser(&memdbReleaser{m: em}) i = append(i, emi) if fm != nil { - fmi := fm.db.NewIterator(slice) + fmi := fm.mdb.NewIterator(slice) fmi.SetReleaser(&memdbReleaser{m: fm}) i = append(i, fmi) } @@ -66,6 +67,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d } rawIter := db.newRawIterator(islice, ro) iter := &dbIter{ + db: db, icmp: db.s.icmp, iter: rawIter, seq: seq, @@ -73,6 +75,7 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d key: make([]byte, 0), value: make([]byte, 0), } + atomic.AddInt32(&db.aliveIters, 1) runtime.SetFinalizer(iter, (*dbIter).Release) return iter } @@ -89,6 +92,7 @@ const ( // dbIter represent an interator states over a database session. type dbIter struct { + db *DB icmp *iComparer iter iterator.Iterator seq uint64 @@ -303,6 +307,7 @@ func (i *dbIter) Release() { if i.releaser != nil { i.releaser.Release() + i.releaser = nil } i.dir = dirReleased @@ -310,6 +315,8 @@ func (i *dbIter) Release() { i.value = nil i.iter.Release() i.iter = nil + atomic.AddInt32(&i.db.aliveIters, -1) + i.db = nil } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 31340bdd0..fb1ce85b9 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -9,6 +9,7 @@ package leveldb import ( "runtime" "sync" + "sync/atomic" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -81,7 +82,7 @@ func (db *DB) minSeq() uint64 { type Snapshot struct { db *DB elem *snapshotElement - mu sync.Mutex + mu sync.RWMutex released bool } @@ -91,6 +92,7 @@ func (db *DB) newSnapshot() *Snapshot { db: db, elem: db.acquireSnapshot(), } + atomic.AddInt32(&db.aliveSnaps, 1) runtime.SetFinalizer(snap, (*Snapshot).Release) return snap } @@ -105,8 +107,8 @@ func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err er if err != nil { return } - snap.mu.Lock() - defer snap.mu.Unlock() + snap.mu.RLock() + defer snap.mu.RUnlock() if snap.released { err = ErrSnapshotReleased return @@ -160,6 +162,7 @@ func (snap *Snapshot) Release() { snap.released = true snap.db.releaseSnapshot(snap.elem) + atomic.AddInt32(&snap.db.aliveSnaps, -1) snap.db = nil snap.elem = nil } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index 807cc6c7a..24ecab504 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -8,16 +8,16 @@ package leveldb import ( "sync/atomic" + "time" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/memdb" - "github.com/syndtr/goleveldb/leveldb/util" ) type memDB struct { - pool *util.Pool - db *memdb.DB - ref int32 + db *DB + mdb *memdb.DB + ref int32 } func (m *memDB) incref() { @@ -26,7 +26,13 @@ func (m *memDB) incref() { func (m *memDB) decref() { if ref := atomic.AddInt32(&m.ref, -1); ref == 0 { - m.pool.Put(m) + // Only put back memdb with std capacity. + if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() { + m.mdb.Reset() + m.db.mpoolPut(m.mdb) + } + m.db = nil + m.mdb = nil } else if ref < 0 { panic("negative memdb ref") } @@ -42,6 +48,41 @@ func (db *DB) addSeq(delta uint64) { atomic.AddUint64(&db.seq, delta) } +func (db *DB) mpoolPut(mem *memdb.DB) { + defer func() { + recover() + }() + select { + case db.memPool <- mem: + default: + } +} + +func (db *DB) mpoolGet() *memdb.DB { + select { + case mem := <-db.memPool: + return mem + default: + return nil + } +} + +func (db *DB) mpoolDrain() { + ticker := time.NewTicker(30 * time.Second) + for { + select { + case <-ticker.C: + select { + case <-db.memPool: + default: + } + case _, _ = <-db.closeC: + close(db.memPool) + return + } + } +} + // Create new memdb and froze the old one; need external synchronization. // newMem only called synchronously by the writer. func (db *DB) newMem(n int) (mem *memDB, err error) { @@ -70,18 +111,15 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { db.journalWriter = w db.journalFile = file db.frozenMem = db.mem - mem, ok := db.memPool.Get().(*memDB) - if ok && mem.db.Capacity() >= n { - mem.db.Reset() - mem.incref() - } else { - mem = &memDB{ - pool: db.memPool, - db: memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)), - ref: 1, - } + mdb := db.mpoolGet() + if mdb == nil || mdb.Capacity() < n { + mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) + } + mem = &memDB{ + db: db, + mdb: mdb, + ref: 2, } - mem.incref() db.mem = mem // The seq only incremented by the writer. And whoever called newMem // should hold write lock, so no need additional synchronization here. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index 7f15b4b65..5aadc12d7 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -1577,7 +1577,11 @@ func TestDb_BloomFilter(t *testing.T) { return fmt.Sprintf("key%06d", i) } - n := 10000 + const ( + n = 10000 + indexOverheat = 19898 + filterOverheat = 19799 + ) // Populate multiple layers for i := 0; i < n; i++ { @@ -1601,7 +1605,7 @@ func TestDb_BloomFilter(t *testing.T) { cnt := int(h.stor.ReadCounter()) t.Logf("lookup of %d present keys yield %d sstable I/O reads", n, cnt) - if min, max := n, n+2*n/100; cnt < min || cnt > max { + if min, max := n+indexOverheat+filterOverheat, n+indexOverheat+filterOverheat+2*n/100; cnt < min || cnt > max { t.Errorf("num of sstable I/O reads of present keys not in range of %d - %d, got %d", min, max, cnt) } @@ -1612,7 +1616,7 @@ func TestDb_BloomFilter(t *testing.T) { } cnt = int(h.stor.ReadCounter()) t.Logf("lookup of %d missing keys yield %d sstable I/O reads", n, cnt) - if max := 3 * n / 100; cnt > max { + if max := 3*n/100 + indexOverheat + filterOverheat; cnt > max { t.Errorf("num of sstable I/O reads of missing keys was more than %d, got %d", max, cnt) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index 755108590..82725a9ee 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -75,7 +75,7 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) { mem = nil } }() - nn = mem.db.Free() + nn = mem.mdb.Free() switch { case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed: delayed = true @@ -90,13 +90,13 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) { } default: // Allow memdb to grow if it has no entry. - if mem.db.Len() == 0 { + if mem.mdb.Len() == 0 { nn = n } else { mem.decref() mem, err = db.rotateMem(n) if err == nil { - nn = mem.db.Free() + nn = mem.mdb.Free() } else { nn = 0 } @@ -190,7 +190,7 @@ drain: return case db.journalC <- b: // Write into memdb - b.memReplay(mem.db) + b.memReplay(mem.mdb) } // Wait for journal writer select { @@ -200,7 +200,7 @@ drain: case err = <-db.journalAckC: if err != nil { // Revert memdb if error detected - b.revertMemReplay(mem.db) + b.revertMemReplay(mem.mdb) return } } @@ -209,7 +209,7 @@ drain: if err != nil { return } - b.memReplay(mem.db) + b.memReplay(mem.mdb) } // Set last seq number. @@ -271,7 +271,7 @@ func (db *DB) CompactRange(r util.Range) error { // Check for overlaps in memdb. mem := db.getEffectiveMem() defer mem.decref() - if isMemOverlaps(db.s.icmp, mem.db, r.Start, r.Limit) { + if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) { // Memdb compaction. if _, err := db.rotateMem(0); err != nil { <-db.writeLockC diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go index 241184481..21e7a8a7f 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -30,13 +30,16 @@ const ( type noCache struct{} -func (noCache) SetCapacity(capacity int) {} -func (noCache) Capacity() int { return 0 } -func (noCache) Used() int { return 0 } -func (noCache) Size() int { return 0 } -func (noCache) GetNamespace(id uint64) cache.Namespace { return nil } -func (noCache) Purge(fin cache.PurgeFin) {} -func (noCache) Zap() {} +func (noCache) SetCapacity(capacity int) {} +func (noCache) Capacity() int { return 0 } +func (noCache) Used() int { return 0 } +func (noCache) Size() int { return 0 } +func (noCache) NumObjects() int { return 0 } +func (noCache) GetNamespace(id uint64) cache.Namespace { return nil } +func (noCache) PurgeNamespace(id uint64, fin cache.PurgeFin) {} +func (noCache) ZapNamespace(id uint64) {} +func (noCache) Purge(fin cache.PurgeFin) {} +func (noCache) Zap() {} var NoCache cache.Cache = noCache{} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index 1c3ff3249..a1b04d827 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -7,7 +7,6 @@ package leveldb import ( - "io" "sort" "sync/atomic" @@ -323,15 +322,6 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { return } -type tableWrapper struct { - *table.Reader - closer io.Closer -} - -func (tr tableWrapper) Release() { - tr.closer.Close() -} - // Opens table. It returns a cache handle, which should // be released after use. func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { @@ -347,7 +337,7 @@ func (t *tOps) open(f *tFile) (ch cache.Handle, err error) { if bc := t.s.o.GetBlockCache(); bc != nil { bcacheNS = bc.GetNamespace(num) } - return 1, tableWrapper{table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o), r} + return 1, table.NewReader(r, int64(f.size), bcacheNS, t.bpool, t.s.o) }) if ch == nil && err == nil { err = ErrClosed @@ -363,7 +353,7 @@ func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []b return nil, nil, err } defer ch.Release() - return ch.Value().(tableWrapper).Find(key, ro) + return ch.Value().(*table.Reader).Find(key, ro) } // Returns approximate offset of the given key. @@ -372,10 +362,9 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { if err != nil { return } - _offset, err := ch.Value().(tableWrapper).OffsetOf(key) - offset = uint64(_offset) - ch.Release() - return + defer ch.Release() + offset_, err := ch.Value().(*table.Reader).OffsetOf(key) + return uint64(offset_), err } // Creates an iterator from the given table. @@ -384,7 +373,7 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite if err != nil { return iterator.NewEmptyIterator(err) } - iter := ch.Value().(tableWrapper).NewIterator(slice, ro) + iter := ch.Value().(*table.Reader).NewIterator(slice, ro) iter.SetReleaser(ch) return iter } @@ -401,7 +390,7 @@ func (t *tOps) remove(f *tFile) { t.s.logf("table@remove removed @%d", num) } if bc := t.s.o.GetBlockCache(); bc != nil { - bc.GetNamespace(num).Zap() + bc.ZapNamespace(num) } } }) @@ -411,6 +400,7 @@ func (t *tOps) remove(f *tFile) { // regadless still used or not. func (t *tOps) close() { t.cache.Zap() + t.bpool.Close() } // Creates new initialized table ops instance. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go index ca598f4f5..9032751b9 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/block_test.go @@ -40,7 +40,7 @@ var _ = testutil.Defer(func() { data := bw.buf.Bytes() restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) return &block{ - cmp: comparer.DefaultComparer, + tr: &Reader{cmp: comparer.DefaultComparer}, data: data, restartsLen: restartsLen, restartsOffset: len(data) - (restartsLen+1)*4, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index f397ac4f8..5ec2b3e53 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -37,8 +37,7 @@ func max(x, y int) int { } type block struct { - bpool *util.BufferPool - cmp comparer.BasicComparer + tr *Reader data []byte restartsLen int restartsOffset int @@ -47,31 +46,25 @@ type block struct { } func (b *block) seek(rstart, rlimit int, key []byte) (index, offset int, err error) { - n := b.restartsOffset - data := b.data - cmp := b.cmp - index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { - offset := int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) - offset += 1 // shared always zero, since this is a restart point - v1, n1 := binary.Uvarint(data[offset:]) // key length - _, n2 := binary.Uvarint(data[offset+n1:]) // value length + offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) + offset += 1 // shared always zero, since this is a restart point + v1, n1 := binary.Uvarint(b.data[offset:]) // key length + _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length m := offset + n1 + n2 - return cmp.Compare(data[m:m+int(v1)], key) > 0 + return b.tr.cmp.Compare(b.data[m:m+int(v1)], key) > 0 }) + rstart - 1 if index < rstart { // The smallest key is greater-than key sought. index = rstart } - offset = int(binary.LittleEndian.Uint32(data[n+4*index:])) + offset = int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*index:])) return } func (b *block) restartIndex(rstart, rlimit, offset int) int { - n := b.restartsOffset - data := b.data return sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { - return int(binary.LittleEndian.Uint32(data[n+4*(rstart+i):])) > offset + return int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) > offset }) + rstart - 1 } @@ -141,10 +134,10 @@ func (b *block) newIterator(slice *util.Range, inclLimit bool, cache util.Releas } func (b *block) Release() { - if b.bpool != nil { - b.bpool.Put(b.data) - b.bpool = nil + if b.tr.bpool != nil { + b.tr.bpool.Put(b.data) } + b.tr = nil b.data = nil } @@ -270,7 +263,7 @@ func (i *blockIter) Seek(key []byte) bool { i.dir = dirForward } for i.Next() { - if i.block.cmp.Compare(i.key, key) >= 0 { + if i.block.tr.cmp.Compare(i.key, key) >= 0 { return true } } @@ -479,7 +472,7 @@ func (i *blockIter) Error() error { } type filterBlock struct { - filter filter.Filter + tr *Reader data []byte oOffset int baseLg uint @@ -493,7 +486,7 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool { n := int(binary.LittleEndian.Uint32(o)) m := int(binary.LittleEndian.Uint32(o[4:])) if n < m && m <= b.oOffset { - return b.filter.Contains(b.data[n:m], key) + return b.tr.filter.Contains(b.data[n:m], key) } else if n == m { return false } @@ -501,10 +494,17 @@ func (b *filterBlock) contains(offset uint64, key []byte) bool { return true } +func (b *filterBlock) Release() { + if b.tr.bpool != nil { + b.tr.bpool.Put(b.data) + } + b.tr = nil + b.data = nil +} + type indexIter struct { - blockIter - tableReader *Reader - slice *util.Range + *blockIter + slice *util.Range // Options checksum bool fillCache bool @@ -523,7 +523,7 @@ func (i *indexIter) Get() iterator.Iterator { if i.slice != nil && (i.blockIter.isFirst() || i.blockIter.isLast()) { slice = i.slice } - return i.tableReader.getDataIter(dataBH, slice, i.checksum, i.fillCache) + return i.blockIter.block.tr.getDataIter(dataBH, slice, i.checksum, i.fillCache) } // Reader is a table reader. @@ -538,9 +538,8 @@ type Reader struct { checksum bool strictIter bool - dataEnd int64 - indexBlock *block - filterBlock *filterBlock + dataEnd int64 + indexBH, filterBH blockHandle } func verifyChecksum(data []byte) bool { @@ -557,6 +556,7 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) { } if checksum || r.checksum { if !verifyChecksum(data) { + r.bpool.Put(data) return nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)") } } @@ -575,6 +575,7 @@ func (r *Reader) readRawBlock(bh blockHandle, checksum bool) ([]byte, error) { return nil, err } default: + r.bpool.Put(data) return nil, fmt.Errorf("leveldb/table: Reader: unknown block compression type: %d", data[bh.length]) } return data, nil @@ -587,7 +588,7 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) { } restartsLen := int(binary.LittleEndian.Uint32(data[len(data)-4:])) b := &block{ - cmp: r.cmp, + tr: r, data: data, restartsLen: restartsLen, restartsOffset: len(data) - (restartsLen+1)*4, @@ -596,7 +597,44 @@ func (r *Reader) readBlock(bh blockHandle, checksum bool) (*block, error) { return b, nil } -func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterBlock, error) { +func (r *Reader) readBlockCached(bh blockHandle, checksum, fillCache bool) (*block, util.Releaser, error) { + if r.cache != nil { + var err error + ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) { + if !fillCache { + return 0, nil + } + var b *block + b, err = r.readBlock(bh, checksum) + if err != nil { + return 0, nil + } + return cap(b.data), b + }) + if ch != nil { + b, ok := ch.Value().(*block) + if !ok { + ch.Release() + return nil, nil, errors.New("leveldb/table: Reader: inconsistent block type") + } + if !b.checksum && (r.checksum || checksum) { + if !verifyChecksum(b.data) { + ch.Release() + return nil, nil, errors.New("leveldb/table: Reader: invalid block (checksum mismatch)") + } + b.checksum = true + } + return b, ch, err + } else if err != nil { + return nil, nil, err + } + } + + b, err := r.readBlock(bh, checksum) + return b, b, err +} + +func (r *Reader) readFilterBlock(bh blockHandle) (*filterBlock, error) { data, err := r.readRawBlock(bh, true) if err != nil { return nil, err @@ -611,7 +649,7 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB return nil, errors.New("leveldb/table: Reader: invalid filter block (invalid offset)") } b := &filterBlock{ - filter: filter, + tr: r, data: data, oOffset: oOffset, baseLg: uint(data[n-1]), @@ -620,42 +658,42 @@ func (r *Reader) readFilterBlock(bh blockHandle, filter filter.Filter) (*filterB return b, nil } -func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { +func (r *Reader) readFilterBlockCached(bh blockHandle, fillCache bool) (*filterBlock, util.Releaser, error) { if r.cache != nil { - // Get/set block cache. var err error - cache := r.cache.Get(dataBH.offset, func() (charge int, value interface{}) { + ch := r.cache.Get(bh.offset, func() (charge int, value interface{}) { if !fillCache { return 0, nil } - var dataBlock *block - dataBlock, err = r.readBlock(dataBH, checksum) + var b *filterBlock + b, err = r.readFilterBlock(bh) if err != nil { return 0, nil } - return int(dataBH.length), dataBlock + return cap(b.data), b }) - if err != nil { - return iterator.NewEmptyIterator(err) - } - if cache != nil { - dataBlock := cache.Value().(*block) - if !dataBlock.checksum && (r.checksum || checksum) { - if !verifyChecksum(dataBlock.data) { - return iterator.NewEmptyIterator(errors.New("leveldb/table: Reader: invalid block (checksum mismatch)")) - } - dataBlock.checksum = true + if ch != nil { + b, ok := ch.Value().(*filterBlock) + if !ok { + ch.Release() + return nil, nil, errors.New("leveldb/table: Reader: inconsistent block type") } - iter := dataBlock.newIterator(slice, false, cache) - return iter + return b, ch, err + } else if err != nil { + return nil, nil, err } } - dataBlock, err := r.readBlock(dataBH, checksum) + + b, err := r.readFilterBlock(bh) + return b, b, err +} + +func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fillCache bool) iterator.Iterator { + b, rel, err := r.readBlockCached(dataBH, checksum, fillCache) if err != nil { return iterator.NewEmptyIterator(err) } - iter := dataBlock.newIterator(slice, false, dataBlock) - return iter + return b.newIterator(slice, false, rel) } // NewIterator creates an iterator from the table. @@ -669,18 +707,21 @@ func (r *Reader) getDataIter(dataBH blockHandle, slice *util.Range, checksum, fi // when not used. // // Also read Iterator documentation of the leveldb/iterator package. - func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { if r.err != nil { return iterator.NewEmptyIterator(r.err) } + fillCache := !ro.GetDontFillCache() + b, rel, err := r.readBlockCached(r.indexBH, true, fillCache) + if err != nil { + return iterator.NewEmptyIterator(err) + } index := &indexIter{ - blockIter: *r.indexBlock.newIterator(slice, true, nil), - tableReader: r, - slice: slice, - checksum: ro.GetStrict(opt.StrictBlockChecksum), - fillCache: !ro.GetDontFillCache(), + blockIter: b.newIterator(slice, true, rel), + slice: slice, + checksum: ro.GetStrict(opt.StrictBlockChecksum), + fillCache: !ro.GetDontFillCache(), } return iterator.NewIndexedIterator(index, r.strictIter || ro.GetStrict(opt.StrictIterator), false) } @@ -697,7 +738,13 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err return } - index := r.indexBlock.newIterator(nil, true, nil) + indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true) + if err != nil { + return + } + defer rel.Release() + + index := indexBlock.newIterator(nil, true, nil) defer index.Release() if !index.Seek(key) { err = index.Error() @@ -711,9 +758,15 @@ func (r *Reader) Find(key []byte, ro *opt.ReadOptions) (rkey, value []byte, err err = errors.New("leveldb/table: Reader: invalid table (bad data block handle)") return } - if r.filterBlock != nil && !r.filterBlock.contains(dataBH.offset, key) { - err = ErrNotFound - return + if r.filter != nil { + filterBlock, rel, ferr := r.readFilterBlockCached(r.filterBH, true) + if ferr == nil { + if !filterBlock.contains(dataBH.offset, key) { + rel.Release() + return nil, nil, ErrNotFound + } + rel.Release() + } } data := r.getDataIter(dataBH, nil, ro.GetStrict(opt.StrictBlockChecksum), !ro.GetDontFillCache()) defer data.Release() @@ -760,7 +813,13 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { return } - index := r.indexBlock.newIterator(nil, true, nil) + indexBlock, rel, err := r.readBlockCached(r.indexBH, true, true) + if err != nil { + return + } + defer rel.Release() + + index := indexBlock.newIterator(nil, true, nil) defer index.Release() if index.Seek(key) { dataBH, n := decodeBlockHandle(index.Value()) @@ -778,6 +837,17 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { return } +// Release implements util.Releaser. +// It also close the file if it is an io.Closer. +func (r *Reader) Release() { + if closer, ok := r.reader.(io.Closer); ok { + closer.Close() + } + r.reader = nil + r.cache = nil + r.bpool = nil +} + // NewReader creates a new initialized table reader for the file. // The cache and bpool is optional and can be nil. // @@ -817,16 +887,11 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf return r } // Decode the index block handle. - indexBH, n := decodeBlockHandle(footer[n:]) + r.indexBH, n = decodeBlockHandle(footer[n:]) if n == 0 { r.err = errors.New("leveldb/table: Reader: invalid table (bad index block handle)") return r } - // Read index block. - r.indexBlock, r.err = r.readBlock(indexBH, true) - if r.err != nil { - return r - } // Read metaindex block. metaBlock, err := r.readBlock(metaBH, true) if err != nil { @@ -842,32 +907,28 @@ func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, bpool *util.Buf continue } fn := key[7:] - var filter filter.Filter if f0 := o.GetFilter(); f0 != nil && f0.Name() == fn { - filter = f0 + r.filter = f0 } else { for _, f0 := range o.GetAltFilters() { if f0.Name() == fn { - filter = f0 + r.filter = f0 break } } } - if filter != nil { + if r.filter != nil { filterBH, n := decodeBlockHandle(metaIter.Value()) if n == 0 { continue } + r.filterBH = filterBH // Update data end. r.dataEnd = int64(filterBH.offset) - filterBlock, err := r.readFilterBlock(filterBH, filter) - if err != nil { - continue - } - r.filterBlock = filterBlock break } } metaIter.Release() + metaBlock.Release() return r } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go index 0751cf529..fda541fde 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/table_test.go @@ -111,7 +111,9 @@ var _ = testutil.Defer(func() { testutil.AllKeyValueTesting(nil, Build) Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 10, 512, 512), func(r *Reader) { It("should have correct blocks number", func() { - Expect(r.indexBlock.restartsLen).Should(Equal(9)) + indexBlock, err := r.readBlock(r.indexBH, true) + Expect(err).To(BeNil()) + Expect(indexBlock.restartsLen).Should(Equal(9)) }) })) }) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go similarity index 57% rename from Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go rename to Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go index 957f953b5..554e28ebd 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go @@ -19,15 +19,21 @@ type buffer struct { // BufferPool is a 'buffer pool'. type BufferPool struct { - pool [4]chan []byte - size [3]uint32 - sizeMiss [3]uint32 - baseline0 int - baseline1 int - baseline2 int + pool [6]chan []byte + size [5]uint32 + sizeMiss [5]uint32 + sizeHalf [5]uint32 + baseline [4]int + baselinex0 int + baselinex1 int + baseline0 int + baseline1 int + baseline2 int + close chan struct{} get uint32 put uint32 + half uint32 less uint32 equal uint32 greater uint32 @@ -35,16 +41,15 @@ type BufferPool struct { } func (p *BufferPool) poolNum(n int) int { - switch { - case n <= p.baseline0: + if n <= p.baseline0 && n > p.baseline0/2 { return 0 - case n <= p.baseline1: - return 1 - case n <= p.baseline2: - return 2 - default: - return 3 } + for i, x := range p.baseline { + if n <= x { + return i + 1 + } + } + return len(p.baseline) + 1 } // Get returns buffer with length of n. @@ -59,13 +64,22 @@ func (p *BufferPool) Get(n int) []byte { case b := <-pool: switch { case cap(b) > n: - atomic.AddUint32(&p.less, 1) - return b[:n] + if cap(b)-n >= n { + atomic.AddUint32(&p.half, 1) + select { + case pool <- b: + default: + } + return make([]byte, n) + } else { + atomic.AddUint32(&p.less, 1) + return b[:n] + } case cap(b) == n: atomic.AddUint32(&p.equal, 1) return b[:n] default: - panic("not reached") + atomic.AddUint32(&p.greater, 1) } default: atomic.AddUint32(&p.miss, 1) @@ -79,8 +93,23 @@ func (p *BufferPool) Get(n int) []byte { case b := <-pool: switch { case cap(b) > n: - atomic.AddUint32(&p.less, 1) - return b[:n] + if cap(b)-n >= n { + atomic.AddUint32(&p.half, 1) + sizeHalfPtr := &p.sizeHalf[poolNum-1] + if atomic.AddUint32(sizeHalfPtr, 1) == 20 { + atomic.StoreUint32(sizePtr, uint32(cap(b)/2)) + atomic.StoreUint32(sizeHalfPtr, 0) + } else { + select { + case pool <- b: + default: + } + } + return make([]byte, n) + } else { + atomic.AddUint32(&p.less, 1) + return b[:n] + } case cap(b) == n: atomic.AddUint32(&p.equal, 1) return b[:n] @@ -126,20 +155,34 @@ func (p *BufferPool) Put(b []byte) { } +func (p *BufferPool) Close() { + select { + case p.close <- struct{}{}: + default: + } +} + func (p *BufferPool) String() string { - return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v G·%d P·%d <·%d =·%d >·%d M·%d}", - p.baseline0, p.size, p.sizeMiss, p.get, p.put, p.less, p.equal, p.greater, p.miss) + return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}", + p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss) } func (p *BufferPool) drain() { + ticker := time.NewTicker(2 * time.Second) for { - time.Sleep(1 * time.Second) select { - case <-p.pool[0]: - case <-p.pool[1]: - case <-p.pool[2]: - case <-p.pool[3]: - default: + case <-ticker.C: + for _, ch := range p.pool { + select { + case <-ch: + default: + } + } + case <-p.close: + for _, ch := range p.pool { + close(ch) + } + return } } } @@ -151,10 +194,10 @@ func NewBufferPool(baseline int) *BufferPool { } p := &BufferPool{ baseline0: baseline, - baseline1: baseline * 2, - baseline2: baseline * 4, + baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4}, + close: make(chan struct{}, 1), } - for i, cap := range []int{6, 6, 3, 1} { + for i, cap := range []int{2, 2, 4, 4, 2, 1} { p.pool[i] = make(chan []byte, cap) } go p.drain()