syncthing/lib/db/backend/badger_backend.go

451 lines
10 KiB
Go
Raw Normal View History

// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package backend
import (
"bytes"
"errors"
"time"
badger "github.com/dgraph-io/badger/v2"
)
const (
checkpointFlushMinSize = 128 << KiB
maxCacheSize = 64 << MiB
)
func OpenBadger(path string) (Backend, error) {
opts := badger.DefaultOptions(path)
opts = opts.WithMaxCacheSize(maxCacheSize).WithCompactL0OnClose(false)
opts.Logger = nil
return openBadger(opts)
}
func OpenBadgerMemory() Backend {
opts := badger.DefaultOptions("").WithInMemory(true)
opts.Logger = nil
backend, err := openBadger(opts)
if err != nil {
// Opening in-memory should never be able to fail, and is anyway
// used just by tests.
panic(err)
}
return backend
}
func openBadger(opts badger.Options) (Backend, error) {
// XXX: We should find good values for memory utilization in the "small"
// and "large" cases we support for LevelDB. Some notes here:
// https://github.com/dgraph-io/badger/tree/v2.0.3#memory-usage
bdb, err := badger.Open(opts)
if err != nil {
return nil, wrapBadgerErr(err)
}
return &badgerBackend{
bdb: bdb,
closeWG: &closeWaitGroup{},
}, nil
}
// badgerBackend implements Backend on top of a badger
type badgerBackend struct {
bdb *badger.DB
closeWG *closeWaitGroup
}
func (b *badgerBackend) NewReadTransaction() (ReadTransaction, error) {
rel, err := newReleaser(b.closeWG)
if err != nil {
return nil, err
}
return badgerSnapshot{
txn: b.bdb.NewTransaction(false),
rel: rel,
}, nil
}
2020-07-19 09:55:27 +03:00
func (b *badgerBackend) NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error) {
rel1, err := newReleaser(b.closeWG)
if err != nil {
return nil, err
}
rel2, err := newReleaser(b.closeWG)
if err != nil {
rel1.Release()
return nil, err
}
// We use two transactions here to preserve the property that our
// leveldb wrapper has, that writes in a transaction are completely
// invisible until it's committed, even inside that same transaction.
rtxn := b.bdb.NewTransaction(false)
wtxn := b.bdb.NewTransaction(true)
return &badgerTransaction{
badgerSnapshot: badgerSnapshot{
txn: rtxn,
rel: rel1,
},
2020-07-19 09:55:27 +03:00
txn: wtxn,
bdb: b.bdb,
rel: rel2,
commitHooks: hooks,
}, nil
}
func (b *badgerBackend) Close() error {
b.closeWG.CloseWait()
return wrapBadgerErr(b.bdb.Close())
}
func (b *badgerBackend) Get(key []byte) ([]byte, error) {
if err := b.closeWG.Add(1); err != nil {
return nil, err
}
defer b.closeWG.Done()
txn := b.bdb.NewTransaction(false)
defer txn.Discard()
item, err := txn.Get(key)
if err != nil {
return nil, wrapBadgerErr(err)
}
val, err := item.ValueCopy(nil)
if err != nil {
return nil, wrapBadgerErr(err)
}
return val, nil
}
func (b *badgerBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
if err := b.closeWG.Add(1); err != nil {
return nil, err
}
txn := b.bdb.NewTransaction(false)
it := badgerPrefixIterator(txn, prefix)
it.releaseFn = func() {
defer b.closeWG.Done()
txn.Discard()
}
return it, nil
}
func (b *badgerBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
if err := b.closeWG.Add(1); err != nil {
return nil, err
}
txn := b.bdb.NewTransaction(false)
it := badgerRangeIterator(txn, first, last)
it.releaseFn = func() {
defer b.closeWG.Done()
txn.Discard()
}
return it, nil
}
func (b *badgerBackend) Put(key, val []byte) error {
if err := b.closeWG.Add(1); err != nil {
return err
}
defer b.closeWG.Done()
txn := b.bdb.NewTransaction(true)
if err := txn.Set(key, val); err != nil {
txn.Discard()
return wrapBadgerErr(err)
}
return wrapBadgerErr(txn.Commit())
}
func (b *badgerBackend) Delete(key []byte) error {
if err := b.closeWG.Add(1); err != nil {
return err
}
defer b.closeWG.Done()
txn := b.bdb.NewTransaction(true)
if err := txn.Delete(key); err != nil {
txn.Discard()
return wrapBadgerErr(err)
}
return wrapBadgerErr(txn.Commit())
}
func (b *badgerBackend) Compact() error {
if err := b.closeWG.Add(1); err != nil {
return err
}
defer b.closeWG.Done()
// This weird looking loop is as recommended in the README
// (https://github.com/dgraph-io/badger/tree/v2.0.3#garbage-collection).
// Basically, the RunValueLogGC will pick some promising thing to
// garbage collect at random and return nil if it improved the
// situation, then return ErrNoRewrite when there is nothing more to GC.
// The 0.5 is the discard ratio, for which the method docs say they
// "recommend setting discardRatio to 0.5, thus indicating that a file
// be rewritten if half the space can be discarded".
var err error
t0 := time.Now()
for err == nil {
if time.Since(t0) > time.Hour {
l.Warnln("Database compaction is taking a long time, performance may be impacted. Consider investigating and/or opening an issue if this warning repeats.")
t0 = time.Now()
}
err = b.bdb.RunValueLogGC(0.5)
}
if errors.Is(err, badger.ErrNoRewrite) {
// GC did nothing, because nothing needed to be done
return nil
}
if errors.Is(err, badger.ErrRejected) {
// GC was already running (could possibly happen), or the database
// is closed (can't happen).
return nil
}
if errors.Is(err, badger.ErrGCInMemoryMode) {
// GC in in-memory mode, which is fine.
return nil
}
return err
}
// badgerSnapshot implements backend.ReadTransaction
type badgerSnapshot struct {
txn *badger.Txn
rel *releaser
}
func (l badgerSnapshot) Get(key []byte) ([]byte, error) {
item, err := l.txn.Get(key)
if err != nil {
return nil, wrapBadgerErr(err)
}
val, err := item.ValueCopy(nil)
if err != nil {
return nil, wrapBadgerErr(err)
}
return val, nil
}
func (l badgerSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
return badgerPrefixIterator(l.txn, prefix), nil
}
func (l badgerSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
return badgerRangeIterator(l.txn, first, last), nil
}
func (l badgerSnapshot) Release() {
defer l.rel.Release()
l.txn.Discard()
}
type badgerTransaction struct {
badgerSnapshot
2020-07-19 09:55:27 +03:00
txn *badger.Txn
bdb *badger.DB
rel *releaser
size int
commitHooks []CommitHook
}
func (t *badgerTransaction) Delete(key []byte) error {
t.size += len(key)
kc := make([]byte, len(key))
copy(kc, key)
return t.transactionRetried(func(txn *badger.Txn) error {
return txn.Delete(kc)
})
}
func (t *badgerTransaction) Put(key, val []byte) error {
t.size += len(key) + len(val)
kc := make([]byte, len(key))
copy(kc, key)
vc := make([]byte, len(val))
copy(vc, val)
return t.transactionRetried(func(txn *badger.Txn) error {
return txn.Set(kc, vc)
})
}
// transactionRetried performs the given operation in the current
// transaction, with commit and retry if Badger says the transaction has
// grown too large.
func (t *badgerTransaction) transactionRetried(fn func(*badger.Txn) error) error {
if err := fn(t.txn); err == badger.ErrTxnTooBig {
if err := t.txn.Commit(); err != nil {
return wrapBadgerErr(err)
}
t.size = 0
t.txn = t.bdb.NewTransaction(true)
return wrapBadgerErr(fn(t.txn))
} else if err != nil {
return wrapBadgerErr(err)
}
return nil
}
func (t *badgerTransaction) Commit() error {
defer t.rel.Release()
defer t.badgerSnapshot.Release()
2020-07-19 09:55:27 +03:00
for _, hook := range t.commitHooks {
if err := hook(t); err != nil {
return err
}
}
return wrapBadgerErr(t.txn.Commit())
}
2020-07-19 09:55:27 +03:00
func (t *badgerTransaction) Checkpoint() error {
if t.size < checkpointFlushMinSize {
return nil
}
2020-07-19 09:55:27 +03:00
for _, hook := range t.commitHooks {
if err := hook(t); err != nil {
return err
}
}
err := t.txn.Commit()
if err == nil {
t.size = 0
t.txn = t.bdb.NewTransaction(true)
}
return wrapBadgerErr(err)
}
func (t *badgerTransaction) Release() {
defer t.rel.Release()
defer t.badgerSnapshot.Release()
t.txn.Discard()
}
type badgerIterator struct {
it *badger.Iterator
prefix []byte
first []byte
last []byte
releaseFn func()
didSeek bool
err error
}
func (i *badgerIterator) Next() bool {
if i.err != nil {
return false
}
for {
if !i.didSeek {
if i.first != nil {
// Range iterator
i.it.Seek(i.first)
} else {
// Prefix iterator
i.it.Seek(i.prefix)
}
i.didSeek = true
} else {
i.it.Next()
}
if !i.it.ValidForPrefix(i.prefix) {
// Done
return false
}
if i.first == nil && i.last == nil {
// No range checks required
return true
}
key := i.it.Item().Key()
if bytes.Compare(key, i.last) > 0 {
// Key is after range last
return false
}
return true
}
}
func (i *badgerIterator) Key() []byte {
if i.err != nil {
return nil
}
return i.it.Item().Key()
}
func (i *badgerIterator) Value() []byte {
if i.err != nil {
return nil
}
val, err := i.it.Item().ValueCopy(nil)
if err != nil {
i.err = err
}
return val
}
func (i *badgerIterator) Error() error {
return wrapBadgerErr(i.err)
}
func (i *badgerIterator) Release() {
i.it.Close()
if i.releaseFn != nil {
i.releaseFn()
}
}
// wrapBadgerErr wraps errors so that the backend package can recognize them
func wrapBadgerErr(err error) error {
if err == nil {
return nil
}
if err == badger.ErrDiscardedTxn {
return &errClosed{}
}
if err == badger.ErrKeyNotFound {
return &errNotFound{}
}
return err
}
func badgerPrefixIterator(txn *badger.Txn, prefix []byte) *badgerIterator {
it := iteratorForPrefix(txn, prefix)
return &badgerIterator{it: it, prefix: prefix}
}
func badgerRangeIterator(txn *badger.Txn, first, last []byte) *badgerIterator {
prefix := commonPrefix(first, last)
it := iteratorForPrefix(txn, prefix)
return &badgerIterator{it: it, prefix: prefix, first: first, last: last}
}
func iteratorForPrefix(txn *badger.Txn, prefix []byte) *badger.Iterator {
opts := badger.DefaultIteratorOptions
opts.Prefix = prefix
return txn.NewIterator(opts)
}
func commonPrefix(a, b []byte) []byte {
minLen := len(a)
if len(b) < minLen {
minLen = len(b)
}
prefix := make([]byte, 0, minLen)
for i := 0; i < minLen; i++ {
if a[i] != b[i] {
break
}
prefix = append(prefix, a[i])
}
return prefix
}