vendor: Update github.com/syndtr/goleveldb

This commit is contained in:
Jakob Borg 2016-09-13 21:39:04 +02:00
parent 526cab538a
commit 06dc91fadf
10 changed files with 154 additions and 30 deletions

View File

@ -511,18 +511,12 @@ func (r *Cache) EvictAll() {
} }
} }
// Close closes the 'cache map' and releases all 'cache node'. // Close closes the 'cache map' and forcefully releases all 'cache node'.
func (r *Cache) Close() error { func (r *Cache) Close() error {
r.mu.Lock() r.mu.Lock()
if !r.closed { if !r.closed {
r.closed = true r.closed = true
if r.cacher != nil {
if err := r.cacher.Close(); err != nil {
return err
}
}
h := (*mNode)(r.mHead) h := (*mNode)(r.mHead)
h.initBuckets() h.initBuckets()
@ -541,10 +535,37 @@ func (r *Cache) Close() error {
for _, f := range n.onDel { for _, f := range n.onDel {
f() f()
} }
n.onDel = nil
} }
} }
} }
r.mu.Unlock() r.mu.Unlock()
// Avoid deadlock.
if r.cacher != nil {
if err := r.cacher.Close(); err != nil {
return err
}
}
return nil
}
// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
// unlike Close it doesn't forcefully releases 'cache node'.
func (r *Cache) CloseWeak() error {
r.mu.Lock()
if !r.closed {
r.closed = true
}
r.mu.Unlock()
// Avoid deadlock.
if r.cacher != nil {
r.cacher.EvictAll()
if err := r.cacher.Close(); err != nil {
return err
}
}
return nil return nil
} }

View File

@ -1062,6 +1062,8 @@ func (db *DB) Close() error {
if db.journal != nil { if db.journal != nil {
db.journal.Close() db.journal.Close()
db.journalWriter.Close() db.journalWriter.Close()
db.journal = nil
db.journalWriter = nil
} }
if db.writeDelayN > 0 { if db.writeDelayN > 0 {
@ -1077,15 +1079,11 @@ func (db *DB) Close() error {
if err1 := db.closer.Close(); err == nil { if err1 := db.closer.Close(); err == nil {
err = err1 err = err1
} }
db.closer = nil
} }
// NIL'ing pointers. // Clear memdbs.
db.s = nil db.clearMems()
db.mem = nil
db.frozenMem = nil
db.journal = nil
db.journalWriter = nil
db.closer = nil
return err return err
} }

View File

@ -67,12 +67,11 @@ func (db *DB) sampleSeek(ikey internalKey) {
} }
func (db *DB) mpoolPut(mem *memdb.DB) { func (db *DB) mpoolPut(mem *memdb.DB) {
defer func() { if !db.isClosed() {
recover() select {
}() case db.memPool <- mem:
select { default:
case db.memPool <- mem: }
default:
} }
} }
@ -101,6 +100,12 @@ func (db *DB) mpoolDrain() {
default: default:
} }
case _, _ = <-db.closeC: case _, _ = <-db.closeC:
ticker.Stop()
// Make sure the pool is drained.
select {
case <-db.memPool:
case <-time.After(time.Second):
}
close(db.memPool) close(db.memPool)
return return
} }
@ -148,10 +153,11 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
func (db *DB) getMems() (e, f *memDB) { func (db *DB) getMems() (e, f *memDB) {
db.memMu.RLock() db.memMu.RLock()
defer db.memMu.RUnlock() defer db.memMu.RUnlock()
if db.mem == nil { if db.mem != nil {
db.mem.incref()
} else if !db.isClosed() {
panic("nil effective mem") panic("nil effective mem")
} }
db.mem.incref()
if db.frozenMem != nil { if db.frozenMem != nil {
db.frozenMem.incref() db.frozenMem.incref()
} }
@ -162,10 +168,11 @@ func (db *DB) getMems() (e, f *memDB) {
func (db *DB) getEffectiveMem() *memDB { func (db *DB) getEffectiveMem() *memDB {
db.memMu.RLock() db.memMu.RLock()
defer db.memMu.RUnlock() defer db.memMu.RUnlock()
if db.mem == nil { if db.mem != nil {
db.mem.incref()
} else if !db.isClosed() {
panic("nil effective mem") panic("nil effective mem")
} }
db.mem.incref()
return db.mem return db.mem
} }
@ -200,6 +207,14 @@ func (db *DB) dropFrozenMem() {
db.memMu.Unlock() db.memMu.Unlock()
} }
// Clear mems ptr; used by DB.Close().
func (db *DB) clearMems() {
db.memMu.Lock()
db.mem = nil
db.frozenMem = nil
db.memMu.Unlock()
}
// Set closed flag; return true if not already closed. // Set closed flag; return true if not already closed.
func (db *DB) setClosed() bool { func (db *DB) setClosed() bool {
return atomic.CompareAndSwapUint32(&db.closed, 0, 1) return atomic.CompareAndSwapUint32(&db.closed, 0, 1)

View File

@ -116,7 +116,6 @@ func (h *dbHarness) closeDB() {
if err := h.closeDB0(); err != nil { if err := h.closeDB0(); err != nil {
h.t.Error("Close: got error: ", err) h.t.Error("Close: got error: ", err)
} }
h.db = nil
} }
h.stor.CloseCheck() h.stor.CloseCheck()
runtime.GC() runtime.GC()
@ -2827,3 +2826,82 @@ func TestDB_BulkInsertDelete(t *testing.T) {
t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel()) t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel())
} }
} }
func TestDB_GracefulClose(t *testing.T) {
runtime.GOMAXPROCS(4)
h := newDbHarnessWopt(t, &opt.Options{
DisableLargeBatchTransaction: true,
Compression: opt.NoCompression,
CompactionTableSize: 1 * opt.MiB,
WriteBuffer: 1 * opt.MiB,
})
defer h.close()
var closeWait sync.WaitGroup
// During write.
n := 0
closing := false
for i := 0; i < 1000000; i++ {
if !closing && h.totalTables() > 3 {
t.Logf("close db during write, index=%d", i)
closeWait.Add(1)
go func() {
h.closeDB()
closeWait.Done()
}()
closing = true
}
if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil {
t.Logf("Put error: %s (expected)", err)
n = i
break
}
}
closeWait.Wait()
// During read.
h.openDB()
closing = false
for i := 0; i < n; i++ {
if !closing && i > n/2 {
t.Logf("close db during read, index=%d", i)
closeWait.Add(1)
go func() {
h.closeDB()
closeWait.Done()
}()
closing = true
}
if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil {
t.Logf("Get error: %s (expected)", err)
break
}
}
closeWait.Wait()
// During iterate.
h.openDB()
closing = false
iter := h.db.NewIterator(nil, h.ro)
for i := 0; iter.Next(); i++ {
if len(iter.Key()) == 0 || len(iter.Value()) == 0 {
t.Error("Key or value has zero length")
}
if !closing {
t.Logf("close db during iter, index=%d", i)
closeWait.Add(1)
go func() {
h.closeDB()
closeWait.Done()
}()
closing = true
}
time.Sleep(time.Millisecond)
}
if err := iter.Error(); err != nil {
t.Logf("Iter error: %s (expected)", err)
}
iter.Release()
closeWait.Wait()
}

