mirror of
https://github.com/octoleo/syncthing.git
synced 2024-12-22 10:58:57 +00:00
lib/db: Rework flush hooks (#6838)
This commit is contained in:
parent
851ee51c1b
commit
55147f5901
@ -15,6 +15,8 @@ import (
|
||||
"github.com/syncthing/syncthing/lib/locations"
|
||||
)
|
||||
|
||||
type CommitHook func(WriteTransaction) error
|
||||
|
||||
// The Reader interface specifies the read-only operations available on the
|
||||
// main database and on read-only transactions (snapshots). Note that when
|
||||
// called directly on the database handle these operations may take implicit
|
||||
@ -61,7 +63,7 @@ type ReadTransaction interface {
|
||||
type WriteTransaction interface {
|
||||
ReadTransaction
|
||||
Writer
|
||||
Checkpoint(...func() error) error
|
||||
Checkpoint() error
|
||||
Commit() error
|
||||
}
|
||||
|
||||
@ -108,7 +110,7 @@ type Backend interface {
|
||||
Reader
|
||||
Writer
|
||||
NewReadTransaction() (ReadTransaction, error)
|
||||
NewWriteTransaction() (WriteTransaction, error)
|
||||
NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error)
|
||||
Close() error
|
||||
Compact() error
|
||||
}
|
||||
|
@ -69,7 +69,7 @@ func (b *badgerBackend) NewReadTransaction() (ReadTransaction, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *badgerBackend) NewWriteTransaction() (WriteTransaction, error) {
|
||||
func (b *badgerBackend) NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error) {
|
||||
rel1, err := newReleaser(b.closeWG)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -90,9 +90,10 @@ func (b *badgerBackend) NewWriteTransaction() (WriteTransaction, error) {
|
||||
txn: rtxn,
|
||||
rel: rel1,
|
||||
},
|
||||
txn: wtxn,
|
||||
bdb: b.bdb,
|
||||
rel: rel2,
|
||||
txn: wtxn,
|
||||
bdb: b.bdb,
|
||||
rel: rel2,
|
||||
commitHooks: hooks,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -249,10 +250,11 @@ func (l badgerSnapshot) Release() {
|
||||
|
||||
type badgerTransaction struct {
|
||||
badgerSnapshot
|
||||
txn *badger.Txn
|
||||
bdb *badger.DB
|
||||
rel *releaser
|
||||
size int
|
||||
txn *badger.Txn
|
||||
bdb *badger.DB
|
||||
rel *releaser
|
||||
size int
|
||||
commitHooks []CommitHook
|
||||
}
|
||||
|
||||
func (t *badgerTransaction) Delete(key []byte) error {
|
||||
@ -295,15 +297,20 @@ func (t *badgerTransaction) transactionRetried(fn func(*badger.Txn) error) error
|
||||
func (t *badgerTransaction) Commit() error {
|
||||
defer t.rel.Release()
|
||||
defer t.badgerSnapshot.Release()
|
||||
for _, hook := range t.commitHooks {
|
||||
if err := hook(t); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return wrapBadgerErr(t.txn.Commit())
|
||||
}
|
||||
|
||||
func (t *badgerTransaction) Checkpoint(preFlush ...func() error) error {
|
||||
func (t *badgerTransaction) Checkpoint() error {
|
||||
if t.size < checkpointFlushMinSize {
|
||||
return nil
|
||||
}
|
||||
for _, hook := range preFlush {
|
||||
if err := hook(); err != nil {
|
||||
for _, hook := range t.commitHooks {
|
||||
if err := hook(t); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ func (b *leveldbBackend) newSnapshot() (leveldbSnapshot, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (b *leveldbBackend) NewWriteTransaction() (WriteTransaction, error) {
|
||||
func (b *leveldbBackend) NewWriteTransaction(hooks ...CommitHook) (WriteTransaction, error) {
|
||||
rel, err := newReleaser(b.closeWG)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -74,6 +74,7 @@ func (b *leveldbBackend) NewWriteTransaction() (WriteTransaction, error) {
|
||||
ldb: b.ldb,
|
||||
batch: new(leveldb.Batch),
|
||||
rel: rel,
|
||||
commitHooks: hooks,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -142,9 +143,10 @@ func (l leveldbSnapshot) Release() {
|
||||
// an actual leveldb transaction)
|
||||
type leveldbTransaction struct {
|
||||
leveldbSnapshot
|
||||
ldb *leveldb.DB
|
||||
batch *leveldb.Batch
|
||||
rel *releaser
|
||||
ldb *leveldb.DB
|
||||
batch *leveldb.Batch
|
||||
rel *releaser
|
||||
commitHooks []CommitHook
|
||||
}
|
||||
|
||||
func (t *leveldbTransaction) Delete(key []byte) error {
|
||||
@ -157,8 +159,8 @@ func (t *leveldbTransaction) Put(key, val []byte) error {
|
||||
return t.checkFlush(dbFlushBatchMax)
|
||||
}
|
||||
|
||||
func (t *leveldbTransaction) Checkpoint(preFlush ...func() error) error {
|
||||
return t.checkFlush(dbFlushBatchMin, preFlush...)
|
||||
func (t *leveldbTransaction) Checkpoint() error {
|
||||
return t.checkFlush(dbFlushBatchMin)
|
||||
}
|
||||
|
||||
func (t *leveldbTransaction) Commit() error {
|
||||
@ -174,19 +176,19 @@ func (t *leveldbTransaction) Release() {
|
||||
}
|
||||
|
||||
// checkFlush flushes and resets the batch if its size exceeds the given size.
|
||||
func (t *leveldbTransaction) checkFlush(size int, preFlush ...func() error) error {
|
||||
func (t *leveldbTransaction) checkFlush(size int) error {
|
||||
if len(t.batch.Dump()) < size {
|
||||
return nil
|
||||
}
|
||||
for _, hook := range preFlush {
|
||||
if err := hook(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return t.flush()
|
||||
}
|
||||
|
||||
func (t *leveldbTransaction) flush() error {
|
||||
for _, hook := range t.commitHooks {
|
||||
if err := hook(t); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if t.batch.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
|
@ -640,7 +640,7 @@ func TestGCIndirect(t *testing.T) {
|
||||
|
||||
db := NewLowlevel(backend.OpenMemory())
|
||||
defer db.Close()
|
||||
meta := newMetadataTracker()
|
||||
meta := newMetadataTracker(db.keyer)
|
||||
|
||||
// Add three files with different block lists
|
||||
|
||||
|
@ -117,7 +117,7 @@ func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileI
|
||||
db.gcMut.RLock()
|
||||
defer db.gcMut.RUnlock()
|
||||
|
||||
t, err := db.newReadWriteTransaction()
|
||||
t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -162,17 +162,11 @@ func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileI
|
||||
return err
|
||||
}
|
||||
|
||||
if err := t.Checkpoint(func() error {
|
||||
return meta.toDB(t, folder)
|
||||
}); err != nil {
|
||||
if err := t.Checkpoint(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := meta.toDB(t, folder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return t.Commit()
|
||||
}
|
||||
|
||||
@ -182,7 +176,7 @@ func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta
|
||||
db.gcMut.RLock()
|
||||
defer db.gcMut.RUnlock()
|
||||
|
||||
t, err := db.newReadWriteTransaction()
|
||||
t, err := db.newReadWriteTransaction(meta.CommitHook(folder))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -290,17 +284,11 @@ func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta
|
||||
}
|
||||
}
|
||||
|
||||
if err := t.Checkpoint(func() error {
|
||||
return meta.toDB(t, folder)
|
||||
}); err != nil {
|
||||
if err := t.Checkpoint(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := meta.toDB(t, folder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return t.Commit()
|
||||
}
|
||||
|
||||
@ -830,7 +818,7 @@ func (db *Lowlevel) getMetaAndCheck(folder string) *metadataTracker {
|
||||
}
|
||||
|
||||
func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
|
||||
meta := newMetadataTracker()
|
||||
meta := newMetadataTracker(db.keyer)
|
||||
if err := meta.fromDB(db, []byte(folder)); err != nil {
|
||||
if err == errMetaInconsistent {
|
||||
l.Infof("Stored folder metadata for %q is inconsistent; recalculating", folder)
|
||||
@ -856,7 +844,7 @@ func (db *Lowlevel) loadMetadataTracker(folder string) *metadataTracker {
|
||||
}
|
||||
|
||||
func (db *Lowlevel) recalcMeta(folder string) (*metadataTracker, error) {
|
||||
meta := newMetadataTracker()
|
||||
meta := newMetadataTracker(db.keyer)
|
||||
if err := db.checkGlobals([]byte(folder)); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -944,7 +932,7 @@ func (db *Lowlevel) verifyLocalSequence(curSeq int64, folder string) bool {
|
||||
// match those in the corresponding file entries. It returns the amount of fixed
|
||||
// entries.
|
||||
func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTracker) (int, error) {
|
||||
t, err := db.newReadWriteTransaction()
|
||||
t, err := db.newReadWriteTransaction(meta.CommitHook([]byte(folderStr)))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@ -996,9 +984,7 @@ func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTrack
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
if err := t.Checkpoint(func() error {
|
||||
return meta.toDB(t, folder)
|
||||
}); err != nil {
|
||||
if err := t.Checkpoint(); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
}
|
||||
@ -1045,10 +1031,6 @@ func (db *Lowlevel) repairSequenceGCLocked(folderStr string, meta *metadataTrack
|
||||
|
||||
it.Release()
|
||||
|
||||
if err := meta.toDB(t, folder); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
return fixed, t.Commit()
|
||||
}
|
||||
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
"math/bits"
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/db/backend"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/sync"
|
||||
)
|
||||
@ -25,6 +26,7 @@ type countsMap struct {
|
||||
|
||||
// metadataTracker keeps metadata on a per device, per local flag basis.
|
||||
type metadataTracker struct {
|
||||
keyer keyer
|
||||
countsMap
|
||||
mut sync.RWMutex
|
||||
dirty bool
|
||||
@ -37,9 +39,10 @@ type metaKey struct {
|
||||
|
||||
const needFlag uint32 = 1 << 31 // Last bit, as early ones are local flags
|
||||
|
||||
func newMetadataTracker() *metadataTracker {
|
||||
func newMetadataTracker(keyer keyer) *metadataTracker {
|
||||
return &metadataTracker{
|
||||
mut: sync.NewRWMutex(),
|
||||
keyer: keyer,
|
||||
mut: sync.NewRWMutex(),
|
||||
countsMap: countsMap{
|
||||
indexes: make(map[metaKey]int),
|
||||
},
|
||||
@ -69,10 +72,16 @@ func (m *metadataTracker) Marshal() ([]byte, error) {
|
||||
return m.counts.Marshal()
|
||||
}
|
||||
|
||||
func (m *metadataTracker) CommitHook(folder []byte) backend.CommitHook {
|
||||
return func(t backend.WriteTransaction) error {
|
||||
return m.toDB(t, folder)
|
||||
}
|
||||
}
|
||||
|
||||
// toDB saves the marshalled metadataTracker to the given db, under the key
|
||||
// corresponding to the given folder
|
||||
func (m *metadataTracker) toDB(t readWriteTransaction, folder []byte) error {
|
||||
key, err := t.keyer.GenerateFolderMetaKey(nil, folder)
|
||||
func (m *metadataTracker) toDB(t backend.WriteTransaction, folder []byte) error {
|
||||
key, err := m.keyer.GenerateFolderMetaKey(nil, folder)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -52,7 +52,7 @@ func TestEachFlagBit(t *testing.T) {
|
||||
func TestMetaDevices(t *testing.T) {
|
||||
d1 := protocol.DeviceID{1}
|
||||
d2 := protocol.DeviceID{2}
|
||||
meta := newMetadataTracker()
|
||||
meta := newMetadataTracker(nil)
|
||||
|
||||
meta.addFile(d1, protocol.FileInfo{Sequence: 1})
|
||||
meta.addFile(d1, protocol.FileInfo{Sequence: 2, LocalFlags: 1})
|
||||
@ -85,7 +85,7 @@ func TestMetaDevices(t *testing.T) {
|
||||
|
||||
func TestMetaSequences(t *testing.T) {
|
||||
d1 := protocol.DeviceID{1}
|
||||
meta := newMetadataTracker()
|
||||
meta := newMetadataTracker(nil)
|
||||
|
||||
meta.addFile(d1, protocol.FileInfo{Sequence: 1})
|
||||
meta.addFile(d1, protocol.FileInfo{Sequence: 2, RawInvalid: true})
|
||||
|
@ -516,8 +516,8 @@ type readWriteTransaction struct {
|
||||
readOnlyTransaction
|
||||
}
|
||||
|
||||
func (db *Lowlevel) newReadWriteTransaction() (readWriteTransaction, error) {
|
||||
tran, err := db.NewWriteTransaction()
|
||||
func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWriteTransaction, error) {
|
||||
tran, err := db.NewWriteTransaction(hooks...)
|
||||
if err != nil {
|
||||
return readWriteTransaction{}, err
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user