From 51185381797a6309c5f579d4de6d8ba4dd9e1e76 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sat, 2 Sep 2023 16:42:46 +0200 Subject: [PATCH] lib/model: Refactor folderRunners to use a serviceMap (#9071) Instead of separately tracking the token. Also changes serviceMap to have a channel version of RemoveAndWait, so that it's possible to do the removal under a lock but wait outside of the lock. And changed where we do that in connection close, reversing the change that happened when I added the serviceMap in 40b3b9ad1. --- lib/model/folder_recvonly_test.go | 3 +- lib/model/folder_sendrecv_test.go | 3 +- lib/model/indexhandler.go | 5 +- lib/model/model.go | 85 ++++++++++++++++--------------- lib/model/model_test.go | 8 +-- lib/model/requests_test.go | 3 +- lib/model/service_map.go | 36 +++++++++---- lib/model/service_map_test.go | 3 +- 8 files changed, 85 insertions(+), 61 deletions(-) diff --git a/lib/model/folder_recvonly_test.go b/lib/model/folder_recvonly_test.go index 9a583e28f..a30b6b240 100644 --- a/lib/model/folder_recvonly_test.go +++ b/lib/model/folder_recvonly_test.go @@ -537,7 +537,8 @@ func setupROFolder(t *testing.T) (*testModel, *receiveOnlyFolder, context.Cancel m.fmut.RLock() defer m.fmut.RUnlock() - f := m.folderRunners["ro"].(*receiveOnlyFolder) + r, _ := m.folderRunners.Get("ro") + f := r.(*receiveOnlyFolder) return m, f, cancel } diff --git a/lib/model/folder_sendrecv_test.go b/lib/model/folder_sendrecv_test.go index 9b3edffaa..11b6e01f1 100644 --- a/lib/model/folder_sendrecv_test.go +++ b/lib/model/folder_sendrecv_test.go @@ -117,7 +117,8 @@ func setupSendReceiveFolder(t testing.TB, files ...protocol.FileInfo) (*testMode model := setupModel(t, w) model.cancel() <-model.stopped - f := model.folderRunners[fcfg.ID].(*sendReceiveFolder) + r, _ := model.folderRunners.Get(fcfg.ID) + f := r.(*sendReceiveFolder) f.tempPullErrors = make(map[string]string) f.ctx = context.Background() diff --git a/lib/model/indexhandler.go b/lib/model/indexhandler.go index ae11bbf48..30b56c695 100644 --- a/lib/model/indexhandler.go +++ b/lib/model/indexhandler.go @@ -427,7 +427,7 @@ func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterCon r.mut.Lock() defer r.mut.Unlock() - if r.indexHandlers.RemoveAndWait(folder, 0) { + if r.indexHandlers.RemoveAndWait(folder, 0) == nil { l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder) } folderState, ok := r.folderStates[folder] @@ -458,11 +458,12 @@ func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderSta r.mut.Lock() defer r.mut.Unlock() - r.indexHandlers.Each(func(folder string, is *indexHandler) { + r.indexHandlers.Each(func(folder string, is *indexHandler) error { if _, ok := except[folder]; !ok { r.indexHandlers.RemoveAndWait(folder, 0) l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder) } + return nil }) for folder := range r.startInfos { if _, ok := except[folder]; !ok { diff --git a/lib/model/model.go b/lib/model/model.go index 2ab5e8b22..02cf8636d 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -150,8 +150,7 @@ type model struct { folderFiles map[string]*db.FileSet // folder -> files deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef folderIgnores map[string]*ignore.Matcher // folder -> matcher object - folderRunners map[string]service // folder -> puller or scanner - folderRunnerToken map[string]suture.ServiceToken // folder -> token for folder runner + folderRunners *serviceMap[string, service] // folder -> puller or scanner folderRestartMuts syncMutexMap // folder -> restart mutex folderVersioners map[string]versioner.Versioner // folder -> versioner (may be nil) folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders) @@ -234,8 +233,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio folderFiles: make(map[string]*db.FileSet), deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), folderIgnores: make(map[string]*ignore.Matcher), - folderRunners: make(map[string]service), - folderRunnerToken: make(map[string]suture.ServiceToken), + folderRunners: newServiceMap[string, service](evLogger), folderVersioners: make(map[string]versioner.Versioner), folderEncryptionPasswordTokens: make(map[string][]byte), folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error), @@ -253,6 +251,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio for devID := range cfg.Devices() { m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID) } + m.Add(m.folderRunners) m.Add(m.progressEmitter) m.Add(m.indexHandlers) m.Add(svcutil.AsService(m.serve, m.String())) @@ -350,7 +349,7 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio m.folderFiles[cfg.ID] = fset m.folderIgnores[cfg.ID] = ignores - _, ok := m.folderRunners[cfg.ID] + _, ok := m.folderRunners.Get(cfg.ID) if ok { l.Warnln("Cannot start already running folder", cfg.Description()) panic("cannot start already running folder") @@ -413,13 +412,10 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio } m.folderVersioners[folder] = ver - p := folderFactory(m, fset, ignores, cfg, ver, m.evLogger, m.folderIOLimiter) - - m.folderRunners[folder] = p - m.warnAboutOverwritingProtectedFiles(cfg, ignores) - m.folderRunnerToken[folder] = m.Add(p) + p := folderFactory(m, fset, ignores, cfg, ver, m.evLogger, m.folderIOLimiter) + m.folderRunners.Add(folder, p) l.Infof("Ready to synchronize %s (%s)", cfg.Description(), cfg.Type) } @@ -459,11 +455,9 @@ func (m *model) warnAboutOverwritingProtectedFiles(cfg config.FolderConfiguratio func (m *model) removeFolder(cfg config.FolderConfiguration) { m.fmut.RLock() - token, ok := m.folderRunnerToken[cfg.ID] + wait := m.folderRunners.RemoveAndWaitChan(cfg.ID, 0) m.fmut.RUnlock() - if ok { - m.RemoveAndWait(token, 0) - } + <-wait // We need to hold both fmut and pmut and must acquire locks in the same // order always. (The locks can be *released* in any order.) @@ -488,8 +482,9 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) { } m.cleanupFolderLocked(cfg) - m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { + m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { r.Remove(cfg.ID) + return nil }) m.fmut.Unlock() @@ -505,8 +500,6 @@ func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) { delete(m.folderCfgs, cfg.ID) delete(m.folderFiles, cfg.ID) delete(m.folderIgnores, cfg.ID) - delete(m.folderRunners, cfg.ID) - delete(m.folderRunnerToken, cfg.ID) delete(m.folderVersioners, cfg.ID) delete(m.folderEncryptionPasswordTokens, cfg.ID) delete(m.folderEncryptionFailures, cfg.ID) @@ -533,11 +526,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF defer restartMut.Unlock() m.fmut.RLock() - token, ok := m.folderRunnerToken[from.ID] + wait := m.folderRunners.RemoveAndWaitChan(from.ID, 0) m.fmut.RUnlock() - if ok { - m.RemoveAndWait(token, 0) - } + <-wait m.fmut.Lock() defer m.fmut.Unlock() @@ -564,8 +555,10 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF // Care needs to be taken because we already hold fmut and the lock order // must be the same everywhere. As fmut is acquired first, this is fine. m.pmut.RLock() - m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { - r.RegisterFolderState(to, fset, m.folderRunners[to.ID]) + runner, _ := m.folderRunners.Get(to.ID) + m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { + r.RegisterFolderState(to, fset, runner) + return nil }) m.pmut.RUnlock() @@ -602,8 +595,10 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool // Care needs to be taken because we already hold fmut and the lock order // must be the same everywhere. As fmut is acquired first, this is fine. m.pmut.RLock() - m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { - r.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID]) + m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error { + runner, _ := m.folderRunners.Get(cfg.ID) + r.RegisterFolderState(cfg, fset, runner) + return nil }) m.pmut.RUnlock() @@ -786,12 +781,16 @@ func (m *model) FolderStatistics() (map[string]stats.FolderStatistics, error) { res := make(map[string]stats.FolderStatistics) m.fmut.RLock() defer m.fmut.RUnlock() - for id, runner := range m.folderRunners { + err := m.folderRunners.Each(func(id string, runner service) error { stats, err := runner.GetStatistics() if err != nil { - return nil, err + return err } res[id] = stats + return nil + }) + if err != nil { + return nil, err } return res, nil } @@ -951,7 +950,7 @@ func (m *model) FolderProgressBytesCompleted(folder string) int64 { func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, error) { m.fmut.RLock() rf, rfOk := m.folderFiles[folder] - runner, runnerOk := m.folderRunners[folder] + runner, runnerOk := m.folderRunners.Get(folder) cfg := m.folderCfgs[folder] m.fmut.RUnlock() @@ -1793,8 +1792,9 @@ func (m *model) Closed(conn protocol.Connection, err error) { delete(m.remoteFolderStates, device) closed := m.closed[device] delete(m.closed, device) - m.indexHandlers.RemoveAndWait(device, 0) + wait := m.indexHandlers.RemoveAndWaitChan(device, 0) m.pmut.Unlock() + <-wait m.progressEmitter.temporaryIndexUnsubscribe(conn) m.deviceDidClose(device, time.Since(conn.EstablishedAt())) @@ -2019,7 +2019,7 @@ func (m *model) recheckFile(deviceID protocol.DeviceID, folder, name string, off // Something is fishy, invalidate the file and rescan it. // The file will temporarily become invalid, which is ok as the content is messed up. m.fmut.RLock() - runner, ok := m.folderRunners[folder] + runner, ok := m.folderRunners.Get(folder) m.fmut.RUnlock() if !ok { l.Debugf("%v recheckFile: %s: %q / %q: Folder stopped before rescan could be scheduled", m, deviceID, folder, name) @@ -2166,7 +2166,7 @@ func (m *model) setIgnores(cfg config.FolderConfiguration, content []string) err } m.fmut.RLock() - runner, ok := m.folderRunners[cfg.ID] + runner, ok := m.folderRunners.Get(cfg.ID) m.fmut.RUnlock() if ok { runner.ScheduleScan() @@ -2254,7 +2254,8 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) { m.deviceDownloads[deviceID] = newDeviceDownloadState() indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger) for id, fcfg := range m.folderCfgs { - indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id]) + runner, _ := m.folderRunners.Get(id) + indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], runner) } m.indexHandlers.Add(deviceID, indexRegistry) m.fmut.RUnlock() @@ -2402,7 +2403,7 @@ func (m *model) ScanFolder(folder string) error { func (m *model) ScanFolderSubdirs(folder string, subs []string) error { m.fmut.RLock() err := m.checkFolderRunningLocked(folder) - runner := m.folderRunners[folder] + runner, _ := m.folderRunners.Get(folder) m.fmut.RUnlock() if err != nil { @@ -2414,7 +2415,7 @@ func (m *model) ScanFolderSubdirs(folder string, subs []string) error { func (m *model) DelayScan(folder string, next time.Duration) { m.fmut.RLock() - runner, ok := m.folderRunners[folder] + runner, ok := m.folderRunners.Get(folder) m.fmut.RUnlock() if !ok { return @@ -2532,7 +2533,7 @@ func (m *model) generateClusterConfig(device protocol.DeviceID) (protocol.Cluste func (m *model) State(folder string) (string, time.Time, error) { m.fmut.RLock() - runner, ok := m.folderRunners[folder] + runner, ok := m.folderRunners.Get(folder) m.fmut.RUnlock() if !ok { // The returned error should be an actual folder error, so returning @@ -2547,7 +2548,7 @@ func (m *model) State(folder string) (string, time.Time, error) { func (m *model) FolderErrors(folder string) ([]FileError, error) { m.fmut.RLock() err := m.checkFolderRunningLocked(folder) - runner := m.folderRunners[folder] + runner, _ := m.folderRunners.Get(folder) m.fmut.RUnlock() if err != nil { return nil, err @@ -2558,7 +2559,7 @@ func (m *model) FolderErrors(folder string) ([]FileError, error) { func (m *model) WatchError(folder string) error { m.fmut.RLock() err := m.checkFolderRunningLocked(folder) - runner := m.folderRunners[folder] + runner, _ := m.folderRunners.Get(folder) m.fmut.RUnlock() if err != nil { return nil // If the folder isn't running, there's no error to report. @@ -2570,7 +2571,7 @@ func (m *model) Override(folder string) { // Grab the runner and the file set. m.fmut.RLock() - runner, ok := m.folderRunners[folder] + runner, ok := m.folderRunners.Get(folder) m.fmut.RUnlock() if !ok { return @@ -2585,7 +2586,7 @@ func (m *model) Revert(folder string) { // Grab the runner and the file set. m.fmut.RLock() - runner, ok := m.folderRunners[folder] + runner, ok := m.folderRunners.Get(folder) m.fmut.RUnlock() if !ok { return @@ -2788,7 +2789,7 @@ func (m *model) availabilityInSnapshotPRlocked(cfg config.FolderConfiguration, s // BringToFront bumps the given files priority in the job queue. func (m *model) BringToFront(folder, file string) { m.fmut.RLock() - runner, ok := m.folderRunners[folder] + runner, ok := m.folderRunners.Get(folder) m.fmut.RUnlock() if ok { @@ -2799,7 +2800,7 @@ func (m *model) BringToFront(folder, file string) { func (m *model) ResetFolder(folder string) error { m.fmut.RLock() defer m.fmut.RUnlock() - _, ok := m.folderRunners[folder] + _, ok := m.folderRunners.Get(folder) if ok { return errors.New("folder must be paused when resetting") } @@ -3085,7 +3086,7 @@ func (m *model) cleanPending(existingDevices map[protocol.DeviceID]config.Device // descriptive error if not. // Need to hold (read) lock on m.fmut when calling this. func (m *model) checkFolderRunningLocked(folder string) error { - _, ok := m.folderRunners[folder] + _, ok := m.folderRunners.Get(folder) if ok { return nil } diff --git a/lib/model/model_test.go b/lib/model/model_test.go index 878239eaa..508173285 100644 --- a/lib/model/model_test.go +++ b/lib/model/model_test.go @@ -1241,7 +1241,7 @@ func TestAutoAcceptPausedWhenFolderConfigChanged(t *testing.T) { if fcfg, ok := m.cfg.Folder(id); !ok || !fcfg.SharedWith(device1) { t.Error("missing folder, or not shared", id) } - if _, ok := m.folderRunners[id]; ok { + if _, ok := m.folderRunners.Get(id); ok { t.Fatal("folder running?") } @@ -1261,7 +1261,7 @@ func TestAutoAcceptPausedWhenFolderConfigChanged(t *testing.T) { t.Error("device missing") } - if _, ok := m.folderRunners[id]; ok { + if _, ok := m.folderRunners.Get(id); ok { t.Error("folder started") } } @@ -1290,7 +1290,7 @@ func TestAutoAcceptPausedWhenFolderConfigNotChanged(t *testing.T) { if fcfg, ok := m.cfg.Folder(id); !ok || !fcfg.SharedWith(device1) { t.Error("missing folder, or not shared", id) } - if _, ok := m.folderRunners[id]; ok { + if _, ok := m.folderRunners.Get(id); ok { t.Fatal("folder running?") } @@ -1310,7 +1310,7 @@ func TestAutoAcceptPausedWhenFolderConfigNotChanged(t *testing.T) { t.Error("device missing") } - if _, ok := m.folderRunners[id]; ok { + if _, ok := m.folderRunners.Get(id); ok { t.Error("folder started") } } diff --git a/lib/model/requests_test.go b/lib/model/requests_test.go index 08001c9a2..497342481 100644 --- a/lib/model/requests_test.go +++ b/lib/model/requests_test.go @@ -377,7 +377,8 @@ func TestIssue4841(t *testing.T) { } // Setup file from remote that was ignored locally - folder := m.folderRunners[defaultFolderConfig.ID].(*sendReceiveFolder) + runner, _ := m.folderRunners.Get(defaultFolderConfig.ID) + folder := runner.(*sendReceiveFolder) folder.updateLocals([]protocol.FileInfo{{ Name: "foo", Type: protocol.FileInfoTypeFile, diff --git a/lib/model/service_map.go b/lib/model/service_map.go index bef67203f..988ad520a 100644 --- a/lib/model/service_map.go +++ b/lib/model/service_map.go @@ -16,6 +16,8 @@ import ( "github.com/thejerf/suture/v4" ) +var errSvcNotFound = fmt.Errorf("service not found") + // A serviceMap is a utility map of arbitrary keys to a suture.Service of // some kind, where adding and removing services ensures they are properly // started and stopped on the given Supervisor. The serviceMap is itself a @@ -71,23 +73,39 @@ func (s *serviceMap[K, S]) Remove(k K) (found bool) { } // RemoveAndWait removes the service at the given key, stopping it on the -// supervisor. If there is no service at the given key, nothing happens. The -// return value indicates whether a service was removed. -func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) (found bool) { +// supervisor. Returns errSvcNotFound if there is no service at the given +// key, otherwise the return value from the supervisor's RemoveAndWait. +func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) error { + return <-s.RemoveAndWaitChan(k, timeout) +} + +// RemoveAndWaitChan removes the service at the given key, stopping it on +// the supervisor. The returned channel will produce precisely one error +// value: either the return value from RemoveAndWait (possibly nil), or +// errSvcNotFound if the service was not found. +func (s *serviceMap[K, S]) RemoveAndWaitChan(k K, timeout time.Duration) <-chan error { + ret := make(chan error, 1) if tok, ok := s.tokens[k]; ok { - found = true - s.supervisor.RemoveAndWait(tok, timeout) + go func() { + ret <- s.supervisor.RemoveAndWait(tok, timeout) + }() + } else { + ret <- errSvcNotFound } delete(s.services, k) delete(s.tokens, k) - return found + return ret } -// Each calls the given function for each service in the map. -func (s *serviceMap[K, S]) Each(fn func(K, S)) { +// Each calls the given function for each service in the map. An error from +// fn will stop the iteration and be returned as-is. +func (s *serviceMap[K, S]) Each(fn func(K, S) error) error { for key, svc := range s.services { - fn(key, svc) + if err := fn(key, svc); err != nil { + return err + } } + return nil } // Suture implementation diff --git a/lib/model/service_map_test.go b/lib/model/service_map_test.go index 3f7c9bae5..8fcfd896f 100644 --- a/lib/model/service_map_test.go +++ b/lib/model/service_map_test.go @@ -105,10 +105,11 @@ func TestServiceMap(t *testing.T) { // Remove two of them from within the iterator. - sm.Each(func(k string, v *dummyService) { + sm.Each(func(k string, v *dummyService) error { if strings.HasPrefix(k, "remove") { sm.RemoveAndWait(k, 0) } + return nil }) // They should have stopped.