From 6a840a040bd128e910d9f1cac87c6d2b58c65a83 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 13 Feb 2020 15:23:08 +0100 Subject: [PATCH] lib/db: Keep metadata better in sync (ref #6335) (#6337) This adds metadata updates to the same write batch as the underlying file change. The odds of a metadata update going missing is greatly reduced. Bonus change: actually commit the transaction in recalcMeta. --- lib/db/backend/backend.go | 7 ++++++- lib/db/backend/leveldb_backend.go | 11 ++++++++--- lib/db/lowlevel.go | 16 ++++++++++++++-- lib/db/meta.go | 6 +++--- lib/db/set.go | 28 ++++++++++++++++++++-------- 5 files changed, 51 insertions(+), 17 deletions(-) diff --git a/lib/db/backend/backend.go b/lib/db/backend/backend.go index e587c073b..3320da03c 100644 --- a/lib/db/backend/backend.go +++ b/lib/db/backend/backend.go @@ -48,10 +48,15 @@ type ReadTransaction interface { // purposes of saving memory when transactions are in-RAM. Note that // transactions may be checkpointed *anyway* even if this is not called, due to // resource constraints, but this gives you a chance to decide when. +// +// Functions can be passed to Checkpoint. These are run if and only if the +// checkpoint will result in a flush, and will run before the flush. The +// transaction can be accessed via a closure. If an error is returned from +// these functions the flush will be aborted and the error bubbled. type WriteTransaction interface { ReadTransaction Writer - Checkpoint() error + Checkpoint(...func() error) error Commit() error } diff --git a/lib/db/backend/leveldb_backend.go b/lib/db/backend/leveldb_backend.go index cc073f720..c45cf9dd7 100644 --- a/lib/db/backend/leveldb_backend.go +++ b/lib/db/backend/leveldb_backend.go @@ -150,8 +150,8 @@ func (t *leveldbTransaction) Put(key, val []byte) error { return t.checkFlush(dbFlushBatchMax) } -func (t *leveldbTransaction) Checkpoint() error { - return t.checkFlush(dbFlushBatchMin) +func (t *leveldbTransaction) Checkpoint(preFlush ...func() error) error { + return t.checkFlush(dbFlushBatchMin, preFlush...) } func (t *leveldbTransaction) Commit() error { @@ -167,10 +167,15 @@ func (t *leveldbTransaction) Release() { } // checkFlush flushes and resets the batch if its size exceeds the given size. -func (t *leveldbTransaction) checkFlush(size int) error { +func (t *leveldbTransaction) checkFlush(size int, preFlush ...func() error) error { if len(t.batch.Dump()) < size { return nil } + for _, hook := range preFlush { + if err := hook(); err != nil { + return err + } + } return t.flush() } diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index 3d83f3b43..17f4d0413 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -124,11 +124,17 @@ func (db *Lowlevel) updateRemoteFiles(folder, device []byte, fs []protocol.FileI return err } - if err := t.Checkpoint(); err != nil { + if err := t.Checkpoint(func() error { + return meta.toDB(t, folder) + }); err != nil { return err } } + if err := meta.toDB(t, folder); err != nil { + return err + } + return t.Commit() } @@ -227,11 +233,17 @@ func (db *Lowlevel) updateLocalFiles(folder []byte, fs []protocol.FileInfo, meta } } - if err := t.Checkpoint(); err != nil { + if err := t.Checkpoint(func() error { + return meta.toDB(t, folder) + }); err != nil { return err } } + if err := meta.toDB(t, folder); err != nil { + return err + } + return t.Commit() } diff --git a/lib/db/meta.go b/lib/db/meta.go index 157fc6de8..0225d8925 100644 --- a/lib/db/meta.go +++ b/lib/db/meta.go @@ -62,8 +62,8 @@ func (m *metadataTracker) Marshal() ([]byte, error) { // toDB saves the marshalled metadataTracker to the given db, under the key // corresponding to the given folder -func (m *metadataTracker) toDB(db *Lowlevel, folder []byte) error { - key, err := db.keyer.GenerateFolderMetaKey(nil, folder) +func (m *metadataTracker) toDB(t readWriteTransaction, folder []byte) error { + key, err := t.keyer.GenerateFolderMetaKey(nil, folder) if err != nil { return err } @@ -79,7 +79,7 @@ func (m *metadataTracker) toDB(db *Lowlevel, folder []byte) error { if err != nil { return err } - err = db.Put(key, bs) + err = t.Put(key, bs) if err == nil { m.dirty = false } diff --git a/lib/db/set.go b/lib/db/set.go index fae21e009..5a8d94cde 100644 --- a/lib/db/set.go +++ b/lib/db/set.go @@ -117,6 +117,8 @@ func (s *FileSet) recalcMeta() error { if err != nil { return err } + defer t.close() + var deviceID protocol.DeviceID err = t.withAllFolderTruncated([]byte(s.folder), func(device []byte, f FileInfoTruncated) bool { copy(deviceID[:], device) @@ -128,7 +130,10 @@ func (s *FileSet) recalcMeta() error { } s.meta.SetCreated() - return s.meta.toDB(s.db, []byte(s.folder)) + if err := s.meta.toDB(t, []byte(s.folder)); err != nil { + return err + } + return t.Commit() } // Verify the local sequence number from actual sequence entries. Returns @@ -184,7 +189,20 @@ func (s *FileSet) Drop(device protocol.DeviceID) { s.meta.resetAll(device) } - if err := s.meta.toDB(s.db, []byte(s.folder)); backend.IsClosed(err) { + t, err := s.db.newReadWriteTransaction() + if backend.IsClosed(err) { + return + } else if err != nil { + panic(err) + } + defer t.close() + + if err := s.meta.toDB(t, []byte(s.folder)); backend.IsClosed(err) { + return + } else if err != nil { + panic(err) + } + if err := t.Commit(); backend.IsClosed(err) { return } else if err != nil { panic(err) @@ -202,12 +220,6 @@ func (s *FileSet) Update(device protocol.DeviceID, fs []protocol.FileInfo) { s.updateMutex.Lock() defer s.updateMutex.Unlock() - defer func() { - if err := s.meta.toDB(s.db, []byte(s.folder)); err != nil && !backend.IsClosed(err) { - panic(err) - } - }() - if device == protocol.LocalDeviceID { // For the local device we have a bunch of metadata to track. if err := s.db.updateLocalFiles([]byte(s.folder), fs, s.meta); err != nil && !backend.IsClosed(err) {