diff --git a/lib/config/folderconfiguration.go b/lib/config/folderconfiguration.go index 18fb1c6a1..13401b57d 100644 --- a/lib/config/folderconfiguration.go +++ b/lib/config/folderconfiguration.go @@ -42,7 +42,6 @@ type FolderConfiguration struct { Order PullOrder `xml:"order" json:"order"` IgnoreDelete bool `xml:"ignoreDelete" json:"ignoreDelete"` ScanProgressIntervalS int `xml:"scanProgressIntervalS" json:"scanProgressIntervalS"` // Set to a negative value to disable. Value of 0 will get replaced with value of 2 (default value) - PullerSleepS int `xml:"pullerSleepS" json:"pullerSleepS"` PullerPauseS int `xml:"pullerPauseS" json:"pullerPauseS"` MaxConflicts int `xml:"maxConflicts" json:"maxConflicts"` DisableSparseFiles bool `xml:"disableSparseFiles" json:"disableSparseFiles"` diff --git a/lib/model/folder.go b/lib/model/folder.go index ce7459e9f..6dc4c2915 100644 --- a/lib/model/folder.go +++ b/lib/model/folder.go @@ -50,8 +50,7 @@ func (f *folder) DelayScan(next time.Duration) { f.scan.Delay(next) } -func (f *folder) IndexUpdated() { -} +func (f *folder) IndexUpdated() {} func (f *folder) IgnoresUpdated() { if f.FSWatcherEnabled { @@ -59,6 +58,8 @@ func (f *folder) IgnoresUpdated() { } } +func (f *folder) SchedulePull() {} + func (f *folder) Jobs() ([]string, []string) { return nil, nil } diff --git a/lib/model/model.go b/lib/model/model.go index 9c9b1c6a1..2678a150b 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -47,8 +47,9 @@ const ( type service interface { BringToFront(string) DelayScan(d time.Duration) - IndexUpdated() // Remote index was updated notification - IgnoresUpdated() // ignore matcher was updated notification + IndexUpdated() // Remote index was updated notification + IgnoresUpdated() // ignore matcher was updated notification + SchedulePull() Jobs() ([]string, []string) // In progress, Queued Scan(subs []string) error Serve() @@ -813,7 +814,7 @@ func (m *Model) Index(deviceID protocol.DeviceID, folder string, fs []protocol.F if 
runner != nil { // Runner may legitimately not be set if this is the "cleanup" Index // message at startup. - defer runner.IndexUpdated() + defer runner.SchedulePull() } m.pmut.RLock() @@ -862,7 +863,7 @@ func (m *Model) IndexUpdate(deviceID protocol.DeviceID, folder string, fs []prot "version": files.Sequence(deviceID), }) - runner.IndexUpdated() + runner.SchedulePull() } func (m *Model) folderSharedWith(folder string, deviceID protocol.DeviceID) bool { @@ -1002,7 +1003,7 @@ func (m *Model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon // that we need to pull so let the folder runner know // that it should recheck the index data. if runner := m.folderRunners[folder.ID]; runner != nil { - defer runner.IndexUpdated() + defer runner.SchedulePull() } } } @@ -1853,7 +1854,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su // Check if the ignore patterns changed as part of scanning this folder. // If they did we should schedule a pull of the folder so that we // request things we might have suddenly become unignored and so on. - oldHash := ignores.Hash() defer func() { if ignores.Hash() != oldHash { @@ -1879,6 +1879,8 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su return err } + defer runner.SchedulePull() + // Clean the list of subitems to ensure that we start at a known // directory, and don't scan subdirectories of things we've already // scanned. 
diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index bfd0bc49f..580601a15 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -216,7 +216,6 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) { cfg := defaultConfig.RawCopy() cfg.Folders[0] = config.NewFolderConfiguration("default", fs.FilesystemTypeBasic, "_tmpfolder") - cfg.Folders[0].PullerSleepS = 1 cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{ {DeviceID: device1}, {DeviceID: device2}, @@ -289,7 +288,6 @@ func TestRequestVersioningSymlinkAttack(t *testing.T) { func setupModelWithConnection() (*Model, *fakeConnection) { cfg := defaultConfig.RawCopy() cfg.Folders[0] = config.NewFolderConfiguration("default", fs.FilesystemTypeBasic, "_tmpfolder") - cfg.Folders[0].PullerSleepS = 1 cfg.Folders[0].Devices = []config.FolderDeviceConfiguration{ {DeviceID: device1}, {DeviceID: device2}, diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index e524354f5..5ff00bc7d 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -67,10 +67,10 @@ const ( ) const ( - defaultCopiers = 2 - defaultPullers = 64 - defaultPullerSleep = 10 * time.Second - defaultPullerPause = 60 * time.Second + defaultCopiers = 2 + defaultPullers = 64 + defaultPullerPause = 60 * time.Second + maxPullerIterations = 3 ) type dbUpdateJob struct { @@ -83,13 +83,11 @@ type sendReceiveFolder struct { fs fs.Filesystem versioner versioner.Versioner - sleep time.Duration pause time.Duration - queue *jobQueue - dbUpdates chan dbUpdateJob - pullTimer *time.Timer - remoteIndex chan struct{} // An index update was received, we should re-evaluate needs + queue *jobQueue + dbUpdates chan dbUpdateJob + pullScheduled chan struct{} errors map[string]string // path -> error string errorsMut sync.Mutex @@ -105,9 +103,8 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers fs: fs, versioner: ver, - queue: newJobQueue(), - pullTimer: time.NewTimer(time.Second), - 
remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes. + queue: newJobQueue(), + pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes. errorsMut: sync.NewMutex(), @@ -128,17 +125,7 @@ func (f *sendReceiveFolder) configureCopiersAndPullers() { f.Pullers = defaultPullers } - if f.PullerPauseS == 0 { - f.pause = defaultPullerPause - } else { - f.pause = time.Duration(f.PullerPauseS) * time.Second - } - - if f.PullerSleepS == 0 { - f.sleep = defaultPullerSleep - } else { - f.sleep = time.Duration(f.PullerSleepS) * time.Second - } + f.pause = f.basePause() } // Helper function to check whether either the ignorePerm flag has been @@ -155,14 +142,16 @@ func (f *sendReceiveFolder) Serve() { defer l.Debugln(f, "exiting") defer func() { - f.pullTimer.Stop() f.scan.timer.Stop() // TODO: Should there be an actual FolderStopped state? f.setState(FolderIdle) }() - var prevSec int64 + var prevSeq int64 var prevIgnoreHash string + var success bool + pullFailTimer := time.NewTimer(time.Duration(0)) + <-pullFailTimer.C if f.FSWatcherEnabled && f.CheckHealth() == nil { f.startWatch() @@ -173,102 +162,27 @@ func (f *sendReceiveFolder) Serve() { case <-f.ctx.Done(): return - case <-f.remoteIndex: - prevSec = 0 - f.pullTimer.Reset(0) - l.Debugln(f, "remote index updated, rescheduling pull") - - case <-f.pullTimer.C: + case <-f.pullScheduled: + pullFailTimer.Stop() select { - case <-f.initialScanFinished: + case <-pullFailTimer.C: default: - // We don't start pulling files until a scan has been completed. - l.Debugln(f, "skip (initial)") - f.pullTimer.Reset(f.sleep) - continue } - f.model.fmut.RLock() - curIgnores := f.model.folderIgnores[f.folderID] - f.model.fmut.RUnlock() - - if newHash := curIgnores.Hash(); newHash != prevIgnoreHash { - // The ignore patterns have changed. 
We need to re-evaluate if - // there are files we need now that were ignored before. - l.Debugln(f, "ignore patterns have changed, resetting prevVer") - prevSec = 0 - prevIgnoreHash = newHash + if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success { + // Pulling failed, try again later. + pullFailTimer.Reset(f.pause) } - // RemoteSequence() is a fast call, doesn't touch the database. - curSeq, ok := f.model.RemoteSequence(f.folderID) - if !ok || curSeq == prevSec { - l.Debugln(f, "skip (curSeq == prevSeq)", prevSec, ok) - f.pullTimer.Reset(f.sleep) - continue - } - - if err := f.CheckHealth(); err != nil { - l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err) - f.pullTimer.Reset(f.sleep) - continue - } - - l.Debugln(f, "pulling", prevSec, curSeq) - - f.setState(FolderSyncing) - f.clearErrors() - tries := 0 - - for { - tries++ - - changed := f.pullerIteration(curIgnores) - l.Debugln(f, "changed", changed) - - if changed == 0 { - // No files were changed by the puller, so we are in - // sync. Remember the local version number and - // schedule a resync a little bit into the future. - - if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < curSeq { - // There's a corner case where the device we needed - // files from disconnected during the puller - // iteration. The files will have been removed from - // the index, so we've concluded that we don't need - // them, but at the same time we have the local - // version that includes those files in curVer. So we - // catch the case that sequence might have - // decreased here. - l.Debugln(f, "adjusting curVer", lv) - curSeq = lv - } - prevSec = curSeq - l.Debugln(f, "next pull in", f.sleep) - f.pullTimer.Reset(f.sleep) - break - } - - if tries > 2 { - // We've tried a bunch of times to get in sync, but - // we're not making it. Probably there are write - // errors preventing us. Flag this with a warning and - // wait a bit longer before retrying. 
- if folderErrors := f.currentErrors(); len(folderErrors) > 0 { - events.Default.Log(events.FolderErrors, map[string]interface{}{ - "folder": f.folderID, - "errors": folderErrors, - }) - } - - l.Infof("Folder %v isn't making progress. Pausing puller for %v.", f.Description(), f.pause) - l.Debugln(f, "next pull in", f.pause) - - f.pullTimer.Reset(f.pause) - break + case <-pullFailTimer.C: + if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success { + // Pulling failed, try again later. + pullFailTimer.Reset(f.pause) + // Back off from retrying to pull with an upper limit. + if f.pause < 60*f.basePause() { + f.pause *= 2 } } - f.setState(FolderIdle) // The reason for running the scanner from within the puller is that // this is the easiest way to make sure we are not doing both at the @@ -293,9 +207,9 @@ func (f *sendReceiveFolder) Serve() { } } -func (f *sendReceiveFolder) IndexUpdated() { +func (f *sendReceiveFolder) SchedulePull() { select { - case f.remoteIndex <- struct{}{}: + case f.pullScheduled <- struct{}{}: default: // We might be busy doing a pull and thus not reading from this // channel. The channel is 1-buffered, so one notification will be @@ -308,6 +222,98 @@ func (f *sendReceiveFolder) String() string { return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f) } +func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq int64, curIgnoreHash string, success bool) { + select { + case <-f.initialScanFinished: + default: + // Once the initial scan finished, a pull will be scheduled + return prevSeq, prevIgnoreHash, true + } + + f.model.fmut.RLock() + curIgnores := f.model.folderIgnores[f.folderID] + f.model.fmut.RUnlock() + + curSeq = prevSeq + if curIgnoreHash = curIgnores.Hash(); curIgnoreHash != prevIgnoreHash { + // The ignore patterns have changed. We need to re-evaluate if + // there are files we need now that were ignored before. 
+ l.Debugln(f, "ignore patterns have changed, resetting curSeq") + curSeq = 0 + } + + // RemoteSequence() is a fast call, doesn't touch the database. + remoteSeq, ok := f.model.RemoteSequence(f.folderID) + if !ok || remoteSeq == curSeq { + l.Debugln(f, "skip (remoteSeq == curSeq)", curSeq, ok) + return curSeq, curIgnoreHash, true + } + + if err := f.CheckHealth(); err != nil { + l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err) + return curSeq, curIgnoreHash, true + } + + l.Debugln(f, "pulling", curSeq, remoteSeq) + + f.setState(FolderSyncing) + f.clearErrors() + var changed int + tries := 0 + + for { + tries++ + + changed = f.pullerIteration(curIgnores) + l.Debugln(f, "changed", changed) + + if changed == 0 { + // No files were changed by the puller, so we are in + // sync. Update the local version number. + + if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < remoteSeq { + // There's a corner case where the device we needed + // files from disconnected during the puller + // iteration. The files will have been removed from + // the index, so we've concluded that we don't need + // them, but at the same time we have the old remote sequence + // that includes those files in remoteSeq. So we + // catch the case that this sequence might have + // decreased here. + l.Debugf("%v adjusting remoteSeq from %d to %d", f, remoteSeq, lv) + remoteSeq = lv + } + curSeq = remoteSeq + + f.pause = f.basePause() + + break + } + + if tries == maxPullerIterations { + // We've tried a bunch of times to get in sync, but + // we're not making it. Probably there are write + // errors preventing us. Flag this with a warning and + // wait a bit longer before retrying. + if folderErrors := f.currentErrors(); len(folderErrors) > 0 { + events.Default.Log(events.FolderErrors, map[string]interface{}{ + "folder": f.folderID, + "errors": folderErrors, + }) + } + + l.Infof("Folder %v isn't making progress. 
Pausing puller for %v.", f.Description(), f.pause) + l.Debugln(f, "next pull in", f.pause) + + break + } + } + + f.setState(FolderIdle) + + return curSeq, curIgnoreHash, changed == 0 +} + // pullerIteration runs a single puller iteration for the given folder and // returns the number items that should have been synced (even those that // might have failed). One puller iteration handles all files currently @@ -1693,6 +1699,13 @@ func (f *sendReceiveFolder) currentErrors() []fileError { return errors } +func (f *sendReceiveFolder) basePause() time.Duration { + if f.PullerPauseS == 0 { + return defaultPullerPause + } + return time.Duration(f.PullerPauseS) * time.Second +} + func (f *sendReceiveFolder) IgnoresUpdated() { f.folder.IgnoresUpdated() f.IndexUpdated()