diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go index a287d0e5e..f28e1bdea 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -511,18 +511,12 @@ func (r *Cache) EvictAll() { } } -// Close closes the 'cache map' and releases all 'cache node'. +// Close closes the 'cache map' and forcefully releases all 'cache node'. func (r *Cache) Close() error { r.mu.Lock() if !r.closed { r.closed = true - if r.cacher != nil { - if err := r.cacher.Close(); err != nil { - return err - } - } - h := (*mNode)(r.mHead) h.initBuckets() @@ -541,10 +535,37 @@ func (r *Cache) Close() error { for _, f := range n.onDel { f() } + n.onDel = nil } } } r.mu.Unlock() + + // Avoid deadlock. + if r.cacher != nil { + if err := r.cacher.Close(); err != nil { + return err + } + } + return nil +} + +// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but +// unlike Close it doesn't forcefully releases 'cache node'. +func (r *Cache) CloseWeak() error { + r.mu.Lock() + if !r.closed { + r.closed = true + } + r.mu.Unlock() + + // Avoid deadlock. + if r.cacher != nil { + r.cacher.EvictAll() + if err := r.cacher.Close(); err != nil { + return err + } + } return nil } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db.go b/vendor/github.com/syndtr/goleveldb/leveldb/db.go index eb6abd0fb..1aed20943 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db.go @@ -1062,6 +1062,8 @@ func (db *DB) Close() error { if db.journal != nil { db.journal.Close() db.journalWriter.Close() + db.journal = nil + db.journalWriter = nil } if db.writeDelayN > 0 { @@ -1077,15 +1079,11 @@ func (db *DB) Close() error { if err1 := db.closer.Close(); err == nil { err = err1 } + db.closer = nil } - // NIL'ing pointers. - db.s = nil - db.mem = nil - db.frozenMem = nil - db.journal = nil - db.journalWriter = nil - db.closer = nil + // Clear memdbs. + db.clearMems() return err } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go index 40f454da1..b7de09e15 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -67,12 +67,11 @@ func (db *DB) sampleSeek(ikey internalKey) { } func (db *DB) mpoolPut(mem *memdb.DB) { - defer func() { - recover() - }() - select { - case db.memPool <- mem: - default: + if !db.isClosed() { + select { + case db.memPool <- mem: + default: + } } } @@ -101,6 +100,12 @@ func (db *DB) mpoolDrain() { default: } case _, _ = <-db.closeC: + ticker.Stop() + // Make sure the pool is drained. + select { + case <-db.memPool: + case <-time.After(time.Second): + } close(db.memPool) return } @@ -148,10 +153,11 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { func (db *DB) getMems() (e, f *memDB) { db.memMu.RLock() defer db.memMu.RUnlock() - if db.mem == nil { + if db.mem != nil { + db.mem.incref() + } else if !db.isClosed() { panic("nil effective mem") } - db.mem.incref() if db.frozenMem != nil { db.frozenMem.incref() } @@ -162,10 +168,11 @@ func (db *DB) getMems() (e, f *memDB) { func (db *DB) getEffectiveMem() *memDB { db.memMu.RLock() defer db.memMu.RUnlock() - if db.mem == nil { + if db.mem != nil { + db.mem.incref() + } else if !db.isClosed() { panic("nil effective mem") } - db.mem.incref() return db.mem } @@ -200,6 +207,14 @@ func (db *DB) dropFrozenMem() { db.memMu.Unlock() } +// Clear mems ptr; used by DB.Close(). +func (db *DB) clearMems() { + db.memMu.Lock() + db.mem = nil + db.frozenMem = nil + db.memMu.Unlock() +} + // Set closed flag; return true if not already closed. func (db *DB) setClosed() bool { return atomic.CompareAndSwapUint32(&db.closed, 0, 1) diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go index 37d08eb86..6eaf0d558 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -116,7 +116,6 @@ func (h *dbHarness) closeDB() { if err := h.closeDB0(); err != nil { h.t.Error("Close: got error: ", err) } - h.db = nil } h.stor.CloseCheck() runtime.GC() @@ -2827,3 +2826,82 @@ func TestDB_BulkInsertDelete(t *testing.T) { t.Fatalf("too many uncompacted tables: %d (%s)", tot, h.getTablesPerLevel()) } } + +func TestDB_GracefulClose(t *testing.T) { + runtime.GOMAXPROCS(4) + h := newDbHarnessWopt(t, &opt.Options{ + DisableLargeBatchTransaction: true, + Compression: opt.NoCompression, + CompactionTableSize: 1 * opt.MiB, + WriteBuffer: 1 * opt.MiB, + }) + defer h.close() + + var closeWait sync.WaitGroup + + // During write. + n := 0 + closing := false + for i := 0; i < 1000000; i++ { + if !closing && h.totalTables() > 3 { + t.Logf("close db during write, index=%d", i) + closeWait.Add(1) + go func() { + h.closeDB() + closeWait.Done() + }() + closing = true + } + if err := h.db.Put([]byte(fmt.Sprintf("%09d", i)), []byte(fmt.Sprintf("VAL-%09d", i)), h.wo); err != nil { + t.Logf("Put error: %s (expected)", err) + n = i + break + } + } + closeWait.Wait() + + // During read. + h.openDB() + closing = false + for i := 0; i < n; i++ { + if !closing && i > n/2 { + t.Logf("close db during read, index=%d", i) + closeWait.Add(1) + go func() { + h.closeDB() + closeWait.Done() + }() + closing = true + } + if _, err := h.db.Get([]byte(fmt.Sprintf("%09d", i)), h.ro); err != nil { + t.Logf("Get error: %s (expected)", err) + break + } + } + closeWait.Wait() + + // During iterate. + h.openDB() + closing = false + iter := h.db.NewIterator(nil, h.ro) + for i := 0; iter.Next(); i++ { + if len(iter.Key()) == 0 || len(iter.Value()) == 0 { + t.Error("Key or value has zero length") + } + if !closing { + t.Logf("close db during iter, index=%d", i) + closeWait.Add(1) + go func() { + h.closeDB() + closeWait.Done() + }() + closing = true + } + time.Sleep(time.Millisecond) + } + if err := iter.Error(); err != nil { + t.Logf("Iter error: %s (expected)", err) + } + iter.Release() + closeWait.Wait() +} diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go index 5576761fe..37acb3f3a 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -73,6 +73,10 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { v := db.s.version() defer v.release() mdb = db.getEffectiveMem() + if mdb == nil { + err = ErrClosed + return false + } defer func() { if retry { mdb.decref() @@ -310,6 +314,9 @@ func (db *DB) CompactRange(r util.Range) error { // Check for overlaps in memdb. mdb := db.getEffectiveMem() + if mdb == nil { + return ErrClosed + } defer mdb.decref() if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { // Memdb compaction. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/session.go b/vendor/github.com/syndtr/goleveldb/leveldb/session.go index b0d3fef1d..c9679b6b5 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/session.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/session.go @@ -87,7 +87,7 @@ func (s *session) close() { } s.manifest = nil s.manifestWriter = nil - s.stVersion = nil + s.setVersion(&version{s: s, closing: true}) } // Release session lock. diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table.go b/vendor/github.com/syndtr/goleveldb/leveldb/table.go index 310ba6c22..81d18a531 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table.go @@ -434,7 +434,7 @@ func (t *tOps) close() { t.bpool.Close() t.cache.Close() if t.bcache != nil { - t.bcache.Close() + t.bcache.CloseWeak() } } diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go index ae61bece9..1edf4e2ba 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -888,7 +888,7 @@ func (r *Reader) Find(key []byte, filtered bool, ro *opt.ReadOptions) (rkey, val return r.find(key, filtered, ro, false) } -// Find finds key that is greater than or equal to the given key. +// FindKey finds key that is greater than or equal to the given key. // It returns ErrNotFound if the table doesn't contain such key. // If filtered is true then the nearest 'block' will be checked against // 'filter data' (if present) and will immediately return ErrNotFound if diff --git a/vendor/github.com/syndtr/goleveldb/leveldb/version.go b/vendor/github.com/syndtr/goleveldb/leveldb/version.go index d274eeff2..c60f12c20 100644 --- a/vendor/github.com/syndtr/goleveldb/leveldb/version.go +++ b/vendor/github.com/syndtr/goleveldb/leveldb/version.go @@ -34,7 +34,8 @@ type version struct { cSeek unsafe.Pointer - ref int + closing bool + ref int // Succeeding version. next *version } @@ -131,6 +132,10 @@ func (v *version) walkOverlapping(aux tFiles, ikey internalKey, f func(level int } func (v *version) get(aux tFiles, ikey internalKey, ro *opt.ReadOptions, noValue bool) (value []byte, tcomp bool, err error) { + if v.closing { + return nil, false, ErrClosed + } + ukey := ikey.ukey() var ( diff --git a/vendor/manifest b/vendor/manifest index 63ca6c919..a5af4f3a2 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -233,7 +233,7 @@ { "importpath": "github.com/syndtr/goleveldb", "repository": "https://github.com/syndtr/goleveldb", - "revision": "ab8b5dcf1042e818ab68e770d465112a899b668e", + "revision": "6ae1797c0b42b9323fc27ff7dcf568df88f2f33d", "branch": "master" }, {