vendor: Update github.com/syndtr/goleveldb

This commit is contained in:
Jakob Borg 2016-12-28 12:19:14 +01:00
parent c69c3c7c36
commit eb14f85a57
43 changed files with 1005 additions and 669 deletions

View File

@ -9,13 +9,15 @@ package leveldb
import ( import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"io"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
) )
// ErrBatchCorrupted records reason of batch corruption. // ErrBatchCorrupted records reason of batch corruption. This error will be
// wrapped with errors.ErrCorrupted.
type ErrBatchCorrupted struct { type ErrBatchCorrupted struct {
Reason string Reason string
} }
@ -29,8 +31,9 @@ func newErrBatchCorrupted(reason string) error {
} }
const ( const (
batchHdrLen = 8 + 4 batchHeaderLen = 8 + 4
batchGrowRec = 3000 batchGrowRec = 3000
batchBufioSize = 16
) )
// BatchReplay wraps basic batch operations. // BatchReplay wraps basic batch operations.
@ -39,34 +42,46 @@ type BatchReplay interface {
Delete(key []byte) Delete(key []byte)
} }
type batchIndex struct {
keyType keyType
keyPos, keyLen int
valuePos, valueLen int
}
func (index batchIndex) k(data []byte) []byte {
return data[index.keyPos : index.keyPos+index.keyLen]
}
func (index batchIndex) v(data []byte) []byte {
if index.valueLen != 0 {
return data[index.valuePos : index.valuePos+index.valueLen]
}
return nil
}
func (index batchIndex) kv(data []byte) (key, value []byte) {
return index.k(data), index.v(data)
}
// Batch is a write batch. // Batch is a write batch.
type Batch struct { type Batch struct {
data []byte data []byte
rLen, bLen int index []batchIndex
seq uint64
sync bool // internalLen is sums of key/value pair length plus 8-bytes internal key.
internalLen int
} }
func (b *Batch) grow(n int) { func (b *Batch) grow(n int) {
off := len(b.data) o := len(b.data)
if off == 0 { if cap(b.data)-o < n {
off = batchHdrLen div := 1
if b.data != nil { if len(b.index) > batchGrowRec {
b.data = b.data[:off] div = len(b.index) / batchGrowRec
}
}
if cap(b.data)-off < n {
if b.data == nil {
b.data = make([]byte, off, off+n)
} else {
odata := b.data
div := 1
if b.rLen > batchGrowRec {
div = b.rLen / batchGrowRec
}
b.data = make([]byte, off, off+n+(off-batchHdrLen)/div)
copy(b.data, odata)
} }
ndata := make([]byte, o, o+n+o/div)
copy(ndata, b.data)
b.data = ndata
} }
} }
@ -76,32 +91,36 @@ func (b *Batch) appendRec(kt keyType, key, value []byte) {
n += binary.MaxVarintLen32 + len(value) n += binary.MaxVarintLen32 + len(value)
} }
b.grow(n) b.grow(n)
off := len(b.data) index := batchIndex{keyType: kt}
data := b.data[:off+n] o := len(b.data)
data[off] = byte(kt) data := b.data[:o+n]
off++ data[o] = byte(kt)
off += binary.PutUvarint(data[off:], uint64(len(key))) o++
copy(data[off:], key) o += binary.PutUvarint(data[o:], uint64(len(key)))
off += len(key) index.keyPos = o
index.keyLen = len(key)
o += copy(data[o:], key)
if kt == keyTypeVal { if kt == keyTypeVal {
off += binary.PutUvarint(data[off:], uint64(len(value))) o += binary.PutUvarint(data[o:], uint64(len(value)))
copy(data[off:], value) index.valuePos = o
off += len(value) index.valueLen = len(value)
o += copy(data[o:], value)
} }
b.data = data[:off] b.data = data[:o]
b.rLen++ b.index = append(b.index, index)
// Include 8-byte ikey header b.internalLen += index.keyLen + index.valueLen + 8
b.bLen += len(key) + len(value) + 8
} }
// Put appends 'put operation' of the given key/value pair to the batch. // Put appends 'put operation' of the given key/value pair to the batch.
// It is safe to modify the contents of the argument after Put returns. // It is safe to modify the contents of the argument after Put returns but not
// before.
func (b *Batch) Put(key, value []byte) { func (b *Batch) Put(key, value []byte) {
b.appendRec(keyTypeVal, key, value) b.appendRec(keyTypeVal, key, value)
} }
// Delete appends 'delete operation' of the given key to the batch. // Delete appends 'delete operation' of the given key to the batch.
// It is safe to modify the contents of the argument after Delete returns. // It is safe to modify the contents of the argument after Delete returns but
// not before.
func (b *Batch) Delete(key []byte) { func (b *Batch) Delete(key []byte) {
b.appendRec(keyTypeDel, key, nil) b.appendRec(keyTypeDel, key, nil)
} }
@ -111,7 +130,7 @@ func (b *Batch) Delete(key []byte) {
// The returned slice is not its own copy, so the contents should not be // The returned slice is not its own copy, so the contents should not be
// modified. // modified.
func (b *Batch) Dump() []byte { func (b *Batch) Dump() []byte {
return b.encode() return b.data
} }
// Load loads given slice into the batch. Previous contents of the batch // Load loads given slice into the batch. Previous contents of the batch
@ -119,144 +138,212 @@ func (b *Batch) Dump() []byte {
// The given slice will not be copied and will be used as batch buffer, so // The given slice will not be copied and will be used as batch buffer, so
// it is not safe to modify the contents of the slice. // it is not safe to modify the contents of the slice.
func (b *Batch) Load(data []byte) error { func (b *Batch) Load(data []byte) error {
return b.decode(0, data) return b.decode(data, -1)
} }
// Replay replays batch contents. // Replay replays batch contents.
func (b *Batch) Replay(r BatchReplay) error { func (b *Batch) Replay(r BatchReplay) error {
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { for _, index := range b.index {
switch kt { switch index.keyType {
case keyTypeVal: case keyTypeVal:
r.Put(key, value) r.Put(index.k(b.data), index.v(b.data))
case keyTypeDel: case keyTypeDel:
r.Delete(key) r.Delete(index.k(b.data))
} }
return nil }
}) return nil
} }
// Len returns number of records in the batch. // Len returns number of records in the batch.
func (b *Batch) Len() int { func (b *Batch) Len() int {
return b.rLen return len(b.index)
} }
// Reset resets the batch. // Reset resets the batch.
func (b *Batch) Reset() { func (b *Batch) Reset() {
b.data = b.data[:0] b.data = b.data[:0]
b.seq = 0 b.index = b.index[:0]
b.rLen = 0 b.internalLen = 0
b.bLen = 0
b.sync = false
} }
func (b *Batch) init(sync bool) { func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
b.sync = sync for i, index := range b.index {
} if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
func (b *Batch) append(p *Batch) {
if p.rLen > 0 {
b.grow(len(p.data) - batchHdrLen)
b.data = append(b.data, p.data[batchHdrLen:]...)
b.rLen += p.rLen
b.bLen += p.bLen
}
if p.sync {
b.sync = true
}
}
// size returns sums of key/value pair length plus 8-bytes ikey.
func (b *Batch) size() int {
return b.bLen
}
func (b *Batch) encode() []byte {
b.grow(0)
binary.LittleEndian.PutUint64(b.data, b.seq)
binary.LittleEndian.PutUint32(b.data[8:], uint32(b.rLen))
return b.data
}
func (b *Batch) decode(prevSeq uint64, data []byte) error {
if len(data) < batchHdrLen {
return newErrBatchCorrupted("too short")
}
b.seq = binary.LittleEndian.Uint64(data)
if b.seq < prevSeq {
return newErrBatchCorrupted("invalid sequence number")
}
b.rLen = int(binary.LittleEndian.Uint32(data[8:]))
if b.rLen < 0 {
return newErrBatchCorrupted("invalid records length")
}
// No need to be precise at this point, it won't be used anyway
b.bLen = len(data) - batchHdrLen
b.data = data
return nil
}
func (b *Batch) decodeRec(f func(i int, kt keyType, key, value []byte) error) error {
off := batchHdrLen
for i := 0; i < b.rLen; i++ {
if off >= len(b.data) {
return newErrBatchCorrupted("invalid records length")
}
kt := keyType(b.data[off])
if kt > keyTypeVal {
panic(kt)
return newErrBatchCorrupted("bad record: invalid type")
}
off++
x, n := binary.Uvarint(b.data[off:])
off += n
if n <= 0 || off+int(x) > len(b.data) {
return newErrBatchCorrupted("bad record: invalid key length")
}
key := b.data[off : off+int(x)]
off += int(x)
var value []byte
if kt == keyTypeVal {
x, n := binary.Uvarint(b.data[off:])
off += n
if n <= 0 || off+int(x) > len(b.data) {
return newErrBatchCorrupted("bad record: invalid value length")
}
value = b.data[off : off+int(x)]
off += int(x)
}
if err := f(i, kt, key, value); err != nil {
return err return err
} }
} }
return nil return nil
} }
func (b *Batch) memReplay(to *memdb.DB) error { func (b *Batch) append(p *Batch) {
var ikScratch []byte ob := len(b.data)
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { oi := len(b.index)
ikScratch = makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) b.data = append(b.data, p.data...)
return to.Put(ikScratch, value) b.index = append(b.index, p.index...)
}) b.internalLen += p.internalLen
// Updating index offset.
if ob != 0 {
for ; oi < len(b.index); oi++ {
index := &b.index[oi]
index.keyPos += ob
if index.valueLen != 0 {
index.valuePos += ob
}
}
}
} }
func (b *Batch) memDecodeAndReplay(prevSeq uint64, data []byte, to *memdb.DB) error { func (b *Batch) decode(data []byte, expectedLen int) error {
if err := b.decode(prevSeq, data); err != nil { b.data = data
b.index = b.index[:0]
b.internalLen = 0
err := decodeBatch(data, func(i int, index batchIndex) error {
b.index = append(b.index, index)
b.internalLen += index.keyLen + index.valueLen + 8
return nil
})
if err != nil {
return err return err
} }
return b.memReplay(to) if expectedLen >= 0 && len(b.index) != expectedLen {
return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
}
return nil
} }
func (b *Batch) revertMemReplay(to *memdb.DB) error { func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
var ikScratch []byte var ik []byte
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { for i, index := range b.index {
ikScratch := makeInternalKey(ikScratch, key, b.seq+uint64(i), kt) ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
return to.Delete(ikScratch) if err := mdb.Put(ik, index.v(b.data)); err != nil {
}) return err
}
}
return nil
}
func (b *Batch) revertMem(seq uint64, mdb *memdb.DB) error {
var ik []byte
for i, index := range b.index {
ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
if err := mdb.Delete(ik); err != nil {
return err
}
}
return nil
}
func newBatch() interface{} {
return &Batch{}
}
func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
var index batchIndex
for i, o := 0, 0; o < len(data); i++ {
// Key type.
index.keyType = keyType(data[o])
if index.keyType > keyTypeVal {
return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
}
o++
// Key.
x, n := binary.Uvarint(data[o:])
o += n
if n <= 0 || o+int(x) > len(data) {
return newErrBatchCorrupted("bad record: invalid key length")
}
index.keyPos = o
index.keyLen = int(x)
o += index.keyLen
// Value.
if index.keyType == keyTypeVal {
x, n = binary.Uvarint(data[o:])
o += n
if n <= 0 || o+int(x) > len(data) {
return newErrBatchCorrupted("bad record: invalid value length")
}
index.valuePos = o
index.valueLen = int(x)
o += index.valueLen
} else {
index.valuePos = 0
index.valueLen = 0
}
if err := fn(i, index); err != nil {
return err
}
}
return nil
}
func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
seq, batchLen, err = decodeBatchHeader(data)
if err != nil {
return 0, 0, err
}
if seq < expectSeq {
return 0, 0, newErrBatchCorrupted("invalid sequence number")
}
data = data[batchHeaderLen:]
var ik []byte
var decodedLen int
err = decodeBatch(data, func(i int, index batchIndex) error {
if i >= batchLen {
return newErrBatchCorrupted("invalid records length")
}
ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
if err := mdb.Put(ik, index.v(data)); err != nil {
return err
}
decodedLen++
return nil
})
if err == nil && decodedLen != batchLen {
err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
}
return
}
func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
dst = ensureBuffer(dst, batchHeaderLen)
binary.LittleEndian.PutUint64(dst, seq)
binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
return dst
}
func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
if len(data) < batchHeaderLen {
return 0, 0, newErrBatchCorrupted("too short")
}
seq = binary.LittleEndian.Uint64(data)
batchLen = int(binary.LittleEndian.Uint32(data[8:]))
if batchLen < 0 {
return 0, 0, newErrBatchCorrupted("invalid records length")
}
return
}
func batchesLen(batches []*Batch) int {
batchLen := 0
for _, batch := range batches {
batchLen += batch.Len()
}
return batchLen
}
func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
return err
}
for _, batch := range batches {
if _, err := wr.Write(batch.data); err != nil {
return err
}
}
return nil
} }

