// Copyright (C) 2019 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package backend
import (
"bytes"
"errors"
"time"
badger "github.com/dgraph-io/badger/v2"
)
const (
// checkpointFlushMinSize is the threshold that Checkpoint compares a write
// transaction's accumulated key/value byte count against; transactions
// below it are not flushed. KiB is declared elsewhere in the package.
checkpointFlushMinSize = 128 << KiB
// maxCacheSize is the value handed to Badger's WithMaxCacheSize option in
// OpenBadger. MiB is declared elsewhere in the package.
maxCacheSize = 64 << MiB
)
func OpenBadger ( path string ) ( Backend , error ) {
opts := badger . DefaultOptions ( path )
opts = opts . WithMaxCacheSize ( maxCacheSize ) . WithCompactL0OnClose ( false )
opts . Logger = nil
2020-09-10 10:54:41 +02:00
backend , err := openBadger ( opts )
if err != nil {
return nil , err
}
backend . location = path
return backend , nil
2020-05-29 12:43:02 +01:00
}
// OpenBadgerMemory returns a backend on top of an in-memory Badger
// database with Badger's logger disabled. It panics if the database cannot
// be opened.
func OpenBadgerMemory() Backend {
	memOpts := badger.DefaultOptions("").WithInMemory(true)
	memOpts.Logger = nil

	be, err := openBadger(memOpts)
	if err == nil {
		return be
	}
	// An in-memory open is not expected to fail, and this constructor is
	// only meant for tests anyway.
	panic(err)
}
2020-09-10 10:54:41 +02:00
func openBadger ( opts badger . Options ) ( * badgerBackend , error ) {
2020-05-29 12:43:02 +01:00
// XXX: We should find good values for memory utilization in the "small"
// and "large" cases we support for LevelDB. Some notes here:
// https://github.com/dgraph-io/badger/tree/v2.0.3#memory-usage
bdb , err := badger . Open ( opts )
if err != nil {
return nil , wrapBadgerErr ( err )
}
return & badgerBackend {
bdb : bdb ,
closeWG : & closeWaitGroup { } ,
} , nil
}
// badgerBackend implements Backend on top of a badger
type badgerBackend struct {
2020-09-10 10:54:41 +02:00
bdb * badger . DB
closeWG * closeWaitGroup
location string
2020-05-29 12:43:02 +01:00
}
// NewReadTransaction returns a read-only snapshot backed by a new Badger
// read transaction. The snapshot holds a releaser registered with the
// backend's close wait group; if that releaser cannot be obtained the error
// is returned unchanged.
func (b *badgerBackend) NewReadTransaction() (ReadTransaction, error) {
	releaser, err := newReleaser(b.closeWG)
	if err != nil {
		return nil, err
	}
	snap := badgerSnapshot{
		txn: b.bdb.NewTransaction(false),
		rel: releaser,
	}
	return snap, nil
}
2020-07-19 09:55:27 +03:00
func ( b * badgerBackend ) NewWriteTransaction ( hooks ... CommitHook ) ( WriteTransaction , error ) {
2020-05-29 12:43:02 +01:00
rel1 , err := newReleaser ( b . closeWG )
if err != nil {
return nil , err
}
rel2 , err := newReleaser ( b . closeWG )
if err != nil {
rel1 . Release ( )
return nil , err
}
// We use two transactions here to preserve the property that our
// leveldb wrapper has, that writes in a transaction are completely
// invisible until it's committed, even inside that same transaction.
rtxn := b . bdb . NewTransaction ( false )
wtxn := b . bdb . NewTransaction ( true )
return & badgerTransaction {
badgerSnapshot : badgerSnapshot {
txn : rtxn ,
rel : rel1 ,
} ,
2020-07-19 09:55:27 +03:00
txn : wtxn ,
bdb : b . bdb ,
rel : rel2 ,
commitHooks : hooks ,
2020-05-29 12:43:02 +01:00
} , nil
}
// Close calls CloseWait on the backend's close wait group and then closes
// the Badger database, returning the wrapped result of that close.
func (b *badgerBackend) Close() error {
	b.closeWG.CloseWait()
	closeErr := b.bdb.Close()
	return wrapBadgerErr(closeErr)
}
// Get looks up key in a short-lived read transaction and returns a copy of
// its value. Errors from Badger are passed through wrapBadgerErr; an error
// from registering with the close wait group is returned unchanged.
func (b *badgerBackend) Get(key []byte) ([]byte, error) {
	if err := b.closeWG.Add(1); err != nil {
		return nil, err
	}
	defer b.closeWG.Done()

	rtxn := b.bdb.NewTransaction(false)
	defer rtxn.Discard()

	item, err := rtxn.Get(key)
	if err != nil {
		return nil, wrapBadgerErr(err)
	}
	// Copy the value out, as the transaction is discarded on return.
	value, err := item.ValueCopy(nil)
	if err != nil {
		return nil, wrapBadgerErr(err)
	}
	return value, nil
}
// NewPrefixIterator returns an iterator over the keys starting with prefix,
// running in its own read transaction. Releasing the iterator discards that
// transaction and marks the operation done on the close wait group.
func (b *badgerBackend) NewPrefixIterator(prefix []byte) (Iterator, error) {
	if err := b.closeWG.Add(1); err != nil {
		return nil, err
	}

	rtxn := b.bdb.NewTransaction(false)
	cleanup := func() {
		defer b.closeWG.Done()
		rtxn.Discard()
	}

	iter := badgerPrefixIterator(rtxn, prefix)
	iter.releaseFn = cleanup
	return iter, nil
}
// NewRangeIterator returns an iterator over the keys between first and
// last, running in its own read transaction. Releasing the iterator
// discards that transaction and marks the operation done on the close wait
// group.
func (b *badgerBackend) NewRangeIterator(first, last []byte) (Iterator, error) {
	if err := b.closeWG.Add(1); err != nil {
		return nil, err
	}

	rtxn := b.bdb.NewTransaction(false)
	cleanup := func() {
		defer b.closeWG.Done()
		rtxn.Discard()
	}

	iter := badgerRangeIterator(rtxn, first, last)
	iter.releaseFn = cleanup
	return iter, nil
}
// Put stores val under key in a single-operation write transaction that is
// committed immediately. If setting the value fails the transaction is
// discarded. Badger errors are passed through wrapBadgerErr.
func (b *badgerBackend) Put(key, val []byte) error {
	if err := b.closeWG.Add(1); err != nil {
		return err
	}
	defer b.closeWG.Done()

	wtxn := b.bdb.NewTransaction(true)
	setErr := wtxn.Set(key, val)
	if setErr != nil {
		wtxn.Discard()
		return wrapBadgerErr(setErr)
	}
	commitErr := wtxn.Commit()
	return wrapBadgerErr(commitErr)
}
// Delete removes key in a single-operation write transaction that is
// committed immediately. If the delete fails the transaction is discarded.
// Badger errors are passed through wrapBadgerErr.
func (b *badgerBackend) Delete(key []byte) error {
	if err := b.closeWG.Add(1); err != nil {
		return err
	}
	defer b.closeWG.Done()

	wtxn := b.bdb.NewTransaction(true)
	delErr := wtxn.Delete(key)
	if delErr != nil {
		wtxn.Discard()
		return wrapBadgerErr(delErr)
	}
	commitErr := wtxn.Commit()
	return wrapBadgerErr(commitErr)
}
// Compact runs Badger's value log garbage collection repeatedly until it
// reports an error. The errors ErrNoRewrite, ErrRejected and
// ErrGCInMemoryMode are treated as success; any other error is returned as
// is. A warning is logged each time an hour has passed since the start or
// since the previous warning.
func (b *badgerBackend) Compact() error {
	if err := b.closeWG.Add(1); err != nil {
		return err
	}
	defer b.closeWG.Done()

	// The repeated calls follow the recommendation in the Badger README
	// (https://github.com/dgraph-io/badger/tree/v2.0.3#garbage-collection):
	// each RunValueLogGC call picks a candidate to collect and returns nil
	// when it made progress, and ErrNoRewrite once nothing is left to do.
	// The discard ratio of 0.5 is the value the method docs recommend,
	// meaning a file is rewritten if half its space can be reclaimed.
	var gcErr error
	lastWarn := time.Now()
	for {
		if time.Since(lastWarn) > time.Hour {
			l.Warnln("Database compaction is taking a long time, performance may be impacted. Consider investigating and/or opening an issue if this warning repeats.")
			lastWarn = time.Now()
		}
		gcErr = b.bdb.RunValueLogGC(0.5)
		if gcErr != nil {
			break
		}
	}

	switch {
	case errors.Is(gcErr, badger.ErrNoRewrite):
		// Nothing (more) needed collecting.
		return nil
	case errors.Is(gcErr, badger.ErrRejected):
		// A GC was already running (could possibly happen), or the
		// database is closed (can't happen).
		return nil
	case errors.Is(gcErr, badger.ErrGCInMemoryMode):
		// GC is not applicable in in-memory mode, which is fine.
		return nil
	default:
		return gcErr
	}
}
2020-09-10 10:54:41 +02:00
func ( b * badgerBackend ) Location ( ) string {
return b . location
}
2020-05-29 12:43:02 +01:00
// badgerSnapshot implements backend.ReadTransaction
type badgerSnapshot struct {
txn * badger . Txn
rel * releaser
}
// Get returns a copy of the value stored under key as seen by the
// snapshot's transaction. Badger errors are passed through wrapBadgerErr.
func (s badgerSnapshot) Get(key []byte) ([]byte, error) {
	item, err := s.txn.Get(key)
	if err != nil {
		return nil, wrapBadgerErr(err)
	}
	value, err := item.ValueCopy(nil)
	if err != nil {
		return nil, wrapBadgerErr(err)
	}
	return value, nil
}
// NewPrefixIterator returns an iterator over keys starting with prefix,
// reading through the snapshot's transaction. The error is always nil.
func (s badgerSnapshot) NewPrefixIterator(prefix []byte) (Iterator, error) {
	iter := badgerPrefixIterator(s.txn, prefix)
	return iter, nil
}
// NewRangeIterator returns an iterator over keys between first and last,
// reading through the snapshot's transaction. The error is always nil.
func (s badgerSnapshot) NewRangeIterator(first, last []byte) (Iterator, error) {
	iter := badgerRangeIterator(s.txn, first, last)
	return iter, nil
}
// Release discards the snapshot's transaction and then releases its
// releaser; the latter is deferred so it runs even if the discard panics.
func (s badgerSnapshot) Release() {
	defer s.rel.Release()
	s.txn.Discard()
}
type badgerTransaction struct {
badgerSnapshot
2020-07-19 09:55:27 +03:00
txn * badger . Txn
bdb * badger . DB
rel * releaser
size int
commitHooks [ ] CommitHook
2020-05-29 12:43:02 +01:00
}
// Delete records the removal of key in the write transaction, adding the
// key length to the transaction size. The key is copied first, so the
// transaction never holds on to the caller's slice.
func (t *badgerTransaction) Delete(key []byte) error {
	t.size += len(key)

	keyCopy := append(make([]byte, 0, len(key)), key...)
	op := func(txn *badger.Txn) error {
		return txn.Delete(keyCopy)
	}
	return t.transactionRetried(op)
}
// Put records key/val in the write transaction, adding their lengths to
// the transaction size. Both slices are copied first, so the transaction
// never holds on to the caller's slices.
func (t *badgerTransaction) Put(key, val []byte) error {
	t.size += len(key) + len(val)

	keyCopy := append(make([]byte, 0, len(key)), key...)
	valCopy := append(make([]byte, 0, len(val)), val...)
	op := func(txn *badger.Txn) error {
		return txn.Set(keyCopy, valCopy)
	}
	return t.transactionRetried(op)
}
// transactionRetried performs the given operation in the current
// transaction, with commit and retry if Badger says the transaction has
// grown too large.
//
// On ErrTxnTooBig the current write transaction is committed, the size
// counter is reset, a fresh write transaction is started and fn is run once
// more on it. All returned errors are passed through wrapBadgerErr.
func (t *badgerTransaction) transactionRetried(fn func(*badger.Txn) error) error {
	err := fn(t.txn)
	if err == nil {
		return nil
	}
	// errors.Is also matches a wrapped sentinel, which a plain == would
	// miss.
	if !errors.Is(err, badger.ErrTxnTooBig) {
		return wrapBadgerErr(err)
	}

	if err := t.txn.Commit(); err != nil {
		return wrapBadgerErr(err)
	}
	t.size = 0
	t.txn = t.bdb.NewTransaction(true)
	return wrapBadgerErr(fn(t.txn))
}
func ( t * badgerTransaction ) Commit ( ) error {
defer t . rel . Release ( )
defer t . badgerSnapshot . Release ( )
2020-07-19 09:55:27 +03:00
for _ , hook := range t . commitHooks {
if err := hook ( t ) ; err != nil {
return err
}
}
2020-05-29 12:43:02 +01:00
return wrapBadgerErr ( t . txn . Commit ( ) )
}
2020-07-19 09:55:27 +03:00
func ( t * badgerTransaction ) Checkpoint ( ) error {
2020-05-29 12:43:02 +01:00
if t . size < checkpointFlushMinSize {
return nil
}
2020-07-19 09:55:27 +03:00
for _ , hook := range t . commitHooks {
if err := hook ( t ) ; err != nil {
2020-05-29 12:43:02 +01:00
return err
}
}
err := t . txn . Commit ( )
if err == nil {
t . size = 0
t . txn = t . bdb . NewTransaction ( true )
}
return wrapBadgerErr ( err )
}
// Release discards the write transaction, then releases the embedded read
// snapshot and finally the write-side releaser. The latter two are deferred
// so they run even if the discard panics.
func (t *badgerTransaction) Release() {
	defer t.rel.Release()
	defer t.badgerSnapshot.Release()

	t.txn.Discard()
}
type badgerIterator struct {
it * badger . Iterator
prefix [ ] byte
first [ ] byte
last [ ] byte
releaseFn func ( )
2020-09-07 10:31:33 +02:00
released bool
2020-05-29 12:43:02 +01:00
didSeek bool
err error
}
// Next moves the iterator to the next item and reports whether there is
// one. The first call seeks to the range start (range iterator) or to the
// prefix (prefix iterator); later calls advance by one item. It returns
// false once an error has been recorded, once the current key no longer has
// the iterator's prefix, or, when range bounds are set, once the key sorts
// after last.
func (i *badgerIterator) Next() bool {
	if i.err != nil {
		return false
	}

	switch {
	case i.didSeek:
		i.it.Next()
	case i.first != nil:
		// Range iterator
		i.it.Seek(i.first)
		i.didSeek = true
	default:
		// Prefix iterator
		i.it.Seek(i.prefix)
		i.didSeek = true
	}

	if !i.it.ValidForPrefix(i.prefix) {
		// Done
		return false
	}

	hasBounds := i.first != nil || i.last != nil
	if !hasBounds {
		// No range checks required
		return true
	}

	// Valid only as long as the key does not sort after the range end.
	return bytes.Compare(i.it.Item().Key(), i.last) <= 0
}
// Key returns the key of the current item as handed out by Badger (not a
// copy), or nil once an error has been recorded on the iterator.
func (i *badgerIterator) Key() []byte {
	if i.err == nil {
		return i.it.Item().Key()
	}
	return nil
}
// Value returns a copy of the current item's value, or nil once an error
// has been recorded on the iterator. A failure to copy the value is
// recorded and reported by Error.
func (i *badgerIterator) Value() []byte {
	if i.err != nil {
		return nil
	}
	value, copyErr := i.it.Item().ValueCopy(nil)
	if copyErr != nil {
		i.err = copyErr
	}
	return value
}
// Error returns the error recorded on the iterator, passed through
// wrapBadgerErr; it is nil when no error has occurred.
func (i *badgerIterator) Error() error {
	recorded := i.err
	return wrapBadgerErr(recorded)
}
func ( i * badgerIterator ) Release ( ) {
2020-09-07 10:31:33 +02:00
if i . released {
// We already closed this iterator, no need to do it again
// (and the releaseFn might hang if we do).
return
}
i . released = true
2020-05-29 12:43:02 +01:00
i . it . Close ( )
if i . releaseFn != nil {
i . releaseFn ( )
}
}
// wrapBadgerErr wraps errors so that the backend package can recognize them
func wrapBadgerErr ( err error ) error {
if err == nil {
return nil
}
if err == badger . ErrDiscardedTxn {
2020-06-16 09:27:34 +02:00
return & errClosed { }
2020-05-29 12:43:02 +01:00
}
if err == badger . ErrKeyNotFound {
2020-06-16 09:27:34 +02:00
return & errNotFound { }
2020-05-29 12:43:02 +01:00
}
return err
}
// badgerPrefixIterator builds a badgerIterator over the keys in txn that
// start with prefix. No range bounds are set.
func badgerPrefixIterator(txn *badger.Txn, prefix []byte) *badgerIterator {
	return &badgerIterator{
		it:     iteratorForPrefix(txn, prefix),
		prefix: prefix,
	}
}
// badgerRangeIterator builds a badgerIterator over the keys in txn between
// first and last. The underlying Badger iterator is restricted to the
// common prefix of the two bounds.
func badgerRangeIterator(txn *badger.Txn, first, last []byte) *badgerIterator {
	shared := commonPrefix(first, last)
	return &badgerIterator{
		it:     iteratorForPrefix(txn, shared),
		prefix: shared,
		first:  first,
		last:   last,
	}
}
// iteratorForPrefix creates a Badger iterator on txn using the default
// iterator options with the Prefix option set to prefix.
func iteratorForPrefix(txn *badger.Txn, prefix []byte) *badger.Iterator {
	iterOpts := badger.DefaultIteratorOptions
	iterOpts.Prefix = prefix
	iter := txn.NewIterator(iterOpts)
	return iter
}
// commonPrefix returns the longest byte sequence that both a and b start
// with. The result is always a newly allocated, non-nil slice (empty when
// the inputs share no leading bytes) and never aliases a or b.
func commonPrefix(a, b []byte) []byte {
	limit := len(a)
	if len(b) < limit {
		limit = len(b)
	}

	// Count the leading bytes on which both inputs agree.
	n := 0
	for n < limit && a[n] == b[n] {
		n++
	}

	out := make([]byte, 0, limit)
	return append(out, a[:n]...)
}