From ce65aea0abf0b79f54b351f637ad6a15d1350ad4 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 7 Jun 2021 10:52:06 +0200 Subject: [PATCH] lib/db: Use a more concurrent GC (fixes #7722) (#7750) This changes the GC mechanism so that the first pass (which reads all FileInfos to populate bloom filters with block & version hashes) can happen concurrently with normal database operations. The big gcMut still exists, and we grab it temporarily to block all other modifications while we set up the bloom filters. We then release the lock and let other things happen, with those other things also updating the bloom filters as required. Once the first phase is done we again grab the gcMut, knowing that we are the sole modifier of the database, and do the cleanup. I also removed the final compaction step. --- lib/db/lowlevel.go | 111 ++++++++++++++++++++++++++--------------- lib/db/transactions.go | 8 +++ 2 files changed, 80 insertions(+), 39 deletions(-) diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go index 51608dc49..21f77fd0e 100644 --- a/lib/db/lowlevel.go +++ b/lib/db/lowlevel.go @@ -70,6 +70,9 @@ type Lowlevel struct { recheckInterval time.Duration oneFileSetCreated chan struct{} evLogger events.Logger + + blockFilter *bloomFilter + versionFilter *bloomFilter } func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) { @@ -686,28 +689,30 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) { // Indirection GC needs to run when there are no modifications to the // FileInfos or indirected items. - db.gcMut.Lock() - defer db.gcMut.Unlock() + l.Debugln("Starting database GC") - l.Debugln("Started database GC") + // Create a new set of bloom filters, while holding the gcMut which + // guarantees that no other modifications are happening concurrently. + + db.gcMut.Lock() + capacity := indirectGCBloomCapacity + if db.gcKeyCount > capacity { + capacity = db.gcKeyCount + } + db.blockFilter = newBloomFilter(capacity) + db.versionFilter = newBloomFilter(capacity) + db.gcMut.Unlock() + + defer func() { + // Forget the bloom filters on the way out. + db.gcMut.Lock() + db.blockFilter = nil + db.versionFilter = nil + db.gcMut.Unlock() + }() var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int - // Only print something if the process takes more than "a moment". - logWait := make(chan struct{}) - logTimer := time.AfterFunc(10*time.Second, func() { - l.Infoln("Database GC started - many Syncthing operations will be unresponsive until it's finished") - close(logWait) - }) - defer func() { - if logTimer.Stop() || err != nil { - return - } - <-logWait // Make sure messages are sent in order. - l.Infof("Database GC done (discarded/remaining: %v/%v blocks, %v/%v versions)", - discardedBlocks, matchedBlocks, discardedVersions, matchedVersions) - }() - t, err := db.newReadWriteTransaction() if err != nil { return err @@ -719,16 +724,13 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) { // items. For simplicity's sake we track just one count, which is the // highest of the various indirected items. - capacity := indirectGCBloomCapacity - if db.gcKeyCount > capacity { - capacity = db.gcKeyCount - } - blockFilter := newBloomFilter(capacity) - versionFilter := newBloomFilter(capacity) - // Iterate the FileInfos, unmarshal the block and version hashes and // add them to the filter. + // This happens concurrently with normal database modifications, though + // those modifications will now also add their blocks and versions to + // the bloom filters. + it, err := t.NewPrefixIterator([]byte{KeyTypeDevice}) if err != nil { return err @@ -745,18 +747,35 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) { if err := hashes.Unmarshal(it.Value()); err != nil { return err } - if len(hashes.BlocksHash) > 0 { - blockFilter.add(hashes.BlocksHash) - } - if len(hashes.VersionHash) > 0 { - versionFilter.add(hashes.VersionHash) - } + db.recordIndirectionHashes(hashes) } it.Release() if err := it.Error(); err != nil { return err } + // For the next phase we grab the GC lock again and hold it for the rest + // of the method call. Now there can't be any further modifications to + // the database or the bloom filters. + + db.gcMut.Lock() + defer db.gcMut.Unlock() + + // Only print something if the process takes more than "a moment". + logWait := make(chan struct{}) + logTimer := time.AfterFunc(10*time.Second, func() { + l.Infoln("Database GC in progress - many Syncthing operations will be unresponsive until it's finished") + close(logWait) + }) + defer func() { + if logTimer.Stop() { + return + } + <-logWait // Make sure messages are sent in order. + l.Infof("Database GC complete (discarded/remaining: %v/%v blocks, %v/%v versions)", + discardedBlocks, matchedBlocks, discardedVersions, matchedVersions) + }() + // Iterate over block lists, removing keys with hashes that don't match // the filter. @@ -773,7 +792,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) { } key := blockListKey(it.Key()) - if blockFilter.has(key.Hash()) { + if db.blockFilter.has(key.Hash()) { matchedBlocks++ continue } @@ -802,7 +821,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) { } key := versionKey(it.Key()) - if versionFilter.has(key.Hash()) { + if db.versionFilter.has(key.Hash()) { matchedVersions++ continue } @@ -826,17 +845,31 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) { return err } - l.Debugf("Finished GC, starting compaction (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions) + l.Debugf("Finished GC (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions) - return db.Compact() + return nil } -func newBloomFilter(capacity int) bloomFilter { +func (db *Lowlevel) recordIndirectionHashesForFile(f *protocol.FileInfo) { + db.recordIndirectionHashes(IndirectionHashesOnly{BlocksHash: f.BlocksHash, VersionHash: f.VersionHash}) +} + +func (db *Lowlevel) recordIndirectionHashes(hs IndirectionHashesOnly) { + // must be called with gcMut held (at least read-held) + if db.blockFilter != nil && len(hs.BlocksHash) > 0 { + db.blockFilter.add(hs.BlocksHash) + } + if db.versionFilter != nil && len(hs.VersionHash) > 0 { + db.versionFilter.add(hs.VersionHash) + } +} + +func newBloomFilter(capacity int) *bloomFilter { var buf [16]byte io.ReadFull(rand.Reader, buf[:]) - return bloomFilter{ - f: blobloom.NewOptimized(blobloom.Config{ + return &bloomFilter{ + f: blobloom.NewSyncOptimized(blobloom.Config{ Capacity: uint64(capacity), FPRate: indirectGCBloomFalsePositiveRate, MaxBits: 8 * indirectGCBloomMaxBytes, @@ -848,7 +881,7 @@ func newBloomFilter(capacity int) bloomFilter { } type bloomFilter struct { - f *blobloom.Filter + f *blobloom.SyncFilter k0, k1 uint64 // Random key for SipHash. } diff --git a/lib/db/transactions.go b/lib/db/transactions.go index 4054a85d3..7e41b9655 100644 --- a/lib/db/transactions.go +++ b/lib/db/transactions.go @@ -534,6 +534,11 @@ func (t *readOnlyTransaction) withNeedLocal(folder []byte, truncate bool, fn Ite type readWriteTransaction struct { backend.WriteTransaction readOnlyTransaction + indirectionTracker +} + +type indirectionTracker interface { + recordIndirectionHashesForFile(f *protocol.FileInfo) } func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWriteTransaction, error) { @@ -547,6 +552,7 @@ func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWr ReadTransaction: tran, keyer: db.keyer, }, + indirectionTracker: db, }, nil } @@ -606,6 +612,8 @@ func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error { fi.VersionHash = nil } + t.indirectionTracker.recordIndirectionHashesForFile(&fi) + fiBs := mustMarshal(&fi) return t.Put(fkey, fiBs) }