Merge pull request #1775 from calmh/fix-1765

Trigger pull check on remote index updates (fixes #1765)
This commit is contained in:
Audrius Butkevicius 2015-05-08 11:45:03 +03:00
commit 5528b6c231
3 changed files with 48 additions and 19 deletions

View File

@ -50,6 +50,7 @@ type service interface {
Jobs() ([]string, []string) // In progress, Queued
BringToFront(string)
DelayScan(d time.Duration)
IndexUpdated() // Remote index was updated notification
setState(state folderState)
setError(err error)
@ -469,8 +470,15 @@ func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.F
m.fmut.RLock()
files, ok := m.folderFiles[folder]
runner := m.folderRunners[folder]
m.fmut.RUnlock()
if runner != nil {
// Runner may legitimately not be set if this is the "cleanup" Index
// message at startup.
defer runner.IndexUpdated()
}
if !ok {
l.Fatalf("Index for nonexistant folder %q", folder)
}
@ -521,7 +529,8 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot
}
m.fmut.RLock()
files, ok := m.folderFiles[folder]
files := m.folderFiles[folder]
runner, ok := m.folderRunners[folder]
m.fmut.RUnlock()
if !ok {
@ -554,6 +563,8 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot
"items": len(fs),
"version": files.LocalVersion(deviceID),
})
runner.IndexUpdated()
}
func (m *Model) folderSharedWith(folder string, deviceID protocol.DeviceID) bool {

View File

@ -104,6 +104,9 @@ func (s *roFolder) Stop() {
close(s.stop)
}
func (s *roFolder) IndexUpdated() {
}
func (s *roFolder) String() string {
return fmt.Sprintf("roFolder/%s@%p", s.folder, s)
}

View File

@ -32,7 +32,7 @@ import (
const (
pauseIntv = 60 * time.Second
nextPullIntv = 10 * time.Second
checkPullIntv = 1 * time.Second
shortPullIntv = 5 * time.Second
)
// A pullBlockState is passed to the puller routine for each block that needs
@ -77,6 +77,7 @@ type rwFolder struct {
scanTimer *time.Timer
pullTimer *time.Timer
delayScan chan time.Duration
remoteIndex chan struct{} // An index update was received, we should re-evaluate needs
}
func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
@ -101,9 +102,10 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
stop: make(chan struct{}),
queue: newJobQueue(),
pullTimer: time.NewTimer(checkPullIntv),
pullTimer: time.NewTimer(shortPullIntv),
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
delayScan: make(chan time.Duration),
remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.
}
}
@ -149,11 +151,13 @@ func (p *rwFolder) Serve() {
case <-p.stop:
return
// TODO: We could easily add a channel here for notifications from
// Index(), so that we immediately start a pull when new index
// information is available. Before that though, I'd like to build a
// repeatable benchmark of how long it takes to sync a change from
// device A to device B, so we have something to work against.
case <-p.remoteIndex:
prevVer = 0
p.pullTimer.Reset(shortPullIntv)
if debug {
l.Debugln(p, "remote index updated, rescheduling pull")
}
case <-p.pullTimer.C:
if !initialScanCompleted {
if debug {
@ -183,7 +187,7 @@ func (p *rwFolder) Serve() {
if debug {
l.Debugln(p, "skip (curVer == prevVer)", prevVer)
}
p.pullTimer.Reset(checkPullIntv)
p.pullTimer.Reset(nextPullIntv)
continue
}
@ -282,6 +286,17 @@ func (p *rwFolder) Stop() {
close(p.stop)
}
func (p *rwFolder) IndexUpdated() {
select {
case p.remoteIndex <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
func (p *rwFolder) String() string {
return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
}