lib/model: Refactor folderRunners to use a serviceMap (#9071)

Instead of separately tracking the token.

Also changes serviceMap to have a channel version of RemoveAndWait, so
that it's possible to do the removal under a lock but wait outside of
the lock. And changed where we do that in connection close, reversing
the change that happened when I added the serviceMap in 40b3b9ad1.
This commit is contained in:
Jakob Borg 2023-09-02 16:42:46 +02:00 committed by GitHub
parent 4d93648f75
commit 5118538179
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 85 additions and 61 deletions

View File

@ -537,7 +537,8 @@ func setupROFolder(t *testing.T) (*testModel, *receiveOnlyFolder, context.Cancel
m.fmut.RLock() m.fmut.RLock()
defer m.fmut.RUnlock() defer m.fmut.RUnlock()
f := m.folderRunners["ro"].(*receiveOnlyFolder) r, _ := m.folderRunners.Get("ro")
f := r.(*receiveOnlyFolder)
return m, f, cancel return m, f, cancel
} }

View File

@ -117,7 +117,8 @@ func setupSendReceiveFolder(t testing.TB, files ...protocol.FileInfo) (*testMode
model := setupModel(t, w) model := setupModel(t, w)
model.cancel() model.cancel()
<-model.stopped <-model.stopped
f := model.folderRunners[fcfg.ID].(*sendReceiveFolder) r, _ := model.folderRunners.Get(fcfg.ID)
f := r.(*sendReceiveFolder)
f.tempPullErrors = make(map[string]string) f.tempPullErrors = make(map[string]string)
f.ctx = context.Background() f.ctx = context.Background()

View File

@ -427,7 +427,7 @@ func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterCon
r.mut.Lock() r.mut.Lock()
defer r.mut.Unlock() defer r.mut.Unlock()
if r.indexHandlers.RemoveAndWait(folder, 0) { if r.indexHandlers.RemoveAndWait(folder, 0) == nil {
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder) l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
} }
folderState, ok := r.folderStates[folder] folderState, ok := r.folderStates[folder]
@ -458,11 +458,12 @@ func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderSta
r.mut.Lock() r.mut.Lock()
defer r.mut.Unlock() defer r.mut.Unlock()
r.indexHandlers.Each(func(folder string, is *indexHandler) { r.indexHandlers.Each(func(folder string, is *indexHandler) error {
if _, ok := except[folder]; !ok { if _, ok := except[folder]; !ok {
r.indexHandlers.RemoveAndWait(folder, 0) r.indexHandlers.RemoveAndWait(folder, 0)
l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder) l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
} }
return nil
}) })
for folder := range r.startInfos { for folder := range r.startInfos {
if _, ok := except[folder]; !ok { if _, ok := except[folder]; !ok {

View File

@ -150,8 +150,7 @@ type model struct {
folderFiles map[string]*db.FileSet // folder -> files folderFiles map[string]*db.FileSet // folder -> files
deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef deviceStatRefs map[protocol.DeviceID]*stats.DeviceStatisticsReference // deviceID -> statsRef
folderIgnores map[string]*ignore.Matcher // folder -> matcher object folderIgnores map[string]*ignore.Matcher // folder -> matcher object
folderRunners map[string]service // folder -> puller or scanner folderRunners *serviceMap[string, service] // folder -> puller or scanner
folderRunnerToken map[string]suture.ServiceToken // folder -> token for folder runner
folderRestartMuts syncMutexMap // folder -> restart mutex folderRestartMuts syncMutexMap // folder -> restart mutex
folderVersioners map[string]versioner.Versioner // folder -> versioner (may be nil) folderVersioners map[string]versioner.Versioner // folder -> versioner (may be nil)
folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders) folderEncryptionPasswordTokens map[string][]byte // folder -> encryption token (may be missing, and only for encryption type folders)
@ -234,8 +233,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
folderFiles: make(map[string]*db.FileSet), folderFiles: make(map[string]*db.FileSet),
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference), deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
folderIgnores: make(map[string]*ignore.Matcher), folderIgnores: make(map[string]*ignore.Matcher),
folderRunners: make(map[string]service), folderRunners: newServiceMap[string, service](evLogger),
folderRunnerToken: make(map[string]suture.ServiceToken),
folderVersioners: make(map[string]versioner.Versioner), folderVersioners: make(map[string]versioner.Versioner),
folderEncryptionPasswordTokens: make(map[string][]byte), folderEncryptionPasswordTokens: make(map[string][]byte),
folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error), folderEncryptionFailures: make(map[string]map[protocol.DeviceID]error),
@ -253,6 +251,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
for devID := range cfg.Devices() { for devID := range cfg.Devices() {
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID) m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
} }
m.Add(m.folderRunners)
m.Add(m.progressEmitter) m.Add(m.progressEmitter)
m.Add(m.indexHandlers) m.Add(m.indexHandlers)
m.Add(svcutil.AsService(m.serve, m.String())) m.Add(svcutil.AsService(m.serve, m.String()))
@ -350,7 +349,7 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio
m.folderFiles[cfg.ID] = fset m.folderFiles[cfg.ID] = fset
m.folderIgnores[cfg.ID] = ignores m.folderIgnores[cfg.ID] = ignores
_, ok := m.folderRunners[cfg.ID] _, ok := m.folderRunners.Get(cfg.ID)
if ok { if ok {
l.Warnln("Cannot start already running folder", cfg.Description()) l.Warnln("Cannot start already running folder", cfg.Description())
panic("cannot start already running folder") panic("cannot start already running folder")
@ -413,13 +412,10 @@ func (m *model) addAndStartFolderLockedWithIgnores(cfg config.FolderConfiguratio
} }
m.folderVersioners[folder] = ver m.folderVersioners[folder] = ver
p := folderFactory(m, fset, ignores, cfg, ver, m.evLogger, m.folderIOLimiter)
m.folderRunners[folder] = p
m.warnAboutOverwritingProtectedFiles(cfg, ignores) m.warnAboutOverwritingProtectedFiles(cfg, ignores)
m.folderRunnerToken[folder] = m.Add(p) p := folderFactory(m, fset, ignores, cfg, ver, m.evLogger, m.folderIOLimiter)
m.folderRunners.Add(folder, p)
l.Infof("Ready to synchronize %s (%s)", cfg.Description(), cfg.Type) l.Infof("Ready to synchronize %s (%s)", cfg.Description(), cfg.Type)
} }
@ -459,11 +455,9 @@ func (m *model) warnAboutOverwritingProtectedFiles(cfg config.FolderConfiguratio
func (m *model) removeFolder(cfg config.FolderConfiguration) { func (m *model) removeFolder(cfg config.FolderConfiguration) {
m.fmut.RLock() m.fmut.RLock()
token, ok := m.folderRunnerToken[cfg.ID] wait := m.folderRunners.RemoveAndWaitChan(cfg.ID, 0)
m.fmut.RUnlock() m.fmut.RUnlock()
if ok { <-wait
m.RemoveAndWait(token, 0)
}
// We need to hold both fmut and pmut and must acquire locks in the same // We need to hold both fmut and pmut and must acquire locks in the same
// order always. (The locks can be *released* in any order.) // order always. (The locks can be *released* in any order.)
@ -488,8 +482,9 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
} }
m.cleanupFolderLocked(cfg) m.cleanupFolderLocked(cfg)
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
r.Remove(cfg.ID) r.Remove(cfg.ID)
return nil
}) })
m.fmut.Unlock() m.fmut.Unlock()
@ -505,8 +500,6 @@ func (m *model) cleanupFolderLocked(cfg config.FolderConfiguration) {
delete(m.folderCfgs, cfg.ID) delete(m.folderCfgs, cfg.ID)
delete(m.folderFiles, cfg.ID) delete(m.folderFiles, cfg.ID)
delete(m.folderIgnores, cfg.ID) delete(m.folderIgnores, cfg.ID)
delete(m.folderRunners, cfg.ID)
delete(m.folderRunnerToken, cfg.ID)
delete(m.folderVersioners, cfg.ID) delete(m.folderVersioners, cfg.ID)
delete(m.folderEncryptionPasswordTokens, cfg.ID) delete(m.folderEncryptionPasswordTokens, cfg.ID)
delete(m.folderEncryptionFailures, cfg.ID) delete(m.folderEncryptionFailures, cfg.ID)
@ -533,11 +526,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
defer restartMut.Unlock() defer restartMut.Unlock()
m.fmut.RLock() m.fmut.RLock()
token, ok := m.folderRunnerToken[from.ID] wait := m.folderRunners.RemoveAndWaitChan(from.ID, 0)
m.fmut.RUnlock() m.fmut.RUnlock()
if ok { <-wait
m.RemoveAndWait(token, 0)
}
m.fmut.Lock() m.fmut.Lock()
defer m.fmut.Unlock() defer m.fmut.Unlock()
@ -564,8 +555,10 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
// Care needs to be taken because we already hold fmut and the lock order // Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine. // must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock() m.pmut.RLock()
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { runner, _ := m.folderRunners.Get(to.ID)
r.RegisterFolderState(to, fset, m.folderRunners[to.ID]) m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
r.RegisterFolderState(to, fset, runner)
return nil
}) })
m.pmut.RUnlock() m.pmut.RUnlock()
@ -602,8 +595,10 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
// Care needs to be taken because we already hold fmut and the lock order // Care needs to be taken because we already hold fmut and the lock order
// must be the same everywhere. As fmut is acquired first, this is fine. // must be the same everywhere. As fmut is acquired first, this is fine.
m.pmut.RLock() m.pmut.RLock()
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) { m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) error {
r.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID]) runner, _ := m.folderRunners.Get(cfg.ID)
r.RegisterFolderState(cfg, fset, runner)
return nil
}) })
m.pmut.RUnlock() m.pmut.RUnlock()
@ -786,12 +781,16 @@ func (m *model) FolderStatistics() (map[string]stats.FolderStatistics, error) {
res := make(map[string]stats.FolderStatistics) res := make(map[string]stats.FolderStatistics)
m.fmut.RLock() m.fmut.RLock()
defer m.fmut.RUnlock() defer m.fmut.RUnlock()
for id, runner := range m.folderRunners { err := m.folderRunners.Each(func(id string, runner service) error {
stats, err := runner.GetStatistics() stats, err := runner.GetStatistics()
if err != nil { if err != nil {
return nil, err return err
} }
res[id] = stats res[id] = stats
return nil
})
if err != nil {
return nil, err
} }
return res, nil return res, nil
} }
@ -951,7 +950,7 @@ func (m *model) FolderProgressBytesCompleted(folder string) int64 {
func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, error) { func (m *model) NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, error) {
m.fmut.RLock() m.fmut.RLock()
rf, rfOk := m.folderFiles[folder] rf, rfOk := m.folderFiles[folder]
runner, runnerOk := m.folderRunners[folder] runner, runnerOk := m.folderRunners.Get(folder)
cfg := m.folderCfgs[folder] cfg := m.folderCfgs[folder]
m.fmut.RUnlock() m.fmut.RUnlock()
@ -1793,8 +1792,9 @@ func (m *model) Closed(conn protocol.Connection, err error) {
delete(m.remoteFolderStates, device) delete(m.remoteFolderStates, device)
closed := m.closed[device] closed := m.closed[device]
delete(m.closed, device) delete(m.closed, device)
m.indexHandlers.RemoveAndWait(device, 0) wait := m.indexHandlers.RemoveAndWaitChan(device, 0)
m.pmut.Unlock() m.pmut.Unlock()
<-wait
m.progressEmitter.temporaryIndexUnsubscribe(conn) m.progressEmitter.temporaryIndexUnsubscribe(conn)
m.deviceDidClose(device, time.Since(conn.EstablishedAt())) m.deviceDidClose(device, time.Since(conn.EstablishedAt()))
@ -2019,7 +2019,7 @@ func (m *model) recheckFile(deviceID protocol.DeviceID, folder, name string, off
// Something is fishy, invalidate the file and rescan it. // Something is fishy, invalidate the file and rescan it.
// The file will temporarily become invalid, which is ok as the content is messed up. // The file will temporarily become invalid, which is ok as the content is messed up.
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if !ok { if !ok {
l.Debugf("%v recheckFile: %s: %q / %q: Folder stopped before rescan could be scheduled", m, deviceID, folder, name) l.Debugf("%v recheckFile: %s: %q / %q: Folder stopped before rescan could be scheduled", m, deviceID, folder, name)
@ -2166,7 +2166,7 @@ func (m *model) setIgnores(cfg config.FolderConfiguration, content []string) err
} }
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[cfg.ID] runner, ok := m.folderRunners.Get(cfg.ID)
m.fmut.RUnlock() m.fmut.RUnlock()
if ok { if ok {
runner.ScheduleScan() runner.ScheduleScan()
@ -2254,7 +2254,8 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
m.deviceDownloads[deviceID] = newDeviceDownloadState() m.deviceDownloads[deviceID] = newDeviceDownloadState()
indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger) indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger)
for id, fcfg := range m.folderCfgs { for id, fcfg := range m.folderCfgs {
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id]) runner, _ := m.folderRunners.Get(id)
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], runner)
} }
m.indexHandlers.Add(deviceID, indexRegistry) m.indexHandlers.Add(deviceID, indexRegistry)
m.fmut.RUnlock() m.fmut.RUnlock()
@ -2402,7 +2403,7 @@ func (m *model) ScanFolder(folder string) error {
func (m *model) ScanFolderSubdirs(folder string, subs []string) error { func (m *model) ScanFolderSubdirs(folder string, subs []string) error {
m.fmut.RLock() m.fmut.RLock()
err := m.checkFolderRunningLocked(folder) err := m.checkFolderRunningLocked(folder)
runner := m.folderRunners[folder] runner, _ := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if err != nil { if err != nil {
@ -2414,7 +2415,7 @@ func (m *model) ScanFolderSubdirs(folder string, subs []string) error {
func (m *model) DelayScan(folder string, next time.Duration) { func (m *model) DelayScan(folder string, next time.Duration) {
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if !ok { if !ok {
return return
@ -2532,7 +2533,7 @@ func (m *model) generateClusterConfig(device protocol.DeviceID) (protocol.Cluste
func (m *model) State(folder string) (string, time.Time, error) { func (m *model) State(folder string) (string, time.Time, error) {
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if !ok { if !ok {
// The returned error should be an actual folder error, so returning // The returned error should be an actual folder error, so returning
@ -2547,7 +2548,7 @@ func (m *model) State(folder string) (string, time.Time, error) {
func (m *model) FolderErrors(folder string) ([]FileError, error) { func (m *model) FolderErrors(folder string) ([]FileError, error) {
m.fmut.RLock() m.fmut.RLock()
err := m.checkFolderRunningLocked(folder) err := m.checkFolderRunningLocked(folder)
runner := m.folderRunners[folder] runner, _ := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if err != nil { if err != nil {
return nil, err return nil, err
@ -2558,7 +2559,7 @@ func (m *model) FolderErrors(folder string) ([]FileError, error) {
func (m *model) WatchError(folder string) error { func (m *model) WatchError(folder string) error {
m.fmut.RLock() m.fmut.RLock()
err := m.checkFolderRunningLocked(folder) err := m.checkFolderRunningLocked(folder)
runner := m.folderRunners[folder] runner, _ := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if err != nil { if err != nil {
return nil // If the folder isn't running, there's no error to report. return nil // If the folder isn't running, there's no error to report.
@ -2570,7 +2571,7 @@ func (m *model) Override(folder string) {
// Grab the runner and the file set. // Grab the runner and the file set.
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if !ok { if !ok {
return return
@ -2585,7 +2586,7 @@ func (m *model) Revert(folder string) {
// Grab the runner and the file set. // Grab the runner and the file set.
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if !ok { if !ok {
return return
@ -2788,7 +2789,7 @@ func (m *model) availabilityInSnapshotPRlocked(cfg config.FolderConfiguration, s
// BringToFront bumps the given files priority in the job queue. // BringToFront bumps the given files priority in the job queue.
func (m *model) BringToFront(folder, file string) { func (m *model) BringToFront(folder, file string) {
m.fmut.RLock() m.fmut.RLock()
runner, ok := m.folderRunners[folder] runner, ok := m.folderRunners.Get(folder)
m.fmut.RUnlock() m.fmut.RUnlock()
if ok { if ok {
@ -2799,7 +2800,7 @@ func (m *model) BringToFront(folder, file string) {
func (m *model) ResetFolder(folder string) error { func (m *model) ResetFolder(folder string) error {
m.fmut.RLock() m.fmut.RLock()
defer m.fmut.RUnlock() defer m.fmut.RUnlock()
_, ok := m.folderRunners[folder] _, ok := m.folderRunners.Get(folder)
if ok { if ok {
return errors.New("folder must be paused when resetting") return errors.New("folder must be paused when resetting")
} }
@ -3085,7 +3086,7 @@ func (m *model) cleanPending(existingDevices map[protocol.DeviceID]config.Device
// descriptive error if not. // descriptive error if not.
// Need to hold (read) lock on m.fmut when calling this. // Need to hold (read) lock on m.fmut when calling this.
func (m *model) checkFolderRunningLocked(folder string) error { func (m *model) checkFolderRunningLocked(folder string) error {
_, ok := m.folderRunners[folder] _, ok := m.folderRunners.Get(folder)
if ok { if ok {
return nil return nil
} }

View File

@ -1241,7 +1241,7 @@ func TestAutoAcceptPausedWhenFolderConfigChanged(t *testing.T) {
if fcfg, ok := m.cfg.Folder(id); !ok || !fcfg.SharedWith(device1) { if fcfg, ok := m.cfg.Folder(id); !ok || !fcfg.SharedWith(device1) {
t.Error("missing folder, or not shared", id) t.Error("missing folder, or not shared", id)
} }
if _, ok := m.folderRunners[id]; ok { if _, ok := m.folderRunners.Get(id); ok {
t.Fatal("folder running?") t.Fatal("folder running?")
} }
@ -1261,7 +1261,7 @@ func TestAutoAcceptPausedWhenFolderConfigChanged(t *testing.T) {
t.Error("device missing") t.Error("device missing")
} }
if _, ok := m.folderRunners[id]; ok { if _, ok := m.folderRunners.Get(id); ok {
t.Error("folder started") t.Error("folder started")
} }
} }
@ -1290,7 +1290,7 @@ func TestAutoAcceptPausedWhenFolderConfigNotChanged(t *testing.T) {
if fcfg, ok := m.cfg.Folder(id); !ok || !fcfg.SharedWith(device1) { if fcfg, ok := m.cfg.Folder(id); !ok || !fcfg.SharedWith(device1) {
t.Error("missing folder, or not shared", id) t.Error("missing folder, or not shared", id)
} }
if _, ok := m.folderRunners[id]; ok { if _, ok := m.folderRunners.Get(id); ok {
t.Fatal("folder running?") t.Fatal("folder running?")
} }
@ -1310,7 +1310,7 @@ func TestAutoAcceptPausedWhenFolderConfigNotChanged(t *testing.T) {
t.Error("device missing") t.Error("device missing")
} }
if _, ok := m.folderRunners[id]; ok { if _, ok := m.folderRunners.Get(id); ok {
t.Error("folder started") t.Error("folder started")
} }
} }

View File

@ -377,7 +377,8 @@ func TestIssue4841(t *testing.T) {
} }
// Setup file from remote that was ignored locally // Setup file from remote that was ignored locally
folder := m.folderRunners[defaultFolderConfig.ID].(*sendReceiveFolder) runner, _ := m.folderRunners.Get(defaultFolderConfig.ID)
folder := runner.(*sendReceiveFolder)
folder.updateLocals([]protocol.FileInfo{{ folder.updateLocals([]protocol.FileInfo{{
Name: "foo", Name: "foo",
Type: protocol.FileInfoTypeFile, Type: protocol.FileInfoTypeFile,

View File

@ -16,6 +16,8 @@ import (
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
) )
var errSvcNotFound = fmt.Errorf("service not found")
// A serviceMap is a utility map of arbitrary keys to a suture.Service of // A serviceMap is a utility map of arbitrary keys to a suture.Service of
// some kind, where adding and removing services ensures they are properly // some kind, where adding and removing services ensures they are properly
// started and stopped on the given Supervisor. The serviceMap is itself a // started and stopped on the given Supervisor. The serviceMap is itself a
@ -71,23 +73,39 @@ func (s *serviceMap[K, S]) Remove(k K) (found bool) {
} }
// RemoveAndWait removes the service at the given key, stopping it on the // RemoveAndWait removes the service at the given key, stopping it on the
// supervisor. If there is no service at the given key, nothing happens. The // supervisor. Returns errSvcNotFound if there is no service at the given
// return value indicates whether a service was removed. // key, otherwise the return value from the supervisor's RemoveAndWait.
func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) (found bool) { func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) error {
return <-s.RemoveAndWaitChan(k, timeout)
}
// RemoveAndWaitChan removes the service at the given key, stopping it on
// the supervisor. The returned channel will produce precisely one error
// value: either the return value from RemoveAndWait (possibly nil), or
// errSvcNotFound if the service was not found.
func (s *serviceMap[K, S]) RemoveAndWaitChan(k K, timeout time.Duration) <-chan error {
ret := make(chan error, 1)
if tok, ok := s.tokens[k]; ok { if tok, ok := s.tokens[k]; ok {
found = true go func() {
s.supervisor.RemoveAndWait(tok, timeout) ret <- s.supervisor.RemoveAndWait(tok, timeout)
}()
} else {
ret <- errSvcNotFound
} }
delete(s.services, k) delete(s.services, k)
delete(s.tokens, k) delete(s.tokens, k)
return found return ret
} }
// Each calls the given function for each service in the map. // Each calls the given function for each service in the map. An error from
func (s *serviceMap[K, S]) Each(fn func(K, S)) { // fn will stop the iteration and be returned as-is.
func (s *serviceMap[K, S]) Each(fn func(K, S) error) error {
for key, svc := range s.services { for key, svc := range s.services {
fn(key, svc) if err := fn(key, svc); err != nil {
return err
}
} }
return nil
} }
// Suture implementation // Suture implementation

View File

@ -105,10 +105,11 @@ func TestServiceMap(t *testing.T) {
// Remove two of them from within the iterator. // Remove two of them from within the iterator.
sm.Each(func(k string, v *dummyService) { sm.Each(func(k string, v *dummyService) error {
if strings.HasPrefix(k, "remove") { if strings.HasPrefix(k, "remove") {
sm.RemoveAndWait(k, 0) sm.RemoveAndWait(k, 0)
} }
return nil
}) })
// They should have stopped. // They should have stopped.