diff --git a/lib/model/model.go b/lib/model/model.go index 87cfbb21a..14724eb9c 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -75,15 +75,16 @@ type Model struct { clientName string clientVersion string - folderCfgs map[string]config.FolderConfiguration // folder -> cfg - folderFiles map[string]*db.FileSet // folder -> files - folderDevices map[string][]protocol.DeviceID // folder -> deviceIDs - deviceFolders map[protocol.DeviceID][]string // deviceID -> folders - deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef - folderIgnores map[string]*ignore.Matcher // folder -> matcher object - folderRunners map[string]service // folder -> puller or scanner - folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef - fmut sync.RWMutex // protects the above + folderCfgs map[string]config.FolderConfiguration // folder -> cfg + folderFiles map[string]*db.FileSet // folder -> files + folderDevices map[string][]protocol.DeviceID // folder -> deviceIDs + deviceFolders map[protocol.DeviceID][]string // deviceID -> folders + deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef + folderIgnores map[string]*ignore.Matcher // folder -> matcher object + folderRunners map[string]service // folder -> puller or scanner + folderRunnerTokens map[string][]suture.ServiceToken // folder -> tokens for puller or scanner + folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef + fmut sync.RWMutex // protects the above conn map[protocol.DeviceID]Connection deviceVer map[protocol.DeviceID]string @@ -105,28 +106,29 @@ func NewModel(cfg *config.Wrapper, id protocol.DeviceID, deviceName, clientName, l.Debugln(line) }, }), - cfg: cfg, - db: ldb, - finder: db.NewBlockFinder(ldb), - progressEmitter: NewProgressEmitter(cfg), - id: id, - shortID: id.Short(), - cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, - protectedFiles: protectedFiles, - deviceName: deviceName, - clientName: clientName, - clientVersion: clientVersion, - folderCfgs: make(map[string]config.FolderConfiguration), - folderFiles: make(map[string]*db.FileSet), - folderDevices: make(map[string][]protocol.DeviceID), - deviceFolders: make(map[protocol.DeviceID][]string), - deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), - folderIgnores: make(map[string]*ignore.Matcher), - folderRunners: make(map[string]service), - folderStatRefs: make(map[string]*stats.FolderStatisticsReference), - conn: make(map[protocol.DeviceID]Connection), - deviceVer: make(map[protocol.DeviceID]string), - devicePaused: make(map[protocol.DeviceID]bool), + cfg: cfg, + db: ldb, + finder: db.NewBlockFinder(ldb), + progressEmitter: NewProgressEmitter(cfg), + id: id, + shortID: id.Short(), + cacheIgnoredFiles: cfg.Options().CacheIgnoredFiles, + protectedFiles: protectedFiles, + deviceName: deviceName, + clientName: clientName, + clientVersion: clientVersion, + folderCfgs: make(map[string]config.FolderConfiguration), + folderFiles: make(map[string]*db.FileSet), + folderDevices: make(map[string][]protocol.DeviceID), + deviceFolders: make(map[protocol.DeviceID][]string), + deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), + folderIgnores: make(map[string]*ignore.Matcher), + folderRunners: make(map[string]service), + folderRunnerTokens: make(map[string][]suture.ServiceToken), + folderStatRefs: make(map[string]*stats.FolderStatisticsReference), + conn: make(map[protocol.DeviceID]Connection), + deviceVer: make(map[protocol.DeviceID]string), + devicePaused: make(map[protocol.DeviceID]bool), fmut: sync.NewRWMutex(), pmut: sync.NewRWMutex(), @@ -163,7 +165,6 @@ func (m *Model) StartFolderRW(folder string) { } p := newRWFolder(m, m.shortID, cfg) m.folderRunners[folder] = p - m.fmut.Unlock() if len(cfg.Versioning.Type) > 0 { factory, ok := versioner.Factories[cfg.Versioning.Type] @@ -176,14 +177,17 @@ func (m *Model) StartFolderRW(folder string) { // The versioner implements the suture.Service interface, so // expects to be run in the background in addition to being called // when files are going to be archived. - m.Add(service) + token := m.Add(service) + m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token) } p.versioner = versioner } m.warnAboutOverwritingProtectedFiles(folder) - m.Add(p) + token := m.Add(p) + m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token) + m.fmut.Unlock() l.Okln("Ready to synchronize", folder, "(read-write)") } @@ -232,13 +236,49 @@ func (m *Model) StartFolderRO(folder string) { } s := newROFolder(m, folder, time.Duration(cfg.RescanIntervalS)*time.Second) m.folderRunners[folder] = s + + token := m.Add(s) + m.folderRunnerTokens[folder] = append(m.folderRunnerTokens[folder], token) m.fmut.Unlock() - m.Add(s) - l.Okln("Ready to synchronize", folder, "(read only; no external updates accepted)") } +func (m *Model) RemoveFolder(folder string) { + m.fmut.Lock() + m.pmut.Lock() + + // Stop the services running for this folder + for _, id := range m.folderRunnerTokens[folder] { + m.Remove(id) + } + + // Close connections to affected devices + for _, dev := range m.folderDevices[folder] { + if conn, ok := m.conn[dev]; ok { + closeRawConn(conn) + } + } + + // Clean up our config maps + delete(m.folderCfgs, folder) + delete(m.folderFiles, folder) + delete(m.folderDevices, folder) + delete(m.folderIgnores, folder) + delete(m.folderRunners, folder) + delete(m.folderRunnerTokens, folder) + delete(m.folderStatRefs, folder) + for dev, folders := range m.deviceFolders { + m.deviceFolders[dev] = stringSliceWithout(folders, folder) + } + + // Remove it from the database + db.DropFolder(m.db, folder) + + m.pmut.Unlock() + m.fmut.Unlock() +} + type ConnectionInfo struct { protocol.Statistics Connected bool @@ -1250,6 +1290,11 @@ nextSub: } subs = unifySubs + // The cancel channel is closed whenever we return (such as from an error), + // to signal the potentially still running walker to stop. + cancel := make(chan struct{}) + defer close(cancel) + w := &scanner.Walker{ Folder: folderCfg.ID, Dir: folderCfg.Path(), @@ -1265,6 +1310,7 @@ nextSub: Hashers: m.numHashers(folder), ShortID: m.shortID, ProgressTickIntervalS: folderCfg.ScanProgressIntervalS, + Cancel: cancel, } runner.setState(FolderScanning) @@ -1674,17 +1720,17 @@ func (m *Model) BringToFront(folder, file string) { // CheckFolderHealth checks the folder for common errors and returns the // current folder error, or nil if the folder is healthy. func (m *Model) CheckFolderHealth(id string) error { + folder, ok := m.cfg.Folders()[id] + if !ok { + return errors.New("folder does not exist") + } + if minFree := m.cfg.Options().MinHomeDiskFreePct; minFree > 0 { if free, err := osutil.DiskFreePercentage(m.cfg.ConfigPath()); err == nil && free < minFree { return errors.New("home disk has insufficient free space") } } - folder, ok := m.cfg.Folders()[id] - if !ok { - return errors.New("folder does not exist") - } - fi, err := os.Stat(folder.Path()) v, ok := m.CurrentLocalVersion(id) @@ -1797,9 +1843,9 @@ func (m *Model) CommitConfiguration(from, to config.Configuration) bool { for folderID, fromCfg := range fromFolders { toCfg, ok := toFolders[folderID] if !ok { - // A folder was removed. Requires restart. - l.Debugln(m, "requires restart, removing folder", folderID) - return false + // The folder was removed. + m.RemoveFolder(folderID) + continue } // This folder exists on both sides. Compare the device lists, as we @@ -1961,3 +2007,14 @@ func closeRawConn(conn io.Closer) error { } return conn.Close() } + +func stringSliceWithout(ss []string, s string) []string { + for i := range ss { + if ss[i] == s { + copy(ss[i:], ss[i+1:]) + ss = ss[:len(ss)-1] + return ss + } + } + return ss +} diff --git a/lib/scanner/blockqueue.go b/lib/scanner/blockqueue.go index ea5f374d6..9c878733b 100644 --- a/lib/scanner/blockqueue.go +++ b/lib/scanner/blockqueue.go @@ -19,13 +19,13 @@ import ( // workers are used in parallel. The outbox will become closed when the inbox // is closed and all items handled. -func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter *int64, done chan struct{}) { +func newParallelHasher(dir string, blockSize, workers int, outbox, inbox chan protocol.FileInfo, counter *int64, done, cancel chan struct{}) { wg := sync.NewWaitGroup() wg.Add(workers) for i := 0; i < workers; i++ { go func() { - hashFiles(dir, blockSize, outbox, inbox, counter) + hashFiles(dir, blockSize, outbox, inbox, counter, cancel) wg.Done() }() } @@ -59,19 +59,33 @@ func HashFile(path string, blockSize int, sizeHint int64, counter *int64) ([]pro return Blocks(fd, blockSize, sizeHint, counter) } -func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter *int64) { - for f := range inbox { - if f.IsDirectory() || f.IsDeleted() { - panic("Bug. Asked to hash a directory or a deleted file.") - } +func hashFiles(dir string, blockSize int, outbox, inbox chan protocol.FileInfo, counter *int64, cancel chan struct{}) { + for { + select { + case f, ok := <-inbox: + if !ok { + return + } - blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, f.CachedSize, counter) - if err != nil { - l.Debugln("hash error:", f.Name, err) - continue - } + if f.IsDirectory() || f.IsDeleted() { + panic("Bug. Asked to hash a directory or a deleted file.") + } - f.Blocks = blocks - outbox <- f + blocks, err := HashFile(filepath.Join(dir, f.Name), blockSize, f.CachedSize, counter) + if err != nil { + l.Debugln("hash error:", f.Name, err) + continue + } + + f.Blocks = blocks + select { + case outbox <- f: + case <-cancel: + return + } + + case <-cancel: + return + } } } diff --git a/lib/scanner/walk.go b/lib/scanner/walk.go index b827ed9fa..17a06442f 100644 --- a/lib/scanner/walk.go +++ b/lib/scanner/walk.go @@ -72,6 +72,8 @@ type Walker struct { // Optional progress tick interval which defines how often FolderScanProgress // events are emitted. Negative number means disabled. ProgressTickIntervalS int + // Signals cancel from the outside - when closed, we should stop walking. + Cancel chan struct{} } type TempNamer interface { @@ -121,7 +123,7 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) { // We're not required to emit scan progress events, just kick off hashers, // and feed inputs directly from the walker. if w.ProgressTickIntervalS < 0 { - newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil) + newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, toHashChan, nil, nil, w.Cancel) return finishedChan, nil } @@ -149,7 +151,7 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) { realToHashChan := make(chan protocol.FileInfo) done := make(chan struct{}) - newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, &progress, done) + newParallelHasher(w.Dir, w.BlockSize, w.Hashers, finishedChan, realToHashChan, &progress, done, w.Cancel) // A routine which actually emits the FolderScanProgress events // every w.ProgressTicker ticks, until the hasher routines terminate. @@ -168,13 +170,21 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) { "current": current, "total": total, }) + case <-w.Cancel: + ticker.Stop() + return } } }() + loop: for _, file := range filesToHash { l.Debugln("real to hash:", file.Name) - realToHashChan <- file + select { + case realToHashChan <- file: + case <-w.Cancel: + break loop + } } close(realToHashChan) }() @@ -329,7 +339,11 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath. l.Debugln("symlink changedb:", p, f) - dchan <- f + select { + case dchan <- f: + case <-w.Cancel: + return errors.New("cancelled") + } return skip } @@ -363,7 +377,13 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath. Modified: mtime.Unix(), } l.Debugln("dir:", p, f) - dchan <- f + + select { + case dchan <- f: + case <-w.Cancel: + return errors.New("cancelled") + } + return nil } @@ -406,7 +426,12 @@ func (w *Walker) walkAndHashFiles(fchan, dchan chan protocol.FileInfo) filepath. CachedSize: info.Size(), } l.Debugln("to hash:", p, f) - fchan <- f + + select { + case fchan <- f: + case <-w.Cancel: + return errors.New("cancelled") + } } return nil