From 245bd1eb17e83c7ca1630ee699187b1242f9d99b Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 7 May 2015 22:45:07 +0200 Subject: [PATCH] Trigger pull check on remote index updates (fixes #1765) Without this, when an index update comes in we only do a new pull if the remote `localVersion` was increased. But it may not be, because the index is sent alphabetically and the file with the highest local version may come first. In that case we'll never do a new pull when the rest of the index comes in, and we'll be stuck in idle but with lots of out of sync data. --- internal/model/model.go | 13 +++++++++- internal/model/rofolder.go | 3 +++ internal/model/rwfolder.go | 51 ++++++++++++++++++++++++-------------- 3 files changed, 48 insertions(+), 19 deletions(-) diff --git a/internal/model/model.go b/internal/model/model.go index a5bb03e84..64ce76b3f 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -50,6 +50,7 @@ type service interface { Jobs() ([]string, []string) // In progress, Queued BringToFront(string) DelayScan(d time.Duration) + IndexUpdated() // Remote index was updated notification setState(state folderState) setError(err error) @@ -469,8 +470,15 @@ func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.F m.fmut.RLock() files, ok := m.folderFiles[folder] + runner := m.folderRunners[folder] m.fmut.RUnlock() + if runner != nil { + // Runner may legitimately not be set if this is the "cleanup" Index + // message at startup. + defer runner.IndexUpdated() + } + if !ok { l.Fatalf("Index for nonexistant folder %q", folder) } @@ -521,7 +529,8 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot } m.fmut.RLock() - files, ok := m.folderFiles[folder] + files := m.folderFiles[folder] + runner, ok := m.folderRunners[folder] m.fmut.RUnlock() if !ok { @@ -554,6 +563,8 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot "items": len(fs), "version": files.LocalVersion(deviceID), }) + + runner.IndexUpdated() } func (m *Model) folderSharedWith(folder string, deviceID protocol.DeviceID) bool { diff --git a/internal/model/rofolder.go b/internal/model/rofolder.go index ae97d3a5a..b74acbf5d 100644 --- a/internal/model/rofolder.go +++ b/internal/model/rofolder.go @@ -104,6 +104,9 @@ func (s *roFolder) Stop() { close(s.stop) } +func (s *roFolder) IndexUpdated() { +} + func (s *roFolder) String() string { return fmt.Sprintf("roFolder/%s@%p", s.folder, s) } diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index 6fc103399..eb5b14c1e 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -32,7 +32,7 @@ import ( const ( pauseIntv = 60 * time.Second nextPullIntv = 10 * time.Second - checkPullIntv = 1 * time.Second + shortPullIntv = 5 * time.Second ) // A pullBlockState is passed to the puller routine for each block that needs @@ -71,12 +71,13 @@ type rwFolder struct { shortID uint64 order config.PullOrder - stop chan struct{} - queue *jobQueue - dbUpdates chan protocol.FileInfo - scanTimer *time.Timer - pullTimer *time.Timer - delayScan chan time.Duration + stop chan struct{} + queue *jobQueue + dbUpdates chan protocol.FileInfo + scanTimer *time.Timer + pullTimer *time.Timer + delayScan chan time.Duration + remoteIndex chan struct{} // An index update was received, we should re-evaluate needs } func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { @@ -99,11 +100,12 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo shortID: shortID, order: cfg.Order, - stop: make(chan struct{}), - queue: newJobQueue(), - pullTimer: time.NewTimer(checkPullIntv), - scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. - delayScan: make(chan time.Duration), + stop: make(chan struct{}), + queue: newJobQueue(), + pullTimer: time.NewTimer(shortPullIntv), + scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. + delayScan: make(chan time.Duration), + remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes. } } @@ -149,11 +151,13 @@ func (p *rwFolder) Serve() { case <-p.stop: return - // TODO: We could easily add a channel here for notifications from - // Index(), so that we immediately start a pull when new index - // information is available. Before that though, I'd like to build a - // repeatable benchmark of how long it takes to sync a change from - // device A to device B, so we have something to work against. + case <-p.remoteIndex: + prevVer = 0 + p.pullTimer.Reset(shortPullIntv) + if debug { + l.Debugln(p, "remote index updated, rescheduling pull") + } + case <-p.pullTimer.C: if !initialScanCompleted { if debug { @@ -183,7 +187,7 @@ func (p *rwFolder) Serve() { if debug { l.Debugln(p, "skip (curVer == prevVer)", prevVer) } - p.pullTimer.Reset(checkPullIntv) + p.pullTimer.Reset(nextPullIntv) continue } @@ -282,6 +286,17 @@ func (p *rwFolder) Stop() { close(p.stop) } +func (p *rwFolder) IndexUpdated() { + select { + case p.remoteIndex <- struct{}{}: + default: + // We might be busy doing a pull and thus not reading from this + // channel. The channel is 1-buffered, so one notification will be + // queued to ensure we recheck after the pull, but beyond that we must + // make sure to not block index receiving. + } +} + func (p *rwFolder) String() string { return fmt.Sprintf("rwFolder/%s@%p", p.folder, p) }