diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index d96e9fdbb..04c7a2ceb 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -49,7 +49,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "6f6f5d93f7499d2c505c2839c1d6b28b25a2ce21" + "Rev": "a44c00531ccc005546f20c6e00ab7bb9a8f6b2e0" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go index 1b790402b..295eebaef 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/bench_test.go @@ -13,7 +13,6 @@ import ( "os" "path/filepath" "runtime" - "sync/atomic" "testing" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -345,50 +344,6 @@ func BenchmarkDBRead(b *testing.B) { p.close() } -func BenchmarkDBReadConcurrent(b *testing.B) { - p := openDBBench(b, false) - p.populate(b.N) - p.fill() - p.gc() - defer p.close() - - b.ResetTimer() - b.SetBytes(116) - - b.RunParallel(func(pb *testing.PB) { - iter := p.newIter() - defer iter.Release() - for pb.Next() && iter.Next() { - } - }) -} - -func BenchmarkDBReadConcurrent2(b *testing.B) { - p := openDBBench(b, false) - p.populate(b.N) - p.fill() - p.gc() - defer p.close() - - b.ResetTimer() - b.SetBytes(116) - - var dir uint32 - b.RunParallel(func(pb *testing.PB) { - iter := p.newIter() - defer iter.Release() - if atomic.AddUint32(&dir, 1)%2 == 0 { - for pb.Next() && iter.Next() { - } - } else { - if pb.Next() && iter.Last() { - for pb.Next() && iter.Prev() { - } - } - } - }) -} - func BenchmarkDBReadGC(b *testing.B) { p := openDBBench(b, false) p.populate(b.N) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index b29b5f1e7..1be8748d4 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -35,8 +35,8 @@ type DB struct { // MemDB. memMu sync.RWMutex - mem *memdb.DB - frozenMem *memdb.DB + memPool *util.Pool + mem, frozenMem *memDB journal *journal.Writer journalWriter storage.Writer journalFile storage.File @@ -79,6 +79,8 @@ func openDB(s *session) (*DB, error) { s: s, // Initial sequence seq: s.stSeq, + // MemDB + memPool: util.NewPool(1), // Write writeC: make(chan *Batch), writeMergedC: make(chan bool), @@ -560,19 +562,20 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er ikey := newIKey(key, seq, tSeek) em, fm := db.getMems() - for _, m := range [...]*memdb.DB{em, fm} { + for _, m := range [...]*memDB{em, fm} { if m == nil { continue } + defer m.decref() - mk, mv, me := m.Find(ikey) + mk, mv, me := m.db.Find(ikey) if me == nil { ukey, _, t, ok := parseIkey(mk) if ok && db.s.icmp.uCompare(ukey, key) == 0 { if t == tDel { return nil, ErrNotFound } - return mv, nil + return append([]byte{}, mv...), nil } } else if me != ErrNotFound { return nil, me @@ -592,8 +595,9 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er // Get gets the value for the given key. It returns ErrNotFound if the // DB does not contain the key. // -// The caller should not modify the contents of the returned slice, but -// it is safe to modify the contents of the argument after Get returns. +// The returned slice is its own copy, it is safe to modify the contents +// of the returned slice. +// It is safe to modify the contents of the argument after Get returns. func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { err = db.ok() if err != nil { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 87ac12cb5..e5e74d7e1 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -216,14 +216,15 @@ func (db *DB) memCompaction() { if mem == nil { return } + defer mem.decref() c := newCMem(db.s) stats := new(cStatsStaging) - db.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size())) + db.logf("mem@flush N·%d S·%s", mem.db.Len(), shortenb(mem.db.Size())) // Don't compact empty memdb. - if mem.Len() == 0 { + if mem.db.Len() == 0 { db.logf("mem@flush skipping") // drop frozen mem db.dropFrozenMem() @@ -241,7 +242,7 @@ func (db *DB) memCompaction() { db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() defer stats.stopTimer() - return c.flush(mem, -1) + return c.flush(mem.db, -1) }, func() error { for _, r := range c.rec.addedTables { db.logf("mem@flush rollback @%d", r.num) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index 911d29a13..d028768d5 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -9,6 +9,7 @@ package leveldb import ( "errors" "runtime" + "sync" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" @@ -19,6 +20,17 @@ var ( errInvalidIkey = errors.New("leveldb: Iterator: invalid internal key") ) +type memdbReleaser struct { + once sync.Once + m *memDB +} + +func (mr *memdbReleaser) Release() { + mr.once.Do(func() { + mr.m.decref() + }) +} + func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { em, fm := db.getMems() v := db.s.version() @@ -26,9 +38,13 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It ti := v.getIterators(slice, ro) n := len(ti) + 2 i := make([]iterator.Iterator, 0, n) - i = append(i, em.NewIterator(slice)) + emi := em.db.NewIterator(slice) + emi.SetReleaser(&memdbReleaser{m: em}) + i = append(i, emi) if fm != nil { - i = append(i, fm.NewIterator(slice)) + fmi := fm.db.NewIterator(slice) + fmi.SetReleaser(&memdbReleaser{m: fm}) + i = append(i, fmi) } i = append(i, ti...) strict := db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index 425bcfd5b..807cc6c7a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -11,8 +11,27 @@ import ( "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/memdb" + "github.com/syndtr/goleveldb/leveldb/util" ) +type memDB struct { + pool *util.Pool + db *memdb.DB + ref int32 +} + +func (m *memDB) incref() { + atomic.AddInt32(&m.ref, 1) +} + +func (m *memDB) decref() { + if ref := atomic.AddInt32(&m.ref, -1); ref == 0 { + m.pool.Put(m) + } else if ref < 0 { + panic("negative memdb ref") + } +} + // Get latest sequence number. func (db *DB) getSeq() uint64 { return atomic.LoadUint64(&db.seq) @@ -25,7 +44,7 @@ func (db *DB) addSeq(delta uint64) { // Create new memdb and froze the old one; need external synchronization. // newMem only called synchronously by the writer. -func (db *DB) newMem(n int) (mem *memdb.DB, err error) { +func (db *DB) newMem(n int) (mem *memDB, err error) { num := db.s.allocFileNum() file := db.s.getJournalFile(num) w, err := file.Create() @@ -37,6 +56,10 @@ func (db *DB) newMem(n int) (mem *memdb.DB, err error) { db.memMu.Lock() defer db.memMu.Unlock() + if db.frozenMem != nil { + panic("still has frozen mem") + } + if db.journal == nil { db.journal = journal.NewWriter(w) } else { @@ -47,8 +70,19 @@ func (db *DB) newMem(n int) (mem *memdb.DB, err error) { db.journalWriter = w db.journalFile = file db.frozenMem = db.mem - db.mem = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) - mem = db.mem + mem, ok := db.memPool.Get().(*memDB) + if ok && mem.db.Capacity() >= n { + mem.db.Reset() + mem.incref() + } else { + mem = &memDB{ + pool: db.memPool, + db: memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)), + ref: 1, + } + } + mem.incref() + db.mem = mem // The seq only incremented by the writer. And whoever called newMem // should hold write lock, so no need additional synchronization here. db.frozenSeq = db.seq @@ -56,16 +90,27 @@ func (db *DB) newMem(n int) (mem *memdb.DB, err error) { } // Get all memdbs. -func (db *DB) getMems() (e *memdb.DB, f *memdb.DB) { +func (db *DB) getMems() (e, f *memDB) { db.memMu.RLock() defer db.memMu.RUnlock() + if db.mem == nil { + panic("nil effective mem") + } + db.mem.incref() + if db.frozenMem != nil { + db.frozenMem.incref() + } return db.mem, db.frozenMem } // Get frozen memdb. -func (db *DB) getEffectiveMem() *memdb.DB { +func (db *DB) getEffectiveMem() *memDB { db.memMu.RLock() defer db.memMu.RUnlock() + if db.mem == nil { + panic("nil effective mem") + } + db.mem.incref() return db.mem } @@ -77,9 +122,12 @@ func (db *DB) hasFrozenMem() bool { } // Get frozen memdb. -func (db *DB) getFrozenMem() *memdb.DB { +func (db *DB) getFrozenMem() *memDB { db.memMu.RLock() defer db.memMu.RUnlock() + if db.frozenMem != nil { + db.frozenMem.incref() + } return db.frozenMem } @@ -92,6 +140,7 @@ func (db *DB) dropFrozenMem() { db.logf("journal@remove removed @%d", db.frozenJournalFile.Num()) } db.frozenJournalFile = nil + db.frozenMem.decref() db.frozenMem = nil db.memMu.Unlock() } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index 2b2840320..755108590 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -45,7 +45,7 @@ func (db *DB) jWriter() { } } -func (db *DB) rotateMem(n int) (mem *memdb.DB, err error) { +func (db *DB) rotateMem(n int) (mem *memDB, err error) { // Wait for pending memdb compaction. err = db.compSendIdle(db.mcompCmdC) if err != nil { @@ -63,13 +63,19 @@ func (db *DB) rotateMem(n int) (mem *memdb.DB, err error) { return } -func (db *DB) flush(n int) (mem *memdb.DB, nn int, err error) { +func (db *DB) flush(n int) (mem *memDB, nn int, err error) { delayed := false - flush := func() bool { + flush := func() (retry bool) { v := db.s.version() defer v.release() mem = db.getEffectiveMem() - nn = mem.Free() + defer func() { + if retry { + mem.decref() + mem = nil + } + }() + nn = mem.db.Free() switch { case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed: delayed = true @@ -84,12 +90,17 @@ func (db *DB) flush(n int) (mem *memdb.DB, nn int, err error) { } default: // Allow memdb to grow if it has no entry. - if mem.Len() == 0 { + if mem.db.Len() == 0 { nn = n - return false + } else { + mem.decref() + mem, err = db.rotateMem(n) + if err == nil { + nn = mem.db.Free() + } else { + nn = 0 + } } - mem, err = db.rotateMem(n) - nn = mem.Free() return false } return true @@ -140,6 +151,7 @@ retry: if err != nil { return } + defer mem.decref() // Calculate maximum size of the batch. m := 1 << 20 @@ -178,7 +190,7 @@ drain: return case db.journalC <- b: // Write into memdb - b.memReplay(mem) + b.memReplay(mem.db) } // Wait for journal writer select { @@ -188,7 +200,7 @@ drain: case err = <-db.journalAckC: if err != nil { // Revert memdb if error detected - b.revertMemReplay(mem) + b.revertMemReplay(mem.db) return } } @@ -197,7 +209,7 @@ drain: if err != nil { return } - b.memReplay(mem) + b.memReplay(mem.db) } // Set last seq number. @@ -258,7 +270,8 @@ func (db *DB) CompactRange(r util.Range) error { // Check for overlaps in memdb. mem := db.getEffectiveMem() - if isMemOverlaps(db.s.icmp, mem, r.Start, r.Limit) { + defer mem.decref() + if isMemOverlaps(db.s.icmp, mem.db, r.Start, r.Limit) { // Memdb compaction. if _, err := db.rotateMem(0); err != nil { <-db.writeLockC diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/go13_bench_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/go13_bench_test.go new file mode 100644 index 000000000..e76657e5e --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/go13_bench_test.go @@ -0,0 +1,58 @@ +// Copyright (c) 2012, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// +build go1.3 + +package leveldb + +import ( + "sync/atomic" + "testing" +) + +func BenchmarkDBReadConcurrent(b *testing.B) { + p := openDBBench(b, false) + p.populate(b.N) + p.fill() + p.gc() + defer p.close() + + b.ResetTimer() + b.SetBytes(116) + + b.RunParallel(func(pb *testing.PB) { + iter := p.newIter() + defer iter.Release() + for pb.Next() && iter.Next() { + } + }) +} + +func BenchmarkDBReadConcurrent2(b *testing.B) { + p := openDBBench(b, false) + p.populate(b.N) + p.fill() + p.gc() + defer p.close() + + b.ResetTimer() + b.SetBytes(116) + + var dir uint32 + b.RunParallel(func(pb *testing.PB) { + iter := p.newIter() + defer iter.Release() + if atomic.AddUint32(&dir, 1)%2 == 0 { + for pb.Next() && iter.Next() { + } + } else { + if pb.Next() && iter.Last() { + for pb.Next() && iter.Prev() { + } + } + } + }) +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go index edafdfdeb..6980e9ab6 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool.go @@ -4,6 +4,8 @@ // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. +// +build go1.3 + package util import ( diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go new file mode 100644 index 000000000..2bffb54e2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/buffer_pool_legacy.go @@ -0,0 +1,143 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// +build !go1.3 + +package util + +import ( + "fmt" + "sync/atomic" +) + +type buffer struct { + b []byte + miss int +} + +// BufferPool is a 'buffer pool'. +type BufferPool struct { + pool [4]chan []byte + size [3]uint32 + sizeMiss [3]uint32 + baseline0 int + baseline1 int + baseline2 int + + less uint32 + equal uint32 + greater uint32 + miss uint32 +} + +func (p *BufferPool) poolNum(n int) int { + switch { + case n <= p.baseline0: + return 0 + case n <= p.baseline1: + return 1 + case n <= p.baseline2: + return 2 + default: + return 3 + } +} + +// Get returns buffer with length of n. +func (p *BufferPool) Get(n int) []byte { + poolNum := p.poolNum(n) + pool := p.pool[poolNum] + if poolNum == 0 { + // Fast path. + select { + case b := <-pool: + switch { + case cap(b) > n: + atomic.AddUint32(&p.less, 1) + return b[:n] + case cap(b) == n: + atomic.AddUint32(&p.equal, 1) + return b[:n] + default: + panic("not reached") + } + default: + atomic.AddUint32(&p.miss, 1) + } + + return make([]byte, n, p.baseline0) + } else { + sizePtr := &p.size[poolNum-1] + + select { + case b := <-pool: + switch { + case cap(b) > n: + atomic.AddUint32(&p.less, 1) + return b[:n] + case cap(b) == n: + atomic.AddUint32(&p.equal, 1) + return b[:n] + default: + atomic.AddUint32(&p.greater, 1) + if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) { + select { + case pool <- b: + default: + } + } + } + default: + atomic.AddUint32(&p.miss, 1) + } + + if size := atomic.LoadUint32(sizePtr); uint32(n) > size { + if size == 0 { + atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n)) + } else { + sizeMissPtr := &p.sizeMiss[poolNum-1] + if atomic.AddUint32(sizeMissPtr, 1) == 20 { + atomic.StoreUint32(sizePtr, uint32(n)) + atomic.StoreUint32(sizeMissPtr, 0) + } + } + return make([]byte, n) + } else { + return make([]byte, n, size) + } + } +} + +// Put adds given buffer to the pool. +func (p *BufferPool) Put(b []byte) { + pool := p.pool[p.poolNum(cap(b))] + select { + case pool <- b: + default: + } + +} + +func (p *BufferPool) String() string { + return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v L·%d E·%d G·%d M·%d}", + p.baseline0, p.size, p.sizeMiss, p.less, p.equal, p.greater, p.miss) +} + +// NewBufferPool creates a new initialized 'buffer pool'. +func NewBufferPool(baseline int) *BufferPool { + if baseline <= 0 { + panic("baseline can't be <= 0") + } + p := &BufferPool{ + baseline0: baseline, + baseline1: baseline * 2, + baseline2: baseline * 4, + } + for i, cap := range []int{6, 6, 3, 1} { + p.pool[i] = make(chan []byte, cap) + } + return p +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/pool.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/pool.go new file mode 100644 index 000000000..1f7fdd41f --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/pool.go @@ -0,0 +1,21 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// +build go1.3 + +package util + +import ( + "sync" +) + +type Pool struct { + sync.Pool +} + +func NewPool(cap int) *Pool { + return &Pool{} +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/pool_legacy.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/pool_legacy.go new file mode 100644 index 000000000..27b8d03be --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/util/pool_legacy.go @@ -0,0 +1,33 @@ +// Copyright (c) 2014, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +// +build !go1.3 + +package util + +type Pool struct { + pool chan interface{} +} + +func (p *Pool) Get() interface{} { + select { + case x := <-p.pool: + return x + default: + return nil + } +} + +func (p *Pool) Put(x interface{}) { + select { + case p.pool <- x: + default: + } +} + +func NewPool(cap int) *Pool { + return &Pool{pool: make(chan interface{}, cap)} +}