mirror of
https://github.com/octoleo/syncthing.git
synced 2025-02-02 11:58:28 +00:00
lib/model: Refactor {ro,rw}folder serve loop into just folder (#4939)
The actual pull method (which is really the only thing that differs between them) is now an interface member which gets overridden by the subclass. "Subclass?!" Well, this is dynamic dispatch with overriding, I guess.
This commit is contained in:
parent
a2f51c85c2
commit
344e52e311
@ -39,6 +39,12 @@ type folder struct {
|
||||
restartWatchChan chan struct{}
|
||||
watchErr error
|
||||
watchErrMut sync.Mutex
|
||||
|
||||
puller puller
|
||||
}
|
||||
|
||||
type puller interface {
|
||||
pull() bool // true when successfull and should not be retried
|
||||
}
|
||||
|
||||
func newFolder(model *Model, cfg config.FolderConfiguration) folder {
|
||||
@ -64,6 +70,89 @@ func newFolder(model *Model, cfg config.FolderConfiguration) folder {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *folder) Serve() {
|
||||
l.Debugln(f, "starting")
|
||||
defer l.Debugln(f, "exiting")
|
||||
|
||||
defer func() {
|
||||
f.scan.timer.Stop()
|
||||
f.setState(FolderIdle)
|
||||
}()
|
||||
|
||||
pause := f.basePause()
|
||||
pullFailTimer := time.NewTimer(0)
|
||||
<-pullFailTimer.C
|
||||
|
||||
if f.FSWatcherEnabled && f.CheckHealth() == nil {
|
||||
f.startWatch()
|
||||
}
|
||||
|
||||
initialCompleted := f.initialScanFinished
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-f.ctx.Done():
|
||||
return
|
||||
|
||||
case <-f.pullScheduled:
|
||||
pullFailTimer.Stop()
|
||||
select {
|
||||
case <-pullFailTimer.C:
|
||||
default:
|
||||
}
|
||||
|
||||
if !f.puller.pull() {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(pause)
|
||||
}
|
||||
|
||||
case <-pullFailTimer.C:
|
||||
if f.puller.pull() {
|
||||
// We're good. Don't schedule another fail pull and reset
|
||||
// the pause interval.
|
||||
pause = f.basePause()
|
||||
continue
|
||||
}
|
||||
|
||||
// Pulling failed, try again later.
|
||||
l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), pause)
|
||||
pullFailTimer.Reset(pause)
|
||||
// Back off from retrying to pull with an upper limit.
|
||||
if pause < 60*f.basePause() {
|
||||
pause *= 2
|
||||
}
|
||||
|
||||
case <-initialCompleted:
|
||||
// Initial scan has completed, we should do a pull
|
||||
initialCompleted = nil // never hit this case again
|
||||
if !f.puller.pull() {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(pause)
|
||||
}
|
||||
|
||||
// The reason for running the scanner from within the puller is that
|
||||
// this is the easiest way to make sure we are not doing both at the
|
||||
// same time.
|
||||
case <-f.scan.timer.C:
|
||||
l.Debugln(f, "Scanning subdirectories")
|
||||
f.scanTimerFired()
|
||||
|
||||
case req := <-f.scan.now:
|
||||
req.err <- f.scanSubdirs(req.subdirs)
|
||||
|
||||
case next := <-f.scan.delay:
|
||||
f.scan.timer.Reset(next)
|
||||
|
||||
case fsEvents := <-f.watchChan:
|
||||
l.Debugln(f, "filesystem notification rescan")
|
||||
f.scanSubdirs(fsEvents)
|
||||
|
||||
case <-f.restartWatchChan:
|
||||
f.restartWatch()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *folder) BringToFront(string) {}
|
||||
|
||||
func (f *folder) DelayScan(next time.Duration) {
|
||||
@ -258,3 +347,10 @@ func (f *folder) setError(err error) {
|
||||
|
||||
f.stateTracker.setError(err)
|
||||
}
|
||||
|
||||
func (f *folder) basePause() time.Duration {
|
||||
if f.PullerPauseS == 0 {
|
||||
return defaultPullerPause
|
||||
}
|
||||
return time.Duration(f.PullerPauseS) * time.Second
|
||||
}
|
||||
|
@ -25,47 +25,11 @@ type sendOnlyFolder struct {
|
||||
}
|
||||
|
||||
func newSendOnlyFolder(model *Model, cfg config.FolderConfiguration, _ versioner.Versioner, _ fs.Filesystem) service {
|
||||
return &sendOnlyFolder{folder: newFolder(model, cfg)}
|
||||
}
|
||||
|
||||
func (f *sendOnlyFolder) Serve() {
|
||||
l.Debugln(f, "starting")
|
||||
defer l.Debugln(f, "exiting")
|
||||
|
||||
defer func() {
|
||||
f.scan.timer.Stop()
|
||||
}()
|
||||
|
||||
if f.FSWatcherEnabled && f.CheckHealth() == nil {
|
||||
f.startWatch()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-f.ctx.Done():
|
||||
return
|
||||
|
||||
case <-f.pullScheduled:
|
||||
f.pull()
|
||||
|
||||
case <-f.restartWatchChan:
|
||||
f.restartWatch()
|
||||
|
||||
case <-f.scan.timer.C:
|
||||
l.Debugln(f, "Scanning subdirectories")
|
||||
f.scanTimerFired()
|
||||
|
||||
case req := <-f.scan.now:
|
||||
req.err <- f.scanSubdirs(req.subdirs)
|
||||
|
||||
case next := <-f.scan.delay:
|
||||
f.scan.timer.Reset(next)
|
||||
|
||||
case fsEvents := <-f.watchChan:
|
||||
l.Debugln(f, "filesystem notification rescan")
|
||||
f.scanSubdirs(fsEvents)
|
||||
}
|
||||
f := &sendOnlyFolder{
|
||||
folder: newFolder(model, cfg),
|
||||
}
|
||||
f.folder.puller = f
|
||||
return f
|
||||
}
|
||||
|
||||
func (f *sendOnlyFolder) String() string {
|
||||
@ -77,12 +41,12 @@ func (f *sendOnlyFolder) PullErrors() []FileError {
|
||||
}
|
||||
|
||||
// pull checks need for files that only differ by metadata (no changes on disk)
|
||||
func (f *sendOnlyFolder) pull() {
|
||||
func (f *sendOnlyFolder) pull() bool {
|
||||
select {
|
||||
case <-f.initialScanFinished:
|
||||
default:
|
||||
// Once the initial scan finished, a pull will be scheduled
|
||||
return
|
||||
return false
|
||||
}
|
||||
|
||||
f.model.fmut.RLock()
|
||||
@ -133,4 +97,6 @@ func (f *sendOnlyFolder) pull() {
|
||||
if len(batch) > 0 {
|
||||
f.model.updateLocalsFromPulling(f.folderID, batch)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -94,9 +94,9 @@ type dbUpdateJob struct {
|
||||
type sendReceiveFolder struct {
|
||||
folder
|
||||
|
||||
fs fs.Filesystem
|
||||
versioner versioner.Versioner
|
||||
pause time.Duration
|
||||
prevIgnoreHash string
|
||||
fs fs.Filesystem
|
||||
versioner versioner.Versioner
|
||||
|
||||
queue *jobQueue
|
||||
|
||||
@ -106,15 +106,13 @@ type sendReceiveFolder struct {
|
||||
|
||||
func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner, fs fs.Filesystem) service {
|
||||
f := &sendReceiveFolder{
|
||||
folder: newFolder(model, cfg),
|
||||
|
||||
folder: newFolder(model, cfg),
|
||||
fs: fs,
|
||||
versioner: ver,
|
||||
|
||||
queue: newJobQueue(),
|
||||
|
||||
queue: newJobQueue(),
|
||||
errorsMut: sync.NewMutex(),
|
||||
}
|
||||
f.folder.puller = f
|
||||
|
||||
if f.Copiers == 0 {
|
||||
f.Copiers = defaultCopiers
|
||||
@ -130,126 +128,32 @@ func newSendReceiveFolder(model *Model, cfg config.FolderConfiguration, ver vers
|
||||
f.PullerMaxPendingKiB = blockSizeKiB
|
||||
}
|
||||
|
||||
f.pause = f.basePause()
|
||||
|
||||
return f
|
||||
}
|
||||
|
||||
// Serve will run scans and pulls. It will return when Stop()ed or on a
|
||||
// critical error.
|
||||
func (f *sendReceiveFolder) Serve() {
|
||||
l.Debugln(f, "starting")
|
||||
defer l.Debugln(f, "exiting")
|
||||
|
||||
defer func() {
|
||||
f.scan.timer.Stop()
|
||||
// TODO: Should there be an actual FolderStopped state?
|
||||
f.setState(FolderIdle)
|
||||
}()
|
||||
|
||||
var prevIgnoreHash string
|
||||
var success bool
|
||||
pullFailTimer := time.NewTimer(time.Duration(0))
|
||||
<-pullFailTimer.C
|
||||
|
||||
if f.FSWatcherEnabled && f.CheckHealth() == nil {
|
||||
f.startWatch()
|
||||
}
|
||||
|
||||
initialCompleted := f.initialScanFinished
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-f.ctx.Done():
|
||||
return
|
||||
|
||||
case <-f.pullScheduled:
|
||||
pullFailTimer.Stop()
|
||||
select {
|
||||
case <-pullFailTimer.C:
|
||||
default:
|
||||
}
|
||||
|
||||
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(f.pause)
|
||||
}
|
||||
|
||||
case <-pullFailTimer.C:
|
||||
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(f.pause)
|
||||
// Back off from retrying to pull with an upper limit.
|
||||
if f.pause < 60*f.basePause() {
|
||||
f.pause *= 2
|
||||
}
|
||||
}
|
||||
|
||||
case <-initialCompleted:
|
||||
// Initial scan has completed, we should do a pull
|
||||
initialCompleted = nil // never hit this case again
|
||||
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(f.pause)
|
||||
}
|
||||
|
||||
// The reason for running the scanner from within the puller is that
|
||||
// this is the easiest way to make sure we are not doing both at the
|
||||
// same time.
|
||||
case <-f.scan.timer.C:
|
||||
l.Debugln(f, "Scanning subdirectories")
|
||||
f.scanTimerFired()
|
||||
|
||||
case req := <-f.scan.now:
|
||||
req.err <- f.scanSubdirs(req.subdirs)
|
||||
|
||||
case next := <-f.scan.delay:
|
||||
f.scan.timer.Reset(next)
|
||||
|
||||
case fsEvents := <-f.watchChan:
|
||||
l.Debugln(f, "filesystem notification rescan")
|
||||
f.scanSubdirs(fsEvents)
|
||||
|
||||
case <-f.restartWatchChan:
|
||||
f.restartWatch()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *sendReceiveFolder) SchedulePull() {
|
||||
select {
|
||||
case f.pullScheduled <- struct{}{}:
|
||||
default:
|
||||
// We might be busy doing a pull and thus not reading from this
|
||||
// channel. The channel is 1-buffered, so one notification will be
|
||||
// queued to ensure we recheck after the pull, but beyond that we must
|
||||
// make sure to not block index receiving.
|
||||
}
|
||||
}
|
||||
|
||||
func (f *sendReceiveFolder) String() string {
|
||||
return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f)
|
||||
}
|
||||
|
||||
func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, success bool) {
|
||||
func (f *sendReceiveFolder) pull() bool {
|
||||
select {
|
||||
case <-f.initialScanFinished:
|
||||
default:
|
||||
// Once the initial scan finished, a pull will be scheduled
|
||||
return prevIgnoreHash, true
|
||||
return true
|
||||
}
|
||||
|
||||
if err := f.CheckHealth(); err != nil {
|
||||
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
|
||||
return prevIgnoreHash, true
|
||||
return true
|
||||
}
|
||||
|
||||
f.model.fmut.RLock()
|
||||
curIgnores := f.model.folderIgnores[f.folderID]
|
||||
f.model.fmut.RUnlock()
|
||||
|
||||
curIgnoreHash = curIgnores.Hash()
|
||||
ignoresChanged := curIgnoreHash != prevIgnoreHash
|
||||
curIgnoreHash := curIgnores.Hash()
|
||||
ignoresChanged := curIgnoreHash != f.prevIgnoreHash
|
||||
|
||||
l.Debugf("%v pulling (ignoresChanged=%v)", f, ignoresChanged)
|
||||
|
||||
@ -270,10 +174,7 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s
|
||||
|
||||
if changed == 0 {
|
||||
// No files were changed by the puller, so we are in
|
||||
// sync. Update the local version number.
|
||||
|
||||
f.pause = f.basePause()
|
||||
|
||||
// sync.
|
||||
break
|
||||
}
|
||||
|
||||
@ -288,10 +189,6 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s
|
||||
"errors": folderErrors,
|
||||
})
|
||||
}
|
||||
|
||||
l.Infof("Folder %v isn't making progress. Pausing puller for %v.", f.Description(), f.pause)
|
||||
l.Debugln(f, "next pull in", f.pause)
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -301,10 +198,11 @@ func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, s
|
||||
close(scanChan)
|
||||
|
||||
if changed == 0 {
|
||||
return curIgnoreHash, true
|
||||
f.prevIgnoreHash = curIgnoreHash
|
||||
return true
|
||||
}
|
||||
|
||||
return prevIgnoreHash, false
|
||||
return false
|
||||
}
|
||||
|
||||
// pullerIteration runs a single puller iteration for the given folder and
|
||||
@ -1800,13 +1698,6 @@ func (f *sendReceiveFolder) PullErrors() []FileError {
|
||||
return errors
|
||||
}
|
||||
|
||||
func (f *sendReceiveFolder) basePause() time.Duration {
|
||||
if f.PullerPauseS == 0 {
|
||||
return defaultPullerPause
|
||||
}
|
||||
return time.Duration(f.PullerPauseS) * time.Second
|
||||
}
|
||||
|
||||
// deleteDir attempts to delete a directory. It checks for files/dirs inside
|
||||
// the directory and removes them if possible or returns an error if it fails
|
||||
func (f *sendReceiveFolder) deleteDir(dir string, ignores *ignore.Matcher, scanChan chan<- string) error {
|
||||
|
Loading…
x
Reference in New Issue
Block a user