diff --git a/lib/db/lowlevel.go b/lib/db/lowlevel.go
index 51608dc49..21f77fd0e 100644
--- a/lib/db/lowlevel.go
+++ b/lib/db/lowlevel.go
@@ -70,6 +70,9 @@ type Lowlevel struct {
 	recheckInterval time.Duration
 	oneFileSetCreated chan struct{}
 	evLogger events.Logger
+
+	blockFilter *bloomFilter
+	versionFilter *bloomFilter
 }
 
 func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) {
@@ -686,28 +689,30 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
 	// Indirection GC needs to run when there are no modifications to the
 	// FileInfos or indirected items.
 
-	db.gcMut.Lock()
-	defer db.gcMut.Unlock()
+	l.Debugln("Starting database GC")
 
-	l.Debugln("Started database GC")
+	// Create a new set of bloom filters while holding gcMut, which
+	// guarantees that no other modifications are happening concurrently.
+
+	db.gcMut.Lock()
+	capacity := indirectGCBloomCapacity
+	if db.gcKeyCount > capacity {
+		capacity = db.gcKeyCount
+	}
+	db.blockFilter = newBloomFilter(capacity)
+	db.versionFilter = newBloomFilter(capacity)
+	db.gcMut.Unlock()
+
+	defer func() {
+		// Forget the bloom filters on the way out, so recordIndirectionHashes goes back to skipping them.
+		db.gcMut.Lock()
+		db.blockFilter = nil
+		db.versionFilter = nil
+		db.gcMut.Unlock()
+	}()
 
 	var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int
 
-	// Only print something if the process takes more than "a moment".
-	logWait := make(chan struct{})
-	logTimer := time.AfterFunc(10*time.Second, func() {
-		l.Infoln("Database GC started - many Syncthing operations will be unresponsive until it's finished")
-		close(logWait)
-	})
-	defer func() {
-		if logTimer.Stop() || err != nil {
-			return
-		}
-		<-logWait // Make sure messages are sent in order.
-		l.Infof("Database GC done (discarded/remaining: %v/%v blocks, %v/%v versions)",
-			discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
-	}()
-
 	t, err := db.newReadWriteTransaction()
 	if err != nil {
 		return err
@@ -719,16 +724,13 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
 	// items. For simplicity's sake we track just one count, which is the
 	// highest of the various indirected items.
 
-	capacity := indirectGCBloomCapacity
-	if db.gcKeyCount > capacity {
-		capacity = db.gcKeyCount
-	}
-	blockFilter := newBloomFilter(capacity)
-	versionFilter := newBloomFilter(capacity)
-
 	// Iterate the FileInfos, unmarshal the block and version hashes and
 	// add them to the filter.
 
+	// This happens concurrently with normal database modifications, though
+	// those modifications will now also add their blocks and versions to
+	// the bloom filters (via putFile -> recordIndirectionHashesForFile).
+
 	it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
 	if err != nil {
 		return err
@@ -745,18 +747,35 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
 		if err := hashes.Unmarshal(it.Value()); err != nil {
 			return err
 		}
-		if len(hashes.BlocksHash) > 0 {
-			blockFilter.add(hashes.BlocksHash)
-		}
-		if len(hashes.VersionHash) > 0 {
-			versionFilter.add(hashes.VersionHash)
-		}
+		db.recordIndirectionHashes(hashes) // NOTE(review): gcMut is not held here, contrary to the comment on recordIndirectionHashes - confirm this is safe.
 	}
 	it.Release()
 	if err := it.Error(); err != nil {
 		return err
 	}
 
+	// For the next phase we grab the GC lock again and hold it for the rest
+	// of the method call. Now there can't be any further modifications to
+	// the database or the bloom filters.
+
+	db.gcMut.Lock()
+	defer db.gcMut.Unlock()
+
+	// Only print something if the process takes more than "a moment".
+	logWait := make(chan struct{})
+	logTimer := time.AfterFunc(10*time.Second, func() {
+		l.Infoln("Database GC in progress - many Syncthing operations will be unresponsive until it's finished")
+		close(logWait)
+	})
+	defer func() {
+		if logTimer.Stop() { // NOTE(review): the removed version also returned early when err != nil; this one logs "complete" after a failed GC too - confirm intended.
+			return
+		}
+		<-logWait // Make sure messages are sent in order.
+		l.Infof("Database GC complete (discarded/remaining: %v/%v blocks, %v/%v versions)",
+			discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
+	}()
+
 	// Iterate over block lists, removing keys with hashes that don't match
 	// the filter.
 
@@ -773,7 +792,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
 		}
 
 		key := blockListKey(it.Key())
-		if blockFilter.has(key.Hash()) {
+		if db.blockFilter.has(key.Hash()) {
 			matchedBlocks++
 			continue
 		}
@@ -802,7 +821,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
 		}
 
 		key := versionKey(it.Key())
-		if versionFilter.has(key.Hash()) {
+		if db.versionFilter.has(key.Hash()) {
 			matchedVersions++
 			continue
 		}
@@ -826,17 +845,31 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
 		return err
 	}
 
-	l.Debugf("Finished GC, starting compaction (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
+	l.Debugf("Finished GC (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
 
-	return db.Compact()
+	return nil
 }
 
-func newBloomFilter(capacity int) bloomFilter {
+func (db *Lowlevel) recordIndirectionHashesForFile(f *protocol.FileInfo) {
+	db.recordIndirectionHashes(IndirectionHashesOnly{BlocksHash: f.BlocksHash, VersionHash: f.VersionHash})
+}
+
+func (db *Lowlevel) recordIndirectionHashes(hs IndirectionHashesOnly) {
+	// Must be called with gcMut held (at least read-held). A nil filter is skipped, as are empty hashes.
+	if db.blockFilter != nil && len(hs.BlocksHash) > 0 {
+		db.blockFilter.add(hs.BlocksHash)
+	}
+	if db.versionFilter != nil && len(hs.VersionHash) > 0 {
+		db.versionFilter.add(hs.VersionHash)
+	}
+}
+
+func newBloomFilter(capacity int) *bloomFilter {
 	var buf [16]byte
 	io.ReadFull(rand.Reader, buf[:])
 
-	return bloomFilter{
-		f: blobloom.NewOptimized(blobloom.Config{
+	return &bloomFilter{
+		f: blobloom.NewSyncOptimized(blobloom.Config{
 			Capacity: uint64(capacity),
 			FPRate: indirectGCBloomFalsePositiveRate,
 			MaxBits: 8 * indirectGCBloomMaxBytes,
@@ -848,7 +881,7 @@ func newBloomFilter(capacity int) bloomFilter {
 }
 
 type bloomFilter struct {
-	f *blobloom.Filter
+	f *blobloom.SyncFilter
 	k0, k1 uint64 // Random key for SipHash.
 }
 
diff --git a/lib/db/transactions.go b/lib/db/transactions.go
index 4054a85d3..7e41b9655 100644
--- a/lib/db/transactions.go
+++ b/lib/db/transactions.go
@@ -534,6 +534,11 @@ func (t *readOnlyTransaction) withNeedLocal(folder []byte, truncate bool, fn Ite
 type readWriteTransaction struct {
 	backend.WriteTransaction
 	readOnlyTransaction
+	indirectionTracker // Set to the owning *Lowlevel by newReadWriteTransaction.
+}
+
+type indirectionTracker interface {
+	recordIndirectionHashesForFile(f *protocol.FileInfo)
 }
 
 func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWriteTransaction, error) {
@@ -547,6 +552,7 @@ func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWr
 			ReadTransaction: tran,
 			keyer: db.keyer,
 		},
+		indirectionTracker: db,
 	}, nil
 }
 
@@ -606,6 +612,8 @@ func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error {
 		fi.VersionHash = nil
 	}
 
+	t.indirectionTracker.recordIndirectionHashesForFile(&fi)
+
 	fiBs := mustMarshal(&fi)
 	return t.Put(fkey, fiBs)
 }