From 236f121c4e6882e31fd96e3ba5aa39fb01947115 Mon Sep 17 00:00:00 2001 From: "Lars K.W. Gohlke" Date: Tue, 26 Apr 2016 14:01:46 +0000 Subject: [PATCH] lib/model: Refactor out folder and folderscan types, simplify somewhat GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3007 --- lib/model/folder.go | 53 ++++ lib/model/folderscan.go | 49 +++ lib/model/folderstate.go | 8 +- lib/model/model.go | 6 +- lib/model/rofolder.go | 139 +++------ lib/model/rwfolder.go | 594 +++++++++++++++++-------------------- lib/model/rwfolder_test.go | 55 ++-- 7 files changed, 446 insertions(+), 458 deletions(-) create mode 100644 lib/model/folder.go create mode 100644 lib/model/folderscan.go diff --git a/lib/model/folder.go b/lib/model/folder.go new file mode 100644 index 000000000..8f15adc8c --- /dev/null +++ b/lib/model/folder.go @@ -0,0 +1,53 @@ +// Copyright (C) 2014 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import "time" + +type folder struct { + stateTracker + scan folderscan + model *Model + stop chan struct{} +} + +func (f *folder) IndexUpdated() { +} + +func (f *folder) DelayScan(next time.Duration) { + f.scan.Delay(next) +} + +func (f *folder) Scan(subdirs []string) error { + return f.scan.Scan(subdirs) +} +func (f *folder) Stop() { + close(f.stop) +} + +func (f *folder) Jobs() ([]string, []string) { + return nil, nil +} + +func (f *folder) BringToFront(string) {} + +func (f *folder) scanSubdirsIfHealthy(subDirs []string) error { + if err := f.model.CheckFolderHealth(f.folderID); err != nil { + l.Infoln("Skipping folder", f.folderID, "scan due to folder error:", err) + return err + } + l.Debugln(f, "Scanning subdirectories") + if err := f.model.internalScanFolderSubdirs(f.folderID, subDirs); err != nil { + // Potentially sets the error twice, once in the scanner just + // by doing a check, and once here, if the error returned is + // the same one as returned by CheckFolderHealth, though + // duplicate set is handled by setError. + f.setError(err) + return err + } + return nil +} diff --git a/lib/model/folderscan.go b/lib/model/folderscan.go new file mode 100644 index 000000000..65ed31aa5 --- /dev/null +++ b/lib/model/folderscan.go @@ -0,0 +1,49 @@ +// Copyright (C) 2016 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package model + +import ( + "math/rand" + "time" +) + +type rescanRequest struct { + subdirs []string + err chan error +} + +// bundle all folder scan activity +type folderscan struct { + interval time.Duration + timer *time.Timer + now chan rescanRequest + delay chan time.Duration +} + +func (s *folderscan) reschedule() { + if s.interval == 0 { + return + } + // Sleep a random time between 3/4 and 5/4 of the configured interval. + sleepNanos := (s.interval.Nanoseconds()*3 + rand.Int63n(2*s.interval.Nanoseconds())) / 4 + interval := time.Duration(sleepNanos) * time.Nanosecond + l.Debugln(s, "next rescan in", interval) + s.timer.Reset(interval) +} + +func (s *folderscan) Scan(subdirs []string) error { + req := rescanRequest{ + subdirs: subdirs, + err: make(chan error), + } + s.now <- req + return <-req.err +} + +func (s *folderscan) Delay(next time.Duration) { + s.delay <- next +} diff --git a/lib/model/folderstate.go b/lib/model/folderstate.go index dfacdc47d..f68bee355 100644 --- a/lib/model/folderstate.go +++ b/lib/model/folderstate.go @@ -38,7 +38,7 @@ func (s folderState) String() string { } type stateTracker struct { - folder string + folderID string mut sync.Mutex current folderState @@ -61,7 +61,7 @@ func (s *stateTracker) setState(newState folderState) { */ eventData := map[string]interface{}{ - "folder": s.folder, + "folder": s.folderID, "to": newState.String(), "from": s.current.String(), } @@ -92,7 +92,7 @@ func (s *stateTracker) setError(err error) { s.mut.Lock() if s.current != FolderError || s.err.Error() != err.Error() { eventData := map[string]interface{}{ - "folder": s.folder, + "folder": s.folderID, "to": FolderError.String(), "from": s.current.String(), "error": err.Error(), @@ -116,7 +116,7 @@ func (s *stateTracker) clearError() { s.mut.Lock() if s.current == FolderError { eventData := map[string]interface{}{ - "folder": s.folder, + "folder": s.folderID, "to": FolderIdle.String(), "from": s.current.String(), } diff --git a/lib/model/model.go b/lib/model/model.go index 83d2ee3b5..558d59837 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -172,7 +172,7 @@ func (m *Model) StartFolderRW(folder string) { if ok { panic("cannot start already running folder " + folder) } - p := newRWFolder(m, m.shortID, cfg) + p := newRWFolder(m, cfg) m.folderRunners[folder] = p if len(cfg.Versioning.Type) > 0 { @@ -243,7 +243,7 @@ func (m *Model) StartFolderRO(folder string) { if ok { panic("cannot start already running folder " + folder) } - s := newROFolder(m, folder, time.Duration(cfg.RescanIntervalS)*time.Second) + s := newROFolder(m, cfg) m.folderRunners[folder] = s token := m.Add(s) @@ -1360,7 +1360,7 @@ func (m *Model) ScanFolderSubs(folder string, subs []string) error { return runner.Scan(subs) } -func (m *Model) internalScanFolderSubs(folder string, subs []string) error { +func (m *Model) internalScanFolderSubdirs(folder string, subs []string) error { for i, sub := range subs { sub = osutil.NativeFilename(sub) if p := filepath.Clean(filepath.Join(folder, sub)); !strings.HasPrefix(p, folder) { diff --git a/lib/model/rofolder.go b/lib/model/rofolder.go index f1d65f94b..5a146191e 100644 --- a/lib/model/rofolder.go +++ b/lib/model/rofolder.go @@ -8,151 +8,88 @@ package model import ( "fmt" - "math/rand" "time" + "github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/sync" ) type roFolder struct { - stateTracker - - folder string - intv time.Duration - timer *time.Timer - model *Model - stop chan struct{} - scanNow chan rescanRequest - delayScan chan time.Duration + folder } -type rescanRequest struct { - subs []string - err chan error -} - -func newROFolder(model *Model, folder string, interval time.Duration) *roFolder { +func newROFolder(model *Model, cfg config.FolderConfiguration) *roFolder { return &roFolder{ - stateTracker: stateTracker{ - folder: folder, - mut: sync.NewMutex(), + folder: folder{ + stateTracker: stateTracker{ + folderID: cfg.ID, + mut: sync.NewMutex(), + }, + scan: folderscan{ + interval: time.Duration(cfg.RescanIntervalS) * time.Second, + timer: time.NewTimer(time.Millisecond), + now: make(chan rescanRequest), + delay: make(chan time.Duration), + }, + stop: make(chan struct{}), + model: model, }, - folder: folder, - intv: interval, - timer: time.NewTimer(time.Millisecond), - model: model, - stop: make(chan struct{}), - scanNow: make(chan rescanRequest), - delayScan: make(chan time.Duration), } } -func (s *roFolder) Serve() { - l.Debugln(s, "starting") - defer l.Debugln(s, "exiting") +func (f *roFolder) Serve() { + l.Debugln(f, "starting") + defer l.Debugln(f, "exiting") defer func() { - s.timer.Stop() + f.scan.timer.Stop() }() - reschedule := func() { - if s.intv == 0 { - return - } - // Sleep a random time between 3/4 and 5/4 of the configured interval. - sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4 - s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond) - } - initialScanCompleted := false for { select { - case <-s.stop: + case <-f.stop: return - case <-s.timer.C: - if err := s.model.CheckFolderHealth(s.folder); err != nil { - l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err) - reschedule() + case <-f.scan.timer.C: + if err := f.model.CheckFolderHealth(f.folderID); err != nil { + l.Infoln("Skipping folder", f.folderID, "scan due to folder error:", err) + f.scan.reschedule() continue } - l.Debugln(s, "rescan") + l.Debugln(f, "rescan") - if err := s.model.internalScanFolderSubs(s.folder, nil); err != nil { + if err := f.model.internalScanFolderSubdirs(f.folderID, nil); err != nil { // Potentially sets the error twice, once in the scanner just // by doing a check, and once here, if the error returned is // the same one as returned by CheckFolderHealth, though // duplicate set is handled by setError. - s.setError(err) - reschedule() + f.setError(err) + f.scan.reschedule() continue } if !initialScanCompleted { - l.Infoln("Completed initial scan (ro) of folder", s.folder) + l.Infoln("Completed initial scan (ro) of folder", f.folderID) initialScanCompleted = true } - if s.intv == 0 { + if f.scan.interval == 0 { continue } - reschedule() + f.scan.reschedule() - case req := <-s.scanNow: - if err := s.model.CheckFolderHealth(s.folder); err != nil { - l.Infoln("Skipping folder", s.folder, "scan due to folder error:", err) - req.err <- err - continue - } + case req := <-f.scan.now: + req.err <- f.scanSubdirsIfHealthy(req.subdirs) - l.Debugln(s, "forced rescan") - - if err := s.model.internalScanFolderSubs(s.folder, req.subs); err != nil { - // Potentially sets the error twice, once in the scanner just - // by doing a check, and once here, if the error returned is - // the same one as returned by CheckFolderHealth, though - // duplicate set is handled by setError. - s.setError(err) - req.err <- err - continue - } - - req.err <- nil - - case next := <-s.delayScan: - s.timer.Reset(next) + case next := <-f.scan.delay: + f.scan.timer.Reset(next) } } } -func (s *roFolder) Stop() { - close(s.stop) -} - -func (s *roFolder) IndexUpdated() { -} - -func (s *roFolder) Scan(subs []string) error { - req := rescanRequest{ - subs: subs, - err: make(chan error), - } - s.scanNow <- req - return <-req.err -} - -func (s *roFolder) String() string { - return fmt.Sprintf("roFolder/%s@%p", s.folder, s) -} - -func (s *roFolder) BringToFront(string) {} - -func (s *roFolder) Jobs() ([]string, []string) { - return nil, nil -} - -func (s *roFolder) DelayScan(next time.Duration) { - s.delayScan <- next +func (f *roFolder) String() string { + return fmt.Sprintf("roFolder/%s@%p", f.folderID, f) } diff --git a/lib/model/rwfolder.go b/lib/model/rwfolder.go index 44bfac512..b1751214a 100644 --- a/lib/model/rwfolder.go +++ b/lib/model/rwfolder.go @@ -74,114 +74,108 @@ type dbUpdateJob struct { } type rwFolder struct { - stateTracker + folder - model *Model - progressEmitter *ProgressEmitter virtualMtimeRepo *db.VirtualMtimeRepo + dir string + versioner versioner.Versioner + ignorePerms bool + copiers int + pullers int + order config.PullOrder + maxConflicts int + sleep time.Duration + pause time.Duration + allowSparse bool + checkFreeSpace bool - folder string - dir string - scanIntv time.Duration - versioner versioner.Versioner - ignorePerms bool - copiers int - pullers int - shortID protocol.ShortID - order config.PullOrder - maxConflicts int - sleep time.Duration - pause time.Duration - allowSparse bool - checkFreeSpace bool - - stop chan struct{} queue *jobQueue dbUpdates chan dbUpdateJob - scanTimer *time.Timer pullTimer *time.Timer - delayScan chan time.Duration - scanNow chan rescanRequest remoteIndex chan struct{} // An index update was received, we should re-evaluate needs errors map[string]string // path -> error string errorsMut sync.Mutex } -func newRWFolder(m *Model, shortID protocol.ShortID, cfg config.FolderConfiguration) *rwFolder { - p := &rwFolder{ - stateTracker: stateTracker{ - folder: cfg.ID, - mut: sync.NewMutex(), +func newRWFolder(model *Model, cfg config.FolderConfiguration) *rwFolder { + f := &rwFolder{ + folder: folder{ + stateTracker: stateTracker{ + folderID: cfg.ID, + mut: sync.NewMutex(), + }, + scan: folderscan{ + interval: time.Duration(cfg.RescanIntervalS) * time.Second, + timer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. + now: make(chan rescanRequest), + delay: make(chan time.Duration), + }, + stop: make(chan struct{}), + model: model, }, - model: m, - progressEmitter: m.progressEmitter, - virtualMtimeRepo: db.NewVirtualMtimeRepo(m.db, cfg.ID), + virtualMtimeRepo: db.NewVirtualMtimeRepo(model.db, cfg.ID), + dir: cfg.Path(), + ignorePerms: cfg.IgnorePerms, + copiers: cfg.Copiers, + pullers: cfg.Pullers, + order: cfg.Order, + maxConflicts: cfg.MaxConflicts, + allowSparse: !cfg.DisableSparseFiles, + checkFreeSpace: cfg.MinDiskFreePct != 0, - folder: cfg.ID, - dir: cfg.Path(), - scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second, - ignorePerms: cfg.IgnorePerms, - copiers: cfg.Copiers, - pullers: cfg.Pullers, - shortID: shortID, - order: cfg.Order, - maxConflicts: cfg.MaxConflicts, - allowSparse: !cfg.DisableSparseFiles, - checkFreeSpace: cfg.MinDiskFreePct != 0, - - stop: make(chan struct{}), queue: newJobQueue(), pullTimer: time.NewTimer(time.Second), - scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. - delayScan: make(chan time.Duration), - scanNow: make(chan rescanRequest), remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes. errorsMut: sync.NewMutex(), } - if p.copiers == 0 { - p.copiers = defaultCopiers + f.configureCopiersAndPullers(cfg) + + return f +} + +func (f *rwFolder) configureCopiersAndPullers(config config.FolderConfiguration) { + if f.copiers == 0 { + f.copiers = defaultCopiers } - if p.pullers == 0 { - p.pullers = defaultPullers + if f.pullers == 0 { + f.pullers = defaultPullers } - if cfg.PullerPauseS == 0 { - p.pause = defaultPullerPause + if config.PullerPauseS == 0 { + f.pause = defaultPullerPause } else { - p.pause = time.Duration(cfg.PullerPauseS) * time.Second + f.pause = time.Duration(config.PullerPauseS) * time.Second } - if cfg.PullerSleepS == 0 { - p.sleep = defaultPullerSleep + if config.PullerSleepS == 0 { + f.sleep = defaultPullerSleep } else { - p.sleep = time.Duration(cfg.PullerSleepS) * time.Second + f.sleep = time.Duration(config.PullerSleepS) * time.Second } - - return p } // Helper function to check whether either the ignorePerm flag has been // set on the local host or the FlagNoPermBits has been set on the file/dir // which is being pulled. -func (p *rwFolder) ignorePermissions(file protocol.FileInfo) bool { - return p.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0 +func (f *rwFolder) ignorePermissions(file protocol.FileInfo) bool { + return f.ignorePerms || file.Flags&protocol.FlagNoPermBits != 0 } // Serve will run scans and pulls. It will return when Stop()ed or on a // critical error. -func (p *rwFolder) Serve() { - l.Debugln(p, "starting") - defer l.Debugln(p, "exiting") +func (f *rwFolder) Serve() { + l.Debugln(f, "starting") + defer l.Debugln(f, "exiting") defer func() { - p.pullTimer.Stop() - p.scanTimer.Stop() + f.pullTimer.Stop() + f.scan.timer.Stop() // TODO: Should there be an actual FolderStopped state? - p.setState(FolderIdle) + f.setState(FolderIdle) }() var prevVer int64 @@ -192,65 +186,65 @@ func (p *rwFolder) Serve() { for { select { - case <-p.stop: + case <-f.stop: return - case <-p.remoteIndex: + case <-f.remoteIndex: prevVer = 0 - p.pullTimer.Reset(0) - l.Debugln(p, "remote index updated, rescheduling pull") + f.pullTimer.Reset(0) + l.Debugln(f, "remote index updated, rescheduling pull") - case <-p.pullTimer.C: + case <-f.pullTimer.C: if !initialScanCompleted { - l.Debugln(p, "skip (initial)") - p.pullTimer.Reset(p.sleep) + l.Debugln(f, "skip (initial)") + f.pullTimer.Reset(f.sleep) continue } - p.model.fmut.RLock() - curIgnores := p.model.folderIgnores[p.folder] - p.model.fmut.RUnlock() + f.model.fmut.RLock() + curIgnores := f.model.folderIgnores[f.folderID] + f.model.fmut.RUnlock() if newHash := curIgnores.Hash(); newHash != prevIgnoreHash { // The ignore patterns have changed. We need to re-evaluate if // there are files we need now that were ignored before. - l.Debugln(p, "ignore patterns have changed, resetting prevVer") + l.Debugln(f, "ignore patterns have changed, resetting prevVer") prevVer = 0 prevIgnoreHash = newHash } // RemoteLocalVersion() is a fast call, doesn't touch the database. - curVer, ok := p.model.RemoteLocalVersion(p.folder) + curVer, ok := f.model.RemoteLocalVersion(f.folderID) if !ok || curVer == prevVer { - l.Debugln(p, "skip (curVer == prevVer)", prevVer, ok) - p.pullTimer.Reset(p.sleep) + l.Debugln(f, "skip (curVer == prevVer)", prevVer, ok) + f.pullTimer.Reset(f.sleep) continue } - if err := p.model.CheckFolderHealth(p.folder); err != nil { - l.Infoln("Skipping folder", p.folder, "pull due to folder error:", err) - p.pullTimer.Reset(p.sleep) + if err := f.model.CheckFolderHealth(f.folderID); err != nil { + l.Infoln("Skipping folder", f.folderID, "pull due to folder error:", err) + f.pullTimer.Reset(f.sleep) continue } - l.Debugln(p, "pulling", prevVer, curVer) + l.Debugln(f, "pulling", prevVer, curVer) - p.setState(FolderSyncing) - p.clearErrors() + f.setState(FolderSyncing) + f.clearErrors() tries := 0 for { tries++ - changed := p.pullerIteration(curIgnores) - l.Debugln(p, "changed", changed) + changed := f.pullerIteration(curIgnores) + l.Debugln(f, "changed", changed) if changed == 0 { // No files were changed by the puller, so we are in // sync. Remember the local version number and // schedule a resync a little bit into the future. - if lv, ok := p.model.RemoteLocalVersion(p.folder); ok && lv < curVer { + if lv, ok := f.model.RemoteLocalVersion(f.folderID); ok && lv < curVer { // There's a corner case where the device we needed // files from disconnected during the puller // iteration. The files will have been removed from @@ -259,12 +253,12 @@ func (p *rwFolder) Serve() { // version that includes those files in curVer. So we // catch the case that localVersion might have // decreased here. - l.Debugln(p, "adjusting curVer", lv) + l.Debugln(f, "adjusting curVer", lv) curVer = lv } prevVer = curVer - l.Debugln(p, "next pull in", p.sleep) - p.pullTimer.Reset(p.sleep) + l.Debugln(f, "next pull in", f.sleep) + f.pullTimer.Reset(f.sleep) break } @@ -273,81 +267,48 @@ func (p *rwFolder) Serve() { // we're not making it. Probably there are write // errors preventing us. Flag this with a warning and // wait a bit longer before retrying. - l.Infof("Folder %q isn't making progress. Pausing puller for %v.", p.folder, p.pause) - l.Debugln(p, "next pull in", p.pause) + l.Infof("Folder %q isn't making progress. Pausing puller for %v.", f.folderID, f.pause) + l.Debugln(f, "next pull in", f.pause) - if folderErrors := p.currentErrors(); len(folderErrors) > 0 { + if folderErrors := f.currentErrors(); len(folderErrors) > 0 { events.Default.Log(events.FolderErrors, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "errors": folderErrors, }) } - p.pullTimer.Reset(p.pause) + f.pullTimer.Reset(f.pause) break } } - p.setState(FolderIdle) + f.setState(FolderIdle) // The reason for running the scanner from within the puller is that // this is the easiest way to make sure we are not doing both at the // same time. - case <-p.scanTimer.C: - err := p.scanSubsIfHealthy(nil) - p.rescheduleScan() + case <-f.scan.timer.C: + err := f.scanSubdirsIfHealthy(nil) + f.scan.reschedule() if err != nil { continue } if !initialScanCompleted { - l.Infoln("Completed initial scan (rw) of folder", p.folder) + l.Infoln("Completed initial scan (rw) of folder", f.folderID) initialScanCompleted = true } - case req := <-p.scanNow: - req.err <- p.scanSubsIfHealthy(req.subs) + case req := <-f.scan.now: + req.err <- f.scanSubdirsIfHealthy(req.subdirs) - case next := <-p.delayScan: - p.scanTimer.Reset(next) + case next := <-f.scan.delay: + f.scan.timer.Reset(next) } } } -func (p *rwFolder) rescheduleScan() { - if p.scanIntv == 0 { - // We should not run scans, so it should not be rescheduled. - return - } - // Sleep a random time between 3/4 and 5/4 of the configured interval. - sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4 - intv := time.Duration(sleepNanos) * time.Nanosecond - l.Debugln(p, "next rescan in", intv) - p.scanTimer.Reset(intv) -} - -func (p *rwFolder) scanSubsIfHealthy(subs []string) error { - if err := p.model.CheckFolderHealth(p.folder); err != nil { - l.Infoln("Skipping folder", p.folder, "scan due to folder error:", err) - return err - } - l.Debugln(p, "Scanning subdirectories") - if err := p.model.internalScanFolderSubs(p.folder, subs); err != nil { - // Potentially sets the error twice, once in the scanner just - // by doing a check, and once here, if the error returned is - // the same one as returned by CheckFolderHealth, though - // duplicate set is handled by setError. - p.setError(err) - return err - } - return nil -} - -func (p *rwFolder) Stop() { - close(p.stop) -} - -func (p *rwFolder) IndexUpdated() { +func (f *rwFolder) IndexUpdated() { select { - case p.remoteIndex <- struct{}{}: + case f.remoteIndex <- struct{}{}: default: // We might be busy doing a pull and thus not reading from this // channel. The channel is 1-buffered, so one notification will be @@ -356,24 +317,15 @@ func (p *rwFolder) IndexUpdated() { } } -func (p *rwFolder) Scan(subs []string) error { - req := rescanRequest{ - subs: subs, - err: make(chan error), - } - p.scanNow <- req - return <-req.err -} - -func (p *rwFolder) String() string { - return fmt.Sprintf("rwFolder/%s@%p", p.folder, p) +func (f *rwFolder) String() string { + return fmt.Sprintf("rwFolder/%s@%p", f.folderID, f) } // pullerIteration runs a single puller iteration for the given folder and // returns the number items that should have been synced (even those that // might have failed). One puller iteration handles all files currently // flagged as needed in the folder. -func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { +func (f *rwFolder) pullerIteration(ignores *ignore.Matcher) int { pullChan := make(chan pullBlockState) copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) @@ -383,30 +335,30 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { pullWg := sync.NewWaitGroup() doneWg := sync.NewWaitGroup() - l.Debugln(p, "c", p.copiers, "p", p.pullers) + l.Debugln(f, "c", f.copiers, "p", f.pullers) - p.dbUpdates = make(chan dbUpdateJob) + f.dbUpdates = make(chan dbUpdateJob) updateWg.Add(1) go func() { // dbUpdaterRoutine finishes when p.dbUpdates is closed - p.dbUpdaterRoutine() + f.dbUpdaterRoutine() updateWg.Done() }() - for i := 0; i < p.copiers; i++ { + for i := 0; i < f.copiers; i++ { copyWg.Add(1) go func() { // copierRoutine finishes when copyChan is closed - p.copierRoutine(copyChan, pullChan, finisherChan) + f.copierRoutine(copyChan, pullChan, finisherChan) copyWg.Done() }() } - for i := 0; i < p.pullers; i++ { + for i := 0; i < f.pullers; i++ { pullWg.Add(1) go func() { // pullerRoutine finishes when pullChan is closed - p.pullerRoutine(pullChan, finisherChan) + f.pullerRoutine(pullChan, finisherChan) pullWg.Done() }() } @@ -414,13 +366,13 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { doneWg.Add(1) // finisherRoutine finishes when finisherChan is closed go func() { - p.finisherRoutine(finisherChan) + f.finisherRoutine(finisherChan) doneWg.Done() }() - p.model.fmut.RLock() - folderFiles := p.model.folderFiles[p.folder] - p.model.fmut.RUnlock() + f.model.fmut.RLock() + folderFiles := f.model.folderFiles[f.folderID] + f.model.fmut.RUnlock() // !!! // WithNeed takes a database snapshot (by necessity). By the time we've @@ -434,15 +386,15 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { dirDeletions := []protocol.FileInfo{} buckets := map[string][]protocol.FileInfo{} - handleFile := func(f protocol.FileInfo) bool { + handleFile := func(fi protocol.FileInfo) bool { switch { - case f.IsDeleted(): + case fi.IsDeleted(): // A deleted file, directory or symlink - if f.IsDirectory() { - dirDeletions = append(dirDeletions, f) + if fi.IsDirectory() { + dirDeletions = append(dirDeletions, fi) } else { - fileDeletions[f.Name] = f - df, ok := p.model.CurrentFolderFile(p.folder, f.Name) + fileDeletions[fi.Name] = fi + df, ok := f.model.CurrentFolderFile(f.folderID, fi.Name) // Local file can be already deleted, but with a lower version // number, hence the deletion coming in again as part of // WithNeed, furthermore, the file can simply be of the wrong @@ -453,10 +405,10 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { buckets[key] = append(buckets[key], df) } } - case f.IsDirectory() && !f.IsSymlink(): + case fi.IsDirectory() && !fi.IsSymlink(): // A new or changed directory - l.Debugln("Creating directory", f.Name) - p.handleDir(f) + l.Debugln("Creating directory", fi.Name) + f.handleDir(fi) default: return false } @@ -475,12 +427,12 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { return true } - l.Debugln(p, "handling", file.Name) + l.Debugln(f, "handling", file.Name) if !handleFile(file) { // A new or changed file or symlink. This is the only case where we // do stuff concurrently in the background - p.queue.Push(file.Name, file.Size(), file.Modified) + f.queue.Push(file.Name, file.Size(), file.Modified) } changed++ @@ -489,19 +441,19 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { // Reorder the file queue according to configuration - switch p.order { + switch f.order { case config.OrderRandom: - p.queue.Shuffle() + f.queue.Shuffle() case config.OrderAlphabetic: - // The queue is already in alphabetic order. + // The queue is already in alphabetic order. case config.OrderSmallestFirst: - p.queue.SortSmallestFirst() + f.queue.SortSmallestFirst() case config.OrderLargestFirst: - p.queue.SortLargestFirst() + f.queue.SortLargestFirst() case config.OrderOldestFirst: - p.queue.SortOldestFirst() + f.queue.SortOldestFirst() case config.OrderNewestFirst: - p.queue.SortNewestFirst() + f.queue.SortNewestFirst() } // Process the file queue @@ -509,35 +461,35 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { nextFile: for { select { - case <-p.stop: + case <-f.stop: // Stop processing files if the puller has been told to stop. break default: } - fileName, ok := p.queue.Pop() + fileName, ok := f.queue.Pop() if !ok { break } - f, ok := p.model.CurrentGlobalFile(p.folder, fileName) + fi, ok := f.model.CurrentGlobalFile(f.folderID, fileName) if !ok { // File is no longer in the index. Mark it as done and drop it. - p.queue.Done(fileName) + f.queue.Done(fileName) continue } // Handles races where an index update arrives changing what the file // is between queueing and retrieving it from the queue, effectively // changing how the file should be handled. - if handleFile(f) { + if handleFile(fi) { continue } - if !f.IsSymlink() { - key := string(f.Blocks[0].Hash) + if !fi.IsSymlink() { + key := string(fi.Blocks[0].Hash) for i, candidate := range buckets[key] { - if scanner.BlocksEqual(candidate.Blocks, f.Blocks) { + if scanner.BlocksEqual(candidate.Blocks, fi.Blocks) { // Remove the candidate from the bucket lidx := len(buckets[key]) - 1 buckets[key][i] = buckets[key][lidx] @@ -550,16 +502,16 @@ nextFile: // Remove the pending deletion (as we perform it by renaming) delete(fileDeletions, candidate.Name) - p.renameFile(desired, f) + f.renameFile(desired, fi) - p.queue.Done(fileName) + f.queue.Done(fileName) continue nextFile } } } // Not a rename or a symlink, deal with it. - p.handleFile(f, copyChan, finisherChan) + f.handleFile(fi, copyChan, finisherChan) } // Signal copy and puller routines that we are done with the in data for @@ -577,27 +529,27 @@ nextFile: for _, file := range fileDeletions { l.Debugln("Deleting file", file.Name) - p.deleteFile(file) + f.deleteFile(file) } for i := range dirDeletions { dir := dirDeletions[len(dirDeletions)-i-1] l.Debugln("Deleting dir", dir.Name) - p.deleteDir(dir, ignores) + f.deleteDir(dir, ignores) } // Wait for db updates to complete - close(p.dbUpdates) + close(f.dbUpdates) updateWg.Wait() return changed } // handleDir creates or updates the given directory -func (p *rwFolder) handleDir(file protocol.FileInfo) { +func (f *rwFolder) handleDir(file protocol.FileInfo) { var err error events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "type": "dir", "action": "update", @@ -605,7 +557,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { defer func() { events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "error": events.Error(err), "type": "dir", @@ -613,14 +565,14 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { }) }() - realName := filepath.Join(p.dir, file.Name) + realName := filepath.Join(f.dir, file.Name) mode := os.FileMode(file.Flags & 0777) - if p.ignorePermissions(file) { + if f.ignorePermissions(file) { mode = 0777 } if shouldDebug() { - curFile, _ := p.model.CurrentFolderFile(p.folder, file.Name) + curFile, _ := f.model.CurrentFolderFile(f.folderID, file.Name) l.Debugf("need dir\n\t%v\n\t%v", file, curFile) } @@ -632,8 +584,8 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { case err == nil && (!info.IsDir() || info.Mode()&os.ModeSymlink != 0): err = osutil.InWritableDir(osutil.Remove, realName) if err != nil { - l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err) + f.newError(file.Name, err) return } fallthrough @@ -645,7 +597,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { // not MkdirAll because the parent should already exist. mkdir := func(path string) error { err = os.Mkdir(path, mode) - if err != nil || p.ignorePermissions(file) { + if err != nil || f.ignorePermissions(file) { return err } @@ -661,45 +613,45 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { } if err = osutil.InWritableDir(mkdir, realName); err == nil { - p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} + f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} } else { - l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err) + f.newError(file.Name, err) } return // Weird error when stat()'ing the dir. Probably won't work to do // anything else with it if we can't even stat() it. case err != nil: - l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err) + f.newError(file.Name, err) return } // The directory already exists, so we just correct the mode bits. (We // don't handle modification times on directories, because that sucks...) // It's OK to change mode bits on stuff within non-writable directories. - if p.ignorePermissions(file) { - p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} + if f.ignorePermissions(file) { + f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} } else if err := os.Chmod(realName, mode|(info.Mode()&retainBits)); err == nil { - p.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} + f.dbUpdates <- dbUpdateJob{file, dbUpdateHandleDir} } else { - l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, dir %q): %v", f.folderID, file.Name, err) + f.newError(file.Name, err) } } // deleteDir attempts to delete the given directory -func (p *rwFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) { +func (f *rwFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) { var err error events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "type": "dir", "action": "delete", }) defer func() { events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "error": events.Error(err), "type": "dir", @@ -707,7 +659,7 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) { }) }() - realName := filepath.Join(p.dir, file.Name) + realName := filepath.Join(f.dir, file.Name) // Delete any temporary files lying around in the directory dir, _ := os.Open(realName) if dir != nil { @@ -723,31 +675,31 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo, matcher *ignore.Matcher) { err = osutil.InWritableDir(osutil.Remove, realName) if err == nil || os.IsNotExist(err) { // It was removed or it doesn't exist to start with - p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir} + f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir} } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) { // We get an error just looking at the directory, and it's not a // permission problem. Lets assume the error is in fact some variant // of "file does not exist" (possibly expressed as some parent being a // file and not a directory etc) and that the delete is handled. - p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir} + f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteDir} } else { - l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, dir %q): delete: %v", f.folderID, file.Name, err) + f.newError(file.Name, err) } } // deleteFile attempts to delete the given file -func (p *rwFolder) deleteFile(file protocol.FileInfo) { +func (f *rwFolder) deleteFile(file protocol.FileInfo) { var err error events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "type": "file", "action": "delete", }) defer func() { events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "error": events.Error(err), "type": "file", @@ -755,62 +707,62 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) { }) }() - realName := filepath.Join(p.dir, file.Name) + realName := filepath.Join(f.dir, file.Name) - cur, ok := p.model.CurrentFolderFile(p.folder, file.Name) - if ok && p.inConflict(cur.Version, file.Version) { + cur, ok := f.model.CurrentFolderFile(f.folderID, file.Name) + if ok && f.inConflict(cur.Version, file.Version) { // There is a conflict here. Move the file to a conflict copy instead // of deleting. Also merge with the version vector we had, to indicate // we have resolved the conflict. file.Version = file.Version.Merge(cur.Version) - err = osutil.InWritableDir(p.moveForConflict, realName) - } else if p.versioner != nil { - err = osutil.InWritableDir(p.versioner.Archive, realName) + err = osutil.InWritableDir(f.moveForConflict, realName) + } else if f.versioner != nil { + err = osutil.InWritableDir(f.versioner.Archive, realName) } else { err = osutil.InWritableDir(osutil.Remove, realName) } if err == nil || os.IsNotExist(err) { // It was removed or it doesn't exist to start with - p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile} + f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile} } else if _, serr := os.Lstat(realName); serr != nil && !os.IsPermission(serr) { // We get an error just looking at the file, and it's not a permission // problem. Lets assume the error is in fact some variant of "file // does not exist" (possibly expressed as some parent being a file and // not a directory etc) and that the delete is handled. - p.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile} + f.dbUpdates <- dbUpdateJob{file, dbUpdateDeleteFile} } else { - l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, file %q): delete: %v", f.folderID, file.Name, err) + f.newError(file.Name, err) } } // renameFile attempts to rename an existing file to a destination // and set the right attributes on it. -func (p *rwFolder) renameFile(source, target protocol.FileInfo) { +func (f *rwFolder) renameFile(source, target protocol.FileInfo) { var err error events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": source.Name, "type": "file", "action": "delete", }) events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": target.Name, "type": "file", "action": "update", }) defer func() { events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "item": source.Name, "error": events.Error(err), "type": "file", "action": "delete", }) events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "item": target.Name, "error": events.Error(err), "type": "file", @@ -818,15 +770,15 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { }) }() - l.Debugln(p, "taking rename shortcut", source.Name, "->", target.Name) + l.Debugln(f, "taking rename shortcut", source.Name, "->", target.Name) - from := filepath.Join(p.dir, source.Name) - to := filepath.Join(p.dir, target.Name) + from := filepath.Join(f.dir, source.Name) + to := filepath.Join(f.dir, target.Name) - if p.versioner != nil { + if f.versioner != nil { err = osutil.Copy(from, to) if err == nil { - err = osutil.InWritableDir(p.versioner.Archive, from) + err = osutil.InWritableDir(f.versioner.Archive, from) } } else { err = osutil.TryRename(from, to) @@ -837,16 +789,16 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { // of the source and the creation of the target. Fix-up the metadata, // and update the local index of the target file. - p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile} + f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile} - err = p.shortcutFile(target) + err = f.shortcutFile(target) if err != nil { - l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", p.folder, target.Name, source.Name, err) - p.newError(target.Name, err) + l.Infof("Puller (folder %q, file %q): rename from %q metadata: %v", f.folderID, target.Name, source.Name, err) + f.newError(target.Name, err) return } - p.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile} + f.dbUpdates <- dbUpdateJob{target, dbUpdateHandleFile} } else { // We failed the rename so we have a source file that we still need to // get rid of. Attempt to delete it instead so that we make *some* @@ -854,12 +806,12 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { err = osutil.InWritableDir(osutil.Remove, from) if err != nil { - l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", p.folder, target.Name, source.Name, err) - p.newError(target.Name, err) + l.Infof("Puller (folder %q, file %q): delete %q after failed rename: %v", f.folderID, target.Name, source.Name, err) + f.newError(target.Name, err) return } - p.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile} + f.dbUpdates <- dbUpdateJob{source, dbUpdateDeleteFile} } } @@ -899,33 +851,33 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { // handleFile queues the copies and pulls as necessary for a single new or // changed file. -func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) { - curFile, hasCurFile := p.model.CurrentFolderFile(p.folder, file.Name) +func (f *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) { + curFile, hasCurFile := f.model.CurrentFolderFile(f.folderID, file.Name) if hasCurFile && len(curFile.Blocks) == len(file.Blocks) && scanner.BlocksEqual(curFile.Blocks, file.Blocks) { // We are supposed to copy the entire file, and then fetch nothing. We // are only updating metadata, so we don't actually *need* to make the // copy. - l.Debugln(p, "taking shortcut on", file.Name) + l.Debugln(f, "taking shortcut on", file.Name) events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "type": "file", "action": "metadata", }) - p.queue.Done(file.Name) + f.queue.Done(file.Name) var err error if file.IsSymlink() { - err = p.shortcutSymlink(file) + err = f.shortcutSymlink(file) } else { - err = p.shortcutFile(file) + err = f.shortcutFile(file) } events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "error": events.Error(err), "type": "file", @@ -934,17 +886,17 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks if err != nil { l.Infoln("Puller: shortcut:", err) - p.newError(file.Name, err) + f.newError(file.Name, err) } else { - p.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile} + f.dbUpdates <- dbUpdateJob{file, dbUpdateShortcutFile} } return } // Figure out the absolute filenames we need once and for all - tempName := filepath.Join(p.dir, defTempNamer.TempName(file.Name)) - realName := filepath.Join(p.dir, file.Name) + tempName := filepath.Join(f.dir, defTempNamer.TempName(file.Name)) + realName := filepath.Join(f.dir, file.Name) if hasCurFile && !curFile.IsDirectory() && !curFile.IsSymlink() { // Check that the file on disk is what we expect it to be according to @@ -952,7 +904,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks // changes that we don't know about yet and we should scan before // touching the file. If we can't stat the file we'll just pull it. if info, err := osutil.Lstat(realName); err == nil { - mtime := p.virtualMtimeRepo.GetMtime(file.Name, info.ModTime()) + mtime := f.virtualMtimeRepo.GetMtime(file.Name, info.ModTime()) if mtime.Unix() != curFile.Modified || info.Size() != curFile.Size() { l.Debugln("file modified but not rescanned; not pulling:", realName) // Scan() is synchronous (i.e. blocks until the scan is @@ -962,7 +914,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks // sweep is complete. As we do retries, we'll queue the scan // for this file up to ten times, but the last nine of those // scans will be cheap... - go p.Scan([]string{file.Name}) + go f.scan.Scan([]string{file.Name}) return } } @@ -1012,10 +964,10 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks blocksSize = file.Size() } - if p.checkFreeSpace { - if free, err := osutil.DiskFreeBytes(p.dir); err == nil && free < blocksSize { - l.Warnf(`Folder "%s": insufficient disk space in %s for %s: have %.2f MiB, need %.2f MiB`, p.folder, p.dir, file.Name, float64(free)/1024/1024, float64(blocksSize)/1024/1024) - p.newError(file.Name, errors.New("insufficient space")) + if f.checkFreeSpace { + if free, err := osutil.DiskFreeBytes(f.dir); err == nil && free < blocksSize { + l.Warnf(`Folder "%s": insufficient disk space in %s for %s: have %.2f MiB, need %.2f MiB`, f.folderID, f.dir, file.Name, float64(free)/1024/1024, float64(blocksSize)/1024/1024) + f.newError(file.Name, errors.New("insufficient space")) return } } @@ -1027,7 +979,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks } events.Default.Log(events.ItemStarted, map[string]string{ - "folder": p.folder, + "folder": f.folderID, "item": file.Name, "type": "file", "action": "update", @@ -1035,7 +987,7 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks s := sharedPullerState{ file: file, - folder: p.folder, + folder: f.folderID, tempName: tempName, realName: realName, copyTotal: len(blocks), @@ -1044,13 +996,13 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks updated: time.Now(), available: reused, availableUpdated: time.Now(), - ignorePerms: p.ignorePermissions(file), + ignorePerms: f.ignorePermissions(file), version: curFile.Version, mut: sync.NewRWMutex(), - sparse: p.allowSparse, + sparse: f.allowSparse, } - l.Debugf("%v need file %s; copy %d, reused %v", p, file.Name, len(blocks), reused) + l.Debugf("%v need file %s; copy %d, reused %v", f, file.Name, len(blocks), reused) cs := copyBlocksState{ sharedPullerState: &s, @@ -1061,12 +1013,12 @@ func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocks // shortcutFile sets file mode and modification time, when that's the only // thing that has changed. -func (p *rwFolder) shortcutFile(file protocol.FileInfo) error { - realName := filepath.Join(p.dir, file.Name) - if !p.ignorePermissions(file) { +func (f *rwFolder) shortcutFile(file protocol.FileInfo) error { + realName := filepath.Join(f.dir, file.Name) + if !f.ignorePermissions(file) { if err := os.Chmod(realName, os.FileMode(file.Flags&0777)); err != nil { - l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, file %q): shortcut: chmod: %v", f.folderID, file.Name, err) + f.newError(file.Name, err) return err } } @@ -1076,17 +1028,17 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) error { // Try using virtual mtimes info, err := os.Stat(realName) if err != nil { - l.Infof("Puller (folder %q, file %q): shortcut: unable to stat file: %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, file %q): shortcut: unable to stat file: %v", f.folderID, file.Name, err) + f.newError(file.Name, err) return err } - p.virtualMtimeRepo.UpdateMtime(file.Name, info.ModTime(), t) + f.virtualMtimeRepo.UpdateMtime(file.Name, info.ModTime(), t) } // This may have been a conflict. We should merge the version vectors so // that our clock doesn't move backwards. - if cur, ok := p.model.CurrentFolderFile(p.folder, file.Name); ok { + if cur, ok := f.model.CurrentFolderFile(f.folderID, file.Name); ok { file.Version = file.Version.Merge(cur.Version) } @@ -1094,22 +1046,22 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) error { } // shortcutSymlink changes the symlinks type if necessary. -func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) { +func (f *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) { tt := symlinks.TargetFile if file.IsDirectory() { tt = symlinks.TargetDirectory } - err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), tt) + err = symlinks.ChangeType(filepath.Join(f.dir, file.Name), tt) if err != nil { - l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err) - p.newError(file.Name, err) + l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", f.folderID, file.Name, err) + f.newError(file.Name, err) } return } // copierRoutine reads copierStates until the in channel closes and performs // the relevant copies when possible, or passes it to the puller routine. -func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { +func (f *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) { buf := make([]byte, protocol.BlockSize) for state := range in { @@ -1120,21 +1072,21 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull continue } - if p.progressEmitter != nil { - p.progressEmitter.Register(state.sharedPullerState) + if f.model.progressEmitter != nil { + f.model.progressEmitter.Register(state.sharedPullerState) } folderRoots := make(map[string]string) var folders []string - p.model.fmut.RLock() - for folder, cfg := range p.model.folderCfgs { + f.model.fmut.RLock() + for folder, cfg := range f.model.folderCfgs { folderRoots[folder] = cfg.Path() folders = append(folders, folder) } - p.model.fmut.RUnlock() + f.model.fmut.RUnlock() for _, block := range state.blocks { - if p.allowSparse && state.reused == 0 && block.IsEmpty() { + if f.allowSparse && state.reused == 0 && block.IsEmpty() { // The block is a block of all zeroes, and we are not reusing // a temp file, so there is no need to do anything with it. // If we were reusing a temp file and had this block to copy, @@ -1147,7 +1099,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull } buf = buf[:int(block.Size)] - found := p.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool { + found := f.model.finder.Iterate(folders, block.Hash, func(folder, file string, index int32) bool { fd, err := os.Open(filepath.Join(folderRoots[folder], file)) if err != nil { return false @@ -1163,7 +1115,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull if err != nil { if hash != nil { l.Debugf("Finder block mismatch in %s:%s:%d expected %q got %q", folder, file, index, block.Hash, hash) - err = p.model.finder.Fix(folder, file, index, block.Hash, hash) + err = f.model.finder.Fix(folder, file, index, block.Hash, hash) if err != nil { l.Warnln("finder fix:", err) } @@ -1202,7 +1154,7 @@ func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pull } } -func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) { +func (f *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) { for state := range in { if state.failed() != nil { out <- state.sharedPullerState @@ -1218,7 +1170,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul continue } - if p.allowSparse && state.reused == 0 && state.block.IsEmpty() { + if f.allowSparse && state.reused == 0 && state.block.IsEmpty() { // There is no need to request a block of all zeroes. Pretend we // requested it and handled it correctly. state.pullDone(state.block) @@ -1227,7 +1179,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul } var lastError error - candidates := p.model.Availability(p.folder, state.file.Name, state.file.Version, state.block) + candidates := f.model.Availability(f.folderID, state.file.Name, state.file.Version, state.block) for { // Select the least busy device to pull the block from. If we found no // feasible device at all, fail the block (and in the long run, the @@ -1247,10 +1199,10 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul // Fetch the block, while marking the selected device as in use so that // leastBusy can select another device when someone else asks. activity.using(selected) - buf, lastError := p.model.requestGlobal(selected.ID, p.folder, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary) + buf, lastError := f.model.requestGlobal(selected.ID, f.folderID, state.file.Name, state.block.Offset, int(state.block.Size), state.block.Hash, selected.FromTemporary) activity.done(selected) if lastError != nil { - l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError) + l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "returned error:", lastError) continue } @@ -1258,7 +1210,7 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul // try pulling it from another device. _, lastError = scanner.VerifyBuffer(buf, state.block) if lastError != nil { - l.Debugln("request:", p.folder, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch") + l.Debugln("request:", f.folderID, state.file.Name, state.block.Offset, state.block.Size, "hash mismatch") continue } @@ -1275,9 +1227,9 @@ func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPul } } -func (p *rwFolder) performFinish(state *sharedPullerState) error { +func (f *rwFolder) performFinish(state *sharedPullerState) error { // Set the correct permission bits on the new file - if !p.ignorePermissions(state.file) { + if !f.ignorePermissions(state.file) { if err := os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777)); err != nil { return err } @@ -1291,7 +1243,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) error { if err != nil { return err } - p.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t) + f.virtualMtimeRepo.UpdateMtime(state.file.Name, info.ModTime(), t) } if stat, err := osutil.Lstat(state.realName); err == nil { @@ -1312,23 +1264,23 @@ func (p *rwFolder) performFinish(state *sharedPullerState) error { return err } - case p.inConflict(state.version, state.file.Version): + case f.inConflict(state.version, state.file.Version): // The new file has been changed in conflict with the existing one. We // should file it away as a conflict instead of just removing or // archiving. Also merge with the version vector we had, to indicate // we have resolved the conflict. state.file.Version = state.file.Version.Merge(state.version) - if err = osutil.InWritableDir(p.moveForConflict, state.realName); err != nil { + if err = osutil.InWritableDir(f.moveForConflict, state.realName); err != nil { return err } - case p.versioner != nil: + case f.versioner != nil: // If we should use versioning, let the versioner archive the old // file before we replace it. Archiving a non-existent file is not // an error. - if err = p.versioner.Archive(state.realName); err != nil { + if err = f.versioner.Archive(state.realName); err != nil { return err } } @@ -1361,7 +1313,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) error { } // Record the updated file in the index - p.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile} + f.dbUpdates <- dbUpdateJob{state.file, dbUpdateHandleFile} return nil } @@ -1381,15 +1333,15 @@ func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) { p.newError(state.file.Name, err) } events.Default.Log(events.ItemFinished, map[string]interface{}{ - "folder": p.folder, + "folder": p.folderID, "item": state.file.Name, "error": events.Error(err), "type": "file", "action": "update", }) - if p.progressEmitter != nil { - p.progressEmitter.Deregister(state) + if p.model.progressEmitter != nil { + p.model.progressEmitter.Deregister(state) } } } @@ -1404,10 +1356,6 @@ func (p *rwFolder) Jobs() ([]string, []string) { return p.queue.Jobs() } -func (p *rwFolder) DelayScan(next time.Duration) { - p.delayScan <- next -} - // dbUpdaterRoutine aggregates db updates and commits them in batches no // larger than 1000 items, and no more delayed than 2 seconds. func (p *rwFolder) dbUpdaterRoutine() { @@ -1439,10 +1387,10 @@ func (p *rwFolder) dbUpdaterRoutine() { lastFile = job.file } - p.model.updateLocals(p.folder, files) + p.model.updateLocals(p.folderID, files) if found { - p.model.receivedFile(p.folder, lastFile) + p.model.receivedFile(p.folderID, lastFile) } batch = batch[:0] @@ -1481,7 +1429,7 @@ func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool { // Obvious case return true } - if replacement.Counter(p.shortID) > current.Counter(p.shortID) { + if replacement.Counter(p.model.shortID) > current.Counter(p.model.shortID) { // The replacement file contains a higher version for ourselves than // what we have. This isn't supposed to be possible, since it's only // we who can increment that counter. We take it as a sign that diff --git a/lib/model/rwfolder_test.go b/lib/model/rwfolder_test.go index 62f0e5b02..0dd6d6053 100644 --- a/lib/model/rwfolder_test.go +++ b/lib/model/rwfolder_test.go @@ -67,13 +67,14 @@ func setUpModel(file protocol.FileInfo) *Model { } func setUpRwFolder(model *Model) rwFolder { - return rwFolder{ - folder: "default", + f := rwFolder{ dir: "testdata", - model: model, errors: make(map[string]string), errorsMut: sync.NewMutex(), } + f.folderID = "default" + f.model = model + return f } // Layout of the files: (indexes from the above array) @@ -329,17 +330,17 @@ func TestDeregisterOnFailInCopy(t *testing.T) { m.AddFolder(defaultFolderConfig) emitter := NewProgressEmitter(defaultConfig) + m.progressEmitter = emitter go emitter.Serve() p := rwFolder{ - folder: "default", - dir: "testdata", - model: m, - queue: newJobQueue(), - progressEmitter: emitter, - errors: make(map[string]string), - errorsMut: sync.NewMutex(), + dir: "testdata", + queue: newJobQueue(), + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } + p.folderID = "default" + p.model = m // queue.Done should be called by the finisher routine p.queue.Push("filex", 0, 0) @@ -373,7 +374,7 @@ func TestDeregisterOnFailInCopy(t *testing.T) { case state := <-finisherBufferChan: // At this point the file should still be registered with both the job // queue, and the progress emitter. Verify this. - if p.progressEmitter.lenRegistry() != 1 || p.queue.lenProgress() != 1 || p.queue.lenQueued() != 0 { + if p.model.progressEmitter.lenRegistry() != 1 || p.queue.lenProgress() != 1 || p.queue.lenQueued() != 0 { t.Fatal("Could not find file") } @@ -388,16 +389,16 @@ func TestDeregisterOnFailInCopy(t *testing.T) { t.Fatal("File not closed?") } - if p.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { - t.Fatal("Still registered", p.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) + if p.model.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { + t.Fatal("Still registered", p.model.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) } // Doing it again should have no effect finisherChan <- state time.Sleep(100 * time.Millisecond) - if p.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { - t.Fatal("Still registered", p.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) + if p.model.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { + t.Fatal("Still registered", p.model.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) } case <-time.After(time.Second): t.Fatal("Didn't get anything to the finisher") @@ -413,17 +414,17 @@ func TestDeregisterOnFailInPull(t *testing.T) { m.AddFolder(defaultFolderConfig) emitter := NewProgressEmitter(defaultConfig) + m.progressEmitter = emitter go emitter.Serve() p := rwFolder{ - folder: "default", - dir: "testdata", - model: m, - queue: newJobQueue(), - progressEmitter: emitter, - errors: make(map[string]string), - errorsMut: sync.NewMutex(), + dir: "testdata", + queue: newJobQueue(), + errors: make(map[string]string), + errorsMut: sync.NewMutex(), } + p.folderID = "default" + p.model = m // queue.Done should be called by the finisher routine p.queue.Push("filex", 0, 0) @@ -450,7 +451,7 @@ func TestDeregisterOnFailInPull(t *testing.T) { case state := <-finisherBufferChan: // At this point the file should still be registered with both the job // queue, and the progress emitter. Verify this. - if p.progressEmitter.lenRegistry() != 1 || p.queue.lenProgress() != 1 || p.queue.lenQueued() != 0 { + if p.model.progressEmitter.lenRegistry() != 1 || p.queue.lenProgress() != 1 || p.queue.lenQueued() != 0 { t.Fatal("Could not find file") } @@ -465,16 +466,16 @@ func TestDeregisterOnFailInPull(t *testing.T) { t.Fatal("File not closed?") } - if p.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { - t.Fatal("Still registered", p.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) + if p.model.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { + t.Fatal("Still registered", p.model.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) } // Doing it again should have no effect finisherChan <- state time.Sleep(100 * time.Millisecond) - if p.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { - t.Fatal("Still registered", p.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) + if p.model.progressEmitter.lenRegistry() != 0 || p.queue.lenProgress() != 0 || p.queue.lenQueued() != 0 { + t.Fatal("Still registered", p.model.progressEmitter.lenRegistry(), p.queue.lenProgress(), p.queue.lenQueued()) } case <-time.After(time.Second): t.Fatal("Didn't get anything to the finisher")