View File

@ -8,116 +8,140 @@ package leveldb
import ( import (
"bytes" "bytes"
"fmt"
"testing" "testing"
"testing/quick"
"github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/testutil"
"github.com/syndtr/goleveldb/leveldb/memdb"
) )
type tbRec struct { func TestBatchHeader(t *testing.T) {
kt keyType f := func(seq uint64, length uint32) bool {
key, value []byte encoded := encodeBatchHeader(nil, seq, int(length))
decSeq, decLength, err := decodeBatchHeader(encoded)
return err == nil && decSeq == seq && decLength == int(length)
}
config := &quick.Config{
Rand: testutil.NewRand(),
}
if err := quick.Check(f, config); err != nil {
t.Error(err)
}
} }
type testBatch struct { type batchKV struct {
rec []*tbRec kt keyType
k, v []byte
} }
func (p *testBatch) Put(key, value []byte) { func TestBatch(t *testing.T) {
p.rec = append(p.rec, &tbRec{keyTypeVal, key, value}) var (
} kvs []batchKV
internalLen int
func (p *testBatch) Delete(key []byte) { )
p.rec = append(p.rec, &tbRec{keyTypeDel, key, nil}) batch := new(Batch)
} rbatch := new(Batch)
abatch := new(Batch)
func compareBatch(t *testing.T, b1, b2 *Batch) { testBatch := func(i int, kt keyType, k, v []byte) error {
if b1.seq != b2.seq { kv := kvs[i]
t.Errorf("invalid seq number want %d, got %d", b1.seq, b2.seq) if kv.kt != kt {
} return fmt.Errorf("invalid key type, index=%d: %d vs %d", i, kv.kt, kt)
if b1.Len() != b2.Len() {
t.Fatalf("invalid record length want %d, got %d", b1.Len(), b2.Len())
}
p1, p2 := new(testBatch), new(testBatch)
err := b1.Replay(p1)
if err != nil {
t.Fatal("error when replaying batch 1: ", err)
}
err = b2.Replay(p2)
if err != nil {
t.Fatal("error when replaying batch 2: ", err)
}
for i := range p1.rec {
r1, r2 := p1.rec[i], p2.rec[i]
if r1.kt != r2.kt {
t.Errorf("invalid type on record '%d' want %d, got %d", i, r1.kt, r2.kt)
} }
if !bytes.Equal(r1.key, r2.key) { if !bytes.Equal(kv.k, k) {
t.Errorf("invalid key on record '%d' want %s, got %s", i, string(r1.key), string(r2.key)) return fmt.Errorf("invalid key, index=%d", i)
} }
if r1.kt == keyTypeVal { if !bytes.Equal(kv.v, v) {
if !bytes.Equal(r1.value, r2.value) { return fmt.Errorf("invalid value, index=%d", i)
t.Errorf("invalid value on record '%d' want %s, got %s", i, string(r1.value), string(r2.value)) }
return nil
}
f := func(ktr uint8, k, v []byte) bool {
kt := keyType(ktr % 2)
if kt == keyTypeVal {
batch.Put(k, v)
rbatch.Put(k, v)
kvs = append(kvs, batchKV{kt: kt, k: k, v: v})
internalLen += len(k) + len(v) + 8
} else {
batch.Delete(k)
rbatch.Delete(k)
kvs = append(kvs, batchKV{kt: kt, k: k})
internalLen += len(k) + 8
}
if batch.Len() != len(kvs) {
t.Logf("batch.Len: %d vs %d", len(kvs), batch.Len())
return false
}
if batch.internalLen != internalLen {
t.Logf("abatch.internalLen: %d vs %d", internalLen, batch.internalLen)
return false
}
if len(kvs)%1000 == 0 {
if err := batch.replayInternal(testBatch); err != nil {
t.Logf("batch.replayInternal: %v", err)
return false
}
abatch.append(rbatch)
rbatch.Reset()
if abatch.Len() != len(kvs) {
t.Logf("abatch.Len: %d vs %d", len(kvs), abatch.Len())
return false
}
if abatch.internalLen != internalLen {
t.Logf("abatch.internalLen: %d vs %d", internalLen, abatch.internalLen)
return false
}
if err := abatch.replayInternal(testBatch); err != nil {
t.Logf("abatch.replayInternal: %v", err)
return false
}
nbatch := new(Batch)
if err := nbatch.Load(batch.Dump()); err != nil {
t.Logf("nbatch.Load: %v", err)
return false
}
if nbatch.Len() != len(kvs) {
t.Logf("nbatch.Len: %d vs %d", len(kvs), nbatch.Len())
return false
}
if nbatch.internalLen != internalLen {
t.Logf("nbatch.internalLen: %d vs %d", internalLen, nbatch.internalLen)
return false
}
if err := nbatch.replayInternal(testBatch); err != nil {
t.Logf("nbatch.replayInternal: %v", err)
return false
} }
} }
} if len(kvs)%10000 == 0 {
} nbatch := new(Batch)
if err := batch.Replay(nbatch); err != nil {
func TestBatch_EncodeDecode(t *testing.T) { t.Logf("batch.Replay: %v", err)
b1 := new(Batch) return false
b1.seq = 10009 }
b1.Put([]byte("key1"), []byte("value1")) if nbatch.Len() != len(kvs) {
b1.Put([]byte("key2"), []byte("value2")) t.Logf("nbatch.Len: %d vs %d", len(kvs), nbatch.Len())
b1.Delete([]byte("key1")) return false
b1.Put([]byte("k"), []byte("")) }
b1.Put([]byte("zzzzzzzzzzz"), []byte("zzzzzzzzzzzzzzzzzzzzzzzz")) if nbatch.internalLen != internalLen {
b1.Delete([]byte("key10000")) t.Logf("nbatch.internalLen: %d vs %d", internalLen, nbatch.internalLen)
b1.Delete([]byte("k")) return false
buf := b1.encode() }
b2 := new(Batch) if err := nbatch.replayInternal(testBatch); err != nil {
err := b2.decode(0, buf) t.Logf("nbatch.replayInternal: %v", err)
if err != nil { return false
t.Error("error when decoding batch: ", err) }
}
compareBatch(t, b1, b2)
}
func TestBatch_Append(t *testing.T) {
b1 := new(Batch)
b1.seq = 10009
b1.Put([]byte("key1"), []byte("value1"))
b1.Put([]byte("key2"), []byte("value2"))
b1.Delete([]byte("key1"))
b1.Put([]byte("foo"), []byte("foovalue"))
b1.Put([]byte("bar"), []byte("barvalue"))
b2a := new(Batch)
b2a.seq = 10009
b2a.Put([]byte("key1"), []byte("value1"))
b2a.Put([]byte("key2"), []byte("value2"))
b2a.Delete([]byte("key1"))
b2b := new(Batch)
b2b.Put([]byte("foo"), []byte("foovalue"))
b2b.Put([]byte("bar"), []byte("barvalue"))
b2a.append(b2b)
compareBatch(t, b1, b2a)
if b1.size() != b2a.size() {
t.Fatalf("invalid batch size want %d, got %d", b1.size(), b2a.size())
}
}
func TestBatch_Size(t *testing.T) {
b := new(Batch)
for i := 0; i < 2; i++ {
b.Put([]byte("key1"), []byte("value1"))
b.Put([]byte("key2"), []byte("value2"))
b.Delete([]byte("key1"))
b.Put([]byte("foo"), []byte("foovalue"))
b.Put([]byte("bar"), []byte("barvalue"))
mem := memdb.New(&iComparer{comparer.DefaultComparer}, 0)
b.memReplay(mem)
if b.size() != mem.Size() {
t.Errorf("invalid batch size calculation, want=%d got=%d", mem.Size(), b.size())
} }
b.Reset() return true
} }
config := &quick.Config{
MaxCount: 40000,
Rand: testutil.NewRand(),
}
if err := quick.Check(f, config); err != nil {
t.Error(err)
}
t.Logf("length=%d internalLen=%d", len(kvs), internalLen)
} }

View File

