Dependency update

This commit is contained in:
Jakob Borg 2015-06-15 21:10:18 +02:00
parent 12331cc62b
commit 2d217e72bd
33 changed files with 1014 additions and 579 deletions

24
Godeps/Godeps.json generated
View File

@ -7,7 +7,7 @@
"Deps": [ "Deps": [
{ {
"ImportPath": "github.com/bkaradzic/go-lz4", "ImportPath": "github.com/bkaradzic/go-lz4",
"Rev": "93a831dcee242be64a9cc9803dda84af25932de7" "Rev": "4f7c2045dbd17b802370e2e6022200468abf02ba"
}, },
{ {
"ImportPath": "github.com/calmh/logger", "ImportPath": "github.com/calmh/logger",
@ -21,13 +21,17 @@
"ImportPath": "github.com/calmh/xdr", "ImportPath": "github.com/calmh/xdr",
"Rev": "5f7208e86762911861c94f1849eddbfc0a60cbf0" "Rev": "5f7208e86762911861c94f1849eddbfc0a60cbf0"
}, },
{
"ImportPath": "github.com/google/go-snappy/snappy",
"Rev": "eaa750b9bf4dcb7cb20454be850613b66cda3273"
},
{ {
"ImportPath": "github.com/juju/ratelimit", "ImportPath": "github.com/juju/ratelimit",
"Rev": "c5abe513796336ee2869745bff0638508450e9c5" "Rev": "faa59ce93750e747b2997635e8b7daf30024b1ac"
}, },
{ {
"ImportPath": "github.com/kardianos/osext", "ImportPath": "github.com/kardianos/osext",
"Rev": "efacde03154693404c65e7aa7d461ac9014acd0c" "Rev": "6e7f843663477789fac7c02def0d0909e969b4e5"
}, },
{ {
"ImportPath": "github.com/syncthing/protocol", "ImportPath": "github.com/syncthing/protocol",
@ -35,11 +39,7 @@
}, },
{ {
"ImportPath": "github.com/syndtr/goleveldb/leveldb", "ImportPath": "github.com/syndtr/goleveldb/leveldb",
"Rev": "87e4e645d80ae9c537e8f2dee52b28036a5dd75e" "Rev": "a06509502ca32565bdf74afc1e573050023f261c"
},
{
"ImportPath": "github.com/syndtr/gosnappy/snappy",
"Rev": "156a073208e131d7d2e212cb749feae7c339e846"
}, },
{ {
"ImportPath": "github.com/thejerf/suture", "ImportPath": "github.com/thejerf/suture",
@ -59,19 +59,19 @@
}, },
{ {
"ImportPath": "golang.org/x/crypto/bcrypt", "ImportPath": "golang.org/x/crypto/bcrypt",
"Rev": "c57d4a71915a248dbad846d60825145062b4c18e" "Rev": "1e856cbfdf9bc25eefca75f83f25d55e35ae72e0"
}, },
{ {
"ImportPath": "golang.org/x/crypto/blowfish", "ImportPath": "golang.org/x/crypto/blowfish",
"Rev": "c57d4a71915a248dbad846d60825145062b4c18e" "Rev": "1e856cbfdf9bc25eefca75f83f25d55e35ae72e0"
}, },
{ {
"ImportPath": "golang.org/x/text/transform", "ImportPath": "golang.org/x/text/transform",
"Rev": "2076e9cab4147459c82bc81169e46c139d358547" "Rev": "df923bbb63f8ea3a26bb743e2a497abd0ab585f7"
}, },
{ {
"ImportPath": "golang.org/x/text/unicode/norm", "ImportPath": "golang.org/x/text/unicode/norm",
"Rev": "2076e9cab4147459c82bc81169e46c139d358547" "Rev": "df923bbb63f8ea3a26bb743e2a497abd0ab585f7"
} }
] ]
} }

View File

@ -0,0 +1,23 @@
// +build gofuzz
package lz4
import "encoding/binary"
func Fuzz(data []byte) int {
if len(data) < 4 {
return 0
}
ln := binary.LittleEndian.Uint32(data)
if ln > (1 << 21) {
return 0
}
if _, err := Decode(nil, data); err != nil {
return 0
}
return 1
}

View File

