From f86deedd9c6efa3e436dc5fc84a94fcd47483ec2 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Fri, 1 May 2020 08:54:15 +0100 Subject: [PATCH] lib/model: Progress emitter network operations dont need to be blocking (#6589) * lib/model: Progress emitter network operations dont need to be blocking * Do sends outside of the lock --- lib/model/progressemitter.go | 37 ++++++++++++++++++++++++++----- lib/model/progressemitter_test.go | 8 +++++-- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/lib/model/progressemitter.go b/lib/model/progressemitter.go index d08179372..263674008 100644 --- a/lib/model/progressemitter.go +++ b/lib/model/progressemitter.go @@ -37,6 +37,16 @@ type ProgressEmitter struct { timer *time.Timer } +type progressUpdate struct { + conn protocol.Connection + folder string + updates []protocol.FileDownloadProgressUpdate +} + +func (p progressUpdate) send(ctx context.Context) { + p.conn.DownloadProgress(ctx, p.folder, p.updates) +} + // NewProgressEmitter creates a new progress emitter which emits // DownloadProgress events every interval. func NewProgressEmitter(cfg config.Wrapper, evLogger events.Logger) *ProgressEmitter { @@ -76,6 +86,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) { newLastUpdated := lastUpdate newCount = t.lenRegistryLocked() + var progressUpdates []progressUpdate for _, pullers := range t.registry { for _, puller := range pullers { if updated := puller.Updated(); updated.After(newLastUpdated) { @@ -88,9 +99,7 @@ func (t *ProgressEmitter) serve(ctx context.Context) { lastUpdate = newLastUpdated lastCount = newCount t.sendDownloadProgressEventLocked() - if len(t.connections) > 0 { - t.sendDownloadProgressMessagesLocked(ctx) - } + progressUpdates = t.computeProgressUpdates() } else { l.Debugln("progress emitter: nothing new") } @@ -99,6 +108,17 @@ func (t *ProgressEmitter) serve(ctx context.Context) { t.timer.Reset(t.interval) } t.mut.Unlock() + + // Do the sending outside of the lock. + // If these send block, the whole process of reporting progress to others stops, but that's probably fine. + // It's better to stop this component from working under back-pressure than causing other components that + // rely on this component to be waiting for locks. + // + // This might leave remote peers in some funky state where we are unable the fact that we no longer have + // something, but there is not much we can do here. + for _, update := range progressUpdates { + update.send(ctx) + } } } } @@ -118,7 +138,8 @@ func (t *ProgressEmitter) sendDownloadProgressEventLocked() { l.Debugf("progress emitter: emitting %#v", output) } -func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context) { +func (t *ProgressEmitter) computeProgressUpdates() []progressUpdate { + var progressUpdates []progressUpdate for id, conn := range t.connections { for _, folder := range t.foldersByConns[id] { pullers, ok := t.registry[folder] @@ -149,7 +170,11 @@ func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context updates := state.update(folder, activePullers) if len(updates) > 0 { - conn.DownloadProgress(ctx, folder, updates) + progressUpdates = append(progressUpdates, progressUpdate{ + conn: conn, + folder: folder, + updates: updates, + }) } } } @@ -189,6 +214,8 @@ func (t *ProgressEmitter) sendDownloadProgressMessagesLocked(ctx context.Context // } } } + + return progressUpdates } // VerifyConfiguration implements the config.Committer interface diff --git a/lib/model/progressemitter_test.go b/lib/model/progressemitter_test.go index b8ce2826b..bbe6792f1 100644 --- a/lib/model/progressemitter_test.go +++ b/lib/model/progressemitter_test.go @@ -461,6 +461,10 @@ func TestSendDownloadProgressMessages(t *testing.T) { func sendMsgs(p *ProgressEmitter) { p.mut.Lock() - defer p.mut.Unlock() - p.sendDownloadProgressMessagesLocked(context.Background()) + updates := p.computeProgressUpdates() + p.mut.Unlock() + ctx := context.Background() + for _, update := range updates { + update.send(ctx) + } }