@ -104,7 +104,6 @@ func openDBBench(b *testing.B, noCompress bool) *dbBench {
b.Fatal("cannot open db: ", err) b.Fatal("cannot open db: ", err)
} }
runtime.GOMAXPROCS(runtime.NumCPU())
return p return p
} }
@ -260,7 +259,6 @@ func (p *dbBench) close() {
p.keys = nil p.keys = nil
p.values = nil p.values = nil
runtime.GC() runtime.GC()
runtime.GOMAXPROCS(1)
} }
func BenchmarkDBWrite(b *testing.B) { func BenchmarkDBWrite(b *testing.B) {

View File

@ -16,7 +16,7 @@ import (
) )
// Cacher provides interface to implements a caching functionality. // Cacher provides interface to implements a caching functionality.
// An implementation must be goroutine-safe. // An implementation must be safe for concurrent use.
type Cacher interface { type Cacher interface {
// Capacity returns cache capacity. // Capacity returns cache capacity.
Capacity() int Capacity() int

View File

@ -50,14 +50,24 @@ func set(c *Cache, ns, key uint64, value Value, charge int, relf func()) *Handle
}) })
} }
type cacheMapTestParams struct {
nobjects, nhandles, concurrent, repeat int
}
func TestCacheMap(t *testing.T) { func TestCacheMap(t *testing.T) {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
nsx := []struct { var params []cacheMapTestParams
nobjects, nhandles, concurrent, repeat int if testing.Short() {
}{ params = []cacheMapTestParams{
{10000, 400, 50, 3}, {1000, 100, 20, 3},
{100000, 1000, 100, 10}, {10000, 300, 50, 10},
}
} else {
params = []cacheMapTestParams{
{10000, 400, 50, 3},
{100000, 1000, 100, 10},
}
} }
var ( var (
@ -65,7 +75,7 @@ func TestCacheMap(t *testing.T) {
handles [][]unsafe.Pointer handles [][]unsafe.Pointer
) )
for _, x := range nsx { for _, x := range params {
objects = append(objects, make([]int32o, x.nobjects)) objects = append(objects, make([]int32o, x.nobjects))
handles = append(handles, make([]unsafe.Pointer, x.nhandles)) handles = append(handles, make([]unsafe.Pointer, x.nhandles))
} }
@ -75,7 +85,7 @@ func TestCacheMap(t *testing.T) {
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
var done int32 var done int32
for ns, x := range nsx { for ns, x := range params {
for i := 0; i < x.concurrent; i++ { for i := 0; i < x.concurrent; i++ {
wg.Add(1) wg.Add(1)
go func(ns, i, repeat int, objects []int32o, handles []unsafe.Pointer) { go func(ns, i, repeat int, objects []int32o, handles []unsafe.Pointer) {

View File

@ -6,7 +6,9 @@
package leveldb package leveldb
import "github.com/syndtr/goleveldb/leveldb/comparer" import (
"github.com/syndtr/goleveldb/leveldb/comparer"
)
type iComparer struct { type iComparer struct {
ucmp comparer.Comparer ucmp comparer.Comparer
@ -33,12 +35,12 @@ func (icmp *iComparer) Name() string {
} }
func (icmp *iComparer) Compare(a, b []byte) int { func (icmp *iComparer) Compare(a, b []byte) int {
x := icmp.ucmp.Compare(internalKey(a).ukey(), internalKey(b).ukey()) x := icmp.uCompare(internalKey(a).ukey(), internalKey(b).ukey())
if x == 0 { if x == 0 {
if m, n := internalKey(a).num(), internalKey(b).num(); m > n { if m, n := internalKey(a).num(), internalKey(b).num(); m > n {
x = -1 return -1
} else if m < n { } else if m < n {
x = 1 return 1
} }
} }
return x return x
@ -46,30 +48,20 @@ func (icmp *iComparer) Compare(a, b []byte) int {
func (icmp *iComparer) Separator(dst, a, b []byte) []byte { func (icmp *iComparer) Separator(dst, a, b []byte) []byte {
ua, ub := internalKey(a).ukey(), internalKey(b).ukey() ua, ub := internalKey(a).ukey(), internalKey(b).ukey()
dst = icmp.ucmp.Separator(dst, ua, ub) dst = icmp.uSeparator(dst, ua, ub)
if dst == nil { if dst != nil && len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 {
return nil // Append earliest possible number.
return append(dst, keyMaxNumBytes...)
} }
if len(dst) < len(ua) && icmp.uCompare(ua, dst) < 0 { return nil
dst = append(dst, keyMaxNumBytes...)
} else {
// Did not close possibilities that n maybe longer than len(ub).
dst = append(dst, a[len(a)-8:]...)
}
return dst
} }
func (icmp *iComparer) Successor(dst, b []byte) []byte { func (icmp *iComparer) Successor(dst, b []byte) []byte {
ub := internalKey(b).ukey() ub := internalKey(b).ukey()
dst = icmp.ucmp.Successor(dst, ub) dst = icmp.uSuccessor(dst, ub)
if dst == nil { if dst != nil && len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 {
return nil // Append earliest possible number.
return append(dst, keyMaxNumBytes...)
} }
if len(dst) < len(ub) && icmp.uCompare(ub, dst) < 0 { return nil
dst = append(dst, keyMaxNumBytes...)
} else {
// Did not close possibilities that n maybe longer than len(ub).
dst = append(dst, b[len(b)-8:]...)
}
return dst
} }

View File

@ -53,14 +53,13 @@ type DB struct {
aliveSnaps, aliveIters int32 aliveSnaps, aliveIters int32
// Write. // Write.
writeC chan *Batch batchPool sync.Pool
writeMergeC chan writeMerge
writeMergedC chan bool writeMergedC chan bool
writeLockC chan struct{} writeLockC chan struct{}
writeAckC chan error writeAckC chan error
writeDelay time.Duration writeDelay time.Duration
writeDelayN int writeDelayN int
journalC chan *Batch
journalAckC chan error
tr *Transaction tr *Transaction
// Compaction. // Compaction.
@ -94,12 +93,11 @@ func openDB(s *session) (*DB, error) {
// Snapshot // Snapshot
snapsList: list.New(), snapsList: list.New(),
// Write // Write
writeC: make(chan *Batch), batchPool: sync.Pool{New: newBatch},
writeMergeC: make(chan writeMerge),
writeMergedC: make(chan bool), writeMergedC: make(chan bool),
writeLockC: make(chan struct{}, 1), writeLockC: make(chan struct{}, 1),
writeAckC: make(chan error), writeAckC: make(chan error),
journalC: make(chan *Batch),
journalAckC: make(chan error),
// Compaction // Compaction
tcompCmdC: make(chan cCmd), tcompCmdC: make(chan cCmd),
tcompPauseC: make(chan chan<- struct{}), tcompPauseC: make(chan chan<- struct{}),
@ -144,10 +142,10 @@ func openDB(s *session) (*DB, error) {
if readOnly { if readOnly {
db.SetReadOnly() db.SetReadOnly()
} else { } else {
db.closeW.Add(3) db.closeW.Add(2)
go db.tCompaction() go db.tCompaction()
go db.mCompaction() go db.mCompaction()
go db.jWriter() // go db.jWriter()
} }
s.logf("db@open done T·%v", time.Since(start)) s.logf("db@open done T·%v", time.Since(start))
@ -162,10 +160,10 @@ func openDB(s *session) (*DB, error) {
// os.ErrExist error. // os.ErrExist error.
// //
// Open will return an error with type of ErrCorrupted if corruption // Open will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover // detected in the DB. Use errors.IsCorrupted to test whether an error is
// function. // due to corruption. Corrupted DB can be recovered with Recover function.
// //
// The returned DB instance is goroutine-safe. // The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o) s, err := newSession(stor, o)
@ -202,13 +200,13 @@ func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// os.ErrExist error. // os.ErrExist error.
// //
// OpenFile uses standard file-system backed storage implementation as // OpenFile uses standard file-system backed storage implementation as
// desribed in the leveldb/storage package. // described in the leveldb/storage package.
// //
// OpenFile will return an error with type of ErrCorrupted if corruption // OpenFile will return an error with type of ErrCorrupted if corruption
// detected in the DB. Corrupted DB can be recovered with Recover // detected in the DB. Use errors.IsCorrupted to test whether an error is
// function. // due to corruption. Corrupted DB can be recovered with Recover function.
// //
// The returned DB instance is goroutine-safe. // The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) { func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, o.GetReadOnly()) stor, err := storage.OpenFile(path, o.GetReadOnly())
@ -229,7 +227,7 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error. // The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
// //
// The returned DB instance is goroutine-safe. // The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(stor, o) s, err := newSession(stor, o)
@ -255,10 +253,10 @@ func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will returns an error. // The DB must already exist or it will returns an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
// //
// RecoverFile uses standard file-system backed storage implementation as desribed // RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package. // in the leveldb/storage package.
// //
// The returned DB instance is goroutine-safe. // The returned DB instance is safe for concurrent use.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) { func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path, false) stor, err := storage.OpenFile(path, false)
@ -504,10 +502,11 @@ func (db *DB) recoverJournal() error {
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer() writeBuffer = db.s.o.GetWriteBuffer()
jr *journal.Reader jr *journal.Reader
mdb = memdb.New(db.s.icmp, writeBuffer) mdb = memdb.New(db.s.icmp, writeBuffer)
buf = &util.Buffer{} buf = &util.Buffer{}
batch = &Batch{} batchSeq uint64
batchLen int
) )
for _, fd := range fds { for _, fd := range fds {
@ -526,7 +525,7 @@ func (db *DB) recoverJournal() error {
} }
// Flush memdb and remove obsolete journal file. // Flush memdb and remove obsolete journal file.
if !ofd.Nil() { if !ofd.Zero() {
if mdb.Len() > 0 { if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close() fr.Close()
@ -569,7 +568,8 @@ func (db *DB) recoverJournal() error {
fr.Close() fr.Close()
return errors.SetFd(err, fd) return errors.SetFd(err, fd)
} }
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
if err != nil {
if !strict && errors.IsCorrupted(err) { if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err) db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted. // We won't apply sequence number as it might be corrupted.
@ -581,7 +581,7 @@ func (db *DB) recoverJournal() error {
} }
// Save sequence number. // Save sequence number.
db.seq = batch.seq + uint64(batch.Len()) db.seq = batchSeq + uint64(batchLen)
// Flush it if large enough. // Flush it if large enough.
if mdb.Size() >= writeBuffer { if mdb.Size() >= writeBuffer {
@ -624,7 +624,7 @@ func (db *DB) recoverJournal() error {
} }
// Remove the last obsolete journal file. // Remove the last obsolete journal file.
if !ofd.Nil() { if !ofd.Zero() {
db.s.stor.Remove(ofd) db.s.stor.Remove(ofd)
} }
@ -661,9 +661,10 @@ func (db *DB) recoverJournalRO() error {
db.logf("journal@recovery RO·Mode F·%d", len(fds)) db.logf("journal@recovery RO·Mode F·%d", len(fds))
var ( var (
jr *journal.Reader jr *journal.Reader
buf = &util.Buffer{} buf = &util.Buffer{}
batch = &Batch{} batchSeq uint64
batchLen int
) )
for _, fd := range fds { for _, fd := range fds {
@ -703,7 +704,8 @@ func (db *DB) recoverJournalRO() error {
fr.Close() fr.Close()
return errors.SetFd(err, fd) return errors.SetFd(err, fd)
} }
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { batchSeq, batchLen, err = decodeBatchToMem(buf.Bytes(), db.seq, mdb)
if err != nil {
if !strict && errors.IsCorrupted(err) { if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err) db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted. // We won't apply sequence number as it might be corrupted.
@ -715,7 +717,7 @@ func (db *DB) recoverJournalRO() error {
} }
// Save sequence number. // Save sequence number.
db.seq = batch.seq + uint64(batch.Len()) db.seq = batchSeq + uint64(batchLen)
} }
fr.Close() fr.Close()
@ -856,7 +858,7 @@ func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
// NewIterator returns an iterator for the latest snapshot of the // NewIterator returns an iterator for the latest snapshot of the
// underlying DB. // underlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use // The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine. // multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its // It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be // underlying DB. The resultant key/value pairs are guaranteed to be

View File

@ -96,7 +96,7 @@ noerr:
default: default:
goto haserr goto haserr
} }
case _, _ = <-db.closeC: case <-db.closeC:
return return
} }
} }
@ -113,7 +113,7 @@ haserr:
goto hasperr goto hasperr
default: default:
} }
case _, _ = <-db.closeC: case <-db.closeC:
return return
} }
} }
@ -126,7 +126,7 @@ hasperr:
case db.writeLockC <- struct{}{}: case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through. // Hold write lock, so that write won't pass-through.
db.compWriteLocking = true db.compWriteLocking = true
case _, _ = <-db.closeC: case <-db.closeC:
if db.compWriteLocking { if db.compWriteLocking {
// We should release the lock or Close will hang. // We should release the lock or Close will hang.
<-db.writeLockC <-db.writeLockC
@ -195,7 +195,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
db.logf("%s exiting (persistent error %q)", name, perr) db.logf("%s exiting (persistent error %q)", name, perr)
db.compactionExitTransact() db.compactionExitTransact()
} }
case _, _ = <-db.closeC: case <-db.closeC:
db.logf("%s exiting", name) db.logf("%s exiting", name)
db.compactionExitTransact() db.compactionExitTransact()
} }
@ -224,7 +224,7 @@ func (db *DB) compactionTransact(name string, t compactionTransactInterface) {
} }
select { select {
case <-backoffT.C: case <-backoffT.C:
case _, _ = <-db.closeC: case <-db.closeC:
db.logf("%s exiting", name) db.logf("%s exiting", name)
db.compactionExitTransact() db.compactionExitTransact()
} }
@ -288,7 +288,7 @@ func (db *DB) memCompaction() {
case <-db.compPerErrC: case <-db.compPerErrC:
close(resumeC) close(resumeC)
resumeC = nil resumeC = nil
case _, _ = <-db.closeC: case <-db.closeC:
return return
} }
@ -337,7 +337,7 @@ func (db *DB) memCompaction() {
select { select {
case <-resumeC: case <-resumeC:
close(resumeC) close(resumeC)
case _, _ = <-db.closeC: case <-db.closeC:
return return
} }
} }
@ -378,7 +378,7 @@ func (b *tableCompactionBuilder) appendKV(key, value []byte) error {
select { select {
case ch := <-b.db.tcompPauseC: case ch := <-b.db.tcompPauseC:
b.db.pauseCompaction(ch) b.db.pauseCompaction(ch)
case _, _ = <-b.db.closeC: case <-b.db.closeC:
b.db.compactionExitTransact() b.db.compactionExitTransact()
default: default:
} }
@ -643,7 +643,7 @@ func (db *DB) tableNeedCompaction() bool {
func (db *DB) pauseCompaction(ch chan<- struct{}) { func (db *DB) pauseCompaction(ch chan<- struct{}) {
select { select {
case ch <- struct{}{}: case ch <- struct{}{}:
case _, _ = <-db.closeC: case <-db.closeC:
db.compactionExitTransact() db.compactionExitTransact()
} }
} }
@ -697,14 +697,14 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
case compC <- cAuto{ch}: case compC <- cAuto{ch}:
case err = <-db.compErrC: case err = <-db.compErrC:
return return
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }
// Wait cmd. // Wait cmd.
select { select {
case err = <-ch: case err = <-ch:
case err = <-db.compErrC: case err = <-db.compErrC:
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }
return err return err
@ -719,14 +719,14 @@ func (db *DB) compTriggerRange(compC chan<- cCmd, level int, min, max []byte) (e
case compC <- cRange{level, min, max, ch}: case compC <- cRange{level, min, max, ch}:
case err := <-db.compErrC: case err := <-db.compErrC:
return err return err
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }
// Wait cmd. // Wait cmd.
select { select {
case err = <-ch: case err = <-ch:
case err = <-db.compErrC: case err = <-db.compErrC:
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }
return err return err
@ -758,7 +758,7 @@ func (db *DB) mCompaction() {
default: default:
panic("leveldb: unknown command") panic("leveldb: unknown command")
} }
case _, _ = <-db.closeC: case <-db.closeC:
return return
} }
} }
@ -791,7 +791,7 @@ func (db *DB) tCompaction() {
case ch := <-db.tcompPauseC: case ch := <-db.tcompPauseC:
db.pauseCompaction(ch) db.pauseCompaction(ch)
continue continue
case _, _ = <-db.closeC: case <-db.closeC:
return return
default: default:
} }
@ -806,7 +806,7 @@ func (db *DB) tCompaction() {
case ch := <-db.tcompPauseC: case ch := <-db.tcompPauseC:
db.pauseCompaction(ch) db.pauseCompaction(ch)
continue continue
case _, _ = <-db.closeC: case <-db.closeC:
return return
} }
} }

View File

@ -59,7 +59,7 @@ func (db *DB) releaseSnapshot(se *snapshotElement) {
} }
} }
// Gets minimum sequence that not being snapshoted. // Gets minimum sequence that not being snapshotted.
func (db *DB) minSeq() uint64 { func (db *DB) minSeq() uint64 {
db.snapsMu.Lock() db.snapsMu.Lock()
defer db.snapsMu.Unlock() defer db.snapsMu.Unlock()
@ -131,7 +131,7 @@ func (snap *Snapshot) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error)
} }
// NewIterator returns an iterator for the snapshot of the underlying DB. // NewIterator returns an iterator for the snapshot of the underlying DB.
// The returned iterator is not goroutine-safe, but it is safe to use // The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine. // multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its // It is also safe to use an iterator concurrently with modifying its
// underlying DB. The resultant key/value pairs are guaranteed to be // underlying DB. The resultant key/value pairs are guaranteed to be

View File