View File

@ -73,6 +73,10 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
v := db.s.version() v := db.s.version()
defer v.release() defer v.release()
mdb = db.getEffectiveMem() mdb = db.getEffectiveMem()
if mdb == nil {
err = ErrClosed
return false
}
defer func() { defer func() {
if retry { if retry {
mdb.decref() mdb.decref()
@ -310,6 +314,9 @@ func (db *DB) CompactRange(r util.Range) error {
// Check for overlaps in memdb. // Check for overlaps in memdb.
mdb := db.getEffectiveMem() mdb := db.getEffectiveMem()
if mdb == nil {
return ErrClosed
}
defer mdb.decref() defer mdb.decref()
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction. // Memdb compaction.

View File

@ -87,7 +87,7 @@ func (s *session) close() {
} }
s.manifest = nil s.manifest = nil
s.manifestWriter = nil s.manifestWriter = nil
s.stVersion = nil s.setVersion(&version{s: s, closing: true})
} }
// Release session lock. // Release session lock.

View File

@ -434,7 +434,7 @@ func (t *tOps) close() {
t.bpool.Close() t.bpool.Close()
t.cache.Close() t.cache.Close()
if t.bcache != nil { if t.bcache != nil {
t.bcache.Close() t.bcache.CloseWeak()
} }
} }

View File

@ -888,7 +888,7 @@ func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, val
return r.find(key, filtered, ro, false) return r.find(key, filtered, ro, false)
} }
// Find finds key that is greater than or equal to the given key. // FindKey finds key that is greater than or equal to the given key.
// It returns ErrNotFound if the table doesn't contain such key. // It returns ErrNotFound if the table doesn't contain such key.
// If filtered is true then the nearest 'block' will be checked against // If filtered is true then the nearest 'block' will be checked against
// 'filter data' (if present) and will immediately return ErrNotFound if // 'filter data' (if present) and will immediately return ErrNotFound if

View File

@ -34,7 +34,8 @@ type version struct {
cSeek unsafe.Pointer cSeek unsafe.Pointer
ref int closing bool
ref int
// Succeeding version. // Succeeding version.
next *version next *version
} }
@ -131,6 +132,10 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int
} }
func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) { func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) {
if v.closing {
return nil, false, ErrClosed
}
ukey := ikey.ukey() ukey := ikey.ukey()
var ( var (

2
vendor/manifest vendored
View File

@ -233,7 +233,7 @@
{ {
"importpath": "github.com/syndtr/goleveldb", "importpath": "github.com/syndtr/goleveldb",
"repository": "https://github.com/syndtr/goleveldb", "repository": "https://github.com/syndtr/goleveldb",
"revision": "ab8b5dcf1042e818ab68e770d465112a899b668e", "revision": "6ae1797c0b42b9323fc27ff7dcf568df88f2f33d",
"branch": "master" "branch": "master"
}, },
{ {