mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-24 15:38:27 +00:00
65aaa607ab
Change made by: - running "gvt fetch" on each of the packages mentioned in Godeps/Godeps.json - `rm -rf Godeps` - tweaking the build scripts to not mention Godeps - tweaking the build scripts to test `./lib/...`, `./cmd/...` explicitly (to avoid testing vendor) - tweaking the build scripts to not juggle GOPATH for Godeps and instead set GO15VENDOREXPERIMENT. This also results in some updated packages at the same time I bet. Building with Go 1.3 and 1.4 still *works* but won't use our vendored dependencies - the user needs to have the actual packages in their GOPATH then, which they'll get with a normal "go get". Building with Go 1.6+ will get our vendored dependencies by default even when not using our build script, which is nice. By doing this we gain some freedom in that we can pick and choose manually what to include in vendor, as it's not based on just dependency analysis of our own code. This is also a risk as we might pick up dependencies we are unaware of, as the build may work locally with those packages present in GOPATH. On the other hand the build server will detect this as it has no packages in it's GOPATH beyond what is included in the repo. Recommended tool to manage dependencies is github.com/FiloSottile/gvt.
1092 lines
25 KiB
Go
1092 lines
25 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
package leveldb
|
|
|
|
import (
|
|
"container/list"
|
|
"fmt"
|
|
"io"
|
|
"os"
|
|
"runtime"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/errors"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/journal"
|
|
"github.com/syndtr/goleveldb/leveldb/memdb"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
|
"github.com/syndtr/goleveldb/leveldb/table"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
// DB is a LevelDB database.
|
|
type DB struct {
|
|
// Need 64-bit alignment.
|
|
seq uint64
|
|
|
|
// Session.
|
|
s *session
|
|
|
|
// MemDB.
|
|
memMu sync.RWMutex
|
|
memPool chan *memdb.DB
|
|
mem, frozenMem *memDB
|
|
journal *journal.Writer
|
|
journalWriter storage.Writer
|
|
journalFd storage.FileDesc
|
|
frozenJournalFd storage.FileDesc
|
|
frozenSeq uint64
|
|
|
|
// Snapshot.
|
|
snapsMu sync.Mutex
|
|
snapsList *list.List
|
|
|
|
// Stats.
|
|
aliveSnaps, aliveIters int32
|
|
|
|
// Write.
|
|
writeC chan *Batch
|
|
writeMergedC chan bool
|
|
writeLockC chan struct{}
|
|
writeAckC chan error
|
|
writeDelay time.Duration
|
|
writeDelayN int
|
|
journalC chan *Batch
|
|
journalAckC chan error
|
|
tr *Transaction
|
|
|
|
// Compaction.
|
|
compCommitLk sync.Mutex
|
|
tcompCmdC chan cCmd
|
|
tcompPauseC chan chan<- struct{}
|
|
mcompCmdC chan cCmd
|
|
compErrC chan error
|
|
compPerErrC chan error
|
|
compErrSetC chan error
|
|
compWriteLocking bool
|
|
compStats cStats
|
|
memdbMaxLevel int // For testing.
|
|
|
|
// Close.
|
|
closeW sync.WaitGroup
|
|
closeC chan struct{}
|
|
closed uint32
|
|
closer io.Closer
|
|
}
|
|
|
|
func openDB(s *session) (*DB, error) {
|
|
s.log("db@open opening")
|
|
start := time.Now()
|
|
db := &DB{
|
|
s: s,
|
|
// Initial sequence
|
|
seq: s.stSeqNum,
|
|
// MemDB
|
|
memPool: make(chan *memdb.DB, 1),
|
|
// Snapshot
|
|
snapsList: list.New(),
|
|
// Write
|
|
writeC: make(chan *Batch),
|
|
writeMergedC: make(chan bool),
|
|
writeLockC: make(chan struct{}, 1),
|
|
writeAckC: make(chan error),
|
|
journalC: make(chan *Batch),
|
|
journalAckC: make(chan error),
|
|
// Compaction
|
|
tcompCmdC: make(chan cCmd),
|
|
tcompPauseC: make(chan chan<- struct{}),
|
|
mcompCmdC: make(chan cCmd),
|
|
compErrC: make(chan error),
|
|
compPerErrC: make(chan error),
|
|
compErrSetC: make(chan error),
|
|
// Close
|
|
closeC: make(chan struct{}),
|
|
}
|
|
|
|
// Read-only mode.
|
|
readOnly := s.o.GetReadOnly()
|
|
|
|
if readOnly {
|
|
// Recover journals (read-only mode).
|
|
if err := db.recoverJournalRO(); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
// Recover journals.
|
|
if err := db.recoverJournal(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Remove any obsolete files.
|
|
if err := db.checkAndCleanFiles(); err != nil {
|
|
// Close journal.
|
|
if db.journal != nil {
|
|
db.journal.Close()
|
|
db.journalWriter.Close()
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
}
|
|
|
|
// Doesn't need to be included in the wait group.
|
|
go db.compactionError()
|
|
go db.mpoolDrain()
|
|
|
|
if readOnly {
|
|
db.SetReadOnly()
|
|
} else {
|
|
db.closeW.Add(3)
|
|
go db.tCompaction()
|
|
go db.mCompaction()
|
|
go db.jWriter()
|
|
}
|
|
|
|
s.logf("db@open done T·%v", time.Since(start))
|
|
|
|
runtime.SetFinalizer(db, (*DB).Close)
|
|
return db, nil
|
|
}
|
|
|
|
// Open opens or creates a DB for the given storage.
|
|
// The DB will be created if not exist, unless ErrorIfMissing is true.
|
|
// Also, if ErrorIfExist is true and the DB exist Open will returns
|
|
// os.ErrExist error.
|
|
//
|
|
// Open will return an error with type of ErrCorrupted if corruption
|
|
// detected in the DB. Corrupted DB can be recovered with Recover
|
|
// function.
|
|
//
|
|
// The returned DB instance is goroutine-safe.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func Open(stor storage.Storage, o *opt.Options) (db *DB, err error) {
|
|
s, err := newSession(stor, o)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
s.close()
|
|
s.release()
|
|
}
|
|
}()
|
|
|
|
err = s.recover()
|
|
if err != nil {
|
|
if !os.IsNotExist(err) || s.o.GetErrorIfMissing() {
|
|
return
|
|
}
|
|
err = s.create()
|
|
if err != nil {
|
|
return
|
|
}
|
|
} else if s.o.GetErrorIfExist() {
|
|
err = os.ErrExist
|
|
return
|
|
}
|
|
|
|
return openDB(s)
|
|
}
|
|
|
|
// OpenFile opens or creates a DB for the given path.
|
|
// The DB will be created if not exist, unless ErrorIfMissing is true.
|
|
// Also, if ErrorIfExist is true and the DB exist OpenFile will returns
|
|
// os.ErrExist error.
|
|
//
|
|
// OpenFile uses standard file-system backed storage implementation as
|
|
// desribed in the leveldb/storage package.
|
|
//
|
|
// OpenFile will return an error with type of ErrCorrupted if corruption
|
|
// detected in the DB. Corrupted DB can be recovered with Recover
|
|
// function.
|
|
//
|
|
// The returned DB instance is goroutine-safe.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func OpenFile(path string, o *opt.Options) (db *DB, err error) {
|
|
stor, err := storage.OpenFile(path, o.GetReadOnly())
|
|
if err != nil {
|
|
return
|
|
}
|
|
db, err = Open(stor, o)
|
|
if err != nil {
|
|
stor.Close()
|
|
} else {
|
|
db.closer = stor
|
|
}
|
|
return
|
|
}
|
|
|
|
// Recover recovers and opens a DB with missing or corrupted manifest files
|
|
// for the given storage. It will ignore any manifest files, valid or not.
|
|
// The DB must already exist or it will returns an error.
|
|
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
|
|
//
|
|
// The returned DB instance is goroutine-safe.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func Recover(stor storage.Storage, o *opt.Options) (db *DB, err error) {
|
|
s, err := newSession(stor, o)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
if err != nil {
|
|
s.close()
|
|
s.release()
|
|
}
|
|
}()
|
|
|
|
err = recoverTable(s, o)
|
|
if err != nil {
|
|
return
|
|
}
|
|
return openDB(s)
|
|
}
|
|
|
|
// RecoverFile recovers and opens a DB with missing or corrupted manifest files
|
|
// for the given path. It will ignore any manifest files, valid or not.
|
|
// The DB must already exist or it will returns an error.
|
|
// Also, Recover will ignore ErrorIfMissing and ErrorIfExist options.
|
|
//
|
|
// RecoverFile uses standard file-system backed storage implementation as desribed
|
|
// in the leveldb/storage package.
|
|
//
|
|
// The returned DB instance is goroutine-safe.
|
|
// The DB must be closed after use, by calling Close method.
|
|
func RecoverFile(path string, o *opt.Options) (db *DB, err error) {
|
|
stor, err := storage.OpenFile(path, false)
|
|
if err != nil {
|
|
return
|
|
}
|
|
db, err = Recover(stor, o)
|
|
if err != nil {
|
|
stor.Close()
|
|
} else {
|
|
db.closer = stor
|
|
}
|
|
return
|
|
}
|
|
|
|
func recoverTable(s *session, o *opt.Options) error {
|
|
o = dupOptions(o)
|
|
// Mask StrictReader, lets StrictRecovery doing its job.
|
|
o.Strict &= ^opt.StrictReader
|
|
|
|
// Get all tables and sort it by file number.
|
|
fds, err := s.stor.List(storage.TypeTable)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sortFds(fds)
|
|
|
|
var (
|
|
maxSeq uint64
|
|
recoveredKey, goodKey, corruptedKey, corruptedBlock, droppedTable int
|
|
|
|
// We will drop corrupted table.
|
|
strict = o.GetStrict(opt.StrictRecovery)
|
|
noSync = o.GetNoSync()
|
|
|
|
rec = &sessionRecord{}
|
|
bpool = util.NewBufferPool(o.GetBlockSize() + 5)
|
|
)
|
|
buildTable := func(iter iterator.Iterator) (tmpFd storage.FileDesc, size int64, err error) {
|
|
tmpFd = s.newTemp()
|
|
writer, err := s.stor.Create(tmpFd)
|
|
if err != nil {
|
|
return
|
|
}
|
|
defer func() {
|
|
writer.Close()
|
|
if err != nil {
|
|
s.stor.Remove(tmpFd)
|
|
tmpFd = storage.FileDesc{}
|
|
}
|
|
}()
|
|
|
|
// Copy entries.
|
|
tw := table.NewWriter(writer, o)
|
|
for iter.Next() {
|
|
key := iter.Key()
|
|
if validInternalKey(key) {
|
|
err = tw.Append(key, iter.Value())
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
err = iter.Error()
|
|
if err != nil {
|
|
return
|
|
}
|
|
err = tw.Close()
|
|
if err != nil {
|
|
return
|
|
}
|
|
if !noSync {
|
|
err = writer.Sync()
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
size = int64(tw.BytesLen())
|
|
return
|
|
}
|
|
recoverTable := func(fd storage.FileDesc) error {
|
|
s.logf("table@recovery recovering @%d", fd.Num)
|
|
reader, err := s.stor.Open(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
var closed bool
|
|
defer func() {
|
|
if !closed {
|
|
reader.Close()
|
|
}
|
|
}()
|
|
|
|
// Get file size.
|
|
size, err := reader.Seek(0, 2)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var (
|
|
tSeq uint64
|
|
tgoodKey, tcorruptedKey, tcorruptedBlock int
|
|
imin, imax []byte
|
|
)
|
|
tr, err := table.NewReader(reader, size, fd, nil, bpool, o)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
iter := tr.NewIterator(nil, nil)
|
|
if itererr, ok := iter.(iterator.ErrorCallbackSetter); ok {
|
|
itererr.SetErrorCallback(func(err error) {
|
|
if errors.IsCorrupted(err) {
|
|
s.logf("table@recovery block corruption @%d %q", fd.Num, err)
|
|
tcorruptedBlock++
|
|
}
|
|
})
|
|
}
|
|
|
|
// Scan the table.
|
|
for iter.Next() {
|
|
key := iter.Key()
|
|
_, seq, _, kerr := parseInternalKey(key)
|
|
if kerr != nil {
|
|
tcorruptedKey++
|
|
continue
|
|
}
|
|
tgoodKey++
|
|
if seq > tSeq {
|
|
tSeq = seq
|
|
}
|
|
if imin == nil {
|
|
imin = append([]byte{}, key...)
|
|
}
|
|
imax = append(imax[:0], key...)
|
|
}
|
|
if err := iter.Error(); err != nil {
|
|
iter.Release()
|
|
return err
|
|
}
|
|
iter.Release()
|
|
|
|
goodKey += tgoodKey
|
|
corruptedKey += tcorruptedKey
|
|
corruptedBlock += tcorruptedBlock
|
|
|
|
if strict && (tcorruptedKey > 0 || tcorruptedBlock > 0) {
|
|
droppedTable++
|
|
s.logf("table@recovery dropped @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
|
|
return nil
|
|
}
|
|
|
|
if tgoodKey > 0 {
|
|
if tcorruptedKey > 0 || tcorruptedBlock > 0 {
|
|
// Rebuild the table.
|
|
s.logf("table@recovery rebuilding @%d", fd.Num)
|
|
iter := tr.NewIterator(nil, nil)
|
|
tmpFd, newSize, err := buildTable(iter)
|
|
iter.Release()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
closed = true
|
|
reader.Close()
|
|
if err := s.stor.Rename(tmpFd, fd); err != nil {
|
|
return err
|
|
}
|
|
size = newSize
|
|
}
|
|
if tSeq > maxSeq {
|
|
maxSeq = tSeq
|
|
}
|
|
recoveredKey += tgoodKey
|
|
// Add table to level 0.
|
|
rec.addTable(0, fd.Num, size, imin, imax)
|
|
s.logf("table@recovery recovered @%d Gk·%d Ck·%d Cb·%d S·%d Q·%d", fd.Num, tgoodKey, tcorruptedKey, tcorruptedBlock, size, tSeq)
|
|
} else {
|
|
droppedTable++
|
|
s.logf("table@recovery unrecoverable @%d Ck·%d Cb·%d S·%d", fd.Num, tcorruptedKey, tcorruptedBlock, size)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Recover all tables.
|
|
if len(fds) > 0 {
|
|
s.logf("table@recovery F·%d", len(fds))
|
|
|
|
// Mark file number as used.
|
|
s.markFileNum(fds[len(fds)-1].Num)
|
|
|
|
for _, fd := range fds {
|
|
if err := recoverTable(fd); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
s.logf("table@recovery recovered F·%d N·%d Gk·%d Ck·%d Q·%d", len(fds), recoveredKey, goodKey, corruptedKey, maxSeq)
|
|
}
|
|
|
|
// Set sequence number.
|
|
rec.setSeqNum(maxSeq)
|
|
|
|
// Create new manifest.
|
|
if err := s.create(); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Commit.
|
|
return s.commit(rec)
|
|
}
|
|
|
|
func (db *DB) recoverJournal() error {
|
|
// Get all journals and sort it by file number.
|
|
rawFds, err := db.s.stor.List(storage.TypeJournal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sortFds(rawFds)
|
|
|
|
// Journals that will be recovered.
|
|
var fds []storage.FileDesc
|
|
for _, fd := range rawFds {
|
|
if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
|
|
fds = append(fds, fd)
|
|
}
|
|
}
|
|
|
|
var (
|
|
ofd storage.FileDesc // Obsolete file.
|
|
rec = &sessionRecord{}
|
|
)
|
|
|
|
// Recover journals.
|
|
if len(fds) > 0 {
|
|
db.logf("journal@recovery F·%d", len(fds))
|
|
|
|
// Mark file number as used.
|
|
db.s.markFileNum(fds[len(fds)-1].Num)
|
|
|
|
var (
|
|
// Options.
|
|
strict = db.s.o.GetStrict(opt.StrictJournal)
|
|
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
|
|
writeBuffer = db.s.o.GetWriteBuffer()
|
|
|
|
jr *journal.Reader
|
|
mdb = memdb.New(db.s.icmp, writeBuffer)
|
|
buf = &util.Buffer{}
|
|
batch = &Batch{}
|
|
)
|
|
|
|
for _, fd := range fds {
|
|
db.logf("journal@recovery recovering @%d", fd.Num)
|
|
|
|
fr, err := db.s.stor.Open(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create or reset journal reader instance.
|
|
if jr == nil {
|
|
jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
|
|
} else {
|
|
jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
|
|
}
|
|
|
|
// Flush memdb and remove obsolete journal file.
|
|
if !ofd.Nil() {
|
|
if mdb.Len() > 0 {
|
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
|
fr.Close()
|
|
return err
|
|
}
|
|
}
|
|
|
|
rec.setJournalNum(fd.Num)
|
|
rec.setSeqNum(db.seq)
|
|
if err := db.s.commit(rec); err != nil {
|
|
fr.Close()
|
|
return err
|
|
}
|
|
rec.resetAddedTables()
|
|
|
|
db.s.stor.Remove(ofd)
|
|
ofd = storage.FileDesc{}
|
|
}
|
|
|
|
// Replay journal to memdb.
|
|
mdb.Reset()
|
|
for {
|
|
r, err := jr.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
buf.Reset()
|
|
if _, err := buf.ReadFrom(r); err != nil {
|
|
if err == io.ErrUnexpectedEOF {
|
|
// This is error returned due to corruption, with strict == false.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
|
|
if !strict && errors.IsCorrupted(err) {
|
|
db.s.logf("journal error: %v (skipped)", err)
|
|
// We won't apply sequence number as it might be corrupted.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
// Save sequence number.
|
|
db.seq = batch.seq + uint64(batch.Len())
|
|
|
|
// Flush it if large enough.
|
|
if mdb.Size() >= writeBuffer {
|
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
|
fr.Close()
|
|
return err
|
|
}
|
|
|
|
mdb.Reset()
|
|
}
|
|
}
|
|
|
|
fr.Close()
|
|
ofd = fd
|
|
}
|
|
|
|
// Flush the last memdb.
|
|
if mdb.Len() > 0 {
|
|
if _, err := db.s.flushMemdb(rec, mdb, 0); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Create a new journal.
|
|
if _, err := db.newMem(0); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Commit.
|
|
rec.setJournalNum(db.journalFd.Num)
|
|
rec.setSeqNum(db.seq)
|
|
if err := db.s.commit(rec); err != nil {
|
|
// Close journal on error.
|
|
if db.journal != nil {
|
|
db.journal.Close()
|
|
db.journalWriter.Close()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Remove the last obsolete journal file.
|
|
if !ofd.Nil() {
|
|
db.s.stor.Remove(ofd)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (db *DB) recoverJournalRO() error {
|
|
// Get all journals and sort it by file number.
|
|
rawFds, err := db.s.stor.List(storage.TypeJournal)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
sortFds(rawFds)
|
|
|
|
// Journals that will be recovered.
|
|
var fds []storage.FileDesc
|
|
for _, fd := range rawFds {
|
|
if fd.Num >= db.s.stJournalNum || fd.Num == db.s.stPrevJournalNum {
|
|
fds = append(fds, fd)
|
|
}
|
|
}
|
|
|
|
var (
|
|
// Options.
|
|
strict = db.s.o.GetStrict(opt.StrictJournal)
|
|
checksum = db.s.o.GetStrict(opt.StrictJournalChecksum)
|
|
writeBuffer = db.s.o.GetWriteBuffer()
|
|
|
|
mdb = memdb.New(db.s.icmp, writeBuffer)
|
|
)
|
|
|
|
// Recover journals.
|
|
if len(fds) > 0 {
|
|
db.logf("journal@recovery RO·Mode F·%d", len(fds))
|
|
|
|
var (
|
|
jr *journal.Reader
|
|
buf = &util.Buffer{}
|
|
batch = &Batch{}
|
|
)
|
|
|
|
for _, fd := range fds {
|
|
db.logf("journal@recovery recovering @%d", fd.Num)
|
|
|
|
fr, err := db.s.stor.Open(fd)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Create or reset journal reader instance.
|
|
if jr == nil {
|
|
jr = journal.NewReader(fr, dropper{db.s, fd}, strict, checksum)
|
|
} else {
|
|
jr.Reset(fr, dropper{db.s, fd}, strict, checksum)
|
|
}
|
|
|
|
// Replay journal to memdb.
|
|
for {
|
|
r, err := jr.Next()
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
buf.Reset()
|
|
if _, err := buf.ReadFrom(r); err != nil {
|
|
if err == io.ErrUnexpectedEOF {
|
|
// This is error returned due to corruption, with strict == false.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
if err := batch.memDecodeAndReplay(db.seq, buf.Bytes(), mdb); err != nil {
|
|
if !strict && errors.IsCorrupted(err) {
|
|
db.s.logf("journal error: %v (skipped)", err)
|
|
// We won't apply sequence number as it might be corrupted.
|
|
continue
|
|
}
|
|
|
|
fr.Close()
|
|
return errors.SetFd(err, fd)
|
|
}
|
|
|
|
// Save sequence number.
|
|
db.seq = batch.seq + uint64(batch.Len())
|
|
}
|
|
|
|
fr.Close()
|
|
}
|
|
}
|
|
|
|
// Set memDB.
|
|
db.mem = &memDB{db: db, DB: mdb, ref: 1}
|
|
|
|
return nil
|
|
}
|
|
|
|
func memGet(mdb *memdb.DB, ikey internalKey, icmp *iComparer) (ok bool, mv []byte, err error) {
|
|
mk, mv, err := mdb.Find(ikey)
|
|
if err == nil {
|
|
ukey, _, kt, kerr := parseInternalKey(mk)
|
|
if kerr != nil {
|
|
// Shouldn't have had happen.
|
|
panic(kerr)
|
|
}
|
|
if icmp.uCompare(ukey, ikey.ukey()) == 0 {
|
|
if kt == keyTypeDel {
|
|
return true, nil, ErrNotFound
|
|
}
|
|
return true, mv, nil
|
|
|
|
}
|
|
} else if err != ErrNotFound {
|
|
return true, nil, err
|
|
}
|
|
return
|
|
}
|
|
|
|
func (db *DB) get(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (value []byte, err error) {
|
|
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
|
|
|
|
if auxm != nil {
|
|
if ok, mv, me := memGet(auxm, ikey, db.s.icmp); ok {
|
|
return append([]byte{}, mv...), me
|
|
}
|
|
}
|
|
|
|
em, fm := db.getMems()
|
|
for _, m := range [...]*memDB{em, fm} {
|
|
if m == nil {
|
|
continue
|
|
}
|
|
defer m.decref()
|
|
|
|
if ok, mv, me := memGet(m.DB, ikey, db.s.icmp); ok {
|
|
return append([]byte{}, mv...), me
|
|
}
|
|
}
|
|
|
|
v := db.s.version()
|
|
value, cSched, err := v.get(auxt, ikey, ro, false)
|
|
v.release()
|
|
if cSched {
|
|
// Trigger table compaction.
|
|
db.compTrigger(db.tcompCmdC)
|
|
}
|
|
return
|
|
}
|
|
|
|
func nilIfNotFound(err error) error {
|
|
if err == ErrNotFound {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (db *DB) has(auxm *memdb.DB, auxt tFiles, key []byte, seq uint64, ro *opt.ReadOptions) (ret bool, err error) {
|
|
ikey := makeInternalKey(nil, key, seq, keyTypeSeek)
|
|
|
|
if auxm != nil {
|
|
if ok, _, me := memGet(auxm, ikey, db.s.icmp); ok {
|
|
return me == nil, nilIfNotFound(me)
|
|
}
|
|
}
|
|
|
|
em, fm := db.getMems()
|
|
for _, m := range [...]*memDB{em, fm} {
|
|
if m == nil {
|
|
continue
|
|
}
|
|
defer m.decref()
|
|
|
|
if ok, _, me := memGet(m.DB, ikey, db.s.icmp); ok {
|
|
return me == nil, nilIfNotFound(me)
|
|
}
|
|
}
|
|
|
|
v := db.s.version()
|
|
_, cSched, err := v.get(auxt, ikey, ro, true)
|
|
v.release()
|
|
if cSched {
|
|
// Trigger table compaction.
|
|
db.compTrigger(db.tcompCmdC)
|
|
}
|
|
if err == nil {
|
|
ret = true
|
|
} else if err == ErrNotFound {
|
|
err = nil
|
|
}
|
|
return
|
|
}
|
|
|
|
// Get gets the value for the given key. It returns ErrNotFound if the
|
|
// DB does not contains the key.
|
|
//
|
|
// The returned slice is its own copy, it is safe to modify the contents
|
|
// of the returned slice.
|
|
// It is safe to modify the contents of the argument after Get returns.
|
|
func (db *DB) Get(key []byte, ro *opt.ReadOptions) (value []byte, err error) {
|
|
err = db.ok()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
se := db.acquireSnapshot()
|
|
defer db.releaseSnapshot(se)
|
|
return db.get(nil, nil, key, se.seq, ro)
|
|
}
|
|
|
|
// Has returns true if the DB does contains the given key.
|
|
//
|
|
// It is safe to modify the contents of the argument after Get returns.
|
|
func (db *DB) Has(key []byte, ro *opt.ReadOptions) (ret bool, err error) {
|
|
err = db.ok()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
se := db.acquireSnapshot()
|
|
defer db.releaseSnapshot(se)
|
|
return db.has(nil, nil, key, se.seq, ro)
|
|
}
|
|
|
|
// NewIterator returns an iterator for the latest snapshot of the
|
|
// underlying DB.
|
|
// The returned iterator is not goroutine-safe, but it is safe to use
|
|
// multiple iterators concurrently, with each in a dedicated goroutine.
|
|
// It is also safe to use an iterator concurrently with modifying its
|
|
// underlying DB. The resultant key/value pairs are guaranteed to be
|
|
// consistent.
|
|
//
|
|
// Slice allows slicing the iterator to only contains keys in the given
|
|
// range. A nil Range.Start is treated as a key before all keys in the
|
|
// DB. And a nil Range.Limit is treated as a key after all keys in
|
|
// the DB.
|
|
//
|
|
// The iterator must be released after use, by calling Release method.
|
|
//
|
|
// Also read Iterator documentation of the leveldb/iterator package.
|
|
func (db *DB) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
|
if err := db.ok(); err != nil {
|
|
return iterator.NewEmptyIterator(err)
|
|
}
|
|
|
|
se := db.acquireSnapshot()
|
|
defer db.releaseSnapshot(se)
|
|
// Iterator holds 'version' lock, 'version' is immutable so snapshot
|
|
// can be released after iterator created.
|
|
return db.newIterator(nil, nil, se.seq, slice, ro)
|
|
}
|
|
|
|
// GetSnapshot returns a latest snapshot of the underlying DB. A snapshot
|
|
// is a frozen snapshot of a DB state at a particular point in time. The
|
|
// content of snapshot are guaranteed to be consistent.
|
|
//
|
|
// The snapshot must be released after use, by calling Release method.
|
|
func (db *DB) GetSnapshot() (*Snapshot, error) {
|
|
if err := db.ok(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return db.newSnapshot(), nil
|
|
}
|
|
|
|
// GetProperty returns value of the given property name.
|
|
//
|
|
// Property names:
|
|
// leveldb.num-files-at-level{n}
|
|
// Returns the number of files at level 'n'.
|
|
// leveldb.stats
|
|
// Returns statistics of the underlying DB.
|
|
// leveldb.sstables
|
|
// Returns sstables list for each level.
|
|
// leveldb.blockpool
|
|
// Returns block pool stats.
|
|
// leveldb.cachedblock
|
|
// Returns size of cached block.
|
|
// leveldb.openedtables
|
|
// Returns number of opened tables.
|
|
// leveldb.alivesnaps
|
|
// Returns number of alive snapshots.
|
|
// leveldb.aliveiters
|
|
// Returns number of alive iterators.
|
|
func (db *DB) GetProperty(name string) (value string, err error) {
|
|
err = db.ok()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
const prefix = "leveldb."
|
|
if !strings.HasPrefix(name, prefix) {
|
|
return "", ErrNotFound
|
|
}
|
|
p := name[len(prefix):]
|
|
|
|
v := db.s.version()
|
|
defer v.release()
|
|
|
|
numFilesPrefix := "num-files-at-level"
|
|
switch {
|
|
case strings.HasPrefix(p, numFilesPrefix):
|
|
var level uint
|
|
var rest string
|
|
n, _ := fmt.Sscanf(p[len(numFilesPrefix):], "%d%s", &level, &rest)
|
|
if n != 1 {
|
|
err = ErrNotFound
|
|
} else {
|
|
value = fmt.Sprint(v.tLen(int(level)))
|
|
}
|
|
case p == "stats":
|
|
value = "Compactions\n" +
|
|
" Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)\n" +
|
|
"-------+------------+---------------+---------------+---------------+---------------\n"
|
|
for level, tables := range v.levels {
|
|
duration, read, write := db.compStats.getStat(level)
|
|
if len(tables) == 0 && duration == 0 {
|
|
continue
|
|
}
|
|
value += fmt.Sprintf(" %3d | %10d | %13.5f | %13.5f | %13.5f | %13.5f\n",
|
|
level, len(tables), float64(tables.size())/1048576.0, duration.Seconds(),
|
|
float64(read)/1048576.0, float64(write)/1048576.0)
|
|
}
|
|
case p == "sstables":
|
|
for level, tables := range v.levels {
|
|
value += fmt.Sprintf("--- level %d ---\n", level)
|
|
for _, t := range tables {
|
|
value += fmt.Sprintf("%d:%d[%q .. %q]\n", t.fd.Num, t.size, t.imin, t.imax)
|
|
}
|
|
}
|
|
case p == "blockpool":
|
|
value = fmt.Sprintf("%v", db.s.tops.bpool)
|
|
case p == "cachedblock":
|
|
if db.s.tops.bcache != nil {
|
|
value = fmt.Sprintf("%d", db.s.tops.bcache.Size())
|
|
} else {
|
|
value = "<nil>"
|
|
}
|
|
case p == "openedtables":
|
|
value = fmt.Sprintf("%d", db.s.tops.cache.Size())
|
|
case p == "alivesnaps":
|
|
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveSnaps))
|
|
case p == "aliveiters":
|
|
value = fmt.Sprintf("%d", atomic.LoadInt32(&db.aliveIters))
|
|
default:
|
|
err = ErrNotFound
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// SizeOf calculates approximate sizes of the given key ranges.
|
|
// The length of the returned sizes are equal with the length of the given
|
|
// ranges. The returned sizes measure storage space usage, so if the user
|
|
// data compresses by a factor of ten, the returned sizes will be one-tenth
|
|
// the size of the corresponding user data size.
|
|
// The results may not include the sizes of recently written data.
|
|
func (db *DB) SizeOf(ranges []util.Range) (Sizes, error) {
|
|
if err := db.ok(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
v := db.s.version()
|
|
defer v.release()
|
|
|
|
sizes := make(Sizes, 0, len(ranges))
|
|
for _, r := range ranges {
|
|
imin := makeInternalKey(nil, r.Start, keyMaxSeq, keyTypeSeek)
|
|
imax := makeInternalKey(nil, r.Limit, keyMaxSeq, keyTypeSeek)
|
|
start, err := v.offsetOf(imin)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
limit, err := v.offsetOf(imax)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var size int64
|
|
if limit >= start {
|
|
size = limit - start
|
|
}
|
|
sizes = append(sizes, size)
|
|
}
|
|
|
|
return sizes, nil
|
|
}
|
|
|
|
// Close closes the DB. This will also releases any outstanding snapshot,
|
|
// abort any in-flight compaction and discard open transaction.
|
|
//
|
|
// It is not safe to close a DB until all outstanding iterators are released.
|
|
// It is valid to call Close multiple times. Other methods should not be
|
|
// called after the DB has been closed.
|
|
func (db *DB) Close() error {
|
|
if !db.setClosed() {
|
|
return ErrClosed
|
|
}
|
|
|
|
start := time.Now()
|
|
db.log("db@close closing")
|
|
|
|
// Clear the finalizer.
|
|
runtime.SetFinalizer(db, nil)
|
|
|
|
// Get compaction error.
|
|
var err error
|
|
select {
|
|
case err = <-db.compErrC:
|
|
if err == ErrReadOnly {
|
|
err = nil
|
|
}
|
|
default:
|
|
}
|
|
|
|
// Signal all goroutines.
|
|
close(db.closeC)
|
|
|
|
// Discard open transaction.
|
|
if db.tr != nil {
|
|
db.tr.Discard()
|
|
}
|
|
|
|
// Acquire writer lock.
|
|
db.writeLockC <- struct{}{}
|
|
|
|
// Wait for all gorotines to exit.
|
|
db.closeW.Wait()
|
|
|
|
// Closes journal.
|
|
if db.journal != nil {
|
|
db.journal.Close()
|
|
db.journalWriter.Close()
|
|
}
|
|
|
|
if db.writeDelayN > 0 {
|
|
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
|
|
}
|
|
|
|
// Close session.
|
|
db.s.close()
|
|
db.logf("db@close done T·%v", time.Since(start))
|
|
db.s.release()
|
|
|
|
if db.closer != nil {
|
|
if err1 := db.closer.Close(); err == nil {
|
|
err = err1
|
|
}
|
|
}
|
|
|
|
// NIL'ing pointers.
|
|
db.s = nil
|
|
db.mem = nil
|
|
db.frozenMem = nil
|
|
db.journal = nil
|
|
db.journalWriter = nil
|
|
db.closer = nil
|
|
|
|
return err
|
|
}
|