lib: Add util.Service as suture.Service template (fixes #5801) (#5806)

This commit is contained in:
Simon Frei 2019-07-09 11:40:30 +02:00 committed by GitHub
parent d0ab65a178
commit ba056578ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 340 additions and 420 deletions

View File

@ -28,6 +28,10 @@ import (
"time" "time"
metrics "github.com/rcrowley/go-metrics" metrics "github.com/rcrowley/go-metrics"
"github.com/thejerf/suture"
"github.com/vitrun/qart/qr"
"golang.org/x/crypto/bcrypt"
"github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections" "github.com/syncthing/syncthing/lib/connections"
@ -44,9 +48,7 @@ import (
"github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/ur" "github.com/syncthing/syncthing/lib/ur"
"github.com/thejerf/suture" "github.com/syncthing/syncthing/lib/util"
"github.com/vitrun/qart/qr"
"golang.org/x/crypto/bcrypt"
) )
// matches a bcrypt hash and not too much else // matches a bcrypt hash and not too much else
@ -60,6 +62,8 @@ const (
) )
type service struct { type service struct {
suture.Service
id protocol.DeviceID id protocol.DeviceID
cfg config.Wrapper cfg config.Wrapper
statics *staticsServer statics *staticsServer
@ -102,7 +106,7 @@ type Service interface {
} }
func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, cpu Rater, contr Controller, noUpgrade bool) Service { func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonName string, m model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connections.Service, urService *ur.Service, fss model.FolderSummaryService, errors, systemLog logger.Recorder, cpu Rater, contr Controller, noUpgrade bool) Service {
return &service{ s := &service{
id: id, id: id,
cfg: cfg, cfg: cfg,
statics: newStaticsServer(cfg.GUI().Theme, assetDir), statics: newStaticsServer(cfg.GUI().Theme, assetDir),
@ -123,10 +127,11 @@ func New(id protocol.DeviceID, cfg config.Wrapper, assetDir, tlsDefaultCommonNam
contr: contr, contr: contr,
noUpgrade: noUpgrade, noUpgrade: noUpgrade,
tlsDefaultCommonName: tlsDefaultCommonName, tlsDefaultCommonName: tlsDefaultCommonName,
stop: make(chan struct{}),
configChanged: make(chan struct{}), configChanged: make(chan struct{}),
startedOnce: make(chan struct{}), startedOnce: make(chan struct{}),
} }
s.Service = util.AsService(s.serve)
return s
} }
func (s *service) WaitForStart() error { func (s *service) WaitForStart() error {
@ -190,7 +195,7 @@ func sendJSON(w http.ResponseWriter, jsonObject interface{}) {
fmt.Fprintf(w, "%s\n", bs) fmt.Fprintf(w, "%s\n", bs)
} }
func (s *service) Serve() { func (s *service) serve(stop chan struct{}) {
listener, err := s.getListener(s.cfg.GUI()) listener, err := s.getListener(s.cfg.GUI())
if err != nil { if err != nil {
select { select {
@ -360,7 +365,7 @@ func (s *service) Serve() {
// Wait for stop, restart or error signals // Wait for stop, restart or error signals
select { select {
case <-s.stop: case <-stop:
// Shutting down permanently // Shutting down permanently
l.Debugln("shutting down (stop)") l.Debugln("shutting down (stop)")
case <-s.configChanged: case <-s.configChanged:
@ -378,17 +383,11 @@ func (s *service) Complete() bool {
select { select {
case <-s.startedOnce: case <-s.startedOnce:
return s.startupErr != nil return s.startupErr != nil
case <-s.stop:
return true
default: default:
} }
return false return false
} }
func (s *service) Stop() {
close(s.stop)
}
func (s *service) String() string { func (s *service) String() string {
return fmt.Sprintf("api.service@%p", s) return fmt.Sprintf("api.service@%p", s)
} }

View File

@ -14,6 +14,8 @@ import (
"github.com/thejerf/suture" "github.com/thejerf/suture"
"golang.org/x/net/ipv6" "golang.org/x/net/ipv6"
"github.com/syncthing/syncthing/lib/util"
) )
type Multicast struct { type Multicast struct {
@ -45,15 +47,15 @@ func NewMulticast(addr string) *Multicast {
m.mr = &multicastReader{ m.mr = &multicastReader{
addr: addr, addr: addr,
outbox: m.outbox, outbox: m.outbox,
stop: make(chan struct{}),
} }
m.mr.Service = util.AsService(m.mr.serve)
m.Add(m.mr) m.Add(m.mr)
m.mw = &multicastWriter{ m.mw = &multicastWriter{
addr: addr, addr: addr,
inbox: m.inbox, inbox: m.inbox,
stop: make(chan struct{}),
} }
m.mw.Service = util.AsService(m.mw.serve)
m.Add(m.mw) m.Add(m.mw)
return m return m
@ -76,13 +78,13 @@ func (m *Multicast) Error() error {
} }
type multicastWriter struct { type multicastWriter struct {
suture.Service
addr string addr string
inbox <-chan []byte inbox <-chan []byte
errorHolder errorHolder
stop chan struct{}
} }
func (w *multicastWriter) Serve() { func (w *multicastWriter) serve(stop chan struct{}) {
l.Debugln(w, "starting") l.Debugln(w, "starting")
defer l.Debugln(w, "stopping") defer l.Debugln(w, "stopping")
@ -106,7 +108,14 @@ func (w *multicastWriter) Serve() {
HopLimit: 1, HopLimit: 1,
} }
for bs := range w.inbox { for {
var bs []byte
select {
case bs = <-w.inbox:
case <-stop:
return
}
intfs, err := net.Interfaces() intfs, err := net.Interfaces()
if err != nil { if err != nil {
l.Debugln(err) l.Debugln(err)
@ -130,6 +139,12 @@ func (w *multicastWriter) Serve() {
l.Debugf("sent %d bytes to %v on %s", len(bs), gaddr, intf.Name) l.Debugf("sent %d bytes to %v on %s", len(bs), gaddr, intf.Name)
success++ success++
select {
case <-stop:
return
default:
}
} }
if success > 0 { if success > 0 {
@ -141,22 +156,18 @@ func (w *multicastWriter) Serve() {
} }
} }
func (w *multicastWriter) Stop() {
close(w.stop)
}
func (w *multicastWriter) String() string { func (w *multicastWriter) String() string {
return fmt.Sprintf("multicastWriter@%p", w) return fmt.Sprintf("multicastWriter@%p", w)
} }
type multicastReader struct { type multicastReader struct {
suture.Service
addr string addr string
outbox chan<- recv outbox chan<- recv
errorHolder errorHolder
stop chan struct{}
} }
func (r *multicastReader) Serve() { func (r *multicastReader) serve(stop chan struct{}) {
l.Debugln(r, "starting") l.Debugln(r, "starting")
defer l.Debugln(r, "stopping") defer l.Debugln(r, "stopping")
@ -213,16 +224,14 @@ func (r *multicastReader) Serve() {
copy(c, bs) copy(c, bs)
select { select {
case r.outbox <- recv{c, addr}: case r.outbox <- recv{c, addr}:
case <-stop:
return
default: default:
l.Debugln("dropping message") l.Debugln("dropping message")
} }
} }
} }
func (r *multicastReader) Stop() {
close(r.stop)
}
func (r *multicastReader) String() string { func (r *multicastReader) String() string {
return fmt.Sprintf("multicastReader@%p", r) return fmt.Sprintf("multicastReader@%p", r)
} }

View File

@ -23,6 +23,7 @@ import (
"github.com/syncthing/syncthing/lib/connections/registry" "github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/stun" "github.com/syncthing/syncthing/lib/stun"
"github.com/syncthing/syncthing/lib/util"
) )
func init() { func init() {
@ -33,6 +34,7 @@ func init() {
} }
type quicListener struct { type quicListener struct {
util.ServiceWithError
nat atomic.Value nat atomic.Value
onAddressesChangedNotifier onAddressesChangedNotifier
@ -40,12 +42,10 @@ type quicListener struct {
uri *url.URL uri *url.URL
cfg config.Wrapper cfg config.Wrapper
tlsCfg *tls.Config tlsCfg *tls.Config
stop chan struct{}
conns chan internalConn conns chan internalConn
factory listenerFactory factory listenerFactory
address *url.URL address *url.URL
err error
mut sync.Mutex mut sync.Mutex
} }
@ -77,20 +77,13 @@ func (t *quicListener) OnExternalAddressChanged(address *stun.Host, via string)
} }
} }
func (t *quicListener) Serve() { func (t *quicListener) serve(stop chan struct{}) error {
t.mut.Lock()
t.err = nil
t.mut.Unlock()
network := strings.Replace(t.uri.Scheme, "quic", "udp", -1) network := strings.Replace(t.uri.Scheme, "quic", "udp", -1)
packetConn, err := net.ListenPacket(network, t.uri.Host) packetConn, err := net.ListenPacket(network, t.uri.Host)
if err != nil { if err != nil {
t.mut.Lock()
t.err = err
t.mut.Unlock()
l.Infoln("Listen (BEP/quic):", err) l.Infoln("Listen (BEP/quic):", err)
return return err
} }
defer func() { _ = packetConn.Close() }() defer func() { _ = packetConn.Close() }()
@ -105,11 +98,8 @@ func (t *quicListener) Serve() {
listener, err := quic.Listen(conn, t.tlsCfg, quicConfig) listener, err := quic.Listen(conn, t.tlsCfg, quicConfig)
if err != nil { if err != nil {
t.mut.Lock()
t.err = err
t.mut.Unlock()
l.Infoln("Listen (BEP/quic):", err) l.Infoln("Listen (BEP/quic):", err)
return return err
} }
l.Infof("QUIC listener (%v) starting", packetConn.LocalAddr()) l.Infof("QUIC listener (%v) starting", packetConn.LocalAddr())
@ -118,7 +108,7 @@ func (t *quicListener) Serve() {
// Accept is forever, so handle stops externally. // Accept is forever, so handle stops externally.
go func() { go func() {
select { select {
case <-t.stop: case <-stop:
_ = listener.Close() _ = listener.Close()
} }
}() }()
@ -128,11 +118,11 @@ func (t *quicListener) Serve() {
session, err := listener.Accept() session, err := listener.Accept()
select { select {
case <-t.stop: case <-stop:
if err == nil { if err == nil {
_ = session.Close() _ = session.Close()
} }
return return nil
default: default:
} }
if err != nil { if err != nil {
@ -150,7 +140,7 @@ func (t *quicListener) Serve() {
select { select {
case <-ok: case <-ok:
return return
case <-t.stop: case <-stop:
_ = session.Close() _ = session.Close()
case <-time.After(10 * time.Second): case <-time.After(10 * time.Second):
l.Debugln("timed out waiting for AcceptStream on", session.RemoteAddr()) l.Debugln("timed out waiting for AcceptStream on", session.RemoteAddr())
@ -170,10 +160,6 @@ func (t *quicListener) Serve() {
} }
} }
func (t *quicListener) Stop() {
close(t.stop)
}
func (t *quicListener) URI() *url.URL { func (t *quicListener) URI() *url.URL {
return t.uri return t.uri
} }
@ -192,13 +178,6 @@ func (t *quicListener) LANAddresses() []*url.URL {
return []*url.URL{t.uri} return []*url.URL{t.uri}
} }
func (t *quicListener) Error() error {
t.mut.Lock()
err := t.err
t.mut.Unlock()
return err
}
func (t *quicListener) String() string { func (t *quicListener) String() string {
return t.uri.String() return t.uri.String()
} }
@ -227,9 +206,9 @@ func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.
cfg: cfg, cfg: cfg,
tlsCfg: tlsCfg, tlsCfg: tlsCfg,
conns: conns, conns: conns,
stop: make(chan struct{}),
factory: f, factory: f,
} }
l.ServiceWithError = util.AsServiceWithError(l.serve)
l.nat.Store(stun.NATUnknown) l.nat.Store(stun.NATUnknown)
return l return l
} }

View File

@ -184,16 +184,22 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t
// the common handling regardless of whether the connection was // the common handling regardless of whether the connection was
// incoming or outgoing. // incoming or outgoing.
service.Add(serviceFunc(service.connect)) service.Add(util.AsService(service.connect))
service.Add(serviceFunc(service.handle)) service.Add(util.AsService(service.handle))
service.Add(service.listenerSupervisor) service.Add(service.listenerSupervisor)
return service return service
} }
func (s *service) handle() { func (s *service) handle(stop chan struct{}) {
next: var c internalConn
for c := range s.conns { for {
select {
case <-stop:
return
case c = <-s.conns:
}
cs := c.ConnectionState() cs := c.ConnectionState()
// We should have negotiated the next level protocol "bep/1.0" as part // We should have negotiated the next level protocol "bep/1.0" as part
@ -298,7 +304,7 @@ next:
// config. Warn instead of Info. // config. Warn instead of Info.
l.Warnf("Bad certificate from %s at %s: %v", remoteID, c, err) l.Warnf("Bad certificate from %s at %s: %v", remoteID, c, err)
c.Close() c.Close()
continue next continue
} }
// Wrap the connection in rate limiters. The limiter itself will // Wrap the connection in rate limiters. The limiter itself will
@ -313,11 +319,11 @@ next:
l.Infof("Established secure connection to %s at %s", remoteID, c) l.Infof("Established secure connection to %s at %s", remoteID, c)
s.model.AddConnection(modelConn, hello) s.model.AddConnection(modelConn, hello)
continue next continue
} }
} }
func (s *service) connect() { func (s *service) connect(stop chan struct{}) {
nextDial := make(map[string]time.Time) nextDial := make(map[string]time.Time)
// Used as delay for the first few connection attempts, increases // Used as delay for the first few connection attempts, increases
@ -465,11 +471,16 @@ func (s *service) connect() {
if initialRampup < sleep { if initialRampup < sleep {
l.Debugln("initial rampup; sleep", initialRampup, "and update to", initialRampup*2) l.Debugln("initial rampup; sleep", initialRampup, "and update to", initialRampup*2)
time.Sleep(initialRampup) sleep = initialRampup
initialRampup *= 2 initialRampup *= 2
} else { } else {
l.Debugln("sleep until next dial", sleep) l.Debugln("sleep until next dial", sleep)
time.Sleep(sleep) }
select {
case <-time.After(sleep):
case <-stop:
return
} }
} }
} }

View File

@ -191,13 +191,6 @@ type Model interface {
GetHello(protocol.DeviceID) protocol.HelloIntf GetHello(protocol.DeviceID) protocol.HelloIntf
} }
// serviceFunc wraps a function to create a suture.Service without stop
// functionality.
type serviceFunc func()
func (f serviceFunc) Serve() { f() }
func (f serviceFunc) Stop() {}
type onAddressesChangedNotifier struct { type onAddressesChangedNotifier struct {
callbacks []func(genericListener) callbacks []func(genericListener)
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/util"
) )
func init() { func init() {
@ -26,43 +27,32 @@ func init() {
} }
type tcpListener struct { type tcpListener struct {
util.ServiceWithError
onAddressesChangedNotifier onAddressesChangedNotifier
uri *url.URL uri *url.URL
cfg config.Wrapper cfg config.Wrapper
tlsCfg *tls.Config tlsCfg *tls.Config
stop chan struct{}
conns chan internalConn conns chan internalConn
factory listenerFactory factory listenerFactory
natService *nat.Service natService *nat.Service
mapping *nat.Mapping mapping *nat.Mapping
err error
mut sync.RWMutex mut sync.RWMutex
} }
func (t *tcpListener) Serve() { func (t *tcpListener) serve(stop chan struct{}) error {
t.mut.Lock()
t.err = nil
t.mut.Unlock()
tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host) tcaddr, err := net.ResolveTCPAddr(t.uri.Scheme, t.uri.Host)
if err != nil { if err != nil {
t.mut.Lock()
t.err = err
t.mut.Unlock()
l.Infoln("Listen (BEP/tcp):", err) l.Infoln("Listen (BEP/tcp):", err)
return return err
} }
listener, err := net.ListenTCP(t.uri.Scheme, tcaddr) listener, err := net.ListenTCP(t.uri.Scheme, tcaddr)
if err != nil { if err != nil {
t.mut.Lock()
t.err = err
t.mut.Unlock()
l.Infoln("Listen (BEP/tcp):", err) l.Infoln("Listen (BEP/tcp):", err)
return return err
} }
defer listener.Close() defer listener.Close()
@ -86,14 +76,14 @@ func (t *tcpListener) Serve() {
listener.SetDeadline(time.Now().Add(time.Second)) listener.SetDeadline(time.Now().Add(time.Second))
conn, err := listener.Accept() conn, err := listener.Accept()
select { select {
case <-t.stop: case <-stop:
if err == nil { if err == nil {
conn.Close() conn.Close()
} }
t.mut.Lock() t.mut.Lock()
t.mapping = nil t.mapping = nil
t.mut.Unlock() t.mut.Unlock()
return return nil
default: default:
} }
if err != nil { if err != nil {
@ -104,7 +94,7 @@ func (t *tcpListener) Serve() {
if acceptFailures > maxAcceptFailures { if acceptFailures > maxAcceptFailures {
// Return to restart the listener, because something // Return to restart the listener, because something
// seems permanently damaged. // seems permanently damaged.
return return err
} }
// Slightly increased delay for each failure. // Slightly increased delay for each failure.
@ -137,10 +127,6 @@ func (t *tcpListener) Serve() {
} }
} }
func (t *tcpListener) Stop() {
close(t.stop)
}
func (t *tcpListener) URI() *url.URL { func (t *tcpListener) URI() *url.URL {
return t.uri return t.uri
} }
@ -174,13 +160,6 @@ func (t *tcpListener) LANAddresses() []*url.URL {
return []*url.URL{t.uri} return []*url.URL{t.uri}
} }
func (t *tcpListener) Error() error {
t.mut.RLock()
err := t.err
t.mut.RUnlock()
return err
}
func (t *tcpListener) String() string { func (t *tcpListener) String() string {
return t.uri.String() return t.uri.String()
} }
@ -196,15 +175,16 @@ func (t *tcpListener) NATType() string {
type tcpListenerFactory struct{} type tcpListenerFactory struct{}
func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener { func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener {
return &tcpListener{ l := &tcpListener{
uri: fixupPort(uri, config.DefaultTCPPort), uri: fixupPort(uri, config.DefaultTCPPort),
cfg: cfg, cfg: cfg,
tlsCfg: tlsCfg, tlsCfg: tlsCfg,
conns: conns, conns: conns,
natService: natService, natService: natService,
stop: make(chan struct{}),
factory: f, factory: f,
} }
l.ServiceWithError = util.AsServiceWithError(l.serve)
return l
} }
func (tcpListenerFactory) Valid(_ config.Configuration) error { func (tcpListenerFactory) Valid(_ config.Configuration) error {

View File

@ -19,19 +19,22 @@ import (
stdsync "sync" stdsync "sync"
"time" "time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/util"
) )
type globalClient struct { type globalClient struct {
suture.Service
server string server string
addrList AddressLister addrList AddressLister
announceClient httpClient announceClient httpClient
queryClient httpClient queryClient httpClient
noAnnounce bool noAnnounce bool
noLookup bool noLookup bool
stop chan struct{}
errorHolder errorHolder
} }
@ -122,8 +125,8 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister) (Fin
queryClient: queryClient, queryClient: queryClient,
noAnnounce: opts.noAnnounce, noAnnounce: opts.noAnnounce,
noLookup: opts.noLookup, noLookup: opts.noLookup,
stop: make(chan struct{}),
} }
cl.Service = util.AsService(cl.serve)
if !opts.noAnnounce { if !opts.noAnnounce {
// If we are supposed to annonce, it's an error until we've done so. // If we are supposed to annonce, it's an error until we've done so.
cl.setError(errors.New("not announced")) cl.setError(errors.New("not announced"))
@ -183,11 +186,11 @@ func (c *globalClient) String() string {
return "global@" + c.server return "global@" + c.server
} }
func (c *globalClient) Serve() { func (c *globalClient) serve(stop chan struct{}) {
if c.noAnnounce { if c.noAnnounce {
// We're configured to not do announcements, only lookups. To maintain // We're configured to not do announcements, only lookups. To maintain
// the same interface, we just pause here if Serve() is run. // the same interface, we just pause here if Serve() is run.
<-c.stop <-stop
return return
} }
@ -207,7 +210,7 @@ func (c *globalClient) Serve() {
case <-timer.C: case <-timer.C:
c.sendAnnouncement(timer) c.sendAnnouncement(timer)
case <-c.stop: case <-stop:
return return
} }
} }
@ -276,10 +279,6 @@ func (c *globalClient) sendAnnouncement(timer *time.Timer) {
timer.Reset(defaultReannounceInterval) timer.Reset(defaultReannounceInterval)
} }
func (c *globalClient) Stop() {
close(c.stop)
}
func (c *globalClient) Cache() map[protocol.DeviceID]CacheEntry { func (c *globalClient) Cache() map[protocol.DeviceID]CacheEntry {
// The globalClient doesn't do caching // The globalClient doesn't do caching
return nil return nil

View File

@ -28,6 +28,8 @@ import (
"github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/watchaggregator" "github.com/syncthing/syncthing/lib/watchaggregator"
"github.com/thejerf/suture"
) )
// scanLimiter limits the number of concurrent scans. A limit of zero means no limit. // scanLimiter limits the number of concurrent scans. A limit of zero means no limit.
@ -36,6 +38,7 @@ var scanLimiter = newByteSemaphore(0)
var errWatchNotStarted = errors.New("not started") var errWatchNotStarted = errors.New("not started")
type folder struct { type folder struct {
suture.Service
stateTracker stateTracker
config.FolderConfiguration config.FolderConfiguration
*stats.FolderStatisticsReference *stats.FolderStatisticsReference
@ -54,7 +57,6 @@ type folder struct {
scanNow chan rescanRequest scanNow chan rescanRequest
scanDelay chan time.Duration scanDelay chan time.Duration
initialScanFinished chan struct{} initialScanFinished chan struct{}
stopped chan struct{}
scanErrors []FileError scanErrors []FileError
scanErrorsMut sync.Mutex scanErrorsMut sync.Mutex
@ -98,7 +100,6 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
scanNow: make(chan rescanRequest), scanNow: make(chan rescanRequest),
scanDelay: make(chan time.Duration), scanDelay: make(chan time.Duration),
initialScanFinished: make(chan struct{}), initialScanFinished: make(chan struct{}),
stopped: make(chan struct{}),
scanErrorsMut: sync.NewMutex(), scanErrorsMut: sync.NewMutex(),
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes. pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
@ -109,7 +110,7 @@ func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg conf
} }
} }
func (f *folder) Serve() { func (f *folder) serve(_ chan struct{}) {
atomic.AddInt32(&f.model.foldersRunning, 1) atomic.AddInt32(&f.model.foldersRunning, 1)
defer atomic.AddInt32(&f.model.foldersRunning, -1) defer atomic.AddInt32(&f.model.foldersRunning, -1)
@ -119,7 +120,6 @@ func (f *folder) Serve() {
defer func() { defer func() {
f.scanTimer.Stop() f.scanTimer.Stop()
f.setState(FolderIdle) f.setState(FolderIdle)
close(f.stopped)
}() }()
pause := f.basePause() pause := f.basePause()
@ -256,7 +256,7 @@ func (f *folder) Delay(next time.Duration) {
func (f *folder) Stop() { func (f *folder) Stop() {
f.cancel() f.cancel()
<-f.stopped f.Service.Stop()
} }
// CheckHealth checks the folder for common errors, updates the folder state // CheckHealth checks the folder for common errors, updates the folder state

View File

@ -12,6 +12,7 @@ import (
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore" "github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/versioner"
) )
@ -28,6 +29,7 @@ func newSendOnlyFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher,
folder: newFolder(model, fset, ignores, cfg), folder: newFolder(model, fset, ignores, cfg),
} }
f.folder.puller = f f.folder.puller = f
f.folder.Service = util.AsService(f.serve)
return f return f
} }

View File

@ -28,6 +28,7 @@ import (
"github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/sha256" "github.com/syncthing/syncthing/lib/sha256"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/weakhash" "github.com/syncthing/syncthing/lib/weakhash"
) )
@ -116,6 +117,7 @@ func newSendReceiveFolder(model *model, fset *db.FileSet, ignores *ignore.Matche
pullErrorsMut: sync.NewMutex(), pullErrorsMut: sync.NewMutex(),
} }
f.folder.puller = f f.folder.puller = f
f.folder.Service = util.AsService(f.serve)
if f.Copiers == 0 { if f.Copiers == 0 {
f.Copiers = defaultCopiers f.Copiers = defaultCopiers

View File

@ -11,11 +11,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture" "github.com/syncthing/syncthing/lib/util"
) )
const minSummaryInterval = time.Minute const minSummaryInterval = time.Minute
@ -34,7 +36,6 @@ type folderSummaryService struct {
cfg config.Wrapper cfg config.Wrapper
model Model model Model
id protocol.DeviceID id protocol.DeviceID
stop chan struct{}
immediate chan string immediate chan string
// For keeping track of folders to recalculate for // For keeping track of folders to recalculate for
@ -54,24 +55,18 @@ func NewFolderSummaryService(cfg config.Wrapper, m Model, id protocol.DeviceID)
cfg: cfg, cfg: cfg,
model: m, model: m,
id: id, id: id,
stop: make(chan struct{}),
immediate: make(chan string), immediate: make(chan string),
folders: make(map[string]struct{}), folders: make(map[string]struct{}),
foldersMut: sync.NewMutex(), foldersMut: sync.NewMutex(),
lastEventReqMut: sync.NewMutex(), lastEventReqMut: sync.NewMutex(),
} }
service.Add(serviceFunc(service.listenForUpdates)) service.Add(util.AsService(service.listenForUpdates))
service.Add(serviceFunc(service.calculateSummaries)) service.Add(util.AsService(service.calculateSummaries))
return service return service
} }
func (c *folderSummaryService) Stop() {
c.Supervisor.Stop()
close(c.stop)
}
func (c *folderSummaryService) String() string { func (c *folderSummaryService) String() string {
return fmt.Sprintf("FolderSummaryService@%p", c) return fmt.Sprintf("FolderSummaryService@%p", c)
} }
@ -148,7 +143,7 @@ func (c *folderSummaryService) OnEventRequest() {
// listenForUpdates subscribes to the event bus and makes note of folders that // listenForUpdates subscribes to the event bus and makes note of folders that
// need their data recalculated. // need their data recalculated.
func (c *folderSummaryService) listenForUpdates() { func (c *folderSummaryService) listenForUpdates(stop chan struct{}) {
sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress) sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated | events.StateChanged | events.RemoteDownloadProgress | events.DeviceConnected | events.FolderWatchStateChanged | events.DownloadProgress)
defer events.Default.Unsubscribe(sub) defer events.Default.Unsubscribe(sub)
@ -158,7 +153,7 @@ func (c *folderSummaryService) listenForUpdates() {
select { select {
case ev := <-sub.C(): case ev := <-sub.C():
c.processUpdate(ev) c.processUpdate(ev)
case <-c.stop: case <-stop:
return return
} }
} }
@ -237,7 +232,7 @@ func (c *folderSummaryService) processUpdate(ev events.Event) {
// calculateSummaries periodically recalculates folder summaries and // calculateSummaries periodically recalculates folder summaries and
// completion percentage, and sends the results on the event bus. // completion percentage, and sends the results on the event bus.
func (c *folderSummaryService) calculateSummaries() { func (c *folderSummaryService) calculateSummaries(stop chan struct{}) {
const pumpInterval = 2 * time.Second const pumpInterval = 2 * time.Second
pump := time.NewTimer(pumpInterval) pump := time.NewTimer(pumpInterval)
@ -258,7 +253,7 @@ func (c *folderSummaryService) calculateSummaries() {
case folder := <-c.immediate: case folder := <-c.immediate:
c.sendSummary(folder) c.sendSummary(folder)
case <-c.stop: case <-stop:
return return
} }
} }
@ -319,10 +314,3 @@ func (c *folderSummaryService) sendSummary(folder string) {
events.Default.Log(events.FolderCompletion, comp) events.Default.Log(events.FolderCompletion, comp)
} }
} }
// serviceFunc wraps a function to create a suture.Service without stop
// functionality.
type serviceFunc func()
func (f serviceFunc) Serve() { f() }
func (f serviceFunc) Stop() {}

