mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-14 09:14:10 +00:00
6a840a040b
This adds metadata updates to the same write batch as the underlying file change. The odds of a metadata update going missing is greatly reduced. Bonus change: actually commit the transaction in recalcMeta.
214 lines
5.1 KiB
Go
214 lines
5.1 KiB
Go
// Copyright (C) 2018 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package backend
|
|
|
|
import (
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
const (
|
|
// Never flush transactions smaller than this, even on Checkpoint()
|
|
dbFlushBatchMin = 1 << MiB
|
|
// Once a transaction reaches this size, flush it unconditionally.
|
|
dbFlushBatchMax = 128 << MiB
|
|
)
|
|
|
|
// leveldbBackend implements Backend on top of a leveldb
|
|
type leveldbBackend struct {
|
|
ldb *leveldb.DB
|
|
closeWG *closeWaitGroup
|
|
}
|
|
|
|
func newLeveldbBackend(ldb *leveldb.DB) *leveldbBackend {
|
|
return &leveldbBackend{
|
|
ldb: ldb,
|
|
closeWG: &closeWaitGroup{},
|
|
}
|
|
}
|
|
|
|
func (b *leveldbBackend) NewReadTransaction() (ReadTransaction, error) {
|
|
return b.newSnapshot()
|
|
}
|
|
|
|
func (b *leveldbBackend) newSnapshot() (leveldbSnapshot, error) {
|
|
rel, err := newReleaser(b.closeWG)
|
|
if err != nil {
|
|
return leveldbSnapshot{}, err
|
|
}
|
|
snap, err := b.ldb.GetSnapshot()
|
|
if err != nil {
|
|
rel.Release()
|
|
return leveldbSnapshot{}, wrapLeveldbErr(err)
|
|
}
|
|
return leveldbSnapshot{
|
|
snap: snap,
|
|
rel: rel,
|
|
}, nil
|
|
}
|
|
|
|
func (b *leveldbBackend) NewWriteTransaction() (WriteTransaction, error) {
|
|
rel, err := newReleaser(b.closeWG)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
snap, err := b.newSnapshot()
|
|
if err != nil {
|
|
rel.Release()
|
|
return nil, err // already wrapped
|
|
}
|
|
return &leveldbTransaction{
|
|
leveldbSnapshot: snap,
|
|
ldb: b.ldb,
|
|
batch: new(leveldb.Batch),
|
|
rel: rel,
|
|
}, nil
|
|
}
|
|
|
|
func (b *leveldbBackend) Close() error {
|
|
b.closeWG.CloseWait()
|
|
return wrapLeveldbErr(b.ldb.Close())
|
|
}
|
|
|
|
func (b *leveldbBackend) Get(key []byte) ([]byte, error) {
|
|
val, err := b.ldb.Get(key, nil)
|
|
return val, wrapLeveldbErr(err)
|
|
}
|
|
|
|
func (b *leveldbBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
|
|
return &leveldbIterator{b.ldb.NewIterator(util.BytesPrefix(prefix), nil)}, nil
|
|
}
|
|
|
|
func (b *leveldbBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
|
|
return &leveldbIterator{b.ldb.NewIterator(&util.Range{Start: first, Limit: last}, nil)}, nil
|
|
}
|
|
|
|
func (b *leveldbBackend) Put(key, val []byte) error {
|
|
return wrapLeveldbErr(b.ldb.Put(key, val, nil))
|
|
}
|
|
|
|
func (b *leveldbBackend) Delete(key []byte) error {
|
|
return wrapLeveldbErr(b.ldb.Delete(key, nil))
|
|
}
|
|
|
|
func (b *leveldbBackend) Compact() error {
|
|
// Race is detected during testing when db is closed while compaction
|
|
// is ongoing.
|
|
err := b.closeWG.Add(1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer b.closeWG.Done()
|
|
return wrapLeveldbErr(b.ldb.CompactRange(util.Range{}))
|
|
}
|
|
|
|
// leveldbSnapshot implements backend.ReadTransaction
|
|
type leveldbSnapshot struct {
|
|
snap *leveldb.Snapshot
|
|
rel *releaser
|
|
}
|
|
|
|
func (l leveldbSnapshot) Get(key []byte) ([]byte, error) {
|
|
val, err := l.snap.Get(key, nil)
|
|
return val, wrapLeveldbErr(err)
|
|
}
|
|
|
|
func (l leveldbSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
|
|
return l.snap.NewIterator(util.BytesPrefix(prefix), nil), nil
|
|
}
|
|
|
|
func (l leveldbSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
|
|
return l.snap.NewIterator(&util.Range{Start: first, Limit: last}, nil), nil
|
|
}
|
|
|
|
func (l leveldbSnapshot) Release() {
|
|
l.snap.Release()
|
|
l.rel.Release()
|
|
}
|
|
|
|
// leveldbTransaction implements backend.WriteTransaction using a batch (not
|
|
// an actual leveldb transaction)
|
|
type leveldbTransaction struct {
|
|
leveldbSnapshot
|
|
ldb *leveldb.DB
|
|
batch *leveldb.Batch
|
|
rel *releaser
|
|
}
|
|
|
|
func (t *leveldbTransaction) Delete(key []byte) error {
|
|
t.batch.Delete(key)
|
|
return t.checkFlush(dbFlushBatchMax)
|
|
}
|
|
|
|
func (t *leveldbTransaction) Put(key, val []byte) error {
|
|
t.batch.Put(key, val)
|
|
return t.checkFlush(dbFlushBatchMax)
|
|
}
|
|
|
|
func (t *leveldbTransaction) Checkpoint(preFlush ...func() error) error {
|
|
return t.checkFlush(dbFlushBatchMin, preFlush...)
|
|
}
|
|
|
|
func (t *leveldbTransaction) Commit() error {
|
|
err := wrapLeveldbErr(t.flush())
|
|
t.leveldbSnapshot.Release()
|
|
t.rel.Release()
|
|
return err
|
|
}
|
|
|
|
func (t *leveldbTransaction) Release() {
|
|
t.leveldbSnapshot.Release()
|
|
t.rel.Release()
|
|
}
|
|
|
|
// checkFlush flushes and resets the batch if its size exceeds the given size.
|
|
func (t *leveldbTransaction) checkFlush(size int, preFlush ...func() error) error {
|
|
if len(t.batch.Dump()) < size {
|
|
return nil
|
|
}
|
|
for _, hook := range preFlush {
|
|
if err := hook(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return t.flush()
|
|
}
|
|
|
|
func (t *leveldbTransaction) flush() error {
|
|
if t.batch.Len() == 0 {
|
|
return nil
|
|
}
|
|
if err := t.ldb.Write(t.batch, nil); err != nil {
|
|
return wrapLeveldbErr(err)
|
|
}
|
|
t.batch.Reset()
|
|
return nil
|
|
}
|
|
|
|
type leveldbIterator struct {
|
|
iterator.Iterator
|
|
}
|
|
|
|
func (it *leveldbIterator) Error() error {
|
|
return wrapLeveldbErr(it.Iterator.Error())
|
|
}
|
|
|
|
// wrapLeveldbErr wraps errors so that the backend package can recognize them
|
|
func wrapLeveldbErr(err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
if err == leveldb.ErrClosed {
|
|
return errClosed{}
|
|
}
|
|
if err == leveldb.ErrNotFound {
|
|
return errNotFound{}
|
|
}
|
|
return err
|
|
}
|