mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 14:48:30 +00:00
This changes the GC mechanism so that the first pass (which reads all FileInfos to populate bloom filters with block & version hashes) can happen concurrently with normal database operations. The big gcMut still exists, and we grab it temporarily to block all other modifications while we set up the bloom filters. We then release the lock and let other things happen, with those other things also updating the bloom filters as required. Once the first phase is done we again grab the gcMut, knowing that we are the sole modifier of the database, and do the cleanup. I also removed the final compaction step.
This commit is contained in:
parent
45edad867c
commit
ce65aea0ab
@ -70,6 +70,9 @@ type Lowlevel struct {
|
||||
recheckInterval time.Duration
|
||||
oneFileSetCreated chan struct{}
|
||||
evLogger events.Logger
|
||||
|
||||
blockFilter *bloomFilter
|
||||
versionFilter *bloomFilter
|
||||
}
|
||||
|
||||
func NewLowlevel(backend backend.Backend, evLogger events.Logger, opts ...Option) (*Lowlevel, error) {
|
||||
@ -686,28 +689,30 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
|
||||
// Indirection GC needs to run when there are no modifications to the
|
||||
// FileInfos or indirected items.
|
||||
|
||||
db.gcMut.Lock()
|
||||
defer db.gcMut.Unlock()
|
||||
l.Debugln("Starting database GC")
|
||||
|
||||
l.Debugln("Started database GC")
|
||||
// Create a new set of bloom filters, while holding the gcMut which
|
||||
// guarantees that no other modifications are happening concurrently.
|
||||
|
||||
db.gcMut.Lock()
|
||||
capacity := indirectGCBloomCapacity
|
||||
if db.gcKeyCount > capacity {
|
||||
capacity = db.gcKeyCount
|
||||
}
|
||||
db.blockFilter = newBloomFilter(capacity)
|
||||
db.versionFilter = newBloomFilter(capacity)
|
||||
db.gcMut.Unlock()
|
||||
|
||||
defer func() {
|
||||
// Forget the bloom filters on the way out.
|
||||
db.gcMut.Lock()
|
||||
db.blockFilter = nil
|
||||
db.versionFilter = nil
|
||||
db.gcMut.Unlock()
|
||||
}()
|
||||
|
||||
var discardedBlocks, matchedBlocks, discardedVersions, matchedVersions int
|
||||
|
||||
// Only print something if the process takes more than "a moment".
|
||||
logWait := make(chan struct{})
|
||||
logTimer := time.AfterFunc(10*time.Second, func() {
|
||||
l.Infoln("Database GC started - many Syncthing operations will be unresponsive until it's finished")
|
||||
close(logWait)
|
||||
})
|
||||
defer func() {
|
||||
if logTimer.Stop() || err != nil {
|
||||
return
|
||||
}
|
||||
<-logWait // Make sure messages are sent in order.
|
||||
l.Infof("Database GC done (discarded/remaining: %v/%v blocks, %v/%v versions)",
|
||||
discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
|
||||
}()
|
||||
|
||||
t, err := db.newReadWriteTransaction()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -719,16 +724,13 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
|
||||
// items. For simplicity's sake we track just one count, which is the
|
||||
// highest of the various indirected items.
|
||||
|
||||
capacity := indirectGCBloomCapacity
|
||||
if db.gcKeyCount > capacity {
|
||||
capacity = db.gcKeyCount
|
||||
}
|
||||
blockFilter := newBloomFilter(capacity)
|
||||
versionFilter := newBloomFilter(capacity)
|
||||
|
||||
// Iterate the FileInfos, unmarshal the block and version hashes and
|
||||
// add them to the filter.
|
||||
|
||||
// This happens concurrently with normal database modifications, though
|
||||
// those modifications will now also add their blocks and versions to
|
||||
// the bloom filters.
|
||||
|
||||
it, err := t.NewPrefixIterator([]byte{KeyTypeDevice})
|
||||
if err != nil {
|
||||
return err
|
||||
@ -745,18 +747,35 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
|
||||
if err := hashes.Unmarshal(it.Value()); err != nil {
|
||||
return err
|
||||
}
|
||||
if len(hashes.BlocksHash) > 0 {
|
||||
blockFilter.add(hashes.BlocksHash)
|
||||
}
|
||||
if len(hashes.VersionHash) > 0 {
|
||||
versionFilter.add(hashes.VersionHash)
|
||||
}
|
||||
db.recordIndirectionHashes(hashes)
|
||||
}
|
||||
it.Release()
|
||||
if err := it.Error(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// For the next phase we grab the GC lock again and hold it for the rest
|
||||
// of the method call. Now there can't be any further modifications to
|
||||
// the database or the bloom filters.
|
||||
|
||||
db.gcMut.Lock()
|
||||
defer db.gcMut.Unlock()
|
||||
|
||||
// Only print something if the process takes more than "a moment".
|
||||
logWait := make(chan struct{})
|
||||
logTimer := time.AfterFunc(10*time.Second, func() {
|
||||
l.Infoln("Database GC in progress - many Syncthing operations will be unresponsive until it's finished")
|
||||
close(logWait)
|
||||
})
|
||||
defer func() {
|
||||
if logTimer.Stop() {
|
||||
return
|
||||
}
|
||||
<-logWait // Make sure messages are sent in order.
|
||||
l.Infof("Database GC complete (discarded/remaining: %v/%v blocks, %v/%v versions)",
|
||||
discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
|
||||
}()
|
||||
|
||||
// Iterate over block lists, removing keys with hashes that don't match
|
||||
// the filter.
|
||||
|
||||
@ -773,7 +792,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
key := blockListKey(it.Key())
|
||||
if blockFilter.has(key.Hash()) {
|
||||
if db.blockFilter.has(key.Hash()) {
|
||||
matchedBlocks++
|
||||
continue
|
||||
}
|
||||
@ -802,7 +821,7 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
|
||||
}
|
||||
|
||||
key := versionKey(it.Key())
|
||||
if versionFilter.has(key.Hash()) {
|
||||
if db.versionFilter.has(key.Hash()) {
|
||||
matchedVersions++
|
||||
continue
|
||||
}
|
||||
@ -826,17 +845,31 @@ func (db *Lowlevel) gcIndirect(ctx context.Context) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
l.Debugf("Finished GC, starting compaction (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
|
||||
l.Debugf("Finished GC (discarded/remaining: %v/%v blocks, %v/%v versions)", discardedBlocks, matchedBlocks, discardedVersions, matchedVersions)
|
||||
|
||||
return db.Compact()
|
||||
return nil
|
||||
}
|
||||
|
||||
func newBloomFilter(capacity int) bloomFilter {
|
||||
func (db *Lowlevel) recordIndirectionHashesForFile(f *protocol.FileInfo) {
|
||||
db.recordIndirectionHashes(IndirectionHashesOnly{BlocksHash: f.BlocksHash, VersionHash: f.VersionHash})
|
||||
}
|
||||
|
||||
func (db *Lowlevel) recordIndirectionHashes(hs IndirectionHashesOnly) {
|
||||
// must be called with gcMut held (at least read-held)
|
||||
if db.blockFilter != nil && len(hs.BlocksHash) > 0 {
|
||||
db.blockFilter.add(hs.BlocksHash)
|
||||
}
|
||||
if db.versionFilter != nil && len(hs.VersionHash) > 0 {
|
||||
db.versionFilter.add(hs.VersionHash)
|
||||
}
|
||||
}
|
||||
|
||||
func newBloomFilter(capacity int) *bloomFilter {
|
||||
var buf [16]byte
|
||||
io.ReadFull(rand.Reader, buf[:])
|
||||
|
||||
return bloomFilter{
|
||||
f: blobloom.NewOptimized(blobloom.Config{
|
||||
return &bloomFilter{
|
||||
f: blobloom.NewSyncOptimized(blobloom.Config{
|
||||
Capacity: uint64(capacity),
|
||||
FPRate: indirectGCBloomFalsePositiveRate,
|
||||
MaxBits: 8 * indirectGCBloomMaxBytes,
|
||||
@ -848,7 +881,7 @@ func newBloomFilter(capacity int) bloomFilter {
|
||||
}
|
||||
|
||||
type bloomFilter struct {
|
||||
f *blobloom.Filter
|
||||
f *blobloom.SyncFilter
|
||||
k0, k1 uint64 // Random key for SipHash.
|
||||
}
|
||||
|
||||
|
@ -534,6 +534,11 @@ func (t *readOnlyTransaction) withNeedLocal(folder []byte, truncate bool, fn Ite
|
||||
type readWriteTransaction struct {
|
||||
backend.WriteTransaction
|
||||
readOnlyTransaction
|
||||
indirectionTracker
|
||||
}
|
||||
|
||||
type indirectionTracker interface {
|
||||
recordIndirectionHashesForFile(f *protocol.FileInfo)
|
||||
}
|
||||
|
||||
func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWriteTransaction, error) {
|
||||
@ -547,6 +552,7 @@ func (db *Lowlevel) newReadWriteTransaction(hooks ...backend.CommitHook) (readWr
|
||||
ReadTransaction: tran,
|
||||
keyer: db.keyer,
|
||||
},
|
||||
indirectionTracker: db,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -606,6 +612,8 @@ func (t readWriteTransaction) putFile(fkey []byte, fi protocol.FileInfo) error {
|
||||
fi.VersionHash = nil
|
||||
}
|
||||
|
||||
t.indirectionTracker.recordIndirectionHashesForFile(&fi)
|
||||
|
||||
fiBs := mustMarshal(&fi)
|
||||
return t.Put(fkey, fiBs)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user