syncthing/vendor/github.com/syndtr/goleveldb/leveldb/db_write.go

359 lines
7.7 KiB
Go
Raw Normal View History

2014-07-06 12:46:48 +00:00
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package leveldb
import (
"time"
"github.com/syndtr/goleveldb/leveldb/memdb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
2014-07-23 06:31:36 +00:00
func (db *DB) writeJournal(b *Batch) error {
w, err := db.journal.Next()
2014-07-06 12:46:48 +00:00
if err != nil {
return err
}
if _, err := w.Write(b.encode()); err != nil {
return err
}
2014-07-23 06:31:36 +00:00
if err := db.journal.Flush(); err != nil {
2014-07-06 12:46:48 +00:00
return err
}
if b.sync {
2014-07-23 06:31:36 +00:00
return db.journalWriter.Sync()
2014-07-06 12:46:48 +00:00
}
return nil
}
2014-07-23 06:31:36 +00:00
func (db *DB) jWriter() {
defer db.closeW.Done()
2014-07-06 12:46:48 +00:00
for {
select {
2014-07-23 06:31:36 +00:00
case b := <-db.journalC:
2014-07-06 12:46:48 +00:00
if b != nil {
2014-07-23 06:31:36 +00:00
db.journalAckC <- db.writeJournal(b)
2014-07-06 12:46:48 +00:00
}
2014-07-23 06:31:36 +00:00
case _, _ = <-db.closeC:
2014-07-06 12:46:48 +00:00
return
}
}
}
func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
2014-07-06 12:46:48 +00:00
// Wait for pending memdb compaction.
err = db.compTriggerWait(db.mcompCmdC)
2014-07-06 12:46:48 +00:00
if err != nil {
return
}
// Create new memdb and journal.
2014-07-23 06:31:36 +00:00
mem, err = db.newMem(n)
2014-07-06 12:46:48 +00:00
if err != nil {
return
}
// Schedule memdb compaction.
if wait {
err = db.compTriggerWait(db.mcompCmdC)
} else {
db.compTrigger(db.mcompCmdC)
}
2014-07-06 12:46:48 +00:00
return
}
2015-06-15 19:10:18 +00:00
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
2014-07-06 12:46:48 +00:00
delayed := false
2014-08-15 07:16:30 +00:00
flush := func() (retry bool) {
2014-07-23 06:31:36 +00:00
v := db.s.version()
2014-07-06 12:46:48 +00:00
defer v.release()
2015-06-15 19:10:18 +00:00
mdb = db.getEffectiveMem()
2014-08-15 07:16:30 +00:00
defer func() {
if retry {
2015-06-15 19:10:18 +00:00
mdb.decref()
mdb = nil
2014-08-15 07:16:30 +00:00
}
}()
2015-06-15 19:10:18 +00:00
mdbFree = mdb.Free()
2014-07-06 12:46:48 +00:00
switch {
2014-11-18 12:24:42 +00:00
case v.tLen(0) >= db.s.o.GetWriteL0SlowdownTrigger() && !delayed:
2014-07-06 12:46:48 +00:00
delayed = true
time.Sleep(time.Millisecond)
2015-06-15 19:10:18 +00:00
case mdbFree >= n:
2014-07-06 12:46:48 +00:00
return false
2014-11-18 12:24:42 +00:00
case v.tLen(0) >= db.s.o.GetWriteL0PauseTrigger():
2014-07-06 12:46:48 +00:00
delayed = true
err = db.compTriggerWait(db.tcompCmdC)
2014-07-06 12:46:48 +00:00
if err != nil {
return false
}
default:
// Allow memdb to grow if it has no entry.
2015-06-15 19:10:18 +00:00
if mdb.Len() == 0 {
mdbFree = n
2014-08-15 07:16:30 +00:00
} else {
2015-06-15 19:10:18 +00:00
mdb.decref()
mdb, err = db.rotateMem(n, false)
2014-08-15 07:16:30 +00:00
if err == nil {
2015-06-15 19:10:18 +00:00
mdbFree = mdb.Free()
2014-08-15 07:16:30 +00:00
} else {
2015-06-15 19:10:18 +00:00
mdbFree = 0
2014-08-15 07:16:30 +00:00
}
2014-07-06 12:46:48 +00:00
}
return false
}
return true
}
start := time.Now()
for flush() {
}
if delayed {
2014-11-18 12:24:42 +00:00
db.writeDelay += time.Since(start)
db.writeDelayN++
} else if db.writeDelayN > 0 {
2014-12-29 11:22:58 +00:00
db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
2014-11-18 12:24:42 +00:00
db.writeDelay = 0
db.writeDelayN = 0
2014-07-06 12:46:48 +00:00
}
return
}
// Write apply the given batch to the DB. The batch will be applied
// sequentially.
//
// It is safe to modify the contents of the arguments after Write returns.
2014-07-23 06:31:36 +00:00
func (db *DB) Write(b *Batch, wo *opt.WriteOptions) (err error) {
err = db.ok()
2014-11-18 12:24:42 +00:00
if err != nil || b == nil || b.Len() == 0 {
2014-07-06 12:46:48 +00:00
return
}
2015-08-18 06:56:07 +00:00
b.init(wo.GetSync() && !db.s.o.GetNoSync())
2014-07-06 12:46:48 +00:00
if b.size() > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
// Writes using transaction.
tr, err1 := db.OpenTransaction()
if err1 != nil {
return err1
}
if err1 := tr.Write(b, wo); err1 != nil {
tr.Discard()
return err1
}
return tr.Commit()
}
2014-07-06 12:46:48 +00:00
// The write happen synchronously.
select {
2014-07-23 06:31:36 +00:00
case db.writeC <- b:
if <-db.writeMergedC {
return <-db.writeAckC
2014-07-06 12:46:48 +00:00
}
// Continue, the write lock already acquired by previous writer
// and handed out to us.
2014-07-23 06:31:36 +00:00
case db.writeLockC <- struct{}{}:
2014-11-18 12:24:42 +00:00
case err = <-db.compPerErrC:
return
2014-07-23 06:31:36 +00:00
case _, _ = <-db.closeC:
2014-07-06 12:46:48 +00:00
return ErrClosed
}
merged := 0
2014-11-04 04:00:11 +00:00
danglingMerge := false
2014-07-06 12:46:48 +00:00
defer func() {
2014-11-04 04:00:11 +00:00
if danglingMerge {
// Only one dangling merge at most, so this is safe.
2014-11-04 04:00:11 +00:00
db.writeMergedC <- false
} else {
<-db.writeLockC
}
2014-07-06 12:46:48 +00:00
for i := 0; i < merged; i++ {
2014-07-23 06:31:36 +00:00
db.writeAckC <- err
2014-07-06 12:46:48 +00:00
}
}()
2015-06-15 19:10:18 +00:00
mdb, mdbFree, err := db.flush(b.size())
2014-07-06 12:46:48 +00:00
if err != nil {
return
}
2015-06-15 19:10:18 +00:00
defer mdb.decref()
2014-07-06 12:46:48 +00:00
// Calculate maximum size of the batch.
m := 1 << 20
if x := b.size(); x <= 128<<10 {
m = x + (128 << 10)
}
2015-06-15 19:10:18 +00:00
m = minInt(m, mdbFree)
2014-07-06 12:46:48 +00:00
// Merge with other batch.
drain:
for b.size() < m && !b.sync {
select {
2014-07-23 06:31:36 +00:00
case nb := <-db.writeC:
2014-07-06 12:46:48 +00:00
if b.size()+nb.size() <= m {
b.append(nb)
2014-07-23 06:31:36 +00:00
db.writeMergedC <- true
2014-07-06 12:46:48 +00:00
merged++
} else {
2014-11-04 04:00:11 +00:00
danglingMerge = true
2014-07-06 12:46:48 +00:00
break drain
}
default:
break drain
}
}
// Set batch first seq number relative from last seq.
2014-07-23 06:31:36 +00:00
b.seq = db.seq + 1
2014-07-06 12:46:48 +00:00
// Write journal concurrently if it is large enough.
if b.size() >= (128 << 10) {
// Push the write batch to the journal writer
select {
2014-11-18 12:24:42 +00:00
case db.journalC <- b:
// Write into memdb
2015-06-15 19:10:18 +00:00
if berr := b.memReplay(mdb.DB); berr != nil {
2014-11-18 12:24:42 +00:00
panic(berr)
}
case err = <-db.compPerErrC:
return
2014-07-23 06:31:36 +00:00
case _, _ = <-db.closeC:
2014-07-06 12:46:48 +00:00
err = ErrClosed
return
}
// Wait for journal writer
select {
2014-07-23 06:31:36 +00:00
case err = <-db.journalAckC:
2014-07-06 12:46:48 +00:00
if err != nil {
// Revert memdb if error detected
2015-06-15 19:10:18 +00:00
if berr := b.revertMemReplay(mdb.DB); berr != nil {
2014-11-18 12:24:42 +00:00
panic(berr)
}
2014-07-06 12:46:48 +00:00
return
}
2014-11-18 12:24:42 +00:00
case _, _ = <-db.closeC:
err = ErrClosed
return
2014-07-06 12:46:48 +00:00
}
} else {
2014-07-23 06:31:36 +00:00
err = db.writeJournal(b)
2014-07-06 12:46:48 +00:00
if err != nil {
return
}
2015-06-15 19:10:18 +00:00
if berr := b.memReplay(mdb.DB); berr != nil {
2014-11-18 12:24:42 +00:00
panic(berr)
}
2014-07-06 12:46:48 +00:00
}
// Set last seq number.
2014-11-18 12:24:42 +00:00
db.addSeq(uint64(b.Len()))
2014-07-06 12:46:48 +00:00
2015-06-15 19:10:18 +00:00
if b.size() >= mdbFree {
db.rotateMem(0, false)
2014-07-06 12:46:48 +00:00
}
return
}
// Put sets the value for the given key. It overwrites any previous value
// for that key; a DB is not a multi-map.
//
// It is safe to modify the contents of the arguments after Put returns.
2014-07-23 06:31:36 +00:00
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
2014-07-06 12:46:48 +00:00
b := new(Batch)
b.Put(key, value)
2014-07-23 06:31:36 +00:00
return db.Write(b, wo)
2014-07-06 12:46:48 +00:00
}
2015-06-15 19:10:18 +00:00
// Delete deletes the value for the given key.
2014-07-06 12:46:48 +00:00
//
// It is safe to modify the contents of the arguments after Delete returns.
2014-07-23 06:31:36 +00:00
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
2014-07-06 12:46:48 +00:00
b := new(Batch)
b.Delete(key)
2014-07-23 06:31:36 +00:00
return db.Write(b, wo)
2014-07-06 12:46:48 +00:00
}
2014-07-06 21:13:10 +00:00
func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
2014-07-06 12:46:48 +00:00
iter := mem.NewIterator(nil)
defer iter.Release()
return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
(min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
2014-07-06 12:46:48 +00:00
}
// CompactRange compacts the underlying DB for the given key range.
// In particular, deleted and overwritten versions are discarded,
// and the data is rearranged to reduce the cost of operations
// needed to access the data. This operation should typically only
// be invoked by users who understand the underlying implementation.
//
// A nil Range.Start is treated as a key before all keys in the DB.
// And a nil Range.Limit is treated as a key after all keys in the DB.
// Therefore if both is nil then it will compact entire DB.
2014-07-23 06:31:36 +00:00
func (db *DB) CompactRange(r util.Range) error {
if err := db.ok(); err != nil {
2014-07-06 12:46:48 +00:00
return err
}
2014-11-04 04:00:11 +00:00
// Lock writer.
2014-07-06 12:46:48 +00:00
select {
2014-07-23 06:31:36 +00:00
case db.writeLockC <- struct{}{}:
2014-11-18 12:24:42 +00:00
case err := <-db.compPerErrC:
return err
2014-07-23 06:31:36 +00:00
case _, _ = <-db.closeC:
2014-07-06 12:46:48 +00:00
return ErrClosed
}
// Check for overlaps in memdb.
2015-06-15 19:10:18 +00:00
mdb := db.getEffectiveMem()
defer mdb.decref()
if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
2014-07-06 12:46:48 +00:00
// Memdb compaction.
if _, err := db.rotateMem(0, false); err != nil {
2014-07-23 06:31:36 +00:00
<-db.writeLockC
2014-07-06 12:46:48 +00:00
return err
}
2014-07-23 06:31:36 +00:00
<-db.writeLockC
if err := db.compTriggerWait(db.mcompCmdC); err != nil {
2014-07-06 12:46:48 +00:00
return err
}
} else {
2014-07-23 06:31:36 +00:00
<-db.writeLockC
2014-07-06 12:46:48 +00:00
}
// Table compaction.
return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
2014-07-06 12:46:48 +00:00
}
2015-06-15 19:10:18 +00:00
// SetReadOnly makes DB read-only. It will stay read-only until reopened.
//
// It acquires the write lock and never releases it, setting
// db.compWriteLocking to record that (presumably so shutdown code knows the
// lock is held — confirm). It then sends ErrReadOnly on db.compErrSetC.
// If that second step fails, the write lock stays held and the error (or
// ErrClosed) is returned.
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Lock writer.
	select {
	case db.writeLockC <- struct{}{}:
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Set compaction read-only.
	select {
	case db.compErrSetC <- ErrReadOnly:
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}