mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-08 22:31:04 +00:00
cmd/syncthing: Extract interfaces for things the API depends on
Enables testing of the API service, in the long run.
This commit is contained in:
parent
894ccd18ff
commit
a492cfba13
@ -32,10 +32,10 @@ import (
|
||||
"github.com/syncthing/syncthing/lib/discover"
|
||||
"github.com/syncthing/syncthing/lib/events"
|
||||
"github.com/syncthing/syncthing/lib/logger"
|
||||
"github.com/syncthing/syncthing/lib/model"
|
||||
"github.com/syncthing/syncthing/lib/osutil"
|
||||
"github.com/syncthing/syncthing/lib/protocol"
|
||||
"github.com/syncthing/syncthing/lib/relay"
|
||||
"github.com/syncthing/syncthing/lib/stats"
|
||||
"github.com/syncthing/syncthing/lib/sync"
|
||||
"github.com/syncthing/syncthing/lib/tlsutil"
|
||||
"github.com/syncthing/syncthing/lib/upgrade"
|
||||
@ -50,15 +50,15 @@ var (
|
||||
|
||||
type apiService struct {
|
||||
id protocol.DeviceID
|
||||
cfg *config.Wrapper
|
||||
cfg configIntf
|
||||
httpsCertFile string
|
||||
httpsKeyFile string
|
||||
assetDir string
|
||||
themes []string
|
||||
model *model.Model
|
||||
eventSub *events.BufferedSubscription
|
||||
discoverer *discover.CachingMux
|
||||
relayService *relay.Service
|
||||
model modelIntf
|
||||
eventSub events.BufferedSubscription
|
||||
discoverer discover.CachingMux
|
||||
relayService relay.Service
|
||||
fss *folderSummaryService
|
||||
systemConfigMut sync.Mutex // serializes posts to /rest/system/config
|
||||
stop chan struct{} // signals intentional stop
|
||||
@ -68,11 +68,52 @@ type apiService struct {
|
||||
listener net.Listener
|
||||
listenerMut sync.Mutex
|
||||
|
||||
guiErrors *logger.Recorder
|
||||
systemLog *logger.Recorder
|
||||
guiErrors logger.Recorder
|
||||
systemLog logger.Recorder
|
||||
}
|
||||
|
||||
func newAPIService(id protocol.DeviceID, cfg *config.Wrapper, httpsCertFile, httpsKeyFile, assetDir string, m *model.Model, eventSub *events.BufferedSubscription, discoverer *discover.CachingMux, relayService *relay.Service, errors, systemLog *logger.Recorder) (*apiService, error) {
|
||||
type modelIntf interface {
|
||||
GlobalDirectoryTree(folder, prefix string, levels int, dirsonly bool) map[string]interface{}
|
||||
Completion(device protocol.DeviceID, folder string) float64
|
||||
Override(folder string)
|
||||
NeedFolderFiles(folder string, page, perpage int) ([]db.FileInfoTruncated, []db.FileInfoTruncated, []db.FileInfoTruncated, int)
|
||||
NeedSize(folder string) (nfiles int, bytes int64)
|
||||
ConnectionStats() map[string]interface{}
|
||||
DeviceStatistics() map[string]stats.DeviceStatistics
|
||||
FolderStatistics() map[string]stats.FolderStatistics
|
||||
CurrentFolderFile(folder string, file string) (protocol.FileInfo, bool)
|
||||
CurrentGlobalFile(folder string, file string) (protocol.FileInfo, bool)
|
||||
ResetFolder(folder string)
|
||||
Availability(folder, file string) []protocol.DeviceID
|
||||
GetIgnores(folder string) ([]string, []string, error)
|
||||
SetIgnores(folder string, content []string) error
|
||||
PauseDevice(device protocol.DeviceID)
|
||||
ResumeDevice(device protocol.DeviceID)
|
||||
DelayScan(folder string, next time.Duration)
|
||||
ScanFolder(folder string) error
|
||||
ScanFolders() map[string]error
|
||||
ScanFolderSubs(folder string, subs []string) error
|
||||
BringToFront(folder, file string)
|
||||
ConnectedTo(deviceID protocol.DeviceID) bool
|
||||
GlobalSize(folder string) (nfiles, deleted int, bytes int64)
|
||||
LocalSize(folder string) (nfiles, deleted int, bytes int64)
|
||||
CurrentLocalVersion(folder string) (int64, bool)
|
||||
RemoteLocalVersion(folder string) (int64, bool)
|
||||
State(folder string) (string, time.Time, error)
|
||||
}
|
||||
|
||||
type configIntf interface {
|
||||
GUI() config.GUIConfiguration
|
||||
Raw() config.Configuration
|
||||
Options() config.OptionsConfiguration
|
||||
Replace(cfg config.Configuration) config.CommitResponse
|
||||
Subscribe(c config.Committer)
|
||||
Folders() map[string]config.FolderConfiguration
|
||||
Devices() map[protocol.DeviceID]config.DeviceConfiguration
|
||||
Save() error
|
||||
}
|
||||
|
||||
func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, eventSub events.BufferedSubscription, discoverer discover.CachingMux, relayService relay.Service, errors, systemLog logger.Recorder) (*apiService, error) {
|
||||
service := &apiService{
|
||||
id: id,
|
||||
cfg: cfg,
|
||||
@ -554,7 +595,7 @@ func (s *apiService) getDBStatus(w http.ResponseWriter, r *http.Request) {
|
||||
sendJSON(w, folderSummary(s.cfg, s.model, folder))
|
||||
}
|
||||
|
||||
func folderSummary(cfg *config.Wrapper, m *model.Model, folder string) map[string]interface{} {
|
||||
func folderSummary(cfg configIntf, m modelIntf, folder string) map[string]interface{} {
|
||||
var res = make(map[string]interface{})
|
||||
|
||||
res["invalid"] = cfg.Folders()[folder].Invalid
|
||||
|
@ -739,7 +739,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) {
|
||||
|
||||
// Start relay management
|
||||
|
||||
var relayService *relay.Service
|
||||
var relayService relay.Service
|
||||
if opts.RelaysEnabled {
|
||||
relayService = relay.NewService(cfg, tlsCfg)
|
||||
mainService.Add(relayService)
|
||||
@ -972,7 +972,7 @@ func startAuditing(mainService *suture.Supervisor) {
|
||||
l.Infoln("Audit log in", auditFile)
|
||||
}
|
||||
|
||||
func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, apiSub *events.BufferedSubscription, discoverer *discover.CachingMux, relayService *relay.Service, errors, systemLog *logger.Recorder, runtimeOptions RuntimeOptions) {
|
||||
func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, apiSub events.BufferedSubscription, discoverer discover.CachingMux, relayService relay.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) {
|
||||
guiCfg := cfg.GUI()
|
||||
|
||||
if !guiCfg.Enabled {
|
||||
|
@ -9,9 +9,7 @@ package main
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/syncthing/syncthing/lib/config"
|
||||
"github.com/syncthing/syncthing/lib/events"
|
||||
"github.com/syncthing/syncthing/lib/model"
|
||||
"github.com/syncthing/syncthing/lib/sync"
|
||||
"github.com/thejerf/suture"
|
||||
)
|
||||
@ -21,8 +19,8 @@ import (
|
||||
type folderSummaryService struct {
|
||||
*suture.Supervisor
|
||||
|
||||
cfg *config.Wrapper
|
||||
model *model.Model
|
||||
cfg configIntf
|
||||
model modelIntf
|
||||
stop chan struct{}
|
||||
immediate chan string
|
||||
|
||||
@ -35,7 +33,7 @@ type folderSummaryService struct {
|
||||
lastEventReqMut sync.Mutex
|
||||
}
|
||||
|
||||
func newFolderSummaryService(cfg *config.Wrapper, m *model.Model) *folderSummaryService {
|
||||
func newFolderSummaryService(cfg configIntf, m modelIntf) *folderSummaryService {
|
||||
service := &folderSummaryService{
|
||||
Supervisor: suture.NewSimple("folderSummaryService"),
|
||||
cfg: cfg,
|
||||
|
@ -79,7 +79,7 @@ func (m *usageReportingManager) String() string {
|
||||
|
||||
// reportData returns the data to be sent in a usage report. It's used in
|
||||
// various places, so not part of the usageReportingManager object.
|
||||
func reportData(cfg *config.Wrapper, m *model.Model) map[string]interface{} {
|
||||
func reportData(cfg configIntf, m modelIntf) map[string]interface{} {
|
||||
res := make(map[string]interface{})
|
||||
res["urVersion"] = usageReportVersion
|
||||
res["uniqueID"] = cfg.Options().URUniqueID
|
||||
|
@ -52,7 +52,7 @@ type connectionService struct {
|
||||
tlsCfg *tls.Config
|
||||
discoverer discover.Finder
|
||||
conns chan model.IntermediateConnection
|
||||
relayService *relay.Service
|
||||
relayService relay.Service
|
||||
bepProtocolName string
|
||||
tlsDefaultCommonName string
|
||||
lans []*net.IPNet
|
||||
@ -66,7 +66,7 @@ type connectionService struct {
|
||||
relaysEnabled bool
|
||||
}
|
||||
|
||||
func NewConnectionService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, relayService *relay.Service,
|
||||
func NewConnectionService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, relayService relay.Service,
|
||||
bepProtocolName string, tlsDefaultCommonName string, lans []*net.IPNet) suture.Service {
|
||||
service := &connectionService{
|
||||
Supervisor: suture.NewSimple("connectionService"),
|
||||
|
@ -22,7 +22,13 @@ import (
|
||||
// time sets how long we refrain from asking about the same device ID after
|
||||
// receiving a negative answer. The value of zero disables caching (positive
|
||||
// or negative).
|
||||
type CachingMux struct {
|
||||
type CachingMux interface {
|
||||
FinderService
|
||||
Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int)
|
||||
ChildErrors() map[string]error
|
||||
}
|
||||
|
||||
type cachingMux struct {
|
||||
*suture.Supervisor
|
||||
finders []cachedFinder
|
||||
caches []*cache
|
||||
@ -51,15 +57,15 @@ type cachedError interface {
|
||||
CacheFor() time.Duration
|
||||
}
|
||||
|
||||
func NewCachingMux() *CachingMux {
|
||||
return &CachingMux{
|
||||
func NewCachingMux() CachingMux {
|
||||
return &cachingMux{
|
||||
Supervisor: suture.NewSimple("discover.cachingMux"),
|
||||
mut: sync.NewRWMutex(),
|
||||
}
|
||||
}
|
||||
|
||||
// Add registers a new Finder, with associated cache timeouts.
|
||||
func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
|
||||
func (m *cachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
|
||||
m.mut.Lock()
|
||||
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime, priority})
|
||||
m.caches = append(m.caches, newCache())
|
||||
@ -72,7 +78,7 @@ func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, p
|
||||
|
||||
// Lookup attempts to resolve the device ID using any of the added Finders,
|
||||
// while obeying the cache settings.
|
||||
func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
|
||||
func (m *cachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
|
||||
var pdirect []prioritizedAddress
|
||||
|
||||
m.mut.RLock()
|
||||
@ -140,15 +146,15 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
|
||||
return direct, relays, nil
|
||||
}
|
||||
|
||||
func (m *CachingMux) String() string {
|
||||
func (m *cachingMux) String() string {
|
||||
return "discovery cache"
|
||||
}
|
||||
|
||||
func (m *CachingMux) Error() error {
|
||||
func (m *cachingMux) Error() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *CachingMux) ChildErrors() map[string]error {
|
||||
func (m *cachingMux) ChildErrors() map[string]error {
|
||||
children := make(map[string]error, len(m.finders))
|
||||
m.mut.RLock()
|
||||
for _, f := range m.finders {
|
||||
@ -158,7 +164,7 @@ func (m *CachingMux) ChildErrors() map[string]error {
|
||||
return children
|
||||
}
|
||||
|
||||
func (m *CachingMux) Cache() map[protocol.DeviceID]CacheEntry {
|
||||
func (m *cachingMux) Cache() map[protocol.DeviceID]CacheEntry {
|
||||
// Res will be the "total" cache, i.e. the union of our cache and all our
|
||||
// children's caches.
|
||||
res := make(map[protocol.DeviceID]CacheEntry)
|
||||
|
@ -33,7 +33,7 @@ func TestCacheUnique(t *testing.T) {
|
||||
relays := []Relay{{URL: "relay://192.0.2.44:443"}, {URL: "tcp://192.0.2.45:443"}}
|
||||
|
||||
c := NewCachingMux()
|
||||
c.ServeBackground()
|
||||
c.(*cachingMux).ServeBackground()
|
||||
defer c.Stop()
|
||||
|
||||
// Add a fake discovery service and verify we get it's answers through the
|
||||
@ -94,7 +94,7 @@ func (f *fakeDiscovery) Cache() map[protocol.DeviceID]CacheEntry {
|
||||
|
||||
func TestCacheSlowLookup(t *testing.T) {
|
||||
c := NewCachingMux()
|
||||
c.ServeBackground()
|
||||
c.(*cachingMux).ServeBackground()
|
||||
defer c.Stop()
|
||||
|
||||
// Add a slow discovery service.
|
||||
|
@ -227,7 +227,7 @@ func (s *Subscription) C() <-chan Event {
|
||||
return s.events
|
||||
}
|
||||
|
||||
type BufferedSubscription struct {
|
||||
type bufferedSubscription struct {
|
||||
sub *Subscription
|
||||
buf []Event
|
||||
next int
|
||||
@ -236,8 +236,12 @@ type BufferedSubscription struct {
|
||||
cond *stdsync.Cond
|
||||
}
|
||||
|
||||
func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
|
||||
bs := &BufferedSubscription{
|
||||
type BufferedSubscription interface {
|
||||
Since(id int, into []Event) []Event
|
||||
}
|
||||
|
||||
func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
|
||||
bs := &bufferedSubscription{
|
||||
sub: s,
|
||||
buf: make([]Event, size),
|
||||
mut: sync.NewMutex(),
|
||||
@ -247,7 +251,7 @@ func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription {
|
||||
return bs
|
||||
}
|
||||
|
||||
func (s *BufferedSubscription) pollingLoop() {
|
||||
func (s *bufferedSubscription) pollingLoop() {
|
||||
for {
|
||||
ev, err := s.sub.Poll(60 * time.Second)
|
||||
if err == ErrTimeout {
|
||||
@ -269,7 +273,7 @@ func (s *BufferedSubscription) pollingLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BufferedSubscription) Since(id int, into []Event) []Event {
|
||||
func (s *bufferedSubscription) Since(id int, into []Event) []Event {
|
||||
s.mut.Lock()
|
||||
defer s.mut.Unlock()
|
||||
|
||||
|
@ -290,7 +290,12 @@ func (l *facilityLogger) Debugf(format string, vals ...interface{}) {
|
||||
}
|
||||
|
||||
// A Recorder keeps a size limited record of log events.
|
||||
type Recorder struct {
|
||||
type Recorder interface {
|
||||
Since(t time.Time) []Line
|
||||
Clear()
|
||||
}
|
||||
|
||||
type recorder struct {
|
||||
lines []Line
|
||||
initial int
|
||||
mut sync.Mutex
|
||||
@ -302,8 +307,8 @@ type Line struct {
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
func NewRecorder(l Logger, level LogLevel, size, initial int) *Recorder {
|
||||
r := &Recorder{
|
||||
func NewRecorder(l Logger, level LogLevel, size, initial int) Recorder {
|
||||
r := &recorder{
|
||||
lines: make([]Line, 0, size),
|
||||
initial: initial,
|
||||
}
|
||||
@ -311,7 +316,7 @@ func NewRecorder(l Logger, level LogLevel, size, initial int) *Recorder {
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Recorder) Since(t time.Time) []Line {
|
||||
func (r *recorder) Since(t time.Time) []Line {
|
||||
r.mut.Lock()
|
||||
defer r.mut.Unlock()
|
||||
|
||||
@ -330,13 +335,13 @@ func (r *Recorder) Since(t time.Time) []Line {
|
||||
return cp
|
||||
}
|
||||
|
||||
func (r *Recorder) Clear() {
|
||||
func (r *recorder) Clear() {
|
||||
r.mut.Lock()
|
||||
r.lines = r.lines[:0]
|
||||
r.mut.Unlock()
|
||||
}
|
||||
|
||||
func (r *Recorder) append(l LogLevel, msg string) {
|
||||
func (r *recorder) append(l LogLevel, msg string) {
|
||||
line := Line{
|
||||
When: time.Now(),
|
||||
Message: msg,
|
||||
|
@ -26,7 +26,14 @@ const (
|
||||
eventBroadcasterCheckInterval = 10 * time.Second
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
type Service interface {
|
||||
suture.Service
|
||||
Accept() *tls.Conn
|
||||
Relays() []string
|
||||
RelayStatus(uri string) (time.Duration, bool)
|
||||
}
|
||||
|
||||
type service struct {
|
||||
*suture.Supervisor
|
||||
cfg *config.Wrapper
|
||||
tlsCfg *tls.Config
|
||||
@ -38,10 +45,10 @@ type Service struct {
|
||||
conns chan *tls.Conn
|
||||
}
|
||||
|
||||
func NewService(cfg *config.Wrapper, tlsCfg *tls.Config) *Service {
|
||||
func NewService(cfg *config.Wrapper, tlsCfg *tls.Config) Service {
|
||||
conns := make(chan *tls.Conn)
|
||||
|
||||
service := &Service{
|
||||
service := &service{
|
||||
Supervisor: suture.New("Service", suture.Spec{
|
||||
Log: func(log string) {
|
||||
l.Debugln(log)
|
||||
@ -82,7 +89,7 @@ func NewService(cfg *config.Wrapper, tlsCfg *tls.Config) *Service {
|
||||
return service
|
||||
}
|
||||
|
||||
func (s *Service) VerifyConfiguration(from, to config.Configuration) error {
|
||||
func (s *service) VerifyConfiguration(from, to config.Configuration) error {
|
||||
for _, addr := range to.Options.RelayServers {
|
||||
_, err := url.Parse(addr)
|
||||
if err != nil {
|
||||
@ -92,7 +99,7 @@ func (s *Service) VerifyConfiguration(from, to config.Configuration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
|
||||
func (s *service) CommitConfiguration(from, to config.Configuration) bool {
|
||||
existing := make(map[string]*url.URL, len(to.Options.RelayServers))
|
||||
|
||||
for _, addr := range to.Options.RelayServers {
|
||||
@ -142,7 +149,7 @@ type Status struct {
|
||||
}
|
||||
|
||||
// Relays return the list of relays that currently have an OK status.
|
||||
func (s *Service) Relays() []string {
|
||||
func (s *service) Relays() []string {
|
||||
if s == nil {
|
||||
// A nil client does not have a status, really. Yet we may be called
|
||||
// this way, for raisins...
|
||||
@ -162,7 +169,7 @@ func (s *Service) Relays() []string {
|
||||
}
|
||||
|
||||
// RelayStatus returns the latency and OK status for a given relay.
|
||||
func (s *Service) RelayStatus(uri string) (time.Duration, bool) {
|
||||
func (s *service) RelayStatus(uri string) (time.Duration, bool) {
|
||||
if s == nil {
|
||||
// A nil client does not have a status, really. Yet we may be called
|
||||
// this way, for raisins...
|
||||
@ -182,7 +189,7 @@ func (s *Service) RelayStatus(uri string) (time.Duration, bool) {
|
||||
}
|
||||
|
||||
// Accept returns a new *tls.Conn. The connection is already handshaken.
|
||||
func (s *Service) Accept() *tls.Conn {
|
||||
func (s *service) Accept() *tls.Conn {
|
||||
return <-s.conns
|
||||
}
|
||||
|
||||
@ -234,7 +241,7 @@ func (r *invitationReceiver) Stop() {
|
||||
// no way to get the event feed directly from the relay lib. This may be
|
||||
// something to revisit later, possibly.
|
||||
type eventBroadcaster struct {
|
||||
Service *Service
|
||||
Service Service
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user