View File

@ -32,6 +32,7 @@ import (
"github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner" "github.com/syncthing/syncthing/lib/versioner"
"github.com/thejerf/suture" "github.com/thejerf/suture"
) )
@ -1169,19 +1170,19 @@ func (m *model) ClusterConfig(deviceID protocol.DeviceID, cm protocol.ClusterCon
} }
} }
// The token isn't tracked as the service stops when the connection is := &indexSender{
// terminates and is automatically removed from supervisor (by
// implementing suture.IsCompletable).
m.Add(&indexSender{
conn: conn, conn: conn,
connClosed: closed, connClosed: closed,
folder: folder.ID, folder: folder.ID,
fset: fs, fset: fs,
prevSequence: startSequence, prevSequence: startSequence,
dropSymlinks: dropSymlinks, dropSymlinks: dropSymlinks,
stop: make(chan struct{}), }
stopped: make(chan struct{}), is.Service = util.AsService(is.serve)
}) // The token isn't tracked as the service stops when the connection
// terminates and is automatically removed from supervisor (by
// implementing suture.IsCompletable).
m.Add(is)
} }
m.pmut.Lock() m.pmut.Lock()
@ -1896,6 +1897,7 @@ func (m *model) deviceWasSeen(deviceID protocol.DeviceID) {
} }
type indexSender struct { type indexSender struct {
suture.Service
conn protocol.Connection conn protocol.Connection
folder string folder string
dev string dev string
@ -1903,13 +1905,9 @@ type indexSender struct {
prevSequence int64 prevSequence int64
dropSymlinks bool dropSymlinks bool
connClosed chan struct{} connClosed chan struct{}
stop chan struct{}
stopped chan struct{}
} }
func (s *indexSender) Serve() { func (s *indexSender) serve(stop chan struct{}) {
defer close(s.stopped)
var err error var err error
l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence) l.Debugf("Starting indexSender for %s to %s at %s (slv=%d)", s.folder, s.dev, s.conn, s.prevSequence)
@ -1930,7 +1928,7 @@ func (s *indexSender) Serve() {
for err == nil { for err == nil {
select { select {
case <-s.stop: case <-stop:
return return
case <-s.connClosed: case <-s.connClosed:
return return
@ -1943,7 +1941,7 @@ func (s *indexSender) Serve() {
// sending for. // sending for.
if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence { if s.fset.Sequence(protocol.LocalDeviceID) <= s.prevSequence {
select { select {
case <-s.stop: case <-stop:
return return
case <-s.connClosed: case <-s.connClosed:
return return
@ -1963,11 +1961,6 @@ func (s *indexSender) Serve() {
} }
} }
func (s *indexSender) Stop() {
close(s.stop)
<-s.stopped
}
// Complete implements the suture.IsCompletable interface. When Serve terminates // Complete implements the suture.IsCompletable interface. When Serve terminates
// before Stop is called, the supervisor will check for this method and if it // before Stop is called, the supervisor will check for this method and if it
// returns true removes the service instead of restarting it. Here it always // returns true removes the service instead of restarting it. Here it always

View File

@ -10,13 +10,18 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
) )
type ProgressEmitter struct { type ProgressEmitter struct {
suture.Service
registry map[string]map[string]*sharedPullerState // folder: name: puller registry map[string]map[string]*sharedPullerState // folder: name: puller
interval time.Duration interval time.Duration
minBlocks int minBlocks int
@ -27,15 +32,12 @@ type ProgressEmitter struct {
mut sync.Mutex mut sync.Mutex
timer *time.Timer timer *time.Timer
stop chan struct{}
} }
// NewProgressEmitter creates a new progress emitter which emits // NewProgressEmitter creates a new progress emitter which emits
// DownloadProgress events every interval. // DownloadProgress events every interval.
func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter { func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter {
t := &ProgressEmitter{ t := &ProgressEmitter{
stop: make(chan struct{}),
registry: make(map[string]map[string]*sharedPullerState), registry: make(map[string]map[string]*sharedPullerState),
timer: time.NewTimer(time.Millisecond), timer: time.NewTimer(time.Millisecond),
sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState), sentDownloadStates: make(map[protocol.DeviceID]*sentDownloadState),
@ -43,6 +45,7 @@ func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter {
foldersByConns: make(map[protocol.DeviceID][]string), foldersByConns: make(map[protocol.DeviceID][]string),
mut: sync.NewMutex(), mut: sync.NewMutex(),
} }
t.Service = util.AsService(t.serve)
t.CommitConfiguration(config.Configuration{}, cfg.RawCopy()) t.CommitConfiguration(config.Configuration{}, cfg.RawCopy())
cfg.Subscribe(t) cfg.Subscribe(t)
@ -50,14 +53,14 @@ func NewProgressEmitter(cfg config.Wrapper) *ProgressEmitter {
return t return t
} }
// Serve starts the progress emitter which starts emitting DownloadProgress // serve starts the progress emitter which starts emitting DownloadProgress
// events as the progress happens. // events as the progress happens.
func (t *ProgressEmitter) Serve() { func (t *ProgressEmitter) serve(stop chan struct{}) {
var lastUpdate time.Time var lastUpdate time.Time
var lastCount, newCount int var lastCount, newCount int
for { for {
select { select {
case <-t.stop: case <-stop:
l.Debugln("progress emitter: stopping") l.Debugln("progress emitter: stopping")
return return
case <-t.timer.C: case <-t.timer.C:
@ -212,11 +215,6 @@ func (t *ProgressEmitter) CommitConfiguration(from, to config.Configuration) boo
return true return true
} }
// Stop stops the emitter.
func (t *ProgressEmitter) Stop() {
t.stop <- struct{}{}
}
// Register a puller with the emitter which will start broadcasting pullers // Register a puller with the emitter which will start broadcasting pullers
// progress. // progress.
func (t *ProgressEmitter) Register(s *sharedPullerState) { func (t *ProgressEmitter) Register(s *sharedPullerState) {

View File

@ -9,6 +9,10 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
) )
type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient
@ -22,8 +26,7 @@ var (
) )
type RelayClient interface { type RelayClient interface {
Serve() suture.Service
Stop()
Error() error Error() error
Latency() time.Duration Latency() time.Duration
String() string String() string
@ -39,3 +42,42 @@ func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.
return factory(uri, certs, invitations, timeout), nil return factory(uri, certs, invitations, timeout), nil
} }
type commonClient struct {
util.ServiceWithError
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
mut sync.RWMutex
}
func newCommonClient(invitations chan protocol.SessionInvitation, serve func(chan struct{}) error) commonClient {
c := commonClient{
invitations: invitations,
mut: sync.NewRWMutex(),
}
newServe := func(stop chan struct{}) error {
defer c.cleanup()
return serve(stop)
}
c.ServiceWithError = util.AsServiceWithError(newServe)
if c.invitations == nil {
c.closeInvitationsOnFinish = true
c.invitations = make(chan protocol.SessionInvitation)
}
return c
}
func (c *commonClient) cleanup() {
c.mut.Lock()
if c.closeInvitationsOnFinish {
close(c.invitations)
}
c.mut.Unlock()
}
func (c *commonClient) Invitations() chan protocol.SessionInvitation {
c.mut.RLock()
defer c.mut.RUnlock()
return c.invitations
}

View File

@ -14,45 +14,29 @@ import (
"github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/rand" "github.com/syncthing/syncthing/lib/rand"
"github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
) )
type dynamicClient struct { type dynamicClient struct {
commonClient
pooladdr *url.URL pooladdr *url.URL
certs []tls.Certificate certs []tls.Certificate
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
timeout time.Duration timeout time.Duration
mut sync.RWMutex
err error
client RelayClient client RelayClient
stop chan struct{}
} }
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient { func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
closeInvitationsOnFinish := false c := &dynamicClient{
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
}
return &dynamicClient{
pooladdr: uri, pooladdr: uri,
certs: certs, certs: certs,
invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
timeout: timeout, timeout: timeout,
mut: sync.NewRWMutex(),
} }
c.commonClient = newCommonClient(invitations, c.serve)
return c
} }
func (c *dynamicClient) Serve() { func (c *dynamicClient) serve(stop chan struct{}) error {
defer c.cleanup()
c.mut.Lock()
c.stop = make(chan struct{})
c.mut.Unlock()
uri := *c.pooladdr uri := *c.pooladdr
// Trim off the `dynamic+` prefix // Trim off the `dynamic+` prefix
@ -63,8 +47,7 @@ func (c *dynamicClient) Serve() {
data, err := http.Get(uri.String()) data, err := http.Get(uri.String())
if err != nil { if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err) l.Debugln(c, "failed to lookup dynamic relays", err)
c.setError(err) return err
return
} }
var ann dynamicAnnouncement var ann dynamicAnnouncement
@ -72,8 +55,7 @@ func (c *dynamicClient) Serve() {
data.Body.Close() data.Body.Close()
if err != nil { if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err) l.Debugln(c, "failed to lookup dynamic relays", err)
c.setError(err) return err
return
} }
var addrs []string var addrs []string
@ -87,22 +69,26 @@ func (c *dynamicClient) Serve() {
addrs = append(addrs, ruri.String()) addrs = append(addrs, ruri.String())
} }
defer func() {
c.mut.RLock()
if c.client != nil {
c.client.Stop()
}
c.mut.RUnlock()
}()
for _, addr := range relayAddressesOrder(addrs) { for _, addr := range relayAddressesOrder(addrs) {
select { select {
case <-c.stop: case <-stop:
l.Debugln(c, "stopping") l.Debugln(c, "stopping")
c.setError(nil) return nil
return
default: default:
ruri, err := url.Parse(addr) ruri, err := url.Parse(addr)
if err != nil { if err != nil {
l.Debugln(c, "skipping relay", addr, err) l.Debugln(c, "skipping relay", addr, err)
continue continue
} }
client, err := NewClient(ruri, c.certs, c.invitations, c.timeout) client := newStaticClient(ruri, c.certs, c.invitations, c.timeout)
if err != nil {
continue
}
c.mut.Lock() c.mut.Lock()
c.client = client c.client = client
c.mut.Unlock() c.mut.Unlock()
@ -115,24 +101,14 @@ func (c *dynamicClient) Serve() {
} }
} }
l.Debugln(c, "could not find a connectable relay") l.Debugln(c, "could not find a connectable relay")
c.setError(fmt.Errorf("could not find a connectable relay")) return fmt.Errorf("could not find a connectable relay")
}
func (c *dynamicClient) Stop() {
c.mut.RLock()
defer c.mut.RUnlock()
close(c.stop)
if c.client == nil {
return
}
c.client.Stop()
} }
func (c *dynamicClient) Error() error { func (c *dynamicClient) Error() error {
c.mut.RLock() c.mut.RLock()
defer c.mut.RUnlock() defer c.mut.RUnlock()
if c.client == nil { if c.client == nil {
return c.err return c.Error()
} }
return c.client.Error() return c.client.Error()
} }
@ -159,28 +135,6 @@ func (c *dynamicClient) URI() *url.URL {
return c.client.URI() return c.client.URI()
} }
func (c *dynamicClient) Invitations() chan protocol.SessionInvitation {
c.mut.RLock()
inv := c.invitations
c.mut.RUnlock()
return inv
}
func (c *dynamicClient) cleanup() {
c.mut.Lock()
if c.closeInvitationsOnFinish {
close(c.invitations)
c.invitations = make(chan protocol.SessionInvitation)
}
c.mut.Unlock()
}
func (c *dynamicClient) setError(err error) {
c.mut.Lock()
c.err = err
c.mut.Unlock()
}
// This is the announcement received from the relay server; // This is the announcement received from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]} // {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct { type dynamicAnnouncement struct {

View File

@ -12,88 +12,54 @@ import (
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
syncthingprotocol "github.com/syncthing/syncthing/lib/protocol" syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
) )
type staticClient struct { type staticClient struct {
uri *url.URL commonClient
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool uri *url.URL
config *tls.Config config *tls.Config
messageTimeout time.Duration messageTimeout time.Duration
connectTimeout time.Duration connectTimeout time.Duration
stop chan struct{}
stopped chan struct{}
stopMut sync.RWMutex
conn *tls.Conn conn *tls.Conn
mut sync.RWMutex
err error
connected bool connected bool
latency time.Duration latency time.Duration
} }
func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient { func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
closeInvitationsOnFinish := false c := &staticClient{
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
}
stopped := make(chan struct{})
close(stopped) // not yet started, don't block on Stop()
return &staticClient{
uri: uri, uri: uri,
invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
config: configForCerts(certs), config: configForCerts(certs),
messageTimeout: time.Minute * 2, messageTimeout: time.Minute * 2,
connectTimeout: timeout, connectTimeout: timeout,
stop: make(chan struct{}),
stopped: stopped,
stopMut: sync.NewRWMutex(),
mut: sync.NewRWMutex(),
} }
c.commonClient = newCommonClient(invitations, c.serve)
return c
} }
func (c *staticClient) Serve() { func (c *staticClient) serve(stop chan struct{}) error {
defer c.cleanup()
c.stopMut.Lock()
c.stop = make(chan struct{})
c.stopped = make(chan struct{})
c.stopMut.Unlock()
defer close(c.stopped)
if err := c.connect(); err != nil { if err := c.connect(); err != nil {
l.Infof("Could not connect to relay %s: %s", c.uri, err) l.Infof("Could not connect to relay %s: %s", c.uri, err)
c.setError(err) return err
return
} }
l.Debugln(c, "connected", c.conn.RemoteAddr()) l.Debugln(c, "connected", c.conn.RemoteAddr())
defer c.disconnect()
if err := c.join(); err != nil { if err := c.join(); err != nil {
c.conn.Close()
l.Infof("Could not join relay %s: %s", c.uri, err) l.Infof("Could not join relay %s: %s", c.uri, err)
c.setError(err) return err
return
} }
if err := c.conn.SetDeadline(time.Time{}); err != nil { if err := c.conn.SetDeadline(time.Time{}); err != nil {
c.conn.Close()
l.Infoln("Relay set deadline:", err) l.Infoln("Relay set deadline:", err)
c.setError(err) return err
return
} }
l.Infof("Joined relay %s://%s", c.uri.Scheme, c.uri.Host) l.Infof("Joined relay %s://%s", c.uri.Scheme, c.uri.Host)
@ -106,12 +72,10 @@ func (c *staticClient) Serve() {
messages := make(chan interface{}) messages := make(chan interface{})
errors := make(chan error, 1) errors := make(chan error, 1)
go messageReader(c.conn, messages, errors) go messageReader(c.conn, messages, errors, stop)
timeout := time.NewTimer(c.messageTimeout) timeout := time.NewTimer(c.messageTimeout)
c.stopMut.RLock()
defer c.stopMut.RUnlock()
for { for {
select { select {
case message := <-messages: case message := <-messages:
@ -122,11 +86,9 @@ func (c *staticClient) Serve() {
case protocol.Ping: case protocol.Ping:
if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil { if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil {
l.Infoln("Relay write:", err) l.Infoln("Relay write:", err)
c.setError(err) return err
c.disconnect()
} else {
l.Debugln(c, "sent pong")
} }
l.Debugln(c, "sent pong")
case protocol.SessionInvitation: case protocol.SessionInvitation:
ip := net.IP(msg.Address) ip := net.IP(msg.Address)
@ -137,52 +99,28 @@ func (c *staticClient) Serve() {
case protocol.RelayFull: case protocol.RelayFull:
l.Infof("Disconnected from relay %s due to it becoming full.", c.uri) l.Infof("Disconnected from relay %s due to it becoming full.", c.uri)
c.setError(fmt.Errorf("Relay full")) return fmt.Errorf("relay full")
c.disconnect()
default: default:
l.Infoln("Relay: protocol error: unexpected message %v", msg) l.Infoln("Relay: protocol error: unexpected message %v", msg)
c.setError(fmt.Errorf("protocol error: unexpected message %v", msg)) return fmt.Errorf("protocol error: unexpected message %v", msg)
c.disconnect()
} }
case <-c.stop: case <-stop:
l.Debugln(c, "stopping") l.Debugln(c, "stopping")
c.setError(nil) return nil
c.disconnect()
// We always exit via this branch of the select, to make sure the
// the reader routine exits.
case err := <-errors: case err := <-errors:
close(errors)
close(messages)
c.mut.Lock()
if c.connected {
c.conn.Close()
c.connected = false
l.Infof("Disconnecting from relay %s due to error: %s", c.uri, err) l.Infof("Disconnecting from relay %s due to error: %s", c.uri, err)
c.err = err return err
} else {
c.err = nil
}
c.mut.Unlock()
return
case <-timeout.C: case <-timeout.C:
l.Debugln(c, "timed out") l.Debugln(c, "timed out")
c.disconnect() return fmt.Errorf("timed out")
c.setError(fmt.Errorf("timed out"))
} }
} }
} }
func (c *staticClient) Stop() {
c.stopMut.RLock()
close(c.stop)
<-c.stopped
c.stopMut.RUnlock()
}
func (c *staticClient) StatusOK() bool { func (c *staticClient) StatusOK() bool {
c.mut.RLock() c.mut.RLock()
con := c.connected con := c.connected
@ -205,22 +143,6 @@ func (c *staticClient) URI() *url.URL {
return c.uri return c.uri
} }
func (c *staticClient) Invitations() chan protocol.SessionInvitation {
c.mut.RLock()
inv := c.invitations
c.mut.RUnlock()
return inv
}
func (c *staticClient) cleanup() {
c.mut.Lock()
if c.closeInvitationsOnFinish {
close(c.invitations)
c.invitations = make(chan protocol.SessionInvitation)
}
c.mut.Unlock()
}
func (c *staticClient) connect() error { func (c *staticClient) connect() error {
if c.uri.Scheme != "relay" { if c.uri.Scheme != "relay" {
return fmt.Errorf("Unsupported relay schema: %v", c.uri.Scheme) return fmt.Errorf("Unsupported relay schema: %v", c.uri.Scheme)
@ -261,19 +183,6 @@ func (c *staticClient) disconnect() {
c.conn.Close() c.conn.Close()
} }
func (c *staticClient) setError(err error) {
c.mut.Lock()
c.err = err
c.mut.Unlock()
}
func (c *staticClient) Error() error {
c.mut.RLock()
err := c.err
c.mut.RUnlock()
return err
}
func (c *staticClient) join() error { func (c *staticClient) join() error {
if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil { if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil {
return err return err
@ -332,13 +241,17 @@ func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error {
return nil return nil
} }
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error, stop chan struct{}) {
for { for {
msg, err := protocol.ReadMessage(conn) msg, err := protocol.ReadMessage(conn)
if err != nil { if err != nil {
errors <- err errors <- err
return return
} }
messages <- msg select {
case messages <- msg:
case <-stop:
return
}
} }
} }

View File

@ -13,7 +13,10 @@ import (
"github.com/AudriusButkevicius/pfilter" "github.com/AudriusButkevicius/pfilter"
"github.com/ccding/go-stun/stun" "github.com/ccding/go-stun/stun"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/util"
) )
const stunRetryInterval = 5 * time.Minute const stunRetryInterval = 5 * time.Minute
@ -56,6 +59,8 @@ type Subscriber interface {
} }
type Service struct { type Service struct {
suture.Service
name string name string
cfg config.Wrapper cfg config.Wrapper
subscriber Subscriber subscriber Subscriber
@ -66,8 +71,6 @@ type Service struct {
natType NATType natType NATType
addr *Host addr *Host
stop chan struct{}
} }
func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Service, net.PacketConn) { func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Service, net.PacketConn) {
@ -88,7 +91,7 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi
client.SetSoftwareName("") // Explicitly unset this, seems to freak some servers out. client.SetSoftwareName("") // Explicitly unset this, seems to freak some servers out.
// Return the service and the other conn to the client // Return the service and the other conn to the client
return &Service{ s := &Service{
name: "Stun@" + conn.LocalAddr().Network() + "://" + conn.LocalAddr().String(), name: "Stun@" + conn.LocalAddr().Network() + "://" + conn.LocalAddr().String(),
cfg: cfg, cfg: cfg,
@ -100,16 +103,17 @@ func New(cfg config.Wrapper, subscriber Subscriber, conn net.PacketConn) (*Servi
natType: NATUnknown, natType: NATUnknown,
addr: nil, addr: nil,
stop: make(chan struct{}), }
}, otherDataConn s.Service = util.AsService(s.serve)
return s, otherDataConn
} }
func (s *Service) Stop() { func (s *Service) Stop() {
close(s.stop) s.Service.Stop()
_ = s.stunConn.Close() _ = s.stunConn.Close()
} }
func (s *Service) Serve() { func (s *Service) serve(stop chan struct{}) {
for { for {
disabled: disabled:
s.setNATType(NATUnknown) s.setNATType(NATUnknown)
@ -117,7 +121,7 @@ func (s *Service) Serve() {
if s.cfg.Options().IsStunDisabled() { if s.cfg.Options().IsStunDisabled() {
select { select {
case <-s.stop: case <-stop:
return return
case <-time.After(time.Second): case <-time.After(time.Second):
continue continue
@ -130,12 +134,12 @@ func (s *Service) Serve() {
// This blocks until we hit an exit condition or there are issues with the STUN server. // This blocks until we hit an exit condition or there are issues with the STUN server.
// This returns a boolean signifying if a different STUN server should be tried (oppose to the whole thing // This returns a boolean signifying if a different STUN server should be tried (oppose to the whole thing
// shutting down and this winding itself down. // shutting down and this winding itself down.
if !s.runStunForServer(addr) { if !s.runStunForServer(addr, stop) {
// Check exit conditions. // Check exit conditions.
// Have we been asked to stop? // Have we been asked to stop?
select { select {
case <-s.stop: case <-stop:
return return
default: default:
} }
@ -163,7 +167,7 @@ func (s *Service) Serve() {
} }
} }
func (s *Service) runStunForServer(addr string) (tryNext bool) { func (s *Service) runStunForServer(addr string, stop chan struct{}) (tryNext bool) {
l.Debugf("Running stun for %s via %s", s, addr) l.Debugf("Running stun for %s via %s", s, addr)
// Resolve the address, so that in case the server advertises two // Resolve the address, so that in case the server advertises two
@ -201,10 +205,10 @@ func (s *Service) runStunForServer(addr string) (tryNext bool) {
return false return false
} }
return s.stunKeepAlive(addr, extAddr) return s.stunKeepAlive(addr, extAddr, stop)
} }
func (s *Service) stunKeepAlive(addr string, extAddr *Host) (tryNext bool) { func (s *Service) stunKeepAlive(addr string, extAddr *Host, stop chan struct{}) (tryNext bool) {
var err error var err error
nextSleep := time.Duration(s.cfg.Options().StunKeepaliveStartS) * time.Second nextSleep := time.Duration(s.cfg.Options().StunKeepaliveStartS) * time.Second
@ -247,7 +251,7 @@ func (s *Service) stunKeepAlive(addr string, extAddr *Host) (tryNext bool) {
select { select {
case <-time.After(sleepFor): case <-time.After(sleepFor):
case <-s.stop: case <-stop:
l.Debugf("%s stopping, aborting stun", s) l.Debugf("%s stopping, aborting stun", s)
return false return false
} }

View File

@ -17,7 +17,6 @@ import (
"runtime" "runtime"
"sort" "sort"
"strings" "strings"
"sync"
"time" "time"
"github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/build"
@ -28,6 +27,9 @@ import (
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner" "github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/upgrade" "github.com/syncthing/syncthing/lib/upgrade"
"github.com/syncthing/syncthing/lib/util"
"github.com/thejerf/suture"
) )
// Current version number of the usage report, for acceptance purposes. If // Current version number of the usage report, for acceptance purposes. If
@ -38,14 +40,12 @@ const Version = 3
var StartTime = time.Now() var StartTime = time.Now()
type Service struct { type Service struct {
suture.Service
cfg config.Wrapper cfg config.Wrapper
model model.Model model model.Model
connectionsService connections.Service connectionsService connections.Service
noUpgrade bool noUpgrade bool
forceRun chan struct{} forceRun chan struct{}
stop chan struct{}
stopped chan struct{}
stopMut sync.RWMutex
} }
func New(cfg config.Wrapper, m model.Model, connectionsService connections.Service, noUpgrade bool) *Service { func New(cfg config.Wrapper, m model.Model, connectionsService connections.Service, noUpgrade bool) *Service {
@ -54,11 +54,9 @@ func New(cfg config.Wrapper, m model.Model, connectionsService connections.Servi
model: m, model: m,
connectionsService: connectionsService, connectionsService: connectionsService,
noUpgrade: noUpgrade, noUpgrade: noUpgrade,
forceRun: make(chan struct{}), forceRun: make(chan struct{}, 1), // Buffered to prevent locking
stop: make(chan struct{}),
stopped: make(chan struct{}),
} }
close(svc.stopped) // Not yet running, dont block on Stop() svc.Service = util.AsService(svc.serve)
cfg.Subscribe(svc) cfg.Subscribe(svc)
return svc return svc
} }
@ -385,20 +383,11 @@ func (s *Service) sendUsageReport() error {
return err return err
} }
func (s *Service) Serve() { func (s *Service) serve(stop chan struct{}) {
s.stopMut.Lock()
s.stop = make(chan struct{})
s.stopped = make(chan struct{})
s.stopMut.Unlock()
t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second) t := time.NewTimer(time.Duration(s.cfg.Options().URInitialDelayS) * time.Second)
s.stopMut.RLock()
defer func() {
close(s.stopped)
s.stopMut.RUnlock()
}()
for { for {
select { select {
case <-s.stop: case <-stop:
return return
case <-s.forceRun: case <-s.forceRun:
t.Reset(0) t.Reset(0)
@ -422,23 +411,16 @@ func (s *Service) VerifyConfiguration(from, to config.Configuration) error {
func (s *Service) CommitConfiguration(from, to config.Configuration) bool { func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
if from.Options.URAccepted != to.Options.URAccepted || from.Options.URUniqueID != to.Options.URUniqueID || from.Options.URURL != to.Options.URURL { if from.Options.URAccepted != to.Options.URAccepted || from.Options.URUniqueID != to.Options.URUniqueID || from.Options.URURL != to.Options.URURL {
s.stopMut.RLock()
select { select {
case s.forceRun <- struct{}{}: case s.forceRun <- struct{}{}:
case <-s.stop: default:
// s.forceRun is one buffered, so even though nothing
// was sent, a run will still happen after this point.
} }
s.stopMut.RUnlock()
} }
return true return true
} }
func (s *Service) Stop() {
s.stopMut.RLock()
close(s.stop)
<-s.stopped
s.stopMut.RUnlock()
}
func (*Service) String() string { func (*Service) String() string {
return "ur.Service" return "ur.Service"
} }

View File

@ -12,6 +12,10 @@ import (
"reflect" "reflect"
"strconv" "strconv"
"strings" "strings"
"github.com/syncthing/syncthing/lib/sync"
"github.com/thejerf/suture"
) )
type defaultParser interface { type defaultParser interface {
@ -170,3 +174,73 @@ func Address(network, host string) string {
} }
return u.String() return u.String()
} }
// AsService wraps the given function to implement suture.Service by calling
// that function on serve and closing the passed channel when Stop is called.
func AsService(fn func(stop chan struct{})) suture.Service {
return AsServiceWithError(func(stop chan struct{}) error {
fn(stop)
return nil
})
}
type ServiceWithError interface {
suture.Service
Error() error
}
// AsServiceWithError does the same as AsService, except that it keeps track
// of an error returned by the given function.
func AsServiceWithError(fn func(stop chan struct{}) error) ServiceWithError {
s := &service{
serve: fn,
stop: make(chan struct{}),
stopped: make(chan struct{}),
mut: sync.NewMutex(),
}
close(s.stopped) // not yet started, don't block on Stop()
return s
}
type service struct {
serve func(stop chan struct{}) error
stop chan struct{}
stopped chan struct{}
err error
mut sync.Mutex
}
func (s *service) Serve() {
s.mut.Lock()
select {
case <-s.stop:
s.mut.Unlock()
return
default:
}
s.err = nil
s.stopped = make(chan struct{})
s.mut.Unlock()
var err error
defer func() {
s.mut.Lock()
s.err = err
close(s.stopped)
s.mut.Unlock()
}()
err = s.serve(s.stop)
}
func (s *service) Stop() {
s.mut.Lock()
close(s.stop)
s.mut.Unlock()
<-s.stopped
}
func (s *service) Error() error {
s.mut.Lock()
defer s.mut.Unlock()
return s.err
}

View File

@ -11,8 +11,11 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
) )
func init() { func init() {
@ -26,13 +29,13 @@ type Interval struct {
} }
type Staggered struct { type Staggered struct {
suture.Service
cleanInterval int64 cleanInterval int64
folderFs fs.Filesystem folderFs fs.Filesystem
versionsFs fs.Filesystem versionsFs fs.Filesystem
interval [4]Interval interval [4]Interval
mutex sync.Mutex mutex sync.Mutex
stop chan struct{}
testCleanDone chan struct{} testCleanDone chan struct{}
} }
@ -61,14 +64,14 @@ func NewStaggered(folderID string, folderFs fs.Filesystem, params map[string]str
{604800, maxAge}, // next year -> 1 week between versions {604800, maxAge}, // next year -> 1 week between versions
}, },
mutex: sync.NewMutex(), mutex: sync.NewMutex(),
stop: make(chan struct{}),
} }
s.Service = util.AsService(s.serve)
l.Debugf("instantiated %#v", s) l.Debugf("instantiated %#v", s)
return s return s
} }
func (v *Staggered) Serve() { func (v *Staggered) serve(stop chan struct{}) {
v.clean() v.clean()
if v.testCleanDone != nil { if v.testCleanDone != nil {
close(v.testCleanDone) close(v.testCleanDone)
@ -80,16 +83,12 @@ func (v *Staggered) Serve() {
select { select {
case <-tck.C: case <-tck.C:
v.clean() v.clean()
case <-v.stop: case <-stop:
return return
} }
} }
} }
func (v *Staggered) Stop() {
close(v.stop)
}
func (v *Staggered) clean() { func (v *Staggered) clean() {
l.Debugln("Versioner clean: Waiting for lock on", v.versionsFs) l.Debugln("Versioner clean: Waiting for lock on", v.versionsFs)
v.mutex.Lock() v.mutex.Lock()

View File

@ -11,7 +11,10 @@ import (
"strconv" "strconv"
"time" "time"
"github.com/thejerf/suture"
"github.com/syncthing/syncthing/lib/fs" "github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/util"
) )
func init() { func init() {
@ -20,10 +23,10 @@ func init() {
} }
type Trashcan struct { type Trashcan struct {
suture.Service
folderFs fs.Filesystem folderFs fs.Filesystem
versionsFs fs.Filesystem versionsFs fs.Filesystem
cleanoutDays int cleanoutDays int
stop chan struct{}
} }
func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]string) Versioner { func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]string) Versioner {
@ -34,8 +37,8 @@ func NewTrashcan(folderID string, folderFs fs.Filesystem, params map[string]stri
folderFs: folderFs, folderFs: folderFs,
versionsFs: fsFromParams(folderFs, params), versionsFs: fsFromParams(folderFs, params),
cleanoutDays: cleanoutDays, cleanoutDays: cleanoutDays,
stop: make(chan struct{}),
} }
s.Service = util.AsService(s.serve)
l.Debugf("instantiated %#v", s) l.Debugf("instantiated %#v", s)
return s return s
@ -49,7 +52,7 @@ func (t *Trashcan) Archive(filePath string) error {
}) })
} }
func (t *Trashcan) Serve() { func (t *Trashcan) serve(stop chan struct{}) {
l.Debugln(t, "starting") l.Debugln(t, "starting")
defer l.Debugln(t, "stopping") defer l.Debugln(t, "stopping")
@ -59,7 +62,7 @@ func (t *Trashcan) Serve() {
for { for {
select { select {
case <-t.stop: case <-stop:
return return
case <-timer.C: case <-timer.C:
@ -75,10 +78,6 @@ func (t *Trashcan) Serve() {
} }
} }
func (t *Trashcan) Stop() {
close(t.stop)
}
func (t *Trashcan) String() string { func (t *Trashcan) String() string {
return fmt.Sprintf("trashcan@%p", t) return fmt.Sprintf("trashcan@%p", t)
} }