Update all deps to latest version

Jakob Borg 2014-07-23 08:31:36 +02:00
parent 08ca9f9378
commit 544fea51b0
26 changed files with 1197 additions and 808 deletions

Godeps/Godeps.json (generated)

@ -12,13 +12,13 @@
}, },
{ {
"ImportPath": "code.google.com/p/go.crypto/bcrypt", "ImportPath": "code.google.com/p/go.crypto/bcrypt",
"Comment": "null-212", "Comment": "null-213",
"Rev": "1064b89a6fb591df0dd65422295b8498916b092f" "Rev": "aa2644fe4aa50e3b38d75187b4799b1f0c9ddcef"
}, },
{ {
"ImportPath": "code.google.com/p/go.crypto/blowfish", "ImportPath": "code.google.com/p/go.crypto/blowfish",
"Comment": "null-212", "Comment": "null-213",
"Rev": "1064b89a6fb591df0dd65422295b8498916b092f" "Rev": "aa2644fe4aa50e3b38d75187b4799b1f0c9ddcef"
}, },
{ {
"ImportPath": "code.google.com/p/go.net/html", "ImportPath": "code.google.com/p/go.net/html",
@ -27,13 +27,13 @@
}, },
{ {
"ImportPath": "code.google.com/p/go.text/transform", "ImportPath": "code.google.com/p/go.text/transform",
"Comment": "null-87", "Comment": "null-88",
"Rev": "c59e4f2f93824f81213799e64c3eead7be24660a" "Rev": "1506dcc33592c369c3be7bd30b38f90445b86deb"
}, },
{ {
"ImportPath": "code.google.com/p/go.text/unicode/norm", "ImportPath": "code.google.com/p/go.text/unicode/norm",
"Comment": "null-87", "Comment": "null-88",
"Rev": "c59e4f2f93824f81213799e64c3eead7be24660a" "Rev": "1506dcc33592c369c3be7bd30b38f90445b86deb"
}, },
{ {
"ImportPath": "code.google.com/p/snappy-go/snappy", "ImportPath": "code.google.com/p/snappy-go/snappy",
@ -42,7 +42,7 @@
}, },
{ {
"ImportPath": "github.com/golang/groupcache/lru", "ImportPath": "github.com/golang/groupcache/lru",
"Rev": "a531d51b7f9f3dd13c1c2b50d42d739b70442dbb" "Rev": "8b25adc0f62632c810997cb38c21111a3f256bf4"
}, },
{ {
"ImportPath": "github.com/juju/ratelimit", "ImportPath": "github.com/juju/ratelimit",
@ -50,7 +50,7 @@
}, },
{ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb", "ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "e1f2d2bdccd7c62f4d4a29aaf081bf1fc4404f91" "Rev": "ba4481e4cb1d45f586e32be2ab663f173b08b207"
}, },
{ {
"ImportPath": "github.com/vitrun/qart/coding", "ImportPath": "github.com/vitrun/qart/coding",


@ -9,6 +9,7 @@
package transform package transform
import ( import (
"bytes"
"errors" "errors"
"io" "io"
"unicode/utf8" "unicode/utf8"
@ -127,7 +128,7 @@ func (r *Reader) Read(p []byte) (int, error) {
// cannot read more bytes into src. // cannot read more bytes into src.
r.transformComplete = r.err != nil r.transformComplete = r.err != nil
continue continue
case err == ErrShortDst && r.dst1 != 0: case err == ErrShortDst && (r.dst1 != 0 || n != 0):
// Make room in dst by copying out, and try again. // Make room in dst by copying out, and try again.
continue continue
case err == ErrShortSrc && r.src1-r.src0 != len(r.src) && r.err == nil: case err == ErrShortSrc && r.src1-r.src0 != len(r.src) && r.err == nil:
@ -210,7 +211,7 @@ func (w *Writer) Write(data []byte) (n int, err error) {
n += nSrc n += nSrc
} }
switch { switch {
case err == ErrShortDst && nDst > 0: case err == ErrShortDst && (nDst > 0 || nSrc > 0):
case err == ErrShortSrc && len(src) < len(w.src): case err == ErrShortSrc && len(src) < len(w.src):
m := copy(w.src, src) m := copy(w.src, src)
// If w.n > 0, bytes from data were already copied to w.src and n // If w.n > 0, bytes from data were already copied to w.src and n
@ -467,30 +468,125 @@ func (t removeF) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err err
return return
} }
// Bytes returns a new byte slice with the result of converting b using t. // grow returns a new []byte that is longer than b, and copies the first n bytes
// If any unrecoverable error occurs it returns nil. // of b to the start of the new slice.
func Bytes(t Transformer, b []byte) []byte { func grow(b []byte, n int) []byte {
out := make([]byte, len(b)) m := len(b)
n := 0 if m <= 256 {
for { m *= 2
nDst, nSrc, err := t.Transform(out[n:], b, true) } else {
n += nDst m += m >> 1
if err == nil { }
return out[:n] buf := make([]byte, m)
} else if err != ErrShortDst { copy(buf, b[:n])
return nil return buf
} }
b = b[nSrc:]
// Grow the destination buffer. const initialBufSize = 128
sz := len(out)
if sz <= 256 { // String returns a string with the result of converting s[:n] using t, where
sz *= 2 // n <= len(s). If err == nil, n will be len(s).
} else { func String(t Transformer, s string) (result string, n int, err error) {
sz += sz >> 1 if s == "" {
return "", 0, nil
}
// Allocate only once. Note that both dst and src escape when passed to
// Transform.
buf := [2 * initialBufSize]byte{}
dst := buf[:initialBufSize:initialBufSize]
src := buf[initialBufSize : 2*initialBufSize]
// Avoid allocation if the transformed string is identical to the original.
// After this loop, pDst will point to the furthest point in s for which it
// could be detected that t gives equal results, src[:nSrc] will
// indicate the last processed chunk of s for which the output is not equal
// and dst[:nDst] will be the transform of this chunk.
var nDst, nSrc int
pDst := 0 // Used as index in both src and dst in this loop.
for {
n := copy(src, s[pDst:])
nDst, nSrc, err = t.Transform(dst, src[:n], pDst+n == len(s))
// Note 1: we will not enter the loop with pDst == len(s) and we will
// not end the loop with it either. So if nSrc is 0, this means there is
// some kind of error from which we cannot recover given the current
// buffer sizes. We will give up in this case.
// Note 2: it is not entirely correct to simply do a bytes.Equal as
// a Transformer may buffer internally. It will work in most cases,
// though, and no harm is done if it doesn't work.
// TODO: let transformers implement an optional Spanner interface, akin
// to norm's QuickSpan. This would even allow us to avoid any allocation.
if nSrc == 0 || !bytes.Equal(dst[:nDst], src[:nSrc]) {
break
}
if pDst += nDst; pDst == len(s) {
return s, pDst, nil
}
}
// Move the bytes seen so far to dst.
pSrc := pDst + nSrc
if pDst+nDst <= initialBufSize {
copy(dst[pDst:], dst[:nDst])
} else {
b := make([]byte, len(s)+nDst-nSrc)
copy(b[pDst:], dst[:nDst])
dst = b
}
copy(dst, s[:pDst])
pDst += nDst
if err != nil && err != ErrShortDst && err != ErrShortSrc {
return string(dst[:pDst]), pSrc, err
}
// Complete the string with the remainder.
for {
n := copy(src, s[pSrc:])
nDst, nSrc, err = t.Transform(dst[pDst:], src[:n], pSrc+n == len(s))
pDst += nDst
pSrc += nSrc
switch err {
case nil:
if pSrc == len(s) {
return string(dst[:pDst]), pSrc, nil
}
case ErrShortDst:
// Do not grow as long as we can make progress. This may avoid
// excessive allocations.
if nDst == 0 {
dst = grow(dst, pDst)
}
case ErrShortSrc:
if nSrc == 0 {
src = grow(src, 0)
}
default:
return string(dst[:pDst]), pSrc, err
}
}
}
// Bytes returns a new byte slice with the result of converting b[:n] using t,
// where n <= len(b). If err == nil, n will be len(b).
func Bytes(t Transformer, b []byte) (result []byte, n int, err error) {
dst := make([]byte, len(b))
pDst, pSrc := 0, 0
for {
nDst, nSrc, err := t.Transform(dst[pDst:], b[pSrc:], true)
pDst += nDst
pSrc += nSrc
if err != ErrShortDst {
return dst[:pDst], pSrc, err
}
// Grow the destination buffer, but do not grow as long as we can make
// progress. This may avoid excessive allocations.
if nDst == 0 {
dst = grow(dst, pDst)
} }
out2 := make([]byte, sz)
copy(out2, out[:n])
out = out2
} }
} }
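
To make the new three-result API concrete, here is a minimal, self-contained sketch of a caller. The upperASCII transformer is illustrative only and not part of the package:

package main

import (
	"fmt"
	"log"

	"code.google.com/p/go.text/transform"
)

// upperASCII upper-cases ASCII letters; everything else passes through.
type upperASCII struct{}

func (upperASCII) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
	n := copy(dst, src)
	for i := 0; i < n; i++ {
		if c := dst[i]; 'a' <= c && c <= 'z' {
			dst[i] = c - 'a' + 'A'
		}
	}
	if n < len(src) {
		// dst filled up before src was consumed; ask to be called again.
		err = transform.ErrShortDst
	}
	return n, n, err
}

func main() {
	s, n, err := transform.String(upperASCII{}, "hello, world")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(s, n) // HELLO, WORLD 12
}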


@ -12,6 +12,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"testing" "testing"
"time"
"unicode/utf8" "unicode/utf8"
) )
@ -132,6 +133,43 @@ func (e rleEncode) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err e
return nDst, nSrc, nil return nDst, nSrc, nil
} }
// trickler consumes all input bytes, but writes a single byte at a time to dst.
type trickler []byte
func (t *trickler) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
*t = append(*t, src...)
if len(*t) == 0 {
return 0, 0, nil
}
if len(dst) == 0 {
return 0, len(src), ErrShortDst
}
dst[0] = (*t)[0]
*t = (*t)[1:]
if len(*t) > 0 {
err = ErrShortDst
}
return 1, len(src), err
}
// delayedTrickler is like trickler, but delays writing output to dst. This is
// highly unlikely to be relevant in practice, but it seems like a good idea
// to have some tolerance as long as progress can be detected.
type delayedTrickler []byte
func (t *delayedTrickler) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
if len(*t) > 0 && len(dst) > 0 {
dst[0] = (*t)[0]
*t = (*t)[1:]
nDst = 1
}
*t = append(*t, src...)
if len(*t) > 0 {
err = ErrShortDst
}
return nDst, len(src), err
}
type testCase struct { type testCase struct {
desc string desc string
t Transformer t Transformer
@ -170,6 +208,15 @@ func (c chain) String() string {
} }
var testCases = []testCase{ var testCases = []testCase{
{
desc: "empty",
t: lowerCaseASCII{},
src: "",
dstSize: 100,
srcSize: 100,
wantStr: "",
},
{ {
desc: "basic", desc: "basic",
t: lowerCaseASCII{}, t: lowerCaseASCII{},
@ -378,6 +425,24 @@ var testCases = []testCase{
ioSize: 10, ioSize: 10,
wantStr: "4a6b2b4c4d1d", wantStr: "4a6b2b4c4d1d",
}, },
{
desc: "trickler",
t: &trickler{},
src: "abcdefghijklm",
dstSize: 3,
srcSize: 15,
wantStr: "abcdefghijklm",
},
{
desc: "delayedTrickler",
t: &delayedTrickler{},
src: "abcdefghijklm",
dstSize: 3,
srcSize: 15,
wantStr: "abcdefghijklm",
},
} }
func TestReader(t *testing.T) { func TestReader(t *testing.T) {
@ -685,7 +750,7 @@ func doTransform(tc testCase) (res string, iter int, err error) {
switch { switch {
case err == nil && len(in) != 0: case err == nil && len(in) != 0:
case err == ErrShortSrc && nSrc > 0: case err == ErrShortSrc && nSrc > 0:
case err == ErrShortDst && nDst > 0: case err == ErrShortDst && (nDst > 0 || nSrc > 0):
default: default:
return string(out), iter, err return string(out), iter, err
} }
@ -875,27 +940,136 @@ func TestRemoveFunc(t *testing.T) {
} }
} }
func TestBytes(t *testing.T) { func testString(t *testing.T, f func(Transformer, string) (string, int, error)) {
for _, tt := range append(testCases, chainTests()...) { for _, tt := range append(testCases, chainTests()...) {
if tt.desc == "allowStutter = true" { if tt.desc == "allowStutter = true" {
// We don't have control over the buffer size, so we eliminate tests // We don't have control over the buffer size, so we eliminate tests
// that depend on a specific buffer size being set. // that depend on a specific buffer size being set.
continue continue
} }
got := Bytes(tt.t, []byte(tt.src)) reset(tt.t)
if tt.wantErr != nil { if tt.wantErr == ErrShortDst || tt.wantErr == ErrShortSrc {
if tt.wantErr != ErrShortDst && tt.wantErr != ErrShortSrc { // The result string will be different.
// Bytes should return nil for non-recoverable errors.
if g, w := (got == nil), (tt.wantErr != nil); g != w {
t.Errorf("%s:error: got %v; want %v", tt.desc, g, w)
}
}
// The output strings in the tests that expect an error will
// almost certainly not be the same as the result of Bytes.
continue continue
} }
if string(got) != tt.wantStr { got, n, err := f(tt.t, tt.src)
if tt.wantErr != err {
t.Errorf("%s:error: got %v; want %v", tt.desc, err, tt.wantErr)
}
if got, want := err == nil, n == len(tt.src); got != want {
t.Errorf("%s:n: got %v; want %v", tt.desc, got, want)
}
if got != tt.wantStr {
t.Errorf("%s:string: got %q; want %q", tt.desc, got, tt.wantStr) t.Errorf("%s:string: got %q; want %q", tt.desc, got, tt.wantStr)
} }
} }
} }
func TestBytes(t *testing.T) {
testString(t, func(z Transformer, s string) (string, int, error) {
b, n, err := Bytes(z, []byte(s))
return string(b), n, err
})
}
func TestString(t *testing.T) {
testString(t, String)
// Overrun the internal destination buffer.
for i, s := range []string{
strings.Repeat("a", initialBufSize-1),
strings.Repeat("a", initialBufSize+0),
strings.Repeat("a", initialBufSize+1),
strings.Repeat("A", initialBufSize-1),
strings.Repeat("A", initialBufSize+0),
strings.Repeat("A", initialBufSize+1),
strings.Repeat("A", 2*initialBufSize-1),
strings.Repeat("A", 2*initialBufSize+0),
strings.Repeat("A", 2*initialBufSize+1),
strings.Repeat("a", initialBufSize-2) + "A",
strings.Repeat("a", initialBufSize-1) + "A",
strings.Repeat("a", initialBufSize+0) + "A",
strings.Repeat("a", initialBufSize+1) + "A",
} {
got, _, _ := String(lowerCaseASCII{}, s)
if want := strings.ToLower(s); got != want {
t.Errorf("%d:dst buffer test: got %s (%d); want %s (%d)", i, got, len(got), want, len(want))
}
}
// Overrun the internal source buffer.
for i, s := range []string{
strings.Repeat("a", initialBufSize-1),
strings.Repeat("a", initialBufSize+0),
strings.Repeat("a", initialBufSize+1),
strings.Repeat("a", 2*initialBufSize+1),
strings.Repeat("a", 2*initialBufSize+0),
strings.Repeat("a", 2*initialBufSize+1),
} {
got, _, _ := String(rleEncode{}, s)
if want := fmt.Sprintf("%da", len(s)); got != want {
t.Errorf("%d:src buffer test: got %s (%d); want %s (%d)", i, got, len(got), want, len(want))
}
}
// Test allocations for non-changing strings.
// Note we still need to allocate a single buffer.
for i, s := range []string{
"",
"123",
"123456789",
strings.Repeat("a", initialBufSize),
strings.Repeat("a", 10*initialBufSize),
} {
if n := testing.AllocsPerRun(5, func() { String(&lowerCaseASCII{}, s) }); n > 1 {
t.Errorf("%d: #allocs was %f; want 1", i, n)
}
}
}
// TestBytesAllocation tests that buffer growth stays limited with the trickler
// transformer, which behaves oddly but within spec. In case buffer growth is
// not correctly handled, the test will either panic with a failed allocation or
// thrash. To ensure the tests terminate under the last condition, we time out
// after some sufficiently long period of time.
func TestBytesAllocation(t *testing.T) {
done := make(chan bool)
go func() {
in := bytes.Repeat([]byte{'a'}, 1000)
tr := trickler(make([]byte, 1))
Bytes(&tr, in)
done <- true
}()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Error("time out, likely due to excessive allocation")
}
}
// TestStringAllocation tests that buffer growth stays limited with the trickler
// transformer, which behaves oddly but within spec. In case buffer growth is
// not correctly handled, the test will either panic with a failed allocation or
// thrash. To ensure the tests terminate under the last condition, we time out
// after some sufficiently long period of time.
func TestStringAllocation(t *testing.T) {
done := make(chan bool)
go func() {
in := strings.Repeat("a", 1000)
tr := trickler(make([]byte, 1))
String(&tr, in)
done <- true
}()
select {
case <-done:
case <-time.After(3 * time.Second):
t.Error("time out, likely due to excessive allocation")
}
}
func BenchmarkStringLower(b *testing.B) {
in := strings.Repeat("a", 4096)
for i := 0; i < b.N; i++ {
String(&lowerCaseASCII{}, in)
}
}


@ -29,7 +29,7 @@ type DelFin func(exist bool)
// to call it or not. // to call it or not.
type PurgeFin func(ns, key uint64, delfin DelFin) type PurgeFin func(ns, key uint64, delfin DelFin)
// Cache is a cache tree. // Cache is a cache tree. A cache instance must be goroutine-safe.
type Cache interface { type Cache interface {
// SetCapacity sets cache capacity. // SetCapacity sets cache capacity.
SetCapacity(capacity int) SetCapacity(capacity int)
@ -44,7 +44,7 @@ type Cache interface {
Zap(closed bool) Zap(closed bool)
} }
// Namespace is a cache namespace. // Namespace is a cache namespace. A namespace instance must be goroutine-safe.
type Namespace interface { type Namespace interface {
// Get gets cache object for the given key. The given SetFunc (if not nil) will // Get gets cache object for the given key. The given SetFunc (if not nil) will
// be called if the given key does not exist. // be called if the given key does not exist.


@ -30,9 +30,10 @@ type DB struct {
// Need 64-bit alignment. // Need 64-bit alignment.
seq uint64 seq uint64
// Session.
s *session s *session
// MemDB // MemDB.
memMu sync.RWMutex memMu sync.RWMutex
mem *memdb.DB mem *memdb.DB
frozenMem *memdb.DB frozenMem *memdb.DB
@ -42,11 +43,11 @@ type DB struct {
frozenJournalFile storage.File frozenJournalFile storage.File
frozenSeq uint64 frozenSeq uint64
// Snapshot // Snapshot.
snapsMu sync.Mutex snapsMu sync.Mutex
snapsRoot snapshotElement snapsRoot snapshotElement
// Write // Write.
writeC chan *Batch writeC chan *Batch
writeMergedC chan bool writeMergedC chan bool
writeLockC chan struct{} writeLockC chan struct{}
@ -54,7 +55,7 @@ type DB struct {
journalC chan *Batch journalC chan *Batch
journalAckC chan error journalAckC chan error
// Compaction // Compaction.
tcompCmdC chan cCmd tcompCmdC chan cCmd
tcompPauseC chan chan<- struct{} tcompPauseC chan chan<- struct{}
tcompTriggerC chan struct{} tcompTriggerC chan struct{}
@ -64,7 +65,7 @@ type DB struct {
compErrSetC chan error compErrSetC chan error
compStats [kNumLevels]cStats compStats [kNumLevels]cStats
// Close // Close.
closeW sync.WaitGroup closeW sync.WaitGroup
closeC chan struct{} closeC chan struct{}
closed uint32 closed uint32
@ -135,9 +136,10 @@ func openDB(s *session) (*DB, error) {
// detected in the DB. Corrupted DB can be recovered with Recover // detected in the DB. Corrupted DB can be recovered with Recover
// function. // function.
// //
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func Open(p storage.Storage, o *opt.Options) (db *DB, err error) { func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(p, o) s, err := newSession(stor, o)
if err != nil { if err != nil {
return return
} }
@ -177,6 +179,7 @@ func Open(p storage.Storage, o *opt.Options) (db *DB, err error) {
// detected in the DB. Corrupted DB can be recovered with Recover // detected in the DB. Corrupted DB can be recovered with Recover
// function. // function.
// //
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func OpenFile(path string, o *opt.Options) (db *DB, err error) { func OpenFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path) stor, err := storage.OpenFile(path)
@ -197,9 +200,10 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) {
// The DB must already exist or it will return an error. // The DB must already exist or it will return an error.
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
// //
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) { func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
s, err := newSession(p, o) s, err := newSession(stor, o)
if err != nil { if err != nil {
return return
} }
@ -225,6 +229,7 @@ func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) {
// RecoverFile uses standard file-system backed storage implementation as described // RecoverFile uses standard file-system backed storage implementation as described
// in the leveldb/storage package. // in the leveldb/storage package.
// //
// The returned DB instance is goroutine-safe.
// The DB must be closed after use, by calling Close method. // The DB must be closed after use, by calling Close method.
func RecoverFile(path string, o *opt.Options) (db *DB, err error) { func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
stor, err := storage.OpenFile(path) stor, err := storage.OpenFile(path)
@ -241,12 +246,13 @@ func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
} }
func recoverTable(s *session, o *opt.Options) error { func recoverTable(s *session, o *opt.Options) error {
ff0, err := s.getFiles(storage.TypeTable) // Get all tables and sort them by file number.
tableFiles_, err := s.getFiles(storage.TypeTable)
if err != nil { if err != nil {
return err return err
} }
ff1 := files(ff0) tableFiles := files(tableFiles_)
ff1.sort() tableFiles.sort()
var mSeq uint64 var mSeq uint64
var good, corrupted int var good, corrupted int
@ -264,8 +270,9 @@ func recoverTable(s *session, o *opt.Options) error {
tmp = nil tmp = nil
} }
}() }()
// Copy entries.
tw := table.NewWriter(writer, o) tw := table.NewWriter(writer, o)
// Copy records.
for iter.Next() { for iter.Next() {
key := iter.Key() key := iter.Key()
if validIkey(key) { if validIkey(key) {
@ -297,20 +304,23 @@ func recoverTable(s *session, o *opt.Options) error {
return err return err
} }
defer reader.Close() defer reader.Close()
// Get file size. // Get file size.
size, err := reader.Seek(0, 2) size, err := reader.Seek(0, 2)
if err != nil { if err != nil {
return err return err
} }
var tSeq uint64 var tSeq uint64
var tgood, tcorrupted, blockerr int var tgood, tcorrupted, blockerr int
var min, max []byte var imin, imax []byte
tr := table.NewReader(reader, size, nil, o) tr := table.NewReader(reader, size, nil, o)
iter := tr.NewIterator(nil, nil) iter := tr.NewIterator(nil, nil)
iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) { iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) {
s.logf("table@recovery found error @%d %q", file.Num(), err) s.logf("table@recovery found error @%d %q", file.Num(), err)
blockerr++ blockerr++
}) })
// Scan the table. // Scan the table.
for iter.Next() { for iter.Next() {
key := iter.Key() key := iter.Key()
@ -323,16 +333,17 @@ func recoverTable(s *session, o *opt.Options) error {
if seq > tSeq { if seq > tSeq {
tSeq = seq tSeq = seq
} }
if min == nil { if imin == nil {
min = append([]byte{}, key...) imin = append([]byte{}, key...)
} }
max = append(max[:0], key...) imax = append(imax[:0], key...)
} }
if err := iter.Error(); err != nil { if err := iter.Error(); err != nil {
iter.Release() iter.Release()
return err return err
} }
iter.Release() iter.Release()
if tgood > 0 { if tgood > 0 {
if tcorrupted > 0 || blockerr > 0 { if tcorrupted > 0 || blockerr > 0 {
// Rebuild the table. // Rebuild the table.
@ -353,7 +364,7 @@ func recoverTable(s *session, o *opt.Options) error {
mSeq = tSeq mSeq = tSeq
} }
// Add table to level 0. // Add table to level 0.
rec.addTable(0, file.Num(), uint64(size), min, max) rec.addTable(0, file.Num(), uint64(size), imin, imax)
s.logf("table@recovery recovered @%d N·%d C·%d B·%d S·%d Q·%d", file.Num(), tgood, tcorrupted, blockerr, size, tSeq) s.logf("table@recovery recovered @%d N·%d C·%d B·%d S·%d Q·%d", file.Num(), tgood, tcorrupted, blockerr, size, tSeq)
} else { } else {
s.logf("table@recovery unrecoverable @%d C·%d B·%d S·%d", file.Num(), tcorrupted, blockerr, size) s.logf("table@recovery unrecoverable @%d C·%d B·%d S·%d", file.Num(), tcorrupted, blockerr, size)
@ -364,41 +375,56 @@ func recoverTable(s *session, o *opt.Options) error {
return nil return nil
} }
// Recover all tables. // Recover all tables.
if len(ff1) > 0 { if len(tableFiles) > 0 {
s.logf("table@recovery F·%d", len(ff1)) s.logf("table@recovery F·%d", len(tableFiles))
s.markFileNum(ff1[len(ff1)-1].Num())
for _, file := range ff1 { // Mark file number as used.
s.markFileNum(tableFiles[len(tableFiles)-1].Num())
for _, file := range tableFiles {
if err := recoverTable(file); err != nil { if err := recoverTable(file); err != nil {
return err return err
} }
} }
s.logf("table@recovery recovered F·%d N·%d C·%d Q·%d", len(ff1), good, corrupted, mSeq)
s.logf("table@recovery recovered F·%d N·%d C·%d Q·%d", len(tableFiles), good, corrupted, mSeq)
} }
// Set sequence number. // Set sequence number.
rec.setSeq(mSeq + 1) rec.setSeq(mSeq + 1)
// Create new manifest. // Create new manifest.
if err := s.create(); err != nil { if err := s.create(); err != nil {
return err return err
} }
// Commit. // Commit.
return s.commit(rec) return s.commit(rec)
} }
func (d *DB) recoverJournal() error { func (db *DB) recoverJournal() error {
s := d.s // Get all journal files and sort them by file number.
journalFiles_, err := db.s.getFiles(storage.TypeJournal)
ff0, err := s.getFiles(storage.TypeJournal)
if err != nil { if err != nil {
return err return err
} }
ff1 := files(ff0) journalFiles := files(journalFiles_)
ff1.sort() journalFiles.sort()
ff2 := make([]storage.File, 0, len(ff1))
for _, file := range ff1 { // Discard older journal.
if file.Num() >= s.stJournalNum || file.Num() == s.stPrevJournalNum { prev := -1
s.markFileNum(file.Num()) for i, file := range journalFiles {
ff2 = append(ff2, file) if file.Num() >= db.s.stJournalNum {
if prev >= 0 {
i--
journalFiles[i] = journalFiles[prev]
}
journalFiles = journalFiles[i:]
break
} else if file.Num() == db.s.stPrevJournalNum {
prev = i
} }
} }
@ -406,38 +432,43 @@ func (d *DB) recoverJournal() error {
var of storage.File var of storage.File
var mem *memdb.DB var mem *memdb.DB
batch := new(Batch) batch := new(Batch)
cm := newCMem(s) cm := newCMem(db.s)
buf := new(util.Buffer) buf := new(util.Buffer)
// Options. // Options.
strict := s.o.GetStrict(opt.StrictJournal) strict := db.s.o.GetStrict(opt.StrictJournal)
checksum := s.o.GetStrict(opt.StrictJournalChecksum) checksum := db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer := s.o.GetWriteBuffer() writeBuffer := db.s.o.GetWriteBuffer()
recoverJournal := func(file storage.File) error { recoverJournal := func(file storage.File) error {
s.logf("journal@recovery recovering @%d", file.Num()) db.logf("journal@recovery recovering @%d", file.Num())
reader, err := file.Open() reader, err := file.Open()
if err != nil { if err != nil {
return err return err
} }
defer reader.Close() defer reader.Close()
// Create/reset journal reader instance.
if jr == nil { if jr == nil {
jr = journal.NewReader(reader, dropper{s, file}, strict, checksum) jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum)
} else { } else {
jr.Reset(reader, dropper{s, file}, strict, checksum) jr.Reset(reader, dropper{db.s, file}, strict, checksum)
} }
// Flush memdb and remove obsolete journal file.
if of != nil { if of != nil {
if mem.Len() > 0 { if mem.Len() > 0 {
if err := cm.flush(mem, 0); err != nil { if err := cm.flush(mem, 0); err != nil {
return err return err
} }
} }
if err := cm.commit(file.Num(), d.seq); err != nil { if err := cm.commit(file.Num(), db.seq); err != nil {
return err return err
} }
cm.reset() cm.reset()
of.Remove() of.Remove()
of = nil of = nil
} }
// Reset memdb.
// Replay journal to memdb.
mem.Reset() mem.Reset()
for { for {
r, err := jr.Next() r, err := jr.Next()
@ -447,6 +478,7 @@ func (d *DB) recoverJournal() error {
} }
return err return err
} }
buf.Reset() buf.Reset()
if _, err := buf.ReadFrom(r); err != nil { if _, err := buf.ReadFrom(r); err != nil {
if strict { if strict {
@ -460,28 +492,37 @@ func (d *DB) recoverJournal() error {
if err := batch.memReplay(mem); err != nil { if err := batch.memReplay(mem); err != nil {
return err return err
} }
d.seq = batch.seq + uint64(batch.len())
// Save sequence number.
db.seq = batch.seq + uint64(batch.len())
// Flush it if large enough.
if mem.Size() >= writeBuffer { if mem.Size() >= writeBuffer {
// Large enough, flush it.
if err := cm.flush(mem, 0); err != nil { if err := cm.flush(mem, 0); err != nil {
return err return err
} }
// Reset memdb.
mem.Reset() mem.Reset()
} }
} }
of = file of = file
return nil return nil
} }
// Recover all journals. // Recover all journals.
if len(ff2) > 0 { if len(journalFiles) > 0 {
s.logf("journal@recovery F·%d", len(ff2)) db.logf("journal@recovery F·%d", len(journalFiles))
mem = memdb.New(s.icmp, writeBuffer)
for _, file := range ff2 { // Mark file number as used.
db.s.markFileNum(journalFiles[len(journalFiles)-1].Num())
mem = memdb.New(db.s.icmp, writeBuffer)
for _, file := range journalFiles {
if err := recoverJournal(file); err != nil { if err := recoverJournal(file); err != nil {
return err return err
} }
} }
// Flush the last journal. // Flush the last journal.
if mem.Len() > 0 { if mem.Len() > 0 {
if err := cm.flush(mem, 0); err != nil { if err := cm.flush(mem, 0); err != nil {
@ -489,35 +530,43 @@ func (d *DB) recoverJournal() error {
} }
} }
} }
// Create a new journal. // Create a new journal.
if _, err := d.newMem(0); err != nil { if _, err := db.newMem(0); err != nil {
return err return err
} }
// Commit. // Commit.
if err := cm.commit(d.journalFile.Num(), d.seq); err != nil { if err := cm.commit(db.journalFile.Num(), db.seq); err != nil {
// Close journal.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
}
return err return err
} }
// Remove the last journal.
// Remove the last obsolete journal file.
if of != nil { if of != nil {
of.Remove() of.Remove()
} }
return nil return nil
} }
func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
s := d.s
ikey := newIKey(key, seq, tSeek) ikey := newIKey(key, seq, tSeek)
em, fm := d.getMems() em, fm := db.getMems()
for _, m := range [...]*memdb.DB{em, fm} { for _, m := range [...]*memdb.DB{em, fm} {
if m == nil { if m == nil {
continue continue
} }
mk, mv, me := m.Find(ikey) mk, mv, me := m.Find(ikey)
if me == nil { if me == nil {
ukey, _, t, ok := parseIkey(mk) ukey, _, t, ok := parseIkey(mk)
if ok && s.icmp.uCompare(ukey, key) == 0 { if ok && db.s.icmp.uCompare(ukey, key) == 0 {
if t == tDel { if t == tDel {
return nil, ErrNotFound return nil, ErrNotFound
} }
@ -528,12 +577,12 @@ func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err
} }
} }
v := s.version() v := db.s.version()
value, cSched, err := v.get(ikey, ro) value, cSched, err := v.get(ikey, ro)
v.release() v.release()
if cSched { if cSched {
// Trigger table compaction. // Trigger table compaction.
d.compTrigger(d.tcompTriggerC) db.compTrigger(db.tcompTriggerC)
} }
return return
} }
@ -543,13 +592,13 @@ func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err
// //
// The caller should not modify the contents of the returned slice, but // The caller should not modify the contents of the returned slice, but
// it is safe to modify the contents of the argument after Get returns. // it is safe to modify the contents of the argument after Get returns.
func (d *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
err = d.ok() err = db.ok()
if err != nil { if err != nil {
return return
} }
return d.get(key, d.getSeq(), ro) return db.get(key, db.getSeq(), ro)
} }
// NewIterator returns an iterator for the latest snapshot of the // NewIterator returns an iterator for the latest snapshot of the
@ -568,14 +617,14 @@ func (d *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
// The iterator must be released after use, by calling Release method. // The iterator must be released after use, by calling Release method.
// //
// Also read Iterator documentation of the leveldb/iterator package. // Also read Iterator documentation of the leveldb/iterator package.
func (d *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
if err := d.ok(); err != nil { if err := db.ok(); err != nil {
return iterator.NewEmptyIterator(err) return iterator.NewEmptyIterator(err)
} }
p := d.newSnapshot() snap := db.newSnapshot()
defer p.Release() defer snap.Release()
return p.NewIterator(slice, ro) return snap.NewIterator(slice, ro)
} }
// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
@ -583,12 +632,12 @@ func (d *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterat
// content of snapshot are guaranteed to be consistent. // content of snapshot are guaranteed to be consistent.
// //
// The snapshot must be released after use, by calling Release method. // The snapshot must be released after use, by calling Release method.
func (d *DB) GetSnapshot() (*Snapshot, error) { func (db *DB) GetSnapshot() (*Snapshot, error) {
if err := d.ok(); err != nil { if err := db.ok(); err != nil {
return nil, err return nil, err
} }
return d.newSnapshot(), nil return db.newSnapshot(), nil
} }
// GetProperty returns value of the given property name. // GetProperty returns value of the given property name.
@ -600,8 +649,8 @@ func (d *DB) GetSnapshot() (*Snapshot, error) {
// Returns statistics of the underlying DB. // Returns statistics of the underlying DB.
// leveldb.sstables // leveldb.sstables
// Returns sstables list for each level. // Returns sstables list for each level.
func (d *DB) GetProperty(name string) (value string, err error) { func (db *DB) GetProperty(name string) (value string, err error) {
err = d.ok() err = db.ok()
if err != nil { if err != nil {
return return
} }
@ -610,11 +659,9 @@ func (d *DB) GetProperty(name string) (value string, err error) {
if !strings.HasPrefix(name, prefix) { if !strings.HasPrefix(name, prefix) {
return "", errors.New("leveldb: GetProperty: unknown property: " + name) return "", errors.New("leveldb: GetProperty: unknown property: " + name)
} }
p := name[len(prefix):] p := name[len(prefix):]
s := d.s v := db.s.version()
v := s.version()
defer v.release() defer v.release()
switch { switch {
@ -631,20 +678,20 @@ func (d *DB) GetProperty(name string) (value string, err error) {
value = "Compactions\n" + value = "Compactions\n" +
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
"-------+------------+---------------+---------------+---------------+---------------\n" "-------+------------+---------------+---------------+---------------+---------------\n"
for level, tt := range v.tables { for level, tables := range v.tables {
duration, read, write := d.compStats[level].get() duration, read, write := db.compStats[level].get()
if len(tt) == 0 && duration == 0 { if len(tables) == 0 && duration == 0 {
continue continue
} }
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
level, len(tt), float64(tt.size())/1048576.0, duration.Seconds(), level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
float64(read)/1048576.0, float64(write)/1048576.0) float64(read)/1048576.0, float64(write)/1048576.0)
} }
case p == "sstables": case p == "sstables":
for level, tt := range v.tables { for level, tables := range v.tables {
value += fmt.Sprintf("--- level %d ---\n", level) value += fmt.Sprintf("--- level %d ---\n", level)
for _, t := range tt { for _, t := range tables {
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.min, t.max) value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax)
} }
} }
default: default:
@ -660,23 +707,23 @@ func (d *DB) GetProperty(name string) (value string, err error) {
// data compresses by a factor of ten, the returned sizes will be one-tenth // data compresses by a factor of ten, the returned sizes will be one-tenth
// the size of the corresponding user data size. // the size of the corresponding user data size.
// The results may not include the sizes of recently written data. // The results may not include the sizes of recently written data.
func (d *DB) SizeOf(ranges []util.Range) (Sizes, error) { func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
if err := d.ok(); err != nil { if err := db.ok(); err != nil {
return nil, err return nil, err
} }
v := d.s.version() v := db.s.version()
defer v.release() defer v.release()
sizes := make(Sizes, 0, len(ranges)) sizes := make(Sizes, 0, len(ranges))
for _, r := range ranges { for _, r := range ranges {
min := newIKey(r.Start, kMaxSeq, tSeek) imin := newIKey(r.Start, kMaxSeq, tSeek)
max := newIKey(r.Limit, kMaxSeq, tSeek) imax := newIKey(r.Limit, kMaxSeq, tSeek)
start, err := v.offsetOf(min) start, err := v.offsetOf(imin)
if err != nil { if err != nil {
return nil, err return nil, err
} }
limit, err := v.offsetOf(max) limit, err := v.offsetOf(imax)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -690,61 +737,63 @@ func (d *DB) SizeOf(ranges []util.Range) (Sizes, error) {
return sizes, nil return sizes, nil
} }
// Close closes the DB. This will also release any outstanding snapshot. // Close closes the DB. This will also release any outstanding snapshot and
// abort any in-flight compaction.
// //
// It is not safe to close a DB until all outstanding iterators are released. // It is not safe to close a DB until all outstanding iterators are released.
// It is valid to call Close multiple times. Other methods should not be // It is valid to call Close multiple times. Other methods should not be
// called after the DB has been closed. // called after the DB has been closed.
func (d *DB) Close() error { func (db *DB) Close() error {
if !d.setClosed() { if !db.setClosed() {
return ErrClosed return ErrClosed
} }
s := d.s
start := time.Now() start := time.Now()
s.log("db@close closing") db.log("db@close closing")
// Clear the finalizer. // Clear the finalizer.
runtime.SetFinalizer(d, nil) runtime.SetFinalizer(db, nil)
// Get compaction error. // Get compaction error.
var err error var err error
select { select {
case err = <-d.compErrC: case err = <-db.compErrC:
default: default:
} }
close(d.closeC) close(db.closeC)
// Wait for the close WaitGroup. // Wait for the close WaitGroup.
d.closeW.Wait() db.closeW.Wait()
// Close journal. // Close journal.
if d.journal != nil { db.writeLockC <- struct{}{}
d.journal.Close() if db.journal != nil {
d.journalWriter.Close() db.journal.Close()
db.journalWriter.Close()
} }
// Close session. // Close session.
s.close() db.s.close()
s.logf("db@close done T·%v", time.Since(start)) db.logf("db@close done T·%v", time.Since(start))
s.release() db.s.release()
if d.closer != nil { if db.closer != nil {
if err1 := d.closer.Close(); err == nil { if err1 := db.closer.Close(); err == nil {
err = err1 err = err1
} }
} }
d.s = nil // NIL'ing pointers.
d.mem = nil db.s = nil
d.frozenMem = nil db.mem = nil
d.journal = nil db.frozenMem = nil
d.journalWriter = nil db.journal = nil
d.journalFile = nil db.journalWriter = nil
d.frozenJournalFile = nil db.journalFile = nil
d.snapsRoot = snapshotElement{} db.frozenJournalFile = nil
d.closer = nil db.snapsRoot = snapshotElement{}
db.closer = nil
return err return err
} }
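
Open, OpenFile, Recover and RecoverFile now document that the returned *DB is goroutine-safe, so one handle can be shared freely between goroutines; only Close needs to be coordinated with outstanding iterators. A minimal usage sketch against this version of the package (the path and keys are illustrative):

package main

import (
	"fmt"
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	// The returned *DB is goroutine-safe; it must still be closed after use.
	db, err := leveldb.OpenFile("/tmp/demo.db", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Put([]byte("hello"), []byte("world"), nil); err != nil {
		log.Fatal(err)
	}
	value, err := db.Get([]byte("hello"), nil)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("hello = %s\n", value)
}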


@ -74,7 +74,7 @@ func newCMem(s *session) *cMem {
func (c *cMem) flush(mem *memdb.DB, level int) error { func (c *cMem) flush(mem *memdb.DB, level int) error {
s := c.s s := c.s
// Write memdb to table // Write memdb to table.
iter := mem.NewIterator(nil) iter := mem.NewIterator(nil)
defer iter.Release() defer iter.Release()
t, n, err := s.tops.createFrom(iter) t, n, err := s.tops.createFrom(iter)
@ -82,12 +82,13 @@ func (c *cMem) flush(mem *memdb.DB, level int) error {
return err return err
} }
// Pick level.
if level < 0 { if level < 0 {
level = s.version_NB().pickLevel(t.min.ukey(), t.max.ukey()) level = s.version_NB().pickLevel(t.imin.ukey(), t.imax.ukey())
} }
c.rec.addTableFile(level, t) c.rec.addTableFile(level, t)
s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.min, t.max) s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
c.level = level c.level = level
return nil return nil
@ -100,33 +101,34 @@ func (c *cMem) reset() {
func (c *cMem) commit(journal, seq uint64) error { func (c *cMem) commit(journal, seq uint64) error {
c.rec.setJournalNum(journal) c.rec.setJournalNum(journal)
c.rec.setSeq(seq) c.rec.setSeq(seq)
// Commit changes
// Commit changes.
return c.s.commit(c.rec) return c.s.commit(c.rec)
} }
func (d *DB) compactionError() { func (db *DB) compactionError() {
var err error var err error
noerr: noerr:
for { for {
select { select {
case _, _ = <-d.closeC: case err = <-db.compErrSetC:
return
case err = <-d.compErrSetC:
if err != nil { if err != nil {
goto haserr goto haserr
} }
case _, _ = <-db.closeC:
return
} }
} }
haserr: haserr:
for { for {
select { select {
case _, _ = <-d.closeC: case db.compErrC <- err:
return case err = <-db.compErrSetC:
case err = <-d.compErrSetC:
if err == nil { if err == nil {
goto noerr goto noerr
} }
case d.compErrC <- err: case _, _ = <-db.closeC:
return
} }
} }
} }
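
The reworked compactionError above is a small two-state goroutine: once an error is latched, it keeps offering that error on compErrC until the error is cleared or the DB closes. A stripped-down sketch of the same latch pattern, with illustrative names that are not part of the package:

// errLatch serves the most recent non-nil error on out until it is
// cleared via a nil send on set, or until done is closed.
func errLatch(set <-chan error, out chan<- error, done <-chan struct{}) {
	var err error
	for {
		if err == nil {
			// No error latched: wait for one, or exit.
			select {
			case err = <-set:
			case <-done:
				return
			}
		} else {
			// Error latched: hand it to any reader, accept updates, or exit.
			select {
			case out <- err:
			case err = <-set:
			case <-done:
				return
			}
		}
	}
}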
@ -137,18 +139,18 @@ func (cnt *compactionTransactCounter) incr() {
*cnt++ *cnt++
} }
func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCounter) error, rollback func() error) { func (db *DB) compactionTransact(name string, exec func(cnt *compactionTransactCounter) error, rollback func() error) {
s := d.s
defer func() { defer func() {
if x := recover(); x != nil { if x := recover(); x != nil {
if x == errCompactionTransactExiting && rollback != nil { if x == errCompactionTransactExiting && rollback != nil {
if err := rollback(); err != nil { if err := rollback(); err != nil {
s.logf("%s rollback error %q", name, err) db.logf("%s rollback error %q", name, err)
} }
} }
panic(x) panic(x)
} }
}() }()
const ( const (
backoffMin = 1 * time.Second backoffMin = 1 * time.Second
backoffMax = 8 * time.Second backoffMax = 8 * time.Second
@ -159,11 +161,11 @@ func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCo
lastCnt := compactionTransactCounter(0) lastCnt := compactionTransactCounter(0)
for n := 0; ; n++ { for n := 0; ; n++ {
// Check whether the DB is closed. // Check whether the DB is closed.
if d.isClosed() { if db.isClosed() {
s.logf("%s exiting", name) db.logf("%s exiting", name)
d.compactionExitTransact() db.compactionExitTransact()
} else if n > 0 { } else if n > 0 {
s.logf("%s retrying N·%d", name, n) db.logf("%s retrying N·%d", name, n)
} }
// Execute. // Execute.
@ -172,15 +174,15 @@ func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCo
// Set compaction error status. // Set compaction error status.
select { select {
case d.compErrSetC <- err: case db.compErrSetC <- err:
case _, _ = <-d.closeC: case _, _ = <-db.closeC:
s.logf("%s exiting", name) db.logf("%s exiting", name)
d.compactionExitTransact() db.compactionExitTransact()
} }
if err == nil { if err == nil {
return return
} }
s.logf("%s error I·%d %q", name, cnt, err) db.logf("%s error I·%d %q", name, cnt, err)
// Reset backoff duration if counter is advancing. // Reset backoff duration if counter is advancing.
if cnt > lastCnt { if cnt > lastCnt {
@ -198,53 +200,52 @@ func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCo
} }
select { select {
case <-backoffT.C: case <-backoffT.C:
case _, _ = <-d.closeC: case _, _ = <-db.closeC:
s.logf("%s exiting", name) db.logf("%s exiting", name)
d.compactionExitTransact() db.compactionExitTransact()
} }
} }
} }
func (d *DB) compactionExitTransact() { func (db *DB) compactionExitTransact() {
panic(errCompactionTransactExiting) panic(errCompactionTransactExiting)
} }
func (d *DB) memCompaction() { func (db *DB) memCompaction() {
mem := d.getFrozenMem() mem := db.getFrozenMem()
if mem == nil { if mem == nil {
return return
} }
s := d.s c := newCMem(db.s)
c := newCMem(s)
stats := new(cStatsStaging) stats := new(cStatsStaging)
s.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size())) db.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size()))
// Don't compact empty memdb. // Don't compact empty memdb.
if mem.Len() == 0 { if mem.Len() == 0 {
s.logf("mem@flush skipping") db.logf("mem@flush skipping")
// drop frozen mem // drop frozen mem
d.dropFrozenMem() db.dropFrozenMem()
return return
} }
// Pause table compaction. // Pause table compaction.
ch := make(chan struct{}) ch := make(chan struct{})
select { select {
case d.tcompPauseC <- (chan<- struct{})(ch): case db.tcompPauseC <- (chan<- struct{})(ch):
case _, _ = <-d.closeC: case _, _ = <-db.closeC:
return return
} }
d.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) { db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer() stats.startTimer()
defer stats.stopTimer() defer stats.stopTimer()
return c.flush(mem, -1) return c.flush(mem, -1)
}, func() error { }, func() error {
for _, r := range c.rec.addedTables { for _, r := range c.rec.addedTables {
s.logf("mem@flush rollback @%d", r.num) db.logf("mem@flush rollback @%d", r.num)
f := s.getTableFile(r.num) f := db.s.getTableFile(r.num)
if err := f.Remove(); err != nil { if err := f.Remove(); err != nil {
return err return err
} }
@ -252,61 +253,59 @@ func (d *DB) memCompaction() {
return nil return nil
}) })
d.compactionTransact("mem@commit", func(cnt *compactionTransactCounter) (err error) { db.compactionTransact("mem@commit", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer() stats.startTimer()
defer stats.stopTimer() defer stats.stopTimer()
return c.commit(d.journalFile.Num(), d.frozenSeq) return c.commit(db.journalFile.Num(), db.frozenSeq)
}, nil) }, nil)
s.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration) db.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration)
for _, r := range c.rec.addedTables { for _, r := range c.rec.addedTables {
stats.write += r.size stats.write += r.size
} }
d.compStats[c.level].add(stats) db.compStats[c.level].add(stats)
// Drop frozen mem. // Drop frozen mem.
d.dropFrozenMem() db.dropFrozenMem()
// Resume table compaction. // Resume table compaction.
select { select {
case <-ch: case <-ch:
case _, _ = <-d.closeC: case _, _ = <-db.closeC:
return return
} }
// Trigger table compaction. // Trigger table compaction.
d.compTrigger(d.mcompTriggerC) db.compTrigger(db.mcompTriggerC)
} }
func (d *DB) tableCompaction(c *compaction, noTrivial bool) { func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
s := d.s
rec := new(sessionRecord) rec := new(sessionRecord)
rec.addCompactionPointer(c.level, c.max) rec.addCompactionPointer(c.level, c.imax)
if !noTrivial && c.trivial() { if !noTrivial && c.trivial() {
t := c.tables[0][0] t := c.tables[0][0]
s.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1) db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1)
rec.deleteTable(c.level, t.file.Num()) rec.deleteTable(c.level, t.file.Num())
rec.addTableFile(c.level+1, t) rec.addTableFile(c.level+1, t)
d.compactionTransact("table@move", func(cnt *compactionTransactCounter) (err error) { db.compactionTransact("table@move", func(cnt *compactionTransactCounter) (err error) {
return s.commit(rec) return db.s.commit(rec)
}, nil) }, nil)
return return
} }
var stats [2]cStatsStaging var stats [2]cStatsStaging
for i, tt := range c.tables { for i, tables := range c.tables {
for _, t := range tt { for _, t := range tables {
stats[i].read += t.size stats[i].read += t.size
// Insert deleted tables into record // Insert deleted tables into record
rec.deleteTable(c.level+i, t.file.Num()) rec.deleteTable(c.level+i, t.file.Num())
} }
} }
sourceSize := int(stats[0].read + stats[1].read) sourceSize := int(stats[0].read + stats[1].read)
minSeq := d.minSeq() minSeq := db.minSeq()
s.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq) db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq)
var snapUkey []byte var snapUkey []byte
var snapHasUkey bool var snapHasUkey bool
@ -314,7 +313,7 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
var snapIter int var snapIter int
var snapDropCnt int var snapDropCnt int
var dropCnt int var dropCnt int
d.compactionTransact("table@build", func(cnt *compactionTransactCounter) (err error) { db.compactionTransact("table@build", func(cnt *compactionTransactCounter) (err error) {
ukey := append([]byte{}, snapUkey...) ukey := append([]byte{}, snapUkey...)
hasUkey := snapHasUkey hasUkey := snapHasUkey
lseq := snapSeq lseq := snapSeq
@ -329,7 +328,7 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
} }
rec.addTableFile(c.level+1, t) rec.addTableFile(c.level+1, t)
stats[1].write += t.size stats[1].write += t.size
s.logf("table@build created L%d@%d N·%d S·%s %q:%q", c.level+1, t.file.Num(), tw.tw.EntriesLen(), shortenb(int(t.size)), t.min, t.max) db.logf("table@build created L%d@%d N·%d S·%s %q:%q", c.level+1, t.file.Num(), tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax)
return nil return nil
} }
@ -353,9 +352,9 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
continue continue
} }
key := iKey(iter.Key()) ikey := iKey(iter.Key())
if c.shouldStopBefore(key) && tw != nil { if c.shouldStopBefore(ikey) && tw != nil {
err = finish() err = finish()
if err != nil { if err != nil {
return return
@ -375,15 +374,15 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
snapSched = false snapSched = false
} }
if seq, t, ok := key.parseNum(); !ok { if seq, vt, ok := ikey.parseNum(); !ok {
// Don't drop error keys // Don't drop error keys
ukey = ukey[:0] ukey = ukey[:0]
hasUkey = false hasUkey = false
lseq = kMaxSeq lseq = kMaxSeq
} else { } else {
if !hasUkey || s.icmp.uCompare(key.ukey(), ukey) != 0 { if !hasUkey || db.s.icmp.uCompare(ikey.ukey(), ukey) != 0 {
// First occurrence of this user key // First occurrence of this user key
ukey = append(ukey[:0], key.ukey()...) ukey = append(ukey[:0], ikey.ukey()...)
hasUkey = true hasUkey = true
lseq = kMaxSeq lseq = kMaxSeq
} }
@ -392,7 +391,7 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
if lseq <= minSeq { if lseq <= minSeq {
// Dropped because newer entry for same user key exists // Dropped because newer entry for same user key exists
drop = true // (A) drop = true // (A)
} else if t == tDel && seq <= minSeq && c.isBaseLevelForKey(ukey) { } else if vt == tDel && seq <= minSeq && c.baseLevelForKey(ukey) {
// For this user key: // For this user key:
// (1) there is no data in higher levels // (1) there is no data in higher levels
// (2) data in lower levels will have larger seq numbers // (2) data in lower levels will have larger seq numbers
@ -414,22 +413,22 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
if tw == nil { if tw == nil {
// Check for pause event. // Check for pause event.
select { select {
case ch := <-d.tcompPauseC: case ch := <-db.tcompPauseC:
d.pauseCompaction(ch) db.pauseCompaction(ch)
case _, _ = <-d.closeC: case _, _ = <-db.closeC:
d.compactionExitTransact() db.compactionExitTransact()
default: default:
} }
// Create new table. // Create new table.
tw, err = s.tops.create() tw, err = db.s.tops.create()
if err != nil { if err != nil {
return return
} }
} }
// Write key/value into table // Write key/value into table
err = tw.add(key, iter.Value()) err = tw.append(ikey, iter.Value())
if err != nil { if err != nil {
return return
} }
@ -461,8 +460,8 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
return return
}, func() error { }, func() error {
for _, r := range rec.addedTables { for _, r := range rec.addedTables {
s.logf("table@build rollback @%d", r.num) db.logf("table@build rollback @%d", r.num)
f := s.getTableFile(r.num) f := db.s.getTableFile(r.num)
if err := f.Remove(); err != nil { if err := f.Remove(); err != nil {
return err return err
} }
@ -471,60 +470,61 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) {
}) })
// Commit changes // Commit changes
d.compactionTransact("table@commit", func(cnt *compactionTransactCounter) (err error) { db.compactionTransact("table@commit", func(cnt *compactionTransactCounter) (err error) {
stats[1].startTimer() stats[1].startTimer()
defer stats[1].stopTimer() defer stats[1].stopTimer()
return s.commit(rec) return db.s.commit(rec)
}, nil) }, nil)
resultSize := int(int(stats[1].write)) resultSize := int(stats[1].write)
s.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration) db.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration)
// Save compaction stats // Save compaction stats
for i := range stats { for i := range stats {
d.compStats[c.level+1].add(&stats[i]) db.compStats[c.level+1].add(&stats[i])
} }
} }
func (d *DB) tableRangeCompaction(level int, min, max []byte) { func (db *DB) tableRangeCompaction(level int, umin, umax []byte) {
s := d.s db.logf("table@compaction range L%d %q:%q", level, umin, umax)
s.logf("table@compaction range L%d %q:%q", level, min, max)
if level >= 0 { if level >= 0 {
if c := s.getCompactionRange(level, min, max); c != nil { if c := db.s.getCompactionRange(level, umin, umax); c != nil {
d.tableCompaction(c, true) db.tableCompaction(c, true)
} }
} else { } else {
v := s.version_NB() v := db.s.version_NB()
m := 1 m := 1
for i, t := range v.tables[1:] { for i, t := range v.tables[1:] {
-		if t.isOverlaps(min, max, true, s.icmp) {
+		if t.overlaps(db.s.icmp, umin, umax, false) {
 			m = i + 1
 		}
 	}
 	for level := 0; level < m; level++ {
-		if c := s.getCompactionRange(level, min, max); c != nil {
-			d.tableCompaction(c, true)
+		if c := db.s.getCompactionRange(level, umin, umax); c != nil {
+			db.tableCompaction(c, true)
 		}
 	}
 }

-func (d *DB) tableAutoCompaction() {
-	if c := d.s.pickCompaction(); c != nil {
-		d.tableCompaction(c, false)
+func (db *DB) tableAutoCompaction() {
+	if c := db.s.pickCompaction(); c != nil {
+		db.tableCompaction(c, false)
 	}
 }

-func (d *DB) tableNeedCompaction() bool {
-	return d.s.version_NB().needCompaction()
+func (db *DB) tableNeedCompaction() bool {
+	return db.s.version_NB().needCompaction()
 }

-func (d *DB) pauseCompaction(ch chan<- struct{}) {
+func (db *DB) pauseCompaction(ch chan<- struct{}) {
 	select {
 	case ch <- struct{}{}:
-	case _, _ = <-d.closeC:
-		d.compactionExitTransact()
+	case _, _ = <-db.closeC:
+		db.compactionExitTransact()
 	}
 }

@@ -555,48 +555,48 @@ func (r cRange) ack(err error) {
 	}
 }

-func (d *DB) compSendIdle(compC chan<- cCmd) error {
+func (db *DB) compSendIdle(compC chan<- cCmd) error {
 	ch := make(chan error)
 	defer close(ch)
 	// Send cmd.
 	select {
 	case compC <- cIdle{ch}:
-	case err := <-d.compErrC:
+	case err := <-db.compErrC:
 		return err
-	case _, _ = <-d.closeC:
+	case _, _ = <-db.closeC:
 		return ErrClosed
 	}
 	// Wait cmd.
 	return <-ch
 }

-func (d *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
+func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) {
 	ch := make(chan error)
 	defer close(ch)
 	// Send cmd.
 	select {
 	case compC <- cRange{level, min, max, ch}:
-	case err := <-d.compErrC:
+	case err := <-db.compErrC:
 		return err
-	case _, _ = <-d.closeC:
+	case _, _ = <-db.closeC:
 		return ErrClosed
 	}
 	// Wait cmd.
 	select {
-	case err = <-d.compErrC:
+	case err = <-db.compErrC:
 	case err = <-ch:
 	}
 	return err
 }

-func (d *DB) compTrigger(compTriggerC chan struct{}) {
+func (db *DB) compTrigger(compTriggerC chan struct{}) {
 	select {
 	case compTriggerC <- struct{}{}:
 	default:
 	}
 }

-func (d *DB) mCompaction() {
+func (db *DB) mCompaction() {
 	var x cCmd
 	defer func() {
@@ -608,24 +608,24 @@ func (d *DB) mCompaction() {
 		if x != nil {
 			x.ack(ErrClosed)
 		}
-		d.closeW.Done()
+		db.closeW.Done()
 	}()

 	for {
 		select {
-		case _, _ = <-d.closeC:
-			return
-		case x = <-d.mcompCmdC:
-			d.memCompaction()
+		case x = <-db.mcompCmdC:
+			db.memCompaction()
 			x.ack(nil)
 			x = nil
-		case <-d.mcompTriggerC:
-			d.memCompaction()
+		case <-db.mcompTriggerC:
+			db.memCompaction()
+		case _, _ = <-db.closeC:
+			return
 		}
 	}
 }

-func (d *DB) tCompaction() {
+func (db *DB) tCompaction() {
 	var x cCmd
 	var ackQ []cCmd
@@ -642,19 +642,19 @@ func (d *DB) tCompaction() {
 		if x != nil {
 			x.ack(ErrClosed)
 		}
-		d.closeW.Done()
+		db.closeW.Done()
 	}()

 	for {
-		if d.tableNeedCompaction() {
+		if db.tableNeedCompaction() {
 			select {
-			case x = <-d.tcompCmdC:
-			case <-d.tcompTriggerC:
-			case _, _ = <-d.closeC:
-				return
-			case ch := <-d.tcompPauseC:
-				d.pauseCompaction(ch)
+			case x = <-db.tcompCmdC:
+			case <-db.tcompTriggerC:
+			case ch := <-db.tcompPauseC:
+				db.pauseCompaction(ch)
 				continue
+			case _, _ = <-db.closeC:
+				return
 			default:
 			}
 		} else {
@@ -664,12 +664,12 @@ func (d *DB) tCompaction() {
 			}
 			ackQ = ackQ[:0]
 			select {
-			case x = <-d.tcompCmdC:
-			case <-d.tcompTriggerC:
-			case ch := <-d.tcompPauseC:
-				d.pauseCompaction(ch)
+			case x = <-db.tcompCmdC:
+			case <-db.tcompTriggerC:
+			case ch := <-db.tcompPauseC:
+				db.pauseCompaction(ch)
 				continue
-			case _, _ = <-d.closeC:
+			case _, _ = <-db.closeC:
 				return
 			}
 		}
@@ -678,11 +678,11 @@ func (d *DB) tCompaction() {
 			case cIdle:
 				ackQ = append(ackQ, x)
 			case cRange:
-				d.tableRangeCompaction(cmd.level, cmd.min, cmd.max)
+				db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)
 				x.ack(nil)
 			}
 			x = nil
 		}
-		d.tableAutoCompaction()
+		db.tableAutoCompaction()
 	}
 }
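The compSendIdle/compSendRange helpers above implement a plain command-plus-acknowledgement channel idiom: the caller submits a command carrying its own ack channel, a single compaction goroutine replies on it, and closeC short-circuits shutdown. A minimal standalone sketch of the same idiom (all names here are illustrative, not part of this change):

```go
package main

import (
	"errors"
	"fmt"
)

var errClosed = errors.New("closed")

// cmd mirrors the cIdle/cRange idea: a request paired with an ack channel.
type cmd struct {
	ack chan error
}

// worker is the lone goroutine that processes commands, like tCompaction.
func worker(cmdC chan cmd, closeC chan struct{}) {
	for {
		select {
		case c := <-cmdC:
			// ... do the actual work here ...
			c.ack <- nil
		case <-closeC:
			return
		}
	}
}

// send submits a command and waits for its acknowledgement,
// bailing out if the worker is shutting down.
func send(cmdC chan cmd, closeC chan struct{}) error {
	ch := make(chan error)
	select {
	case cmdC <- cmd{ack: ch}:
	case <-closeC:
		return errClosed
	}
	return <-ch
}

func main() {
	cmdC := make(chan cmd)
	closeC := make(chan struct{})
	go worker(cmdC, closeC)
	fmt.Println(send(cmdC, closeC)) // <nil>
	close(closeC)
}
```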


@@ -20,10 +20,8 @@ var (
 )

 func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
-	s := db.s
-
 	em, fm := db.getMems()
-	v := s.version()
+	v := db.s.version()

 	ti := v.getIterators(slice, ro)
 	n := len(ti) + 2
@@ -33,24 +31,24 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
 		i = append(i, fm.NewIterator(slice))
 	}
 	i = append(i, ti...)
-	strict := s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator)
-	mi := iterator.NewMergedIterator(i, s.icmp, strict)
+	strict := db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator)
+	mi := iterator.NewMergedIterator(i, db.s.icmp, strict)
 	mi.SetReleaser(&versionReleaser{v: v})
 	return mi
 }

 func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter {
-	var slice_ *util.Range
+	var islice *util.Range
 	if slice != nil {
-		slice_ = &util.Range{}
+		islice = &util.Range{}
 		if slice.Start != nil {
-			slice_.Start = newIKey(slice.Start, kMaxSeq, tSeek)
+			islice.Start = newIKey(slice.Start, kMaxSeq, tSeek)
 		}
 		if slice.Limit != nil {
-			slice_.Limit = newIKey(slice.Limit, kMaxSeq, tSeek)
+			islice.Limit = newIKey(slice.Limit, kMaxSeq, tSeek)
 		}
 	}
-	rawIter := db.newRawIterator(slice_, ro)
+	rawIter := db.newRawIterator(islice, ro)
 	iter := &dbIter{
 		icmp: db.s.icmp,
 		iter: rawIter,


@@ -87,12 +87,12 @@ type Snapshot struct {
 // Creates new snapshot object.
 func (db *DB) newSnapshot() *Snapshot {
-	p := &Snapshot{
+	snap := &Snapshot{
 		db:   db,
 		elem: db.acquireSnapshot(),
 	}
-	runtime.SetFinalizer(p, (*Snapshot).Release)
-	return p
+	runtime.SetFinalizer(snap, (*Snapshot).Release)
+	return snap
 }

 // Get gets the value for the given key. It returns ErrNotFound if
@@ -100,19 +100,18 @@ func (db *DB) newSnapshot() *Snapshot {
 //
 // The caller should not modify the contents of the returned slice, but
 // it is safe to modify the contents of the argument after Get returns.
-func (p *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
-	db := p.db
-	err = db.ok()
+func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
+	err = snap.db.ok()
 	if err != nil {
 		return
 	}
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.released {
+	snap.mu.Lock()
+	defer snap.mu.Unlock()
+	if snap.released {
 		err = ErrSnapshotReleased
 		return
 	}
-	return db.get(key, p.elem.seq, ro)
+	return snap.db.get(key, snap.elem.seq, ro)
 }

 // NewIterator returns an iterator for the snapshot of the uderlying DB.
@@ -132,17 +131,18 @@ func (p *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error
 // iterator would be still valid until released.
 //
 // Also read Iterator documentation of the leveldb/iterator package.
-func (p *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
-	db := p.db
-	if err := db.ok(); err != nil {
+func (snap *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
+	if err := snap.db.ok(); err != nil {
 		return iterator.NewEmptyIterator(err)
 	}
-	p.mu.Lock()
-	defer p.mu.Unlock()
-	if p.released {
+	snap.mu.Lock()
+	defer snap.mu.Unlock()
+	if snap.released {
 		return iterator.NewEmptyIterator(ErrSnapshotReleased)
 	}
-	return db.newIterator(p.elem.seq, slice, ro)
+	// Since iterator already hold version ref, it doesn't need to
+	// hold snapshot ref.
+	return snap.db.newIterator(snap.elem.seq, slice, ro)
 }

 // Release releases the snapshot. This will not release any returned
@@ -150,16 +150,17 @@ func (p *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.
 // underlying DB is closed.
 //
 // Other methods should not be called after the snapshot has been released.
-func (p *Snapshot) Release() {
-	p.mu.Lock()
-	if !p.released {
-		// Clear the finalizer.
-		runtime.SetFinalizer(p, nil)
-		p.released = true
-		p.db.releaseSnapshot(p.elem)
-		p.db = nil
-		p.elem = nil
+func (snap *Snapshot) Release() {
+	snap.mu.Lock()
+	defer snap.mu.Unlock()
+
+	if !snap.released {
+		// Clear the finalizer.
+		runtime.SetFinalizer(snap, nil)
+
+		snap.released = true
+		snap.db.releaseSnapshot(snap.elem)
+		snap.db = nil
+		snap.elem = nil
 	}
-	p.mu.Unlock()
 }
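Snapshot above pairs an explicit, idempotent Release with a runtime.SetFinalizer registration, so a snapshot that is leaked without being released still gets cleaned up by the garbage collector; the explicit path clears the finalizer so it cannot run twice. A small self-contained sketch of that pattern (hypothetical type, not from this commit):

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
)

// res stands in for Snapshot: Release is idempotent and is also
// registered as a finalizer, so leaked values are still released.
type res struct {
	mu       sync.Mutex
	released bool
}

func newRes() *res {
	r := &res{}
	runtime.SetFinalizer(r, (*res).Release)
	return r
}

func (r *res) Release() {
	r.mu.Lock()
	defer r.mu.Unlock()
	if !r.released {
		// Explicit release: drop the finalizer so it never fires later.
		runtime.SetFinalizer(r, nil)
		r.released = true
		fmt.Println("released")
	}
}

func main() {
	r := newRes()
	r.Release() // prints "released" once
	r.Release() // no-op
}
```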


@@ -14,100 +14,101 @@ import (
 )

 // Get latest sequence number.
-func (d *DB) getSeq() uint64 {
-	return atomic.LoadUint64(&d.seq)
+func (db *DB) getSeq() uint64 {
+	return atomic.LoadUint64(&db.seq)
 }

 // Atomically adds delta to seq.
-func (d *DB) addSeq(delta uint64) {
-	atomic.AddUint64(&d.seq, delta)
+func (db *DB) addSeq(delta uint64) {
+	atomic.AddUint64(&db.seq, delta)
 }

 // Create new memdb and froze the old one; need external synchronization.
 // newMem only called synchronously by the writer.
-func (d *DB) newMem(n int) (mem *memdb.DB, err error) {
-	s := d.s
-
-	num := s.allocFileNum()
-	file := s.getJournalFile(num)
+func (db *DB) newMem(n int) (mem *memdb.DB, err error) {
+	num := db.s.allocFileNum()
+	file := db.s.getJournalFile(num)
 	w, err := file.Create()
 	if err != nil {
-		s.reuseFileNum(num)
+		db.s.reuseFileNum(num)
 		return
 	}
-	d.memMu.Lock()
-	if d.journal == nil {
-		d.journal = journal.NewWriter(w)
+
+	db.memMu.Lock()
+	defer db.memMu.Unlock()
+
+	if db.journal == nil {
+		db.journal = journal.NewWriter(w)
 	} else {
-		d.journal.Reset(w)
-		d.journalWriter.Close()
-		d.frozenJournalFile = d.journalFile
+		db.journal.Reset(w)
+		db.journalWriter.Close()
+		db.frozenJournalFile = db.journalFile
 	}
-	d.journalWriter = w
-	d.journalFile = file
-	d.frozenMem = d.mem
-	d.mem = memdb.New(s.icmp, maxInt(d.s.o.GetWriteBuffer(), n))
-	mem = d.mem
-	// The seq only incremented by the writer.
-	d.frozenSeq = d.seq
-	d.memMu.Unlock()
+	db.journalWriter = w
+	db.journalFile = file
+	db.frozenMem = db.mem
+	db.mem = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
+	mem = db.mem
+	// The seq only incremented by the writer. And whoever called newMem
+	// should hold write lock, so no need additional synchronization here.
+	db.frozenSeq = db.seq
 	return
 }

 // Get all memdbs.
-func (d *DB) getMems() (e *memdb.DB, f *memdb.DB) {
-	d.memMu.RLock()
-	defer d.memMu.RUnlock()
-	return d.mem, d.frozenMem
+func (db *DB) getMems() (e *memdb.DB, f *memdb.DB) {
+	db.memMu.RLock()
+	defer db.memMu.RUnlock()
+	return db.mem, db.frozenMem
 }

 // Get frozen memdb.
-func (d *DB) getEffectiveMem() *memdb.DB {
-	d.memMu.RLock()
-	defer d.memMu.RUnlock()
-	return d.mem
+func (db *DB) getEffectiveMem() *memdb.DB {
+	db.memMu.RLock()
+	defer db.memMu.RUnlock()
+	return db.mem
 }

 // Check whether we has frozen memdb.
-func (d *DB) hasFrozenMem() bool {
-	d.memMu.RLock()
-	defer d.memMu.RUnlock()
-	return d.frozenMem != nil
+func (db *DB) hasFrozenMem() bool {
+	db.memMu.RLock()
+	defer db.memMu.RUnlock()
+	return db.frozenMem != nil
 }

 // Get frozen memdb.
-func (d *DB) getFrozenMem() *memdb.DB {
-	d.memMu.RLock()
-	defer d.memMu.RUnlock()
-	return d.frozenMem
+func (db *DB) getFrozenMem() *memdb.DB {
+	db.memMu.RLock()
+	defer db.memMu.RUnlock()
+	return db.frozenMem
 }

 // Drop frozen memdb; assume that frozen memdb isn't nil.
-func (d *DB) dropFrozenMem() {
-	d.memMu.Lock()
-	if err := d.frozenJournalFile.Remove(); err != nil {
-		d.s.logf("journal@remove removing @%d %q", d.frozenJournalFile.Num(), err)
+func (db *DB) dropFrozenMem() {
+	db.memMu.Lock()
+	if err := db.frozenJournalFile.Remove(); err != nil {
+		db.logf("journal@remove removing @%d %q", db.frozenJournalFile.Num(), err)
 	} else {
-		d.s.logf("journal@remove removed @%d", d.frozenJournalFile.Num())
+		db.logf("journal@remove removed @%d", db.frozenJournalFile.Num())
 	}
-	d.frozenJournalFile = nil
-	d.frozenMem = nil
-	d.memMu.Unlock()
+	db.frozenJournalFile = nil
+	db.frozenMem = nil
+	db.memMu.Unlock()
 }

 // Set closed flag; return true if not already closed.
-func (d *DB) setClosed() bool {
-	return atomic.CompareAndSwapUint32(&d.closed, 0, 1)
+func (db *DB) setClosed() bool {
+	return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
 }

 // Check whether DB was closed.
-func (d *DB) isClosed() bool {
-	return atomic.LoadUint32(&d.closed) != 0
+func (db *DB) isClosed() bool {
+	return atomic.LoadUint32(&db.closed) != 0
 }

 // Check read ok status.
-func (d *DB) ok() error {
-	if d.isClosed() {
+func (db *DB) ok() error {
+	if db.isClosed() {
 		return ErrClosed
 	}
 	return nil
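setClosed/isClosed above rely on an atomic compare-and-swap so that exactly one caller wins the 0-to-1 transition and can perform the one-time shutdown work. The same pattern in isolation (illustrative type name):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// closer mirrors setClosed/isClosed: a uint32 flag flipped with CAS so
// only one caller observes the 0 -> 1 transition.
type closer struct {
	closed uint32
}

func (c *closer) setClosed() bool {
	return atomic.CompareAndSwapUint32(&c.closed, 0, 1)
}

func (c *closer) isClosed() bool {
	return atomic.LoadUint32(&c.closed) != 0
}

func main() {
	var c closer
	fmt.Println(c.setClosed()) // true: we closed it
	fmt.Println(c.setClosed()) // false: already closed
	fmt.Println(c.isClosed())  // true
}
```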


@@ -154,9 +154,7 @@ func (h *dbHarness) maxNextLevelOverlappingBytes(want uint64) {
 		level := i + 1
 		next := v.tables[level+1]
 		for _, t := range tt {
-			var r tFiles
-			min, max := t.min.ukey(), t.max.ukey()
-			next.getOverlaps(min, max, &r, true, db.s.icmp.ucmp)
+			r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false)
 			sum := r.size()
 			if sum > res {
 				res = sum


@@ -32,40 +32,42 @@ func (p Sizes) Sum() (n uint64) {
 	return n
 }

+// Logging.
+func (db *DB) log(v ...interface{})                 { db.s.log(v...) }
+func (db *DB) logf(format string, v ...interface{}) { db.s.logf(format, v...) }
+
 // Check and clean files.
-func (d *DB) checkAndCleanFiles() error {
-	s := d.s
-	v := s.version_NB()
-
-	tables := make(map[uint64]bool)
-	for _, tt := range v.tables {
-		for _, t := range tt {
-			tables[t.file.Num()] = false
+func (db *DB) checkAndCleanFiles() error {
+	v := db.s.version_NB()
+
+	tablesMap := make(map[uint64]bool)
+	for _, tables := range v.tables {
+		for _, t := range tables {
+			tablesMap[t.file.Num()] = false
 		}
 	}

-	ff, err := s.getFiles(storage.TypeAll)
+	files, err := db.s.getFiles(storage.TypeAll)
 	if err != nil {
 		return err
 	}

 	var nTables int
 	var rem []storage.File
-	for _, f := range ff {
+	for _, f := range files {
 		keep := true
 		switch f.Type() {
 		case storage.TypeManifest:
-			keep = f.Num() >= s.manifestFile.Num()
+			keep = f.Num() >= db.s.manifestFile.Num()
 		case storage.TypeJournal:
-			if d.frozenJournalFile != nil {
-				keep = f.Num() >= d.frozenJournalFile.Num()
+			if db.frozenJournalFile != nil {
+				keep = f.Num() >= db.frozenJournalFile.Num()
 			} else {
-				keep = f.Num() >= d.journalFile.Num()
+				keep = f.Num() >= db.journalFile.Num()
 			}
 		case storage.TypeTable:
-			_, keep = tables[f.Num()]
+			_, keep = tablesMap[f.Num()]
 			if keep {
-				tables[f.Num()] = true
+				tablesMap[f.Num()] = true
 				nTables++
 			}
 		}
@@ -75,18 +77,18 @@ func (d *DB) checkAndCleanFiles() error {
 		}
 	}

-	if nTables != len(tables) {
-		for num, present := range tables {
+	if nTables != len(tablesMap) {
+		for num, present := range tablesMap {
 			if !present {
-				s.logf("db@janitor table missing @%d", num)
+				db.logf("db@janitor table missing @%d", num)
 			}
 		}
 		return ErrCorrupted{Type: MissingFiles, Err: errors.New("leveldb: table files missing")}
 	}

-	s.logf("db@janitor F·%d G·%d", len(ff), len(rem))
+	db.logf("db@janitor F·%d G·%d", len(files), len(rem))
 	for _, f := range rem {
-		s.logf("db@janitor removing %s-%d", f.Type(), f.Num())
+		db.logf("db@janitor removing %s-%d", f.Type(), f.Num())
 		if err := f.Remove(); err != nil {
 			return err
 		}


@@ -14,63 +14,61 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/util"
 )

-func (d *DB) writeJournal(b *Batch) error {
-	w, err := d.journal.Next()
+func (db *DB) writeJournal(b *Batch) error {
+	w, err := db.journal.Next()
 	if err != nil {
 		return err
 	}
 	if _, err := w.Write(b.encode()); err != nil {
 		return err
 	}
-	if err := d.journal.Flush(); err != nil {
+	if err := db.journal.Flush(); err != nil {
 		return err
 	}
 	if b.sync {
-		return d.journalWriter.Sync()
+		return db.journalWriter.Sync()
 	}
 	return nil
 }

-func (d *DB) jWriter() {
-	defer d.closeW.Done()
+func (db *DB) jWriter() {
+	defer db.closeW.Done()
 	for {
 		select {
-		case b := <-d.journalC:
+		case b := <-db.journalC:
 			if b != nil {
-				d.journalAckC <- d.writeJournal(b)
+				db.journalAckC <- db.writeJournal(b)
 			}
-		case _, _ = <-d.closeC:
+		case _, _ = <-db.closeC:
 			return
 		}
 	}
 }

-func (d *DB) rotateMem(n int) (mem *memdb.DB, err error) {
+func (db *DB) rotateMem(n int) (mem *memdb.DB, err error) {
 	// Wait for pending memdb compaction.
-	err = d.compSendIdle(d.mcompCmdC)
+	err = db.compSendIdle(db.mcompCmdC)
 	if err != nil {
 		return
 	}

 	// Create new memdb and journal.
-	mem, err = d.newMem(n)
+	mem, err = db.newMem(n)
 	if err != nil {
 		return
 	}

 	// Schedule memdb compaction.
-	d.compTrigger(d.mcompTriggerC)
+	db.compTrigger(db.mcompTriggerC)
 	return
 }

-func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
-	s := d.s
-
+func (db *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
 	delayed := false
 	flush := func() bool {
-		v := s.version()
+		v := db.s.version()
 		defer v.release()
-		mem = d.getEffectiveMem()
+		mem = db.getEffectiveMem()
 		nn = mem.Free()
 		switch {
 		case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed:
@@ -80,7 +78,7 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
 			return false
 		case v.tLen(0) >= kL0_StopWritesTrigger:
 			delayed = true
-			err = d.compSendIdle(d.tcompCmdC)
+			err = db.compSendIdle(db.tcompCmdC)
 			if err != nil {
 				return false
 			}
@@ -90,7 +88,7 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
 				nn = n
 				return false
 			}
-			mem, err = d.rotateMem(n)
+			mem, err = db.rotateMem(n)
 			nn = mem.Free()
 			return false
 		}
@@ -100,7 +98,7 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
 	for flush() {
 	}
 	if delayed {
-		s.logf("db@write delayed T·%v", time.Since(start))
+		db.logf("db@write delayed T·%v", time.Since(start))
 	}
 	return
 }
@@ -109,8 +107,8 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) {
 // sequentially.
 //
 // It is safe to modify the contents of the arguments after Write returns.
-func (d *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
-	err = d.ok()
+func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
+	err = db.ok()
 	if err != nil || b == nil || b.len() == 0 {
 		return
 	}
@@ -120,25 +118,25 @@ func (d *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
 	// The write happen synchronously.
 retry:
 	select {
-	case d.writeC <- b:
-		if <-d.writeMergedC {
-			return <-d.writeAckC
+	case db.writeC <- b:
+		if <-db.writeMergedC {
+			return <-db.writeAckC
 		}
 		goto retry
-	case d.writeLockC <- struct{}{}:
-	case _, _ = <-d.closeC:
+	case db.writeLockC <- struct{}{}:
+	case _, _ = <-db.closeC:
 		return ErrClosed
 	}

 	merged := 0
 	defer func() {
-		<-d.writeLockC
+		<-db.writeLockC
 		for i := 0; i < merged; i++ {
-			d.writeAckC <- err
+			db.writeAckC <- err
 		}
 	}()

-	mem, memFree, err := d.flush(b.size())
+	mem, memFree, err := db.flush(b.size())
 	if err != nil {
 		return
 	}
@@ -154,13 +152,13 @@ retry:
 drain:
 	for b.size() < m && !b.sync {
 		select {
-		case nb := <-d.writeC:
+		case nb := <-db.writeC:
 			if b.size()+nb.size() <= m {
 				b.append(nb)
-				d.writeMergedC <- true
+				db.writeMergedC <- true
 				merged++
 			} else {
-				d.writeMergedC <- false
+				db.writeMergedC <- false
 				break drain
 			}
 		default:
@@ -169,25 +167,25 @@ drain:
 	}

 	// Set batch first seq number relative from last seq.
-	b.seq = d.seq + 1
+	b.seq = db.seq + 1

 	// Write journal concurrently if it is large enough.
 	if b.size() >= (128 << 10) {
 		// Push the write batch to the journal writer
 		select {
-		case _, _ = <-d.closeC:
+		case _, _ = <-db.closeC:
 			err = ErrClosed
 			return
-		case d.journalC <- b:
+		case db.journalC <- b:
 			// Write into memdb
 			b.memReplay(mem)
 		}
 		// Wait for journal writer
 		select {
-		case _, _ = <-d.closeC:
+		case _, _ = <-db.closeC:
 			err = ErrClosed
 			return
-		case err = <-d.journalAckC:
+		case err = <-db.journalAckC:
 			if err != nil {
 				// Revert memdb if error detected
 				b.revertMemReplay(mem)
@@ -195,7 +193,7 @@ drain:
 			}
 		}
 	} else {
-		err = d.writeJournal(b)
+		err = db.writeJournal(b)
 		if err != nil {
 			return
 		}
@@ -203,10 +201,10 @@ drain:
 	}

 	// Set last seq number.
-	d.addSeq(uint64(b.len()))
+	db.addSeq(uint64(b.len()))

 	if b.size() >= memFree {
-		d.rotateMem(0)
+		db.rotateMem(0)
 	}
 	return
 }
@@ -215,20 +213,20 @@ drain:
 // for that key; a DB is not a multi-map.
 //
 // It is safe to modify the contents of the arguments after Put returns.
-func (d *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
+func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
 	b := new(Batch)
 	b.Put(key, value)
-	return d.Write(b, wo)
+	return db.Write(b, wo)
 }

 // Delete deletes the value for the given key. It returns ErrNotFound if
 // the DB does not contain the key.
 //
 // It is safe to modify the contents of the arguments after Delete returns.
-func (d *DB) Delete(key []byte, wo *opt.WriteOptions) error {
+func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
 	b := new(Batch)
 	b.Delete(key)
-	return d.Write(b, wo)
+	return db.Write(b, wo)
 }

 func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
@@ -247,33 +245,33 @@ func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
 // A nil Range.Start is treated as a key before all keys in the DB.
 // And a nil Range.Limit is treated as a key after all keys in the DB.
 // Therefore if both is nil then it will compact entire DB.
-func (d *DB) CompactRange(r util.Range) error {
-	if err := d.ok(); err != nil {
+func (db *DB) CompactRange(r util.Range) error {
+	if err := db.ok(); err != nil {
 		return err
 	}

 	select {
-	case d.writeLockC <- struct{}{}:
-	case _, _ = <-d.closeC:
+	case db.writeLockC <- struct{}{}:
+	case _, _ = <-db.closeC:
 		return ErrClosed
 	}

 	// Check for overlaps in memdb.
-	mem := d.getEffectiveMem()
-	if isMemOverlaps(d.s.icmp, mem, r.Start, r.Limit) {
+	mem := db.getEffectiveMem()
+	if isMemOverlaps(db.s.icmp, mem, r.Start, r.Limit) {
 		// Memdb compaction.
-		if _, err := d.rotateMem(0); err != nil {
-			<-d.writeLockC
+		if _, err := db.rotateMem(0); err != nil {
+			<-db.writeLockC
 			return err
 		}
-		<-d.writeLockC
-		if err := d.compSendIdle(d.mcompCmdC); err != nil {
+		<-db.writeLockC
+		if err := db.compSendIdle(db.mcompCmdC); err != nil {
 			return err
 		}
 	} else {
-		<-d.writeLockC
+		<-db.writeLockC
 	}

 	// Table compaction.
-	return d.compSendRange(d.tcompCmdC, -1, r.Start, r.Limit)
+	return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
 }
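Write above lets concurrent writers piggyback on one leader: whoever wins writeLockC flushes and drains writeC, merging small batches into its own and acknowledging the merged writers afterwards. A heavily simplified, runnable sketch of that merge dance (string "batches" and all names are stand-ins, not the real types):

```go
package main

import (
	"fmt"
	"sync"
)

type mergeDB struct {
	writeLockC chan struct{}
	writeC     chan string
	ackC       chan error
}

func newMergeDB() *mergeDB {
	return &mergeDB{
		writeLockC: make(chan struct{}, 1),
		writeC:     make(chan string),
		ackC:       make(chan error),
	}
}

func (db *mergeDB) write(batch string) error {
	select {
	case db.writeC <- batch: // a leader picked our batch up
		return <-db.ackC
	case db.writeLockC <- struct{}{}: // we are the leader
	}
	merged := 0
	// Drain whatever other writers queued while we hold the lock.
drain:
	for {
		select {
		case nb := <-db.writeC:
			batch += "+" + nb
			merged++
		default:
			break drain
		}
	}
	fmt.Println("commit:", batch)
	<-db.writeLockC // release the lock
	for i := 0; i < merged; i++ {
		db.ackC <- nil // ack every merged writer
	}
	return nil
}

func main() {
	db := newMergeDB()
	var wg sync.WaitGroup
	for _, b := range []string{"a", "b", "c"} {
		wg.Add(1)
		go func(b string) { defer wg.Done(); db.write(b) }(b)
	}
	wg.Wait()
}
```

A writer whose hand-off misses the leader's drain simply becomes the next leader itself, which is why the real code loops with `goto retry` when its batch was not merged.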


@@ -36,7 +36,7 @@ var _ = testutil.Defer(func() {
 				testutil.DoDBTesting(&t)
 				db.TestClose()
 				done <- true
-			}, 9.0)
+			}, 20.0)
 		})

 		Describe("read test", func() {


@@ -437,6 +437,8 @@ func (p *DB) Reset() {
 // New creates a new initalized in-memory key/value DB. The capacity
 // is the initial key/value buffer capacity. The capacity is advisory,
 // not enforced.
+//
+// The returned DB instance is goroutine-safe.
 func New(cmp comparer.BasicComparer, capacity int) *DB {
 	p := &DB{
 		cmp: cmp,
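For context, the memdb package is used roughly as below; this assumes the Put/Get signatures of the vendored revision, with comparer.DefaultComparer as the standard bytewise comparer:

```go
package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb/comparer"
	"github.com/syndtr/goleveldb/leveldb/memdb"
)

func main() {
	// Capacity is advisory only; the buffer grows as needed.
	mdb := memdb.New(comparer.DefaultComparer, 4096)
	if err := mdb.Put([]byte("k"), []byte("v")); err != nil {
		panic(err)
	}
	v, err := mdb.Get([]byte("k"))
	fmt.Println(string(v), err) // v <nil>
}
```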


@@ -39,11 +39,12 @@ type session struct {
 	manifestWriter storage.Writer
 	manifestFile   storage.File

-	stCPtrs   [kNumLevels]iKey // compact pointers; need external synchronization
+	stCptrs   [kNumLevels]iKey // compact pointers; need external synchronization
 	stVersion *version         // current version
 	vmu       sync.Mutex
 }

+// Creates new initialized session instance.
 func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
 	if stor == nil {
 		return nil, os.ErrInvalid
@@ -81,6 +82,7 @@ func (s *session) close() {
 	s.stVersion = nil
 }

+// Release session lock.
 func (s *session) release() {
 	s.storLock.Release()
 }
@@ -132,8 +134,8 @@ func (s *session) recover() (err error) {
 			err = rec.decode(r)
 			if err == nil {
 				// save compact pointers
-				for _, rp := range rec.compactionPointers {
-					s.stCPtrs[rp.level] = iKey(rp.key)
+				for _, r := range rec.compactionPointers {
+					s.stCptrs[r.level] = iKey(r.ikey)
 				}
 				// commit record to version staging
 				staging.commit(rec)
@@ -195,16 +197,16 @@ func (s *session) pickCompaction() *compaction {
 	var t0 tFiles
 	if v.cScore >= 1 {
 		level = v.cLevel
-		cp := s.stCPtrs[level]
-		tt := v.tables[level]
-		for _, t := range tt {
-			if cp == nil || s.icmp.Compare(t.max, cp) > 0 {
+		cptr := s.stCptrs[level]
+		tables := v.tables[level]
+		for _, t := range tables {
+			if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
 				t0 = append(t0, t)
 				break
 			}
 		}
 		if len(t0) == 0 {
-			t0 = append(t0, tt[0])
+			t0 = append(t0, tables[0])
 		}
 	} else {
 		if p := atomic.LoadPointer(&v.cSeek); p != nil {
@@ -216,11 +218,10 @@ func (s *session) pickCompaction() *compaction {
 		}
 	}

-	c := &compaction{s: s, version: v, level: level}
+	c := &compaction{s: s, v: v, level: level}
 	if level == 0 {
-		min, max := t0.getRange(s.icmp)
-		t0 = nil
-		v.tables[0].getOverlaps(min.ukey(), max.ukey(), &t0, false, s.icmp.ucmp)
+		imin, imax := t0.getRange(s.icmp)
+		t0 = v.tables[0].getOverlaps(t0[:0], s.icmp, imin.ukey(), imax.ukey(), true)
 	}

 	c.tables[0] = t0
@@ -229,11 +230,10 @@ func (s *session) pickCompaction() *compaction {
 }

 // Create compaction from given level and range; need external synchronization.
-func (s *session) getCompactionRange(level int, min, max []byte) *compaction {
+func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
 	v := s.version_NB()

-	var t0 tFiles
-	v.tables[level].getOverlaps(min, max, &t0, level != 0, s.icmp.ucmp)
+	t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
 	if len(t0) == 0 {
 		return nil
 	}
@@ -255,16 +255,16 @@ func (s *session) getCompactionRange(level int, min, max []byte) *compaction {
 		}
 	}

-	c := &compaction{s: s, version: v, level: level}
+	c := &compaction{s: s, v: v, level: level}
 	c.tables[0] = t0
 	c.expand()
 	return c
 }

-// compaction represent a compaction state
+// compaction represent a compaction state.
 type compaction struct {
 	s *session
-	version *version
+	v *version

 	level  int
 	tables [2]tFiles
@@ -273,42 +273,36 @@ type compaction struct {
 	gpidx           int
 	seenKey         bool
 	overlappedBytes uint64
-	min, max        iKey
+	imin, imax      iKey

 	tPtrs [kNumLevels]int
 }

 // Expand compacted tables; need external synchronization.
 func (c *compaction) expand() {
-	s := c.s
-	v := c.version
-
 	level := c.level
-	vt0, vt1 := v.tables[level], v.tables[level+1]
+	vt0, vt1 := c.v.tables[level], c.v.tables[level+1]

 	t0, t1 := c.tables[0], c.tables[1]
-	min, max := t0.getRange(s.icmp)
-	vt1.getOverlaps(min.ukey(), max.ukey(), &t1, true, s.icmp.ucmp)
-
-	// Get entire range covered by compaction
-	amin, amax := append(t0, t1...).getRange(s.icmp)
+	imin, imax := t0.getRange(c.s.icmp)
+	t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
+	// Get entire range covered by compaction.
+	amin, amax := append(t0, t1...).getRange(c.s.icmp)

 	// See if we can grow the number of inputs in "level" without
 	// changing the number of "level+1" files we pick up.
 	if len(t1) > 0 {
-		var exp0 tFiles
-		vt0.getOverlaps(amin.ukey(), amax.ukey(), &exp0, level != 0, s.icmp.ucmp)
+		exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), level == 0)
 		if len(exp0) > len(t0) && t1.size()+exp0.size() < kExpCompactionMaxBytes {
-			var exp1 tFiles
-			xmin, xmax := exp0.getRange(s.icmp)
-			vt1.getOverlaps(xmin.ukey(), xmax.ukey(), &exp1, true, s.icmp.ucmp)
+			xmin, xmax := exp0.getRange(c.s.icmp)
+			exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
 			if len(exp1) == len(t1) {
-				s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
+				c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
 					level, level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
 					len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
-				min, max = xmin, xmax
+				imin, imax = xmin, xmax
 				t0, t1 = exp0, exp1
-				amin, amax = append(t0, t1...).getRange(s.icmp)
+				amin, amax = append(t0, t1...).getRange(c.s.icmp)
 			}
 		}
 	}
@@ -316,11 +310,11 @@ func (c *compaction) expand() {
 	// Compute the set of grandparent files that overlap this compaction
 	// (parent == level+1; grandparent == level+2)
 	if level+2 < kNumLevels {
-		v.tables[level+2].getOverlaps(amin.ukey(), amax.ukey(), &c.gp, true, s.icmp.ucmp)
+		c.gp = c.v.tables[level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
 	}

 	c.tables[0], c.tables[1] = t0, t1
-	c.min, c.max = min, max
+	c.imin, c.imax = imin, imax
 }

 // Check whether compaction is trivial.
@@ -328,17 +322,14 @@ func (c *compaction) trivial() bool {
 	return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= kMaxGrandParentOverlapBytes
 }

-func (c *compaction) isBaseLevelForKey(key []byte) bool {
-	s := c.s
-	v := c.version
-
-	for level, tt := range v.tables[c.level+2:] {
-		for c.tPtrs[level] < len(tt) {
-			t := tt[c.tPtrs[level]]
-			if s.icmp.uCompare(key, t.max.ukey()) <= 0 {
-				// We've advanced far enough
-				if s.icmp.uCompare(key, t.min.ukey()) >= 0 {
-					// Key falls in this file's range, so definitely not base level
+func (c *compaction) baseLevelForKey(ukey []byte) bool {
+	for level, tables := range c.v.tables[c.level+2:] {
+		for c.tPtrs[level] < len(tables) {
+			t := tables[c.tPtrs[level]]
+			if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
+				// We've advanced far enough.
+				if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
+					// Key falls in this file's range, so definitely not base level.
 					return false
 				}
 				break
@@ -349,10 +340,10 @@ func (c *compaction) isBaseLevelForKey(key []byte) bool {
 	return true
 }

-func (c *compaction) shouldStopBefore(key iKey) bool {
+func (c *compaction) shouldStopBefore(ikey iKey) bool {
 	for ; c.gpidx < len(c.gp); c.gpidx++ {
 		gp := c.gp[c.gpidx]
-		if c.s.icmp.Compare(key, gp.max) <= 0 {
+		if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
 			break
 		}
 		if c.seenKey {
@@ -362,42 +353,44 @@ func (c *compaction) shouldStopBefore(key iKey) bool {
 	c.seenKey = true

 	if c.overlappedBytes > kMaxGrandParentOverlapBytes {
-		// Too much overlap for current output; start new output
+		// Too much overlap for current output; start new output.
 		c.overlappedBytes = 0
 		return true
 	}
 	return false
 }

+// Creates an iterator.
 func (c *compaction) newIterator() iterator.Iterator {
-	s := c.s
-
-	level := c.level
-	icap := 2
+	// Creates iterator slice.
+	icap := len(c.tables)
 	if c.level == 0 {
+		// Special case for level-0
 		icap = len(c.tables[0]) + 1
 	}
 	its := make([]iterator.Iterator, 0, icap)

+	// Options.
 	ro := &opt.ReadOptions{
 		DontFillCache: true,
 	}
-	strict := s.o.GetStrict(opt.StrictIterator)
+	strict := c.s.o.GetStrict(opt.StrictIterator)

-	for i, tt := range c.tables {
-		if len(tt) == 0 {
+	for i, tables := range c.tables {
+		if len(tables) == 0 {
 			continue
 		}

-		if level+i == 0 {
-			for _, t := range tt {
-				its = append(its, s.tops.newIterator(t, nil, ro))
+		// Level-0 is not sorted and may overlaps each other.
+		if c.level+i == 0 {
+			for _, t := range tables {
+				its = append(its, c.s.tops.newIterator(t, nil, ro))
 			}
 		} else {
-			it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, nil, ro), strict, true)
+			it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict, true)
 			its = append(its, it)
 		}
 	}

-	return iterator.NewMergedIterator(its, s.icmp, true)
+	return iterator.NewMergedIterator(its, c.s.icmp, true)
 }


@@ -35,19 +35,19 @@ const (
 type cpRecord struct {
 	level int
-	key   iKey
+	ikey  iKey
 }

 type ntRecord struct {
 	level int
 	num   uint64
 	size  uint64
-	min   iKey
-	max   iKey
+	imin  iKey
+	imax  iKey
 }

 func (r ntRecord) makeFile(s *session) *tFile {
-	return newTFile(s.getTableFile(r.num), r.size, r.min, r.max)
+	return newTableFile(s.getTableFile(r.num), r.size, r.imin, r.imax)
 }

 type dtRecord struct {
@@ -98,9 +98,9 @@ func (p *sessionRecord) setSeq(seq uint64) {
 	p.seq = seq
 }

-func (p *sessionRecord) addCompactionPointer(level int, key iKey) {
+func (p *sessionRecord) addCompactionPointer(level int, ikey iKey) {
 	p.hasRec |= 1 << recCompactionPointer
-	p.compactionPointers = append(p.compactionPointers, cpRecord{level, key})
+	p.compactionPointers = append(p.compactionPointers, cpRecord{level, ikey})
 }

 func (p *sessionRecord) resetCompactionPointers() {
@@ -108,13 +108,13 @@ func (p *sessionRecord) resetCompactionPointers() {
 	p.compactionPointers = p.compactionPointers[:0]
 }

-func (p *sessionRecord) addTable(level int, num, size uint64, min, max iKey) {
+func (p *sessionRecord) addTable(level int, num, size uint64, imin, imax iKey) {
 	p.hasRec |= 1 << recNewTable
-	p.addedTables = append(p.addedTables, ntRecord{level, num, size, min, max})
+	p.addedTables = append(p.addedTables, ntRecord{level, num, size, imin, imax})
 }

 func (p *sessionRecord) addTableFile(level int, t *tFile) {
-	p.addTable(level, t.file.Num(), t.size, t.min, t.max)
+	p.addTable(level, t.file.Num(), t.size, t.imin, t.imax)
 }

 func (p *sessionRecord) resetAddedTables() {
@@ -169,23 +169,23 @@ func (p *sessionRecord) encode(w io.Writer) error {
 		p.putUvarint(w, recSeq)
 		p.putUvarint(w, p.seq)
 	}
-	for _, cp := range p.compactionPointers {
+	for _, r := range p.compactionPointers {
 		p.putUvarint(w, recCompactionPointer)
-		p.putUvarint(w, uint64(cp.level))
-		p.putBytes(w, cp.key)
+		p.putUvarint(w, uint64(r.level))
+		p.putBytes(w, r.ikey)
 	}
-	for _, t := range p.deletedTables {
+	for _, r := range p.deletedTables {
 		p.putUvarint(w, recDeletedTable)
-		p.putUvarint(w, uint64(t.level))
-		p.putUvarint(w, t.num)
+		p.putUvarint(w, uint64(r.level))
+		p.putUvarint(w, r.num)
 	}
-	for _, t := range p.addedTables {
+	for _, r := range p.addedTables {
 		p.putUvarint(w, recNewTable)
-		p.putUvarint(w, uint64(t.level))
-		p.putUvarint(w, t.num)
-		p.putUvarint(w, t.size)
-		p.putBytes(w, t.min)
-		p.putBytes(w, t.max)
+		p.putUvarint(w, uint64(r.level))
+		p.putUvarint(w, r.num)
+		p.putUvarint(w, r.size)
+		p.putBytes(w, r.imin)
+		p.putBytes(w, r.imax)
 	}
 	return p.err
 }
@@ -282,18 +282,18 @@ func (p *sessionRecord) decode(r io.Reader) error {
 			}
 		case recCompactionPointer:
 			level := p.readLevel(br)
-			key := p.readBytes(br)
+			ikey := p.readBytes(br)
 			if p.err == nil {
-				p.addCompactionPointer(level, iKey(key))
+				p.addCompactionPointer(level, iKey(ikey))
 			}
 		case recNewTable:
 			level := p.readLevel(br)
 			num := p.readUvarint(br)
 			size := p.readUvarint(br)
-			min := p.readBytes(br)
-			max := p.readBytes(br)
+			imin := p.readBytes(br)
+			imax := p.readBytes(br)
 			if p.err == nil {
-				p.addTable(level, num, size, min, max)
+				p.addTable(level, num, size, imin, imax)
 			}
 		case recDeletedTable:
 			level := p.readLevel(br)
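The encode/decode pair above writes the manifest record as a sequence of uvarint-tagged fields, with keys length-prefixed. A sketch of that wire form using only the standard library (the tag value below is illustrative, not taken from the source):

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// putUvarint appends x to w as a varint, like sessionRecord.putUvarint.
func putUvarint(w *bytes.Buffer, x uint64) {
	var tmp [binary.MaxVarintLen64]byte
	n := binary.PutUvarint(tmp[:], x)
	w.Write(tmp[:n])
}

// putBytes writes a length-prefixed byte string, like sessionRecord.putBytes.
func putBytes(w *bytes.Buffer, b []byte) {
	putUvarint(w, uint64(len(b)))
	w.Write(b)
}

func main() {
	var buf bytes.Buffer
	const recNewTable = 7 // hypothetical tag value, for illustration only
	putUvarint(&buf, recNewTable)
	putUvarint(&buf, 1)         // level
	putUvarint(&buf, 5)         // file number
	putUvarint(&buf, 4096)      // size
	putBytes(&buf, []byte("a")) // imin
	putBytes(&buf, []byte("z")) // imax
	fmt.Printf("% x\n", buf.Bytes())
}
```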


@@ -14,7 +14,7 @@ import (
 	"github.com/syndtr/goleveldb/leveldb/storage"
 )

-// logging
+// Logging.

 type dropper struct {
 	s *session
@@ -29,15 +29,10 @@ func (d dropper) Drop(err error) {
 	}
 }

-func (s *session) log(v ...interface{}) {
-	s.stor.Log(fmt.Sprint(v...))
-}
-
-func (s *session) logf(format string, v ...interface{}) {
-	s.stor.Log(fmt.Sprintf(format, v...))
-}
-
-// file utils
+func (s *session) log(v ...interface{})                 { s.stor.Log(fmt.Sprint(v...)) }
+func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
+
+// File utils.

 func (s *session) getJournalFile(num uint64) storage.File {
 	return s.stor.GetFile(num, storage.TypeJournal)
@@ -56,7 +51,7 @@ func (s *session) newTemp() storage.File {
 	return s.stor.GetFile(num, storage.TypeTemp)
 }

-// session state
+// Session state.

 // Get current version.
 func (s *session) version() *version {
@@ -126,7 +121,7 @@ func (s *session) reuseFileNum(num uint64) {
 	}
 }

-// manifest related utils
+// Manifest related utils.

 // Fill given session record obj with current states; need external
 // synchronization.
@@ -142,7 +137,7 @@ func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
 		r.setSeq(s.stSeq)
 	}

-	for level, ik := range s.stCPtrs {
+	for level, ik := range s.stCptrs {
 		if ik != nil {
 			r.addCompactionPointer(level, ik)
 		}
@@ -168,7 +163,7 @@ func (s *session) recordCommited(r *sessionRecord) {
 	}

 	for _, p := range r.compactionPointers {
-		s.stCPtrs[p.level] = iKey(p.key)
+		s.stCptrs[p.level] = iKey(p.ikey)
 	}
 }


@@ -344,19 +344,17 @@ type fileWrap struct {
 }

 func (fw fileWrap) Sync() error {
+	if err := fw.File.Sync(); err != nil {
+		return err
+	}
 	if fw.f.Type() == TypeManifest {
 		// Also sync parent directory if file type is manifest.
 		// See: https://code.google.com/p/leveldb/issues/detail?id=190.
-		f, err := os.Open(fw.f.fs.path)
-		if err != nil {
+		if err := syncDir(fw.f.fs.path); err != nil {
 			return err
 		}
-		defer f.Close()
-		if err := f.Sync(); err != nil {
-			return err
-		}
 	}
-	return fw.File.Sync()
+	return nil
 }

 func (fw fileWrap) Close() error {


@@ -38,3 +38,15 @@ func rename(oldpath, newpath string) error {
 	_, fname := filepath.Split(newpath)
 	return os.Rename(oldpath, fname)
 }
+
+func syncDir(name string) error {
+	f, err := os.Open(name)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if err := f.Sync(); err != nil {
+		return err
+	}
+	return nil
+}
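The new syncDir helper exists because creating or renaming a file only becomes durable on POSIX systems once the containing directory itself is fsynced; this is why fileWrap.Sync above calls it for the manifest, and why the Windows variant further down is a no-op. A hedged usage sketch (writeDurably is a hypothetical helper, POSIX-only):

```go
package main

import (
	"os"
	"path/filepath"
)

// syncDir mirrors the helper added above: fsync the directory so the
// new directory entry itself is durable.
func syncDir(name string) error {
	f, err := os.Open(name)
	if err != nil {
		return err
	}
	defer f.Close()
	return f.Sync()
}

// writeDurably writes a file, fsyncs it, then fsyncs its parent
// directory so the entry survives a crash.
func writeDurably(dir, name string, data []byte) error {
	f, err := os.Create(filepath.Join(dir, name))
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		f.Close()
		return err
	}
	if err := f.Sync(); err != nil { // flush file contents
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return syncDir(dir) // flush the directory entry too
}

func main() {
	if err := writeDurably(os.TempDir(), "example.txt", []byte("hello")); err != nil {
		panic(err)
	}
}
```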


@@ -49,3 +49,15 @@ func setFileLock(f *os.File, lock bool) error {
 func rename(oldpath, newpath string) error {
 	return os.Rename(oldpath, newpath)
 }
+
+func syncDir(name string) error {
+	f, err := os.Open(name)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+	if err := f.Sync(); err != nil {
+		return err
+	}
+	return nil
+}


@@ -65,3 +65,5 @@ func rename(oldpath, newpath string) error {
 	}
 	return moveFileEx(from, to, _MOVEFILE_REPLACE_EXISTING)
 }
+
+func syncDir(name string) error { return nil }


@@ -67,7 +67,7 @@ type Writer interface {
 	Syncer
 }

-// File is the file.
+// File is the file. A file instance must be goroutine-safe.
 type File interface {
 	// Open opens the file for read. Returns os.ErrNotExist error
 	// if the file does not exist.
@@ -94,7 +94,7 @@ type File interface {
 	Remove() error
 }

-// Storage is the storage.
+// Storage is the storage. A storage instance must be goroutine-safe.
 type Storage interface {
 	// Lock locks the storage. Any subsequent attempt to call Lock will fail
 	// until the last lock released.


@ -11,7 +11,6 @@ import (
"sync/atomic" "sync/atomic"
"github.com/syndtr/goleveldb/leveldb/cache" "github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
@ -19,34 +18,41 @@ import (
"github.com/syndtr/goleveldb/leveldb/util" "github.com/syndtr/goleveldb/leveldb/util"
) )
// table file // tFile holds basic information about a table.
type tFile struct { type tFile struct {
file storage.File file storage.File
seekLeft int32 seekLeft int32
size uint64 size uint64
min, max iKey imin, imax iKey
} }
// test if key is after t // Returns true if given key is after largest key of this table.
func (t *tFile) isAfter(key []byte, ucmp comparer.BasicComparer) bool { func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
return key != nil && ucmp.Compare(key, t.max.ukey()) > 0 return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
} }
// test if key is before t // Returns true if given key is before smallest key of this table.
func (t *tFile) isBefore(key []byte, ucmp comparer.BasicComparer) bool { func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
return key != nil && ucmp.Compare(key, t.min.ukey()) < 0 return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
} }
func (t *tFile) incrSeek() int32 { // Returns true if given key range overlaps with this table key range.
func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
return !t.after(icmp, umin) && !t.before(icmp, umax)
}
// Cosumes one seek and return current seeks left.
func (t *tFile) consumeSeek() int32 {
return atomic.AddInt32(&t.seekLeft, -1) return atomic.AddInt32(&t.seekLeft, -1)
} }
func newTFile(file storage.File, size uint64, min, max iKey) *tFile { // Creates new tFile.
func newTableFile(file storage.File, size uint64, imin, imax iKey) *tFile {
f := &tFile{ f := &tFile{
file: file, file: file,
size: size, size: size,
min: min, imin: imin,
max: max, imax: imax,
} }
// We arrange to automatically compact this file after // We arrange to automatically compact this file after
@ -70,33 +76,40 @@ func newTFile(file storage.File, size uint64, min, max iKey) *tFile {
return f return f
} }
// table files // tFiles hold multiple tFile.
type tFiles []*tFile type tFiles []*tFile
func (tf tFiles) Len() int { return len(tf) } func (tf tFiles) Len() int { return len(tf) }
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] } func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
// Returns true if i smallest key is less than j.
// This used for sort by key in ascending order.
func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool { func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
a, b := tf[i], tf[j] a, b := tf[i], tf[j]
n := icmp.Compare(a.min, b.min) n := icmp.Compare(a.imin, b.imin)
if n == 0 { if n == 0 {
return a.file.Num() < b.file.Num() return a.file.Num() < b.file.Num()
} }
return n < 0 return n < 0
} }
// Returns true if i file number is greater than j.
// This used for sort by file number in descending order.
func (tf tFiles) lessByNum(i, j int) bool { func (tf tFiles) lessByNum(i, j int) bool {
return tf[i].file.Num() > tf[j].file.Num() return tf[i].file.Num() > tf[j].file.Num()
} }
// Sorts tables by key in ascending order.
func (tf tFiles) sortByKey(icmp *iComparer) { func (tf tFiles) sortByKey(icmp *iComparer) {
sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp}) sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
} }
// Sorts tables by file number in descending order.
func (tf tFiles) sortByNum() { func (tf tFiles) sortByNum() {
sort.Sort(&tFilesSortByNum{tFiles: tf}) sort.Sort(&tFilesSortByNum{tFiles: tf})
} }
// Returns sum of all tables size.
func (tf tFiles) size() (sum uint64) { func (tf tFiles) size() (sum uint64) {
for _, t := range tf { for _, t := range tf {
sum += t.size sum += t.size
@ -104,94 +117,106 @@ func (tf tFiles) size() (sum uint64) {
return sum return sum
} }
func (tf tFiles) searchMin(key iKey, icmp *iComparer) int { // Searches smallest index of tables whose its smallest
// key is after or equal with given key.
func (tf tFiles) searchMin(icmp *iComparer, ikey iKey) int {
return sort.Search(len(tf), func(i int) bool { return sort.Search(len(tf), func(i int) bool {
return icmp.Compare(tf[i].min, key) >= 0 return icmp.Compare(tf[i].imin, ikey) >= 0
}) })
} }
func (tf tFiles) searchMax(key iKey, icmp *iComparer) int { // Searches smallest index of tables whose its largest
// key is after or equal with given key.
func (tf tFiles) searchMax(icmp *iComparer, ikey iKey) int {
return sort.Search(len(tf), func(i int) bool { return sort.Search(len(tf), func(i int) bool {
return icmp.Compare(tf[i].max, key) >= 0 return icmp.Compare(tf[i].imax, ikey) >= 0
}) })
} }
func (tf tFiles) isOverlaps(min, max []byte, disjSorted bool, icmp *iComparer) bool { // Returns true if given key range overlaps with one or more
if !disjSorted { // tables key range. If unsorted is true then binary search will not be used.
// Need to check against all files func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
if unsorted {
// Check against all files.
for _, t := range tf { for _, t := range tf {
if !t.isAfter(min, icmp.ucmp) && !t.isBefore(max, icmp.ucmp) { if t.overlaps(icmp, umin, umax) {
return true return true
} }
} }
return false return false
} }
var idx int i := 0
if len(min) > 0 { if len(umin) > 0 {
// Find the earliest possible internal key for min // Find the earliest possible internal key for min.
idx = tf.searchMax(newIKey(min, kMaxSeq, tSeek), icmp) i = tf.searchMax(icmp, newIKey(umin, kMaxSeq, tSeek))
} }
if i >= len(tf) {
if idx >= len(tf) { // Beginning of range is after all files, so no overlap.
// beginning of range is after all files, so no overlap
return false return false
} }
return !tf[idx].isBefore(max, icmp.ucmp) return !tf[i].before(icmp, umax)
} }
func (tf tFiles) getOverlaps(min, max []byte, r *tFiles, disjSorted bool, ucmp comparer.BasicComparer) { // Returns tables whose its key range overlaps with given key range.
// If overlapped is true then the search will be expanded to tables that
// overlaps with each other.
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
x := len(dst)
for i := 0; i < len(tf); { for i := 0; i < len(tf); {
t := tf[i] t := tf[i]
i++ if t.overlaps(icmp, umin, umax) {
if t.isAfter(min, ucmp) || t.isBefore(max, ucmp) { if overlapped {
continue // For overlapped files, check if the newly added file has
} // expanded the range. If so, restart search.
if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
*r = append(*r, t) umin = t.imin.ukey()
if !disjSorted { dst = dst[:x]
// Level-0 files may overlap each other. So check if the newly i = 0
// added file has expanded the range. If so, restart search. continue
if min != nil && ucmp.Compare(t.min.ukey(), min) < 0 { } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
min = t.min.ukey() umax = t.imax.ukey()
*r = nil dst = dst[:x]
i = 0 i = 0
} else if max != nil && ucmp.Compare(t.max.ukey(), max) > 0 { continue
max = t.max.ukey() }
*r = nil
i = 0
} }
dst = append(dst, t)
} }
i++
} }
return return dst
} }
func (tf tFiles) getRange(icmp *iComparer) (min, max iKey) { // Returns tables key range.
func (tf tFiles) getRange(icmp *iComparer) (imin, imax iKey) {
for i, t := range tf { for i, t := range tf {
if i == 0 { if i == 0 {
min, max = t.min, t.max imin, imax = t.imin, t.imax
continue continue
} }
if icmp.Compare(t.min, min) < 0 { if icmp.Compare(t.imin, imin) < 0 {
min = t.min imin = t.imin
} }
if icmp.Compare(t.max, max) > 0 { if icmp.Compare(t.imax, imax) > 0 {
max = t.max imax = t.imax
} }
} }
return return
} }
// Creates iterator index from tables.
func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer { func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
if slice != nil { if slice != nil {
var start, limit int var start, limit int
if slice.Start != nil { if slice.Start != nil {
start = tf.searchMax(iKey(slice.Start), icmp) start = tf.searchMax(icmp, iKey(slice.Start))
} }
if slice.Limit != nil { if slice.Limit != nil {
limit = tf.searchMin(iKey(slice.Limit), icmp) limit = tf.searchMin(icmp, iKey(slice.Limit))
} else { } else {
limit = tf.Len() limit = tf.Len()
} }
@ -206,6 +231,7 @@ func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range
}) })
} }
// Tables iterator index.
type tFilesArrayIndexer struct { type tFilesArrayIndexer struct {
tFiles tFiles
tops *tOps tops *tOps
@ -215,7 +241,7 @@ type tFilesArrayIndexer struct {
} }
func (a *tFilesArrayIndexer) Search(key []byte) int { func (a *tFilesArrayIndexer) Search(key []byte) int {
return a.searchMax(iKey(key), a.icmp) return a.searchMax(a.icmp, iKey(key))
} }
func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator { func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
@ -225,6 +251,7 @@ func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
return a.tops.newIterator(a.tFiles[i], nil, a.ro) return a.tops.newIterator(a.tFiles[i], nil, a.ro)
} }
// Helper type for sortByKey.
type tFilesSortByKey struct { type tFilesSortByKey struct {
tFiles tFiles
icmp *iComparer icmp *iComparer
@ -234,6 +261,7 @@ func (x *tFilesSortByKey) Less(i, j int) bool {
return x.lessByKey(x.icmp, i, j) return x.lessByKey(x.icmp, i, j)
} }
// Helper type for sortByNum.
type tFilesSortByNum struct { type tFilesSortByNum struct {
tFiles tFiles
} }
@ -242,19 +270,14 @@ func (x *tFilesSortByNum) Less(i, j int) bool {
return x.lessByNum(i, j) return x.lessByNum(i, j)
} }
// table operations // Table operations.
type tOps struct { type tOps struct {
s *session s *session
cache cache.Cache cache cache.Cache
cacheNS cache.Namespace cacheNS cache.Namespace
} }
func newTableOps(s *session, cacheCap int) *tOps { // Creates an empty table and returns table writer.
c := cache.NewLRUCache(cacheCap)
ns := c.GetNamespace(0)
return &tOps{s, c, ns}
}
func (t *tOps) create() (*tWriter, error) { func (t *tOps) create() (*tWriter, error) {
file := t.s.getTableFile(t.s.allocFileNum()) file := t.s.getTableFile(t.s.allocFileNum())
fw, err := file.Create() fw, err := file.Create()
@ -269,6 +292,7 @@ func (t *tOps) create() (*tWriter, error) {
}, nil }, nil
} }
// Builds table from src iterator.
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
w, err := t.create() w, err := t.create()
if err != nil { if err != nil {
@ -282,7 +306,7 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
}() }()
for src.Next() { for src.Next() {
err = w.add(src.Key(), src.Value()) err = w.append(src.Key(), src.Value())
if err != nil { if err != nil {
return return
} }
@ -297,7 +321,9 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
return return
} }
func (t *tOps) lookup(f *tFile) (c cache.Object, err error) { // Opens table. It returns a cache object, which should
// be released after use.
func (t *tOps) open(f *tFile) (c cache.Object, err error) {
num := f.file.Num() num := f.file.Num()
c, ok := t.cacheNS.Get(num, func() (ok bool, value interface{}, charge int, fin cache.SetFin) { c, ok := t.cacheNS.Get(num, func() (ok bool, value interface{}, charge int, fin cache.SetFin) {
var r storage.Reader var r storage.Reader
@ -327,8 +353,10 @@ func (t *tOps) lookup(f *tFile) (c cache.Object, err error) {
return return
} }
func (t *tOps) get(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) { // Finds key/value pair whose key is greater than or equal to the
c, err := t.lookup(f) // given key.
func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
c, err := t.open(f)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
@ -336,8 +364,9 @@ func (t *tOps) get(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []by
return c.Value().(*table.Reader).Find(key, ro) return c.Value().(*table.Reader).Find(key, ro)
} }
// Returns approximate offset of the given key.
func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
c, err := t.lookup(f) c, err := t.open(f)
if err != nil { if err != nil {
return return
} }
@ -347,8 +376,9 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) {
return return
} }
// Creates an iterator from the given table.
func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
c, err := t.lookup(f) c, err := t.open(f)
if err != nil { if err != nil {
return iterator.NewEmptyIterator(err) return iterator.NewEmptyIterator(err)
} }
@ -357,6 +387,8 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite
return iter return iter
} }
// Removes table from persistent storage. It waits until
// no one use the the table.
func (t *tOps) remove(f *tFile) { func (t *tOps) remove(f *tFile) {
num := f.file.Num() num := f.file.Num()
t.cacheNS.Delete(num, func(exist bool) { t.cacheNS.Delete(num, func(exist bool) {
@ -371,10 +403,21 @@ func (t *tOps) remove(f *tFile) {
}) })
} }
// Closes the table ops instance. It will close all tables,
// regadless still used or not.
func (t *tOps) close() { func (t *tOps) close() {
t.cache.Zap(true) t.cache.Zap(true)
} }
// Creates new initialized table ops instance.
func newTableOps(s *session, cacheCap int) *tOps {
c := cache.NewLRUCache(cacheCap)
ns := c.GetNamespace(0)
return &tOps{s, c, ns}
}
// tWriter wraps the table writer. It keep track of file descriptor
// and added key range.
type tWriter struct { type tWriter struct {
t *tOps t *tOps
@ -385,7 +428,8 @@ type tWriter struct {
first, last []byte first, last []byte
} }
// Append key/value pair to the table.
func (w *tWriter) add(key, value []byte) error { func (w *tWriter) append(key, value []byte) error {
if w.first == nil { if w.first == nil {
w.first = append([]byte{}, key...) w.first = append([]byte{}, key...)
} }
@ -393,10 +437,12 @@ func (w *tWriter) add(key, value []byte) error {
return w.tw.Append(key, value) return w.tw.Append(key, value)
} }
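Because keys are appended in sorted order, tracking the table's key range costs one copy per append: first is captured on the initial call, last is overwritten on every call. A rough standalone sketch of the same bookkeeping (rangeWriter is hypothetical; the real tWriter also forwards each pair to the underlying table writer):

    type rangeWriter struct {
        first, last []byte
    }

    func (w *rangeWriter) append(key []byte) {
        if w.first == nil {
            w.first = append([]byte{}, key...) // copy: the caller may reuse key
        }
        w.last = append(w.last[:0], key...) // every append advances last
    }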
// Returns true if the table is empty.
func (w *tWriter) empty() bool { func (w *tWriter) empty() bool {
return w.first == nil return w.first == nil
} }
// Finalizes the table and returns the table file.
func (w *tWriter) finish() (f *tFile, err error) { func (w *tWriter) finish() (f *tFile, err error) {
err = w.tw.Close() err = w.tw.Close()
if err != nil { if err != nil {
@ -408,10 +454,11 @@ func (w *tWriter) finish() (f *tFile, err error) {
return return
} }
w.w.Close() w.w.Close()
f = newTFile(w.file, uint64(w.tw.BytesLen()), iKey(w.first), iKey(w.last)) f = newTableFile(w.file, uint64(w.tw.BytesLen()), iKey(w.first), iKey(w.last))
return return
} }
// Drops the table.
func (w *tWriter) drop() { func (w *tWriter) drop() {
w.w.Close() w.w.Close()
w.file.Remove() w.file.Remove()

View File

@ -761,6 +761,8 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) {
// NewReader creates a new initialized table reader for the file. // NewReader creates a new initialized table reader for the file.
// The cache is optional and can be nil. // The cache is optional and can be nil.
//
// The returned table reader instance is goroutine-safe.
func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader { func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader {
r := &Reader{ r := &Reader{
reader: f, reader: f,

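Since the reader is goroutine-safe, a single Reader can serve concurrent lookups without external locking. A hedged sketch of what that permits (it assumes r came from NewReader over a valid table file, which is elided here):

    import (
        "fmt"
        "sync"

        "github.com/syndtr/goleveldb/leveldb/table"
    )

    // demo fans concurrent lookups out over one shared Reader.
    func demo(r *table.Reader, keys [][]byte) {
        var wg sync.WaitGroup
        for _, k := range keys {
            wg.Add(1)
            go func(key []byte) {
                defer wg.Done()
                rkey, rvalue, err := r.Find(key, nil)
                if err != nil {
                    return
                }
                fmt.Printf("%s: %s -> %s\n", key, rkey, rvalue)
            }(k)
        }
        wg.Wait()
    }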
View File

@ -48,6 +48,7 @@ func (t *testingDB) TestClose() {
func newTestingDB(o *opt.Options, ro *opt.ReadOptions, wo *opt.WriteOptions) *testingDB { func newTestingDB(o *opt.Options, ro *opt.ReadOptions, wo *opt.WriteOptions) *testingDB {
stor := testutil.NewStorage() stor := testutil.NewStorage()
db, err := Open(stor, o) db, err := Open(stor, o)
// FIXME: This may be called from outside It, which may cause panic.
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
return &testingDB{ return &testingDB{
DB: db, DB: db,

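For context, the same Open entry point is what non-test callers use as well. A minimal sketch against the in-memory storage backend (assuming storage.NewMemStorage from the same repository; error handling trimmed to the essentials):

    import (
        "github.com/syndtr/goleveldb/leveldb"
        "github.com/syndtr/goleveldb/leveldb/storage"
    )

    func openScratchDB() (*leveldb.DB, error) {
        stor := storage.NewMemStorage() // throwaway, in-memory backend
        db, err := leveldb.Open(stor, nil)
        if err != nil {
            return nil, err
        }
        if err := db.Put([]byte("k"), []byte("v"), nil); err != nil {
            db.Close()
            return nil, err
        }
        return db, nil
    }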
View File

@ -40,8 +40,8 @@ type version struct {
tables [kNumLevels]tFiles tables [kNumLevels]tFiles
// Level that should be compacted next and its compaction score. // Level that should be compacted next and its compaction score.
// Score < 1 means compaction is not strictly needed. These fields // Score < 1 means compaction is not strictly needed. These fields
// are initialized by ComputeCompaction() // are initialized by computeCompaction()
cLevel int cLevel int
cScore float64 cScore float64
@ -60,8 +60,6 @@ func (v *version) release_NB() {
panic("negative version ref") panic("negative version ref")
} }
s := v.s
tables := make(map[uint64]bool) tables := make(map[uint64]bool)
for _, tt := range v.next.tables { for _, tt := range v.next.tables {
for _, t := range tt { for _, t := range tt {
@ -74,7 +72,7 @@ func (v *version) release_NB() {
for _, t := range tt { for _, t := range tt {
num := t.file.Num() num := t.file.Num()
if _, ok := tables[num]; !ok { if _, ok := tables[num]; !ok {
s.tops.remove(t) v.s.tops.remove(t)
} }
} }
} }
@ -89,130 +87,142 @@ func (v *version) release() {
v.s.vmu.Unlock() v.s.vmu.Unlock()
} }
func (v *version) get(key iKey, ro *opt.ReadOptions) (value []byte, cstate bool, err error) {
s := v.s
ukey := key.ukey()

var tset *tSet
tseek := true

// We can search level-by-level since entries never hop across
// levels. Therefore we are guaranteed that if we find data
// in a smaller level, later levels are irrelevant.
for level, ts := range v.tables {
if len(ts) == 0 {
continue
}

if level == 0 {
// Level-0 files may overlap each other. Find all files that
// overlap user_key and process them in order from newest to
// oldest.
var tmp tFiles
for _, t := range ts {
if s.icmp.uCompare(ukey, t.min.ukey()) >= 0 &&
s.icmp.uCompare(ukey, t.max.ukey()) <= 0 {
tmp = append(tmp, t)
}
}

if len(tmp) == 0 {
continue
}

tmp.sortByNum()
ts = tmp
} else {
i := ts.searchMax(key, s.icmp)
if i >= len(ts) || s.icmp.uCompare(ukey, ts[i].min.ukey()) < 0 {
continue
}
ts = ts[i : i+1]
}

var l0found bool
var l0seq uint64
var l0type vType
var l0value []byte
for _, t := range ts {
if tseek {
if tset == nil {
tset = &tSet{level, t}
} else if tset.table.incrSeek() <= 0 {
cstate = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
tseek = false
}
}

var _rkey, rval []byte
_rkey, rval, err = s.tops.get(t, key, ro)
if err == ErrNotFound {
continue
} else if err != nil {
return
}

rkey := iKey(_rkey)
if seq, t, ok := rkey.parseNum(); ok {
if s.icmp.uCompare(ukey, rkey.ukey()) == 0 {
if level == 0 {
if seq >= l0seq {
l0found = true
l0seq = seq
l0type = t
l0value = rval
}
} else {
switch t {
case tVal:
value = rval
case tDel:
err = ErrNotFound
default:
panic("invalid type")
}
return
}
}
} else {
err = errors.New("leveldb: internal key corrupted")
return
}
}

if level == 0 && l0found {
switch l0type {
case tVal:
value = l0value
case tDel:
err = ErrNotFound
default:
panic("invalid type")
}
return
}
}

err = ErrNotFound
return
}

func (v *version) walkOverlapping(ikey iKey, f func(level int, t *tFile) bool, lf func(level int) bool) {
ukey := ikey.ukey()

// Walk tables level-by-level.
for level, tables := range v.tables {
if len(tables) == 0 {
continue
}

if level == 0 {
// Level-0 files may overlap each other. Find all files that
// overlap ukey.
for _, t := range tables {
if t.overlaps(v.s.icmp, ukey, ukey) {
if !f(level, t) {
return
}
}
}
} else {
if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) {
t := tables[i]
if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
if !f(level, t) {
return
}
}
}
}

if lf != nil && !lf(level) {
return
}
}
}
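The callback contract of walkOverlapping is worth spelling out: f runs once per candidate table, smaller (newer) levels first, and returning false aborts the walk; lf, when non-nil, runs after each level is exhausted and can abort likewise. A toy sketch of the same shape over plain ints:

    // walk visits candidates per level; f and lf mirror the contract of
    // walkOverlapping (return false to stop early).
    func walk(levels [][]int, f func(level, v int) bool, lf func(level int) bool) {
        for level, vs := range levels {
            for _, v := range vs {
                if !f(level, v) {
                    return
                }
            }
            if lf != nil && !lf(level) {
                return
            }
        }
    }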
func (v *version) get(ikey iKey, ro *opt.ReadOptions) (value []byte, tcomp bool, err error) {
ukey := ikey.ukey()
var (
tset *tSet
tseek bool
l0found bool
l0seq uint64
l0vt vType
l0val []byte
)
err = ErrNotFound err = ErrNotFound
// Since entries never hop across levels, finding key/value
// in a smaller level makes later levels irrelevant.
v.walkOverlapping(ikey, func(level int, t *tFile) bool {
if !tseek {
if tset == nil {
tset = &tSet{level, t}
} else if tset.table.consumeSeek() <= 0 {
tseek = true
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
}
}
ikey__, val_, err_ := v.s.tops.find(t, ikey, ro)
switch err_ {
case nil:
case ErrNotFound:
return true
default:
err = err_
return false
}
ikey_ := iKey(ikey__)
if seq, vt, ok := ikey_.parseNum(); ok {
if v.s.icmp.uCompare(ukey, ikey_.ukey()) != 0 {
return true
}
if level == 0 {
if seq >= l0seq {
l0found = true
l0seq = seq
l0vt = vt
l0val = val_
}
} else {
switch vt {
case tVal:
value = val_
err = nil
case tDel:
default:
panic("leveldb: invalid internal key type")
}
return false
}
} else {
err = errors.New("leveldb: internal key corrupted")
return false
}
return true
}, func(level int) bool {
if l0found {
switch l0vt {
case tVal:
value = l0val
err = nil
case tDel:
default:
panic("leveldb: invalid internal key type")
}
return false
}
return true
})
return return
} }
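Note how the seek-compaction trigger is published: the first table probed becomes the candidate in tset, and once its seek budget (consumeSeek) is spent, a compare-and-swap installs it into v.cSeek, so only one candidate can ever occupy the nil slot. A minimal sketch of that publish step with simplified stand-in types:

    import (
        "sync/atomic"
        "unsafe"
    )

    type seekSet struct{ level int } // stand-in for tSet

    // publishSeekCandidate installs cand only if no candidate is pending;
    // it reports whether this call was the one that triggered compaction.
    func publishSeekCandidate(slot *unsafe.Pointer, cand *seekSet) bool {
        return atomic.CompareAndSwapPointer(slot, nil, unsafe.Pointer(cand))
    }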
func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) { func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) {
s := v.s
// Merge all level zero files together since they may overlap // Merge all level zero files together since they may overlap
for _, t := range v.tables[0] { for _, t := range v.tables[0] {
it := s.tops.newIterator(t, slice, ro) it := v.s.tops.newIterator(t, slice, ro)
its = append(its, it) its = append(its, it)
} }
strict := s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) strict := v.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator)
for _, tt := range v.tables[1:] { for _, tables := range v.tables[1:] {
if len(tt) == 0 { if len(tables) == 0 {
continue continue
} }
it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, slice, ro), strict, true) it := iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict, true)
its = append(its, it) its = append(its, it)
} }
@ -242,25 +252,25 @@ func (v *version) tLen(level int) int {
return len(v.tables[level]) return len(v.tables[level])
} }
func (v *version) offsetOf(key iKey) (n uint64, err error) { func (v *version) offsetOf(ikey iKey) (n uint64, err error) {
for level, tt := range v.tables { for level, tables := range v.tables {
for _, t := range tt { for _, t := range tables {
if v.s.icmp.Compare(t.max, key) <= 0 { if v.s.icmp.Compare(t.imax, ikey) <= 0 {
// Entire file is before "key", so just add the file size // Entire file is before "ikey", so just add the file size
n += t.size n += t.size
} else if v.s.icmp.Compare(t.min, key) > 0 { } else if v.s.icmp.Compare(t.imin, ikey) > 0 {
// Entire file is after "key", so ignore // Entire file is after "ikey", so ignore
if level > 0 { if level > 0 {
// Files other than level 0 are sorted by meta->min, so // Files other than level 0 are sorted by meta->min, so
// no further files in this level will contain data for // no further files in this level will contain data for
// "key". // "ikey".
break break
} }
} else { } else {
// "key" falls in the range for this table. Add the // "ikey" falls in the range for this table. Add the
// approximate offset of "key" within the table. // approximate offset of "ikey" within the table.
var nn uint64 var nn uint64
nn, err = v.s.tops.offsetOf(t, key) nn, err = v.s.tops.offsetOf(t, ikey)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -272,15 +282,15 @@ func (v *version) offsetOf(key iKey) (n uint64, err error) {
return return
} }
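A quick worked example of the approximation for a sorted level: with table sizes 100, 80, and 120 and the key landing 30 bytes into the second table, the result is 100 + 30 = 130, and the remaining table is skipped. A sketch of the same accounting (the level-0 complication, where the loop cannot break early, is deliberately ignored):

    // approximateOffset mimics the accounting above for one sorted level:
    // sizes of tables wholly before the key, plus the in-table offset.
    // approximateOffset([]uint64{100, 80, 120}, 1, 30) returns 130.
    func approximateOffset(sizes []uint64, hit int, inTable uint64) uint64 {
        var n uint64
        for i, size := range sizes {
            if i < hit {
                n += size // entire table is before the key
            } else {
                n += inTable // the key falls inside this table
                break        // later tables cannot precede the key
            }
        }
        return n
    }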
func (v *version) pickLevel(min, max []byte) (level int) { func (v *version) pickLevel(umin, umax []byte) (level int) {
if !v.tables[0].isOverlaps(min, max, false, v.s.icmp) { if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
var r tFiles var overlaps tFiles
for ; level < kMaxMemCompactLevel; level++ { for ; level < kMaxMemCompactLevel; level++ {
if v.tables[level+1].isOverlaps(min, max, true, v.s.icmp) { if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) {
break break
} }
v.tables[level+2].getOverlaps(min, max, &r, true, v.s.icmp.ucmp) overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false)
if r.size() > kMaxGrandParentOverlapBytes { if overlaps.size() > kMaxGrandParentOverlapBytes {
break break
} }
} }
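The placement rule reads: a freshly dumped table that overlaps level 0 must stay at level 0; otherwise it sinks while the next level stays clear and the grandparent overlap stays under the byte budget (kMaxGrandParentOverlapBytes). A sketch with the predicates abstracted into parameters (the names here are illustrative, not the vendored API):

    // pickLevelSketch mirrors the placement rule: push the new table down
    // while the next level is clear and the grandparent overlap stays
    // under the byte budget.
    func pickLevelSketch(overlapsLevel func(level int) bool, grandparentBytes func(level int) int,
        maxLevel, maxGPBytes int) int {
        if overlapsLevel(0) {
            return 0 // must go to level 0 if it overlaps level 0
        }
        level := 0
        for ; level < maxLevel; level++ {
            if overlapsLevel(level + 1) {
                break
            }
            if grandparentBytes(level+2) > maxGPBytes {
                break
            }
        }
        return level
    }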
@ -294,7 +304,7 @@ func (v *version) computeCompaction() {
var bestLevel int = -1 var bestLevel int = -1
var bestScore float64 = -1 var bestScore float64 = -1
for level, ff := range v.tables { for level, tables := range v.tables {
var score float64 var score float64
if level == 0 { if level == 0 {
// We treat level-0 specially by bounding the number of files // We treat level-0 specially by bounding the number of files
@ -308,9 +318,9 @@ func (v *version) computeCompaction() {
// file size is small (perhaps because of a small write-buffer // file size is small (perhaps because of a small write-buffer
// setting, or very high compression ratios, or lots of // setting, or very high compression ratios, or lots of
// overwrites/deletions). // overwrites/deletions).
score = float64(len(ff)) / kL0_CompactionTrigger score = float64(len(tables)) / kL0_CompactionTrigger
} else { } else {
score = float64(ff.size()) / levelMaxSize[level] score = float64(tables.size()) / levelMaxSize[level]
} }
if score > bestScore { if score > bestScore {
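The scoring rule differs by level on purpose: level 0 is bounded by file count, since every file there may cost a seek at read time, while deeper levels are bounded by total bytes. A one-function sketch of the same arithmetic (parameter names are illustrative):

    // compactionScore mirrors the scoring rule: level 0 is scored by file
    // count against its trigger, other levels by total bytes against a
    // per-level size budget. A score >= 1 means compaction is due.
    func compactionScore(level, files int, bytes, maxBytes float64, l0Trigger int) float64 {
        if level == 0 {
            return float64(files) / float64(l0Trigger)
        }
        return bytes / maxBytes
    }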
@ -336,57 +346,51 @@ type versionStaging struct {
} }
func (p *versionStaging) commit(r *sessionRecord) { func (p *versionStaging) commit(r *sessionRecord) {
btt := p.base.tables
// deleted tables // Deleted tables.
for _, tr := range r.deletedTables { for _, r := range r.deletedTables {
tm := &(p.tables[tr.level]) tm := &(p.tables[r.level])
bt := btt[tr.level]
if len(bt) > 0 { if len(p.base.tables[r.level]) > 0 {
if tm.deleted == nil { if tm.deleted == nil {
tm.deleted = make(map[uint64]struct{}) tm.deleted = make(map[uint64]struct{})
} }
tm.deleted[tr.num] = struct{}{} tm.deleted[r.num] = struct{}{}
} }
if tm.added != nil { if tm.added != nil {
delete(tm.added, tr.num) delete(tm.added, r.num)
} }
} }
// new tables // New tables.
for _, tr := range r.addedTables { for _, r := range r.addedTables {
tm := &(p.tables[tr.level]) tm := &(p.tables[r.level])
if tm.added == nil { if tm.added == nil {
tm.added = make(map[uint64]ntRecord) tm.added = make(map[uint64]ntRecord)
} }
tm.added[tr.num] = tr tm.added[r.num] = r
if tm.deleted != nil { if tm.deleted != nil {
delete(tm.deleted, tr.num) delete(tm.deleted, r.num)
} }
} }
} }
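The net effect of commit is a pair of per-level maps that always cancel each other out: recording a delete drops any pending add for the same file number, and vice versa, so the maps describe the net change against the base version. A compact sketch of that cancellation rule (stageSketch is hypothetical):

    // stageSketch keeps the net add/delete sets for one level.
    type stageSketch struct {
        added   map[uint64]struct{}
        deleted map[uint64]struct{}
    }

    func newStageSketch() *stageSketch {
        return &stageSketch{
            added:   make(map[uint64]struct{}),
            deleted: make(map[uint64]struct{}),
        }
    }

    // del records a deletion only if the base level has tables, and
    // always cancels a pending add for the same file number.
    func (s *stageSketch) del(num uint64, baseHas bool) {
        if baseHas {
            s.deleted[num] = struct{}{}
        }
        delete(s.added, num)
    }

    // add records an addition and cancels a pending delete.
    func (s *stageSketch) add(num uint64) {
        s.added[num] = struct{}{}
        delete(s.deleted, num)
    }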
func (p *versionStaging) finish() *version { func (p *versionStaging) finish() *version {
s := p.base.s
btt := p.base.tables
// build new version // Build new version.
nv := &version{s: s} nv := &version{s: p.base.s}
for level, tm := range p.tables { for level, tm := range p.tables {
bt := btt[level] btables := p.base.tables[level]
n := len(bt) + len(tm.added) - len(tm.deleted) n := len(btables) + len(tm.added) - len(tm.deleted)
if n < 0 { if n < 0 {
n = 0 n = 0
} }
nt := make(tFiles, 0, n) nt := make(tFiles, 0, n)
// base tables // Base tables.
for _, t := range bt { for _, t := range btables {
if _, ok := tm.deleted[t.file.Num()]; ok { if _, ok := tm.deleted[t.file.Num()]; ok {
continue continue
} }
@ -396,17 +400,21 @@ func (p *versionStaging) finish() *version {
nt = append(nt, t) nt = append(nt, t)
} }
// new tables // New tables.
for _, tr := range tm.added { for _, r := range tm.added {
nt = append(nt, tr.makeFile(s)) nt = append(nt, r.makeFile(p.base.s))
} }
// sort tables // Sort tables.
nt.sortByKey(s.icmp) if level == 0 {
nt.sortByNum()
} else {
nt.sortByKey(p.base.s.icmp)
}
nv.tables[level] = nt nv.tables[level] = nt
} }
// compute compaction score for new version // Compute compaction score for new version.
nv.computeCompaction() nv.computeCompaction()
return nv return nv