lib: Removal global connection registry (#8254)

This commit is contained in:
Simon Frei 2022-04-09 16:04:56 +02:00 committed by GitHub
parent e30898ddb3
commit b947056e62
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 118 additions and 94 deletions

View File

@ -84,7 +84,7 @@ func checkServers(deviceID protocol.DeviceID, servers ...string) {
} }
func checkServer(deviceID protocol.DeviceID, server string) checkResult { func checkServer(deviceID protocol.DeviceID, server string) checkResult {
disco, err := discover.NewGlobal(server, tls.Certificate{}, nil, events.NoopLogger) disco, err := discover.NewGlobal(server, tls.Certificate{}, nil, events.NoopLogger, nil)
if err != nil { if err != nil {
return checkResult{error: err} return checkResult{error: err}
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
@ -414,7 +415,7 @@ func withConnectionPair(b *testing.B, connUri string, h func(client, server inte
} }
natSvc := nat.NewService(deviceId, wcfg) natSvc := nat.NewService(deviceId, wcfg)
conns := make(chan internalConn, 1) conns := make(chan internalConn, 1)
listenSvc := lf.New(uri, wcfg, tlsCfg, conns, natSvc) listenSvc := lf.New(uri, wcfg, tlsCfg, conns, natSvc, registry.New())
supervisor.Add(listenSvc) supervisor.Add(listenSvc)
var addr *url.URL var addr *url.URL
@ -433,7 +434,8 @@ func withConnectionPair(b *testing.B, connUri string, h func(client, server inte
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }
dialer := df.New(cfg.Options, tlsCfg) // Purposely using a different registry: Don't want to reuse port between dialer and listener on the same device
dialer := df.New(cfg.Options, tlsCfg, registry.New())
// Relays might take some time to register the device, so dial multiple times // Relays might take some time to register the device, so dial multiple times
clientConn, err := dialer.Dial(ctx, deviceId, addr) clientConn, err := dialer.Dial(ctx, deviceId, addr)

View File

@ -41,6 +41,7 @@ func init() {
type quicDialer struct { type quicDialer struct {
commonDialer commonDialer
registry *registry.Registry
} }
func (d *quicDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL) (internalConn, error) { func (d *quicDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL) (internalConn, error) {
@ -58,7 +59,7 @@ func (d *quicDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL
// Given we always pass the connection to quic, it assumes it's a remote connection it never closes it, // Given we always pass the connection to quic, it assumes it's a remote connection it never closes it,
// So our wrapper around it needs to close it, but it only needs to close it if it's not the listening connection. // So our wrapper around it needs to close it, but it only needs to close it if it's not the listening connection.
var createdConn net.PacketConn var createdConn net.PacketConn
listenConn := registry.Get(uri.Scheme, packetConnUnspecified) listenConn := d.registry.Get(uri.Scheme, packetConnUnspecified)
if listenConn != nil { if listenConn != nil {
conn = listenConn.(net.PacketConn) conn = listenConn.(net.PacketConn)
} else { } else {
@ -96,7 +97,7 @@ func (d *quicDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL
type quicDialerFactory struct{} type quicDialerFactory struct{}
func (quicDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Config) genericDialer { func (quicDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Config, registry *registry.Registry) genericDialer {
// So the idea is that we should probably try dialing every 20 seconds. // So the idea is that we should probably try dialing every 20 seconds.
// However it would still be nice if this was adjustable/proportional to ReconnectIntervalS // However it would still be nice if this was adjustable/proportional to ReconnectIntervalS
// But prevent something silly like 1/3 = 0 etc. // But prevent something silly like 1/3 = 0 etc.
@ -104,10 +105,13 @@ func (quicDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Confi
if quicInterval < 10 { if quicInterval < 10 {
quicInterval = 10 quicInterval = 10
} }
return &quicDialer{commonDialer{ return &quicDialer{
reconnectInterval: time.Duration(quicInterval) * time.Second, commonDialer: commonDialer{
tlsCfg: tlsCfg, reconnectInterval: time.Duration(quicInterval) * time.Second,
}} tlsCfg: tlsCfg,
},
registry: registry,
}
} }
func (quicDialerFactory) Priority() int { func (quicDialerFactory) Priority() int {

View File

@ -40,11 +40,12 @@ type quicListener struct {
onAddressesChangedNotifier onAddressesChangedNotifier
uri *url.URL uri *url.URL
cfg config.Wrapper cfg config.Wrapper
tlsCfg *tls.Config tlsCfg *tls.Config
conns chan internalConn conns chan internalConn
factory listenerFactory factory listenerFactory
registry *registry.Registry
address *url.URL address *url.URL
laddr net.Addr laddr net.Addr
@ -100,8 +101,8 @@ func (t *quicListener) serve(ctx context.Context) error {
go svc.Serve(ctx) go svc.Serve(ctx)
registry.Register(t.uri.Scheme, conn) t.registry.Register(t.uri.Scheme, conn)
defer registry.Unregister(t.uri.Scheme, conn) defer t.registry.Unregister(t.uri.Scheme, conn)
listener, err := quic.Listen(conn, t.tlsCfg, quicConfig) listener, err := quic.Listen(conn, t.tlsCfg, quicConfig)
if err != nil { if err != nil {
@ -217,13 +218,14 @@ func (f *quicListenerFactory) Valid(config.Configuration) error {
return nil return nil
} }
func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener { func (f *quicListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service, registry *registry.Registry) genericListener {
l := &quicListener{ l := &quicListener{
uri: fixupPort(uri, config.DefaultQUICPort), uri: fixupPort(uri, config.DefaultQUICPort),
cfg: cfg, cfg: cfg,
tlsCfg: tlsCfg, tlsCfg: tlsCfg,
conns: conns, conns: conns,
factory: f, factory: f,
registry: registry,
} }
l.ServiceWithError = svcutil.AsService(l.serve, l.String()) l.ServiceWithError = svcutil.AsService(l.serve, l.String())
l.nat.Store(stun.NATUnknown) l.nat.Store(stun.NATUnknown)

View File

@ -15,10 +15,6 @@ import (
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
) )
var (
Default = New()
)
type Registry struct { type Registry struct {
mut sync.Mutex mut sync.Mutex
available map[string][]interface{} available map[string][]interface{}
@ -85,15 +81,3 @@ func (r *Registry) Get(scheme string, preferred func(interface{}) bool) interfac
} }
return best return best
} }
func Register(scheme string, item interface{}) {
Default.Register(scheme, item)
}
func Unregister(scheme string, item interface{}) {
Default.Unregister(scheme, item)
}
func Get(scheme string, preferred func(interface{}) bool) interface{} {
return Default.Get(scheme, preferred)
}

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/relay/client"
@ -68,7 +69,7 @@ func (d *relayDialer) Dial(ctx context.Context, id protocol.DeviceID, uri *url.U
type relayDialerFactory struct{} type relayDialerFactory struct{}
func (relayDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Config) genericDialer { func (relayDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Config, _ *registry.Registry) genericDialer {
return &relayDialer{commonDialer{ return &relayDialer{commonDialer{
trafficClass: opts.TrafficClass, trafficClass: opts.TrafficClass,
reconnectInterval: time.Duration(opts.RelayReconnectIntervalM) * time.Minute, reconnectInterval: time.Duration(opts.RelayReconnectIntervalM) * time.Minute,

View File

@ -16,6 +16,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/relay/client"
@ -177,7 +178,7 @@ func (t *relayListener) NATType() string {
type relayListenerFactory struct{} type relayListenerFactory struct{}
func (f *relayListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener { func (f *relayListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service, _ *registry.Registry) genericListener {
t := &relayListener{ t := &relayListener{
uri: uri, uri: uri,
cfg: cfg, cfg: cfg,

View File

@ -23,6 +23,7 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/nat"
@ -160,6 +161,7 @@ type service struct {
limiter *limiter limiter *limiter
natService *nat.Service natService *nat.Service
evLogger events.Logger evLogger events.Logger
registry *registry.Registry
dialNow chan struct{} dialNow chan struct{}
dialNowDevices map[protocol.DeviceID]struct{} dialNowDevices map[protocol.DeviceID]struct{}
@ -170,7 +172,7 @@ type service struct {
listenerTokens map[string]suture.ServiceToken listenerTokens map[string]suture.ServiceToken
} }
func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, bepProtocolName string, tlsDefaultCommonName string, evLogger events.Logger) Service { func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder, bepProtocolName string, tlsDefaultCommonName string, evLogger events.Logger, registry *registry.Registry) Service {
spec := svcutil.SpecWithInfoLogger(l) spec := svcutil.SpecWithInfoLogger(l)
service := &service{ service := &service{
Supervisor: suture.New("connections.Service", spec), Supervisor: suture.New("connections.Service", spec),
@ -187,6 +189,7 @@ func NewService(cfg config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *t
limiter: newLimiter(myID, cfg), limiter: newLimiter(myID, cfg),
natService: nat.NewService(myID, cfg), natService: nat.NewService(myID, cfg),
evLogger: evLogger, evLogger: evLogger,
registry: registry,
dialNowDevicesMut: sync.NewMutex(), dialNowDevicesMut: sync.NewMutex(),
dialNow: make(chan struct{}, 1), dialNow: make(chan struct{}, 1),
@ -655,7 +658,7 @@ func (s *service) resolveDialTargets(ctx context.Context, now time.Time, cfg con
continue continue
} }
dialer := dialerFactory.New(s.cfg.Options(), s.tlsCfg) dialer := dialerFactory.New(s.cfg.Options(), s.tlsCfg, s.registry)
nextDialAt.set(deviceID, addr, now.Add(dialer.RedialFrequency())) nextDialAt.set(deviceID, addr, now.Add(dialer.RedialFrequency()))
// For LAN addresses, increase the priority so that we // For LAN addresses, increase the priority so that we
@ -755,7 +758,7 @@ func (s *service) createListener(factory listenerFactory, uri *url.URL) bool {
l.Debugln("Starting listener", uri) l.Debugln("Starting listener", uri)
listener := factory.New(uri, s.cfg, s.tlsCfg, s.conns, s.natService) listener := factory.New(uri, s.cfg, s.tlsCfg, s.conns, s.natService, s.registry)
listener.OnAddressesChanged(s.logListenAddressesChangedEvent) listener.OnAddressesChanged(s.logListenAddressesChangedEvent)
// Retrying a listener many times in rapid succession is unlikely to help, // Retrying a listener many times in rapid succession is unlikely to help,

View File

@ -16,6 +16,7 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/nat" "github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/stats" "github.com/syncthing/syncthing/lib/stats"
@ -139,7 +140,7 @@ func (c internalConn) String() string {
} }
type dialerFactory interface { type dialerFactory interface {
New(config.OptionsConfiguration, *tls.Config) genericDialer New(config.OptionsConfiguration, *tls.Config, *registry.Registry) genericDialer
Priority() int Priority() int
AlwaysWAN() bool AlwaysWAN() bool
Valid(config.Configuration) error Valid(config.Configuration) error
@ -162,7 +163,7 @@ type genericDialer interface {
} }
type listenerFactory interface { type listenerFactory interface {
New(*url.URL, config.Wrapper, *tls.Config, chan internalConn, *nat.Service) genericListener New(*url.URL, config.Wrapper, *tls.Config, chan internalConn, *nat.Service, *registry.Registry) genericListener
Valid(config.Configuration) error Valid(config.Configuration) error
} }

View File

@ -13,6 +13,7 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
) )
@ -28,6 +29,7 @@ func init() {
type tcpDialer struct { type tcpDialer struct {
commonDialer commonDialer
registry *registry.Registry
} }
func (d *tcpDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL) (internalConn, error) { func (d *tcpDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL) (internalConn, error) {
@ -35,7 +37,7 @@ func (d *tcpDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL)
timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second) timeoutCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
conn, err := dialer.DialContextReusePort(timeoutCtx, uri.Scheme, uri.Host) conn, err := dialer.DialContextReusePortFunc(d.registry)(timeoutCtx, uri.Scheme, uri.Host)
if err != nil { if err != nil {
return internalConn{}, err return internalConn{}, err
} }
@ -62,12 +64,15 @@ func (d *tcpDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL)
type tcpDialerFactory struct{} type tcpDialerFactory struct{}
func (tcpDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Config) genericDialer { func (tcpDialerFactory) New(opts config.OptionsConfiguration, tlsCfg *tls.Config, registry *registry.Registry) genericDialer {
return &tcpDialer{commonDialer{ return &tcpDialer{
trafficClass: opts.TrafficClass, commonDialer: commonDialer{
reconnectInterval: time.Duration(opts.ReconnectIntervalS) * time.Second, trafficClass: opts.TrafficClass,
tlsCfg: tlsCfg, reconnectInterval: time.Duration(opts.ReconnectIntervalS) * time.Second,
}} tlsCfg: tlsCfg,
},
registry: registry,
}
} }
func (tcpDialerFactory) Priority() int { func (tcpDialerFactory) Priority() int {

View File

@ -32,11 +32,12 @@ type tcpListener struct {
svcutil.ServiceWithError svcutil.ServiceWithError
onAddressesChangedNotifier onAddressesChangedNotifier
uri *url.URL uri *url.URL
cfg config.Wrapper cfg config.Wrapper
tlsCfg *tls.Config tlsCfg *tls.Config
conns chan internalConn conns chan internalConn
factory listenerFactory factory listenerFactory
registry *registry.Registry
natService *nat.Service natService *nat.Service
mapping *nat.Mapping mapping *nat.Mapping
@ -69,8 +70,8 @@ func (t *tcpListener) serve(ctx context.Context) error {
t.notifyAddressesChanged(t) t.notifyAddressesChanged(t)
defer t.clearAddresses(t) defer t.clearAddresses(t)
registry.Register(t.uri.Scheme, tcaddr) t.registry.Register(t.uri.Scheme, tcaddr)
defer registry.Unregister(t.uri.Scheme, tcaddr) defer t.registry.Unregister(t.uri.Scheme, tcaddr)
l.Infof("TCP listener (%v) starting", tcaddr) l.Infof("TCP listener (%v) starting", tcaddr)
defer l.Infof("TCP listener (%v) shutting down", tcaddr) defer l.Infof("TCP listener (%v) shutting down", tcaddr)
@ -213,7 +214,7 @@ func (t *tcpListener) NATType() string {
type tcpListenerFactory struct{} type tcpListenerFactory struct{}
func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service) genericListener { func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.Config, conns chan internalConn, natService *nat.Service, registry *registry.Registry) genericListener {
l := &tcpListener{ l := &tcpListener{
uri: fixupPort(uri, config.DefaultTCPPort), uri: fixupPort(uri, config.DefaultTCPPort),
cfg: cfg, cfg: cfg,
@ -221,6 +222,7 @@ func (f *tcpListenerFactory) New(uri *url.URL, cfg config.Wrapper, tlsCfg *tls.C
conns: conns, conns: conns,
natService: natService, natService: natService,
factory: f, factory: f,
registry: registry,
} }
l.ServiceWithError = svcutil.AsService(l.serve, l.String()) l.ServiceWithError = svcutil.AsService(l.serve, l.String())
return l return l

View File

@ -104,32 +104,34 @@ func DialContext(ctx context.Context, network, addr string) (net.Conn, error) {
// DialContextReusePort tries dialing via proxy if a proxy is configured, and falls back to // DialContextReusePort tries dialing via proxy if a proxy is configured, and falls back to
// a direct connection reusing the port from the connections registry, if no proxy is defined, or connecting via proxy // a direct connection reusing the port from the connections registry, if no proxy is defined, or connecting via proxy
// fails. It also in parallel dials without reusing the port, just in case reusing the port affects routing decisions badly. // fails. It also in parallel dials without reusing the port, just in case reusing the port affects routing decisions badly.
func DialContextReusePort(ctx context.Context, network, addr string) (net.Conn, error) { func DialContextReusePortFunc(registry *registry.Registry) func(ctx context.Context, network, addr string) (net.Conn, error) {
// If proxy is configured, there is no point trying to reuse listen addresses. return func(ctx context.Context, network, addr string) (net.Conn, error) {
if proxy.FromEnvironment() != proxy.Direct { // If proxy is configured, there is no point trying to reuse listen addresses.
return DialContext(ctx, network, addr) if proxy.FromEnvironment() != proxy.Direct {
} return DialContext(ctx, network, addr)
}
localAddrInterface := registry.Get(network, func(addr interface{}) bool { localAddrInterface := registry.Get(network, func(addr interface{}) bool {
return addr.(*net.TCPAddr).IP.IsUnspecified() return addr.(*net.TCPAddr).IP.IsUnspecified()
}) })
if localAddrInterface == nil { if localAddrInterface == nil {
// Nothing listening, nothing to reuse. // Nothing listening, nothing to reuse.
return DialContext(ctx, network, addr) return DialContext(ctx, network, addr)
} }
laddr, ok := localAddrInterface.(*net.TCPAddr) laddr, ok := localAddrInterface.(*net.TCPAddr)
if !ok { if !ok {
return nil, errUnexpectedInterfaceType return nil, errUnexpectedInterfaceType
} }
// Dial twice, once reusing the listen address, another time not reusing it, just in case reusing the address // Dial twice, once reusing the listen address, another time not reusing it, just in case reusing the address
// influences routing and we fail to reach our destination. // influences routing and we fail to reach our destination.
dialer := net.Dialer{ dialer := net.Dialer{
Control: ReusePortControl, Control: ReusePortControl,
LocalAddr: laddr, LocalAddr: laddr,
}
return dialTwicePreferFirst(ctx, dialer.DialContext, (&net.Dialer{}).DialContext, "reuse", "non-reuse", network, addr)
} }
return dialTwicePreferFirst(ctx, dialer.DialContext, (&net.Dialer{}).DialContext, "reuse", "non-reuse", network, addr)
} }
type dialFunc func(ctx context.Context, network, address string) (net.Conn, error) type dialFunc func(ctx context.Context, network, address string) (net.Conn, error)

View File

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
) )
@ -23,7 +24,7 @@ func setupCache() *manager {
cfg.Options.LocalAnnEnabled = false cfg.Options.LocalAnnEnabled = false
cfg.Options.GlobalAnnEnabled = false cfg.Options.GlobalAnnEnabled = false
return NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, protocol.LocalDeviceID, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil).(*manager) return NewManager(protocol.LocalDeviceID, config.Wrap("", cfg, protocol.LocalDeviceID, events.NoopLogger), tls.Certificate{}, events.NoopLogger, nil, registry.New()).(*manager)
} }
func TestCacheUnique(t *testing.T) { func TestCacheUnique(t *testing.T) {

View File

@ -14,12 +14,14 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net"
"net/http" "net/http"
"net/url" "net/url"
"strconv" "strconv"
stdsync "sync" stdsync "sync"
"time" "time"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
@ -71,7 +73,7 @@ func (e *lookupError) CacheFor() time.Duration {
return e.cacheFor return e.cacheFor
} }
func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger) (FinderService, error) { func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLogger events.Logger, registry *registry.Registry) (FinderService, error) {
server, opts, err := parseOptions(server) server, opts, err := parseOptions(server)
if err != nil { if err != nil {
return nil, err return nil, err
@ -88,10 +90,16 @@ func NewGlobal(server string, cert tls.Certificate, addrList AddressLister, evLo
// The http.Client used for announcements. It needs to have our // The http.Client used for announcements. It needs to have our
// certificate to prove our identity, and may or may not verify the server // certificate to prove our identity, and may or may not verify the server
// certificate depending on the insecure setting. // certificate depending on the insecure setting.
var dialContext func(ctx context.Context, network, addr string) (net.Conn, error)
if registry != nil {
dialContext = dialer.DialContextReusePortFunc(registry)
} else {
dialContext = dialer.DialContext
}
var announceClient httpClient = &contextClient{&http.Client{ var announceClient httpClient = &contextClient{&http.Client{
Timeout: requestTimeout, Timeout: requestTimeout,
Transport: &http.Transport{ Transport: &http.Transport{
DialContext: dialer.DialContextReusePort, DialContext: dialContext,
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{ TLSClientConfig: &tls.Config{
InsecureSkipVerify: opts.insecure, InsecureSkipVerify: opts.insecure,

View File

@ -16,6 +16,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/tlsutil"
@ -56,15 +57,17 @@ func TestGlobalOverHTTP(t *testing.T) {
// is only allowed in combination with the "insecure" and "noannounce" // is only allowed in combination with the "insecure" and "noannounce"
// parameters. // parameters.
if _, err := NewGlobal("http://192.0.2.42/", tls.Certificate{}, nil, events.NoopLogger); err == nil { registry := registry.New()
if _, err := NewGlobal("http://192.0.2.42/", tls.Certificate{}, nil, events.NoopLogger, registry); err == nil {
t.Fatal("http is not allowed without insecure and noannounce") t.Fatal("http is not allowed without insecure and noannounce")
} }
if _, err := NewGlobal("http://192.0.2.42/?insecure", tls.Certificate{}, nil, events.NoopLogger); err == nil { if _, err := NewGlobal("http://192.0.2.42/?insecure", tls.Certificate{}, nil, events.NoopLogger, registry); err == nil {
t.Fatal("http is not allowed without noannounce") t.Fatal("http is not allowed without noannounce")
} }
if _, err := NewGlobal("http://192.0.2.42/?noannounce", tls.Certificate{}, nil, events.NoopLogger); err == nil { if _, err := NewGlobal("http://192.0.2.42/?noannounce", tls.Certificate{}, nil, events.NoopLogger, registry); err == nil {
t.Fatal("http is not allowed without insecure") t.Fatal("http is not allowed without insecure")
} }
@ -185,7 +188,7 @@ func TestGlobalAnnounce(t *testing.T) {
go func() { _ = http.Serve(list, mux) }() go func() { _ = http.Serve(list, mux) }()
url := "https://" + list.Addr().String() + "?insecure" url := "https://" + list.Addr().String() + "?insecure"
disco, err := NewGlobal(url, cert, new(fakeAddressLister), events.NoopLogger) disco, err := NewGlobal(url, cert, new(fakeAddressLister), events.NoopLogger, registry.New())
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -210,7 +213,7 @@ func TestGlobalAnnounce(t *testing.T) {
} }
func testLookup(url string) ([]string, error) { func testLookup(url string) ([]string, error) {
disco, err := NewGlobal(url, tls.Certificate{}, nil, events.NoopLogger) disco, err := NewGlobal(url, tls.Certificate{}, nil, events.NoopLogger, registry.New())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -19,6 +19,7 @@ import (
"github.com/thejerf/suture/v4" "github.com/thejerf/suture/v4"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/svcutil" "github.com/syncthing/syncthing/lib/svcutil"
@ -44,12 +45,13 @@ type manager struct {
cert tls.Certificate cert tls.Certificate
evLogger events.Logger evLogger events.Logger
addressLister AddressLister addressLister AddressLister
registry *registry.Registry
finders map[string]cachedFinder finders map[string]cachedFinder
mut sync.RWMutex mut sync.RWMutex
} }
func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister) Manager { func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate, evLogger events.Logger, lister AddressLister, registry *registry.Registry) Manager {
m := &manager{ m := &manager{
Supervisor: suture.New("discover.Manager", svcutil.SpecWithDebugLogger(l)), Supervisor: suture.New("discover.Manager", svcutil.SpecWithDebugLogger(l)),
myID: myID, myID: myID,
@ -57,6 +59,7 @@ func NewManager(myID protocol.DeviceID, cfg config.Wrapper, cert tls.Certificate
cert: cert, cert: cert,
evLogger: evLogger, evLogger: evLogger,
addressLister: lister, addressLister: lister,
registry: registry,
finders: make(map[string]cachedFinder), finders: make(map[string]cachedFinder),
mut: sync.NewRWMutex(), mut: sync.NewRWMutex(),
@ -257,7 +260,7 @@ func (m *manager) CommitConfiguration(_, to config.Configuration) (handled bool)
if _, ok := m.finders[identity]; ok { if _, ok := m.finders[identity]; ok {
continue continue
} }
gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger) gd, err := NewGlobal(srv, m.cert, m.addressLister, m.evLogger, m.registry)
if err != nil { if err != nil {
l.Warnln("Global discovery:", err) l.Warnln("Global discovery:", err)
continue continue

View File

@ -26,6 +26,7 @@ import (
"github.com/syncthing/syncthing/lib/build" "github.com/syncthing/syncthing/lib/build"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/connections" "github.com/syncthing/syncthing/lib/connections"
"github.com/syncthing/syncthing/lib/connections/registry"
"github.com/syncthing/syncthing/lib/db" "github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/db/backend" "github.com/syncthing/syncthing/lib/db/backend"
"github.com/syncthing/syncthing/lib/discover" "github.com/syncthing/syncthing/lib/discover"
@ -276,8 +277,9 @@ func (a *App) startup() error {
// Create a wrapper that is then wired after they are both setup. // Create a wrapper that is then wired after they are both setup.
addrLister := &lateAddressLister{} addrLister := &lateAddressLister{}
discoveryManager := discover.NewManager(a.myID, a.cfg, a.cert, a.evLogger, addrLister) connRegistry := registry.New()
connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, discoveryManager, bepProtocolName, tlsDefaultCommonName, a.evLogger) discoveryManager := discover.NewManager(a.myID, a.cfg, a.cert, a.evLogger, addrLister, connRegistry)
connectionsService := connections.NewService(a.cfg, a.myID, m, tlsCfg, discoveryManager, bepProtocolName, tlsDefaultCommonName, a.evLogger, connRegistry)
addrLister.AddressLister = connectionsService addrLister.AddressLister = connectionsService