@ -141,7 +141,7 @@ func Decode(dst, src []byte) ([]byte, error) {
length += ln length += ln
} }
if int(d.spos+length) > len(d.src) { if int(d.spos+length) > len(d.src) || int(d.dpos+length) > len(d.dst) {
return nil, ErrCorrupt return nil, ErrCorrupt
} }
@ -179,7 +179,12 @@ func Decode(dst, src []byte) ([]byte, error) {
} }
literal := d.dpos - d.ref literal := d.dpos - d.ref
if literal < 4 { if literal < 4 {
if int(d.dpos+4) > len(d.dst) {
return nil, ErrCorrupt
}
d.cp(4, decr[literal]) d.cp(4, decr[literal])
} else { } else {
length += 4 length += 4

View File

@ -25,8 +25,10 @@
package lz4 package lz4
import "encoding/binary" import (
import "errors" "encoding/binary"
"errors"
)
const ( const (
minMatch = 4 minMatch = 4

View File

@ -1,5 +1,8 @@
This package contains an efficient token-bucket-based rate limiter. All files in this repository are licensed as follows. If you contribute
Copyright (C) 2015 Canonical Ltd. to this repository, it is assumed that you license your contribution
under the same license unless you state otherwise.
All files Copyright (C) 2015 Canonical Ltd. unless otherwise specified in the file.
This software is licensed under the LGPLv3, included below. This software is licensed under the LGPLv3, included below.

View File

@ -7,6 +7,7 @@
package ratelimit package ratelimit
import ( import (
"math"
"strconv" "strconv"
"sync" "sync"
"time" "time"
@ -55,7 +56,7 @@ func NewBucketWithRate(rate float64, capacity int64) *Bucket {
continue continue
} }
tb := NewBucketWithQuantum(fillInterval, capacity, quantum) tb := NewBucketWithQuantum(fillInterval, capacity, quantum)
if diff := abs(tb.Rate() - rate); diff/rate <= rateMargin { if diff := math.Abs(tb.Rate() - rate); diff/rate <= rateMargin {
return tb return tb
} }
} }
@ -217,10 +218,3 @@ func (tb *Bucket) adjust(now time.Time) (currentTick int64) {
tb.availTick = currentTick tb.availTick = currentTick
return return
} }
func abs(f float64) float64 {
if f < 0 {
return -f
}
return f
}

View File

@ -4,7 +4,9 @@
There is sometimes utility in finding the current executable file There is sometimes utility in finding the current executable file
that is running. This can be used for upgrading the current executable that is running. This can be used for upgrading the current executable
or finding resources located relative to the executable file. or finding resources located relative to the executable file. Both
working directory and the os.Args[0] value are arbitrary and cannot
be relied on; os.Args[0] can be "faked".
Multi-platform and supports: Multi-platform and supports:
* Linux * Linux

View File

@ -16,12 +16,12 @@ func Executable() (string, error) {
} }
// Returns same path as Executable, returns just the folder // Returns same path as Executable, returns just the folder
// path. Excludes the executable name. // path. Excludes the executable name and any trailing slash.
func ExecutableFolder() (string, error) { func ExecutableFolder() (string, error) {
p, err := Executable() p, err := Executable()
if err != nil { if err != nil {
return "", err return "", err
} }
folder, _ := filepath.Split(p)
return folder, nil return filepath.Dir(p), nil
} }

View File

@ -17,12 +17,14 @@ import (
func executable() (string, error) { func executable() (string, error) {
switch runtime.GOOS { switch runtime.GOOS {
case "linux": case "linux":
const deletedSuffix = " (deleted)" const deletedTag = " (deleted)"
execpath, err := os.Readlink("/proc/self/exe") execpath, err := os.Readlink("/proc/self/exe")
if err != nil { if err != nil {
return execpath, err return execpath, err
} }
return strings.TrimSuffix(execpath, deletedSuffix), nil execpath = strings.TrimSuffix(execpath, deletedTag)
execpath = strings.TrimPrefix(execpath, deletedTag)
return execpath, nil
case "netbsd": case "netbsd":
return os.Readlink("/proc/curproc/exe") return os.Readlink("/proc/curproc/exe")
case "openbsd", "dragonfly": case "openbsd", "dragonfly":

View File

@ -24,6 +24,29 @@ const (
executableEnvValueDelete = "delete" executableEnvValueDelete = "delete"
) )
func TestPrintExecutable(t *testing.T) {
ef, err := Executable()
if err != nil {
t.Fatalf("Executable failed: %v", err)
}
t.Log("Executable:", ef)
}
func TestPrintExecutableFolder(t *testing.T) {
ef, err := ExecutableFolder()
if err != nil {
t.Fatalf("ExecutableFolder failed: %v", err)
}
t.Log("Executable Folder:", ef)
}
func TestExecutableFolder(t *testing.T) {
ef, err := ExecutableFolder()
if err != nil {
t.Fatalf("ExecutableFolder failed: %v", err)
}
if ef[len(ef)-1] == filepath.Separator {
t.Fatal("ExecutableFolder ends with a trailing slash.")
}
}
func TestExecutableMatch(t *testing.T) { func TestExecutableMatch(t *testing.T) {
ep, err := Executable() ep, err := Executable()
if err != nil { if err != nil {

View File

@ -63,13 +63,14 @@ type DB struct {
journalAckC chan error journalAckC chan error
// Compaction. // Compaction.
tcompCmdC chan cCmd tcompCmdC chan cCmd
tcompPauseC chan chan<- struct{} tcompPauseC chan chan<- struct{}
mcompCmdC chan cCmd mcompCmdC chan cCmd
compErrC chan error compErrC chan error
compPerErrC chan error compPerErrC chan error
compErrSetC chan error compErrSetC chan error
compStats []cStats compWriteLocking bool
compStats []cStats
// Close. // Close.
closeW sync.WaitGroup closeW sync.WaitGroup
@ -108,28 +109,44 @@ func openDB(s *session) (*DB, error) {
closeC: make(chan struct{}), closeC: make(chan struct{}),
} }
if err := db.recoverJournal(); err != nil { // Read-only mode.
return nil, err readOnly := s.o.GetReadOnly()
}
// Remove any obsolete files. if readOnly {
if err := db.checkAndCleanFiles(); err != nil { // Recover journals (read-only mode).
// Close journal. if err := db.recoverJournalRO(); err != nil {
if db.journal != nil { return nil, err
db.journal.Close()
db.journalWriter.Close()
} }
return nil, err } else {
// Recover journals.
if err := db.recoverJournal(); err != nil {
return nil, err
}
// Remove any obsolete files.
if err := db.checkAndCleanFiles(); err != nil {
// Close journal.
if db.journal != nil {
db.journal.Close()
db.journalWriter.Close()
}
return nil, err
}
} }
// Doesn't need to be included in the wait group. // Doesn't need to be included in the wait group.
go db.compactionError() go db.compactionError()
go db.mpoolDrain() go db.mpoolDrain()
db.closeW.Add(3) if readOnly {
go db.tCompaction() db.SetReadOnly()
go db.mCompaction() } else {
go db.jWriter() db.closeW.Add(3)
go db.tCompaction()
go db.mCompaction()
go db.jWriter()
}
s.logf("db@open done T·%v", time.Since(start)) s.logf("db@open done T·%v", time.Since(start))
@ -275,7 +292,7 @@ func recoverTable(s *session, o *opt.Options) error {
// We will drop corrupted table. // We will drop corrupted table.
strict = o.GetStrict(opt.StrictRecovery) strict = o.GetStrict(opt.StrictRecovery)
rec = &sessionRecord{numLevel: o.GetNumLevel()} rec = &sessionRecord{}
bpool = util.NewBufferPool(o.GetBlockSize() + 5) bpool = util.NewBufferPool(o.GetBlockSize() + 5)
) )
buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) { buildTable := func(iter iterator.Iterator) (tmp storage.File, size int64, err error) {
@ -450,132 +467,136 @@ func recoverTable(s *session, o *opt.Options) error {
} }
func (db *DB) recoverJournal() error { func (db *DB) recoverJournal() error {
// Get all tables and sort it by file number. // Get all journals and sort it by file number.
journalFiles_, err := db.s.getFiles(storage.TypeJournal) allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
if err != nil { if err != nil {
return err return err
} }
journalFiles := files(journalFiles_) files(allJournalFiles).sort()
journalFiles.sort()
// Discard older journal. // Journals that will be recovered.
prev := -1 var recJournalFiles []storage.File
for i, file := range journalFiles { for _, jf := range allJournalFiles {
if file.Num() >= db.s.stJournalNum { if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
if prev >= 0 { recJournalFiles = append(recJournalFiles, jf)
i--
journalFiles[i] = journalFiles[prev]
}
journalFiles = journalFiles[i:]
break
} else if file.Num() == db.s.stPrevJournalNum {
prev = i
} }
} }
var jr *journal.Reader var (
var of storage.File of storage.File // Obsolete file.
var mem *memdb.DB rec = &sessionRecord{}
batch := new(Batch) )
cm := newCMem(db.s)
buf := new(util.Buffer)
// Options.
strict := db.s.o.GetStrict(opt.StrictJournal)
checksum := db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer := db.s.o.GetWriteBuffer()
recoverJournal := func(file storage.File) error {
db.logf("journal@recovery recovering @%d", file.Num())
reader, err := file.Open()
if err != nil {
return err
}
defer reader.Close()
// Create/reset journal reader instance. // Recover journals.
if jr == nil { if len(recJournalFiles) > 0 {
jr = journal.NewReader(reader, dropper{db.s, file}, strict, checksum) db.logf("journal@recovery F·%d", len(recJournalFiles))
} else {
jr.Reset(reader, dropper{db.s, file}, strict, checksum)
}
// Flush memdb and remove obsolete journal file.
if of != nil {
if mem.Len() > 0 {
if err := cm.flush(mem, 0); err != nil {
return err
}
}
if err := cm.commit(file.Num(), db.seq); err != nil {
return err
}
cm.reset()
of.Remove()
of = nil
}
// Replay journal to memdb.
mem.Reset()
for {
r, err := jr.Next()
if err != nil {
if err == io.EOF {
break
}
return errors.SetFile(err, file)
}
buf.Reset()
if _, err := buf.ReadFrom(r); err != nil {
if err == io.ErrUnexpectedEOF {
// This is error returned due to corruption, with strict == false.
continue
} else {
return errors.SetFile(err, file)
}
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mem); err != nil {
if strict || !errors.IsCorrupted(err) {
return errors.SetFile(err, file)
} else {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
continue
}
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
// Flush it if large enough.
if mem.Size() >= writeBuffer {
if err := cm.flush(mem, 0); err != nil {
return err
}
mem.Reset()
}
}
of = file
return nil
}
// Recover all journals.
if len(journalFiles) > 0 {
db.logf("journal@recovery F·%d", len(journalFiles))
// Mark file number as used. // Mark file number as used.
db.s.markFileNum(journalFiles[len(journalFiles)-1].Num()) db.s.markFileNum(recJournalFiles[len(recJournalFiles)-1].Num())
mem = memdb.New(db.s.icmp, writeBuffer) var (
for _, file := range journalFiles { // Options.
if err := recoverJournal(file); err != nil { strict = db.s.o.GetStrict(opt.StrictJournal)
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
jr *journal.Reader
mdb = memdb.New(db.s.icmp, writeBuffer)
buf = &util.Buffer{}
batch = &Batch{}
)
for _, jf := range recJournalFiles {
db.logf("journal@recovery recovering @%d", jf.Num())
fr, err := jf.Open()
if err != nil {
return err return err
} }
// Create or reset journal reader instance.
if jr == nil {
jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
} else {
jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
}
// Flush memdb and remove obsolete journal file.
if of != nil {
if mdb.Len() > 0 {
if _, err := db.s.flushMemdb(rec, mdb, -1); err != nil {
fr.Close()
return err
}
}
rec.setJournalNum(jf.Num())
rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
fr.Close()
return err
}
rec.resetAddedTables()
of.Remove()
of = nil
}
// Replay journal to memdb.
mdb.Reset()
for {
r, err := jr.Next()
if err != nil {
if err == io.EOF {
break
}
fr.Close()
return errors.SetFile(err, jf)
}
buf.Reset()
if _, err := buf.ReadFrom(r); err != nil {
if err == io.ErrUnexpectedEOF {
// This is error returned due to corruption, with strict == false.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
// Flush it if large enough.
if mdb.Size() >= writeBuffer {
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
fr.Close()
return err
}
mdb.Reset()
}
}
fr.Close()
of = jf
} }
// Flush the last journal. // Flush the last memdb.
if mem.Len() > 0 { if mdb.Len() > 0 {
if err := cm.flush(mem, 0); err != nil { if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
return err return err
} }
} }
@ -587,8 +608,10 @@ func (db *DB) recoverJournal() error {
} }
// Commit. // Commit.
if err := cm.commit(db.journalFile.Num(), db.seq); err != nil { rec.setJournalNum(db.journalFile.Num())
// Close journal. rec.setSeqNum(db.seq)
if err := db.s.commit(rec); err != nil {
// Close journal on error.
if db.journal != nil { if db.journal != nil {
db.journal.Close() db.journal.Close()
db.journalWriter.Close() db.journalWriter.Close()
@ -604,6 +627,103 @@ func (db *DB) recoverJournal() error {
return nil return nil
} }
func (db *DB) recoverJournalRO() error {
// Get all journals and sort it by file number.
allJournalFiles, err := db.s.getFiles(storage.TypeJournal)
if err != nil {
return err
}
files(allJournalFiles).sort()
// Journals that will be recovered.
var recJournalFiles []storage.File
for _, jf := range allJournalFiles {
if jf.Num() >= db.s.stJournalNum || jf.Num() == db.s.stPrevJournalNum {
recJournalFiles = append(recJournalFiles, jf)
}
}
var (
// Options.
strict = db.s.o.GetStrict(opt.StrictJournal)
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
writeBuffer = db.s.o.GetWriteBuffer()
mdb = memdb.New(db.s.icmp, writeBuffer)
)
// Recover journals.
if len(recJournalFiles) > 0 {
db.logf("journal@recovery RO·Mode F·%d", len(recJournalFiles))
var (
jr *journal.Reader
buf = &util.Buffer{}
batch = &Batch{}
)
for _, jf := range recJournalFiles {
db.logf("journal@recovery recovering @%d", jf.Num())
fr, err := jf.Open()
if err != nil {
return err
}
// Create or reset journal reader instance.
if jr == nil {
jr = journal.NewReader(fr, dropper{db.s, jf}, strict, checksum)
} else {
jr.Reset(fr, dropper{db.s, jf}, strict, checksum)
}
// Replay journal to memdb.
for {
r, err := jr.Next()
if err != nil {
if err == io.EOF {
break
}
fr.Close()
return errors.SetFile(err, jf)
}
buf.Reset()
if _, err := buf.ReadFrom(r); err != nil {
if err == io.ErrUnexpectedEOF {
// This is error returned due to corruption, with strict == false.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
if !strict && errors.IsCorrupted(err) {
db.s.logf("journal error: %v (skipped)", err)
// We won't apply sequence number as it might be corrupted.
continue
}
fr.Close()
return errors.SetFile(err, jf)
}
// Save sequence number.
db.seq = batch.seq + uint64(batch.Len())
}
fr.Close()
}
}
// Set memDB.
db.mem = &memDB{db: db, DB: mdb, ref: 1}
return nil
}
func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) { func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
ikey := newIkey(key, seq, ktSeek) ikey := newIkey(key, seq, ktSeek)
@ -614,7 +734,7 @@ func (db *DB) get(key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, er
} }
defer m.decref() defer m.decref()
mk, mv, me := m.mdb.Find(ikey) mk, mv, me := m.Find(ikey)
if me == nil { if me == nil {
ukey, _, kt, kerr := parseIkey(mk) ukey, _, kt, kerr := parseIkey(mk)
if kerr != nil { if kerr != nil {
@ -652,7 +772,7 @@ func (db *DB) has(key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err er
} }
defer m.decref() defer m.decref()
mk, _, me := m.mdb.Find(ikey) mk, _, me := m.Find(ikey)
if me == nil { if me == nil {
ukey, _, kt, kerr := parseIkey(mk) ukey, _, kt, kerr := parseIkey(mk)
if kerr != nil { if kerr != nil {
@ -784,7 +904,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
const prefix = "leveldb." const prefix = "leveldb."
if !strings.HasPrefix(name, prefix) { if !strings.HasPrefix(name, prefix) {
return "", errors.New("leveldb: GetProperty: unknown property: " + name) return "", ErrNotFound
} }
p := name[len(prefix):] p := name[len(prefix):]
@ -798,7 +918,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
var rest string var rest string
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest) n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
if n != 1 || int(level) >= db.s.o.GetNumLevel() { if n != 1 || int(level) >= db.s.o.GetNumLevel() {
err = errors.New("leveldb: GetProperty: invalid property: " + name) err = ErrNotFound
} else { } else {
value = fmt.Sprint(v.tLen(int(level))) value = fmt.Sprint(v.tLen(int(level)))
} }
@ -837,7 +957,7 @@ func (db *DB) GetProperty(name string) (value string, err error) {
case p == "aliveiters": case p == "aliveiters":
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters)) value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
default: default:
err = errors.New("leveldb: GetProperty: unknown property: " + name) err = ErrNotFound
} }
return return
@ -900,6 +1020,9 @@ func (db *DB) Close() error {
var err error var err error
select { select {
case err = <-db.compErrC: case err = <-db.compErrC:
if err == ErrReadOnly {
err = nil
}
default: default:
} }

View File

@ -11,7 +11,6 @@ import (
"time" "time"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
) )
@ -62,58 +61,8 @@ func (p *cStatsStaging) stopTimer() {
} }
} }
type cMem struct {
s *session
level int
rec *sessionRecord
}
func newCMem(s *session) *cMem {
return &cMem{s: s, rec: &sessionRecord{numLevel: s.o.GetNumLevel()}}
}
func (c *cMem) flush(mem *memdb.DB, level int) error {
s := c.s
// Write memdb to table.
iter := mem.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
return err
}
// Pick level.
if level < 0 {
v := s.version()
level = v.pickLevel(t.imin.ukey(), t.imax.ukey())
v.release()
}
c.rec.addTableFile(level, t)
s.logf("mem@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
c.level = level
return nil
}
func (c *cMem) reset() {
c.rec = &sessionRecord{numLevel: c.s.o.GetNumLevel()}
}
func (c *cMem) commit(journal, seq uint64) error {
c.rec.setJournalNum(journal)
c.rec.setSeqNum(seq)
// Commit changes.
return c.s.commit(c.rec)
}
func (db *DB) compactionError() { func (db *DB) compactionError() {
var ( var err error
err error
wlocked bool
)
noerr: noerr:
// No error. // No error.
for { for {
@ -121,7 +70,7 @@ noerr:
case err = <-db.compErrSetC: case err = <-db.compErrSetC:
switch { switch {
case err == nil: case err == nil:
case errors.IsCorrupted(err): case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr goto hasperr
default: default:
goto haserr goto haserr
@ -139,7 +88,7 @@ haserr:
switch { switch {
case err == nil: case err == nil:
goto noerr goto noerr
case errors.IsCorrupted(err): case err == ErrReadOnly, errors.IsCorrupted(err):
goto hasperr goto hasperr
default: default:
} }
@ -155,9 +104,9 @@ hasperr:
case db.compPerErrC <- err: case db.compPerErrC <- err:
case db.writeLockC <- struct{}{}: case db.writeLockC <- struct{}{}:
// Hold write lock, so that write won't pass-through. // Hold write lock, so that write won't pass-through.
wlocked = true db.compWriteLocking = true
case _, _ = <-db.closeC: case _, _ = <-db.closeC:
if wlocked { if db.compWriteLocking {
// We should release the lock or Close will hang. // We should release the lock or Close will hang.
<-db.writeLockC <-db.writeLockC
} }
@ -287,21 +236,18 @@ func (db *DB) compactionExitTransact() {
} }
func (db *DB) memCompaction() { func (db *DB) memCompaction() {
mem := db.getFrozenMem() mdb := db.getFrozenMem()
if mem == nil { if mdb == nil {
return return
} }
defer mem.decref() defer mdb.decref()
c := newCMem(db.s) db.logf("memdb@flush N·%d S·%s", mdb.Len(), shortenb(mdb.Size()))
stats := new(cStatsStaging)
db.logf("mem@flush N·%d S·%s", mem.mdb.Len(), shortenb(mem.mdb.Size()))
// Don't compact empty memdb. // Don't compact empty memdb.
if mem.mdb.Len() == 0 { if mdb.Len() == 0 {
db.logf("mem@flush skipping") db.logf("memdb@flush skipping")
// drop frozen mem // drop frozen memdb
db.dropFrozenMem() db.dropFrozenMem()
return return
} }
@ -317,13 +263,20 @@ func (db *DB) memCompaction() {
return return
} }
db.compactionTransactFunc("mem@flush", func(cnt *compactionTransactCounter) (err error) { var (
rec = &sessionRecord{}
stats = &cStatsStaging{}
flushLevel int
)
db.compactionTransactFunc("memdb@flush", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer() stats.startTimer()
defer stats.stopTimer() flushLevel, err = db.s.flushMemdb(rec, mdb.DB, -1)
return c.flush(mem.mdb, -1) stats.stopTimer()
return
}, func() error { }, func() error {
for _, r := range c.rec.addedTables { for _, r := range rec.addedTables {
db.logf("mem@flush revert @%d", r.num) db.logf("memdb@flush revert @%d", r.num)
f := db.s.getTableFile(r.num) f := db.s.getTableFile(r.num)
if err := f.Remove(); err != nil { if err := f.Remove(); err != nil {
return err return err
@ -332,20 +285,23 @@ func (db *DB) memCompaction() {
return nil return nil
}) })
db.compactionTransactFunc("mem@commit", func(cnt *compactionTransactCounter) (err error) { db.compactionTransactFunc("memdb@commit", func(cnt *compactionTransactCounter) (err error) {
stats.startTimer() stats.startTimer()
defer stats.stopTimer() rec.setJournalNum(db.journalFile.Num())
return c.commit(db.journalFile.Num(), db.frozenSeq) rec.setSeqNum(db.frozenSeq)
err = db.s.commit(rec)
stats.stopTimer()
return
}, nil) }, nil)
db.logf("mem@flush committed F·%d T·%v", len(c.rec.addedTables), stats.duration) db.logf("memdb@flush committed F·%d T·%v", len(rec.addedTables), stats.duration)
for _, r := range c.rec.addedTables { for _, r := range rec.addedTables {
stats.write += r.size stats.write += r.size
} }
db.compStats[c.level].add(stats) db.compStats[flushLevel].add(stats)
// Drop frozen mem. // Drop frozen memdb.
db.dropFrozenMem() db.dropFrozenMem()
// Resume table compaction. // Resume table compaction.
@ -557,7 +513,7 @@ func (b *tableCompactionBuilder) revert() error {
func (db *DB) tableCompaction(c *compaction, noTrivial bool) { func (db *DB) tableCompaction(c *compaction, noTrivial bool) {
defer c.release() defer c.release()
rec := &sessionRecord{numLevel: db.s.o.GetNumLevel()} rec := &sessionRecord{}
rec.addCompPtr(c.level, c.imax) rec.addCompPtr(c.level, c.imax)
if !noTrivial && c.trivial() { if !noTrivial && c.trivial() {

View File

@ -8,6 +8,7 @@ package leveldb
import ( import (
"errors" "errors"
"math/rand"
"runtime" "runtime"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -39,11 +40,11 @@ func (db *DB) newRawIterator(slice *util.Range, ro *opt.ReadOptions) iterator.It
ti := v.getIterators(slice, ro) ti := v.getIterators(slice, ro)
n := len(ti) + 2 n := len(ti) + 2
i := make([]iterator.Iterator, 0, n) i := make([]iterator.Iterator, 0, n)
emi := em.mdb.NewIterator(slice) emi := em.NewIterator(slice)
emi.SetReleaser(&memdbReleaser{m: em}) emi.SetReleaser(&memdbReleaser{m: em})
i = append(i, emi) i = append(i, emi)
if fm != nil { if fm != nil {
fmi := fm.mdb.NewIterator(slice) fmi := fm.NewIterator(slice)
fmi.SetReleaser(&memdbReleaser{m: fm}) fmi.SetReleaser(&memdbReleaser{m: fm})
i = append(i, fmi) i = append(i, fmi)
} }
@ -80,6 +81,10 @@ func (db *DB) newIterator(seq uint64, slice *util.Range, ro *opt.ReadOptions) *d
return iter return iter
} }
func (db *DB) iterSamplingRate() int {
return rand.Intn(2 * db.s.o.GetIteratorSamplingRate())
}
type dir int type dir int
const ( const (
@ -98,11 +103,21 @@ type dbIter struct {
seq uint64 seq uint64
strict bool strict bool
dir dir smaplingGap int
key []byte dir dir
value []byte key []byte
err error value []byte
releaser util.Releaser err error
releaser util.Releaser
}
func (i *dbIter) sampleSeek() {
ikey := i.iter.Key()
i.smaplingGap -= len(ikey) + len(i.iter.Value())
for i.smaplingGap < 0 {
i.smaplingGap += i.db.iterSamplingRate()
i.db.sampleSeek(ikey)
}
} }
func (i *dbIter) setErr(err error) { func (i *dbIter) setErr(err error) {
@ -175,6 +190,7 @@ func (i *dbIter) Seek(key []byte) bool {
func (i *dbIter) next() bool { func (i *dbIter) next() bool {
for { for {
if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil { if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
i.sampleSeek()
if seq <= i.seq { if seq <= i.seq {
switch kt { switch kt {
case ktDel: case ktDel:
@ -225,6 +241,7 @@ func (i *dbIter) prev() bool {
if i.iter.Valid() { if i.iter.Valid() {
for { for {
if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil { if ukey, seq, kt, kerr := parseIkey(i.iter.Key()); kerr == nil {
i.sampleSeek()
if seq <= i.seq { if seq <= i.seq {
if !del && i.icmp.uCompare(ukey, i.key) < 0 { if !del && i.icmp.uCompare(ukey, i.key) < 0 {
return true return true
@ -266,6 +283,7 @@ func (i *dbIter) Prev() bool {
case dirForward: case dirForward:
for i.iter.Prev() { for i.iter.Prev() {
if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil { if ukey, _, _, kerr := parseIkey(i.iter.Key()); kerr == nil {
i.sampleSeek()
if i.icmp.uCompare(ukey, i.key) < 0 { if i.icmp.uCompare(ukey, i.key) < 0 {
goto cont goto cont
} }

View File

@ -15,8 +15,8 @@ import (
) )
type memDB struct { type memDB struct {
db *DB db *DB
mdb *memdb.DB *memdb.DB
ref int32 ref int32
} }
@ -27,12 +27,12 @@ func (m *memDB) incref() {
func (m *memDB) decref() { func (m *memDB) decref() {
if ref := atomic.AddInt32(&m.ref, -1); ref == 0 { if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
// Only put back memdb with std capacity. // Only put back memdb with std capacity.
if m.mdb.Capacity() == m.db.s.o.GetWriteBuffer() { if m.Capacity() == m.db.s.o.GetWriteBuffer() {
m.mdb.Reset() m.Reset()
m.db.mpoolPut(m.mdb) m.db.mpoolPut(m.DB)
} }
m.db = nil m.db = nil
m.mdb = nil m.DB = nil
} else if ref < 0 { } else if ref < 0 {
panic("negative memdb ref") panic("negative memdb ref")
} }
@ -48,6 +48,15 @@ func (db *DB) addSeq(delta uint64) {
atomic.AddUint64(&db.seq, delta) atomic.AddUint64(&db.seq, delta)
} }
func (db *DB) sampleSeek(ikey iKey) {
v := db.s.version()
if v.sampleSeek(ikey) {
// Trigger table compaction.
db.compSendTrigger(db.tcompCmdC)
}
v.release()
}
func (db *DB) mpoolPut(mem *memdb.DB) { func (db *DB) mpoolPut(mem *memdb.DB) {
defer func() { defer func() {
recover() recover()
@ -117,7 +126,7 @@ func (db *DB) newMem(n int) (mem *memDB, err error) {
} }
mem = &memDB{ mem = &memDB{
db: db, db: db,
mdb: mdb, DB: mdb,
ref: 2, ref: 2,
} }
db.mem = mem db.mem = mem

View File

@ -405,19 +405,21 @@ func (h *dbHarness) compactRange(min, max string) {
t.Log("DB range compaction done") t.Log("DB range compaction done")
} }
func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) { func (h *dbHarness) sizeOf(start, limit string) uint64 {
t := h.t sz, err := h.db.SizeOf([]util.Range{
db := h.db
s, err := db.SizeOf([]util.Range{
{[]byte(start), []byte(limit)}, {[]byte(start), []byte(limit)},
}) })
if err != nil { if err != nil {
t.Error("SizeOf: got error: ", err) h.t.Error("SizeOf: got error: ", err)
} }
if s.Sum() < low || s.Sum() > hi { return sz.Sum()
t.Errorf("sizeof %q to %q not in range, want %d - %d, got %d", }
shorten(start), shorten(limit), low, hi, s.Sum())
func (h *dbHarness) sizeAssert(start, limit string, low, hi uint64) {
sz := h.sizeOf(start, limit)
if sz < low || sz > hi {
h.t.Errorf("sizeOf %q to %q not in range, want %d - %d, got %d",
shorten(start), shorten(limit), low, hi, sz)
} }
} }
@ -2443,7 +2445,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
rec := &sessionRecord{numLevel: s.o.GetNumLevel()} rec := &sessionRecord{}
rec.addTableFile(i, tf) rec.addTableFile(i, tf)
if err := s.commit(rec); err != nil { if err := s.commit(rec); err != nil {
t.Fatal(err) t.Fatal(err)
@ -2453,7 +2455,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
// Build grandparent. // Build grandparent.
v := s.version() v := s.version()
c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...)) c := newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
rec := &sessionRecord{numLevel: s.o.GetNumLevel()} rec := &sessionRecord{}
b := &tableCompactionBuilder{ b := &tableCompactionBuilder{
s: s, s: s,
c: c, c: c,
@ -2477,7 +2479,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
// Build level-1. // Build level-1.
v = s.version() v = s.version()
c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...)) c = newCompaction(s, v, 0, append(tFiles{}, v.tables[0]...))
rec = &sessionRecord{numLevel: s.o.GetNumLevel()} rec = &sessionRecord{}
b = &tableCompactionBuilder{ b = &tableCompactionBuilder{
s: s, s: s,
c: c, c: c,
@ -2521,7 +2523,7 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
// Compaction with transient error. // Compaction with transient error.
v = s.version() v = s.version()
c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...)) c = newCompaction(s, v, 1, append(tFiles{}, v.tables[1]...))
rec = &sessionRecord{numLevel: s.o.GetNumLevel()} rec = &sessionRecord{}
b = &tableCompactionBuilder{ b = &tableCompactionBuilder{
s: s, s: s,
c: c, c: c,
@ -2577,3 +2579,123 @@ func TestDB_TableCompactionBuilder(t *testing.T) {
} }
v.release() v.release()
} }
func testDB_IterTriggeredCompaction(t *testing.T, limitDiv int) {
const (
vSize = 200 * opt.KiB
tSize = 100 * opt.MiB
mIter = 100
n = tSize / vSize
)
h := newDbHarnessWopt(t, &opt.Options{
Compression: opt.NoCompression,
DisableBlockCache: true,
})
defer h.close()
key := func(x int) string {
return fmt.Sprintf("v%06d", x)
}
// Fill.
value := strings.Repeat("x", vSize)
for i := 0; i < n; i++ {
h.put(key(i), value)
}
h.compactMem()
// Delete all.
for i := 0; i < n; i++ {
h.delete(key(i))
}
h.compactMem()
var (
limit = n / limitDiv
startKey = key(0)
limitKey = key(limit)
maxKey = key(n)
slice = &util.Range{Limit: []byte(limitKey)}
initialSize0 = h.sizeOf(startKey, limitKey)
initialSize1 = h.sizeOf(limitKey, maxKey)
)
t.Logf("inital size %s [rest %s]", shortenb(int(initialSize0)), shortenb(int(initialSize1)))
for r := 0; true; r++ {
if r >= mIter {
t.Fatal("taking too long to compact")
}
// Iterates.
iter := h.db.NewIterator(slice, h.ro)
for iter.Next() {
}
if err := iter.Error(); err != nil {
t.Fatalf("Iter err: %v", err)
}
iter.Release()
// Wait compaction.
h.waitCompaction()
// Check size.
size0 := h.sizeOf(startKey, limitKey)
size1 := h.sizeOf(limitKey, maxKey)
t.Logf("#%03d size %s [rest %s]", r, shortenb(int(size0)), shortenb(int(size1)))
if size0 < initialSize0/10 {
break
}
}
if initialSize1 > 0 {
h.sizeAssert(limitKey, maxKey, initialSize1/4-opt.MiB, initialSize1+opt.MiB)
}
}
func TestDB_IterTriggeredCompaction(t *testing.T) {
testDB_IterTriggeredCompaction(t, 1)
}
func TestDB_IterTriggeredCompactionHalf(t *testing.T) {
testDB_IterTriggeredCompaction(t, 2)
}
func TestDB_ReadOnly(t *testing.T) {
h := newDbHarness(t)
defer h.close()
h.put("foo", "v1")
h.put("bar", "v2")
h.compactMem()
h.put("xfoo", "v1")
h.put("xbar", "v2")
t.Log("Trigger read-only")
if err := h.db.SetReadOnly(); err != nil {
h.close()
t.Fatalf("SetReadOnly error: %v", err)
}
h.stor.SetEmuErr(storage.TypeAll, tsOpCreate, tsOpReplace, tsOpRemove, tsOpWrite, tsOpWrite, tsOpSync)
ro := func(key, value, wantValue string) {
if err := h.db.Put([]byte(key), []byte(value), h.wo); err != ErrReadOnly {
t.Fatalf("unexpected error: %v", err)
}
h.getVal(key, wantValue)
}
ro("foo", "vx", "v1")
h.o.ReadOnly = true
h.reopenDB()
ro("foo", "vx", "v1")
ro("bar", "vx", "v2")
h.assertNumKeys(4)
}

View File

@ -63,24 +63,24 @@ func (db *DB) rotateMem(n int) (mem *memDB, err error) {
return return
} }
func (db *DB) flush(n int) (mem *memDB, nn int, err error) { func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
delayed := false delayed := false
flush := func() (retry bool) { flush := func() (retry bool) {
v := db.s.version() v := db.s.version()
defer v.release() defer v.release()
mem = db.getEffectiveMem() mdb = db.getEffectiveMem()
defer func() { defer func() {
if retry { if retry {
mem.decref() mdb.decref()
mem = nil mdb = nil
} }
}() }()
nn = mem.mdb.Free() mdbFree = mdb.Free()
switch { switch {
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed: case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
delayed = true delayed = true
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
case nn >= n: case mdbFree >= n:
return false return false
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger(): case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
delayed = true delayed = true
@ -90,15 +90,15 @@ func (db *DB) flush(n int) (mem *memDB, nn int, err error) {
} }
default: default:
// Allow memdb to grow if it has no entry. // Allow memdb to grow if it has no entry.
if mem.mdb.Len() == 0 { if mdb.Len() == 0 {
nn = n mdbFree = n
} else { } else {
mem.decref() mdb.decref()
mem, err = db.rotateMem(n) mdb, err = db.rotateMem(n)
if err == nil { if err == nil {
nn = mem.mdb.Free() mdbFree = mdb.Free()
} else { } else {
nn = 0 mdbFree = 0
} }
} }
return false return false
@ -157,18 +157,18 @@ func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
} }
}() }()
mem, memFree, err := db.flush(b.size()) mdb, mdbFree, err := db.flush(b.size())
if err != nil { if err != nil {
return return
} }
defer mem.decref() defer mdb.decref()
// Calculate maximum size of the batch. // Calculate maximum size of the batch.
m := 1 << 20 m := 1 << 20
if x := b.size(); x <= 128<<10 { if x := b.size(); x <= 128<<10 {
m = x + (128 << 10) m = x + (128 << 10)
} }
m = minInt(m, memFree) m = minInt(m, mdbFree)
// Merge with other batch. // Merge with other batch.
drain: drain:
@ -197,7 +197,7 @@ drain:
select { select {
case db.journalC <- b: case db.journalC <- b:
// Write into memdb // Write into memdb
if berr := b.memReplay(mem.mdb); berr != nil { if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr) panic(berr)
} }
case err = <-db.compPerErrC: case err = <-db.compPerErrC:
@ -211,7 +211,7 @@ drain:
case err = <-db.journalAckC: case err = <-db.journalAckC:
if err != nil { if err != nil {
// Revert memdb if error detected // Revert memdb if error detected
if berr := b.revertMemReplay(mem.mdb); berr != nil { if berr := b.revertMemReplay(mdb.DB); berr != nil {
panic(berr) panic(berr)
} }
return return
@ -225,7 +225,7 @@ drain:
if err != nil { if err != nil {
return return
} }
if berr := b.memReplay(mem.mdb); berr != nil { if berr := b.memReplay(mdb.DB); berr != nil {
panic(berr) panic(berr)
} }
} }
@ -233,7 +233,7 @@ drain:
// Set last seq number. // Set last seq number.
db.addSeq(uint64(b.Len())) db.addSeq(uint64(b.Len()))
if b.size() >= memFree { if b.size() >= mdbFree {
db.rotateMem(0) db.rotateMem(0)
} }
return return
@ -249,8 +249,7 @@ func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
return db.Write(b, wo) return db.Write(b, wo)
} }
// Delete deletes the value for the given key. It returns ErrNotFound if // Delete deletes the value for the given key.
// the DB does not contain the key.
// //
// It is safe to modify the contents of the arguments after Delete returns. // It is safe to modify the contents of the arguments after Delete returns.
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error { func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
@ -290,9 +289,9 @@ func (db *DB) CompactRange(r util.Range) error {
} }
// Check for overlaps in memdb. // Check for overlaps in memdb.
mem := db.getEffectiveMem() mdb := db.getEffectiveMem()
defer mem.decref() defer mdb.decref()
if isMemOverlaps(db.s.icmp, mem.mdb, r.Start, r.Limit) { if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
// Memdb compaction. // Memdb compaction.
if _, err := db.rotateMem(0); err != nil { if _, err := db.rotateMem(0); err != nil {
<-db.writeLockC <-db.writeLockC
@ -309,3 +308,31 @@ func (db *DB) CompactRange(r util.Range) error {
// Table compaction. // Table compaction.
return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit) return db.compSendRange(db.tcompCmdC, -1, r.Start, r.Limit)
} }
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
func (db *DB) SetReadOnly() error {
if err := db.ok(); err != nil {
return err
}
// Lock writer.
select {
case db.writeLockC <- struct{}{}:
db.compWriteLocking = true
case err := <-db.compPerErrC:
return err
case _, _ = <-db.closeC:
return ErrClosed
}
// Set compaction read-only.
select {
case db.compErrSetC <- ErrReadOnly:
case perr := <-db.compPerErrC:
return perr
case _, _ = <-db.closeC:
return ErrClosed
}
return nil
}

View File

@ -12,6 +12,7 @@ import (
var ( var (
ErrNotFound = errors.ErrNotFound ErrNotFound = errors.ErrNotFound
ErrReadOnly = errors.New("leveldb: read-only mode")
ErrSnapshotReleased = errors.New("leveldb: snapshot released") ErrSnapshotReleased = errors.New("leveldb: snapshot released")
ErrIterReleased = errors.New("leveldb: iterator released") ErrIterReleased = errors.New("leveldb: iterator released")
ErrClosed = errors.New("leveldb: closed") ErrClosed = errors.New("leveldb: closed")

View File

@ -206,6 +206,7 @@ func (p *DB) randHeight() (h int) {
return return
} }
// Must hold RW-lock if prev == true, as it use shared prevNode slice.
func (p *DB) findGE(key []byte, prev bool) (int, bool) { func (p *DB) findGE(key []byte, prev bool) (int, bool) {
node := 0 node := 0
h := p.maxHeight - 1 h := p.maxHeight - 1
@ -302,7 +303,7 @@ func (p *DB) Put(key []byte, value []byte) error {
node := len(p.nodeData) node := len(p.nodeData)
p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h) p.nodeData = append(p.nodeData, kvOffset, len(key), len(value), h)
for i, n := range p.prevNode[:h] { for i, n := range p.prevNode[:h] {
m := n + 4 + i m := n + nNext + i
p.nodeData = append(p.nodeData, p.nodeData[m]) p.nodeData = append(p.nodeData, p.nodeData[m])
p.nodeData[m] = node p.nodeData[m] = node
} }
@ -434,20 +435,22 @@ func (p *DB) Len() int {
// Reset resets the DB to initial empty state. Allows reuse the buffer. // Reset resets the DB to initial empty state. Allows reuse the buffer.
func (p *DB) Reset() { func (p *DB) Reset() {
p.mu.Lock()
p.rnd = rand.New(rand.NewSource(0xdeadbeef)) p.rnd = rand.New(rand.NewSource(0xdeadbeef))
p.maxHeight = 1 p.maxHeight = 1
p.n = 0 p.n = 0
p.kvSize = 0 p.kvSize = 0
p.kvData = p.kvData[:0] p.kvData = p.kvData[:0]
p.nodeData = p.nodeData[:4+tMaxHeight] p.nodeData = p.nodeData[:nNext+tMaxHeight]
p.nodeData[nKV] = 0 p.nodeData[nKV] = 0
p.nodeData[nKey] = 0 p.nodeData[nKey] = 0
p.nodeData[nVal] = 0 p.nodeData[nVal] = 0
p.nodeData[nHeight] = tMaxHeight p.nodeData[nHeight] = tMaxHeight
for n := 0; n < tMaxHeight; n++ { for n := 0; n < tMaxHeight; n++ {
p.nodeData[4+n] = 0 p.nodeData[nNext+n] = 0
p.prevNode[n] = 0 p.prevNode[n] = 0
} }
p.mu.Unlock()
} }
// New creates a new initalized in-memory key/value DB. The capacity // New creates a new initalized in-memory key/value DB. The capacity

View File

@ -34,10 +34,11 @@ var (
DefaultCompactionTotalSize = 10 * MiB DefaultCompactionTotalSize = 10 * MiB
DefaultCompactionTotalSizeMultiplier = 10.0 DefaultCompactionTotalSizeMultiplier = 10.0
DefaultCompressionType = SnappyCompression DefaultCompressionType = SnappyCompression
DefaultOpenFilesCacher = LRUCacher DefaultIteratorSamplingRate = 1 * MiB
DefaultOpenFilesCacheCapacity = 500
DefaultMaxMemCompationLevel = 2 DefaultMaxMemCompationLevel = 2
DefaultNumLevel = 7 DefaultNumLevel = 7
DefaultOpenFilesCacher = LRUCacher
DefaultOpenFilesCacheCapacity = 500
DefaultWriteBuffer = 4 * MiB DefaultWriteBuffer = 4 * MiB
DefaultWriteL0PauseTrigger = 12 DefaultWriteL0PauseTrigger = 12
DefaultWriteL0SlowdownTrigger = 8 DefaultWriteL0SlowdownTrigger = 8
@ -249,6 +250,11 @@ type Options struct {
// The default value (DefaultCompression) uses snappy compression. // The default value (DefaultCompression) uses snappy compression.
Compression Compression Compression Compression
// DisableBufferPool allows disable use of util.BufferPool functionality.
//
// The default value is false.
DisableBufferPool bool
// DisableBlockCache allows disable use of cache.Cache functionality on // DisableBlockCache allows disable use of cache.Cache functionality on
// 'sorted table' block. // 'sorted table' block.
// //
@ -288,6 +294,13 @@ type Options struct {
// The default value is nil. // The default value is nil.
Filter filter.Filter Filter filter.Filter
// IteratorSamplingRate defines approximate gap (in bytes) between read
// sampling of an iterator. The samples will be used to determine when
// compaction should be triggered.
//
// The default is 1MiB.
IteratorSamplingRate int
// MaxMemCompationLevel defines maximum level a newly compacted 'memdb' // MaxMemCompationLevel defines maximum level a newly compacted 'memdb'
// will be pushed into if doesn't creates overlap. This should less than // will be pushed into if doesn't creates overlap. This should less than
// NumLevel. Use -1 for level-0. // NumLevel. Use -1 for level-0.
@ -313,6 +326,11 @@ type Options struct {
// The default value is 500. // The default value is 500.
OpenFilesCacheCapacity int OpenFilesCacheCapacity int
// If true then opens DB in read-only mode.
//
// The default value is false.
ReadOnly bool
// Strict defines the DB strict level. // Strict defines the DB strict level.
Strict Strict Strict Strict
@ -464,6 +482,20 @@ func (o *Options) GetCompression() Compression {
return o.Compression return o.Compression
} }
func (o *Options) GetDisableBufferPool() bool {
if o == nil {
return false
}
return o.DisableBufferPool
}
func (o *Options) GetDisableBlockCache() bool {
if o == nil {
return false
}
return o.DisableBlockCache
}
func (o *Options) GetDisableCompactionBackoff() bool { func (o *Options) GetDisableCompactionBackoff() bool {
if o == nil { if o == nil {
return false return false
@ -492,6 +524,13 @@ func (o *Options) GetFilter() filter.Filter {
return o.Filter return o.Filter
} }
func (o *Options) GetIteratorSamplingRate() int {
if o == nil || o.IteratorSamplingRate <= 0 {
return DefaultIteratorSamplingRate
}
return o.IteratorSamplingRate
}
func (o *Options) GetMaxMemCompationLevel() int { func (o *Options) GetMaxMemCompationLevel() int {
level := DefaultMaxMemCompationLevel level := DefaultMaxMemCompationLevel
if o != nil { if o != nil {
@ -533,6 +572,13 @@ func (o *Options) GetOpenFilesCacheCapacity() int {
return o.OpenFilesCacheCapacity return o.OpenFilesCacheCapacity
} }
func (o *Options) GetReadOnly() bool {
if o == nil {
return false
}
return o.ReadOnly
}
func (o *Options) GetStrict(strict Strict) bool { func (o *Options) GetStrict(strict Strict) bool {
if o == nil || o.Strict == 0 { if o == nil || o.Strict == 0 {
return DefaultStrict&strict != 0 return DefaultStrict&strict != 0

View File

@ -11,10 +11,8 @@ import (
"io" "io"
"os" "os"
"sync" "sync"
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/errors" "github.com/syndtr/goleveldb/leveldb/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/journal" "github.com/syndtr/goleveldb/leveldb/journal"
"github.com/syndtr/goleveldb/leveldb/opt" "github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/storage" "github.com/syndtr/goleveldb/leveldb/storage"
@ -127,11 +125,16 @@ func (s *session) recover() (err error) {
return return
} }
defer reader.Close() defer reader.Close()
strict := s.o.GetStrict(opt.StrictManifest)
jr := journal.NewReader(reader, dropper{s, m}, strict, true)
staging := s.stVersion.newStaging() var (
rec := &sessionRecord{numLevel: s.o.GetNumLevel()} // Options.
numLevel = s.o.GetNumLevel()
strict = s.o.GetStrict(opt.StrictManifest)
jr = journal.NewReader(reader, dropper{s, m}, strict, true)
rec = &sessionRecord{}
staging = s.stVersion.newStaging()
)
for { for {
var r io.Reader var r io.Reader
r, err = jr.Next() r, err = jr.Next()
@ -143,7 +146,7 @@ func (s *session) recover() (err error) {
return errors.SetFile(err, m) return errors.SetFile(err, m)
} }
err = rec.decode(r) err = rec.decode(r, numLevel)
if err == nil { if err == nil {
// save compact pointers // save compact pointers
for _, r := range rec.compPtrs { for _, r := range rec.compPtrs {
@ -206,250 +209,3 @@ func (s *session) commit(r *sessionRecord) (err error) {
return return
} }
// Pick a compaction based on current state; need external synchronization.
func (s *session) pickCompaction() *compaction {
v := s.version()
var level int
var t0 tFiles
if v.cScore >= 1 {
level = v.cLevel
cptr := s.stCompPtrs[level]
tables := v.tables[level]
for _, t := range tables {
if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
t0 = append(t0, t)
break
}
}
if len(t0) == 0 {
t0 = append(t0, tables[0])
}
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
level = ts.level
t0 = append(t0, ts.table)
} else {
v.release()
return nil
}
}
return newCompaction(s, v, level, t0)
}
// Create compaction from given level and range; need external synchronization.
func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
v := s.version()
t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
if len(t0) == 0 {
v.release()
return nil
}
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
if level > 0 {
limit := uint64(v.s.o.GetCompactionSourceLimit(level))
total := uint64(0)
for i, t := range t0 {
total += t.size
if total >= limit {
s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
t0 = t0[:i+1]
break
}
}
}
return newCompaction(s, v, level, t0)
}
func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
c := &compaction{
s: s,
v: v,
level: level,
tables: [2]tFiles{t0, nil},
maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
tPtrs: make([]int, s.o.GetNumLevel()),
}
c.expand()
c.save()
return c
}
// compaction represent a compaction state.
type compaction struct {
s *session
v *version
level int
tables [2]tFiles
maxGPOverlaps uint64
gp tFiles
gpi int
seenKey bool
gpOverlappedBytes uint64
imin, imax iKey
tPtrs []int
released bool
snapGPI int
snapSeenKey bool
snapGPOverlappedBytes uint64
snapTPtrs []int
}
func (c *compaction) save() {
c.snapGPI = c.gpi
c.snapSeenKey = c.seenKey
c.snapGPOverlappedBytes = c.gpOverlappedBytes
c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
}
func (c *compaction) restore() {
c.gpi = c.snapGPI
c.seenKey = c.snapSeenKey
c.gpOverlappedBytes = c.snapGPOverlappedBytes
c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
}
func (c *compaction) release() {
if !c.released {
c.released = true
c.v.release()
}
}
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
t0, t1 := c.tables[0], c.tables[1]
imin, imax := t0.getRange(c.s.icmp)
// We expand t0 here just incase ukey hop across tables.
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
if len(t0) != len(c.tables[0]) {
imin, imax = t0.getRange(c.s.icmp)
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
amin, amax := append(t0, t1...).getRange(c.s.icmp)
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if len(t1) > 0 {
exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
xmin, xmax := exp0.getRange(c.s.icmp)
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
imin, imax = xmin, xmax
t0, t1 = exp0, exp1
amin, amax = append(t0, t1...).getRange(c.s.icmp)
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if c.level+2 < c.s.o.GetNumLevel() {
c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
}
c.tables[0], c.tables[1] = t0, t1
c.imin, c.imax = imin, imax
}
// Check whether compaction is trivial.
func (c *compaction) trivial() bool {
return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
}
func (c *compaction) baseLevelForKey(ukey []byte) bool {
for level, tables := range c.v.tables[c.level+2:] {
for c.tPtrs[level] < len(tables) {
t := tables[c.tPtrs[level]]
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
// We've advanced far enough.
if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
// Key falls in this file's range, so definitely not base level.
return false
}
break
}
c.tPtrs[level]++
}
}
return true
}
func (c *compaction) shouldStopBefore(ikey iKey) bool {
for ; c.gpi < len(c.gp); c.gpi++ {
gp := c.gp[c.gpi]
if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
break
}
if c.seenKey {
c.gpOverlappedBytes += gp.size
}
}
c.seenKey = true
if c.gpOverlappedBytes > c.maxGPOverlaps {
// Too much overlap for current output; start new output.
c.gpOverlappedBytes = 0
return true
}
return false
}
// Creates an iterator.
func (c *compaction) newIterator() iterator.Iterator {
// Creates iterator slice.
icap := len(c.tables)
if c.level == 0 {
// Special case for level-0
icap = len(c.tables[0]) + 1
}
its := make([]iterator.Iterator, 0, icap)
// Options.
ro := &opt.ReadOptions{
DontFillCache: true,
Strict: opt.StrictOverride,
}
strict := c.s.o.GetStrict(opt.StrictCompaction)
if strict {
ro.Strict |= opt.StrictReader
}
for i, tables := range c.tables {
if len(tables) == 0 {
continue
}
// Level-0 is not sorted and may overlaps each other.
if c.level+i == 0 {
for _, t := range tables {
its = append(its, c.s.tops.newIterator(t, nil, ro))
}
} else {
it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
its = append(its, it)
}
}
return iterator.NewMergedIterator(its, c.s.icmp, strict)
}

View File

@ -0,0 +1,287 @@
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"sync/atomic"
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
)
func (s *session) pickMemdbLevel(umin, umax []byte) int {
v := s.version()
defer v.release()
return v.pickMemdbLevel(umin, umax)
}
func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, level int) (level_ int, err error) {
// Create sorted table.
iter := mdb.NewIterator(nil)
defer iter.Release()
t, n, err := s.tops.createFrom(iter)
if err != nil {
return level, err
}
// Pick level and add to record.
if level < 0 {
level = s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey())
}
rec.addTableFile(level, t)
s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", level, t.file.Num(), n, shortenb(int(t.size)), t.imin, t.imax)
return level, nil
}
// Pick a compaction based on current state; need external synchronization.
func (s *session) pickCompaction() *compaction {
v := s.version()
var level int
var t0 tFiles
if v.cScore >= 1 {
level = v.cLevel
cptr := s.stCompPtrs[level]
tables := v.tables[level]
for _, t := range tables {
if cptr == nil || s.icmp.Compare(t.imax, cptr) > 0 {
t0 = append(t0, t)
break
}
}
if len(t0) == 0 {
t0 = append(t0, tables[0])
}
} else {
if p := atomic.LoadPointer(&v.cSeek); p != nil {
ts := (*tSet)(p)
level = ts.level
t0 = append(t0, ts.table)
} else {
v.release()
return nil
}
}
return newCompaction(s, v, level, t0)
}
// Create compaction from given level and range; need external synchronization.
func (s *session) getCompactionRange(level int, umin, umax []byte) *compaction {
v := s.version()
t0 := v.tables[level].getOverlaps(nil, s.icmp, umin, umax, level == 0)
if len(t0) == 0 {
v.release()
return nil
}
// Avoid compacting too much in one shot in case the range is large.
// But we cannot do this for level-0 since level-0 files can overlap
// and we must not pick one file and drop another older file if the
// two files overlap.
if level > 0 {
limit := uint64(v.s.o.GetCompactionSourceLimit(level))
total := uint64(0)
for i, t := range t0 {
total += t.size
if total >= limit {
s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
t0 = t0[:i+1]
break
}
}
}
return newCompaction(s, v, level, t0)
}
func newCompaction(s *session, v *version, level int, t0 tFiles) *compaction {
c := &compaction{
s: s,
v: v,
level: level,
tables: [2]tFiles{t0, nil},
maxGPOverlaps: uint64(s.o.GetCompactionGPOverlaps(level)),
tPtrs: make([]int, s.o.GetNumLevel()),
}
c.expand()
c.save()
return c
}
// compaction represent a compaction state.
type compaction struct {
s *session
v *version
level int
tables [2]tFiles
maxGPOverlaps uint64
gp tFiles
gpi int
seenKey bool
gpOverlappedBytes uint64
imin, imax iKey
tPtrs []int
released bool
snapGPI int
snapSeenKey bool
snapGPOverlappedBytes uint64
snapTPtrs []int
}
func (c *compaction) save() {
c.snapGPI = c.gpi
c.snapSeenKey = c.seenKey
c.snapGPOverlappedBytes = c.gpOverlappedBytes
c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
}
func (c *compaction) restore() {
c.gpi = c.snapGPI
c.seenKey = c.snapSeenKey
c.gpOverlappedBytes = c.snapGPOverlappedBytes
c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
}
func (c *compaction) release() {
if !c.released {
c.released = true
c.v.release()
}
}
// Expand compacted tables; need external synchronization.
func (c *compaction) expand() {
limit := uint64(c.s.o.GetCompactionExpandLimit(c.level))
vt0, vt1 := c.v.tables[c.level], c.v.tables[c.level+1]
t0, t1 := c.tables[0], c.tables[1]
imin, imax := t0.getRange(c.s.icmp)
// We expand t0 here just incase ukey hop across tables.
t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.level == 0)
if len(t0) != len(c.tables[0]) {
imin, imax = t0.getRange(c.s.icmp)
}
t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)
// Get entire range covered by compaction.
amin, amax := append(t0, t1...).getRange(c.s.icmp)
// See if we can grow the number of inputs in "level" without
// changing the number of "level+1" files we pick up.
if len(t1) > 0 {
exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.level == 0)
if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
xmin, xmax := exp0.getRange(c.s.icmp)
exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
if len(exp1) == len(t1) {
c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
c.level, c.level+1, len(t0), shortenb(int(t0.size())), len(t1), shortenb(int(t1.size())),
len(exp0), shortenb(int(exp0.size())), len(exp1), shortenb(int(exp1.size())))
imin, imax = xmin, xmax
t0, t1 = exp0, exp1
amin, amax = append(t0, t1...).getRange(c.s.icmp)
}
}
}
// Compute the set of grandparent files that overlap this compaction
// (parent == level+1; grandparent == level+2)
if c.level+2 < c.s.o.GetNumLevel() {
c.gp = c.v.tables[c.level+2].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
}
c.tables[0], c.tables[1] = t0, t1
c.imin, c.imax = imin, imax
}
// Check whether compaction is trivial.
func (c *compaction) trivial() bool {
return len(c.tables[0]) == 1 && len(c.tables[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
}
func (c *compaction) baseLevelForKey(ukey []byte) bool {
for level, tables := range c.v.tables[c.level+2:] {
for c.tPtrs[level] < len(tables) {
t := tables[c.tPtrs[level]]
if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
// We've advanced far enough.
if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
// Key falls in this file's range, so definitely not base level.
return false
}
break
}
c.tPtrs[level]++
}
}
return true
}
func (c *compaction) shouldStopBefore(ikey iKey) bool {
for ; c.gpi < len(c.gp); c.gpi++ {
gp := c.gp[c.gpi]
if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
break
}
if c.seenKey {
c.gpOverlappedBytes += gp.size
}
}
c.seenKey = true
if c.gpOverlappedBytes > c.maxGPOverlaps {
// Too much overlap for current output; start new output.
c.gpOverlappedBytes = 0
return true
}
return false
}
// Creates an iterator.
func (c *compaction) newIterator() iterator.Iterator {
// Creates iterator slice.
icap := len(c.tables)
if c.level == 0 {
// Special case for level-0.
icap = len(c.tables[0]) + 1
}
its := make([]iterator.Iterator, 0, icap)
// Options.
ro := &opt.ReadOptions{
DontFillCache: true,
Strict: opt.StrictOverride,
}
strict := c.s.o.GetStrict(opt.StrictCompaction)
if strict {
ro.Strict |= opt.StrictReader
}
for i, tables := range c.tables {
if len(tables) == 0 {
continue
}
// Level-0 is not sorted and may overlaps each other.
if c.level+i == 0 {
for _, t := range tables {
its = append(its, c.s.tops.newIterator(t, nil, ro))
}
} else {
it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
its = append(its, it)
}
}
return iterator.NewMergedIterator(its, c.s.icmp, strict)
}

View File

@ -52,8 +52,6 @@ type dtRecord struct {
} }
type sessionRecord struct { type sessionRecord struct {
numLevel int
hasRec int hasRec int
comparer string comparer string
journalNum uint64 journalNum uint64
@ -230,7 +228,7 @@ func (p *sessionRecord) readBytes(field string, r byteReader) []byte {
return x return x
} }
func (p *sessionRecord) readLevel(field string, r io.ByteReader) int { func (p *sessionRecord) readLevel(field string, r io.ByteReader, numLevel int) int {
if p.err != nil { if p.err != nil {
return 0 return 0
} }
@ -238,14 +236,14 @@ func (p *sessionRecord) readLevel(field string, r io.ByteReader) int {
if p.err != nil { if p.err != nil {
return 0 return 0
} }
if x >= uint64(p.numLevel) { if x >= uint64(numLevel) {
p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "invalid level number"}) p.err = errors.NewErrCorrupted(nil, &ErrManifestCorrupted{field, "invalid level number"})
return 0 return 0
} }
return int(x) return int(x)
} }
func (p *sessionRecord) decode(r io.Reader) error { func (p *sessionRecord) decode(r io.Reader, numLevel int) error {
br, ok := r.(byteReader) br, ok := r.(byteReader)
if !ok { if !ok {
br = bufio.NewReader(r) br = bufio.NewReader(r)
@ -286,13 +284,13 @@ func (p *sessionRecord) decode(r io.Reader) error {
p.setSeqNum(x) p.setSeqNum(x)
} }
case recCompPtr: case recCompPtr:
level := p.readLevel("comp-ptr.level", br) level := p.readLevel("comp-ptr.level", br, numLevel)
ikey := p.readBytes("comp-ptr.ikey", br) ikey := p.readBytes("comp-ptr.ikey", br)
if p.err == nil { if p.err == nil {
p.addCompPtr(level, iKey(ikey)) p.addCompPtr(level, iKey(ikey))
} }
case recAddTable: case recAddTable:
level := p.readLevel("add-table.level", br) level := p.readLevel("add-table.level", br, numLevel)
num := p.readUvarint("add-table.num", br) num := p.readUvarint("add-table.num", br)
size := p.readUvarint("add-table.size", br) size := p.readUvarint("add-table.size", br)
imin := p.readBytes("add-table.imin", br) imin := p.readBytes("add-table.imin", br)
@ -301,7 +299,7 @@ func (p *sessionRecord) decode(r io.Reader) error {
p.addTable(level, num, size, imin, imax) p.addTable(level, num, size, imin, imax)
} }
case recDelTable: case recDelTable:
level := p.readLevel("del-table.level", br) level := p.readLevel("del-table.level", br, numLevel)
num := p.readUvarint("del-table.num", br) num := p.readUvarint("del-table.num", br)
if p.err == nil { if p.err == nil {
p.delTable(level, num) p.delTable(level, num)

View File

@ -19,8 +19,8 @@ func decodeEncode(v *sessionRecord) (res bool, err error) {
if err != nil { if err != nil {
return return
} }
v2 := &sessionRecord{numLevel: opt.DefaultNumLevel} v2 := &sessionRecord{}
err = v.decode(b) err = v.decode(b, opt.DefaultNumLevel)
if err != nil { if err != nil {
return return
} }
@ -34,7 +34,7 @@ func decodeEncode(v *sessionRecord) (res bool, err error) {
func TestSessionRecord_EncodeDecode(t *testing.T) { func TestSessionRecord_EncodeDecode(t *testing.T) {
big := uint64(1) << 50 big := uint64(1) << 50
v := &sessionRecord{numLevel: opt.DefaultNumLevel} v := &sessionRecord{}
i := uint64(0) i := uint64(0)
test := func() { test := func() {
res, err := decodeEncode(v) res, err := decodeEncode(v)

View File

@ -182,7 +182,7 @@ func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
defer v.release() defer v.release()
} }
if rec == nil { if rec == nil {
rec = &sessionRecord{numLevel: s.o.GetNumLevel()} rec = &sessionRecord{}
} }
s.fillRecord(rec, true) s.fillRecord(rec, true)
v.fillRecord(rec) v.fillRecord(rec)

View File

@ -42,6 +42,8 @@ type tsOp uint
const ( const (
tsOpOpen tsOp = iota tsOpOpen tsOp = iota
tsOpCreate tsOpCreate
tsOpReplace
tsOpRemove
tsOpRead tsOpRead
tsOpReadAt tsOpReadAt
tsOpWrite tsOpWrite
@ -241,6 +243,10 @@ func (tf tsFile) Replace(newfile storage.File) (err error) {
if err != nil { if err != nil {
return return
} }
if tf.shouldErr(tsOpReplace) {
err = errors.New("leveldb.testStorage: emulated create error")
return
}
err = tf.File.Replace(newfile.(tsFile).File) err = tf.File.Replace(newfile.(tsFile).File)
if err != nil { if err != nil {
ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err) ts.t.Errorf("E: cannot replace file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)
@ -258,6 +264,10 @@ func (tf tsFile) Remove() (err error) {
if err != nil { if err != nil {
return return
} }
if tf.shouldErr(tsOpRemove) {
err = errors.New("leveldb.testStorage: emulated create error")
return
}
err = tf.File.Remove() err = tf.File.Remove()
if err != nil { if err != nil {
ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err) ts.t.Errorf("E: cannot remove file, num=%d type=%v: %v", tf.Num(), tf.Type(), err)

View File

@ -441,22 +441,26 @@ func newTableOps(s *session) *tOps {
var ( var (
cacher cache.Cacher cacher cache.Cacher
bcache *cache.Cache bcache *cache.Cache
bpool *util.BufferPool
) )
if s.o.GetOpenFilesCacheCapacity() > 0 { if s.o.GetOpenFilesCacheCapacity() > 0 {
cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity()) cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
} }
if !s.o.DisableBlockCache { if !s.o.GetDisableBlockCache() {
var bcacher cache.Cacher var bcacher cache.Cacher
if s.o.GetBlockCacheCapacity() > 0 { if s.o.GetBlockCacheCapacity() > 0 {
bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity()) bcacher = cache.NewLRU(s.o.GetBlockCacheCapacity())
} }
bcache = cache.NewCache(bcacher) bcache = cache.NewCache(bcacher)
} }
if !s.o.GetDisableBufferPool() {
bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
}
return &tOps{ return &tOps{
s: s, s: s,
cache: cache.NewCache(cacher), cache: cache.NewCache(cacher),
bcache: bcache, bcache: bcache,
bpool: util.NewBufferPool(s.o.GetBlockSize() + 5), bpool: bpool,
} }
} }

View File

@ -14,7 +14,7 @@ import (
"strings" "strings"
"sync" "sync"
"github.com/syndtr/gosnappy/snappy" "github.com/google/go-snappy/snappy"
"github.com/syndtr/goleveldb/leveldb/cache" "github.com/syndtr/goleveldb/leveldb/cache"
"github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/comparer"

View File

@ -12,7 +12,7 @@ import (
"fmt" "fmt"
"io" "io"
"github.com/syndtr/gosnappy/snappy" "github.com/google/go-snappy/snappy"
"github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/comparer"
"github.com/syndtr/goleveldb/leveldb/filter" "github.com/syndtr/goleveldb/leveldb/filter"

View File

@ -136,9 +136,8 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt
if !tseek { if !tseek {
if tset == nil { if tset == nil {
tset = &tSet{level, t} tset = &tSet{level, t}
} else if tset.table.consumeSeek() <= 0 { } else {
tseek = true tseek = true
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
} }
} }
@ -203,6 +202,28 @@ func (v *version) get(ikey iKey, ro *opt.ReadOptions, noValue bool) (value []byt
return true return true
}) })
if tseek && tset.table.consumeSeek() <= 0 {
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
}
return
}
func (v *version) sampleSeek(ikey iKey) (tcomp bool) {
var tset *tSet
v.walkOverlapping(ikey, func(level int, t *tFile) bool {
if tset == nil {
tset = &tSet{level, t}
return true
} else {
if tset.table.consumeSeek() <= 0 {
tcomp = atomic.CompareAndSwapPointer(&v.cSeek, nil, unsafe.Pointer(tset))
}
return false
}
}, nil)
return return
} }
@ -279,7 +300,7 @@ func (v *version) offsetOf(ikey iKey) (n uint64, err error) {
return return
} }
func (v *version) pickLevel(umin, umax []byte) (level int) { func (v *version) pickMemdbLevel(umin, umax []byte) (level int) {
if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) { if !v.tables[0].overlaps(v.s.icmp, umin, umax, true) {
var overlaps tFiles var overlaps tFiles
maxLevel := v.s.o.GetMaxMemCompationLevel() maxLevel := v.s.o.GetMaxMemCompationLevel()