mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-10 07:11:08 +00:00
Co-authored-by: Simon Frei <freisim93@gmail.com>
This commit is contained in:
parent
c2c6133aa5
commit
40b3b9ad15
@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
"github.com/syncthing/syncthing/lib/protocol"
|
"github.com/syncthing/syncthing/lib/protocol"
|
||||||
protocolmocks "github.com/syncthing/syncthing/lib/protocol/mocks"
|
protocolmocks "github.com/syncthing/syncthing/lib/protocol/mocks"
|
||||||
|
"github.com/syncthing/syncthing/lib/rand"
|
||||||
"github.com/syncthing/syncthing/lib/scanner"
|
"github.com/syncthing/syncthing/lib/scanner"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -36,10 +37,11 @@ func newFakeConnection(id protocol.DeviceID, model Model) *fakeConnection {
|
|||||||
f.CloseCalls(func(err error) {
|
f.CloseCalls(func(err error) {
|
||||||
f.closeOnce.Do(func() {
|
f.closeOnce.Do(func() {
|
||||||
close(f.closed)
|
close(f.closed)
|
||||||
|
model.Closed(f, err)
|
||||||
})
|
})
|
||||||
model.Closed(f, err)
|
|
||||||
f.ClosedReturns(f.closed)
|
f.ClosedReturns(f.closed)
|
||||||
})
|
})
|
||||||
|
f.StringReturns(rand.String(8))
|
||||||
return f
|
return f
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,8 +12,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/thejerf/suture/v4"
|
|
||||||
|
|
||||||
"github.com/syncthing/syncthing/lib/config"
|
"github.com/syncthing/syncthing/lib/config"
|
||||||
"github.com/syncthing/syncthing/lib/db"
|
"github.com/syncthing/syncthing/lib/db"
|
||||||
"github.com/syncthing/syncthing/lib/events"
|
"github.com/syncthing/syncthing/lib/events"
|
||||||
@ -28,7 +26,6 @@ type indexHandler struct {
|
|||||||
folderIsReceiveEncrypted bool
|
folderIsReceiveEncrypted bool
|
||||||
prevSequence int64
|
prevSequence int64
|
||||||
evLogger events.Logger
|
evLogger events.Logger
|
||||||
token suture.ServiceToken
|
|
||||||
|
|
||||||
cond *sync.Cond
|
cond *sync.Cond
|
||||||
paused bool
|
paused bool
|
||||||
@ -373,11 +370,10 @@ func (s *indexHandler) String() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type indexHandlerRegistry struct {
|
type indexHandlerRegistry struct {
|
||||||
sup *suture.Supervisor
|
|
||||||
evLogger events.Logger
|
evLogger events.Logger
|
||||||
conn protocol.Connection
|
conn protocol.Connection
|
||||||
downloads *deviceDownloadState
|
downloads *deviceDownloadState
|
||||||
indexHandlers map[string]*indexHandler
|
indexHandlers *serviceMap[string, *indexHandler]
|
||||||
startInfos map[string]*clusterConfigDeviceInfo
|
startInfos map[string]*clusterConfigDeviceInfo
|
||||||
folderStates map[string]*indexHandlerFolderState
|
folderStates map[string]*indexHandlerFolderState
|
||||||
mut sync.Mutex
|
mut sync.Mutex
|
||||||
@ -389,27 +385,16 @@ type indexHandlerFolderState struct {
|
|||||||
runner service
|
runner service
|
||||||
}
|
}
|
||||||
|
|
||||||
func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, closed chan struct{}, parentSup *suture.Supervisor, evLogger events.Logger) *indexHandlerRegistry {
|
func newIndexHandlerRegistry(conn protocol.Connection, downloads *deviceDownloadState, evLogger events.Logger) *indexHandlerRegistry {
|
||||||
r := &indexHandlerRegistry{
|
r := &indexHandlerRegistry{
|
||||||
|
evLogger: evLogger,
|
||||||
conn: conn,
|
conn: conn,
|
||||||
downloads: downloads,
|
downloads: downloads,
|
||||||
evLogger: evLogger,
|
indexHandlers: newServiceMap[string, *indexHandler](evLogger),
|
||||||
indexHandlers: make(map[string]*indexHandler),
|
|
||||||
startInfos: make(map[string]*clusterConfigDeviceInfo),
|
startInfos: make(map[string]*clusterConfigDeviceInfo),
|
||||||
folderStates: make(map[string]*indexHandlerFolderState),
|
folderStates: make(map[string]*indexHandlerFolderState),
|
||||||
mut: sync.Mutex{},
|
mut: sync.Mutex{},
|
||||||
}
|
}
|
||||||
r.sup = suture.New(r.String(), svcutil.SpecWithDebugLogger(l))
|
|
||||||
ourToken := parentSup.Add(r.sup)
|
|
||||||
r.sup.Add(svcutil.AsService(func(ctx context.Context) error {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
case <-closed:
|
|
||||||
parentSup.Remove(ourToken)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}, fmt.Sprintf("%v/waitForClosed", r)))
|
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -417,20 +402,18 @@ func (r *indexHandlerRegistry) String() string {
|
|||||||
return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
|
return fmt.Sprintf("indexHandlerRegistry/%v", r.conn.DeviceID().Short())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexHandlerRegistry) GetSupervisor() *suture.Supervisor {
|
func (r *indexHandlerRegistry) Serve(ctx context.Context) error {
|
||||||
return r.sup
|
// Running the index handler registry means running the individual index
|
||||||
|
// handler children.
|
||||||
|
return r.indexHandlers.Serve(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
|
func (r *indexHandlerRegistry) startLocked(folder config.FolderConfiguration, fset *db.FileSet, runner service, startInfo *clusterConfigDeviceInfo) {
|
||||||
if is, ok := r.indexHandlers[folder.ID]; ok {
|
r.indexHandlers.RemoveAndWait(folder.ID, 0)
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexHandlers, folder.ID)
|
|
||||||
}
|
|
||||||
delete(r.startInfos, folder.ID)
|
delete(r.startInfos, folder.ID)
|
||||||
|
|
||||||
is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
|
is := newIndexHandler(r.conn, r.downloads, folder, fset, runner, startInfo, r.evLogger)
|
||||||
is.token = r.sup.Add(is)
|
r.indexHandlers.Add(folder.ID, is)
|
||||||
r.indexHandlers[folder.ID] = is
|
|
||||||
|
|
||||||
// This new connection might help us get in sync.
|
// This new connection might help us get in sync.
|
||||||
runner.SchedulePull()
|
runner.SchedulePull()
|
||||||
@ -444,9 +427,7 @@ func (r *indexHandlerRegistry) AddIndexInfo(folder string, startInfo *clusterCon
|
|||||||
r.mut.Lock()
|
r.mut.Lock()
|
||||||
defer r.mut.Unlock()
|
defer r.mut.Unlock()
|
||||||
|
|
||||||
if is, ok := r.indexHandlers[folder]; ok {
|
if r.indexHandlers.RemoveAndWait(folder, 0) {
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexHandlers, folder)
|
|
||||||
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
|
l.Debugf("Removed index sender for device %v and folder %v due to added pending", r.conn.DeviceID().Short(), folder)
|
||||||
}
|
}
|
||||||
folderState, ok := r.folderStates[folder]
|
folderState, ok := r.folderStates[folder]
|
||||||
@ -465,10 +446,7 @@ func (r *indexHandlerRegistry) Remove(folder string) {
|
|||||||
defer r.mut.Unlock()
|
defer r.mut.Unlock()
|
||||||
|
|
||||||
l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
l.Debugf("Removing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
||||||
if is, ok := r.indexHandlers[folder]; ok {
|
r.indexHandlers.RemoveAndWait(folder, 0)
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
|
||||||
delete(r.indexHandlers, folder)
|
|
||||||
}
|
|
||||||
delete(r.startInfos, folder)
|
delete(r.startInfos, folder)
|
||||||
l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
l.Debugf("Removed index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
||||||
}
|
}
|
||||||
@ -480,13 +458,12 @@ func (r *indexHandlerRegistry) RemoveAllExcept(except map[string]remoteFolderSta
|
|||||||
r.mut.Lock()
|
r.mut.Lock()
|
||||||
defer r.mut.Unlock()
|
defer r.mut.Unlock()
|
||||||
|
|
||||||
for folder, is := range r.indexHandlers {
|
r.indexHandlers.Each(func(folder string, is *indexHandler) {
|
||||||
if _, ok := except[folder]; !ok {
|
if _, ok := except[folder]; !ok {
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
r.indexHandlers.RemoveAndWait(folder, 0)
|
||||||
delete(r.indexHandlers, folder)
|
|
||||||
l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
|
l.Debugf("Removed index handler for device %v and folder %v (removeAllExcept)", r.conn.DeviceID().Short(), folder)
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
for folder := range r.startInfos {
|
for folder := range r.startInfos {
|
||||||
if _, ok := except[folder]; !ok {
|
if _, ok := except[folder]; !ok {
|
||||||
delete(r.startInfos, folder)
|
delete(r.startInfos, folder)
|
||||||
@ -518,7 +495,7 @@ func (r *indexHandlerRegistry) RegisterFolderState(folder config.FolderConfigura
|
|||||||
func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
|
func (r *indexHandlerRegistry) folderPausedLocked(folder string) {
|
||||||
l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
l.Debugf("Pausing index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
||||||
delete(r.folderStates, folder)
|
delete(r.folderStates, folder)
|
||||||
if is, ok := r.indexHandlers[folder]; ok {
|
if is, ok := r.indexHandlers.Get(folder); ok {
|
||||||
is.pause()
|
is.pause()
|
||||||
l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
l.Debugf("Paused index handler for device %v and folder %v", r.conn.DeviceID().Short(), folder)
|
||||||
} else {
|
} else {
|
||||||
@ -536,11 +513,10 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura
|
|||||||
runner: runner,
|
runner: runner,
|
||||||
}
|
}
|
||||||
|
|
||||||
is, isOk := r.indexHandlers[folder.ID]
|
is, isOk := r.indexHandlers.Get(folder.ID)
|
||||||
if info, ok := r.startInfos[folder.ID]; ok {
|
if info, ok := r.startInfos[folder.ID]; ok {
|
||||||
if isOk {
|
if isOk {
|
||||||
r.sup.RemoveAndWait(is.token, 0)
|
r.indexHandlers.RemoveAndWait(folder.ID, 0)
|
||||||
delete(r.indexHandlers, folder.ID)
|
|
||||||
l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
|
l.Debugf("Removed index handler for device %v and folder %v in resume", r.conn.DeviceID().Short(), folder.ID)
|
||||||
}
|
}
|
||||||
r.startLocked(folder, fset, runner, info)
|
r.startLocked(folder, fset, runner, info)
|
||||||
@ -557,7 +533,7 @@ func (r *indexHandlerRegistry) folderRunningLocked(folder config.FolderConfigura
|
|||||||
func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
|
func (r *indexHandlerRegistry) ReceiveIndex(folder string, fs []protocol.FileInfo, update bool, op string) error {
|
||||||
r.mut.Lock()
|
r.mut.Lock()
|
||||||
defer r.mut.Unlock()
|
defer r.mut.Unlock()
|
||||||
is, isOk := r.indexHandlers[folder]
|
is, isOk := r.indexHandlers.Get(folder)
|
||||||
if !isOk {
|
if !isOk {
|
||||||
l.Infof("%v for nonexistent or paused folder %q", op, folder)
|
l.Infof("%v for nonexistent or paused folder %q", op, folder)
|
||||||
return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
|
return fmt.Errorf("%s: %w", folder, ErrFolderMissing)
|
||||||
|
@ -165,7 +165,7 @@ type model struct {
|
|||||||
helloMessages map[protocol.DeviceID]protocol.Hello
|
helloMessages map[protocol.DeviceID]protocol.Hello
|
||||||
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
|
deviceDownloads map[protocol.DeviceID]*deviceDownloadState
|
||||||
remoteFolderStates map[protocol.DeviceID]map[string]remoteFolderState // deviceID -> folders
|
remoteFolderStates map[protocol.DeviceID]map[string]remoteFolderState // deviceID -> folders
|
||||||
indexHandlers map[protocol.DeviceID]*indexHandlerRegistry
|
indexHandlers *serviceMap[protocol.DeviceID, *indexHandlerRegistry]
|
||||||
|
|
||||||
// for testing only
|
// for testing only
|
||||||
foldersRunning atomic.Int32
|
foldersRunning atomic.Int32
|
||||||
@ -248,12 +248,13 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
|
|||||||
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
|
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
|
||||||
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
|
deviceDownloads: make(map[protocol.DeviceID]*deviceDownloadState),
|
||||||
remoteFolderStates: make(map[protocol.DeviceID]map[string]remoteFolderState),
|
remoteFolderStates: make(map[protocol.DeviceID]map[string]remoteFolderState),
|
||||||
indexHandlers: make(map[protocol.DeviceID]*indexHandlerRegistry),
|
indexHandlers: newServiceMap[protocol.DeviceID, *indexHandlerRegistry](evLogger),
|
||||||
}
|
}
|
||||||
for devID := range cfg.Devices() {
|
for devID := range cfg.Devices() {
|
||||||
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
|
m.deviceStatRefs[devID] = stats.NewDeviceStatisticsReference(m.db, devID)
|
||||||
}
|
}
|
||||||
m.Add(m.progressEmitter)
|
m.Add(m.progressEmitter)
|
||||||
|
m.Add(m.indexHandlers)
|
||||||
m.Add(svcutil.AsService(m.serve, m.String()))
|
m.Add(svcutil.AsService(m.serve, m.String()))
|
||||||
|
|
||||||
return m
|
return m
|
||||||
@ -487,9 +488,9 @@ func (m *model) removeFolder(cfg config.FolderConfiguration) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.cleanupFolderLocked(cfg)
|
m.cleanupFolderLocked(cfg)
|
||||||
for _, r := range m.indexHandlers {
|
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
|
||||||
r.Remove(cfg.ID)
|
r.Remove(cfg.ID)
|
||||||
}
|
})
|
||||||
|
|
||||||
m.fmut.Unlock()
|
m.fmut.Unlock()
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
@ -563,9 +564,9 @@ func (m *model) restartFolder(from, to config.FolderConfiguration, cacheIgnoredF
|
|||||||
// Care needs to be taken because we already hold fmut and the lock order
|
// Care needs to be taken because we already hold fmut and the lock order
|
||||||
// must be the same everywhere. As fmut is acquired first, this is fine.
|
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
for _, indexRegistry := range m.indexHandlers {
|
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
|
||||||
indexRegistry.RegisterFolderState(to, fset, m.folderRunners[to.ID])
|
r.RegisterFolderState(to, fset, m.folderRunners[to.ID])
|
||||||
}
|
})
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
|
|
||||||
var infoMsg string
|
var infoMsg string
|
||||||
@ -601,9 +602,9 @@ func (m *model) newFolder(cfg config.FolderConfiguration, cacheIgnoredFiles bool
|
|||||||
// Care needs to be taken because we already hold fmut and the lock order
|
// Care needs to be taken because we already hold fmut and the lock order
|
||||||
// must be the same everywhere. As fmut is acquired first, this is fine.
|
// must be the same everywhere. As fmut is acquired first, this is fine.
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
for _, indexRegistry := range m.indexHandlers {
|
m.indexHandlers.Each(func(_ protocol.DeviceID, r *indexHandlerRegistry) {
|
||||||
indexRegistry.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
|
r.RegisterFolderState(cfg, fset, m.folderRunners[cfg.ID])
|
||||||
}
|
})
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -1138,7 +1139,7 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc
|
|||||||
}
|
}
|
||||||
|
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
indexHandler, ok := m.indexHandlers[deviceID]
|
indexHandler, ok := m.indexHandlers.Get(deviceID)
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
// This should be impossible, as an index handler always exists for an
|
// This should be impossible, as an index handler always exists for an
|
||||||
@ -1170,7 +1171,7 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
|
|||||||
l.Debugf("Handling ClusterConfig from %v", deviceID.Short())
|
l.Debugf("Handling ClusterConfig from %v", deviceID.Short())
|
||||||
|
|
||||||
m.pmut.RLock()
|
m.pmut.RLock()
|
||||||
indexHandlerRegistry, ok := m.indexHandlers[deviceID]
|
indexHandlerRegistry, ok := m.indexHandlers.Get(deviceID)
|
||||||
m.pmut.RUnlock()
|
m.pmut.RUnlock()
|
||||||
if !ok {
|
if !ok {
|
||||||
panic("bug: ClusterConfig called on closed or nonexistent connection")
|
panic("bug: ClusterConfig called on closed or nonexistent connection")
|
||||||
@ -1792,7 +1793,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
|
|||||||
delete(m.remoteFolderStates, device)
|
delete(m.remoteFolderStates, device)
|
||||||
closed := m.closed[device]
|
closed := m.closed[device]
|
||||||
delete(m.closed, device)
|
delete(m.closed, device)
|
||||||
delete(m.indexHandlers, device)
|
m.indexHandlers.RemoveAndWait(device, 0)
|
||||||
m.pmut.Unlock()
|
m.pmut.Unlock()
|
||||||
|
|
||||||
m.progressEmitter.temporaryIndexUnsubscribe(conn)
|
m.progressEmitter.temporaryIndexUnsubscribe(conn)
|
||||||
@ -2251,11 +2252,11 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
|
|||||||
closed := make(chan struct{})
|
closed := make(chan struct{})
|
||||||
m.closed[deviceID] = closed
|
m.closed[deviceID] = closed
|
||||||
m.deviceDownloads[deviceID] = newDeviceDownloadState()
|
m.deviceDownloads[deviceID] = newDeviceDownloadState()
|
||||||
indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], closed, m.Supervisor, m.evLogger)
|
indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.evLogger)
|
||||||
for id, fcfg := range m.folderCfgs {
|
for id, fcfg := range m.folderCfgs {
|
||||||
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
|
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
|
||||||
}
|
}
|
||||||
m.indexHandlers[deviceID] = indexRegistry
|
m.indexHandlers.Add(deviceID, indexRegistry)
|
||||||
m.fmut.RUnlock()
|
m.fmut.RUnlock()
|
||||||
// 0: default, <0: no limiting
|
// 0: default, <0: no limiting
|
||||||
switch {
|
switch {
|
||||||
|
@ -1337,8 +1337,9 @@ func TestAutoAcceptEnc(t *testing.T) {
|
|||||||
// Earlier tests might cause the connection to get closed, thus ClusterConfig
|
// Earlier tests might cause the connection to get closed, thus ClusterConfig
|
||||||
// would panic.
|
// would panic.
|
||||||
clusterConfig := func(deviceID protocol.DeviceID, cm protocol.ClusterConfig) {
|
clusterConfig := func(deviceID protocol.DeviceID, cm protocol.ClusterConfig) {
|
||||||
m.AddConnection(newFakeConnection(deviceID, m), protocol.Hello{})
|
conn := newFakeConnection(deviceID, m)
|
||||||
m.ClusterConfig(&protocolmocks.Connection{DeviceIDStub: func() protocol.DeviceID { return deviceID }}, cm)
|
m.AddConnection(conn, protocol.Hello{})
|
||||||
|
m.ClusterConfig(conn, cm)
|
||||||
}
|
}
|
||||||
|
|
||||||
clusterConfig(device1, basicCC())
|
clusterConfig(device1, basicCC())
|
||||||
|
103
lib/model/service_map.go
Normal file
103
lib/model/service_map.go
Normal file
@ -0,0 +1,103 @@
|
|||||||
|
// Copyright (C) 2023 The Syncthing Authors.
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/syncthing/syncthing/lib/events"
|
||||||
|
"github.com/syncthing/syncthing/lib/svcutil"
|
||||||
|
"github.com/thejerf/suture/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
// A serviceMap is a utility map of arbitrary keys to a suture.Service of
|
||||||
|
// some kind, where adding and removing services ensures they are properly
|
||||||
|
// started and stopped on the given Supervisor. The serviceMap is itself a
|
||||||
|
// suture.Service and should be added to a Supervisor.
|
||||||
|
// Not safe for concurrent use.
|
||||||
|
type serviceMap[K comparable, S suture.Service] struct {
|
||||||
|
services map[K]S
|
||||||
|
tokens map[K]suture.ServiceToken
|
||||||
|
supervisor *suture.Supervisor
|
||||||
|
eventLogger events.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func newServiceMap[K comparable, S suture.Service](eventLogger events.Logger) *serviceMap[K, S] {
|
||||||
|
m := &serviceMap[K, S]{
|
||||||
|
services: make(map[K]S),
|
||||||
|
tokens: make(map[K]suture.ServiceToken),
|
||||||
|
eventLogger: eventLogger,
|
||||||
|
}
|
||||||
|
m.supervisor = suture.New(m.String(), svcutil.SpecWithDebugLogger(l))
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add adds a service to the map, starting it on the supervisor. If there is
|
||||||
|
// already a service at the given key, it is removed first.
|
||||||
|
func (s *serviceMap[K, S]) Add(k K, v S) {
|
||||||
|
if tok, ok := s.tokens[k]; ok {
|
||||||
|
// There is already a service at this key, remove it first.
|
||||||
|
s.supervisor.Remove(tok)
|
||||||
|
s.eventLogger.Log(events.Failure, fmt.Sprintf("%s replaced service at key %v", s, k))
|
||||||
|
}
|
||||||
|
s.services[k] = v
|
||||||
|
s.tokens[k] = s.supervisor.Add(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the service at the given key, or the empty value and false if
|
||||||
|
// there is no service at that key.
|
||||||
|
func (s *serviceMap[K, S]) Get(k K) (v S, ok bool) {
|
||||||
|
v, ok = s.services[k]
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove removes the service at the given key, stopping it on the supervisor.
|
||||||
|
// If there is no service at the given key, nothing happens. The return value
|
||||||
|
// indicates whether a service was removed.
|
||||||
|
func (s *serviceMap[K, S]) Remove(k K) (found bool) {
|
||||||
|
if tok, ok := s.tokens[k]; ok {
|
||||||
|
found = true
|
||||||
|
s.supervisor.Remove(tok)
|
||||||
|
}
|
||||||
|
delete(s.services, k)
|
||||||
|
delete(s.tokens, k)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveAndWait removes the service at the given key, stopping it on the
|
||||||
|
// supervisor. If there is no service at the given key, nothing happens. The
|
||||||
|
// return value indicates whether a service was removed.
|
||||||
|
func (s *serviceMap[K, S]) RemoveAndWait(k K, timeout time.Duration) (found bool) {
|
||||||
|
if tok, ok := s.tokens[k]; ok {
|
||||||
|
found = true
|
||||||
|
s.supervisor.RemoveAndWait(tok, timeout)
|
||||||
|
}
|
||||||
|
delete(s.services, k)
|
||||||
|
delete(s.tokens, k)
|
||||||
|
return found
|
||||||
|
}
|
||||||
|
|
||||||
|
// Each calls the given function for each service in the map.
|
||||||
|
func (s *serviceMap[K, S]) Each(fn func(K, S)) {
|
||||||
|
for key, svc := range s.services {
|
||||||
|
fn(key, svc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Suture implementation
|
||||||
|
|
||||||
|
func (s *serviceMap[K, S]) Serve(ctx context.Context) error {
|
||||||
|
return s.supervisor.Serve(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *serviceMap[K, S]) String() string {
|
||||||
|
var kv K
|
||||||
|
var sv S
|
||||||
|
return fmt.Sprintf("serviceMap[%T, %T]@%p", kv, sv, s)
|
||||||
|
}
|
156
lib/model/service_map_test.go
Normal file
156
lib/model/service_map_test.go
Normal file
@ -0,0 +1,156 @@
|
|||||||
|
// Copyright (C) 2023 The Syncthing Authors.
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||||
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||||
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||||
|
|
||||||
|
package model
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/syncthing/syncthing/lib/events"
|
||||||
|
"github.com/thejerf/suture/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestServiceMap(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
sup := suture.NewSimple("TestServiceMap")
|
||||||
|
sup.ServeBackground(ctx)
|
||||||
|
|
||||||
|
t.Run("SimpleAddRemove", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
sm := newServiceMap[string, *dummyService](events.NoopLogger)
|
||||||
|
sup.Add(sm)
|
||||||
|
|
||||||
|
// Add two services. They should start.
|
||||||
|
|
||||||
|
d1 := newDummyService()
|
||||||
|
d2 := newDummyService()
|
||||||
|
|
||||||
|
sm.Add("d1", d1)
|
||||||
|
sm.Add("d2", d2)
|
||||||
|
|
||||||
|
<-d1.started
|
||||||
|
<-d2.started
|
||||||
|
|
||||||
|
// Remove them. They should stop.
|
||||||
|
|
||||||
|
if !sm.Remove("d1") {
|
||||||
|
t.Errorf("Remove failed")
|
||||||
|
}
|
||||||
|
if !sm.Remove("d2") {
|
||||||
|
t.Errorf("Remove failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
<-d1.stopped
|
||||||
|
<-d2.stopped
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("OverwriteImpliesRemove", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
sm := newServiceMap[string, *dummyService](events.NoopLogger)
|
||||||
|
sup.Add(sm)
|
||||||
|
|
||||||
|
d1 := newDummyService()
|
||||||
|
d2 := newDummyService()
|
||||||
|
|
||||||
|
// Add d1, it should start.
|
||||||
|
|
||||||
|
sm.Add("k", d1)
|
||||||
|
<-d1.started
|
||||||
|
|
||||||
|
// Add d2, with the same key. The previous one should stop as we're
|
||||||
|
// doing a replace.
|
||||||
|
|
||||||
|
sm.Add("k", d2)
|
||||||
|
<-d1.stopped
|
||||||
|
<-d2.started
|
||||||
|
|
||||||
|
if !sm.Remove("k") {
|
||||||
|
t.Errorf("Remove failed")
|
||||||
|
}
|
||||||
|
|
||||||
|
<-d2.stopped
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("IterateWithRemoveAndWait", func(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
sm := newServiceMap[string, *dummyService](events.NoopLogger)
|
||||||
|
sup.Add(sm)
|
||||||
|
|
||||||
|
// Add four services.
|
||||||
|
|
||||||
|
d1 := newDummyService()
|
||||||
|
d2 := newDummyService()
|
||||||
|
d3 := newDummyService()
|
||||||
|
d4 := newDummyService()
|
||||||
|
|
||||||
|
sm.Add("keep1", d1)
|
||||||
|
sm.Add("remove2", d2)
|
||||||
|
sm.Add("keep3", d3)
|
||||||
|
sm.Add("remove4", d4)
|
||||||
|
|
||||||
|
<-d1.started
|
||||||
|
<-d2.started
|
||||||
|
<-d3.started
|
||||||
|
<-d4.started
|
||||||
|
|
||||||
|
// Remove two of them from within the iterator.
|
||||||
|
|
||||||
|
sm.Each(func(k string, v *dummyService) {
|
||||||
|
if strings.HasPrefix(k, "remove") {
|
||||||
|
sm.RemoveAndWait(k, 0)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
// They should have stopped.
|
||||||
|
|
||||||
|
<-d2.stopped
|
||||||
|
<-d4.stopped
|
||||||
|
|
||||||
|
// They should not be in the map anymore.
|
||||||
|
|
||||||
|
if _, ok := sm.Get("remove2"); ok {
|
||||||
|
t.Errorf("Service still in map")
|
||||||
|
}
|
||||||
|
if _, ok := sm.Get("remove4"); ok {
|
||||||
|
t.Errorf("Service still in map")
|
||||||
|
}
|
||||||
|
|
||||||
|
// The other two should still be running.
|
||||||
|
|
||||||
|
if _, ok := sm.Get("keep1"); !ok {
|
||||||
|
t.Errorf("Service not in map")
|
||||||
|
}
|
||||||
|
if _, ok := sm.Get("keep3"); !ok {
|
||||||
|
t.Errorf("Service not in map")
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
type dummyService struct {
|
||||||
|
started chan struct{}
|
||||||
|
stopped chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newDummyService() *dummyService {
|
||||||
|
return &dummyService{
|
||||||
|
started: make(chan struct{}),
|
||||||
|
stopped: make(chan struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *dummyService) Serve(ctx context.Context) error {
|
||||||
|
close(d.started)
|
||||||
|
defer close(d.stopped)
|
||||||
|
<-ctx.Done()
|
||||||
|
return nil
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user