@ -99,7 +99,7 @@ func (db *DB) mpoolDrain() {
case <-db.memPool: case <-db.memPool:
default: default:
} }
case _, _ = <-db.closeC: case <-db.closeC:
ticker.Stop() ticker.Stop()
// Make sure the pool is drained. // Make sure the pool is drained.
select { select {
@ -164,7 +164,7 @@ func (db *DB) getMems() (e, f *memDB) {
return db.mem, db.frozenMem return db.mem, db.frozenMem
} }
// Get frozen memdb. // Get effective memdb.
func (db *DB) getEffectiveMem() *memDB { func (db *DB) getEffectiveMem() *memDB {
db.memMu.RLock() db.memMu.RLock()
defer db.memMu.RUnlock() defer db.memMu.RUnlock()

View File

@ -1877,7 +1877,7 @@ func TestDB_ConcurrentIterator(t *testing.T) {
} }
func TestDB_ConcurrentWrite(t *testing.T) { func TestDB_ConcurrentWrite(t *testing.T) {
const n, niter = 10, 10000 const n, bk, niter = 10, 3, 10000
h := newDbHarness(t) h := newDbHarness(t)
defer h.close() defer h.close()
@ -1889,7 +1889,7 @@ func TestDB_ConcurrentWrite(t *testing.T) {
go func(i int) { go func(i int) {
defer wg.Done() defer wg.Done()
for k := 0; k < niter; k++ { for k := 0; k < niter; k++ {
kstr := fmt.Sprintf("%d.%d", i, k) kstr := fmt.Sprintf("put-%d.%d", i, k)
vstr := fmt.Sprintf("v%d", k) vstr := fmt.Sprintf("v%d", k)
h.put(kstr, vstr) h.put(kstr, vstr)
// Key should immediately available after put returns. // Key should immediately available after put returns.
@ -1897,6 +1897,24 @@ func TestDB_ConcurrentWrite(t *testing.T) {
} }
}(i) }(i)
} }
for i := 0; i < n; i++ {
wg.Add(1)
batch := &Batch{}
go func(i int) {
defer wg.Done()
for k := 0; k < niter; k++ {
batch.Reset()
for j := 0; j < bk; j++ {
batch.Put([]byte(fmt.Sprintf("batch-%d.%d.%d", i, k, j)), []byte(fmt.Sprintf("v%d", k)))
}
h.write(batch)
// Key should immediately available after put returns.
for j := 0; j < bk; j++ {
h.getVal(fmt.Sprintf("batch-%d.%d.%d", i, k, j), fmt.Sprintf("v%d", k))
}
}
}(i)
}
wg.Wait() wg.Wait()
} }

View File

@ -59,8 +59,8 @@ func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
} }
// NewIterator returns an iterator for the latest snapshot of the transaction. // NewIterator returns an iterator for the latest snapshot of the transaction.
// The returned iterator is not goroutine-safe, but it is safe to use multiple // The returned iterator is not safe for concurrent use, but it is safe to use
// iterators concurrently, with each in a dedicated goroutine. // multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently while writes to the // It is also safe to use an iterator concurrently while writes to the
// transaction. The resultant key/value pairs are guaranteed to be consistent. // transaction. The resultant key/value pairs are guaranteed to be consistent.
// //
@ -167,8 +167,8 @@ func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
if tr.closed { if tr.closed {
return errTransactionDone return errTransactionDone
} }
return b.decodeRec(func(i int, kt keyType, key, value []byte) error { return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
return tr.put(kt, key, value) return tr.put(kt, k, v)
}) })
} }
@ -179,7 +179,8 @@ func (tr *Transaction) setDone() {
<-tr.db.writeLockC <-tr.db.writeLockC
} }
// Commit commits the transaction. // Commit commits the transaction. If error is not nil, then the transaction is
// not committed, it can then either be retried or discarded.
// //
// Other methods should not be called after transaction has been committed. // Other methods should not be called after transaction has been committed.
func (tr *Transaction) Commit() error { func (tr *Transaction) Commit() error {
@ -192,24 +193,27 @@ func (tr *Transaction) Commit() error {
if tr.closed { if tr.closed {
return errTransactionDone return errTransactionDone
} }
defer tr.setDone()
if err := tr.flush(); err != nil { if err := tr.flush(); err != nil {
tr.discard() // Return error, lets user decide either to retry or discard
// transaction.
return err return err
} }
if len(tr.tables) != 0 { if len(tr.tables) != 0 {
// Committing transaction. // Committing transaction.
tr.rec.setSeqNum(tr.seq) tr.rec.setSeqNum(tr.seq)
tr.db.compCommitLk.Lock() tr.db.compCommitLk.Lock()
defer tr.db.compCommitLk.Unlock() tr.stats.startTimer()
var cerr error
for retry := 0; retry < 3; retry++ { for retry := 0; retry < 3; retry++ {
if err := tr.db.s.commit(&tr.rec); err != nil { cerr = tr.db.s.commit(&tr.rec)
tr.db.logf("transaction@commit error R·%d %q", retry, err) if cerr != nil {
tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
select { select {
case <-time.After(time.Second): case <-time.After(time.Second):
case _, _ = <-tr.db.closeC: case <-tr.db.closeC:
tr.db.logf("transaction@commit exiting") tr.db.logf("transaction@commit exiting")
return err tr.db.compCommitLk.Unlock()
return cerr
} }
} else { } else {
// Success. Set db.seq. // Success. Set db.seq.
@ -217,9 +221,26 @@ func (tr *Transaction) Commit() error {
break break
} }
} }
tr.stats.stopTimer()
if cerr != nil {
// Return error, lets user decide either to retry or discard
// transaction.
return cerr
}
// Update compaction stats. This is safe as long as we hold compCommitLk.
tr.db.compStats.addStat(0, &tr.stats)
// Trigger table auto-compaction. // Trigger table auto-compaction.
tr.db.compTrigger(tr.db.tcompCmdC) tr.db.compTrigger(tr.db.tcompCmdC)
tr.db.compCommitLk.Unlock()
// Additionally, wait compaction when certain threshold reached.
// Ignore error, returns error only if transaction can't be committed.
tr.db.waitCompaction()
} }
// Only mark as done if transaction committed successfully.
tr.setDone()
return nil return nil
} }
@ -245,10 +266,20 @@ func (tr *Transaction) Discard() {
tr.lk.Unlock() tr.lk.Unlock()
} }
func (db *DB) waitCompaction() error {
if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
return db.compTriggerWait(db.tcompCmdC)
}
return nil
}
// OpenTransaction opens an atomic DB transaction. Only one transaction can be // OpenTransaction opens an atomic DB transaction. Only one transaction can be
// opened at a time. Write will be blocked until the transaction is committed or // opened at a time. Subsequent call to Write and OpenTransaction will be blocked
// discarded. // until in-flight transaction is committed or discarded.
// The returned transaction handle is goroutine-safe. // The returned transaction handle is safe for concurrent use.
//
// Transaction is expensive and can overwhelm compaction, especially if
// transaction size is small. Use with caution.
// //
// The transaction must be closed once done, either by committing or discarding // The transaction must be closed once done, either by committing or discarding
// the transaction. // the transaction.
@ -263,7 +294,7 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
case db.writeLockC <- struct{}{}: case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC: case err := <-db.compPerErrC:
return nil, err return nil, err
case _, _ = <-db.closeC: case <-db.closeC:
return nil, ErrClosed return nil, ErrClosed
} }
@ -278,6 +309,11 @@ func (db *DB) OpenTransaction() (*Transaction, error) {
} }
} }
// Wait compaction when certain threshold reached.
if err := db.waitCompaction(); err != nil {
return nil, err
}
tr := &Transaction{ tr := &Transaction{
db: db, db: db,
seq: db.seq, seq: db.seq,

View File

@ -62,7 +62,7 @@ func (db *DB) checkAndCleanFiles() error {
case storage.TypeManifest: case storage.TypeManifest:
keep = fd.Num >= db.s.manifestFd.Num keep = fd.Num >= db.s.manifestFd.Num
case storage.TypeJournal: case storage.TypeJournal:
if !db.frozenJournalFd.Nil() { if !db.frozenJournalFd.Zero() {
keep = fd.Num >= db.frozenJournalFd.Num keep = fd.Num >= db.frozenJournalFd.Num
} else { } else {
keep = fd.Num >= db.journalFd.Num keep = fd.Num >= db.journalFd.Num

View File

@ -14,37 +14,23 @@ import (
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
) )
func (db *DB) writeJournal(b *Batch) error { func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
w, err := db.journal.Next() wr, err := db.journal.Next()
if err != nil { if err != nil {
return err return err
} }
if _, err := w.Write(b.encode()); err != nil { if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
return err return err
} }
if err := db.journal.Flush(); err != nil { if err := db.journal.Flush(); err != nil {
return err return err
} }
if b.sync { if sync {
return db.journalWriter.Sync() return db.journalWriter.Sync()
} }
return nil return nil
} }
func (db *DB) jWriter() {
defer db.closeW.Done()
for {
select {
case b := <-db.journalC:
if b != nil {
db.journalAckC <- db.writeJournal(b)
}
case _, _ = <-db.closeC:
return
}
}
}
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) { func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
// Wait for pending memdb compaction. // Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC) err = db.compTriggerWait(db.mcompCmdC)
@ -69,9 +55,9 @@ func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false delayed := false
slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
flush := func() (retry bool) { flush := func() (retry bool) {
v := db.s.version()
defer v.release()
mdb = db.getEffectiveMem() mdb = db.getEffectiveMem()
if mdb == nil { if mdb == nil {
err = ErrClosed err = ErrClosed
@ -83,14 +69,15 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
mdb = nil mdb = nil
} }
}() }()
tLen := db.s.tLen(0)
mdbFree = mdb.Free() mdbFree = mdb.Free()
switch { switch {
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: case tLen >= slowdownTrigger && !delayed:
delayed = true delayed = true
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
case mdbFree >= n: case mdbFree >= n:
return false return false
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): case tLen >= pauseTrigger:
delayed = true delayed = true
err = db.compTriggerWait(db.tcompCmdC) err = db.compTriggerWait(db.tcompCmdC)
if err != nil { if err != nil {
@ -127,159 +114,250 @@ func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
return return
} }
// Write apply the given batch to the DB. The batch will be applied type writeMerge struct {
// sequentially. sync bool
// batch *Batch
// It is safe to modify the contents of the arguments after Write returns. keyType keyType
func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { key, value []byte
err = db.ok() }
if err != nil || b == nil || b.Len() == 0 {
return func (db *DB) unlockWrite(overflow bool, merged int, err error) {
for i := 0; i < merged; i++ {
db.writeAckC <- err
}
if overflow {
// Pass lock to the next write (that failed to merge).
db.writeMergedC <- false
} else {
// Release lock.
<-db.writeLockC
}
}
// ourBatch if defined should equal with batch.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
// Try to flush memdb. This method would also trying to throttle writes
// if it is too fast and compaction cannot catch-up.
mdb, mdbFree, err := db.flush(batch.internalLen)
if err != nil {
db.unlockWrite(false, 0, err)
return err
}
defer mdb.decref()
var (
overflow bool
merged int
batches = []*Batch{batch}
)
if merge {
// Merge limit.
var mergeLimit int
if batch.internalLen > 128<<10 {
mergeLimit = (1 << 20) - batch.internalLen
} else {
mergeLimit = 128 << 10
}
mergeCap := mdbFree - batch.internalLen
if mergeLimit > mergeCap {
mergeLimit = mergeCap
}
merge:
for mergeLimit > 0 {
select {
case incoming := <-db.writeMergeC:
if incoming.batch != nil {
// Merge batch.
if incoming.batch.internalLen > mergeLimit {
overflow = true
break merge
}
batches = append(batches, incoming.batch)
mergeLimit -= incoming.batch.internalLen
} else {
// Merge put.
internalLen := len(incoming.key) + len(incoming.value) + 8
if internalLen > mergeLimit {
overflow = true
break merge
}
if ourBatch == nil {
ourBatch = db.batchPool.Get().(*Batch)
ourBatch.Reset()
batches = append(batches, ourBatch)
}
// We can use same batch since concurrent write doesn't
// guarantee write order.
ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
mergeLimit -= internalLen
}
sync = sync || incoming.sync
merged++
db.writeMergedC <- true
default:
break merge
}
}
} }
b.init(wo.GetSync() && !db.s.o.GetNoSync()) // Seq number.
seq := db.seq + 1
if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() { // Write journal.
// Writes using transaction. if err := db.writeJournal(batches, seq, sync); err != nil {
tr, err1 := db.OpenTransaction() db.unlockWrite(overflow, merged, err)
if err1 != nil { return err
return err1 }
// Put batches.
for _, batch := range batches {
if err := batch.putMem(seq, mdb.DB); err != nil {
panic(err)
} }
if err1 := tr.Write(b, wo); err1 != nil { seq += uint64(batch.Len())
}
// Incr seq number.
db.addSeq(uint64(batchesLen(batches)))
// Rotate memdb if it's reach the threshold.
if batch.internalLen >= mdbFree {
db.rotateMem(0, false)
}
db.unlockWrite(overflow, merged, nil)
return nil
}
// Write apply the given batch to the DB. The batch records will be applied
// sequentially. Write might be used concurrently, when used concurrently and
// batch is small enough, write will try to merge the batches. Set NoWriteMerge
// option to true to disable write merge.
//
// It is safe to modify the contents of the arguments after Write returns but
// not before. Write will not modify content of the batch.
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
return err
}
// If the batch size is larger than write buffer, it may justified to write
// using transaction instead. Using transaction the batch will be written
// into tables directly, skipping the journaling.
if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
tr, err := db.OpenTransaction()
if err != nil {
return err
}
if err := tr.Write(batch, wo); err != nil {
tr.Discard() tr.Discard()
return err1 return err
} }
return tr.Commit() return tr.Commit()
} }
// The write happen synchronously. merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
select { sync := wo.GetSync() && !db.s.o.GetNoSync()
case db.writeC <- b:
if <-db.writeMergedC {
return <-db.writeAckC
}
// Continue, the write lock already acquired by previous writer
// and handed out to us.
case db.writeLockC <- struct{}{}:
case err = <-db.compPerErrC:
return
case _, _ = <-db.closeC:
return ErrClosed
}
merged := 0 // Acquire write lock.
danglingMerge := false if merge {
defer func() {
for i := 0; i < merged; i++ {
db.writeAckC <- err
}
if danglingMerge {
// Only one dangling merge at most, so this is safe.
db.writeMergedC <- false
} else {
<-db.writeLockC
}
}()
mdb, mdbFree, err := db.flush(b.size())
if err != nil {
return
}
defer mdb.decref()
// Calculate maximum size of the batch.
m := 1 << 20
if x := b.size(); x <= 128<<10 {
m = x + (128 << 10)
}
m = minInt(m, mdbFree)
// Merge with other batch.
drain:
for b.size() < m && !b.sync {
select { select {
case nb := <-db.writeC: case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
if b.size()+nb.size() <= m { if <-db.writeMergedC {
b.append(nb) // Write is merged.
db.writeMergedC <- true return <-db.writeAckC
merged++
} else {
danglingMerge = true
break drain
} }
default: // Write is not merged, the write lock is handed to us. Continue.
break drain case db.writeLockC <- struct{}{}:
} // Write lock acquired.
} case err := <-db.compPerErrC:
// Compaction error.
// Set batch first seq number relative from last seq. return err
b.seq = db.seq + 1 case <-db.closeC:
// Closed
// Write journal concurrently if it is large enough. return ErrClosed
if b.size() >= (128 << 10) {
// Push the write batch to the journal writer
select {
case db.journalC <- b:
// Write into memdb
if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr)
}
case err = <-db.compPerErrC:
return
case _, _ = <-db.closeC:
err = ErrClosed
return
}
// Wait for journal writer
select {
case err = <-db.journalAckC:
if err != nil {
// Revert memdb if error detected
if berr := b.revertMemReplay(mdb.DB); berr != nil {
panic(berr)
}
return
}
case _, _ = <-db.closeC:
err = ErrClosed
return
} }
} else { } else {
err = db.writeJournal(b) select {
if err != nil { case db.writeLockC <- struct{}{}:
return // Write lock acquired.
} case err := <-db.compPerErrC:
if berr := b.memReplay(mdb.DB); berr != nil { // Compaction error.
panic(berr) return err
case <-db.closeC:
// Closed
return ErrClosed
} }
} }
// Set last seq number. return db.writeLocked(batch, nil, merge, sync)
db.addSeq(uint64(b.Len())) }
if b.size() >= mdbFree { func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
db.rotateMem(0, false) if err := db.ok(); err != nil {
return err
} }
return
merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
sync := wo.GetSync() && !db.s.o.GetNoSync()
// Acquire write lock.
if merge {
select {
case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
if <-db.writeMergedC {
// Write is merged.
return <-db.writeAckC
}
// Write is not merged, the write lock is handed to us. Continue.
case db.writeLockC <- struct{}{}:
// Write lock acquired.
case err := <-db.compPerErrC:
// Compaction error.
return err
case <-db.closeC:
// Closed
return ErrClosed
}
} else {
select {
case db.writeLockC <- struct{}{}:
// Write lock acquired.
case err := <-db.compPerErrC:
// Compaction error.
return err
case <-db.closeC:
// Closed
return ErrClosed
}
}
batch := db.batchPool.Get().(*Batch)
batch.Reset()
batch.appendRec(kt, key, value)
return db.writeLocked(batch, batch, merge, sync)
} }
// Put sets the value for the given key. It overwrites any previous value // Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map. // for that key; a DB is not a multi-map. Write merge also applies for Put, see
// Write.
// //
// It is safe to modify the contents of the arguments after Put returns. // It is safe to modify the contents of the arguments after Put returns but not
// before.
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
b := new(Batch) return db.putRec(keyTypeVal, key, value, wo)
b.Put(key, value)
return db.Write(b, wo)
} }
// Delete deletes the value for the given key. // Delete deletes the value for the given key. Delete will not returns error if
// key doesn't exist. Write merge also applies for Delete, see Write.
// //
// It is safe to modify the contents of the arguments after Delete returns. // It is safe to modify the contents of the arguments after Delete returns but
// not before.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
b := new(Batch) return db.putRec(keyTypeDel, key, nil, wo)
b.Delete(key)
return db.Write(b, wo)
} }
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
@ -308,7 +386,7 @@ func (db *DB) CompactRange(r util.Range) error {
case db.writeLockC <- struct{}{}: case db.writeLockC <- struct{}{}:
case err := <-db.compPerErrC: case err := <-db.compPerErrC:
return err return err
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }
@ -348,7 +426,7 @@ func (db *DB) SetReadOnly() error {
db.compWriteLocking = true db.compWriteLocking = true
case err := <-db.compPerErrC: case err := <-db.compPerErrC:
return err return err
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }
@ -357,7 +435,7 @@ func (db *DB) SetReadOnly() error {
case db.compErrSetC <- ErrReadOnly: case db.compErrSetC <- ErrReadOnly:
case perr := <-db.compPerErrC: case perr := <-db.compPerErrC:
return perr return perr
case _, _ = <-db.closeC: case <-db.closeC:
return ErrClosed return ErrClosed
} }

