From 2d217e72bd0d719a858ac373fa161c66fd1aec66 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 15 Jun 2015 21:10:18 +0200 Subject: [PATCH] Dependency update --- Godeps/Godeps.json | 24 +- .../src/github.com/bkaradzic/go-lz4/fuzz.go | 23 + .../src/github.com/bkaradzic/go-lz4/reader.go | 7 +- .../src/github.com/bkaradzic/go-lz4/writer.go | 6 +- .../go-snappy}/snappy/decode.go | 0 .../go-snappy}/snappy/encode.go | 0 .../go-snappy}/snappy/snappy.go | 0 .../go-snappy}/snappy/snappy_test.go | 0 .../src/github.com/juju/ratelimit/LICENSE | 7 +- .../github.com/juju/ratelimit/ratelimit.go | 10 +- .../src/github.com/kardianos/osext/README.md | 4 +- .../src/github.com/kardianos/osext/osext.go | 6 +- .../kardianos/osext/osext_procfs.go | 6 +- .../github.com/kardianos/osext/osext_test.go | 23 + .../github.com/syndtr/goleveldb/leveldb/db.go | 403 ++++++++++++------ .../syndtr/goleveldb/leveldb/db_compaction.go | 114 ++--- .../syndtr/goleveldb/leveldb/db_iter.go | 32 +- .../syndtr/goleveldb/leveldb/db_state.go | 23 +- .../syndtr/goleveldb/leveldb/db_test.go | 148 ++++++- .../syndtr/goleveldb/leveldb/db_write.go | 75 ++-- .../syndtr/goleveldb/leveldb/errors.go | 1 + .../syndtr/goleveldb/leveldb/memdb/memdb.go | 9 +- .../syndtr/goleveldb/leveldb/opt/options.go | 50 ++- .../syndtr/goleveldb/leveldb/session.go | 264 +----------- .../goleveldb/leveldb/session_compaction.go | 287 +++++++++++++ .../goleveldb/leveldb/session_record.go | 14 +- .../goleveldb/leveldb/session_record_test.go | 6 +- .../syndtr/goleveldb/leveldb/session_util.go | 2 +- .../syndtr/goleveldb/leveldb/storage_test.go | 10 + .../syndtr/goleveldb/leveldb/table.go | 8 +- .../syndtr/goleveldb/leveldb/table/reader.go | 2 +- .../syndtr/goleveldb/leveldb/table/writer.go | 2 +- .../syndtr/goleveldb/leveldb/version.go | 27 +- 33 files changed, 1014 insertions(+), 579 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/bkaradzic/go-lz4/fuzz.go rename Godeps/_workspace/src/github.com/{syndtr/gosnappy => google/go-snappy}/snappy/decode.go (100%) rename Godeps/_workspace/src/github.com/{syndtr/gosnappy => google/go-snappy}/snappy/encode.go (100%) rename Godeps/_workspace/src/github.com/{syndtr/gosnappy => google/go-snappy}/snappy/snappy.go (100%) rename Godeps/_workspace/src/github.com/{syndtr/gosnappy => google/go-snappy}/snappy/snappy_test.go (100%) create mode 100644 Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_compaction.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index c6923b912..ae8bb07c6 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -7,7 +7,7 @@ "Deps": [ { "ImportPath": "github.com/bkaradzic/go-lz4", - "Rev": "93a831dcee242be64a9cc9803dda84af25932de7" + "Rev": "4f7c2045dbd17b802370e2e6022200468abf02ba" }, { "ImportPath": "github.com/calmh/logger", @@ -21,13 +21,17 @@ "ImportPath": "github.com/calmh/xdr", "Rev": "5f7208e86762911861c94f1849eddbfc0a60cbf0" }, + { + "ImportPath": "github.com/google/go-snappy/snappy", + "Rev": "eaa750b9bf4dcb7cb20454be850613b66cda3273" + }, { "ImportPath": "github.com/juju/ratelimit", - "Rev": "c5abe513796336ee2869745bff0638508450e9c5" + "Rev": "faa59ce93750e747b2997635e8b7daf30024b1ac" }, { "ImportPath": "github.com/kardianos/osext", - "Rev": "efacde03154693404c65e7aa7d461ac9014acd0c" + "Rev": "6e7f843663477789fac7c02def0d0909e969b4e5" }, { "ImportPath": "github.com/syncthing/protocol", @@ -35,11 +39,7 @@ }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", - "Rev": "87e4e645d80ae9c537e8f2dee52b28036a5dd75e" - }, - { - "ImportPath": "github.com/syndtr/gosnappy/snappy", - "Rev": "156a073208e131d7d2e212cb749feae7c339e846" + "Rev": "a06509502ca32565bdf74afc1e573050023f261c" }, { "ImportPath": "github.com/thejerf/suture", @@ -59,19 +59,19 @@ }, { "ImportPath": "golang.org/x/crypto/bcrypt", - "Rev": "c57d4a71915a248dbad846d60825145062b4c18e" + "Rev": "1e856cbfdf9bc25eefca75f83f25d55e35ae72e0" }, { "ImportPath": "golang.org/x/crypto/blowfish", - "Rev": "c57d4a71915a248dbad846d60825145062b4c18e" + "Rev": "1e856cbfdf9bc25eefca75f83f25d55e35ae72e0" }, { "ImportPath": "golang.org/x/text/transform", - "Rev": "2076e9cab4147459c82bc81169e46c139d358547" + "Rev": "df923bbb63f8ea3a26bb743e2a497abd0ab585f7" }, { "ImportPath": "golang.org/x/text/unicode/norm", - "Rev": "2076e9cab4147459c82bc81169e46c139d358547" + "Rev": "df923bbb63f8ea3a26bb743e2a497abd0ab585f7" } ] } diff --git a/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/fuzz.go b/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/fuzz.go new file mode 100644 index 000000000..e4989de9f --- /dev/null +++ b/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/fuzz.go @@ -0,0 +1,23 @@ +// +build gofuzz + +package lz4 + +import "encoding/binary" + +func Fuzz(data []byte) int { + + if len(data) < 4 { + return 0 + } + + ln := binary.LittleEndian.Uint32(data) + if ln > (1 << 21) { + return 0 + } + + if _, err := Decode(nil, data); err != nil { + return 0 + } + + return 1 +} diff --git a/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/reader.go b/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/reader.go index 18628eee7..a2b6535f6 100644 --- a/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/reader.go +++ b/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/reader.go @@ -141,7 +141,7 @@ func Decode(dst, src []byte) ([]byte, error) { length += ln } - if int(d.spos+length) > len(d.src) { + if int(d.spos+length) > len(d.src) || int(d.dpos+length) > len(d.dst) { return nil, ErrCorrupt } @@ -179,7 +179,12 @@ func Decode(dst, src []byte) ([]byte, error) { } literal := d.dpos - d.ref + if literal < 4 { + if int(d.dpos+4) > len(d.dst) { + return nil, ErrCorrupt + } + d.cp(4, decr[literal]) } else { length += 4 diff --git a/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/writer.go b/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/writer.go index 368d2480d..2bf84da50 100644 --- a/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/writer.go +++ b/Godeps/_workspace/src/github.com/bkaradzic/go-lz4/writer.go @@ -25,8 +25,10 @@ package lz4 -import "encoding/binary" -import "errors" +import ( + "encoding/binary" + "errors" +) const ( minMatch = 4 diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/decode.go b/Godeps/_workspace/src/github.com/google/go-snappy/snappy/decode.go similarity index 100% rename from Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/decode.go rename to Godeps/_workspace/src/github.com/google/go-snappy/snappy/decode.go diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/encode.go b/Godeps/_workspace/src/github.com/google/go-snappy/snappy/encode.go similarity index 100% rename from Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/encode.go rename to Godeps/_workspace/src/github.com/google/go-snappy/snappy/encode.go diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy.go b/Godeps/_workspace/src/github.com/google/go-snappy/snappy/snappy.go similarity index 100% rename from Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy.go rename to Godeps/_workspace/src/github.com/google/go-snappy/snappy/snappy.go diff --git a/Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy_test.go b/Godeps/_workspace/src/github.com/google/go-snappy/snappy/snappy_test.go similarity index 100% rename from Godeps/_workspace/src/github.com/syndtr/gosnappy/snappy/snappy_test.go rename to Godeps/_workspace/src/github.com/google/go-snappy/snappy/snappy_test.go diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE b/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE index bce6bdd4f..ade9307b3 100644 --- a/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/LICENSE @@ -1,5 +1,8 @@ -This package contains an efficient token-bucket-based rate limiter. -Copyright (C) 2015 Canonical Ltd. +All files in this repository are licensed as follows. If you contribute +to this repository, it is assumed that you license your contribution +under the same license unless you state otherwise. + +All files Copyright (C) 2015 Canonical Ltd. unless otherwise specified in the file. This software is licensed under the LGPLv3, included below. diff --git a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go index 7ebe9f744..5c55aa4ab 100644 --- a/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go +++ b/Godeps/_workspace/src/github.com/juju/ratelimit/ratelimit.go @@ -7,6 +7,7 @@ package ratelimit import ( + "math" "strconv" "sync" "time" @@ -55,7 +56,7 @@ func NewBucketWithRate(rate float64, capacity int64) *Bucket { continue } tb := NewBucketWithQuantum(fillInterval, capacity, quantum) - if diff := abs(tb.Rate() - rate); diff/rate <= rateMargin { + if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin { return tb } } @@ -217,10 +218,3 @@ func (tb *Bucket) adjust(now time.Time) (currentTick int64) { tb.availTick = currentTick return } - -func abs(f float64) float64 { - if f < 0 { - return -f - } - return f -} diff --git a/Godeps/_workspace/src/github.com/kardianos/osext/README.md b/Godeps/_workspace/src/github.com/kardianos/osext/README.md index 820e1ecb5..61350baba 100644 --- a/Godeps/_workspace/src/github.com/kardianos/osext/README.md +++ b/Godeps/_workspace/src/github.com/kardianos/osext/README.md @@ -4,7 +4,9 @@ There is sometimes utility in finding the current executable file that is running. This can be used for upgrading the current executable -or finding resources located relative to the executable file. +or finding resources located relative to the executable file. Both +working directory and the os.Args[0] value are arbitrary and cannot +be relied on; os.Args[0] can be "faked". Multi-platform and supports: * Linux diff --git a/Godeps/_workspace/src/github.com/kardianos/osext/osext.go b/Godeps/_workspace/src/github.com/kardianos/osext/osext.go index 4ed4b9aa3..7bef46f03 100644 --- a/Godeps/_workspace/src/github.com/kardianos/osext/osext.go +++ b/Godeps/_workspace/src/github.com/kardianos/osext/osext.go @@ -16,12 +16,12 @@ func Executable() (string, error) { } // Returns same path as Executable, returns just the folder -// path. Excludes the executable name. +// path. Excludes the executable name and any trailing slash. func ExecutableFolder() (string, error) { p, err := Executable() if err != nil { return "", err } - folder, _ := filepath.Split(p) - return folder, nil + + return filepath.Dir(p), nil } diff --git a/Godeps/_workspace/src/github.com/kardianos/osext/osext_procfs.go b/Godeps/_workspace/src/github.com/kardianos/osext/osext_procfs.go index 07a2a09e7..b2598bc77 100644 --- a/Godeps/_workspace/src/github.com/kardianos/osext/osext_procfs.go +++ b/Godeps/_workspace/src/github.com/kardianos/osext/osext_procfs.go @@ -17,12 +17,14 @@ import ( func executable() (string, error) { switch runtime.GOOS { case "linux": - const deletedSuffix = " (deleted)" + const deletedTag = " (deleted)" execpath, err := os.Readlink("/proc/self/exe") if err != nil { return execpath, err } - return strings.TrimSuffix(execpath, deletedSuffix), nil + execpath = strings.TrimSuffix(execpath, deletedTag) + execpath = strings.TrimPrefix(execpath, deletedTag) + return execpath, nil case "netbsd": return os.Readlink("/proc/curproc/exe") case "openbsd", "dragonfly": diff --git a/Godeps/_workspace/src/github.com/kardianos/osext/osext_test.go b/Godeps/_workspace/src/github.com/kardianos/osext/osext_test.go index 5aafa3af2..77ccc28e9 100644 --- a/Godeps/_workspace/src/github.com/kardianos/osext/osext_test.go +++ b/Godeps/_workspace/src/github.com/kardianos/osext/osext_test.go @@ -24,6 +24,29 @@ const ( executableEnvValueDelete = "delete" ) +func TestPrintExecutable(t *testing.T) { + ef, err := Executable() + if err != nil { + t.Fatalf("Executable failed: %v", err) + } + t.Log("Executable:", ef) +} +func TestPrintExecutableFolder(t *testing.T) { + ef, err := ExecutableFolder() + if err != nil { + t.Fatalf("ExecutableFolder failed: %v", err) + } + t.Log("Executable Folder:", ef) +} +func TestExecutableFolder(t *testing.T) { + ef, err := ExecutableFolder() + if err != nil { + t.Fatalf("ExecutableFolder failed: %v", err) + } + if ef[len(ef)-1] == filepath.Separator { + t.Fatal("ExecutableFolder ends with a trailing slash.") + } +} func TestExecutableMatch(t *testing.T) { ep, err := Executable() if err != nil { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go index 323353b2a..def86bc1a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db.go @@ -63,13 +63,14 @@ type DB struct { journalAckC chan error // Compaction. - tcompCmdC chan cCmd - tcompPauseC chan chan<- struct{} - mcompCmdC chan cCmd - compErrC chan error - compPerErrC chan error - compErrSetC chan error - compStats []cStats + tcompCmdC chan cCmd + tcompPauseC chan chan<- struct{} + mcompCmdC chan cCmd + compErrC chan error + compPerErrC chan error + compErrSetC chan error + compWriteLocking bool + compStats []cStats // Close. closeW sync.WaitGroup @@ -108,28 +109,44 @@ func openDB(s *session) (*DB, error) { closeC: make(chan struct{}), } - if err := db.recoverJournal(); err != nil { - return nil, err - } + // Read-only mode. + readOnly := s.o.GetReadOnly() - // Remove any obsolete files. - if err := db.checkAndCleanFiles(); err != nil { - // Close journal. - if db.journal != nil { - db.journal.Close() - db.journalWriter.Close() + if readOnly { + // Recover journals (read-only mode). + if err := db.recoverJournalRO(); err != nil { + return nil, err } - return nil, err + } else { + // Recover journals. + if err := db.recoverJournal(); err != nil { + return nil, err + } + + // Remove any obsolete files. + if err := db.checkAndCleanFiles(); err != nil { + // Close journal. + if db.journal != nil { + db.journal.Close() + db.journalWriter.Close() + } + return nil, err + } + } // Doesn't need to be included in the wait group. go db.compactionError() go db.mpoolDrain() - db.closeW.Add(3) - go db.tCompaction() - go db.mCompaction() - go db.jWriter() + if readOnly { + db.SetReadOnly() + } else { + db.closeW.Add(3) + go db.tCompaction() + go db.mCompaction() + go db.jWriter() + } s.logf("db@open done T·%v", time.Since(start)) @@ -275,7 +292,7 @@ func recoverTable(s *session, o *opt.Options) error { // We will drop corrupted table. strict = o.GetStrict(opt.StrictRecovery) - rec = &sessionRecord{numLevel: o.GetNumLevel()} + rec = &sessionRecord{} bpool = util.NewBufferPool(o.GetBlockSize() + 5) ) buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) { @@ -450,132 +467,136 @@ func recoverTable(s *session, o *opt.Options) error { } func (db *DB) recoverJournal() error { - // Get all tables and sort it by file number. - journalFiles_, err := db.s.getFiles(storage.TypeJournal) + // Get all journals and sort it by file number. + allJournalFiles, err := db.s.getFiles(storage.TypeJournal) if err != nil { return err } - journalFiles := files(journalFiles_) - journalFiles.sort() + files(allJournalFiles).sort() - // Discard older journal. - prev := -1 - for i, file := range journalFiles { - if file.Num() >= db.s.stJournalNum { - if prev >= 0 { - i-- - journalFiles[i] = journalFiles[prev] - } - journalFiles = journalFiles[i:] - break - } else if file.Num() == db.s.stPrevJournalNum { - prev = i + // Journals that will be recovered. + var recJournalFiles []storage.File + for _, jf := range allJournalFiles { + if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum { + recJournalFiles = append(recJournalFiles, jf) } } - var jr *journal.Reader - var of storage.File - var mem *memdb.DB - batch := new(Batch) - cm := newCMem(db.s) - buf := new(util.Buffer) - // Options. - strict := db.s.o.GetStrict(opt.StrictJournal) - checksum := db.s.o.GetStrict(opt.StrictJournalChecksum) - writeBuffer := db.s.o.GetWriteBuffer() - recoverJournal := func(file storage.File) error { - db.logf("journal@recovery recovering @%d", file.Num()) - reader, err := file.Open() - if err != nil { - return err - } - defer reader.Close() + var ( + of storage.File // Obsolete file. + rec = &sessionRecord{} + ) - // Create/reset journal reader instance. - if jr == nil { - jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum) - } else { - jr.Reset(reader, dropper{db.s, file}, strict, checksum) - } - - // Flush memdb and remove obsolete journal file. - if of != nil { - if mem.Len() > 0 { - if err := cm.flush(mem, 0); err != nil { - return err - } - } - if err := cm.commit(file.Num(), db.seq); err != nil { - return err - } - cm.reset() - of.Remove() - of = nil - } - - // Replay journal to memdb. - mem.Reset() - for { - r, err := jr.Next() - if err != nil { - if err == io.EOF { - break - } - return errors.SetFile(err, file) - } - - buf.Reset() - if _, err := buf.ReadFrom(r); err != nil { - if err == io.ErrUnexpectedEOF { - // This is error returned due to corruption, with strict == false. - continue - } else { - return errors.SetFile(err, file) - } - } - if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mem); err != nil { - if strict || !errors.IsCorrupted(err) { - return errors.SetFile(err, file) - } else { - db.s.logf("journal error: %v (skipped)", err) - // We won't apply sequence number as it might be corrupted. - continue - } - } - - // Save sequence number. - db.seq = batch.seq + uint64(batch.Len()) - - // Flush it if large enough. - if mem.Size() >= writeBuffer { - if err := cm.flush(mem, 0); err != nil { - return err - } - mem.Reset() - } - } - - of = file - return nil - } - - // Recover all journals. - if len(journalFiles) > 0 { - db.logf("journal@recovery F·%d", len(journalFiles)) + // Recover journals. + if len(recJournalFiles) > 0 { + db.logf("journal@recovery F·%d", len(recJournalFiles)) // Mark file number as used. - db.s.markFileNum(journalFiles[len(journalFiles)-1].Num()) + db.s.markFileNum(recJournalFiles[len(recJournalFiles)-1].Num()) - mem = memdb.New(db.s.icmp, writeBuffer) - for _, file := range journalFiles { - if err := recoverJournal(file); err != nil { + var ( + // Options. + strict = db.s.o.GetStrict(opt.StrictJournal) + checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) + writeBuffer = db.s.o.GetWriteBuffer() + + jr *journal.Reader + mdb = memdb.New(db.s.icmp, writeBuffer) + buf = &util.Buffer{} + batch = &Batch{} + ) + + for _, jf := range recJournalFiles { + db.logf("journal@recovery recovering @%d", jf.Num()) + + fr, err := jf.Open() + if err != nil { return err } + + // Create or reset journal reader instance. + if jr == nil { + jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum) + } else { + jr.Reset(fr, dropper{db.s, jf}, strict, checksum) + } + + // Flush memdb and remove obsolete journal file. + if of != nil { + if mdb.Len() > 0 { + if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil { + fr.Close() + return err + } + } + + rec.setJournalNum(jf.Num()) + rec.setSeqNum(db.seq) + if err := db.s.commit(rec); err != nil { + fr.Close() + return err + } + rec.resetAddedTables() + + of.Remove() + of = nil + } + + // Replay journal to memdb. + mdb.Reset() + for { + r, err := jr.Next() + if err != nil { + if err == io.EOF { + break + } + + fr.Close() + return errors.SetFile(err, jf) + } + + buf.Reset() + if _, err := buf.ReadFrom(r); err != nil { + if err == io.ErrUnexpectedEOF { + // This is error returned due to corruption, with strict == false. + continue + } + + fr.Close() + return errors.SetFile(err, jf) + } + if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + if !strict && errors.IsCorrupted(err) { + db.s.logf("journal error: %v (skipped)", err) + // We won't apply sequence number as it might be corrupted. + continue + } + + fr.Close() + return errors.SetFile(err, jf) + } + + // Save sequence number. + db.seq = batch.seq + uint64(batch.Len()) + + // Flush it if large enough. + if mdb.Size() >= writeBuffer { + if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { + fr.Close() + return err + } + + mdb.Reset() + } + } + + fr.Close() + of = jf } - // Flush the last journal. - if mem.Len() > 0 { - if err := cm.flush(mem, 0); err != nil { + // Flush the last memdb. + if mdb.Len() > 0 { + if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil { return err } } @@ -587,8 +608,10 @@ func (db *DB) recoverJournal() error { } // Commit. - if err := cm.commit(db.journalFile.Num(), db.seq); err != nil { - // Close journal. + rec.setJournalNum(db.journalFile.Num()) + rec.setSeqNum(db.seq) + if err := db.s.commit(rec); err != nil { + // Close journal on error. if db.journal != nil { db.journal.Close() db.journalWriter.Close() @@ -604,6 +627,103 @@ func (db *DB) recoverJournal() error { return nil } +func (db *DB) recoverJournalRO() error { + // Get all journals and sort it by file number. + allJournalFiles, err := db.s.getFiles(storage.TypeJournal) + if err != nil { + return err + } + files(allJournalFiles).sort() + + // Journals that will be recovered. + var recJournalFiles []storage.File + for _, jf := range allJournalFiles { + if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum { + recJournalFiles = append(recJournalFiles, jf) + } + } + + var ( + // Options. + strict = db.s.o.GetStrict(opt.StrictJournal) + checksum = db.s.o.GetStrict(opt.StrictJournalChecksum) + writeBuffer = db.s.o.GetWriteBuffer() + + mdb = memdb.New(db.s.icmp, writeBuffer) + ) + + // Recover journals. + if len(recJournalFiles) > 0 { + db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles)) + + var ( + jr *journal.Reader + buf = &util.Buffer{} + batch = &Batch{} + ) + + for _, jf := range recJournalFiles { + db.logf("journal@recovery recovering @%d", jf.Num()) + + fr, err := jf.Open() + if err != nil { + return err + } + + // Create or reset journal reader instance. + if jr == nil { + jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum) + } else { + jr.Reset(fr, dropper{db.s, jf}, strict, checksum) + } + + // Replay journal to memdb. + for { + r, err := jr.Next() + if err != nil { + if err == io.EOF { + break + } + + fr.Close() + return errors.SetFile(err, jf) + } + + buf.Reset() + if _, err := buf.ReadFrom(r); err != nil { + if err == io.ErrUnexpectedEOF { + // This is error returned due to corruption, with strict == false. + continue + } + + fr.Close() + return errors.SetFile(err, jf) + } + if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil { + if !strict && errors.IsCorrupted(err) { + db.s.logf("journal error: %v (skipped)", err) + // We won't apply sequence number as it might be corrupted. + continue + } + + fr.Close() + return errors.SetFile(err, jf) + } + + // Save sequence number. + db.seq = batch.seq + uint64(batch.Len()) + } + + fr.Close() + } + } + + // Set memDB. + db.mem = &memDB{db: db, DB: mdb, ref: 1} + + return nil +} + func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { ikey := newIkey(key, seq, ktSeek) @@ -614,7 +734,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er } defer m.decref() - mk, mv, me := m.mdb.Find(ikey) + mk, mv, me := m.Find(ikey) if me == nil { ukey, _, kt, kerr := parseIkey(mk) if kerr != nil { @@ -652,7 +772,7 @@ func (db *DB) has(key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err er } defer m.decref() - mk, _, me := m.mdb.Find(ikey) + mk, _, me := m.Find(ikey) if me == nil { ukey, _, kt, kerr := parseIkey(mk) if kerr != nil { @@ -784,7 +904,7 @@ func (db *DB) GetProperty(name string) (value string, err error) { const prefix = "leveldb." if !strings.HasPrefix(name, prefix) { - return "", errors.New("leveldb: GetProperty: unknown property: " + name) + return "", ErrNotFound } p := name[len(prefix):] @@ -798,7 +918,7 @@ func (db *DB) GetProperty(name string) (value string, err error) { var rest string n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest) if n != 1 || int(level) >= db.s.o.GetNumLevel() { - err = errors.New("leveldb: GetProperty: invalid property: " + name) + err = ErrNotFound } else { value = fmt.Sprint(v.tLen(int(level))) } @@ -837,7 +957,7 @@ func (db *DB) GetProperty(name string) (value string, err error) { case p == "aliveiters": value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters)) default: - err = errors.New("leveldb: GetProperty: unknown property: " + name) + err = ErrNotFound } return @@ -900,6 +1020,9 @@ func (db *DB) Close() error { var err error select { case err = <-db.compErrC: + if err == ErrReadOnly { + err = nil + } default: } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go index 447407aba..26003106e 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_compaction.go @@ -11,7 +11,6 @@ import ( "time" "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/memdb" "github.com/syndtr/goleveldb/leveldb/opt" ) @@ -62,58 +61,8 @@ func (p *cStatsStaging) stopTimer() { } } -type cMem struct { - s *session - level int - rec *sessionRecord -} - -func newCMem(s *session) *cMem { - return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}} -} - -func (c *cMem) flush(mem *memdb.DB, level int) error { - s := c.s - - // Write memdb to table. - iter := mem.NewIterator(nil) - defer iter.Release() - t, n, err := s.tops.createFrom(iter) - if err != nil { - return err - } - - // Pick level. - if level < 0 { - v := s.version() - level = v.pickLevel(t.imin.ukey(), t.imax.ukey()) - v.release() - } - c.rec.addTableFile(level, t) - - s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax) - - c.level = level - return nil -} - -func (c *cMem) reset() { - c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()} -} - -func (c *cMem) commit(journal, seq uint64) error { - c.rec.setJournalNum(journal) - c.rec.setSeqNum(seq) - - // Commit changes. - return c.s.commit(c.rec) -} - func (db *DB) compactionError() { - var ( - err error - wlocked bool - ) + var err error noerr: // No error. for { @@ -121,7 +70,7 @@ noerr: case err = <-db.compErrSetC: switch { case err == nil: - case errors.IsCorrupted(err): + case err == ErrReadOnly, errors.IsCorrupted(err): goto hasperr default: goto haserr @@ -139,7 +88,7 @@ haserr: switch { case err == nil: goto noerr - case errors.IsCorrupted(err): + case err == ErrReadOnly, errors.IsCorrupted(err): goto hasperr default: } @@ -155,9 +104,9 @@ hasperr: case db.compPerErrC <- err: case db.writeLockC <- struct{}{}: // Hold write lock, so that write won't pass-through. - wlocked = true + db.compWriteLocking = true case _, _ = <-db.closeC: - if wlocked { + if db.compWriteLocking { // We should release the lock or Close will hang. <-db.writeLockC } @@ -287,21 +236,18 @@ func (db *DB) compactionExitTransact() { } func (db *DB) memCompaction() { - mem := db.getFrozenMem() - if mem == nil { + mdb := db.getFrozenMem() + if mdb == nil { return } - defer mem.decref() + defer mdb.decref() - c := newCMem(db.s) - stats := new(cStatsStaging) - - db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size())) + db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size())) // Don't compact empty memdb. - if mem.mdb.Len() == 0 { - db.logf("mem@flush skipping") - // drop frozen mem + if mdb.Len() == 0 { + db.logf("memdb@flush skipping") + // drop frozen memdb db.dropFrozenMem() return } @@ -317,13 +263,20 @@ func (db *DB) memCompaction() { return } - db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) { + var ( + rec = &sessionRecord{} + stats = &cStatsStaging{} + flushLevel int + ) + + db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() - defer stats.stopTimer() - return c.flush(mem.mdb, -1) + flushLevel, err = db.s.flushMemdb(rec, mdb.DB, -1) + stats.stopTimer() + return }, func() error { - for _, r := range c.rec.addedTables { - db.logf("mem@flush revert @%d", r.num) + for _, r := range rec.addedTables { + db.logf("memdb@flush revert @%d", r.num) f := db.s.getTableFile(r.num) if err := f.Remove(); err != nil { return err @@ -332,20 +285,23 @@ func (db *DB) memCompaction() { return nil }) - db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) { + db.compactionTransactFunc("memdb@commit", func(cnt *compactionTransactCounter) (err error) { stats.startTimer() - defer stats.stopTimer() - return c.commit(db.journalFile.Num(), db.frozenSeq) + rec.setJournalNum(db.journalFile.Num()) + rec.setSeqNum(db.frozenSeq) + err = db.s.commit(rec) + stats.stopTimer() + return }, nil) - db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration) + db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration) - for _, r := range c.rec.addedTables { + for _, r := range rec.addedTables { stats.write += r.size } - db.compStats[c.level].add(stats) + db.compStats[flushLevel].add(stats) - // Drop frozen mem. + // Drop frozen memdb. db.dropFrozenMem() // Resume table compaction. @@ -557,7 +513,7 @@ func (b *tableCompactionBuilder) revert() error { func (db *DB) tableCompaction(c *compaction, noTrivial bool) { defer c.release() - rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()} + rec := &sessionRecord{} rec.addCompPtr(c.level, c.imax) if !noTrivial && c.trivial() { diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go index 4607e5daf..656ae9856 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_iter.go @@ -8,6 +8,7 @@ package leveldb import ( "errors" + "math/rand" "runtime" "sync" "sync/atomic" @@ -39,11 +40,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It ti := v.getIterators(slice, ro) n := len(ti) + 2 i := make([]iterator.Iterator, 0, n) - emi := em.mdb.NewIterator(slice) + emi := em.NewIterator(slice) emi.SetReleaser(&memdbReleaser{m: em}) i = append(i, emi) if fm != nil { - fmi := fm.mdb.NewIterator(slice) + fmi := fm.NewIterator(slice) fmi.SetReleaser(&memdbReleaser{m: fm}) i = append(i, fmi) } @@ -80,6 +81,10 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d return iter } +func (db *DB) iterSamplingRate() int { + return rand.Intn(2 * db.s.o.GetIteratorSamplingRate()) +} + type dir int const ( @@ -98,11 +103,21 @@ type dbIter struct { seq uint64 strict bool - dir dir - key []byte - value []byte - err error - releaser util.Releaser + smaplingGap int + dir dir + key []byte + value []byte + err error + releaser util.Releaser +} + +func (i *dbIter) sampleSeek() { + ikey := i.iter.Key() + i.smaplingGap -= len(ikey) + len(i.iter.Value()) + for i.smaplingGap < 0 { + i.smaplingGap += i.db.iterSamplingRate() + i.db.sampleSeek(ikey) + } } func (i *dbIter) setErr(err error) { @@ -175,6 +190,7 @@ func (i *dbIter) Seek(key []byte) bool { func (i *dbIter) next() bool { for { if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil { + i.sampleSeek() if seq <= i.seq { switch kt { case ktDel: @@ -225,6 +241,7 @@ func (i *dbIter) prev() bool { if i.iter.Valid() { for { if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil { + i.sampleSeek() if seq <= i.seq { if !del && i.icmp.uCompare(ukey, i.key) < 0 { return true @@ -266,6 +283,7 @@ func (i *dbIter) Prev() bool { case dirForward: for i.iter.Prev() { if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil { + i.sampleSeek() if i.icmp.uCompare(ukey, i.key) < 0 { goto cont } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go index 24ecab504..24671dd39 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_state.go @@ -15,8 +15,8 @@ import ( ) type memDB struct { - db *DB - mdb *memdb.DB + db *DB + *memdb.DB ref int32 } @@ -27,12 +27,12 @@ func (m *memDB) incref() { func (m *memDB) decref() { if ref := atomic.AddInt32(&m.ref, -1); ref == 0 { // Only put back memdb with std capacity. - if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() { - m.mdb.Reset() - m.db.mpoolPut(m.mdb) + if m.Capacity() == m.db.s.o.GetWriteBuffer() { + m.Reset() + m.db.mpoolPut(m.DB) } m.db = nil - m.mdb = nil + m.DB = nil } else if ref < 0 { panic("negative memdb ref") } @@ -48,6 +48,15 @@ func (db *DB) addSeq(delta uint64) { atomic.AddUint64(&db.seq, delta) } +func (db *DB) sampleSeek(ikey iKey) { + v := db.s.version() + if v.sampleSeek(ikey) { + // Trigger table compaction. + db.compSendTrigger(db.tcompCmdC) + } + v.release() +} + func (db *DB) mpoolPut(mem *memdb.DB) { defer func() { recover() @@ -117,7 +126,7 @@ func (db *DB) newMem(n int) (mem *memDB, err error) { } mem = &memDB{ db: db, - mdb: mdb, + DB: mdb, ref: 2, } db.mem = mem diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go index 0acb567a1..9d91ebf1a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_test.go @@ -405,19 +405,21 @@ func (h *dbHarness) compactRange(min, max string) { t.Log("DB range compaction done") } -func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) { - t := h.t - db := h.db - - s, err := db.SizeOf([]util.Range{ +func (h *dbHarness) sizeOf(start, limit string) uint64 { + sz, err := h.db.SizeOf([]util.Range{ {[]byte(start), []byte(limit)}, }) if err != nil { - t.Error("SizeOf: got error: ", err) + h.t.Error("SizeOf: got error: ", err) } - if s.Sum() < low || s.Sum() > hi { - t.Errorf("sizeof %q to %q not in range, want %d - %d, got %d", - shorten(start), shorten(limit), low, hi, s.Sum()) + return sz.Sum() +} + +func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) { + sz := h.sizeOf(start, limit) + if sz < low || sz > hi { + h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d", + shorten(start), shorten(limit), low, hi, sz) } } @@ -2443,7 +2445,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) { if err != nil { t.Fatal(err) } - rec := &sessionRecord{numLevel: s.o.GetNumLevel()} + rec := &sessionRecord{} rec.addTableFile(i, tf) if err := s.commit(rec); err != nil { t.Fatal(err) @@ -2453,7 +2455,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) { // Build grandparent. v := s.version() c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...)) - rec := &sessionRecord{numLevel: s.o.GetNumLevel()} + rec := &sessionRecord{} b := &tableCompactionBuilder{ s: s, c: c, @@ -2477,7 +2479,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) { // Build level-1. v = s.version() c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...)) - rec = &sessionRecord{numLevel: s.o.GetNumLevel()} + rec = &sessionRecord{} b = &tableCompactionBuilder{ s: s, c: c, @@ -2521,7 +2523,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) { // Compaction with transient error. v = s.version() c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...)) - rec = &sessionRecord{numLevel: s.o.GetNumLevel()} + rec = &sessionRecord{} b = &tableCompactionBuilder{ s: s, c: c, @@ -2577,3 +2579,123 @@ func TestDB_TableCompactionBuilder(t *testing.T) { } v.release() } + +func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) { + const ( + vSize = 200 * opt.KiB + tSize = 100 * opt.MiB + mIter = 100 + n = tSize / vSize + ) + + h := newDbHarnessWopt(t, &opt.Options{ + Compression: opt.NoCompression, + DisableBlockCache: true, + }) + defer h.close() + + key := func(x int) string { + return fmt.Sprintf("v%06d", x) + } + + // Fill. + value := strings.Repeat("x", vSize) + for i := 0; i < n; i++ { + h.put(key(i), value) + } + h.compactMem() + + // Delete all. + for i := 0; i < n; i++ { + h.delete(key(i)) + } + h.compactMem() + + var ( + limit = n / limitDiv + + startKey = key(0) + limitKey = key(limit) + maxKey = key(n) + slice = &util.Range{Limit: []byte(limitKey)} + + initialSize0 = h.sizeOf(startKey, limitKey) + initialSize1 = h.sizeOf(limitKey, maxKey) + ) + + t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1))) + + for r := 0; true; r++ { + if r >= mIter { + t.Fatal("taking too long to compact") + } + + // Iterates. + iter := h.db.NewIterator(slice, h.ro) + for iter.Next() { + } + if err := iter.Error(); err != nil { + t.Fatalf("Iter err: %v", err) + } + iter.Release() + + // Wait compaction. + h.waitCompaction() + + // Check size. + size0 := h.sizeOf(startKey, limitKey) + size1 := h.sizeOf(limitKey, maxKey) + t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1))) + if size0 < initialSize0/10 { + break + } + } + + if initialSize1 > 0 { + h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB) + } +} + +func TestDB_IterTriggeredCompaction(t *testing.T) { + testDB_IterTriggeredCompaction(t, 1) +} + +func TestDB_IterTriggeredCompactionHalf(t *testing.T) { + testDB_IterTriggeredCompaction(t, 2) +} + +func TestDB_ReadOnly(t *testing.T) { + h := newDbHarness(t) + defer h.close() + + h.put("foo", "v1") + h.put("bar", "v2") + h.compactMem() + + h.put("xfoo", "v1") + h.put("xbar", "v2") + + t.Log("Trigger read-only") + if err := h.db.SetReadOnly(); err != nil { + h.close() + t.Fatalf("SetReadOnly error: %v", err) + } + + h.stor.SetEmuErr(storage.TypeAll, tsOpCreate, tsOpReplace, tsOpRemove, tsOpWrite, tsOpWrite, tsOpSync) + + ro := func(key, value, wantValue string) { + if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly { + t.Fatalf("unexpected error: %v", err) + } + h.getVal(key, wantValue) + } + + ro("foo", "vx", "v1") + + h.o.ReadOnly = true + h.reopenDB() + + ro("foo", "vx", "v1") + ro("bar", "vx", "v2") + h.assertNumKeys(4) +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go index e1cf30c53..176ee893f 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/db_write.go @@ -63,24 +63,24 @@ func (db *DB) rotateMem(n int) (mem *memDB, err error) { return } -func (db *DB) flush(n int) (mem *memDB, nn int, err error) { +func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) { delayed := false flush := func() (retry bool) { v := db.s.version() defer v.release() - mem = db.getEffectiveMem() + mdb = db.getEffectiveMem() defer func() { if retry { - mem.decref() - mem = nil + mdb.decref() + mdb = nil } }() - nn = mem.mdb.Free() + mdbFree = mdb.Free() switch { case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: delayed = true time.Sleep(time.Millisecond) - case nn >= n: + case mdbFree >= n: return false case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): delayed = true @@ -90,15 +90,15 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) { } default: // Allow memdb to grow if it has no entry. - if mem.mdb.Len() == 0 { - nn = n + if mdb.Len() == 0 { + mdbFree = n } else { - mem.decref() - mem, err = db.rotateMem(n) + mdb.decref() + mdb, err = db.rotateMem(n) if err == nil { - nn = mem.mdb.Free() + mdbFree = mdb.Free() } else { - nn = 0 + mdbFree = 0 } } return false @@ -157,18 +157,18 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) { } }() - mem, memFree, err := db.flush(b.size()) + mdb, mdbFree, err := db.flush(b.size()) if err != nil { return } - defer mem.decref() + defer mdb.decref() // Calculate maximum size of the batch. m := 1 << 20 if x := b.size(); x <= 128<<10 { m = x + (128 << 10) } - m = minInt(m, memFree) + m = minInt(m, mdbFree) // Merge with other batch. drain: @@ -197,7 +197,7 @@ drain: select { case db.journalC <- b: // Write into memdb - if berr := b.memReplay(mem.mdb); berr != nil { + if berr := b.memReplay(mdb.DB); berr != nil { panic(berr) } case err = <-db.compPerErrC: @@ -211,7 +211,7 @@ drain: case err = <-db.journalAckC: if err != nil { // Revert memdb if error detected - if berr := b.revertMemReplay(mem.mdb); berr != nil { + if berr := b.revertMemReplay(mdb.DB); berr != nil { panic(berr) } return @@ -225,7 +225,7 @@ drain: if err != nil { return } - if berr := b.memReplay(mem.mdb); berr != nil { + if berr := b.memReplay(mdb.DB); berr != nil { panic(berr) } } @@ -233,7 +233,7 @@ drain: // Set last seq number. db.addSeq(uint64(b.Len())) - if b.size() >= memFree { + if b.size() >= mdbFree { db.rotateMem(0) } return @@ -249,8 +249,7 @@ func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error { return db.Write(b, wo) } -// Delete deletes the value for the given key. It returns ErrNotFound if -// the DB does not contain the key. +// Delete deletes the value for the given key. // // It is safe to modify the contents of the arguments after Delete returns. func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { @@ -290,9 +289,9 @@ func (db *DB) CompactRange(r util.Range) error { } // Check for overlaps in memdb. - mem := db.getEffectiveMem() - defer mem.decref() - if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) { + mdb := db.getEffectiveMem() + defer mdb.decref() + if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) { // Memdb compaction. if _, err := db.rotateMem(0); err != nil { <-db.writeLockC @@ -309,3 +308,31 @@ func (db *DB) CompactRange(r util.Range) error { // Table compaction. return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit) } + +// SetReadOnly makes DB read-only. It will stay read-only until reopened. +func (db *DB) SetReadOnly() error { + if err := db.ok(); err != nil { + return err + } + + // Lock writer. + select { + case db.writeLockC <- struct{}{}: + db.compWriteLocking = true + case err := <-db.compPerErrC: + return err + case _, _ = <-db.closeC: + return ErrClosed + } + + // Set compaction read-only. + select { + case db.compErrSetC <- ErrReadOnly: + case perr := <-db.compPerErrC: + return perr + case _, _ = <-db.closeC: + return ErrClosed + } + + return nil +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go index 29d0d2f27..c8bd66a5a 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/errors.go @@ -12,6 +12,7 @@ import ( var ( ErrNotFound = errors.ErrNotFound + ErrReadOnly = errors.New("leveldb: read-only mode") ErrSnapshotReleased = errors.New("leveldb: snapshot released") ErrIterReleased = errors.New("leveldb: iterator released") ErrClosed = errors.New("leveldb: closed") diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go index e5398873b..1395bd928 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go @@ -206,6 +206,7 @@ func (p *DB) randHeight() (h int) { return } +// Must hold RW-lock if prev == true, as it use shared prevNode slice. func (p *DB) findGE(key []byte, prev bool) (int, bool) { node := 0 h := p.maxHeight - 1 @@ -302,7 +303,7 @@ func (p *DB) Put(key []byte, value []byte) error { node := len(p.nodeData) p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h) for i, n := range p.prevNode[:h] { - m := n + 4 + i + m := n + nNext + i p.nodeData = append(p.nodeData, p.nodeData[m]) p.nodeData[m] = node } @@ -434,20 +435,22 @@ func (p *DB) Len() int { // Reset resets the DB to initial empty state. Allows reuse the buffer. func (p *DB) Reset() { + p.mu.Lock() p.rnd = rand.New(rand.NewSource(0xdeadbeef)) p.maxHeight = 1 p.n = 0 p.kvSize = 0 p.kvData = p.kvData[:0] - p.nodeData = p.nodeData[:4+tMaxHeight] + p.nodeData = p.nodeData[:nNext+tMaxHeight] p.nodeData[nKV] = 0 p.nodeData[nKey] = 0 p.nodeData[nVal] = 0 p.nodeData[nHeight] = tMaxHeight for n := 0; n < tMaxHeight; n++ { - p.nodeData[4+n] = 0 + p.nodeData[nNext+n] = 0 p.prevNode[n] = 0 } + p.mu.Unlock() } // New creates a new initalized in-memory key/value DB. The capacity diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go index 2b4289744..f9a309dac 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt/options.go @@ -34,10 +34,11 @@ var ( DefaultCompactionTotalSize = 10 * MiB DefaultCompactionTotalSizeMultiplier = 10.0 DefaultCompressionType = SnappyCompression - DefaultOpenFilesCacher = LRUCacher - DefaultOpenFilesCacheCapacity = 500 + DefaultIteratorSamplingRate = 1 * MiB DefaultMaxMemCompationLevel = 2 DefaultNumLevel = 7 + DefaultOpenFilesCacher = LRUCacher + DefaultOpenFilesCacheCapacity = 500 DefaultWriteBuffer = 4 * MiB DefaultWriteL0PauseTrigger = 12 DefaultWriteL0SlowdownTrigger = 8 @@ -249,6 +250,11 @@ type Options struct { // The default value (DefaultCompression) uses snappy compression. Compression Compression + // DisableBufferPool allows disable use of util.BufferPool functionality. + // + // The default value is false. + DisableBufferPool bool + // DisableBlockCache allows disable use of cache.Cache functionality on // 'sorted table' block. // @@ -288,6 +294,13 @@ type Options struct { // The default value is nil. Filter filter.Filter + // IteratorSamplingRate defines approximate gap (in bytes) between read + // sampling of an iterator. The samples will be used to determine when + // compaction should be triggered. + // + // The default is 1MiB. + IteratorSamplingRate int + // MaxMemCompationLevel defines maximum level a newly compacted 'memdb' // will be pushed into if doesn't creates overlap. This should less than // NumLevel. Use -1 for level-0. @@ -313,6 +326,11 @@ type Options struct { // The default value is 500. OpenFilesCacheCapacity int + // If true then opens DB in read-only mode. + // + // The default value is false. + ReadOnly bool + // Strict defines the DB strict level. Strict Strict @@ -464,6 +482,20 @@ func (o *Options) GetCompression() Compression { return o.Compression } +func (o *Options) GetDisableBufferPool() bool { + if o == nil { + return false + } + return o.DisableBufferPool +} + +func (o *Options) GetDisableBlockCache() bool { + if o == nil { + return false + } + return o.DisableBlockCache +} + func (o *Options) GetDisableCompactionBackoff() bool { if o == nil { return false @@ -492,6 +524,13 @@ func (o *Options) GetFilter() filter.Filter { return o.Filter } +func (o *Options) GetIteratorSamplingRate() int { + if o == nil || o.IteratorSamplingRate <= 0 { + return DefaultIteratorSamplingRate + } + return o.IteratorSamplingRate +} + func (o *Options) GetMaxMemCompationLevel() int { level := DefaultMaxMemCompationLevel if o != nil { @@ -533,6 +572,13 @@ func (o *Options) GetOpenFilesCacheCapacity() int { return o.OpenFilesCacheCapacity } +func (o *Options) GetReadOnly() bool { + if o == nil { + return false + } + return o.ReadOnly +} + func (o *Options) GetStrict(strict Strict) bool { if o == nil || o.Strict == 0 { return DefaultStrict&strict != 0 diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go index b3906f7fc..f0bba4602 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session.go @@ -11,10 +11,8 @@ import ( "io" "os" "sync" - "sync/atomic" "github.com/syndtr/goleveldb/leveldb/errors" - "github.com/syndtr/goleveldb/leveldb/iterator" "github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/storage" @@ -127,11 +125,16 @@ func (s *session) recover() (err error) { return } defer reader.Close() - strict := s.o.GetStrict(opt.StrictManifest) - jr := journal.NewReader(reader, dropper{s, m}, strict, true) - staging := s.stVersion.newStaging() - rec := &sessionRecord{numLevel: s.o.GetNumLevel()} + var ( + // Options. + numLevel = s.o.GetNumLevel() + strict = s.o.GetStrict(opt.StrictManifest) + + jr = journal.NewReader(reader, dropper{s, m}, strict, true) + rec = &sessionRecord{} + staging = s.stVersion.newStaging() + ) for { var r io.Reader r, err = jr.Next() @@ -143,7 +146,7 @@ func (s *session) recover() (err error) { return errors.SetFile(err, m) } - err = rec.decode(r) + err = rec.decode(r, numLevel) if err == nil { // save compact pointers for _, r := range rec.compPtrs { @@ -206,250 +209,3 @@ func (s *session) commit(r *sessionRecord) (err error) { return } - -// Pick a compaction based on current state; need external synchronization. -func (s *session) pickCompaction() *compaction { - v := s.version() - - var level int - var t0 tFiles - if v.cScore >= 1 { - level = v.cLevel - cptr := s.stCompPtrs[level] - tables := v.tables[level] - for _, t := range tables { - if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 { - t0 = append(t0, t) - break - } - } - if len(t0) == 0 { - t0 = append(t0, tables[0]) - } - } else { - if p := atomic.LoadPointer(&v.cSeek); p != nil { - ts := (*tSet)(p) - level = ts.level - t0 = append(t0, ts.table) - } else { - v.release() - return nil - } - } - - return newCompaction(s, v, level, t0) -} - -// Create compaction from given level and range; need external synchronization. -func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction { - v := s.version() - - t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0) - if len(t0) == 0 { - v.release() - return nil - } - - // Avoid compacting too much in one shot in case the range is large. - // But we cannot do this for level-0 since level-0 files can overlap - // and we must not pick one file and drop another older file if the - // two files overlap. - if level > 0 { - limit := uint64(v.s.o.GetCompactionSourceLimit(level)) - total := uint64(0) - for i, t := range t0 { - total += t.size - if total >= limit { - s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1) - t0 = t0[:i+1] - break - } - } - } - - return newCompaction(s, v, level, t0) -} - -func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction { - c := &compaction{ - s: s, - v: v, - level: level, - tables: [2]tFiles{t0, nil}, - maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)), - tPtrs: make([]int, s.o.GetNumLevel()), - } - c.expand() - c.save() - return c -} - -// compaction represent a compaction state. -type compaction struct { - s *session - v *version - - level int - tables [2]tFiles - maxGPOverlaps uint64 - - gp tFiles - gpi int - seenKey bool - gpOverlappedBytes uint64 - imin, imax iKey - tPtrs []int - released bool - - snapGPI int - snapSeenKey bool - snapGPOverlappedBytes uint64 - snapTPtrs []int -} - -func (c *compaction) save() { - c.snapGPI = c.gpi - c.snapSeenKey = c.seenKey - c.snapGPOverlappedBytes = c.gpOverlappedBytes - c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...) -} - -func (c *compaction) restore() { - c.gpi = c.snapGPI - c.seenKey = c.snapSeenKey - c.gpOverlappedBytes = c.snapGPOverlappedBytes - c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...) -} - -func (c *compaction) release() { - if !c.released { - c.released = true - c.v.release() - } -} - -// Expand compacted tables; need external synchronization. -func (c *compaction) expand() { - limit := uint64(c.s.o.GetCompactionExpandLimit(c.level)) - vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1] - - t0, t1 := c.tables[0], c.tables[1] - imin, imax := t0.getRange(c.s.icmp) - // We expand t0 here just incase ukey hop across tables. - t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0) - if len(t0) != len(c.tables[0]) { - imin, imax = t0.getRange(c.s.icmp) - } - t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) - // Get entire range covered by compaction. - amin, amax := append(t0, t1...).getRange(c.s.icmp) - - // See if we can grow the number of inputs in "level" without - // changing the number of "level+1" files we pick up. - if len(t1) > 0 { - exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0) - if len(exp0) > len(t0) && t1.size()+exp0.size() < limit { - xmin, xmax := exp0.getRange(c.s.icmp) - exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) - if len(exp1) == len(t1) { - c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", - c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), - len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size()))) - imin, imax = xmin, xmax - t0, t1 = exp0, exp1 - amin, amax = append(t0, t1...).getRange(c.s.icmp) - } - } - } - - // Compute the set of grandparent files that overlap this compaction - // (parent == level+1; grandparent == level+2) - if c.level+2 < c.s.o.GetNumLevel() { - c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false) - } - - c.tables[0], c.tables[1] = t0, t1 - c.imin, c.imax = imin, imax -} - -// Check whether compaction is trivial. -func (c *compaction) trivial() bool { - return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps -} - -func (c *compaction) baseLevelForKey(ukey []byte) bool { - for level, tables := range c.v.tables[c.level+2:] { - for c.tPtrs[level] < len(tables) { - t := tables[c.tPtrs[level]] - if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 { - // We've advanced far enough. - if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { - // Key falls in this file's range, so definitely not base level. - return false - } - break - } - c.tPtrs[level]++ - } - } - return true -} - -func (c *compaction) shouldStopBefore(ikey iKey) bool { - for ; c.gpi < len(c.gp); c.gpi++ { - gp := c.gp[c.gpi] - if c.s.icmp.Compare(ikey, gp.imax) <= 0 { - break - } - if c.seenKey { - c.gpOverlappedBytes += gp.size - } - } - c.seenKey = true - - if c.gpOverlappedBytes > c.maxGPOverlaps { - // Too much overlap for current output; start new output. - c.gpOverlappedBytes = 0 - return true - } - return false -} - -// Creates an iterator. -func (c *compaction) newIterator() iterator.Iterator { - // Creates iterator slice. - icap := len(c.tables) - if c.level == 0 { - // Special case for level-0 - icap = len(c.tables[0]) + 1 - } - its := make([]iterator.Iterator, 0, icap) - - // Options. - ro := &opt.ReadOptions{ - DontFillCache: true, - Strict: opt.StrictOverride, - } - strict := c.s.o.GetStrict(opt.StrictCompaction) - if strict { - ro.Strict |= opt.StrictReader - } - - for i, tables := range c.tables { - if len(tables) == 0 { - continue - } - - // Level-0 is not sorted and may overlaps each other. - if c.level+i == 0 { - for _, t := range tables { - its = append(its, c.s.tops.newIterator(t, nil, ro)) - } - } else { - it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict) - its = append(its, it) - } - } - - return iterator.NewMergedIterator(its, c.s.icmp, strict) -} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_compaction.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_compaction.go new file mode 100644 index 000000000..7c5a79418 --- /dev/null +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_compaction.go @@ -0,0 +1,287 @@ +// Copyright (c) 2012, Suryandaru Triandana +// All rights reserved. +// +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. + +package leveldb + +import ( + "sync/atomic" + + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/memdb" + "github.com/syndtr/goleveldb/leveldb/opt" +) + +func (s *session) pickMemdbLevel(umin, umax []byte) int { + v := s.version() + defer v.release() + return v.pickMemdbLevel(umin, umax) +} + +func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, level int) (level_ int, err error) { + // Create sorted table. + iter := mdb.NewIterator(nil) + defer iter.Release() + t, n, err := s.tops.createFrom(iter) + if err != nil { + return level, err + } + + // Pick level and add to record. + if level < 0 { + level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey()) + } + rec.addTableFile(level, t) + + s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax) + return level, nil +} + +// Pick a compaction based on current state; need external synchronization. +func (s *session) pickCompaction() *compaction { + v := s.version() + + var level int + var t0 tFiles + if v.cScore >= 1 { + level = v.cLevel + cptr := s.stCompPtrs[level] + tables := v.tables[level] + for _, t := range tables { + if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 { + t0 = append(t0, t) + break + } + } + if len(t0) == 0 { + t0 = append(t0, tables[0]) + } + } else { + if p := atomic.LoadPointer(&v.cSeek); p != nil { + ts := (*tSet)(p) + level = ts.level + t0 = append(t0, ts.table) + } else { + v.release() + return nil + } + } + + return newCompaction(s, v, level, t0) +} + +// Create compaction from given level and range; need external synchronization. +func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction { + v := s.version() + + t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0) + if len(t0) == 0 { + v.release() + return nil + } + + // Avoid compacting too much in one shot in case the range is large. + // But we cannot do this for level-0 since level-0 files can overlap + // and we must not pick one file and drop another older file if the + // two files overlap. + if level > 0 { + limit := uint64(v.s.o.GetCompactionSourceLimit(level)) + total := uint64(0) + for i, t := range t0 { + total += t.size + if total >= limit { + s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1) + t0 = t0[:i+1] + break + } + } + } + + return newCompaction(s, v, level, t0) +} + +func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction { + c := &compaction{ + s: s, + v: v, + level: level, + tables: [2]tFiles{t0, nil}, + maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)), + tPtrs: make([]int, s.o.GetNumLevel()), + } + c.expand() + c.save() + return c +} + +// compaction represent a compaction state. +type compaction struct { + s *session + v *version + + level int + tables [2]tFiles + maxGPOverlaps uint64 + + gp tFiles + gpi int + seenKey bool + gpOverlappedBytes uint64 + imin, imax iKey + tPtrs []int + released bool + + snapGPI int + snapSeenKey bool + snapGPOverlappedBytes uint64 + snapTPtrs []int +} + +func (c *compaction) save() { + c.snapGPI = c.gpi + c.snapSeenKey = c.seenKey + c.snapGPOverlappedBytes = c.gpOverlappedBytes + c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...) +} + +func (c *compaction) restore() { + c.gpi = c.snapGPI + c.seenKey = c.snapSeenKey + c.gpOverlappedBytes = c.snapGPOverlappedBytes + c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...) +} + +func (c *compaction) release() { + if !c.released { + c.released = true + c.v.release() + } +} + +// Expand compacted tables; need external synchronization. +func (c *compaction) expand() { + limit := uint64(c.s.o.GetCompactionExpandLimit(c.level)) + vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1] + + t0, t1 := c.tables[0], c.tables[1] + imin, imax := t0.getRange(c.s.icmp) + // We expand t0 here just incase ukey hop across tables. + t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0) + if len(t0) != len(c.tables[0]) { + imin, imax = t0.getRange(c.s.icmp) + } + t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false) + // Get entire range covered by compaction. + amin, amax := append(t0, t1...).getRange(c.s.icmp) + + // See if we can grow the number of inputs in "level" without + // changing the number of "level+1" files we pick up. + if len(t1) > 0 { + exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0) + if len(exp0) > len(t0) && t1.size()+exp0.size() < limit { + xmin, xmax := exp0.getRange(c.s.icmp) + exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false) + if len(exp1) == len(t1) { + c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)", + c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())), + len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size()))) + imin, imax = xmin, xmax + t0, t1 = exp0, exp1 + amin, amax = append(t0, t1...).getRange(c.s.icmp) + } + } + } + + // Compute the set of grandparent files that overlap this compaction + // (parent == level+1; grandparent == level+2) + if c.level+2 < c.s.o.GetNumLevel() { + c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false) + } + + c.tables[0], c.tables[1] = t0, t1 + c.imin, c.imax = imin, imax +} + +// Check whether compaction is trivial. +func (c *compaction) trivial() bool { + return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps +} + +func (c *compaction) baseLevelForKey(ukey []byte) bool { + for level, tables := range c.v.tables[c.level+2:] { + for c.tPtrs[level] < len(tables) { + t := tables[c.tPtrs[level]] + if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 { + // We've advanced far enough. + if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 { + // Key falls in this file's range, so definitely not base level. + return false + } + break + } + c.tPtrs[level]++ + } + } + return true +} + +func (c *compaction) shouldStopBefore(ikey iKey) bool { + for ; c.gpi < len(c.gp); c.gpi++ { + gp := c.gp[c.gpi] + if c.s.icmp.Compare(ikey, gp.imax) <= 0 { + break + } + if c.seenKey { + c.gpOverlappedBytes += gp.size + } + } + c.seenKey = true + + if c.gpOverlappedBytes > c.maxGPOverlaps { + // Too much overlap for current output; start new output. + c.gpOverlappedBytes = 0 + return true + } + return false +} + +// Creates an iterator. +func (c *compaction) newIterator() iterator.Iterator { + // Creates iterator slice. + icap := len(c.tables) + if c.level == 0 { + // Special case for level-0. + icap = len(c.tables[0]) + 1 + } + its := make([]iterator.Iterator, 0, icap) + + // Options. + ro := &opt.ReadOptions{ + DontFillCache: true, + Strict: opt.StrictOverride, + } + strict := c.s.o.GetStrict(opt.StrictCompaction) + if strict { + ro.Strict |= opt.StrictReader + } + + for i, tables := range c.tables { + if len(tables) == 0 { + continue + } + + // Level-0 is not sorted and may overlaps each other. + if c.level+i == 0 { + for _, t := range tables { + its = append(its, c.s.tops.newIterator(t, nil, ro)) + } + } else { + it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict) + its = append(its, it) + } + } + + return iterator.NewMergedIterator(its, c.s.icmp, strict) +} diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go index 1bdcc68f5..405e07bef 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record.go @@ -52,8 +52,6 @@ type dtRecord struct { } type sessionRecord struct { - numLevel int - hasRec int comparer string journalNum uint64 @@ -230,7 +228,7 @@ func (p *sessionRecord) readBytes(field string, r byteReader) []byte { return x } -func (p *sessionRecord) readLevel(field string, r io.ByteReader) int { +func (p *sessionRecord) readLevel(field string, r io.ByteReader, numLevel int) int { if p.err != nil { return 0 } @@ -238,14 +236,14 @@ func (p *sessionRecord) readLevel(field string, r io.ByteReader) int { if p.err != nil { return 0 } - if x >= uint64(p.numLevel) { + if x >= uint64(numLevel) { p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "invalid level number"}) return 0 } return int(x) } -func (p *sessionRecord) decode(r io.Reader) error { +func (p *sessionRecord) decode(r io.Reader, numLevel int) error { br, ok := r.(byteReader) if !ok { br = bufio.NewReader(r) @@ -286,13 +284,13 @@ func (p *sessionRecord) decode(r io.Reader) error { p.setSeqNum(x) } case recCompPtr: - level := p.readLevel("comp-ptr.level", br) + level := p.readLevel("comp-ptr.level", br, numLevel) ikey := p.readBytes("comp-ptr.ikey", br) if p.err == nil { p.addCompPtr(level, iKey(ikey)) } case recAddTable: - level := p.readLevel("add-table.level", br) + level := p.readLevel("add-table.level", br, numLevel) num := p.readUvarint("add-table.num", br) size := p.readUvarint("add-table.size", br) imin := p.readBytes("add-table.imin", br) @@ -301,7 +299,7 @@ func (p *sessionRecord) decode(r io.Reader) error { p.addTable(level, num, size, imin, imax) } case recDelTable: - level := p.readLevel("del-table.level", br) + level := p.readLevel("del-table.level", br, numLevel) num := p.readUvarint("del-table.num", br) if p.err == nil { p.delTable(level, num) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record_test.go index c0c035ae3..33c148756 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_record_test.go @@ -19,8 +19,8 @@ func decodeEncode(v *sessionRecord) (res bool, err error) { if err != nil { return } - v2 := &sessionRecord{numLevel: opt.DefaultNumLevel} - err = v.decode(b) + v2 := &sessionRecord{} + err = v.decode(b, opt.DefaultNumLevel) if err != nil { return } @@ -34,7 +34,7 @@ func decodeEncode(v *sessionRecord) (res bool, err error) { func TestSessionRecord_EncodeDecode(t *testing.T) { big := uint64(1) << 50 - v := &sessionRecord{numLevel: opt.DefaultNumLevel} + v := &sessionRecord{} i := uint64(0) test := func() { res, err := decodeEncode(v) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go index 007c02cde..399a788ba 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/session_util.go @@ -182,7 +182,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) { defer v.release() } if rec == nil { - rec = &sessionRecord{numLevel: s.o.GetNumLevel()} + rec = &sessionRecord{} } s.fillRecord(rec, true) v.fillRecord(rec) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go index dc1f1fb54..08be0bab3 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/storage_test.go @@ -42,6 +42,8 @@ type tsOp uint const ( tsOpOpen tsOp = iota tsOpCreate + tsOpReplace + tsOpRemove tsOpRead tsOpReadAt tsOpWrite @@ -241,6 +243,10 @@ func (tf tsFile) Replace(newfile storage.File) (err error) { if err != nil { return } + if tf.shouldErr(tsOpReplace) { + err = errors.New("leveldb.testStorage: emulated create error") + return + } err = tf.File.Replace(newfile.(tsFile).File) if err != nil { ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err) @@ -258,6 +264,10 @@ func (tf tsFile) Remove() (err error) { if err != nil { return } + if tf.shouldErr(tsOpRemove) { + err = errors.New("leveldb.testStorage: emulated create error") + return + } err = tf.File.Remove() if err != nil { ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err) diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go index 3e8df6af5..db386f3b5 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table.go @@ -441,22 +441,26 @@ func newTableOps(s *session) *tOps { var ( cacher cache.Cacher bcache *cache.Cache + bpool *util.BufferPool ) if s.o.GetOpenFilesCacheCapacity() > 0 { cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity()) } - if !s.o.DisableBlockCache { + if !s.o.GetDisableBlockCache() { var bcacher cache.Cacher if s.o.GetBlockCacheCapacity() > 0 { bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity()) } bcache = cache.NewCache(bcacher) } + if !s.o.GetDisableBufferPool() { + bpool = util.NewBufferPool(s.o.GetBlockSize() + 5) + } return &tOps{ s: s, cache: cache.NewCache(cacher), bcache: bcache, - bpool: util.NewBufferPool(s.o.GetBlockSize() + 5), + bpool: bpool, } } diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go index 6f38e84b3..95c369d14 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/reader.go @@ -14,7 +14,7 @@ import ( "strings" "sync" - "github.com/syndtr/gosnappy/snappy" + "github.com/google/go-snappy/snappy" "github.com/syndtr/goleveldb/leveldb/cache" "github.com/syndtr/goleveldb/leveldb/comparer" diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go index 274c95fad..45819282b 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/table/writer.go @@ -12,7 +12,7 @@ import ( "fmt" "io" - "github.com/syndtr/gosnappy/snappy" + "github.com/google/go-snappy/snappy" "github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/filter" diff --git a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go index 5ab7b53d3..011d982da 100644 --- a/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go +++ b/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/version.go @@ -136,9 +136,8 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt if !tseek { if tset == nil { tset = &tSet{level, t} - } else if tset.table.consumeSeek() <= 0 { + } else { tseek = true - tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) } } @@ -203,6 +202,28 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt return true }) + if tseek && tset.table.consumeSeek() <= 0 { + tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) + } + + return +} + +func (v *version) sampleSeek(ikey iKey) (tcomp bool) { + var tset *tSet + + v.walkOverlapping(ikey, func(level int, t *tFile) bool { + if tset == nil { + tset = &tSet{level, t} + return true + } else { + if tset.table.consumeSeek() <= 0 { + tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset)) + } + return false + } + }, nil) + return } @@ -279,7 +300,7 @@ func (v *version) offsetOf(ikey iKey) (n uint64, err error) { return } -func (v *version) pickLevel(umin, umax []byte) (level int) { +func (v *version) pickMemdbLevel(umin, umax []byte) (level int) { if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) { var overlaps tFiles maxLevel := v.s.o.GetMaxMemCompationLevel()