diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 0611e9197..574bf93c5 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -12,13 +12,13 @@ }, { "ImportPath": "code.google.com/p/go.crypto/bcrypt", - "Comment": "null-212", - "Rev": "1064b89a6fb591df0dd65422295b8498916b092f" + "Comment": "null-213", + "Rev": "aa2644fe4aa50e3b38d75187b4799b1f0c9ddcef" }, { "ImportPath": "code.google.com/p/go.crypto/blowfish", - "Comment": "null-212", - "Rev": "1064b89a6fb591df0dd65422295b8498916b092f" + "Comment": "null-213", + "Rev": "aa2644fe4aa50e3b38d75187b4799b1f0c9ddcef" }, { "ImportPath": "code.google.com/p/go.net/html", @@ -27,13 +27,13 @@ }, { "ImportPath": "code.google.com/p/go.text/transform", - "Comment": "null-87", - "Rev": "c59e4f2f93824f81213799e64c3eead7be24660a" + "Comment": "null-88", + "Rev": "1506dcc33592c369c3be7bd30b38f90445b86deb" }, { "ImportPath": "code.google.com/p/go.text/unicode/norm", - "Comment": "null-87", - "Rev": "c59e4f2f93824f81213799e64c3eead7be24660a" + "Comment": "null-88", + "Rev": "1506dcc33592c369c3be7bd30b38f90445b86deb" }, { "ImportPath": "code.google.com/p/snappy-go/snappy", @@ -42,7 +42,7 @@ }, { "ImportPath": "github.com/golang/groupcache/lru", - "Rev": "a531d51b7f9f3dd13c1c2b50d42d739b70442dbb" + "Rev": "8b25adc0f62632c810997cb38c21111a3f256bf4" }, { "ImportPath": "github.com/juju/ratelimit", @@ -50,7 +50,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "e1f2d2bdccd7c62f4d4a29aaf081bf1fc4404f91" + "Rev": "ba4481e4cb1d45f586e32be2ab663f173b08b207" }, { "ImportPath": "github.com/vitrun/qart/coding", diff --git a/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform.go b/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform.go index b4c79fe73..0625df5ef 100644 --- a/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform.go +++ b/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform.go @@ -9,6 +9,7 @@ package transform import ( + "bytes" "errors" "io" "unicode/utf8" @@ -127,7 +128,7 @@ func (r *Reader) Read(p []byte) (int, error) { // cannot read more bytes into src. r.transformComplete = r.err != nil continue - case err == ErrShortDst && r.dst1 != 0: + case err == ErrShortDst && (r.dst1 != 0 || n != 0): // Make room in dst by copying out, and try again. continue case err == ErrShortSrc && r.src1-r.src0 != len(r.src) && r.err == nil: @@ -210,7 +211,7 @@ func (w *Writer) Write(data []byte) (n int, err error) { n += nSrc } switch { - case err == ErrShortDst && nDst > 0: + case err == ErrShortDst && (nDst > 0 || nSrc > 0): case err == ErrShortSrc && len(src) < len(w.src): m := copy(w.src, src) // If w.n > 0, bytes from data were already copied to w.src and n @@ -467,30 +468,125 @@ func (t removeF) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err err return } -// Bytes returns a new byte slice with the result of converting b using t. -// If any unrecoverable error occurs it returns nil. -func Bytes(t Transformer, b []byte) []byte { - out := make([]byte, len(b)) - n := 0 - for { - nDst, nSrc, err := t.Transform(out[n:], b, true) - n += nDst - if err == nil { - return out[:n] - } else if err != ErrShortDst { - return nil - } - b = b[nSrc:] +// grow returns a new []byte that is longer than b, and copies the first n bytes +// of b to the start of the new slice. +func grow(b []byte, n int) []byte { + m := len(b) + if m <= 256 { + m *= 2 + } else { + m += m >> 1 + } + buf := make([]byte, m) + copy(buf, b[:n]) + return buf +} - // Grow the destination buffer. 
- sz := len(out) - if sz <= 256 { - sz *= 2 - } else { - sz += sz >> 1 +const initialBufSize = 128 + +// String returns a string with the result of converting s[:n] using t, where +// n <= len(s). If err == nil, n will be len(s). +func String(t Transformer, s string) (result string, n int, err error) { + if s == "" { + return "", 0, nil + } + + // Allocate only once. Note that both dst and src escape when passed to + // Transform. + buf := [2 * initialBufSize]byte{} + dst := buf[:initialBufSize:initialBufSize] + src := buf[initialBufSize : 2*initialBufSize] + + // Avoid allocation if the transformed string is identical to the original. + // After this loop, pDst will point to the furthest point in s for which it + // could be detected that t gives equal results, src[:nSrc] will + // indicated the last processed chunk of s for which the output is not equal + // and dst[:nDst] will be the transform of this chunk. + var nDst, nSrc int + pDst := 0 // Used as index in both src and dst in this loop. + for { + n := copy(src, s[pDst:]) + nDst, nSrc, err = t.Transform(dst, src[:n], pDst+n == len(s)) + + // Note 1: we will not enter the loop with pDst == len(s) and we will + // not end the loop with it either. So if nSrc is 0, this means there is + // some kind of error from which we cannot recover given the current + // buffer sizes. We will give up in this case. + // Note 2: it is not entirely correct to simply do a bytes.Equal as + // a Transformer may buffer internally. It will work in most cases, + // though, and no harm is done if it doesn't work. + // TODO: let transformers implement an optional Spanner interface, akin + // to norm's QuickSpan. This would even allow us to avoid any allocation. + if nSrc == 0 || !bytes.Equal(dst[:nDst], src[:nSrc]) { + break + } + + if pDst += nDst; pDst == len(s) { + return s, pDst, nil + } + } + + // Move the bytes seen so far to dst. + pSrc := pDst + nSrc + if pDst+nDst <= initialBufSize { + copy(dst[pDst:], dst[:nDst]) + } else { + b := make([]byte, len(s)+nDst-nSrc) + copy(b[pDst:], dst[:nDst]) + dst = b + } + copy(dst, s[:pDst]) + pDst += nDst + + if err != nil && err != ErrShortDst && err != ErrShortSrc { + return string(dst[:pDst]), pSrc, err + } + + // Complete the string with the remainder. + for { + n := copy(src, s[pSrc:]) + nDst, nSrc, err = t.Transform(dst[pDst:], src[:n], pSrc+n == len(s)) + pDst += nDst + pSrc += nSrc + + switch err { + case nil: + if pSrc == len(s) { + return string(dst[:pDst]), pSrc, nil + } + case ErrShortDst: + // Do not grow as long as we can make progress. This may avoid + // excessive allocations. + if nDst == 0 { + dst = grow(dst, pDst) + } + case ErrShortSrc: + if nSrc == 0 { + src = grow(src, 0) + } + default: + return string(dst[:pDst]), pSrc, err + } + } +} + +// Bytes returns a new byte slice with the result of converting b[:n] using t, +// where n <= len(b). If err == nil, n will be len(b). +func Bytes(t Transformer, b []byte) (result []byte, n int, err error) { + dst := make([]byte, len(b)) + pDst, pSrc := 0, 0 + for { + nDst, nSrc, err := t.Transform(dst[pDst:], b[pSrc:], true) + pDst += nDst + pSrc += nSrc + if err != ErrShortDst { + return dst[:pDst], pSrc, err + } + + // Grow the destination buffer, but do not grow as long as we can make + // progress. This may avoid excessive allocations. 
+ if nDst == 0 { + dst = grow(dst, pDst) } - out2 := make([]byte, sz) - copy(out2, out[:n]) - out = out2 } } diff --git a/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform_test.go b/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform_test.go index 7444a18dd..ec3b11e2f 100644 --- a/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform_test.go +++ b/Godeps/_workspace/src/code.google.com/p/go.text/transform/transform_test.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "testing" + "time" "unicode/utf8" ) @@ -132,6 +133,43 @@ func (e rleEncode) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err e return nDst, nSrc, nil } +// trickler consumes all input bytes, but writes a single byte at a time to dst. +type trickler []byte + +func (t *trickler) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) { + *t = append(*t, src...) + if len(*t) == 0 { + return 0, 0, nil + } + if len(dst) == 0 { + return 0, len(src), ErrShortDst + } + dst[0] = (*t)[0] + *t = (*t)[1:] + if len(*t) > 0 { + err = ErrShortDst + } + return 1, len(src), err +} + +// delayedTrickler is like trickler, but delays writing output to dst. This is +// highly unlikely to be relevant in practice, but it seems like a good idea +// to have some tolerance as long as progress can be detected. +type delayedTrickler []byte + +func (t *delayedTrickler) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) { + if len(*t) > 0 && len(dst) > 0 { + dst[0] = (*t)[0] + *t = (*t)[1:] + nDst = 1 + } + *t = append(*t, src...) + if len(*t) > 0 { + err = ErrShortDst + } + return nDst, len(src), err +} + type testCase struct { desc string t Transformer @@ -170,6 +208,15 @@ func (c chain) String() string { } var testCases = []testCase{ + { + desc: "empty", + t: lowerCaseASCII{}, + src: "", + dstSize: 100, + srcSize: 100, + wantStr: "", + }, + { desc: "basic", t: lowerCaseASCII{}, @@ -378,6 +425,24 @@ var testCases = []testCase{ ioSize: 10, wantStr: "4a6b2b4c4d1d", }, + + { + desc: "trickler", + t: &trickler{}, + src: "abcdefghijklm", + dstSize: 3, + srcSize: 15, + wantStr: "abcdefghijklm", + }, + + { + desc: "delayedTrickler", + t: &delayedTrickler{}, + src: "abcdefghijklm", + dstSize: 3, + srcSize: 15, + wantStr: "abcdefghijklm", + }, } func TestReader(t *testing.T) { @@ -685,7 +750,7 @@ func doTransform(tc testCase) (res string, iter int, err error) { switch { case err == nil && len(in) != 0: case err == ErrShortSrc && nSrc > 0: - case err == ErrShortDst && nDst > 0: + case err == ErrShortDst && (nDst > 0 || nSrc > 0): default: return string(out), iter, err } @@ -875,27 +940,136 @@ func TestRemoveFunc(t *testing.T) { } } -func TestBytes(t *testing.T) { +func testString(t *testing.T, f func(Transformer, string) (string, int, error)) { for _, tt := range append(testCases, chainTests()...) { if tt.desc == "allowStutter = true" { // We don't have control over the buffer size, so we eliminate tests // that depend on a specific buffer size being set. continue } - got := Bytes(tt.t, []byte(tt.src)) - if tt.wantErr != nil { - if tt.wantErr != ErrShortDst && tt.wantErr != ErrShortSrc { - // Bytes should return nil for non-recoverable errors. - if g, w := (got == nil), (tt.wantErr != nil); g != w { - t.Errorf("%s:error: got %v; want %v", tt.desc, g, w) - } - } - // The output strings in the tests that expect an error will - // almost certainly not be the same as the result of Bytes. 
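
A usage note on the API change these tests exercise: Bytes and String no longer return nil on failure; they now return the converted output, the number of input bytes consumed, and an error, so callers can see how far the conversion got. A minimal self-contained sketch (upperASCII is an illustrative Transformer written for this example, not part of the package):

package main

import (
	"fmt"
	"log"

	"code.google.com/p/go.text/transform"
)

// upperASCII upper-cases ASCII letters and passes everything else through.
type upperASCII struct{}

func (upperASCII) Transform(dst, src []byte, atEOF bool) (nDst, nSrc int, err error) {
	n := copy(dst, src)
	for i := 0; i < n; i++ {
		if c := dst[i]; 'a' <= c && c <= 'z' {
			dst[i] = c - 'a' + 'A'
		}
	}
	if n < len(src) {
		err = transform.ErrShortDst // ask the caller for more room in dst
	}
	return n, n, err
}

func main() {
	// On success n == len(input); on failure the partial result and the
	// number of consumed bytes are still reported.
	out, n, err := transform.String(upperASCII{}, "hello, transform")
	if err != nil {
		log.Fatalf("converted only %d bytes: %v", n, err)
	}
	fmt.Println(out)
}
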
+ reset(tt.t) + if tt.wantErr == ErrShortDst || tt.wantErr == ErrShortSrc { + // The result string will be different. continue } - if string(got) != tt.wantStr { + got, n, err := f(tt.t, tt.src) + if tt.wantErr != err { + t.Errorf("%s:error: got %v; want %v", tt.desc, err, tt.wantErr) + } + if got, want := err == nil, n == len(tt.src); got != want { + t.Errorf("%s:n: got %v; want %v", tt.desc, got, want) + } + if got != tt.wantStr { t.Errorf("%s:string: got %q; want %q", tt.desc, got, tt.wantStr) } } } + +func TestBytes(t *testing.T) { + testString(t, func(z Transformer, s string) (string, int, error) { + b, n, err := Bytes(z, []byte(s)) + return string(b), n, err + }) +} + +func TestString(t *testing.T) { + testString(t, String) + + // Overrun the internal destination buffer. + for i, s := range []string{ + strings.Repeat("a", initialBufSize-1), + strings.Repeat("a", initialBufSize+0), + strings.Repeat("a", initialBufSize+1), + strings.Repeat("A", initialBufSize-1), + strings.Repeat("A", initialBufSize+0), + strings.Repeat("A", initialBufSize+1), + strings.Repeat("A", 2*initialBufSize-1), + strings.Repeat("A", 2*initialBufSize+0), + strings.Repeat("A", 2*initialBufSize+1), + strings.Repeat("a", initialBufSize-2) + "A", + strings.Repeat("a", initialBufSize-1) + "A", + strings.Repeat("a", initialBufSize+0) + "A", + strings.Repeat("a", initialBufSize+1) + "A", + } { + got, _, _ := String(lowerCaseASCII{}, s) + if want := strings.ToLower(s); got != want { + t.Errorf("%d:dst buffer test: got %s (%d); want %s (%d)", i, got, len(got), want, len(want)) + } + } + + // Overrun the internal source buffer. + for i, s := range []string{ + strings.Repeat("a", initialBufSize-1), + strings.Repeat("a", initialBufSize+0), + strings.Repeat("a", initialBufSize+1), + strings.Repeat("a", 2*initialBufSize+1), + strings.Repeat("a", 2*initialBufSize+0), + strings.Repeat("a", 2*initialBufSize+1), + } { + got, _, _ := String(rleEncode{}, s) + if want := fmt.Sprintf("%da", len(s)); got != want { + t.Errorf("%d:src buffer test: got %s (%d); want %s (%d)", i, got, len(got), want, len(want)) + } + } + + // Test allocations for non-changing strings. + // Note we still need to allocate a single buffer. + for i, s := range []string{ + "", + "123", + "123456789", + strings.Repeat("a", initialBufSize), + strings.Repeat("a", 10*initialBufSize), + } { + if n := testing.AllocsPerRun(5, func() { String(&lowerCaseASCII{}, s) }); n > 1 { + t.Errorf("%d: #allocs was %f; want 1", i, n) + } + } +} + +// TestBytesAllocation tests that buffer growth stays limited with the trickler +// transformer, which behaves oddly but within spec. In case buffer growth is +// not correctly handled, the test will either panic with a failed allocation or +// thrash. To ensure the tests terminate under the last condition, we time out +// after some sufficiently long period of time. +func TestBytesAllocation(t *testing.T) { + done := make(chan bool) + go func() { + in := bytes.Repeat([]byte{'a'}, 1000) + tr := trickler(make([]byte, 1)) + Bytes(&tr, in) + done <- true + }() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Error("time out, likely due to excessive allocation") + } +} + +// TestStringAllocation tests that buffer growth stays limited with the trickler +// transformer, which behaves oddly but within spec. In case buffer growth is +// not correctly handled, the test will either panic with a failed allocation or +// thrash. 
To ensure the tests terminate under the last condition, we time out +// after some sufficiently long period of time. +func TestStringAllocation(t *testing.T) { + done := make(chan bool) + go func() { + in := strings.Repeat("a", 1000) + tr := trickler(make([]byte, 1)) + String(&tr, in) + done <- true + }() + select { + case <-done: + case <-time.After(3 * time.Second): + t.Error("time out, likely due to excessive allocation") + } +} + +func BenchmarkStringLower(b *testing.B) { + in := strings.Repeat("a", 4096) + for i := 0; i < b.N; i++ { + String(&lowerCaseASCII{}, in) + } +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go index 9b6a74977..d71fe1505 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/cache/cache.go @@ -29,7 +29,7 @@ type DelFin func(exist bool) // to call it or not. type PurgeFin func(ns, key uint64, delfin DelFin) -// Cache is a cache tree. +// Cache is a cache tree. A cache instance must be goroutine-safe. type Cache interface { // SetCapacity sets cache capacity. SetCapacity(capacity int) @@ -44,7 +44,7 @@ type Cache interface { Zap(closed bool) } -// Namespace is a cache namespace. +// Namespace is a cache namespace. A namespace instance must be goroutine-safe. type Namespace interface { // Get gets cache object for the given key. The given SetFunc (if not nil) will // be called if the given key does not exist. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index 9ce8763f4..fd11a8633 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -30,9 +30,10 @@ type DB struct { // Need 64-bit alignment. seq uint64 + // Session. s *session - // MemDB + // MemDB. memMu sync.RWMutex mem *memdb.DB frozenMem *memdb.DB @@ -42,11 +43,11 @@ type DB struct { frozenJournalFile storage.File frozenSeq uint64 - // Snapshot + // Snapshot. snapsMu sync.Mutex snapsRoot snapshotElement - // Write + // Write. writeC chan *Batch writeMergedC chan bool writeLockC chan struct{} @@ -54,7 +55,7 @@ type DB struct { journalC chan *Batch journalAckC chan error - // Compaction + // Compaction. tcompCmdC chan cCmd tcompPauseC chan chan<- struct{} tcompTriggerC chan struct{} @@ -64,7 +65,7 @@ type DB struct { compErrSetC chan error compStats [kNumLevels]cStats - // Close + // Close. closeW sync.WaitGroup closeC chan struct{} closed uint32 @@ -135,9 +136,10 @@ func openDB(s *session) (*DB, error) { // detected in the DB. Corrupted DB can be recovered with Recover // function. // +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. -func Open(p storage.Storage, o *opt.Options) (db *DB, err error) { - s, err := newSession(p, o) +func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) { + s, err := newSession(stor, o) if err != nil { return } @@ -177,6 +179,7 @@ func Open(p storage.Storage, o *opt.Options) (db *DB, err error) { // detected in the DB. Corrupted DB can be recovered with Recover // function. // +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. 
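
The goroutine-safety notes added to these doc comments describe the returned *DB handle: it may be shared across goroutines, but it must still be closed exactly once. A minimal sketch of that contract, using a throw-away path:

package main

import (
	"log"

	"github.com/syndtr/goleveldb/leveldb"
)

func main() {
	// OpenFile creates the database directory if it does not already exist.
	db, err := leveldb.OpenFile("/tmp/exampledb", nil)
	if err != nil {
		log.Fatal(err)
	}
	// Safe to use from any goroutine; Close releases outstanding snapshots
	// and aborts in-flight compaction.
	defer db.Close()

	if err := db.Put([]byte("answer"), []byte("42"), nil); err != nil {
		log.Fatal(err)
	}
	value, err := db.Get([]byte("answer"), nil)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("answer = %s", value)
}
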
func OpenFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path) @@ -197,9 +200,10 @@ func OpenFile(path string, o *opt.Options) (db *DB, err error) { // The DB must already exist or it will returns an error. // Also, Recover will ignore ErrorIfMissing and ErrorIfExist options. // +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. -func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) { - s, err := newSession(p, o) +func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) { + s, err := newSession(stor, o) if err != nil { return } @@ -225,6 +229,7 @@ func Recover(p storage.Storage, o *opt.Options) (db *DB, err error) { // RecoverFile uses standard file-system backed storage implementation as desribed // in the leveldb/storage package. // +// The returned DB instance is goroutine-safe. // The DB must be closed after use, by calling Close method. func RecoverFile(path string, o *opt.Options) (db *DB, err error) { stor, err := storage.OpenFile(path) @@ -241,12 +246,13 @@ func RecoverFile(path string, o *opt.Options) (db *DB, err error) { } func recoverTable(s *session, o *opt.Options) error { - ff0, err := s.getFiles(storage.TypeTable) + // Get all tables and sort it by file number. + tableFiles_, err := s.getFiles(storage.TypeTable) if err != nil { return err } - ff1 := files(ff0) - ff1.sort() + tableFiles := files(tableFiles_) + tableFiles.sort() var mSeq uint64 var good, corrupted int @@ -264,8 +270,9 @@ func recoverTable(s *session, o *opt.Options) error { tmp = nil } }() + + // Copy entries. tw := table.NewWriter(writer, o) - // Copy records. for iter.Next() { key := iter.Key() if validIkey(key) { @@ -297,20 +304,23 @@ func recoverTable(s *session, o *opt.Options) error { return err } defer reader.Close() + // Get file size. size, err := reader.Seek(0, 2) if err != nil { return err } + var tSeq uint64 var tgood, tcorrupted, blockerr int - var min, max []byte + var imin, imax []byte tr := table.NewReader(reader, size, nil, o) iter := tr.NewIterator(nil, nil) iter.(iterator.ErrorCallbackSetter).SetErrorCallback(func(err error) { s.logf("table@recovery found error @%d %q", file.Num(), err) blockerr++ }) + // Scan the table. for iter.Next() { key := iter.Key() @@ -323,16 +333,17 @@ func recoverTable(s *session, o *opt.Options) error { if seq > tSeq { tSeq = seq } - if min == nil { - min = append([]byte{}, key...) + if imin == nil { + imin = append([]byte{}, key...) } - max = append(max[:0], key...) + imax = append(imax[:0], key...) } if err := iter.Error(); err != nil { iter.Release() return err } iter.Release() + if tgood > 0 { if tcorrupted > 0 || blockerr > 0 { // Rebuild the table. @@ -353,7 +364,7 @@ func recoverTable(s *session, o *opt.Options) error { mSeq = tSeq } // Add table to level 0. - rec.addTable(0, file.Num(), uint64(size), min, max) + rec.addTable(0, file.Num(), uint64(size), imin, imax) s.logf("table@recovery recovered @%d N·%d C·%d B·%d S·%d Q·%d", file.Num(), tgood, tcorrupted, blockerr, size, tSeq) } else { s.logf("table@recovery unrecoverable @%d C·%d B·%d S·%d", file.Num(), tcorrupted, blockerr, size) @@ -364,41 +375,56 @@ func recoverTable(s *session, o *opt.Options) error { return nil } + // Recover all tables. - if len(ff1) > 0 { - s.logf("table@recovery F·%d", len(ff1)) - s.markFileNum(ff1[len(ff1)-1].Num()) - for _, file := range ff1 { + if len(tableFiles) > 0 { + s.logf("table@recovery F·%d", len(tableFiles)) + + // Mark file number as used. 
+ s.markFileNum(tableFiles[len(tableFiles)-1].Num()) + + for _, file := range tableFiles { if err := recoverTable(file); err != nil { return err } } - s.logf("table@recovery recovered F·%d N·%d C·%d Q·%d", len(ff1), good, corrupted, mSeq) + + s.logf("table@recovery recovered F·%d N·%d C·%d Q·%d", len(tableFiles), good, corrupted, mSeq) } + // Set sequence number. rec.setSeq(mSeq + 1) + // Create new manifest. if err := s.create(); err != nil { return err } + // Commit. return s.commit(rec) } -func (d *DB) recoverJournal() error { - s := d.s - - ff0, err := s.getFiles(storage.TypeJournal) +func (db *DB) recoverJournal() error { + // Get all tables and sort it by file number. + journalFiles_, err := db.s.getFiles(storage.TypeJournal) if err != nil { return err } - ff1 := files(ff0) - ff1.sort() - ff2 := make([]storage.File, 0, len(ff1)) - for _, file := range ff1 { - if file.Num() >= s.stJournalNum || file.Num() == s.stPrevJournalNum { - s.markFileNum(file.Num()) - ff2 = append(ff2, file) + journalFiles := files(journalFiles_) + journalFiles.sort() + + // Discard older journal. + prev := -1 + for i, file := range journalFiles { + if file.Num() >= db.s.stJournalNum { + if prev >= 0 { + i-- + journalFiles[i] = journalFiles[prev] + } + journalFiles = journalFiles[i:] + break + } else if file.Num() == db.s.stPrevJournalNum { + prev = i } } @@ -406,38 +432,43 @@ func (d *DB) recoverJournal() error { var of storage.File var mem *memdb.DB batch := new(Batch) - cm := newCMem(s) + cm := newCMem(db.s) buf := new(util.Buffer) // Options. - strict := s.o.GetStrict(opt.StrictJournal) - checksum := s.o.GetStrict(opt.StrictJournalChecksum) - writeBuffer := s.o.GetWriteBuffer() + strict := db.s.o.GetStrict(opt.StrictJournal) + checksum := db.s.o.GetStrict(opt.StrictJournalChecksum) + writeBuffer := db.s.o.GetWriteBuffer() recoverJournal := func(file storage.File) error { - s.logf("journal@recovery recovering @%d", file.Num()) + db.logf("journal@recovery recovering @%d", file.Num()) reader, err := file.Open() if err != nil { return err } defer reader.Close() + + // Create/reset journal reader instance. if jr == nil { - jr = journal.NewReader(reader, dropper{s, file}, strict, checksum) + jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum) } else { - jr.Reset(reader, dropper{s, file}, strict, checksum) + jr.Reset(reader, dropper{db.s, file}, strict, checksum) } + + // Flush memdb and remove obsolete journal file. if of != nil { if mem.Len() > 0 { if err := cm.flush(mem, 0); err != nil { return err } } - if err := cm.commit(file.Num(), d.seq); err != nil { + if err := cm.commit(file.Num(), db.seq); err != nil { return err } cm.reset() of.Remove() of = nil } - // Reset memdb. + + // Replay journal to memdb. mem.Reset() for { r, err := jr.Next() @@ -447,6 +478,7 @@ func (d *DB) recoverJournal() error { } return err } + buf.Reset() if _, err := buf.ReadFrom(r); err != nil { if strict { @@ -460,28 +492,37 @@ func (d *DB) recoverJournal() error { if err := batch.memReplay(mem); err != nil { return err } - d.seq = batch.seq + uint64(batch.len()) + + // Save sequence number. + db.seq = batch.seq + uint64(batch.len()) + + // Flush it if large enough. if mem.Size() >= writeBuffer { - // Large enough, flush it. if err := cm.flush(mem, 0); err != nil { return err } - // Reset memdb. mem.Reset() } } + of = file return nil } + // Recover all journals. 
- if len(ff2) > 0 { - s.logf("journal@recovery F·%d", len(ff2)) - mem = memdb.New(s.icmp, writeBuffer) - for _, file := range ff2 { + if len(journalFiles) > 0 { + db.logf("journal@recovery F·%d", len(journalFiles)) + + // Mark file number as used. + db.s.markFileNum(journalFiles[len(journalFiles)-1].Num()) + + mem = memdb.New(db.s.icmp, writeBuffer) + for _, file := range journalFiles { if err := recoverJournal(file); err != nil { return err } } + // Flush the last journal. if mem.Len() > 0 { if err := cm.flush(mem, 0); err != nil { @@ -489,35 +530,43 @@ func (d *DB) recoverJournal() error { } } } + // Create a new journal. - if _, err := d.newMem(0); err != nil { + if _, err := db.newMem(0); err != nil { return err } + // Commit. - if err := cm.commit(d.journalFile.Num(), d.seq); err != nil { + if err := cm.commit(db.journalFile.Num(), db.seq); err != nil { + // Close journal. + if db.journal != nil { + db.journal.Close() + db.journalWriter.Close() + } return err } - // Remove the last journal. + + // Remove the last obsolete journal file. if of != nil { of.Remove() } + return nil } -func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { - s := d.s - +func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { ikey := newIKey(key, seq, tSeek) - em, fm := d.getMems() + em, fm := db.getMems() for _, m := range [...]*memdb.DB{em, fm} { if m == nil { continue } + mk, mv, me := m.Find(ikey) if me == nil { ukey, _, t, ok := parseIkey(mk) - if ok && s.icmp.uCompare(ukey, key) == 0 { + if ok && db.s.icmp.uCompare(ukey, key) == 0 { if t == tDel { return nil, ErrNotFound } @@ -528,12 +577,12 @@ func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err } } - v := s.version() + v := db.s.version() value, cSched, err := v.get(ikey, ro) v.release() if cSched { // Trigger table compaction. - d.compTrigger(d.tcompTriggerC) + db.compTrigger(db.tcompTriggerC) } return } @@ -543,13 +592,13 @@ func (d *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err // // The caller should not modify the contents of the returned slice, but // it is safe to modify the contents of the argument after Get returns. -func (d *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { - err = d.ok() +func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { + err = db.ok() if err != nil { return } - return d.get(key, d.getSeq(), ro) + return db.get(key, db.getSeq(), ro) } // NewIterator returns an iterator for the latest snapshot of the @@ -568,14 +617,14 @@ func (d *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { // The iterator must be released after use, by calling Release method. // // Also read Iterator documentation of the leveldb/iterator package. -func (d *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { - if err := d.ok(); err != nil { +func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + if err := db.ok(); err != nil { return iterator.NewEmptyIterator(err) } - p := d.newSnapshot() - defer p.Release() - return p.NewIterator(slice, ro) + snap := db.newSnapshot() + defer snap.Release() + return snap.NewIterator(slice, ro) } // GetSnapshot returns a latest snapshot of the underlying DB. A snapshot @@ -583,12 +632,12 @@ func (d *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterat // content of snapshot are guaranteed to be consistent. 
// // The snapshot must be released after use, by calling Release method. -func (d *DB) GetSnapshot() (*Snapshot, error) { - if err := d.ok(); err != nil { +func (db *DB) GetSnapshot() (*Snapshot, error) { + if err := db.ok(); err != nil { return nil, err } - return d.newSnapshot(), nil + return db.newSnapshot(), nil } // GetProperty returns value of the given property name. @@ -600,8 +649,8 @@ func (d *DB) GetSnapshot() (*Snapshot, error) { // Returns statistics of the underlying DB. // leveldb.sstables // Returns sstables list for each level. -func (d *DB) GetProperty(name string) (value string, err error) { - err = d.ok() +func (db *DB) GetProperty(name string) (value string, err error) { + err = db.ok() if err != nil { return } @@ -610,11 +659,9 @@ func (d *DB) GetProperty(name string) (value string, err error) { if !strings.HasPrefix(name, prefix) { return "", errors.New("leveldb: GetProperty: unknown property: " + name) } - p := name[len(prefix):] - s := d.s - v := s.version() + v := db.s.version() defer v.release() switch { @@ -631,20 +678,20 @@ func (d *DB) GetProperty(name string) (value string, err error) { value = "Compactions\n" + " Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" + "-------+------------+---------------+---------------+---------------+---------------\n" - for level, tt := range v.tables { - duration, read, write := d.compStats[level].get() - if len(tt) == 0 && duration == 0 { + for level, tables := range v.tables { + duration, read, write := db.compStats[level].get() + if len(tables) == 0 && duration == 0 { continue } value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n", - level, len(tt), float64(tt.size())/1048576.0, duration.Seconds(), + level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(), float64(read)/1048576.0, float64(write)/1048576.0) } case p == "sstables": - for level, tt := range v.tables { + for level, tables := range v.tables { value += fmt.Sprintf("--- level %d ---\n", level) - for _, t := range tt { - value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.min, t.max) + for _, t := range tables { + value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.file.Num(), t.size, t.imin, t.imax) } } default: @@ -660,23 +707,23 @@ func (d *DB) GetProperty(name string) (value string, err error) { // data compresses by a factor of ten, the returned sizes will be one-tenth // the size of the corresponding user data size. // The results may not include the sizes of recently written data. -func (d *DB) SizeOf(ranges []util.Range) (Sizes, error) { - if err := d.ok(); err != nil { +func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) { + if err := db.ok(); err != nil { return nil, err } - v := d.s.version() + v := db.s.version() defer v.release() sizes := make(Sizes, 0, len(ranges)) for _, r := range ranges { - min := newIKey(r.Start, kMaxSeq, tSeek) - max := newIKey(r.Limit, kMaxSeq, tSeek) - start, err := v.offsetOf(min) + imin := newIKey(r.Start, kMaxSeq, tSeek) + imax := newIKey(r.Limit, kMaxSeq, tSeek) + start, err := v.offsetOf(imin) if err != nil { return nil, err } - limit, err := v.offsetOf(max) + limit, err := v.offsetOf(imax) if err != nil { return nil, err } @@ -690,61 +737,63 @@ func (d *DB) SizeOf(ranges []util.Range) (Sizes, error) { return sizes, nil } -// Close closes the DB. This will also releases any outstanding snapshot. +// Close closes the DB. This will also releases any outstanding snapshot and +// abort any in-flight compaction. 
// // It is not safe to close a DB until all outstanding iterators are released. // It is valid to call Close multiple times. Other methods should not be // called after the DB has been closed. -func (d *DB) Close() error { - if !d.setClosed() { +func (db *DB) Close() error { + if !db.setClosed() { return ErrClosed } - s := d.s start := time.Now() - s.log("db@close closing") + db.log("db@close closing") // Clear the finalizer. - runtime.SetFinalizer(d, nil) + runtime.SetFinalizer(db, nil) // Get compaction error. var err error select { - case err = <-d.compErrC: + case err = <-db.compErrC: default: } - close(d.closeC) + close(db.closeC) // Wait for the close WaitGroup. - d.closeW.Wait() + db.closeW.Wait() // Close journal. - if d.journal != nil { - d.journal.Close() - d.journalWriter.Close() + db.writeLockC <- struct{}{} + if db.journal != nil { + db.journal.Close() + db.journalWriter.Close() } // Close session. - s.close() - s.logf("db@close done T·%v", time.Since(start)) - s.release() + db.s.close() + db.logf("db@close done T·%v", time.Since(start)) + db.s.release() - if d.closer != nil { - if err1 := d.closer.Close(); err == nil { + if db.closer != nil { + if err1 := db.closer.Close(); err == nil { err = err1 } } - d.s = nil - d.mem = nil - d.frozenMem = nil - d.journal = nil - d.journalWriter = nil - d.journalFile = nil - d.frozenJournalFile = nil - d.snapsRoot = snapshotElement{} - d.closer = nil + // NIL'ing pointers. + db.s = nil + db.mem = nil + db.frozenMem = nil + db.journal = nil + db.journalWriter = nil + db.journalFile = nil + db.frozenJournalFile = nil + db.snapsRoot = snapshotElement{} + db.closer = nil return err } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index c82bd9f28..87ac12cb5 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -74,7 +74,7 @@ func newCMem(s *session) *cMem { func (c *cMem) flush(mem *memdb.DB, level int) error { s := c.s - // Write memdb to table + // Write memdb to table. iter := mem.NewIterator(nil) defer iter.Release() t, n, err := s.tops.createFrom(iter) @@ -82,12 +82,13 @@ func (c *cMem) flush(mem *memdb.DB, level int) error { return err } + // Pick level. if level < 0 { - level = s.version_NB().pickLevel(t.min.ukey(), t.max.ukey()) + level = s.version_NB().pickLevel(t.imin.ukey(), t.imax.ukey()) } c.rec.addTableFile(level, t) - s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.min, t.max) + s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax) c.level = level return nil @@ -100,33 +101,34 @@ func (c *cMem) reset() { func (c *cMem) commit(journal, seq uint64) error { c.rec.setJournalNum(journal) c.rec.setSeq(seq) - // Commit changes + + // Commit changes. 
return c.s.commit(c.rec) } -func (d *DB) compactionError() { +func (db *DB) compactionError() { var err error noerr: for { select { - case _, _ = <-d.closeC: - return - case err = <-d.compErrSetC: + case err = <-db.compErrSetC: if err != nil { goto haserr } + case _, _ = <-db.closeC: + return } } haserr: for { select { - case _, _ = <-d.closeC: - return - case err = <-d.compErrSetC: + case db.compErrC <- err: + case err = <-db.compErrSetC: if err == nil { goto noerr } - case d.compErrC <- err: + case _, _ = <-db.closeC: + return } } } @@ -137,18 +139,18 @@ func (cnt *compactionTransactCounter) incr() { *cnt++ } -func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCounter) error, rollback func() error) { - s := d.s +func (db *DB) compactionTransact(name string, exec func(cnt *compactionTransactCounter) error, rollback func() error) { defer func() { if x := recover(); x != nil { if x == errCompactionTransactExiting && rollback != nil { if err := rollback(); err != nil { - s.logf("%s rollback error %q", name, err) + db.logf("%s rollback error %q", name, err) } } panic(x) } }() + const ( backoffMin = 1 * time.Second backoffMax = 8 * time.Second @@ -159,11 +161,11 @@ func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCo lastCnt := compactionTransactCounter(0) for n := 0; ; n++ { // Check wether the DB is closed. - if d.isClosed() { - s.logf("%s exiting", name) - d.compactionExitTransact() + if db.isClosed() { + db.logf("%s exiting", name) + db.compactionExitTransact() } else if n > 0 { - s.logf("%s retrying N·%d", name, n) + db.logf("%s retrying N·%d", name, n) } // Execute. @@ -172,15 +174,15 @@ func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCo // Set compaction error status. select { - case d.compErrSetC <- err: - case _, _ = <-d.closeC: - s.logf("%s exiting", name) - d.compactionExitTransact() + case db.compErrSetC <- err: + case _, _ = <-db.closeC: + db.logf("%s exiting", name) + db.compactionExitTransact() } if err == nil { return } - s.logf("%s error I·%d %q", name, cnt, err) + db.logf("%s error I·%d %q", name, cnt, err) // Reset backoff duration if counter is advancing. if cnt > lastCnt { @@ -198,53 +200,52 @@ func (d *DB) compactionTransact(name string, exec func(cnt *compactionTransactCo } select { case <-backoffT.C: - case _, _ = <-d.closeC: - s.logf("%s exiting", name) - d.compactionExitTransact() + case _, _ = <-db.closeC: + db.logf("%s exiting", name) + db.compactionExitTransact() } } } -func (d *DB) compactionExitTransact() { +func (db *DB) compactionExitTransact() { panic(errCompactionTransactExiting) } -func (d *DB) memCompaction() { - mem := d.getFrozenMem() +func (db *DB) memCompaction() { + mem := db.getFrozenMem() if mem == nil { return } - s := d.s - c := newCMem(s) + c := newCMem(db.s) stats := new(cStatsStaging) - s.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size())) + db.logf("mem@flush N·%d S·%s", mem.Len(), shortenb(mem.Size())) // Don't compact empty memdb. if mem.Len() == 0 { - s.logf("mem@flush skipping") + db.logf("mem@flush skipping") // drop frozen mem - d.dropFrozenMem() + db.dropFrozenMem() return } // Pause table compaction. 
ch := make(chan struct{}) select { - case d.tcompPauseC <- (chan<- struct{})(ch): - case _, _ = <-d.closeC: + case db.tcompPauseC <- (chan<- struct{})(ch): + case _, _ = <-db.closeC: return } - d.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) { + db.compactionTransact("mem@flush", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() defer stats.stopTimer() return c.flush(mem, -1) }, func() error { for _, r := range c.rec.addedTables { - s.logf("mem@flush rollback @%d", r.num) - f := s.getTableFile(r.num) + db.logf("mem@flush rollback @%d", r.num) + f := db.s.getTableFile(r.num) if err := f.Remove(); err != nil { return err } @@ -252,61 +253,59 @@ func (d *DB) memCompaction() { return nil }) - d.compactionTransact("mem@commit", func(cnt *compactionTransactCounter) (err error) { + db.compactionTransact("mem@commit", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() defer stats.stopTimer() - return c.commit(d.journalFile.Num(), d.frozenSeq) + return c.commit(db.journalFile.Num(), db.frozenSeq) }, nil) - s.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration) + db.logf("mem@flush commited F·%d T·%v", len(c.rec.addedTables), stats.duration) for _, r := range c.rec.addedTables { stats.write += r.size } - d.compStats[c.level].add(stats) + db.compStats[c.level].add(stats) // Drop frozen mem. - d.dropFrozenMem() + db.dropFrozenMem() // Resume table compaction. select { case <-ch: - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: return } // Trigger table compaction. - d.compTrigger(d.mcompTriggerC) + db.compTrigger(db.mcompTriggerC) } -func (d *DB) tableCompaction(c *compaction, noTrivial bool) { - s := d.s - +func (db *DB) tableCompaction(c *compaction, noTrivial bool) { rec := new(sessionRecord) - rec.addCompactionPointer(c.level, c.max) + rec.addCompactionPointer(c.level, c.imax) if !noTrivial && c.trivial() { t := c.tables[0][0] - s.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1) + db.logf("table@move L%d@%d -> L%d", c.level, t.file.Num(), c.level+1) rec.deleteTable(c.level, t.file.Num()) rec.addTableFile(c.level+1, t) - d.compactionTransact("table@move", func(cnt *compactionTransactCounter) (err error) { - return s.commit(rec) + db.compactionTransact("table@move", func(cnt *compactionTransactCounter) (err error) { + return db.s.commit(rec) }, nil) return } var stats [2]cStatsStaging - for i, tt := range c.tables { - for _, t := range tt { + for i, tables := range c.tables { + for _, t := range tables { stats[i].read += t.size // Insert deleted tables into record rec.deleteTable(c.level+i, t.file.Num()) } } sourceSize := int(stats[0].read + stats[1].read) - minSeq := d.minSeq() - s.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq) + minSeq := db.minSeq() + db.logf("table@compaction L%d·%d -> L%d·%d S·%s Q·%d", c.level, len(c.tables[0]), c.level+1, len(c.tables[1]), shortenb(sourceSize), minSeq) var snapUkey []byte var snapHasUkey bool @@ -314,7 +313,7 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { var snapIter int var snapDropCnt int var dropCnt int - d.compactionTransact("table@build", func(cnt *compactionTransactCounter) (err error) { + db.compactionTransact("table@build", func(cnt *compactionTransactCounter) (err error) { ukey := append([]byte{}, snapUkey...) 
hasUkey := snapHasUkey lseq := snapSeq @@ -329,7 +328,7 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { } rec.addTableFile(c.level+1, t) stats[1].write += t.size - s.logf("table@build created L%d@%d N·%d S·%s %q:%q", c.level+1, t.file.Num(), tw.tw.EntriesLen(), shortenb(int(t.size)), t.min, t.max) + db.logf("table@build created L%d@%d N·%d S·%s %q:%q", c.level+1, t.file.Num(), tw.tw.EntriesLen(), shortenb(int(t.size)), t.imin, t.imax) return nil } @@ -353,9 +352,9 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { continue } - key := iKey(iter.Key()) + ikey := iKey(iter.Key()) - if c.shouldStopBefore(key) && tw != nil { + if c.shouldStopBefore(ikey) && tw != nil { err = finish() if err != nil { return @@ -375,15 +374,15 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { snapSched = false } - if seq, t, ok := key.parseNum(); !ok { + if seq, vt, ok := ikey.parseNum(); !ok { // Don't drop error keys ukey = ukey[:0] hasUkey = false lseq = kMaxSeq } else { - if !hasUkey || s.icmp.uCompare(key.ukey(), ukey) != 0 { + if !hasUkey || db.s.icmp.uCompare(ikey.ukey(), ukey) != 0 { // First occurrence of this user key - ukey = append(ukey[:0], key.ukey()...) + ukey = append(ukey[:0], ikey.ukey()...) hasUkey = true lseq = kMaxSeq } @@ -392,7 +391,7 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { if lseq <= minSeq { // Dropped because newer entry for same user key exist drop = true // (A) - } else if t == tDel && seq <= minSeq && c.isBaseLevelForKey(ukey) { + } else if vt == tDel && seq <= minSeq && c.baseLevelForKey(ukey) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger seq numbers @@ -414,22 +413,22 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { if tw == nil { // Check for pause event. select { - case ch := <-d.tcompPauseC: - d.pauseCompaction(ch) - case _, _ = <-d.closeC: - d.compactionExitTransact() + case ch := <-db.tcompPauseC: + db.pauseCompaction(ch) + case _, _ = <-db.closeC: + db.compactionExitTransact() default: } // Create new table. 
- tw, err = s.tops.create() + tw, err = db.s.tops.create() if err != nil { return } } // Write key/value into table - err = tw.add(key, iter.Value()) + err = tw.append(ikey, iter.Value()) if err != nil { return } @@ -461,8 +460,8 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { return }, func() error { for _, r := range rec.addedTables { - s.logf("table@build rollback @%d", r.num) - f := s.getTableFile(r.num) + db.logf("table@build rollback @%d", r.num) + f := db.s.getTableFile(r.num) if err := f.Remove(); err != nil { return err } @@ -471,60 +470,61 @@ func (d *DB) tableCompaction(c *compaction, noTrivial bool) { }) // Commit changes - d.compactionTransact("table@commit", func(cnt *compactionTransactCounter) (err error) { + db.compactionTransact("table@commit", func(cnt *compactionTransactCounter) (err error) { stats[1].startTimer() defer stats[1].stopTimer() - return s.commit(rec) + return db.s.commit(rec) }, nil) - resultSize := int(int(stats[1].write)) - s.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration) + resultSize := int(stats[1].write) + db.logf("table@compaction commited F%s S%s D·%d T·%v", sint(len(rec.addedTables)-len(rec.deletedTables)), sshortenb(resultSize-sourceSize), dropCnt, stats[1].duration) // Save compaction stats for i := range stats { - d.compStats[c.level+1].add(&stats[i]) + db.compStats[c.level+1].add(&stats[i]) } } -func (d *DB) tableRangeCompaction(level int, min, max []byte) { - s := d.s - s.logf("table@compaction range L%d %q:%q", level, min, max) +func (db *DB) tableRangeCompaction(level int, umin, umax []byte) { + db.logf("table@compaction range L%d %q:%q", level, umin, umax) if level >= 0 { - if c := s.getCompactionRange(level, min, max); c != nil { - d.tableCompaction(c, true) + if c := db.s.getCompactionRange(level, umin, umax); c != nil { + db.tableCompaction(c, true) } } else { - v := s.version_NB() + v := db.s.version_NB() + m := 1 for i, t := range v.tables[1:] { - if t.isOverlaps(min, max, true, s.icmp) { + if t.overlaps(db.s.icmp, umin, umax, false) { m = i + 1 } } + for level := 0; level < m; level++ { - if c := s.getCompactionRange(level, min, max); c != nil { - d.tableCompaction(c, true) + if c := db.s.getCompactionRange(level, umin, umax); c != nil { + db.tableCompaction(c, true) } } } } -func (d *DB) tableAutoCompaction() { - if c := d.s.pickCompaction(); c != nil { - d.tableCompaction(c, false) +func (db *DB) tableAutoCompaction() { + if c := db.s.pickCompaction(); c != nil { + db.tableCompaction(c, false) } } -func (d *DB) tableNeedCompaction() bool { - return d.s.version_NB().needCompaction() +func (db *DB) tableNeedCompaction() bool { + return db.s.version_NB().needCompaction() } -func (d *DB) pauseCompaction(ch chan<- struct{}) { +func (db *DB) pauseCompaction(ch chan<- struct{}) { select { case ch <- struct{}{}: - case _, _ = <-d.closeC: - d.compactionExitTransact() + case _, _ = <-db.closeC: + db.compactionExitTransact() } } @@ -555,48 +555,48 @@ func (r cRange) ack(err error) { } } -func (d *DB) compSendIdle(compC chan<- cCmd) error { +func (db *DB) compSendIdle(compC chan<- cCmd) error { ch := make(chan error) defer close(ch) // Send cmd. select { case compC <- cIdle{ch}: - case err := <-d.compErrC: + case err := <-db.compErrC: return err - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: return ErrClosed } // Wait cmd. 
return <-ch } -func (d *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) { +func (db *DB) compSendRange(compC chan<- cCmd, level int, min, max []byte) (err error) { ch := make(chan error) defer close(ch) // Send cmd. select { case compC <- cRange{level, min, max, ch}: - case err := <-d.compErrC: + case err := <-db.compErrC: return err - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: return ErrClosed } // Wait cmd. select { - case err = <-d.compErrC: + case err = <-db.compErrC: case err = <-ch: } return err } -func (d *DB) compTrigger(compTriggerC chan struct{}) { +func (db *DB) compTrigger(compTriggerC chan struct{}) { select { case compTriggerC <- struct{}{}: default: } } -func (d *DB) mCompaction() { +func (db *DB) mCompaction() { var x cCmd defer func() { @@ -608,24 +608,24 @@ func (d *DB) mCompaction() { if x != nil { x.ack(ErrClosed) } - d.closeW.Done() + db.closeW.Done() }() for { select { - case _, _ = <-d.closeC: - return - case x = <-d.mcompCmdC: - d.memCompaction() + case x = <-db.mcompCmdC: + db.memCompaction() x.ack(nil) x = nil - case <-d.mcompTriggerC: - d.memCompaction() + case <-db.mcompTriggerC: + db.memCompaction() + case _, _ = <-db.closeC: + return } } } -func (d *DB) tCompaction() { +func (db *DB) tCompaction() { var x cCmd var ackQ []cCmd @@ -642,19 +642,19 @@ func (d *DB) tCompaction() { if x != nil { x.ack(ErrClosed) } - d.closeW.Done() + db.closeW.Done() }() for { - if d.tableNeedCompaction() { + if db.tableNeedCompaction() { select { - case x = <-d.tcompCmdC: - case <-d.tcompTriggerC: - case _, _ = <-d.closeC: - return - case ch := <-d.tcompPauseC: - d.pauseCompaction(ch) + case x = <-db.tcompCmdC: + case <-db.tcompTriggerC: + case ch := <-db.tcompPauseC: + db.pauseCompaction(ch) continue + case _, _ = <-db.closeC: + return default: } } else { @@ -664,12 +664,12 @@ func (d *DB) tCompaction() { } ackQ = ackQ[:0] select { - case x = <-d.tcompCmdC: - case <-d.tcompTriggerC: - case ch := <-d.tcompPauseC: - d.pauseCompaction(ch) + case x = <-db.tcompCmdC: + case <-db.tcompTriggerC: + case ch := <-db.tcompPauseC: + db.pauseCompaction(ch) continue - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: return } } @@ -678,11 +678,11 @@ func (d *DB) tCompaction() { case cIdle: ackQ = append(ackQ, x) case cRange: - d.tableRangeCompaction(cmd.level, cmd.min, cmd.max) + db.tableRangeCompaction(cmd.level, cmd.min, cmd.max) x.ack(nil) } x = nil } - d.tableAutoCompaction() + db.tableAutoCompaction() } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index 9973a8fef..911d29a13 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -20,10 +20,8 @@ var ( ) func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { - s := db.s - em, fm := db.getMems() - v := s.version() + v := db.s.version() ti := v.getIterators(slice, ro) n := len(ti) + 2 @@ -33,24 +31,24 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It i = append(i, fm.NewIterator(slice)) } i = append(i, ti...) 
- strict := s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) - mi := iterator.NewMergedIterator(i, s.icmp, strict) + strict := db.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) + mi := iterator.NewMergedIterator(i, db.s.icmp, strict) mi.SetReleaser(&versionReleaser{v: v}) return mi } func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *dbIter { - var slice_ *util.Range + var islice *util.Range if slice != nil { - slice_ = &util.Range{} + islice = &util.Range{} if slice.Start != nil { - slice_.Start = newIKey(slice.Start, kMaxSeq, tSeek) + islice.Start = newIKey(slice.Start, kMaxSeq, tSeek) } if slice.Limit != nil { - slice_.Limit = newIKey(slice.Limit, kMaxSeq, tSeek) + islice.Limit = newIKey(slice.Limit, kMaxSeq, tSeek) } } - rawIter := db.newRawIterator(slice_, ro) + rawIter := db.newRawIterator(islice, ro) iter := &dbIter{ icmp: db.s.icmp, iter: rawIter, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go index 225b7cd5e..31340bdd0 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_snapshot.go @@ -87,12 +87,12 @@ type Snapshot struct { // Creates new snapshot object. func (db *DB) newSnapshot() *Snapshot { - p := &Snapshot{ + snap := &Snapshot{ db: db, elem: db.acquireSnapshot(), } - runtime.SetFinalizer(p, (*Snapshot).Release) - return p + runtime.SetFinalizer(snap, (*Snapshot).Release) + return snap } // Get gets the value for the given key. It returns ErrNotFound if @@ -100,19 +100,18 @@ func (db *DB) newSnapshot() *Snapshot { // // The caller should not modify the contents of the returned slice, but // it is safe to modify the contents of the argument after Get returns. -func (p *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { - db := p.db - err = db.ok() +func (snap *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) { + err = snap.db.ok() if err != nil { return } - p.mu.Lock() - defer p.mu.Unlock() - if p.released { + snap.mu.Lock() + defer snap.mu.Unlock() + if snap.released { err = ErrSnapshotReleased return } - return db.get(key, p.elem.seq, ro) + return snap.db.get(key, snap.elem.seq, ro) } // NewIterator returns an iterator for the snapshot of the uderlying DB. @@ -132,17 +131,18 @@ func (p *Snapshot) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error // iterator would be still valid until released. // // Also read Iterator documentation of the leveldb/iterator package. -func (p *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { - db := p.db - if err := db.ok(); err != nil { +func (snap *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { + if err := snap.db.ok(); err != nil { return iterator.NewEmptyIterator(err) } - p.mu.Lock() - defer p.mu.Unlock() - if p.released { + snap.mu.Lock() + defer snap.mu.Unlock() + if snap.released { return iterator.NewEmptyIterator(ErrSnapshotReleased) } - return db.newIterator(p.elem.seq, slice, ro) + // Since iterator already hold version ref, it doesn't need to + // hold snapshot ref. + return snap.db.newIterator(snap.elem.seq, slice, ro) } // Release releases the snapshot. This will not release any returned @@ -150,16 +150,17 @@ func (p *Snapshot) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator. // underlying DB is closed. 
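
Putting the snapshot documentation above together, a brief sketch of the intended lifecycle, assuming db is an already-open *leveldb.DB (the key used is purely illustrative):

package example

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
)

// dumpSnapshot reads a consistent point-in-time view of db; writes made
// after GetSnapshot are not visible through snap.
func dumpSnapshot(db *leveldb.DB) error {
	snap, err := db.GetSnapshot()
	if err != nil {
		return err
	}
	// Release the snapshot when done. Per the doc comment above, iterators
	// obtained from it remain valid until they are released themselves.
	defer snap.Release()

	if v, err := snap.Get([]byte("answer"), nil); err == nil {
		fmt.Printf("answer = %s\n", v)
	} else if err != leveldb.ErrNotFound {
		return err
	}

	iter := snap.NewIterator(nil, nil)
	defer iter.Release()
	for iter.Next() {
		fmt.Printf("%s = %s\n", iter.Key(), iter.Value())
	}
	return iter.Error()
}
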
// // Other methods should not be called after the snapshot has been released. -func (p *Snapshot) Release() { - p.mu.Lock() - if !p.released { - // Clear the finalizer. - runtime.SetFinalizer(p, nil) +func (snap *Snapshot) Release() { + snap.mu.Lock() + defer snap.mu.Unlock() - p.released = true - p.db.releaseSnapshot(p.elem) - p.db = nil - p.elem = nil + if !snap.released { + // Clear the finalizer. + runtime.SetFinalizer(snap, nil) + + snap.released = true + snap.db.releaseSnapshot(snap.elem) + snap.db = nil + snap.elem = nil } - p.mu.Unlock() } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index a13706142..425bcfd5b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -14,100 +14,101 @@ import ( ) // Get latest sequence number. -func (d *DB) getSeq() uint64 { - return atomic.LoadUint64(&d.seq) +func (db *DB) getSeq() uint64 { + return atomic.LoadUint64(&db.seq) } // Atomically adds delta to seq. -func (d *DB) addSeq(delta uint64) { - atomic.AddUint64(&d.seq, delta) +func (db *DB) addSeq(delta uint64) { + atomic.AddUint64(&db.seq, delta) } // Create new memdb and froze the old one; need external synchronization. // newMem only called synchronously by the writer. -func (d *DB) newMem(n int) (mem *memdb.DB, err error) { - s := d.s - - num := s.allocFileNum() - file := s.getJournalFile(num) +func (db *DB) newMem(n int) (mem *memdb.DB, err error) { + num := db.s.allocFileNum() + file := db.s.getJournalFile(num) w, err := file.Create() if err != nil { - s.reuseFileNum(num) + db.s.reuseFileNum(num) return } - d.memMu.Lock() - if d.journal == nil { - d.journal = journal.NewWriter(w) + + db.memMu.Lock() + defer db.memMu.Unlock() + + if db.journal == nil { + db.journal = journal.NewWriter(w) } else { - d.journal.Reset(w) - d.journalWriter.Close() - d.frozenJournalFile = d.journalFile + db.journal.Reset(w) + db.journalWriter.Close() + db.frozenJournalFile = db.journalFile } - d.journalWriter = w - d.journalFile = file - d.frozenMem = d.mem - d.mem = memdb.New(s.icmp, maxInt(d.s.o.GetWriteBuffer(), n)) - mem = d.mem - // The seq only incremented by the writer. - d.frozenSeq = d.seq - d.memMu.Unlock() + db.journalWriter = w + db.journalFile = file + db.frozenMem = db.mem + db.mem = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n)) + mem = db.mem + // The seq only incremented by the writer. And whoever called newMem + // should hold write lock, so no need additional synchronization here. + db.frozenSeq = db.seq return } // Get all memdbs. -func (d *DB) getMems() (e *memdb.DB, f *memdb.DB) { - d.memMu.RLock() - defer d.memMu.RUnlock() - return d.mem, d.frozenMem +func (db *DB) getMems() (e *memdb.DB, f *memdb.DB) { + db.memMu.RLock() + defer db.memMu.RUnlock() + return db.mem, db.frozenMem } // Get frozen memdb. -func (d *DB) getEffectiveMem() *memdb.DB { - d.memMu.RLock() - defer d.memMu.RUnlock() - return d.mem +func (db *DB) getEffectiveMem() *memdb.DB { + db.memMu.RLock() + defer db.memMu.RUnlock() + return db.mem } // Check whether we has frozen memdb. -func (d *DB) hasFrozenMem() bool { - d.memMu.RLock() - defer d.memMu.RUnlock() - return d.frozenMem != nil +func (db *DB) hasFrozenMem() bool { + db.memMu.RLock() + defer db.memMu.RUnlock() + return db.frozenMem != nil } // Get frozen memdb. 
-func (d *DB) getFrozenMem() *memdb.DB { - d.memMu.RLock() - defer d.memMu.RUnlock() - return d.frozenMem +func (db *DB) getFrozenMem() *memdb.DB { + db.memMu.RLock() + defer db.memMu.RUnlock() + return db.frozenMem } // Drop frozen memdb; assume that frozen memdb isn't nil. -func (d *DB) dropFrozenMem() { - d.memMu.Lock() - if err := d.frozenJournalFile.Remove(); err != nil { - d.s.logf("journal@remove removing @%d %q", d.frozenJournalFile.Num(), err) +func (db *DB) dropFrozenMem() { + db.memMu.Lock() + if err := db.frozenJournalFile.Remove(); err != nil { + db.logf("journal@remove removing @%d %q", db.frozenJournalFile.Num(), err) } else { - d.s.logf("journal@remove removed @%d", d.frozenJournalFile.Num()) + db.logf("journal@remove removed @%d", db.frozenJournalFile.Num()) } - d.frozenJournalFile = nil - d.frozenMem = nil - d.memMu.Unlock() + db.frozenJournalFile = nil + db.frozenMem = nil + db.memMu.Unlock() } // Set closed flag; return true if not already closed. -func (d *DB) setClosed() bool { - return atomic.CompareAndSwapUint32(&d.closed, 0, 1) +func (db *DB) setClosed() bool { + return atomic.CompareAndSwapUint32(&db.closed, 0, 1) } // Check whether DB was closed. -func (d *DB) isClosed() bool { - return atomic.LoadUint32(&d.closed) != 0 +func (db *DB) isClosed() bool { + return atomic.LoadUint32(&db.closed) != 0 } // Check read ok status. -func (d *DB) ok() error { - if d.isClosed() { +func (db *DB) ok() error { + if db.isClosed() { return ErrClosed } return nil diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index 5de7d9723..7f15b4b65 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -154,9 +154,7 @@ func (h *dbHarness) maxNextLevelOverlappingBytes(want uint64) { level := i + 1 next := v.tables[level+1] for _, t := range tt { - var r tFiles - min, max := t.min.ukey(), t.max.ukey() - next.getOverlaps(min, max, &r, true, db.s.icmp.ucmp) + r := next.getOverlaps(nil, db.s.icmp, t.imin.ukey(), t.imax.ukey(), false) sum := r.size() if sum > res { res = sum diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go index 2db007e27..4f6b792d1 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_util.go @@ -32,40 +32,42 @@ func (p Sizes) Sum() (n uint64) { return n } -// Check and clean files. -func (d *DB) checkAndCleanFiles() error { - s := d.s +// Logging. +func (db *DB) log(v ...interface{}) { db.s.log(v...) } +func (db *DB) logf(format string, v ...interface{}) { db.s.logf(format, v...) } - v := s.version_NB() - tables := make(map[uint64]bool) - for _, tt := range v.tables { - for _, t := range tt { - tables[t.file.Num()] = false +// Check and clean files. 
+func (db *DB) checkAndCleanFiles() error { + v := db.s.version_NB() + tablesMap := make(map[uint64]bool) + for _, tables := range v.tables { + for _, t := range tables { + tablesMap[t.file.Num()] = false } } - ff, err := s.getFiles(storage.TypeAll) + files, err := db.s.getFiles(storage.TypeAll) if err != nil { return err } var nTables int var rem []storage.File - for _, f := range ff { + for _, f := range files { keep := true switch f.Type() { case storage.TypeManifest: - keep = f.Num() >= s.manifestFile.Num() + keep = f.Num() >= db.s.manifestFile.Num() case storage.TypeJournal: - if d.frozenJournalFile != nil { - keep = f.Num() >= d.frozenJournalFile.Num() + if db.frozenJournalFile != nil { + keep = f.Num() >= db.frozenJournalFile.Num() } else { - keep = f.Num() >= d.journalFile.Num() + keep = f.Num() >= db.journalFile.Num() } case storage.TypeTable: - _, keep = tables[f.Num()] + _, keep = tablesMap[f.Num()] if keep { - tables[f.Num()] = true + tablesMap[f.Num()] = true nTables++ } } @@ -75,18 +77,18 @@ func (d *DB) checkAndCleanFiles() error { } } - if nTables != len(tables) { - for num, present := range tables { + if nTables != len(tablesMap) { + for num, present := range tablesMap { if !present { - s.logf("db@janitor table missing @%d", num) + db.logf("db@janitor table missing @%d", num) } } return ErrCorrupted{Type: MissingFiles, Err: errors.New("leveldb: table files missing")} } - s.logf("db@janitor F·%d G·%d", len(ff), len(rem)) + db.logf("db@janitor F·%d G·%d", len(files), len(rem)) for _, f := range rem { - s.logf("db@janitor removing %s-%d", f.Type(), f.Num()) + db.logf("db@janitor removing %s-%d", f.Type(), f.Num()) if err := f.Remove(); err != nil { return err } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index 4660e840c..2b2840320 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -14,63 +14,61 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -func (d *DB) writeJournal(b *Batch) error { - w, err := d.journal.Next() +func (db *DB) writeJournal(b *Batch) error { + w, err := db.journal.Next() if err != nil { return err } if _, err := w.Write(b.encode()); err != nil { return err } - if err := d.journal.Flush(); err != nil { + if err := db.journal.Flush(); err != nil { return err } if b.sync { - return d.journalWriter.Sync() + return db.journalWriter.Sync() } return nil } -func (d *DB) jWriter() { - defer d.closeW.Done() +func (db *DB) jWriter() { + defer db.closeW.Done() for { select { - case b := <-d.journalC: + case b := <-db.journalC: if b != nil { - d.journalAckC <- d.writeJournal(b) + db.journalAckC <- db.writeJournal(b) } - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: return } } } -func (d *DB) rotateMem(n int) (mem *memdb.DB, err error) { +func (db *DB) rotateMem(n int) (mem *memdb.DB, err error) { // Wait for pending memdb compaction. - err = d.compSendIdle(d.mcompCmdC) + err = db.compSendIdle(db.mcompCmdC) if err != nil { return } // Create new memdb and journal. - mem, err = d.newMem(n) + mem, err = db.newMem(n) if err != nil { return } // Schedule memdb compaction. 
- d.compTrigger(d.mcompTriggerC) + db.compTrigger(db.mcompTriggerC) return } -func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { - s := d.s - +func (db *DB) flush(n int) (mem *memdb.DB, nn int, err error) { delayed := false flush := func() bool { - v := s.version() + v := db.s.version() defer v.release() - mem = d.getEffectiveMem() + mem = db.getEffectiveMem() nn = mem.Free() switch { case v.tLen(0) >= kL0_SlowdownWritesTrigger && !delayed: @@ -80,7 +78,7 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { return false case v.tLen(0) >= kL0_StopWritesTrigger: delayed = true - err = d.compSendIdle(d.tcompCmdC) + err = db.compSendIdle(db.tcompCmdC) if err != nil { return false } @@ -90,7 +88,7 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { nn = n return false } - mem, err = d.rotateMem(n) + mem, err = db.rotateMem(n) nn = mem.Free() return false } @@ -100,7 +98,7 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { for flush() { } if delayed { - s.logf("db@write delayed T·%v", time.Since(start)) + db.logf("db@write delayed T·%v", time.Since(start)) } return } @@ -109,8 +107,8 @@ func (d *DB) flush(n int) (mem *memdb.DB, nn int, err error) { // sequentially. // // It is safe to modify the contents of the arguments after Write returns. -func (d *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { - err = d.ok() +func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { + err = db.ok() if err != nil || b == nil || b.len() == 0 { return } @@ -120,25 +118,25 @@ func (d *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { // The write happen synchronously. retry: select { - case d.writeC <- b: - if <-d.writeMergedC { - return <-d.writeAckC + case db.writeC <- b: + if <-db.writeMergedC { + return <-db.writeAckC } goto retry - case d.writeLockC <- struct{}{}: - case _, _ = <-d.closeC: + case db.writeLockC <- struct{}{}: + case _, _ = <-db.closeC: return ErrClosed } merged := 0 defer func() { - <-d.writeLockC + <-db.writeLockC for i := 0; i < merged; i++ { - d.writeAckC <- err + db.writeAckC <- err } }() - mem, memFree, err := d.flush(b.size()) + mem, memFree, err := db.flush(b.size()) if err != nil { return } @@ -154,13 +152,13 @@ retry: drain: for b.size() < m && !b.sync { select { - case nb := <-d.writeC: + case nb := <-db.writeC: if b.size()+nb.size() <= m { b.append(nb) - d.writeMergedC <- true + db.writeMergedC <- true merged++ } else { - d.writeMergedC <- false + db.writeMergedC <- false break drain } default: @@ -169,25 +167,25 @@ drain: } // Set batch first seq number relative from last seq. - b.seq = d.seq + 1 + b.seq = db.seq + 1 // Write journal concurrently if it is large enough. if b.size() >= (128 << 10) { // Push the write batch to the journal writer select { - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: err = ErrClosed return - case d.journalC <- b: + case db.journalC <- b: // Write into memdb b.memReplay(mem) } // Wait for journal writer select { - case _, _ = <-d.closeC: + case _, _ = <-db.closeC: err = ErrClosed return - case err = <-d.journalAckC: + case err = <-db.journalAckC: if err != nil { // Revert memdb if error detected b.revertMemReplay(mem) @@ -195,7 +193,7 @@ drain: } } } else { - err = d.writeJournal(b) + err = db.writeJournal(b) if err != nil { return } @@ -203,10 +201,10 @@ drain: } // Set last seq number. 
- d.addSeq(uint64(b.len())) + db.addSeq(uint64(b.len())) if b.size() >= memFree { - d.rotateMem(0) + db.rotateMem(0) } return } @@ -215,20 +213,20 @@ drain: // for that key; a DB is not a multi-map. // // It is safe to modify the contents of the arguments after Put returns. -func (d *DB) Put(key, value []byte, wo *opt.WriteOptions) error { +func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { b := new(Batch) b.Put(key, value) - return d.Write(b, wo) + return db.Write(b, wo) } // Delete deletes the value for the given key. It returns ErrNotFound if // the DB does not contain the key. // // It is safe to modify the contents of the arguments after Delete returns. -func (d *DB) Delete(key []byte, wo *opt.WriteOptions) error { +func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { b := new(Batch) b.Delete(key) - return d.Write(b, wo) + return db.Write(b, wo) } func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { @@ -247,33 +245,33 @@ func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool { // A nil Range.Start is treated as a key before all keys in the DB. // And a nil Range.Limit is treated as a key after all keys in the DB. // Therefore if both is nil then it will compact entire DB. -func (d *DB) CompactRange(r util.Range) error { - if err := d.ok(); err != nil { +func (db *DB) CompactRange(r util.Range) error { + if err := db.ok(); err != nil { return err } select { - case d.writeLockC <- struct{}{}: - case _, _ = <-d.closeC: + case db.writeLockC <- struct{}{}: + case _, _ = <-db.closeC: return ErrClosed } // Check for overlaps in memdb. - mem := d.getEffectiveMem() - if isMemOverlaps(d.s.icmp, mem, r.Start, r.Limit) { + mem := db.getEffectiveMem() + if isMemOverlaps(db.s.icmp, mem, r.Start, r.Limit) { // Memdb compaction. - if _, err := d.rotateMem(0); err != nil { - <-d.writeLockC + if _, err := db.rotateMem(0); err != nil { + <-db.writeLockC return err } - <-d.writeLockC - if err := d.compSendIdle(d.mcompCmdC); err != nil { + <-db.writeLockC + if err := db.compSendIdle(db.mcompCmdC); err != nil { return err } } else { - <-d.writeLockC + <-db.writeLockC } // Table compaction. - return d.compSendRange(d.tcompCmdC, -1, r.Start, r.Limit) + return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/external_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/external_test.go index d7dff04b6..f74f354c6 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/external_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/external_test.go @@ -36,7 +36,7 @@ var _ = testutil.Defer(func() { testutil.DoDBTesting(&t) db.TestClose() done <- true - }, 9.0) + }, 20.0) }) Describe("read test", func() { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index 7bcae992a..37a8b5740 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -437,6 +437,8 @@ func (p *DB) Reset() { // New creates a new initalized in-memory key/value DB. The capacity // is the initial key/value buffer capacity. The capacity is advisory, // not enforced. +// +// The returned DB instance is goroutine-safe. 
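Put and Delete above stay thin wrappers: each stages a one-entry batch and routes it through Write. A rough sketch of that shape with a made-up batch type, not the goleveldb API:

package main

import "fmt"

// batch is a hypothetical write batch collecting operations in order.
type batch struct {
	ops []string
}

func (b *batch) Put(key, value string) { b.ops = append(b.ops, "put "+key+"="+value) }
func (b *batch) Delete(key string)     { b.ops = append(b.ops, "del "+key) }

// write applies a whole batch atomically (here it just prints it).
func write(b *batch) error {
	for _, op := range b.ops {
		fmt.Println(op)
	}
	return nil
}

// put and del mirror DB.Put/DB.Delete above: one-entry batches through write.
func put(key, value string) error {
	b := new(batch)
	b.Put(key, value)
	return write(b)
}

func del(key string) error {
	b := new(batch)
	b.Delete(key)
	return write(b)
}

func main() {
	_ = put("k", "v")
	_ = del("k")
}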
func New(cmp comparer.BasicComparer, capacity int) *DB { p := &DB{ cmp: cmp, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go index 6b2a61683..8c64fb8f2 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go @@ -39,11 +39,12 @@ type session struct { manifestWriter storage.Writer manifestFile storage.File - stCPtrs [kNumLevels]iKey // compact pointers; need external synchronization + stCptrs [kNumLevels]iKey // compact pointers; need external synchronization stVersion *version // current version vmu sync.Mutex } +// Creates new initialized session instance. func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) { if stor == nil { return nil, os.ErrInvalid @@ -81,6 +82,7 @@ func (s *session) close() { s.stVersion = nil } +// Release session lock. func (s *session) release() { s.storLock.Release() } @@ -132,8 +134,8 @@ func (s *session) recover() (err error) { err = rec.decode(r) if err == nil { // save compact pointers - for _, rp := range rec.compactionPointers { - s.stCPtrs[rp.level] = iKey(rp.key) + for _, r := range rec.compactionPointers { + s.stCptrs[r.level] = iKey(r.ikey) } // commit record to version staging staging.commit(rec) @@ -195,16 +197,16 @@ func (s *session) pickCompaction() *compaction { var t0 tFiles if v.cScore >= 1 { level = v.cLevel - cp := s.stCPtrs[level] - tt := v.tables[level] - for _, t := range tt { - if cp == nil || s.icmp.Compare(t.max, cp) > 0 { + cptr := s.stCptrs[level] + tables := v.tables[level] + for _, t := range tables { + if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 { t0 = append(t0, t) break } } if len(t0) == 0 { - t0 = append(t0, tt[0]) + t0 = append(t0, tables[0]) } } else { if p := atomic.LoadPointer(&v.cSeek); p != nil { @@ -216,11 +218,10 @@ func (s *session) pickCompaction() *compaction { } } - c := &compaction{s: s, version: v, level: level} + c := &compaction{s: s, v: v, level: level} if level == 0 { - min, max := t0.getRange(s.icmp) - t0 = nil - v.tables[0].getOverlaps(min.ukey(), max.ukey(), &t0, false, s.icmp.ucmp) + imin, imax := t0.getRange(s.icmp) + t0 = v.tables[0].getOverlaps(t0[:0], s.icmp, imin.ukey(), imax.ukey(), true) } c.tables[0] = t0 @@ -229,11 +230,10 @@ func (s *session) pickCompaction() *compaction { } // Create compaction from given level and range; need external synchronization. -func (s *session) getCompactionRange(level int, min, max []byte) *compaction { +func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction { v := s.version_NB() - var t0 tFiles - v.tables[level].getOverlaps(min, max, &t0, level != 0, s.icmp.ucmp) + t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0) if len(t0) == 0 { return nil } @@ -255,16 +255,16 @@ func (s *session) getCompactionRange(level int, min, max []byte) *compaction { } } - c := &compaction{s: s, version: v, level: level} + c := &compaction{s: s, v: v, level: level} c.tables[0] = t0 c.expand() return c } -// compaction represent a compaction state +// compaction represent a compaction state. type compaction struct { - s *session - version *version + s *session + v *version level int tables [2]tFiles @@ -273,42 +273,36 @@ type compaction struct { gpidx int seenKey bool overlappedBytes uint64 - min, max iKey + imin, imax iKey tPtrs [kNumLevels]int } // Expand compacted tables; need external synchronization. 
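pickCompaction above resumes size-triggered compaction at the stored compaction pointer (stCptrs), taking the first table whose largest key lies past it and wrapping around to the first table otherwise. A simplified illustration with string keys standing in for internal keys:

package main

import "fmt"

// table is a hypothetical table descriptor; imax stands in for its largest key.
type table struct {
	name string
	imax string
}

// pickFirstAfter returns the first table whose largest key is after the
// compaction pointer, or the first table when none qualifies (wrap-around).
func pickFirstAfter(tables []table, cptr string) table {
	for _, t := range tables {
		if cptr == "" || t.imax > cptr {
			return t
		}
	}
	return tables[0]
}

func main() {
	level := []table{{"a.ldb", "d"}, {"b.ldb", "m"}, {"c.ldb", "z"}}
	fmt.Println(pickFirstAfter(level, "f").name) // b.ldb
	fmt.Println(pickFirstAfter(level, "z").name) // wraps to a.ldb
}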
func (c *compaction) expand() { - s := c.s - v := c.version - level := c.level - vt0, vt1 := v.tables[level], v.tables[level+1] + vt0, vt1 := c.v.tables[level], c.v.tables[level+1] t0, t1 := c.tables[0], c.tables[1] - min, max := t0.getRange(s.icmp) - vt1.getOverlaps(min.ukey(), max.ukey(), &t1, true, s.icmp.ucmp) - - // Get entire range covered by compaction - amin, amax := append(t0, t1...).getRange(s.icmp) + imin, imax := t0.getRange(c.s.icmp) + t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) + // Get entire range covered by compaction. + amin, amax := append(t0, t1...).getRange(c.s.icmp) // See if we can grow the number of inputs in "level" without // changing the number of "level+1" files we pick up. if len(t1) > 0 { - var exp0 tFiles - vt0.getOverlaps(amin.ukey(), amax.ukey(), &exp0, level != 0, s.icmp.ucmp) + exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), level == 0) if len(exp0) > len(t0) && t1.size()+exp0.size() < kExpCompactionMaxBytes { - var exp1 tFiles - xmin, xmax := exp0.getRange(s.icmp) - vt1.getOverlaps(xmin.ukey(), xmax.ukey(), &exp1, true, s.icmp.ucmp) + xmin, xmax := exp0.getRange(c.s.icmp) + exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) if len(exp1) == len(t1) { - s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", + c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", level, level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size()))) - min, max = xmin, xmax + imin, imax = xmin, xmax t0, t1 = exp0, exp1 - amin, amax = append(t0, t1...).getRange(s.icmp) + amin, amax = append(t0, t1...).getRange(c.s.icmp) } } } @@ -316,11 +310,11 @@ func (c *compaction) expand() { // Compute the set of grandparent files that overlap this compaction // (parent == level+1; grandparent == level+2) if level+2 < kNumLevels { - v.tables[level+2].getOverlaps(amin.ukey(), amax.ukey(), &c.gp, true, s.icmp.ucmp) + c.gp = c.v.tables[level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false) } c.tables[0], c.tables[1] = t0, t1 - c.min, c.max = min, max + c.imin, c.imax = imin, imax } // Check whether compaction is trivial. @@ -328,17 +322,14 @@ func (c *compaction) trivial() bool { return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= kMaxGrandParentOverlapBytes } -func (c *compaction) isBaseLevelForKey(key []byte) bool { - s := c.s - v := c.version - - for level, tt := range v.tables[c.level+2:] { - for c.tPtrs[level] < len(tt) { - t := tt[c.tPtrs[level]] - if s.icmp.uCompare(key, t.max.ukey()) <= 0 { - // We've advanced far enough - if s.icmp.uCompare(key, t.min.ukey()) >= 0 { - // Key falls in this file's range, so definitely not base level +func (c *compaction) baseLevelForKey(ukey []byte) bool { + for level, tables := range c.v.tables[c.level+2:] { + for c.tPtrs[level] < len(tables) { + t := tables[c.tPtrs[level]] + if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 { + // We've advanced far enough. + if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { + // Key falls in this file's range, so definitely not base level. 
return false } break @@ -349,10 +340,10 @@ func (c *compaction) isBaseLevelForKey(key []byte) bool { return true } -func (c *compaction) shouldStopBefore(key iKey) bool { +func (c *compaction) shouldStopBefore(ikey iKey) bool { for ; c.gpidx < len(c.gp); c.gpidx++ { gp := c.gp[c.gpidx] - if c.s.icmp.Compare(key, gp.max) <= 0 { + if c.s.icmp.Compare(ikey, gp.imax) <= 0 { break } if c.seenKey { @@ -362,42 +353,44 @@ func (c *compaction) shouldStopBefore(key iKey) bool { c.seenKey = true if c.overlappedBytes > kMaxGrandParentOverlapBytes { - // Too much overlap for current output; start new output + // Too much overlap for current output; start new output. c.overlappedBytes = 0 return true } return false } +// Creates an iterator. func (c *compaction) newIterator() iterator.Iterator { - s := c.s - - level := c.level - icap := 2 + // Creates iterator slice. + icap := len(c.tables) if c.level == 0 { + // Special case for level-0 icap = len(c.tables[0]) + 1 } its := make([]iterator.Iterator, 0, icap) + // Options. ro := &opt.ReadOptions{ DontFillCache: true, } - strict := s.o.GetStrict(opt.StrictIterator) + strict := c.s.o.GetStrict(opt.StrictIterator) - for i, tt := range c.tables { - if len(tt) == 0 { + for i, tables := range c.tables { + if len(tables) == 0 { continue } - if level+i == 0 { - for _, t := range tt { - its = append(its, s.tops.newIterator(t, nil, ro)) + // Level-0 is not sorted and may overlaps each other. + if c.level+i == 0 { + for _, t := range tables { + its = append(its, c.s.tops.newIterator(t, nil, ro)) } } else { - it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, nil, ro), strict, true) + it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict, true) its = append(its, it) } } - return iterator.NewMergedIterator(its, s.icmp, true) + return iterator.NewMergedIterator(its, c.s.icmp, true) } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go index c50fda737..272129589 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go @@ -35,19 +35,19 @@ const ( type cpRecord struct { level int - key iKey + ikey iKey } type ntRecord struct { level int num uint64 size uint64 - min iKey - max iKey + imin iKey + imax iKey } func (r ntRecord) makeFile(s *session) *tFile { - return newTFile(s.getTableFile(r.num), r.size, r.min, r.max) + return newTableFile(s.getTableFile(r.num), r.size, r.imin, r.imax) } type dtRecord struct { @@ -98,9 +98,9 @@ func (p *sessionRecord) setSeq(seq uint64) { p.seq = seq } -func (p *sessionRecord) addCompactionPointer(level int, key iKey) { +func (p *sessionRecord) addCompactionPointer(level int, ikey iKey) { p.hasRec |= 1 << recCompactionPointer - p.compactionPointers = append(p.compactionPointers, cpRecord{level, key}) + p.compactionPointers = append(p.compactionPointers, cpRecord{level, ikey}) } func (p *sessionRecord) resetCompactionPointers() { @@ -108,13 +108,13 @@ func (p *sessionRecord) resetCompactionPointers() { p.compactionPointers = p.compactionPointers[:0] } -func (p *sessionRecord) addTable(level int, num, size uint64, min, max iKey) { +func (p *sessionRecord) addTable(level int, num, size uint64, imin, imax iKey) { p.hasRec |= 1 << recNewTable - p.addedTables = append(p.addedTables, ntRecord{level, num, size, min, max}) + p.addedTables = append(p.addedTables, 
ntRecord{level, num, size, imin, imax}) } func (p *sessionRecord) addTableFile(level int, t *tFile) { - p.addTable(level, t.file.Num(), t.size, t.min, t.max) + p.addTable(level, t.file.Num(), t.size, t.imin, t.imax) } func (p *sessionRecord) resetAddedTables() { @@ -169,23 +169,23 @@ func (p *sessionRecord) encode(w io.Writer) error { p.putUvarint(w, recSeq) p.putUvarint(w, p.seq) } - for _, cp := range p.compactionPointers { + for _, r := range p.compactionPointers { p.putUvarint(w, recCompactionPointer) - p.putUvarint(w, uint64(cp.level)) - p.putBytes(w, cp.key) + p.putUvarint(w, uint64(r.level)) + p.putBytes(w, r.ikey) } - for _, t := range p.deletedTables { + for _, r := range p.deletedTables { p.putUvarint(w, recDeletedTable) - p.putUvarint(w, uint64(t.level)) - p.putUvarint(w, t.num) + p.putUvarint(w, uint64(r.level)) + p.putUvarint(w, r.num) } - for _, t := range p.addedTables { + for _, r := range p.addedTables { p.putUvarint(w, recNewTable) - p.putUvarint(w, uint64(t.level)) - p.putUvarint(w, t.num) - p.putUvarint(w, t.size) - p.putBytes(w, t.min) - p.putBytes(w, t.max) + p.putUvarint(w, uint64(r.level)) + p.putUvarint(w, r.num) + p.putUvarint(w, r.size) + p.putBytes(w, r.imin) + p.putBytes(w, r.imax) } return p.err } @@ -282,18 +282,18 @@ func (p *sessionRecord) decode(r io.Reader) error { } case recCompactionPointer: level := p.readLevel(br) - key := p.readBytes(br) + ikey := p.readBytes(br) if p.err == nil { - p.addCompactionPointer(level, iKey(key)) + p.addCompactionPointer(level, iKey(ikey)) } case recNewTable: level := p.readLevel(br) num := p.readUvarint(br) size := p.readUvarint(br) - min := p.readBytes(br) - max := p.readBytes(br) + imin := p.readBytes(br) + imax := p.readBytes(br) if p.err == nil { - p.addTable(level, num, size, min, max) + p.addTable(level, num, size, imin, imax) } case recDeletedTable: level := p.readLevel(br) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go index bf412b030..47a2ea898 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -14,7 +14,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/storage" ) -// logging +// Logging. type dropper struct { s *session @@ -29,15 +29,10 @@ func (d dropper) Drop(err error) { } } -func (s *session) log(v ...interface{}) { - s.stor.Log(fmt.Sprint(v...)) -} +func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) } +func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) } -func (s *session) logf(format string, v ...interface{}) { - s.stor.Log(fmt.Sprintf(format, v...)) -} - -// file utils +// File utils. func (s *session) getJournalFile(num uint64) storage.File { return s.stor.GetFile(num, storage.TypeJournal) @@ -56,7 +51,7 @@ func (s *session) newTemp() storage.File { return s.stor.GetFile(num, storage.TypeTemp) } -// session state +// Session state. // Get current version. func (s *session) version() *version { @@ -126,7 +121,7 @@ func (s *session) reuseFileNum(num uint64) { } } -// manifest related utils +// Manifest related utils. // Fill given session record obj with current states; need external // synchronization. 
@@ -142,7 +137,7 @@ func (s *session) fillRecord(r *sessionRecord, snapshot bool) { r.setSeq(s.stSeq) } - for level, ik := range s.stCPtrs { + for level, ik := range s.stCptrs { if ik != nil { r.addCompactionPointer(level, ik) } @@ -168,7 +163,7 @@ func (s *session) recordCommited(r *sessionRecord) { } for _, p := range r.compactionPointers { - s.stCPtrs[p.level] = iKey(p.key) + s.stCptrs[p.level] = iKey(p.ikey) } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go index ea0259428..75439f6db 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage.go @@ -344,19 +344,17 @@ type fileWrap struct { } func (fw fileWrap) Sync() error { + if err := fw.File.Sync(); err != nil { + return err + } if fw.f.Type() == TypeManifest { // Also sync parent directory if file type is manifest. // See: https://code.google.com/p/leveldb/issues/detail?id=190. - f, err := os.Open(fw.f.fs.path) - if err != nil { - return err - } - defer f.Close() - if err := f.Sync(); err != nil { + if err := syncDir(fw.f.fs.path); err != nil { return err } } - return fw.File.Sync() + return nil } func (fw fileWrap) Close() error { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go index 9e3f19b21..42940d769 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_plan9.go @@ -38,3 +38,15 @@ func rename(oldpath, newpath string) error { _, fname := filepath.Split(newpath) return os.Rename(oldpath, fname) } + +func syncDir(name string) error { + f, err := os.Open(name) + if err != nil { + return err + } + defer f.Close() + if err := f.Sync(); err != nil { + return err + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go index 395db2d2a..73499afc8 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_unix.go @@ -49,3 +49,15 @@ func setFileLock(f *os.File, lock bool) error { func rename(oldpath, newpath string) error { return os.Rename(oldpath, newpath) } + +func syncDir(name string) error { + f, err := os.Open(name) + if err != nil { + return err + } + defer f.Close() + if err := f.Sync(); err != nil { + return err + } + return nil +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go index b834b18fa..50c3c454e 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/file_storage_windows.go @@ -65,3 +65,5 @@ func rename(oldpath, newpath string) error { } return moveFileEx(from, to, _MOVEFILE_REPLACE_EXISTING) } + +func syncDir(name string) error { return nil } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go 
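The syncDir helper introduced above fsyncs the parent directory after the manifest itself is synced, so the new directory entry survives a crash; the Windows variant is a no-op. A minimal usage sketch with made-up file names:

package main

import (
	"log"
	"os"
	"path/filepath"
)

// syncDir mirrors the helper added above: open the directory and fsync it.
func syncDir(name string) error {
	f, err := os.Open(name)
	if err != nil {
		return err
	}
	defer f.Close()
	return f.Sync()
}

func main() {
	dir := os.TempDir()
	path := filepath.Join(dir, "MANIFEST-000001") // hypothetical file name

	if err := os.WriteFile(path, []byte("records"), 0644); err != nil {
		log.Fatal(err)
	}
	// Sync the file itself, then its parent directory so the directory
	// entry is also on disk.
	f, err := os.OpenFile(path, os.O_RDWR, 0)
	if err != nil {
		log.Fatal(err)
	}
	if err := f.Sync(); err != nil {
		log.Fatal(err)
	}
	f.Close()
	if err := syncDir(dir); err != nil {
		log.Fatal(err)
	}
}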
b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go index de5694888..5a1885e60 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage/storage.go @@ -67,7 +67,7 @@ type Writer interface { Syncer } -// File is the file. +// File is the file. A file instance must be goroutine-safe. type File interface { // Open opens the file for read. Returns os.ErrNotExist error // if the file does not exist. @@ -94,7 +94,7 @@ type File interface { Remove() error } -// Storage is the storage. +// Storage is the storage. A storage instance must be goroutine-safe. type Storage interface { // Lock locks the storage. Any subsequent attempt to call Lock will fail // until the last lock released. diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index fdd5d2bcf..b44120e5b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -11,7 +11,6 @@ import ( "sync/atomic" "github.com/syndtr/goleveldb/leveldb/cache" - "github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" @@ -19,34 +18,41 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -// table file +// tFile holds basic information about a table. type tFile struct { - file storage.File - seekLeft int32 - size uint64 - min, max iKey + file storage.File + seekLeft int32 + size uint64 + imin, imax iKey } -// test if key is after t -func (t *tFile) isAfter(key []byte, ucmp comparer.BasicComparer) bool { - return key != nil && ucmp.Compare(key, t.max.ukey()) > 0 +// Returns true if given key is after largest key of this table. +func (t *tFile) after(icmp *iComparer, ukey []byte) bool { + return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0 } -// test if key is before t -func (t *tFile) isBefore(key []byte, ucmp comparer.BasicComparer) bool { - return key != nil && ucmp.Compare(key, t.min.ukey()) < 0 +// Returns true if given key is before smallest key of this table. +func (t *tFile) before(icmp *iComparer, ukey []byte) bool { + return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0 } -func (t *tFile) incrSeek() int32 { +// Returns true if given key range overlaps with this table key range. +func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool { + return !t.after(icmp, umin) && !t.before(icmp, umax) +} + +// Cosumes one seek and return current seeks left. +func (t *tFile) consumeSeek() int32 { return atomic.AddInt32(&t.seekLeft, -1) } -func newTFile(file storage.File, size uint64, min, max iKey) *tFile { +// Creates new tFile. +func newTableFile(file storage.File, size uint64, imin, imax iKey) *tFile { f := &tFile{ file: file, size: size, - min: min, - max: max, + imin: imin, + imax: imax, } // We arrange to automatically compact this file after @@ -70,33 +76,40 @@ func newTFile(file storage.File, size uint64, min, max iKey) *tFile { return f } -// table files +// tFiles hold multiple tFile. type tFiles []*tFile func (tf tFiles) Len() int { return len(tf) } func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] } +// Returns true if i smallest key is less than j. +// This used for sort by key in ascending order. 
func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool { a, b := tf[i], tf[j] - n := icmp.Compare(a.min, b.min) + n := icmp.Compare(a.imin, b.imin) if n == 0 { return a.file.Num() < b.file.Num() } return n < 0 } +// Returns true if i file number is greater than j. +// This used for sort by file number in descending order. func (tf tFiles) lessByNum(i, j int) bool { return tf[i].file.Num() > tf[j].file.Num() } +// Sorts tables by key in ascending order. func (tf tFiles) sortByKey(icmp *iComparer) { sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp}) } +// Sorts tables by file number in descending order. func (tf tFiles) sortByNum() { sort.Sort(&tFilesSortByNum{tFiles: tf}) } +// Returns sum of all tables size. func (tf tFiles) size() (sum uint64) { for _, t := range tf { sum += t.size @@ -104,94 +117,106 @@ func (tf tFiles) size() (sum uint64) { return sum } -func (tf tFiles) searchMin(key iKey, icmp *iComparer) int { +// Searches smallest index of tables whose its smallest +// key is after or equal with given key. +func (tf tFiles) searchMin(icmp *iComparer, ikey iKey) int { return sort.Search(len(tf), func(i int) bool { - return icmp.Compare(tf[i].min, key) >= 0 + return icmp.Compare(tf[i].imin, ikey) >= 0 }) } -func (tf tFiles) searchMax(key iKey, icmp *iComparer) int { +// Searches smallest index of tables whose its largest +// key is after or equal with given key. +func (tf tFiles) searchMax(icmp *iComparer, ikey iKey) int { return sort.Search(len(tf), func(i int) bool { - return icmp.Compare(tf[i].max, key) >= 0 + return icmp.Compare(tf[i].imax, ikey) >= 0 }) } -func (tf tFiles) isOverlaps(min, max []byte, disjSorted bool, icmp *iComparer) bool { - if !disjSorted { - // Need to check against all files +// Returns true if given key range overlaps with one or more +// tables key range. If unsorted is true then binary search will not be used. +func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool { + if unsorted { + // Check against all files. for _, t := range tf { - if !t.isAfter(min, icmp.ucmp) && !t.isBefore(max, icmp.ucmp) { + if t.overlaps(icmp, umin, umax) { return true } } return false } - var idx int - if len(min) > 0 { - // Find the earliest possible internal key for min - idx = tf.searchMax(newIKey(min, kMaxSeq, tSeek), icmp) + i := 0 + if len(umin) > 0 { + // Find the earliest possible internal key for min. + i = tf.searchMax(icmp, newIKey(umin, kMaxSeq, tSeek)) } - - if idx >= len(tf) { - // beginning of range is after all files, so no overlap + if i >= len(tf) { + // Beginning of range is after all files, so no overlap. return false } - return !tf[idx].isBefore(max, icmp.ucmp) + return !tf[i].before(icmp, umax) } -func (tf tFiles) getOverlaps(min, max []byte, r *tFiles, disjSorted bool, ucmp comparer.BasicComparer) { +// Returns tables whose its key range overlaps with given key range. +// If overlapped is true then the search will be expanded to tables that +// overlaps with each other. +func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles { + x := len(dst) for i := 0; i < len(tf); { t := tf[i] - i++ - if t.isAfter(min, ucmp) || t.isBefore(max, ucmp) { - continue - } - - *r = append(*r, t) - if !disjSorted { - // Level-0 files may overlap each other. So check if the newly - // added file has expanded the range. If so, restart search. 
- if min != nil && ucmp.Compare(t.min.ukey(), min) < 0 { - min = t.min.ukey() - *r = nil - i = 0 - } else if max != nil && ucmp.Compare(t.max.ukey(), max) > 0 { - max = t.max.ukey() - *r = nil - i = 0 + if t.overlaps(icmp, umin, umax) { + if overlapped { + // For overlapped files, check if the newly added file has + // expanded the range. If so, restart search. + if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 { + umin = t.imin.ukey() + dst = dst[:x] + i = 0 + continue + } else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 { + umax = t.imax.ukey() + dst = dst[:x] + i = 0 + continue + } } + + dst = append(dst, t) } + i++ } - return + return dst } -func (tf tFiles) getRange(icmp *iComparer) (min, max iKey) { +// Returns tables key range. +func (tf tFiles) getRange(icmp *iComparer) (imin, imax iKey) { for i, t := range tf { if i == 0 { - min, max = t.min, t.max + imin, imax = t.imin, t.imax continue } - if icmp.Compare(t.min, min) < 0 { - min = t.min + if icmp.Compare(t.imin, imin) < 0 { + imin = t.imin } - if icmp.Compare(t.max, max) > 0 { - max = t.max + if icmp.Compare(t.imax, imax) > 0 { + imax = t.imax } } return } +// Creates iterator index from tables. func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer { if slice != nil { var start, limit int if slice.Start != nil { - start = tf.searchMax(iKey(slice.Start), icmp) + start = tf.searchMax(icmp, iKey(slice.Start)) } if slice.Limit != nil { - limit = tf.searchMin(iKey(slice.Limit), icmp) + limit = tf.searchMin(icmp, iKey(slice.Limit)) } else { limit = tf.Len() } @@ -206,6 +231,7 @@ func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range }) } +// Tables iterator index. type tFilesArrayIndexer struct { tFiles tops *tOps @@ -215,7 +241,7 @@ type tFilesArrayIndexer struct { } func (a *tFilesArrayIndexer) Search(key []byte) int { - return a.searchMax(iKey(key), a.icmp) + return a.searchMax(a.icmp, iKey(key)) } func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator { @@ -225,6 +251,7 @@ func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator { return a.tops.newIterator(a.tFiles[i], nil, a.ro) } +// Helper type for sortByKey. type tFilesSortByKey struct { tFiles icmp *iComparer @@ -234,6 +261,7 @@ func (x *tFilesSortByKey) Less(i, j int) bool { return x.lessByKey(x.icmp, i, j) } +// Helper type for sortByNum. type tFilesSortByNum struct { tFiles } @@ -242,19 +270,14 @@ func (x *tFilesSortByNum) Less(i, j int) bool { return x.lessByNum(i, j) } -// table operations +// Table operations. type tOps struct { s *session cache cache.Cache cacheNS cache.Namespace } -func newTableOps(s *session, cacheCap int) *tOps { - c := cache.NewLRUCache(cacheCap) - ns := c.GetNamespace(0) - return &tOps{s, c, ns} -} - +// Creates an empty table and returns table writer. func (t *tOps) create() (*tWriter, error) { file := t.s.getTableFile(t.s.allocFileNum()) fw, err := file.Create() @@ -269,6 +292,7 @@ func (t *tOps) create() (*tWriter, error) { }, nil } +// Builds table from src iterator. 
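For overlapped (level-0) tables, getOverlaps above widens the user-key range whenever a matching table extends past the current bounds and restarts the scan from the beginning. A toy version of that restart loop over integer ranges instead of internal keys:

package main

import "fmt"

// span is a hypothetical table key range reduced to integers.
type span struct{ min, max int }

func (s span) overlaps(lo, hi int) bool { return s.max >= lo && s.min <= hi }

// getOverlaps collects spans overlapping [lo, hi]; when a matching span
// extends the range, the bounds grow and the scan restarts from the front.
func getOverlaps(spans []span, lo, hi int) []span {
	var dst []span
	for i := 0; i < len(spans); {
		s := spans[i]
		if s.overlaps(lo, hi) {
			if s.min < lo {
				lo = s.min
				dst = dst[:0]
				i = 0
				continue
			}
			if s.max > hi {
				hi = s.max
				dst = dst[:0]
				i = 0
				continue
			}
			dst = append(dst, s)
		}
		i++
	}
	return dst
}

func main() {
	level0 := []span{{0, 3}, {2, 9}, {20, 30}}
	fmt.Println(getOverlaps(level0, 1, 2)) // range grows to [0,9]; picks the first two spans
}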
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { w, err := t.create() if err != nil { @@ -282,7 +306,7 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { }() for src.Next() { - err = w.add(src.Key(), src.Value()) + err = w.append(src.Key(), src.Value()) if err != nil { return } @@ -297,7 +321,9 @@ func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) { return } -func (t *tOps) lookup(f *tFile) (c cache.Object, err error) { +// Opens table. It returns a cache object, which should +// be released after use. +func (t *tOps) open(f *tFile) (c cache.Object, err error) { num := f.file.Num() c, ok := t.cacheNS.Get(num, func() (ok bool, value interface{}, charge int, fin cache.SetFin) { var r storage.Reader @@ -327,8 +353,10 @@ func (t *tOps) lookup(f *tFile) (c cache.Object, err error) { return } -func (t *tOps) get(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) { - c, err := t.lookup(f) +// Finds key/value pair whose key is greater than or equal to the +// given key. +func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) { + c, err := t.open(f) if err != nil { return nil, nil, err } @@ -336,8 +364,9 @@ func (t *tOps) get(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []by return c.Value().(*table.Reader).Find(key, ro) } +// Returns approximate offset of the given key. func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { - c, err := t.lookup(f) + c, err := t.open(f) if err != nil { return } @@ -347,8 +376,9 @@ func (t *tOps) offsetOf(f *tFile, key []byte) (offset uint64, err error) { return } +// Creates an iterator from the given table. func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator { - c, err := t.lookup(f) + c, err := t.open(f) if err != nil { return iterator.NewEmptyIterator(err) } @@ -357,6 +387,8 @@ func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) ite return iter } +// Removes table from persistent storage. It waits until +// no one use the the table. func (t *tOps) remove(f *tFile) { num := f.file.Num() t.cacheNS.Delete(num, func(exist bool) { @@ -371,10 +403,21 @@ func (t *tOps) remove(f *tFile) { }) } +// Closes the table ops instance. It will close all tables, +// regadless still used or not. func (t *tOps) close() { t.cache.Zap(true) } +// Creates new initialized table ops instance. +func newTableOps(s *session, cacheCap int) *tOps { + c := cache.NewLRUCache(cacheCap) + ns := c.GetNamespace(0) + return &tOps{s, c, ns} +} + +// tWriter wraps the table writer. It keep track of file descriptor +// and added key range. type tWriter struct { t *tOps @@ -385,7 +428,8 @@ type tWriter struct { first, last []byte } -func (w *tWriter) add(key, value []byte) error { +// Append key/value pair to the table. +func (w *tWriter) append(key, value []byte) error { if w.first == nil { w.first = append([]byte{}, key...) } @@ -393,10 +437,12 @@ func (w *tWriter) add(key, value []byte) error { return w.tw.Append(key, value) } +// Returns true if the table is empty. func (w *tWriter) empty() bool { return w.first == nil } +// Finalizes the table and returns table file. 
func (w *tWriter) finish() (f *tFile, err error) { err = w.tw.Close() if err != nil { @@ -408,10 +454,11 @@ func (w *tWriter) finish() (f *tFile, err error) { return } w.w.Close() - f = newTFile(w.file, uint64(w.tw.BytesLen()), iKey(w.first), iKey(w.last)) + f = newTableFile(w.file, uint64(w.tw.BytesLen()), iKey(w.first), iKey(w.last)) return } +// Drops the table. func (w *tWriter) drop() { w.w.Close() w.file.Remove() diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index 8acb9f720..51df477da 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -761,6 +761,8 @@ func (r *Reader) OffsetOf(key []byte) (offset int64, err error) { // NewReader creates a new initialized table reader for the file. // The cache is optional and can be nil. +// +// The returned table reader instance is goroutine-safe. func NewReader(f io.ReaderAt, size int64, cache cache.Namespace, o *opt.Options) *Reader { r := &Reader{ reader: f, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/testutil_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/testutil_test.go index c1402fda3..111f8730c 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/testutil_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/testutil_test.go @@ -48,6 +48,7 @@ func (t *testingDB) TestClose() { func newTestingDB(o *opt.Options, ro *opt.ReadOptions, wo *opt.WriteOptions) *testingDB { stor := testutil.NewStorage() db, err := Open(stor, o) + // FIXME: This may be called from outside It, which may cause panic. Expect(err).NotTo(HaveOccurred()) return &testingDB{ DB: db, diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go index 4c54d6480..81fd9ee0d 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go @@ -40,8 +40,8 @@ type version struct { tables [kNumLevels]tFiles // Level that should be compacted next and its compaction score. - // Score < 1 means compaction is not strictly needed. These fields - // are initialized by ComputeCompaction() + // Score < 1 means compaction is not strictly needed. These fields + // are initialized by computeCompaction() cLevel int cScore float64 @@ -60,8 +60,6 @@ func (v *version) release_NB() { panic("negative version ref") } - s := v.s - tables := make(map[uint64]bool) for _, tt := range v.next.tables { for _, t := range tt { @@ -74,7 +72,7 @@ func (v *version) release_NB() { for _, t := range tt { num := t.file.Num() if _, ok := tables[num]; !ok { - s.tops.remove(t) + v.s.tops.remove(t) } } } @@ -89,130 +87,142 @@ func (v *version) release() { v.s.vmu.Unlock() } -func (v *version) get(key iKey, ro *opt.ReadOptions) (value []byte, cstate bool, err error) { - s := v.s +func (v *version) walkOverlapping(ikey iKey, f func(level int, t *tFile) bool, lf func(level int) bool) { + ukey := ikey.ukey() - ukey := key.ukey() - - var tset *tSet - tseek := true - - // We can search level-by-level since entries never hop across - // levels. Therefore we are guaranteed that if we find data - // in an smaller level, later levels are irrelevant. - for level, ts := range v.tables { - if len(ts) == 0 { + // Walk tables level-by-level. 
+ for level, tables := range v.tables { + if len(tables) == 0 { continue } if level == 0 { // Level-0 files may overlap each other. Find all files that - // overlap user_key and process them in order from newest to - var tmp tFiles - for _, t := range ts { - if s.icmp.uCompare(ukey, t.min.ukey()) >= 0 && - s.icmp.uCompare(ukey, t.max.ukey()) <= 0 { - tmp = append(tmp, t) - } - } - - if len(tmp) == 0 { - continue - } - - tmp.sortByNum() - ts = tmp - } else { - i := ts.searchMax(key, s.icmp) - if i >= len(ts) || s.icmp.uCompare(ukey, ts[i].min.ukey()) < 0 { - continue - } - - ts = ts[i : i+1] - } - - var l0found bool - var l0seq uint64 - var l0type vType - var l0value []byte - for _, t := range ts { - if tseek { - if tset == nil { - tset = &tSet{level, t} - } else if tset.table.incrSeek() <= 0 { - cstate = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) - tseek = false - } - } - - var _rkey, rval []byte - _rkey, rval, err = s.tops.get(t, key, ro) - if err == ErrNotFound { - continue - } else if err != nil { - return - } - - rkey := iKey(_rkey) - if seq, t, ok := rkey.parseNum(); ok { - if s.icmp.uCompare(ukey, rkey.ukey()) == 0 { - if level == 0 { - if seq >= l0seq { - l0found = true - l0seq = seq - l0type = t - l0value = rval - } - } else { - switch t { - case tVal: - value = rval - case tDel: - err = ErrNotFound - default: - panic("invalid type") - } + // overlap ukey. + for _, t := range tables { + if t.overlaps(v.s.icmp, ukey, ukey) { + if !f(level, t) { + return + } + } + } + } else { + if i := tables.searchMax(v.s.icmp, ikey); i < len(tables) { + t := tables[i] + if v.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { + if !f(level, t) { return } } - } else { - err = errors.New("leveldb: internal key corrupted") - return } } - if level == 0 && l0found { - switch l0type { - case tVal: - value = l0value - case tDel: - err = ErrNotFound - default: - panic("invalid type") - } + + if lf != nil && !lf(level) { return } } +} + +func (v *version) get(ikey iKey, ro *opt.ReadOptions) (value []byte, tcomp bool, err error) { + ukey := ikey.ukey() + + var ( + tset *tSet + tseek bool + + l0found bool + l0seq uint64 + l0vt vType + l0val []byte + ) err = ErrNotFound + + // Since entries never hope across level, finding key/value + // in smaller level make later levels irrelevant. 
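walkOverlapping above checks level-0 tables one by one, since they may overlap, and uses a single binary search per sorted higher level; a hit in a lower-numbered level makes later levels irrelevant. A small sketch of the sorted-level lookup (string keys, hypothetical types):

package main

import (
	"fmt"
	"sort"
)

// rng is a hypothetical table key range on a single level.
type rng struct{ min, max string }

// findInLevel returns the index of the table that may contain key on a
// sorted, non-overlapping level: the first table whose max is >= key.
func findInLevel(level []rng, key string) (int, bool) {
	i := sort.Search(len(level), func(i int) bool { return level[i].max >= key })
	if i < len(level) && level[i].min <= key {
		return i, true
	}
	return 0, false
}

func main() {
	// Level-0 would be scanned table by table; higher levels are sorted,
	// so one binary search per level is enough.
	level1 := []rng{{"a", "c"}, {"f", "k"}, {"p", "z"}}
	if i, ok := findInLevel(level1, "g"); ok {
		fmt.Println("candidate table index:", i) // 1
	}
	if _, ok := findInLevel(level1, "d"); !ok {
		fmt.Println("no table covers key d")
	}
}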
+ v.walkOverlapping(ikey, func(level int, t *tFile) bool { + if !tseek { + if tset == nil { + tset = &tSet{level, t} + } else if tset.table.consumeSeek() <= 0 { + tseek = true + tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) + } + } + + ikey__, val_, err_ := v.s.tops.find(t, ikey, ro) + switch err_ { + case nil: + case ErrNotFound: + return true + default: + err = err_ + return false + } + + ikey_ := iKey(ikey__) + if seq, vt, ok := ikey_.parseNum(); ok { + if v.s.icmp.uCompare(ukey, ikey_.ukey()) != 0 { + return true + } + + if level == 0 { + if seq >= l0seq { + l0found = true + l0seq = seq + l0vt = vt + l0val = val_ + } + } else { + switch vt { + case tVal: + value = val_ + err = nil + case tDel: + default: + panic("leveldb: invalid internal key type") + } + return false + } + } else { + err = errors.New("leveldb: internal key corrupted") + return false + } + + return true + }, func(level int) bool { + if l0found { + switch l0vt { + case tVal: + value = l0val + err = nil + case tDel: + default: + panic("leveldb: invalid internal key type") + } + return false + } + + return true + }) + return } func (v *version) getIterators(slice *util.Range, ro *opt.ReadOptions) (its []iterator.Iterator) { - s := v.s - // Merge all level zero files together since they may overlap for _, t := range v.tables[0] { - it := s.tops.newIterator(t, slice, ro) + it := v.s.tops.newIterator(t, slice, ro) its = append(its, it) } - strict := s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) - for _, tt := range v.tables[1:] { - if len(tt) == 0 { + strict := v.s.o.GetStrict(opt.StrictIterator) || ro.GetStrict(opt.StrictIterator) + for _, tables := range v.tables[1:] { + if len(tables) == 0 { continue } - it := iterator.NewIndexedIterator(tt.newIndexIterator(s.tops, s.icmp, slice, ro), strict, true) + it := iterator.NewIndexedIterator(tables.newIndexIterator(v.s.tops, v.s.icmp, slice, ro), strict, true) its = append(its, it) } @@ -242,25 +252,25 @@ func (v *version) tLen(level int) int { return len(v.tables[level]) } -func (v *version) offsetOf(key iKey) (n uint64, err error) { - for level, tt := range v.tables { - for _, t := range tt { - if v.s.icmp.Compare(t.max, key) <= 0 { - // Entire file is before "key", so just add the file size +func (v *version) offsetOf(ikey iKey) (n uint64, err error) { + for level, tables := range v.tables { + for _, t := range tables { + if v.s.icmp.Compare(t.imax, ikey) <= 0 { + // Entire file is before "ikey", so just add the file size n += t.size - } else if v.s.icmp.Compare(t.min, key) > 0 { - // Entire file is after "key", so ignore + } else if v.s.icmp.Compare(t.imin, ikey) > 0 { + // Entire file is after "ikey", so ignore if level > 0 { // Files other than level 0 are sorted by meta->min, so // no further files in this level will contain data for - // "key". + // "ikey". break } } else { - // "key" falls in the range for this table. Add the - // approximate offset of "key" within the table. + // "ikey" falls in the range for this table. Add the + // approximate offset of "ikey" within the table. 
var nn uint64 - nn, err = v.s.tops.offsetOf(t, key) + nn, err = v.s.tops.offsetOf(t, ikey) if err != nil { return 0, err } @@ -272,15 +282,15 @@ func (v *version) offsetOf(key iKey) (n uint64, err error) { return } -func (v *version) pickLevel(min, max []byte) (level int) { - if !v.tables[0].isOverlaps(min, max, false, v.s.icmp) { - var r tFiles +func (v *version) pickLevel(umin, umax []byte) (level int) { + if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) { + var overlaps tFiles for ; level < kMaxMemCompactLevel; level++ { - if v.tables[level+1].isOverlaps(min, max, true, v.s.icmp) { + if v.tables[level+1].overlaps(v.s.icmp, umin, umax, false) { break } - v.tables[level+2].getOverlaps(min, max, &r, true, v.s.icmp.ucmp) - if r.size() > kMaxGrandParentOverlapBytes { + overlaps = v.tables[level+2].getOverlaps(overlaps, v.s.icmp, umin, umax, false) + if overlaps.size() > kMaxGrandParentOverlapBytes { break } } @@ -294,7 +304,7 @@ func (v *version) computeCompaction() { var bestLevel int = -1 var bestScore float64 = -1 - for level, ff := range v.tables { + for level, tables := range v.tables { var score float64 if level == 0 { // We treat level-0 specially by bounding the number of files @@ -308,9 +318,9 @@ func (v *version) computeCompaction() { // file size is small (perhaps because of a small write-buffer // setting, or very high compression ratios, or lots of // overwrites/deletions). - score = float64(len(ff)) / kL0_CompactionTrigger + score = float64(len(tables)) / kL0_CompactionTrigger } else { - score = float64(ff.size()) / levelMaxSize[level] + score = float64(tables.size()) / levelMaxSize[level] } if score > bestScore { @@ -336,57 +346,51 @@ type versionStaging struct { } func (p *versionStaging) commit(r *sessionRecord) { - btt := p.base.tables + // Deleted tables. + for _, r := range r.deletedTables { + tm := &(p.tables[r.level]) - // deleted tables - for _, tr := range r.deletedTables { - tm := &(p.tables[tr.level]) - - bt := btt[tr.level] - if len(bt) > 0 { + if len(p.base.tables[r.level]) > 0 { if tm.deleted == nil { tm.deleted = make(map[uint64]struct{}) } - tm.deleted[tr.num] = struct{}{} + tm.deleted[r.num] = struct{}{} } if tm.added != nil { - delete(tm.added, tr.num) + delete(tm.added, r.num) } } - // new tables - for _, tr := range r.addedTables { - tm := &(p.tables[tr.level]) + // New tables. + for _, r := range r.addedTables { + tm := &(p.tables[r.level]) if tm.added == nil { tm.added = make(map[uint64]ntRecord) } - tm.added[tr.num] = tr + tm.added[r.num] = r if tm.deleted != nil { - delete(tm.deleted, tr.num) + delete(tm.deleted, r.num) } } } func (p *versionStaging) finish() *version { - s := p.base.s - btt := p.base.tables - - // build new version - nv := &version{s: s} + // Build new version. + nv := &version{s: p.base.s} for level, tm := range p.tables { - bt := btt[level] + btables := p.base.tables[level] - n := len(bt) + len(tm.added) - len(tm.deleted) + n := len(btables) + len(tm.added) - len(tm.deleted) if n < 0 { n = 0 } nt := make(tFiles, 0, n) - // base tables - for _, t := range bt { + // Base tables. + for _, t := range btables { if _, ok := tm.deleted[t.file.Num()]; ok { continue } @@ -396,17 +400,21 @@ func (p *versionStaging) finish() *version { nt = append(nt, t) } - // new tables - for _, tr := range tm.added { - nt = append(nt, tr.makeFile(s)) + // New tables. + for _, r := range tm.added { + nt = append(nt, r.makeFile(p.base.s)) } - // sort tables - nt.sortByKey(s.icmp) + // Sort tables. 
+ if level == 0 { + nt.sortByNum() + } else { + nt.sortByKey(p.base.s.icmp) + } nv.tables[level] = nt } - // compute compaction score for new version + // Compute compaction score for new version. nv.computeCompaction() return nv
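versionStaging.finish now keeps level-0 sorted by file number in descending order, newest first, while higher levels remain sorted by key for binary search. A compact sketch contrasting the two orderings (hypothetical table struct):

package main

import (
	"fmt"
	"sort"
)

// tbl is a hypothetical table descriptor: a file number and its smallest key.
type tbl struct {
	num  uint64
	imin string
}

func main() {
	tables := []tbl{{num: 7, imin: "m"}, {num: 9, imin: "a"}, {num: 8, imin: "t"}}

	// Level-0: newest file first, because level-0 tables may overlap and
	// the most recent value for a key has to be found first.
	byNum := append([]tbl(nil), tables...)
	sort.Slice(byNum, func(i, j int) bool { return byNum[i].num > byNum[j].num })
	fmt.Println(byNum) // [{9 a} {8 t} {7 m}]

	// Levels >= 1: non-overlapping, so ordering by smallest key enables
	// binary search during reads.
	byKey := append([]tbl(nil), tables...)
	sort.Slice(byKey, func(i, j int) bool { return byKey[i].imin < byKey[j].imin })
	fmt.Println(byKey) // [{9 a} {7 m} {8 t}]
}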