View File

@ -10,6 +10,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
) )
// Common errors.
var ( var (
ErrNotFound = errors.ErrNotFound ErrNotFound = errors.ErrNotFound
ErrReadOnly = errors.New("leveldb: read-only mode") ErrReadOnly = errors.New("leveldb: read-only mode")

View File

@ -15,6 +15,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
) )
// Common errors.
var ( var (
ErrNotFound = New("leveldb: not found") ErrNotFound = New("leveldb: not found")
ErrReleased = util.ErrReleased ErrReleased = util.ErrReleased
@ -34,11 +35,10 @@ type ErrCorrupted struct {
} }
func (e *ErrCorrupted) Error() string { func (e *ErrCorrupted) Error() string {
if !e.Fd.Nil() { if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
} else {
return e.Err.Error()
} }
return e.Err.Error()
} }
// NewErrCorrupted creates new ErrCorrupted error. // NewErrCorrupted creates new ErrCorrupted error.

View File

@ -32,12 +32,12 @@ var _ = testutil.Defer(func() {
db := newTestingDB(o, nil, nil) db := newTestingDB(o, nil, nil)
t := testutil.DBTesting{ t := testutil.DBTesting{
DB: db, DB: db,
Deleted: testutil.KeyValue_Generate(nil, 500, 1, 50, 5, 5).Clone(), Deleted: testutil.KeyValue_Generate(nil, 500, 1, 1, 50, 5, 5).Clone(),
} }
testutil.DoDBTesting(&t) testutil.DoDBTesting(&t)
db.TestClose() db.TestClose()
done <- true done <- true
}, 20.0) }, 80.0)
}) })
Describe("read test", func() { Describe("read test", func() {
@ -66,7 +66,7 @@ var _ = testutil.Defer(func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
t0 := &testutil.DBTesting{ t0 := &testutil.DBTesting{
DB: tr, DB: tr,
Deleted: testutil.KeyValue_Generate(nil, 200, 1, 50, 5, 5).Clone(), Deleted: testutil.KeyValue_Generate(nil, 200, 1, 1, 50, 5, 5).Clone(),
} }
testutil.DoDBTesting(t0) testutil.DoDBTesting(t0)
testutil.TestGet(tr, t0.Present) testutil.TestGet(tr, t0.Present)
@ -111,7 +111,7 @@ var _ = testutil.Defer(func() {
db.TestClose() db.TestClose()
done <- true done <- true
}, 30.0) }, 240.0)
}) })
}) })
}) })

View File

@ -17,7 +17,7 @@ var _ = testutil.Defer(func() {
Describe("Array iterator", func() { Describe("Array iterator", func() {
It("Should iterates and seeks correctly", func() { It("Should iterates and seeks correctly", func() {
// Build key/value. // Build key/value.
kv := testutil.KeyValue_Generate(nil, 70, 1, 5, 3, 3) kv := testutil.KeyValue_Generate(nil, 70, 1, 1, 5, 3, 3)
// Test the iterator. // Test the iterator.
t := testutil.IteratorTesting{ t := testutil.IteratorTesting{

View File

@ -52,7 +52,7 @@ var _ = testutil.Defer(func() {
for _, x := range n { for _, x := range n {
sum += x sum += x
} }
kv := testutil.KeyValue_Generate(nil, sum, 1, 10, 4, 4) kv := testutil.KeyValue_Generate(nil, sum, 1, 1, 10, 4, 4)
for i, j := 0, 0; i < len(n); i++ { for i, j := 0, 0; i < len(n); i++ {
for x := n[i]; x > 0; x-- { for x := n[i]; x > 0; x-- {
key, value := kv.Index(j) key, value := kv.Index(j)
@ -69,7 +69,7 @@ var _ = testutil.Defer(func() {
} }
testutil.DoIteratorTesting(&t) testutil.DoIteratorTesting(&t)
done <- true done <- true
}, 1.5) }, 15.0)
} }
} }

View File

@ -21,13 +21,13 @@ var (
// IteratorSeeker is the interface that wraps the 'seeks method'. // IteratorSeeker is the interface that wraps the 'seeks method'.
type IteratorSeeker interface { type IteratorSeeker interface {
// First moves the iterator to the first key/value pair. If the iterator // First moves the iterator to the first key/value pair. If the iterator
// only contains one key/value pair then First and Last whould moves // only contains one key/value pair then First and Last would moves
// to the same key/value pair. // to the same key/value pair.
// It returns whether such pair exist. // It returns whether such pair exist.
First() bool First() bool
// Last moves the iterator to the last key/value pair. If the iterator // Last moves the iterator to the last key/value pair. If the iterator
// only contains one key/value pair then First and Last whould moves // only contains one key/value pair then First and Last would moves
// to the same key/value pair. // to the same key/value pair.
// It returns whether such pair exist. // It returns whether such pair exist.
Last() bool Last() bool
@ -48,7 +48,7 @@ type IteratorSeeker interface {
Prev() bool Prev() bool
} }
// CommonIterator is the interface that wraps common interator methods. // CommonIterator is the interface that wraps common iterator methods.
type CommonIterator interface { type CommonIterator interface {
IteratorSeeker IteratorSeeker
@ -71,14 +71,15 @@ type CommonIterator interface {
// Iterator iterates over a DB's key/value pairs in key order. // Iterator iterates over a DB's key/value pairs in key order.
// //
// When encouter an error any 'seeks method' will return false and will // When encounter an error any 'seeks method' will return false and will
// yield no key/value pairs. The error can be queried by calling the Error // yield no key/value pairs. The error can be queried by calling the Error
// method. Calling Release is still necessary. // method. Calling Release is still necessary.
// //
// An iterator must be released after use, but it is not necessary to read // An iterator must be released after use, but it is not necessary to read
// an iterator until exhaustion. // an iterator until exhaustion.
// Also, an iterator is not necessarily goroutine-safe, but it is safe to use // Also, an iterator is not necessarily safe for concurrent use, but it is
// multiple iterators concurrently, with each in a dedicated goroutine. // safe to use multiple iterators concurrently, with each in a dedicated
// goroutine.
type Iterator interface { type Iterator interface {
CommonIterator CommonIterator
@ -98,7 +99,7 @@ type Iterator interface {
// //
// ErrorCallbackSetter implemented by indexed and merged iterator. // ErrorCallbackSetter implemented by indexed and merged iterator.
type ErrorCallbackSetter interface { type ErrorCallbackSetter interface {
// SetErrorCallback allows set an error callback of the coresponding // SetErrorCallback allows set an error callback of the corresponding
// iterator. Use nil to clear the callback. // iterator. Use nil to clear the callback.
SetErrorCallback(f func(err error)) SetErrorCallback(f func(err error))
} }

View File

@ -24,7 +24,7 @@ var _ = testutil.Defer(func() {
// Build key/value. // Build key/value.
filledKV := make([]testutil.KeyValue, filled) filledKV := make([]testutil.KeyValue, filled)
kv := testutil.KeyValue_Generate(nil, 100, 1, 10, 4, 4) kv := testutil.KeyValue_Generate(nil, 100, 1, 1, 10, 4, 4)
kv.Iterate(func(i int, key, value []byte) { kv.Iterate(func(i int, key, value []byte) {
filledKV[rnd.Intn(filled)].Put(key, value) filledKV[rnd.Intn(filled)].Put(key, value)
}) })
@ -49,7 +49,7 @@ var _ = testutil.Defer(func() {
} }
testutil.DoIteratorTesting(&t) testutil.DoIteratorTesting(&t)
done <- true done <- true
}, 1.5) }, 15.0)
} }
} }

View File

@ -180,34 +180,37 @@ func (r *Reader) nextChunk(first bool) error {
checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4]) checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6]) length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
chunkType := r.buf[r.j+6] chunkType := r.buf[r.j+6]
unprocBlock := r.n - r.j
if checksum == 0 && length == 0 && chunkType == 0 { if checksum == 0 && length == 0 && chunkType == 0 {
// Drop entire block. // Drop entire block.
m := r.n - r.j
r.i = r.n r.i = r.n
r.j = r.n r.j = r.n
return r.corrupt(m, "zero header", false) return r.corrupt(unprocBlock, "zero header", false)
} else { }
m := r.n - r.j if chunkType < fullChunkType || chunkType > lastChunkType {
r.i = r.j + headerSize // Drop entire block.
r.j = r.j + headerSize + int(length) r.i = r.n
if r.j > r.n { r.j = r.n
// Drop entire block. return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
r.i = r.n }
r.j = r.n r.i = r.j + headerSize
return r.corrupt(m, "chunk length overflows block", false) r.j = r.j + headerSize + int(length)
} else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() { if r.j > r.n {
// Drop entire block. // Drop entire block.
r.i = r.n r.i = r.n
r.j = r.n r.j = r.n
return r.corrupt(m, "checksum mismatch", false) return r.corrupt(unprocBlock, "chunk length overflows block", false)
} } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
// Drop entire block.
r.i = r.n
r.j = r.n
return r.corrupt(unprocBlock, "checksum mismatch", false)
} }
if first && chunkType != fullChunkType && chunkType != firstChunkType { if first && chunkType != fullChunkType && chunkType != firstChunkType {
m := r.j - r.i chunkLength := (r.j - r.i) + headerSize
r.i = r.j r.i = r.j
// Report the error, but skip it. // Report the error, but skip it.
return r.corrupt(m+headerSize, "orphan chunk", true) return r.corrupt(chunkLength, "orphan chunk", true)
} }
r.last = chunkType == fullChunkType || chunkType == lastChunkType r.last = chunkType == fullChunkType || chunkType == lastChunkType
return nil return nil

View File

@ -37,14 +37,14 @@ func (kt keyType) String() string {
case keyTypeVal: case keyTypeVal:
return "v" return "v"
} }
return "x" return fmt.Sprintf("<invalid:%#x>", uint(kt))
} }
// Value types encoded as the last component of internal keys. // Value types encoded as the last component of internal keys.
// Don't modify; this value are saved to disk. // Don't modify; this value are saved to disk.
const ( const (
keyTypeDel keyType = iota keyTypeDel = keyType(0)
keyTypeVal keyTypeVal = keyType(1)
) )
// keyTypeSeek defines the keyType that should be passed when constructing an // keyTypeSeek defines the keyType that should be passed when constructing an
@ -79,11 +79,7 @@ func makeInternalKey(dst, ukey []byte, seq uint64, kt keyType) internalKey {
panic("leveldb: invalid type") panic("leveldb: invalid type")
} }
if n := len(ukey) + 8; cap(dst) < n { dst = ensureBuffer(dst, len(ukey)+8)
dst = make([]byte, n)
} else {
dst = dst[:n]
}
copy(dst, ukey) copy(dst, ukey)
binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt)) binary.LittleEndian.PutUint64(dst[len(ukey):], (seq<<8)|uint64(kt))
return internalKey(dst) return internalKey(dst)
@ -143,5 +139,5 @@ func (ik internalKey) String() string {
if ukey, seq, kt, err := parseInternalKey(ik); err == nil { if ukey, seq, kt, err := parseInternalKey(ik); err == nil {
return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq) return fmt.Sprintf("%s,%s%d", shorten(string(ukey)), kt, seq)
} }
return "<invalid>" return fmt.Sprintf("<invalid:%#x>", []byte(ik))
} }

View File

@ -17,6 +17,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
) )
// Common errors.
var ( var (
ErrNotFound = errors.ErrNotFound ErrNotFound = errors.ErrNotFound
ErrIterReleased = errors.New("leveldb/memdb: iterator released") ErrIterReleased = errors.New("leveldb/memdb: iterator released")
@ -385,7 +386,7 @@ func (p *DB) Find(key []byte) (rkey, value []byte, err error) {
} }
// NewIterator returns an iterator of the DB. // NewIterator returns an iterator of the DB.
// The returned iterator is not goroutine-safe, but it is safe to use // The returned iterator is not safe for concurrent use, but it is safe to use
// multiple iterators concurrently, with each in a dedicated goroutine. // multiple iterators concurrently, with each in a dedicated goroutine.
// It is also safe to use an iterator concurrently with modifying its // It is also safe to use an iterator concurrently with modifying its
// underlying DB. However, the resultant key/value pairs are not guaranteed // underlying DB. However, the resultant key/value pairs are not guaranteed
@ -411,7 +412,7 @@ func (p *DB) Capacity() int {
} }
// Size returns sum of keys and values length. Note that deleted // Size returns sum of keys and values length. Note that deleted
// key/value will not be accouted for, but it will still consume // key/value will not be accounted for, but it will still consume
// the buffer, since the buffer is append only. // the buffer, since the buffer is append only.
func (p *DB) Size() int { func (p *DB) Size() int {
p.mu.RLock() p.mu.RLock()
@ -453,11 +454,14 @@ func (p *DB) Reset() {
p.mu.Unlock() p.mu.Unlock()
} }
// New creates a new initalized in-memory key/value DB. The capacity // New creates a new initialized in-memory key/value DB. The capacity
// is the initial key/value buffer capacity. The capacity is advisory, // is the initial key/value buffer capacity. The capacity is advisory,
// not enforced. // not enforced.
// //
// The returned DB instance is goroutine-safe. // This DB is append-only, deleting an entry would remove entry node but not
// reclaim KV buffer.
//
// The returned DB instance is safe for concurrent use.
func New(cmp comparer.BasicComparer, capacity int) *DB { func New(cmp comparer.BasicComparer, capacity int) *DB {
p := &DB{ p := &DB{
cmp: cmp, cmp: cmp,

View File

@ -73,7 +73,7 @@ var _ = testutil.Defer(func() {
db := New(comparer.DefaultComparer, 0) db := New(comparer.DefaultComparer, 0)
t := testutil.DBTesting{ t := testutil.DBTesting{
DB: db, DB: db,
Deleted: testutil.KeyValue_Generate(nil, 1000, 1, 30, 5, 5).Clone(), Deleted: testutil.KeyValue_Generate(nil, 1000, 1, 1, 30, 5, 5).Clone(),
PostFn: func(t *testutil.DBTesting) { PostFn: func(t *testutil.DBTesting) {
Expect(db.Len()).Should(Equal(t.Present.Len())) Expect(db.Len()).Should(Equal(t.Present.Len()))
Expect(db.Size()).Should(Equal(t.Present.Size())) Expect(db.Size()).Should(Equal(t.Present.Size()))

View File

@ -312,6 +312,11 @@ type Options struct {
// The default is false. // The default is false.
NoSync bool NoSync bool
// NoWriteMerge allows disabling write merge.
//
// The default is false.
NoWriteMerge bool
// OpenFilesCacher provides cache algorithm for open files caching. // OpenFilesCacher provides cache algorithm for open files caching.
// Specify NoCacher to disable caching algorithm. // Specify NoCacher to disable caching algorithm.
// //
@ -543,6 +548,13 @@ func (o *Options) GetNoSync() bool {
return o.NoSync return o.NoSync
} }
func (o *Options) GetNoWriteMerge() bool {
if o == nil {
return false
}
return o.NoWriteMerge
}
func (o *Options) GetOpenFilesCacher() Cacher { func (o *Options) GetOpenFilesCacher() Cacher {
if o == nil || o.OpenFilesCacher == nil { if o == nil || o.OpenFilesCacher == nil {
return DefaultOpenFilesCacher return DefaultOpenFilesCacher
@ -629,6 +641,11 @@ func (ro *ReadOptions) GetStrict(strict Strict) bool {
// WriteOptions holds the optional parameters for 'write operation'. The // WriteOptions holds the optional parameters for 'write operation'. The
// 'write operation' includes Write, Put and Delete. // 'write operation' includes Write, Put and Delete.
type WriteOptions struct { type WriteOptions struct {
// NoWriteMerge allows disabling write merge.
//
// The default is false.
NoWriteMerge bool
// Sync is whether to sync underlying writes from the OS buffer cache // Sync is whether to sync underlying writes from the OS buffer cache
// through to actual disk, if applicable. Setting Sync can result in // through to actual disk, if applicable. Setting Sync can result in
// slower writes. // slower writes.
@ -644,6 +661,13 @@ type WriteOptions struct {
Sync bool Sync bool
} }
func (wo *WriteOptions) GetNoWriteMerge() bool {
if wo == nil {
return false
}
return wo.NoWriteMerge
}
func (wo *WriteOptions) GetSync() bool { func (wo *WriteOptions) GetSync() bool {
if wo == nil { if wo == nil {
return false return false

View File

@ -18,7 +18,8 @@ import (
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
) )
// ErrManifestCorrupted records manifest corruption. // ErrManifestCorrupted records manifest corruption. This error will be
// wrapped with errors.ErrCorrupted.
type ErrManifestCorrupted struct { type ErrManifestCorrupted struct {
Field string Field string
Reason string Reason string
@ -42,10 +43,11 @@ type session struct {
stSeqNum uint64 // last mem compacted seq; need external synchronization stSeqNum uint64 // last mem compacted seq; need external synchronization
stor storage.Storage stor storage.Storage
storLock storage.Lock storLock storage.Locker
o *cachedOptions o *cachedOptions
icmp *iComparer icmp *iComparer
tops *tOps tops *tOps
fileRef map[int64]int
manifest *journal.Writer manifest *journal.Writer
manifestWriter storage.Writer manifestWriter storage.Writer
@ -68,6 +70,7 @@ func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
s = &session{ s = &session{
stor: stor, stor: stor,
storLock: storLock, storLock: storLock,
fileRef: make(map[int64]int),
} }
s.setOptions(o) s.setOptions(o)
s.tops = newTableOps(s) s.tops = newTableOps(s)
@ -92,7 +95,7 @@ func (s *session) close() {
// Release session lock. // Release session lock.
func (s *session) release() { func (s *session) release() {
s.storLock.Release() s.storLock.Unlock()
} }
// Create a new database session; need external synchronization. // Create a new database session; need external synchronization.

View File

@ -39,6 +39,18 @@ func (s *session) newTemp() storage.FileDesc {
return storage.FileDesc{storage.TypeTemp, num} return storage.FileDesc{storage.TypeTemp, num}
} }
func (s *session) addFileRef(fd storage.FileDesc, ref int) int {
ref += s.fileRef[fd.Num]
if ref > 0 {
s.fileRef[fd.Num] = ref
} else if ref == 0 {
delete(s.fileRef, fd.Num)
} else {
panic(fmt.Sprintf("negative ref: %v", fd))
}
return ref
}
// Session state. // Session state.
// Get current version. This will incr version ref, must call // Get current version. This will incr version ref, must call
@ -46,21 +58,28 @@ func (s *session) newTemp() storage.FileDesc {
func (s *session) version() *version { func (s *session) version() *version {
s.vmu.Lock() s.vmu.Lock()
defer s.vmu.Unlock() defer s.vmu.Unlock()
s.stVersion.ref++ s.stVersion.incref()
return s.stVersion return s.stVersion
} }
func (s *session) tLen(level int) int {
s.vmu.Lock()
defer s.vmu.Unlock()
return s.stVersion.tLen(level)
}
// Set current version to v. // Set current version to v.
func (s *session) setVersion(v *version) { func (s *session) setVersion(v *version) {
s.vmu.Lock() s.vmu.Lock()
v.ref = 1 // Holds by session. defer s.vmu.Unlock()
if old := s.stVersion; old != nil { // Hold by session. It is important to call this first before releasing
v.ref++ // Holds by old version. // current version, otherwise the still used files might get released.
old.next = v v.incref()
old.releaseNB() if s.stVersion != nil {
// Release current version.
s.stVersion.releaseNB()
} }
s.stVersion = v s.stVersion = v
s.vmu.Unlock()
} }
// Get current unused file number. // Get current unused file number.
@ -197,7 +216,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
if s.manifestWriter != nil { if s.manifestWriter != nil {
s.manifestWriter.Close() s.manifestWriter.Close()
} }
if !s.manifestFd.Nil() { if !s.manifestFd.Zero() {
s.stor.Remove(s.manifestFd) s.stor.Remove(s.manifestFd)
} }
s.manifestFd = fd s.manifestFd = fd

View File

@ -32,7 +32,7 @@ type fileStorageLock struct {
fs *fileStorage fs *fileStorage
} }
func (lock *fileStorageLock) Release() { func (lock *fileStorageLock) Unlock() {
if lock.fs != nil { if lock.fs != nil {
lock.fs.mu.Lock() lock.fs.mu.Lock()
defer lock.fs.mu.Unlock() defer lock.fs.mu.Unlock()
@ -116,7 +116,7 @@ func OpenFile(path string, readOnly bool) (Storage, error) {
return fs, nil return fs, nil
} }
func (fs *fileStorage) Lock() (Lock, error) { func (fs *fileStorage) Lock() (Locker, error) {
fs.mu.Lock() fs.mu.Lock()
defer fs.mu.Unlock() defer fs.mu.Unlock()
if fs.open < 0 { if fs.open < 0 {
@ -323,7 +323,7 @@ func (fs *fileStorage) GetMeta() (fd FileDesc, err error) {
} }
} }
// Don't remove any files if there is no valid CURRENT file. // Don't remove any files if there is no valid CURRENT file.
if fd.Nil() { if fd.Zero() {
if cerr != nil { if cerr != nil {
err = cerr err = cerr
} else { } else {

View File

@ -126,7 +126,7 @@ func TestFileStorage_Locking(t *testing.T) {
} else { } else {
t.Logf("storage lock got error: %s (expected)", err) t.Logf("storage lock got error: %s (expected)", err)
} }
l.Release() l.Unlock()
_, err = p3.Lock() _, err = p3.Lock()
if err != nil { if err != nil {
t.Fatal("storage lock failed(2): ", err) t.Fatal("storage lock failed(2): ", err)

View File

@ -18,7 +18,7 @@ type memStorageLock struct {
ms *memStorage ms *memStorage
} }
func (lock *memStorageLock) Release() { func (lock *memStorageLock) Unlock() {
ms := lock.ms ms := lock.ms
ms.mu.Lock() ms.mu.Lock()
defer ms.mu.Unlock() defer ms.mu.Unlock()
@ -43,7 +43,7 @@ func NewMemStorage() Storage {
} }
} }
func (ms *memStorage) Lock() (Lock, error) { func (ms *memStorage) Lock() (Locker, error) {
ms.mu.Lock() ms.mu.Lock()
defer ms.mu.Unlock() defer ms.mu.Unlock()
if ms.slock != nil { if ms.slock != nil {
@ -69,7 +69,7 @@ func (ms *memStorage) SetMeta(fd FileDesc) error {
func (ms *memStorage) GetMeta() (FileDesc, error) { func (ms *memStorage) GetMeta() (FileDesc, error) {
ms.mu.Lock() ms.mu.Lock()
defer ms.mu.Unlock() defer ms.mu.Unlock()
if ms.meta.Nil() { if ms.meta.Zero() {
return FileDesc{}, os.ErrNotExist return FileDesc{}, os.ErrNotExist
} }
return ms.meta, nil return ms.meta, nil
@ -78,7 +78,7 @@ func (ms *memStorage) GetMeta() (FileDesc, error) {
func (ms *memStorage) List(ft FileType) ([]FileDesc, error) { func (ms *memStorage) List(ft FileType) ([]FileDesc, error) {
ms.mu.Lock() ms.mu.Lock()
var fds []FileDesc var fds []FileDesc
for x, _ := range ms.files { for x := range ms.files {
fd := unpackFile(x) fd := unpackFile(x)
if fd.Type&ft != 0 { if fd.Type&ft != 0 {
fds = append(fds, fd) fds = append(fds, fd)

View File

@ -24,7 +24,7 @@ func TestMemStorage(t *testing.T) {
} else { } else {
t.Logf("storage lock got error: %s (expected)", err) t.Logf("storage lock got error: %s (expected)", err)
} }
l.Release() l.Unlock()
_, err = m.Lock() _, err = m.Lock()
if err != nil { if err != nil {
t.Fatal("storage lock failed(2): ", err) t.Fatal("storage lock failed(2): ", err)

View File

@ -11,12 +11,12 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"github.com/syndtr/goleveldb/leveldb/util"
) )
// FileType represent a file type.
type FileType int type FileType int
// File types.
const ( const (
TypeManifest FileType = 1 << iota TypeManifest FileType = 1 << iota
TypeJournal TypeJournal
@ -40,6 +40,7 @@ func (t FileType) String() string {
return fmt.Sprintf("<unknown:%d>", t) return fmt.Sprintf("<unknown:%d>", t)
} }
// Common error.
var ( var (
ErrInvalidFile = errors.New("leveldb/storage: invalid file for argument") ErrInvalidFile = errors.New("leveldb/storage: invalid file for argument")
ErrLocked = errors.New("leveldb/storage: already locked") ErrLocked = errors.New("leveldb/storage: already locked")
@ -55,11 +56,10 @@ type ErrCorrupted struct {
} }
func (e *ErrCorrupted) Error() string { func (e *ErrCorrupted) Error() string {
if !e.Fd.Nil() { if !e.Fd.Zero() {
return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd) return fmt.Sprintf("%v [file=%v]", e.Err, e.Fd)
} else {
return e.Err.Error()
} }
return e.Err.Error()
} }
// Syncer is the interface that wraps basic Sync method. // Syncer is the interface that wraps basic Sync method.
@ -83,11 +83,12 @@ type Writer interface {
Syncer Syncer
} }
type Lock interface { // Locker is the interface that wraps Unlock method.
util.Releaser type Locker interface {
Unlock()
} }
// FileDesc is a file descriptor. // FileDesc is a 'file descriptor'.
type FileDesc struct { type FileDesc struct {
Type FileType Type FileType
Num int64 Num int64
@ -108,12 +109,12 @@ func (fd FileDesc) String() string {
} }
} }
// Nil returns true if fd == (FileDesc{}). // Zero returns true if fd == (FileDesc{}).
func (fd FileDesc) Nil() bool { func (fd FileDesc) Zero() bool {
return fd == (FileDesc{}) return fd == (FileDesc{})
} }
// FileDescOk returns true if fd is a valid file descriptor. // FileDescOk returns true if fd is a valid 'file descriptor'.
func FileDescOk(fd FileDesc) bool { func FileDescOk(fd FileDesc) bool {
switch fd.Type { switch fd.Type {
case TypeManifest: case TypeManifest:
@ -126,43 +127,44 @@ func FileDescOk(fd FileDesc) bool {
return fd.Num >= 0 return fd.Num >= 0
} }
// Storage is the storage. A storage instance must be goroutine-safe. // Storage is the storage. A storage instance must be safe for concurrent use.
type Storage interface { type Storage interface {
// Lock locks the storage. Any subsequent attempt to call Lock will fail // Lock locks the storage. Any subsequent attempt to call Lock will fail
// until the last lock released. // until the last lock released.
// After use the caller should call the Release method. // Caller should call Unlock method after use.
Lock() (Lock, error) Lock() (Locker, error)
// Log logs a string. This is used for logging. // Log logs a string. This is used for logging.
// An implementation may write to a file, stdout or simply do nothing. // An implementation may write to a file, stdout or simply do nothing.
Log(str string) Log(str string)
// SetMeta sets to point to the given fd, which then can be acquired using // SetMeta store 'file descriptor' that can later be acquired using GetMeta
// GetMeta method. // method. The 'file descriptor' should point to a valid file.
// SetMeta should be implemented in such way that changes should happened // SetMeta should be implemented in such way that changes should happen
// atomically. // atomically.
SetMeta(fd FileDesc) error SetMeta(fd FileDesc) error
// GetManifest returns a manifest file. // GetMeta returns 'file descriptor' stored in meta. The 'file descriptor'
// Returns os.ErrNotExist if meta doesn't point to any fd, or point to fd // can be updated using SetMeta method.
// that doesn't exist. // Returns os.ErrNotExist if meta doesn't store any 'file descriptor', or
// 'file descriptor' point to nonexistent file.
GetMeta() (FileDesc, error) GetMeta() (FileDesc, error)
// List returns fds that match the given file types. // List returns file descriptors that match the given file types.
// The file types may be OR'ed together. // The file types may be OR'ed together.
List(ft FileType) ([]FileDesc, error) List(ft FileType) ([]FileDesc, error)
// Open opens file with the given fd read-only. // Open opens file with the given 'file descriptor' read-only.
// Returns os.ErrNotExist error if the file does not exist. // Returns os.ErrNotExist error if the file does not exist.
// Returns ErrClosed if the underlying storage is closed. // Returns ErrClosed if the underlying storage is closed.
Open(fd FileDesc) (Reader, error) Open(fd FileDesc) (Reader, error)
// Create creates file with the given fd, truncate if already exist and // Create creates file with the given 'file descriptor', truncate if already
// opens write-only. // exist and opens write-only.
// Returns ErrClosed if the underlying storage is closed. // Returns ErrClosed if the underlying storage is closed.
Create(fd FileDesc) (Writer, error) Create(fd FileDesc) (Writer, error)
// Remove removes file with the given fd. // Remove removes file with the given 'file descriptor'.
// Returns ErrClosed if the underlying storage is closed. // Returns ErrClosed if the underlying storage is closed.
Remove(fd FileDesc) error Remove(fd FileDesc) error

View File

@ -26,12 +26,15 @@ import (
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
) )
// Reader errors.
var ( var (
ErrNotFound = errors.ErrNotFound ErrNotFound = errors.ErrNotFound
ErrReaderReleased = errors.New("leveldb/table: reader released") ErrReaderReleased = errors.New("leveldb/table: reader released")
ErrIterReleased = errors.New("leveldb/table: iterator released") ErrIterReleased = errors.New("leveldb/table: iterator released")
) )
// ErrCorrupted describes error due to corruption. This error will be wrapped
// with errors.ErrCorrupted.
type ErrCorrupted struct { type ErrCorrupted struct {
Pos int64 Pos int64
Size int64 Size int64
@ -61,7 +64,7 @@ type block struct {
func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) { func (b *block) seek(cmp comparer.Comparer, rstart, rlimit int, key []byte) (index, offset int, err error) {
index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool { index = sort.Search(b.restartsLen-rstart-(b.restartsLen-rlimit), func(i int) bool {
offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):])) offset := int(binary.LittleEndian.Uint32(b.data[b.restartsOffset+4*(rstart+i):]))
offset += 1 // shared always zero, since this is a restart point offset++ // shared always zero, since this is a restart point
v1, n1 := binary.Uvarint(b.data[offset:]) // key length v1, n1 := binary.Uvarint(b.data[offset:]) // key length
_, n2 := binary.Uvarint(b.data[offset+n1:]) // value length _, n2 := binary.Uvarint(b.data[offset+n1:]) // value length
m := offset + n1 + n2 m := offset + n1 + n2
@ -356,7 +359,7 @@ func (i *blockIter) Prev() bool {
i.value = nil i.value = nil
offset := i.block.restartOffset(ri) offset := i.block.restartOffset(ri)
if offset == i.offset { if offset == i.offset {
ri -= 1 ri--
if ri < 0 { if ri < 0 {
i.dir = dirSOI i.dir = dirSOI
return false return false
@ -783,8 +786,8 @@ func (r *Reader) getDataIterErr(dataBH blockHandle, slice *util.Range, verifyChe
// table. And a nil Range.Limit is treated as a key after all keys in // table. And a nil Range.Limit is treated as a key after all keys in
// the table. // the table.
// //
// The returned iterator is not goroutine-safe and should be released // The returned iterator is not safe for concurrent use and should be released
// when not used. // after use.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { func (r *Reader) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
@ -826,18 +829,21 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
index := r.newBlockIter(indexBlock, nil, nil, true) index := r.newBlockIter(indexBlock, nil, nil, true)
defer index.Release() defer index.Release()
if !index.Seek(key) { if !index.Seek(key) {
err = index.Error() if err = index.Error(); err == nil {
if err == nil {
err = ErrNotFound err = ErrNotFound
} }
return return
} }
dataBH, n := decodeBlockHandle(index.Value()) dataBH, n := decodeBlockHandle(index.Value())
if n == 0 { if n == 0 {
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle") r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return return nil, nil, r.err
} }
// The filter should only used for exact match.
if filtered && r.filter != nil { if filtered && r.filter != nil {
filterBlock, frel, ferr := r.getFilterBlock(true) filterBlock, frel, ferr := r.getFilterBlock(true)
if ferr == nil { if ferr == nil {
@ -847,30 +853,53 @@ func (r *Reader) find(key []byte, filtered bool, ro *opt.ReadOptions, noValue bo
} }
frel.Release() frel.Release()
} else if !errors.IsCorrupted(ferr) { } else if !errors.IsCorrupted(ferr) {
err = ferr return nil, nil, ferr
}
}
data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
if !data.Seek(key) {
data.Release()
if err = data.Error(); err != nil {
return
}
// The nearest greater-than key is the first key of the next block.
if !index.Next() {
if err = index.Error(); err == nil {
err = ErrNotFound
}
return
}
dataBH, n = decodeBlockHandle(index.Value())
if n == 0 {
r.err = r.newErrCorruptedBH(r.indexBH, "bad data block handle")
return nil, nil, r.err
}
data = r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
if !data.Next() {
data.Release()
if err = data.Error(); err == nil {
err = ErrNotFound
}
return return
} }
} }
data := r.getDataIter(dataBH, nil, r.verifyChecksum, !ro.GetDontFillCache())
defer data.Release() // Key doesn't use block buffer, no need to copy the buffer.
if !data.Seek(key) {
err = data.Error()
if err == nil {
err = ErrNotFound
}
return
}
// Don't use block buffer, no need to copy the buffer.
rkey = data.Key() rkey = data.Key()
if !noValue { if !noValue {
if r.bpool == nil { if r.bpool == nil {
value = data.Value() value = data.Value()
} else { } else {
// Use block buffer, and since the buffer will be recycled, the buffer // Value does use block buffer, and since the buffer will be
// need to be copied. // recycled, it need to be copied.
value = append([]byte{}, data.Value()...) value = append([]byte{}, data.Value()...)
} }
} }
data.Release()
return return
} }
@ -987,7 +1016,7 @@ func (r *Reader) Release() {
// NewReader creates a new initialized table reader for the file. // NewReader creates a new initialized table reader for the file.
// The fi, cache and bpool is optional and can be nil. // The fi, cache and bpool is optional and can be nil.
// //
// The returned table reader instance is goroutine-safe. // The returned table reader instance is safe for concurrent use.
func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) { func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.NamespaceGetter, bpool *util.BufferPool, o *opt.Options) (*Reader, error) {
if f == nil { if f == nil {
return nil, errors.New("leveldb/table: nil file") return nil, errors.New("leveldb/table: nil file")
@ -1039,9 +1068,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
if errors.IsCorrupted(err) { if errors.IsCorrupted(err) {
r.err = err r.err = err
return r, nil return r, nil
} else {
return nil, err
} }
return nil, err
} }
// Set data end. // Set data end.
@ -1086,9 +1114,8 @@ func NewReader(f io.ReaderAt, size int64, fd storage.FileDesc, cache *cache.Name
if errors.IsCorrupted(err) { if errors.IsCorrupted(err) {
r.err = err r.err = err
return r, nil return r, nil
} else {
return nil, err
} }
return nil, err
} }
if r.filter != nil { if r.filter != nil {
r.filterBlock, err = r.readFilterBlock(r.filterBH) r.filterBlock, err = r.readFilterBlock(r.filterBH)

View File

@ -111,7 +111,7 @@ var _ = testutil.Defer(func() {
} }
testutil.AllKeyValueTesting(nil, Build, nil, nil) testutil.AllKeyValueTesting(nil, Build, nil, nil)
Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 10, 512, 512), func(r *Reader) { Describe("with one key per block", Test(testutil.KeyValue_Generate(nil, 9, 1, 1, 10, 512, 512), func(r *Reader) {
It("should have correct blocks number", func() { It("should have correct blocks number", func() {
indexBlock, err := r.readBlock(r.indexBH, true) indexBlock, err := r.readBlock(r.indexBH, true)
Expect(err).To(BeNil()) Expect(err).To(BeNil())

View File

@ -349,7 +349,7 @@ func (w *Writer) Close() error {
// NewWriter creates a new initialized table writer for the file. // NewWriter creates a new initialized table writer for the file.
// //
// Table writer is not goroutine-safe. // Table writer is not safe for concurrent use.
func NewWriter(f io.Writer, o *opt.Options) *Writer { func NewWriter(f io.Writer, o *opt.Options) *Writer {
w := &Writer{ w := &Writer{
writer: f, writer: f,

View File

@ -292,7 +292,7 @@ func KeyValue_MultipleKeyValue() *KeyValue {
var keymap = []byte("012345678ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy") var keymap = []byte("012345678ABCDEFGHIJKLMNOPQRSTUVWXYabcdefghijklmnopqrstuvwxy")
func KeyValue_Generate(rnd *rand.Rand, n, minlen, maxlen, vminlen, vmaxlen int) *KeyValue { func KeyValue_Generate(rnd *rand.Rand, n, incr, minlen, maxlen, vminlen, vmaxlen int) *KeyValue {
if rnd == nil { if rnd == nil {
rnd = NewRand() rnd = NewRand()
} }
@ -308,7 +308,7 @@ func KeyValue_Generate(rnd *rand.Rand, n, minlen, maxlen, vminlen, vmaxlen int)
} }
kv := &KeyValue{} kv := &KeyValue{}
endC := byte(len(keymap) - 1) endC := byte(len(keymap) - incr)
gen := make([]byte, 0, maxlen) gen := make([]byte, 0, maxlen)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
m := rrand(minlen, maxlen) m := rrand(minlen, maxlen)
@ -325,8 +325,8 @@ func KeyValue_Generate(rnd *rand.Rand, n, minlen, maxlen, vminlen, vmaxlen int)
if c == endC { if c == endC {
continue continue
} }
gen[j] = c + 1 gen[j] = c + byte(incr)
for j += 1; j < m; j++ { for j++; j < m; j++ {
gen[j] = 0 gen[j] = 0
} }
goto ok goto ok

View File

@ -23,15 +23,15 @@ func TestFind(db Find, kv KeyValue) {
// Using exact key. // Using exact key.
rkey, rvalue, err := db.TestFind(key) rkey, rvalue, err := db.TestFind(key)
Expect(err).ShouldNot(HaveOccurred(), "Error for key %q", key) Expect(err).ShouldNot(HaveOccurred(), "Error for exact key %q", key)
Expect(rkey).Should(Equal(key), "Key") Expect(rkey).Should(Equal(key), "Key")
Expect(rvalue).Should(Equal(value), "Value for key %q", key) Expect(rvalue).Should(Equal(value), "Value for exact key %q", key)
// Using inexact key. // Using inexact key.
rkey, rvalue, err = db.TestFind(key_) rkey, rvalue, err = db.TestFind(key_)
Expect(err).ShouldNot(HaveOccurred(), "Error for key %q (%q)", key_, key) Expect(err).ShouldNot(HaveOccurred(), "Error for inexact key %q (%q)", key_, key)
Expect(rkey).Should(Equal(key)) Expect(rkey).Should(Equal(key), "Key for inexact key %q (%q)", key_, key)
Expect(rvalue).Should(Equal(value), "Value for key %q (%q)", key_, key) Expect(rvalue).Should(Equal(value), "Value for inexact key %q (%q)", key_, key)
}) })
} }
@ -140,7 +140,7 @@ func KeyValueTesting(rnd *rand.Rand, kv KeyValue, p DB, setup func(KeyValue) DB,
TestIter(db, nil, kv.Clone()) TestIter(db, nil, kv.Clone())
} }
done <- true done <- true
}, 3.0) }, 30.0)
It("Should iterates and seeks slice correctly", func(done Done) { It("Should iterates and seeks slice correctly", func(done Done) {
if db, ok := p.(NewIterator); ok { if db, ok := p.(NewIterator); ok {
@ -162,7 +162,7 @@ func KeyValueTesting(rnd *rand.Rand, kv KeyValue, p DB, setup func(KeyValue) DB,
}) })
} }
done <- true done <- true
}, 50.0) }, 200.0)
It("Should iterates and seeks slice correctly", func(done Done) { It("Should iterates and seeks slice correctly", func(done Done) {
if db, ok := p.(NewIterator); ok { if db, ok := p.(NewIterator); ok {
@ -174,7 +174,7 @@ func KeyValueTesting(rnd *rand.Rand, kv KeyValue, p DB, setup func(KeyValue) DB,
}) })
} }
done <- true done <- true
}, 50.0) }, 200.0)
} }
func AllKeyValueTesting(rnd *rand.Rand, body, setup func(KeyValue) DB, teardown func(DB)) { func AllKeyValueTesting(rnd *rand.Rand, body, setup func(KeyValue) DB, teardown func(DB)) {
@ -207,5 +207,6 @@ func AllKeyValueTesting(rnd *rand.Rand, body, setup func(KeyValue) DB, teardown
Describe("with big value", Test(KeyValue_BigValue())) Describe("with big value", Test(KeyValue_BigValue()))
Describe("with special key", Test(KeyValue_SpecialKey())) Describe("with special key", Test(KeyValue_SpecialKey()))
Describe("with multiple key/value", Test(KeyValue_MultipleKeyValue())) Describe("with multiple key/value", Test(KeyValue_MultipleKeyValue()))
Describe("with generated key/value", Test(KeyValue_Generate(nil, 120, 1, 50, 10, 120))) Describe("with generated key/value 2-incr", Test(KeyValue_Generate(nil, 120, 2, 1, 50, 10, 120)))
Describe("with generated key/value 3-incr", Test(KeyValue_Generate(nil, 120, 3, 1, 50, 10, 120)))
} }

View File

@ -20,7 +20,6 @@ import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
"github.com/syndtr/goleveldb/leveldb/util"
) )
var ( var (
@ -161,11 +160,11 @@ func (err emulatedError) Error() string {
type storageLock struct { type storageLock struct {
s *Storage s *Storage
r util.Releaser l storage.Locker
} }
func (l storageLock) Release() { func (l storageLock) Unlock() {
l.r.Release() l.l.Unlock()
l.s.logI("storage lock released") l.s.logI("storage lock released")
} }
@ -332,7 +331,7 @@ func (s *Storage) Log(str string) {
s.Storage.Log(str) s.Storage.Log(str)
} }
func (s *Storage) Lock() (l storage.Lock, err error) { func (s *Storage) Lock() (l storage.Locker, err error) {
l, err = s.Storage.Lock() l, err = s.Storage.Lock()
if err != nil { if err != nil {
s.logI("storage locking failed, err=%v", err) s.logI("storage locking failed, err=%v", err)

View File

@ -89,3 +89,10 @@ func (p fdSorter) Swap(i, j int) {
func sortFds(fds []storage.FileDesc) { func sortFds(fds []storage.FileDesc) {
sort.Sort(fdSorter(fds)) sort.Sort(fdSorter(fds))
} }
func ensureBuffer(b []byte, n int) []byte {
if cap(b) < n {
return make([]byte, n)
}
return b[:n]
}

View File

@ -34,44 +34,48 @@ type version struct {
cSeek unsafe.Pointer cSeek unsafe.Pointer
closing bool closing bool
ref int ref int
// Succeeding version. released bool
next *version
} }
func newVersion(s *session) *version { func newVersion(s *session) *version {
return &version{s: s} return &version{s: s}
} }
func (v *version) incref() {
if v.released {
panic("already released")
}
v.ref++
if v.ref == 1 {
// Incr file ref.
for _, tt := range v.levels {
for _, t := range tt {
v.s.addFileRef(t.fd, 1)
}
}
}
}
func (v *version) releaseNB() { func (v *version) releaseNB() {
v.ref-- v.ref--
if v.ref > 0 { if v.ref > 0 {
return return
} } else if v.ref < 0 {
if v.ref < 0 {
panic("negative version ref") panic("negative version ref")
} }
nextTables := make(map[int64]bool)
for _, tt := range v.next.levels {
for _, t := range tt {
num := t.fd.Num
nextTables[num] = true
}
}
for _, tt := range v.levels { for _, tt := range v.levels {
for _, t := range tt { for _, t := range tt {
num := t.fd.Num if v.s.addFileRef(t.fd, -1) == 0 {
if _, ok := nextTables[num]; !ok {
v.s.tops.remove(t) v.s.tops.remove(t)
} }
} }
} }
v.next.releaseNB() v.released = true
v.next = nil
} }
func (v *version) release() { func (v *version) release() {

View File

@ -331,7 +331,7 @@ func main() {
log.Printf("FATAL: "+format, v...) log.Printf("FATAL: "+format, v...)
if err != nil && errors.IsCorrupted(err) { if err != nil && errors.IsCorrupted(err) {
cerr := err.(*errors.ErrCorrupted) cerr := err.(*errors.ErrCorrupted)
if !cerr.Fd.Nil() && cerr.Fd.Type == storage.TypeTable { if !cerr.Fd.Zero() && cerr.Fd.Type == storage.TypeTable {
log.Print("FATAL: corruption detected, scanning...") log.Print("FATAL: corruption detected, scanning...")
if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) { if !tstor.scanTable(storage.FileDesc{Type: storage.TypeTable, Num: cerr.Fd.Num}, false) {
log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd) log.Printf("FATAL: unable to find corrupted key/value pair in table %v", cerr.Fd)

4
vendor/manifest vendored
View File

@ -261,8 +261,8 @@
{ {
"importpath": "github.com/syndtr/goleveldb", "importpath": "github.com/syndtr/goleveldb",
"repository": "https://github.com/syndtr/goleveldb", "repository": "https://github.com/syndtr/goleveldb",
"vcs": "", "vcs": "git",
"revision": "6ae1797c0b42b9323fc27ff7dcf568df88f2f33d", "revision": "23851d93a2292dcc56e71a18ec9e0624d84a0f65",
"branch": "master" "branch": "master"
}, },
{ {