syncthing/lib/connections/service.go
Jakob Borg d507126101 lib/protocol: Understand older/newer Hello messages (fixes #3287)
This is in preparation for future changes, but also improves the
handling when talking to pre-v0.13 clients. It breaks out the Hello
message and magic from the rest of the protocol implementation, with the
intention that this small part of the protocol will survive future
changes.

To enable this, and future testing, the new ExchangeHello function takes
an interface that can be implemented by future Hello versions and
returns a version indendent result type. It correctly detects pre-v0.13
protocols and returns a "too old" error message which gets logged to the
user at warning level:

   [I6KAH] 09:21:36 WARNING: Connecting to [...]:
     the remote device speaks an older version of the protocol (v0.12) not
     compatible with this version

Conversely, something entirely unknown will generate:

   [I6KAH] 09:40:27 WARNING: Connecting to [...]:
     the remote device speaks an unknown (newer?) version of the protocol

The intention is that in future iterations the Hello exchange will
succeed on at least one side and ExchangeHello will return the actual
data from the Hello together with ErrTooOld and an even more precise
message can be generated.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3289
2016-06-09 10:50:14 +00:00

591 lines
16 KiB
Go

// Copyright (C) 2015 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package connections
import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"net/url"
"time"
"github.com/juju/ratelimit"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/nat"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
// Registers NAT service providers
_ "github.com/syncthing/syncthing/lib/pmp"
_ "github.com/syncthing/syncthing/lib/upnp"
"github.com/thejerf/suture"
)
var (
dialers = make(map[string]dialerFactory, 0)
listeners = make(map[string]listenerFactory, 0)
)
// Service listens and dials all configured unconnected devices, via supported
// dialers. Successful connections are handed to the model.
type Service struct {
*suture.Supervisor
cfg *config.Wrapper
myID protocol.DeviceID
model Model
tlsCfg *tls.Config
discoverer discover.Finder
conns chan IntermediateConnection
bepProtocolName string
tlsDefaultCommonName string
lans []*net.IPNet
writeRateLimit *ratelimit.Bucket
readRateLimit *ratelimit.Bucket
natService *nat.Service
natServiceToken *suture.ServiceToken
listenersMut sync.RWMutex
listeners map[string]genericListener
listenerTokens map[string]suture.ServiceToken
curConMut sync.Mutex
currentConnection map[protocol.DeviceID]Connection
}
func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *tls.Config, discoverer discover.Finder,
bepProtocolName string, tlsDefaultCommonName string, lans []*net.IPNet) *Service {
service := &Service{
Supervisor: suture.NewSimple("connections.Service"),
cfg: cfg,
myID: myID,
model: mdl,
tlsCfg: tlsCfg,
discoverer: discoverer,
conns: make(chan IntermediateConnection),
bepProtocolName: bepProtocolName,
tlsDefaultCommonName: tlsDefaultCommonName,
lans: lans,
natService: nat.NewService(myID, cfg),
listenersMut: sync.NewRWMutex(),
listeners: make(map[string]genericListener),
listenerTokens: make(map[string]suture.ServiceToken),
curConMut: sync.NewMutex(),
currentConnection: make(map[protocol.DeviceID]Connection),
}
cfg.Subscribe(service)
// The rate variables are in KiB/s in the UI (despite the camel casing
// of the name). We multiply by 1024 here to get B/s.
options := service.cfg.Options()
if options.MaxSendKbps > 0 {
service.writeRateLimit = ratelimit.NewBucketWithRate(float64(1024*options.MaxSendKbps), int64(5*1024*options.MaxSendKbps))
}
if options.MaxRecvKbps > 0 {
service.readRateLimit = ratelimit.NewBucketWithRate(float64(1024*options.MaxRecvKbps), int64(5*1024*options.MaxRecvKbps))
}
// There are several moving parts here; one routine per listening address
// (handled in configuration changing) to handle incoming connections,
// one routine to periodically attempt outgoing connections, one routine to
// the the common handling regardless of whether the connection was
// incoming or outgoing.
service.Add(serviceFunc(service.connect))
service.Add(serviceFunc(service.handle))
raw := cfg.Raw()
// Actually starts the listeners and NAT service
service.CommitConfiguration(raw, raw)
return service
}
var (
errDisabled = errors.New("disabled by configuration")
)
func (s *Service) handle() {
next:
for c := range s.conns {
cs := c.ConnectionState()
// We should have negotiated the next level protocol "bep/1.0" as part
// of the TLS handshake. Unfortunately this can't be a hard error,
// because there are implementations out there that don't support
// protocol negotiation (iOS for one...).
if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != s.bepProtocolName {
l.Infof("Peer %s did not negotiate bep/1.0", c.RemoteAddr())
}
// We should have received exactly one certificate from the other
// side. If we didn't, they don't have a device ID and we drop the
// connection.
certs := cs.PeerCertificates
if cl := len(certs); cl != 1 {
l.Infof("Got peer certificate list of length %d != 1 from %s; protocol error", cl, c.RemoteAddr())
c.Close()
continue
}
remoteCert := certs[0]
remoteID := protocol.NewDeviceID(remoteCert.Raw)
// The device ID should not be that of ourselves. It can happen
// though, especially in the presence of NAT hairpinning, multiple
// clients between the same NAT gateway, and global discovery.
if remoteID == s.myID {
l.Infof("Connected to myself (%s) - should not happen", remoteID)
c.Close()
continue
}
c.SetDeadline(time.Now().Add(20 * time.Second))
hello, err := protocol.ExchangeHello(c, s.model.GetHello(remoteID))
if err != nil {
if protocol.IsVersionMismatch(err) {
// The error will be a relatively user friendly description
// of what's wrong with the version compatibility
l.Warnf("Connecting to %s (%s): %s", remoteID, c.RemoteAddr(), err)
} else {
// It's something else - connection reset or whatever
l.Infof("Failed to exchange Hello messages with %s (%s): %s", remoteID, c.RemoteAddr(), err)
}
c.Close()
continue
}
c.SetDeadline(time.Time{})
s.model.OnHello(remoteID, c.RemoteAddr(), hello)
// If we have a relay connection, and the new incoming connection is
// not a relay connection, we should drop that, and prefer the this one.
s.curConMut.Lock()
ct, ok := s.currentConnection[remoteID]
s.curConMut.Unlock()
// Lower priority is better, just like nice etc.
if ok && ct.Priority > c.Priority {
l.Debugln("Switching connections", remoteID)
s.model.Close(remoteID, protocol.ErrSwitchingConnections)
} else if s.model.ConnectedTo(remoteID) {
// We should not already be connected to the other party. TODO: This
// could use some better handling. If the old connection is dead but
// hasn't timed out yet we may want to drop *that* connection and keep
// this one. But in case we are two devices connecting to each other
// in parallel we don't want to do that or we end up with no
// connections still established...
l.Infof("Connected to already connected device (%s)", remoteID)
c.Close()
continue
} else if s.model.IsPaused(remoteID) {
l.Infof("Connection from paused device (%s)", remoteID)
c.Close()
continue
}
for deviceID, deviceCfg := range s.cfg.Devices() {
if deviceID == remoteID {
// Verify the name on the certificate. By default we set it to
// "syncthing" when generating, but the user may have replaced
// the certificate and used another name.
certName := deviceCfg.CertName
if certName == "" {
certName = s.tlsDefaultCommonName
}
err := remoteCert.VerifyHostname(certName)
if err != nil {
// Incorrect certificate name is something the user most
// likely wants to know about, since it's an advanced
// config. Warn instead of Info.
l.Warnf("Bad certificate from %s (%v): %v", remoteID, c.RemoteAddr(), err)
c.Close()
continue next
}
// If rate limiting is set, and based on the address we should
// limit the connection, then we wrap it in a limiter.
limit := s.shouldLimit(c.RemoteAddr())
wr := io.Writer(c)
if limit && s.writeRateLimit != nil {
wr = NewWriteLimiter(c, s.writeRateLimit)
}
rd := io.Reader(c)
if limit && s.readRateLimit != nil {
rd = NewReadLimiter(c, s.readRateLimit)
}
name := fmt.Sprintf("%s-%s (%s)", c.LocalAddr(), c.RemoteAddr(), c.Type)
protoConn := protocol.NewConnection(remoteID, rd, wr, s.model, name, deviceCfg.Compression)
modelConn := Connection{c, protoConn}
l.Infof("Established secure connection to %s at %s", remoteID, name)
l.Debugf("cipher suite: %04X in lan: %t", c.ConnectionState().CipherSuite, !limit)
s.model.AddConnection(modelConn, hello)
s.curConMut.Lock()
s.currentConnection[remoteID] = modelConn
s.curConMut.Unlock()
continue next
}
}
l.Infof("Connection from %s (%s) with ignored device ID %s", c.RemoteAddr(), c.Type, remoteID)
c.Close()
}
}
func (s *Service) connect() {
nextDial := make(map[string]time.Time)
// Used as delay for the first few connection attempts, increases
// exponentially
initialRampup := time.Second
// Calculated from actual dialers reconnectInterval
var sleep time.Duration
for {
cfg := s.cfg.Raw()
bestDialerPrio := 1<<31 - 1 // worse prio won't build on 32 bit
for _, df := range dialers {
if !df.Enabled(cfg) {
continue
}
if prio := df.Priority(); prio < bestDialerPrio {
bestDialerPrio = prio
}
}
l.Debugln("Reconnect loop")
now := time.Now()
var seen []string
nextDevice:
for _, deviceCfg := range cfg.Devices {
deviceID := deviceCfg.DeviceID
if deviceID == s.myID {
continue
}
paused := s.model.IsPaused(deviceID)
if paused {
continue
}
connected := s.model.ConnectedTo(deviceID)
s.curConMut.Lock()
ct := s.currentConnection[deviceID]
s.curConMut.Unlock()
if connected && ct.Priority == bestDialerPrio {
// Things are already as good as they can get.
continue
}
l.Debugln("Reconnect loop for", deviceID)
var addrs []string
for _, addr := range deviceCfg.Addresses {
if addr == "dynamic" {
if s.discoverer != nil {
if t, err := s.discoverer.Lookup(deviceID); err == nil {
addrs = append(addrs, t...)
}
}
} else {
addrs = append(addrs, addr)
}
}
seen = append(seen, addrs...)
for _, addr := range addrs {
nextDialAt, ok := nextDial[addr]
if ok && initialRampup >= sleep && nextDialAt.After(now) {
l.Debugf("Not dialing %v as sleep is %v, next dial is at %s and current time is %s", addr, sleep, nextDialAt, now)
continue
}
// If we fail at any step before actually getting the dialer
// retry in a minute
nextDial[addr] = now.Add(time.Minute)
uri, err := url.Parse(addr)
if err != nil {
l.Infof("Dialer for %s: %v", addr, err)
continue
}
dialerFactory, err := s.getDialerFactory(cfg, uri)
if err == errDisabled {
l.Debugln("Dialer for", uri, "is disabled")
continue
}
if err != nil {
l.Infof("Dialer for %v: %v", uri, err)
continue
}
if connected && dialerFactory.Priority() >= ct.Priority {
l.Debugf("Not dialing using %s as priorty is less than current connection (%d >= %d)", dialerFactory, dialerFactory.Priority(), ct.Priority)
continue
}
dialer := dialerFactory.New(s.cfg, s.tlsCfg)
l.Debugln("dial", deviceCfg.DeviceID, uri)
nextDial[addr] = now.Add(dialer.RedialFrequency())
conn, err := dialer.Dial(deviceID, uri)
if err != nil {
l.Debugln("dial failed", deviceCfg.DeviceID, uri, err)
continue
}
if connected {
s.model.Close(deviceID, protocol.ErrSwitchingConnections)
}
s.conns <- conn
continue nextDevice
}
}
nextDial, sleep = filterAndFindSleepDuration(nextDial, seen, now)
if initialRampup < sleep {
l.Debugln("initial rampup; sleep", initialRampup, "and update to", initialRampup*2)
time.Sleep(initialRampup)
initialRampup *= 2
} else {
l.Debugln("sleep until next dial", sleep)
time.Sleep(sleep)
}
}
}
func (s *Service) shouldLimit(addr net.Addr) bool {
if s.cfg.Options().LimitBandwidthInLan {
return true
}
tcpaddr, ok := addr.(*net.TCPAddr)
if !ok {
return true
}
for _, lan := range s.lans {
if lan.Contains(tcpaddr.IP) {
return false
}
}
return !tcpaddr.IP.IsLoopback()
}
func (s *Service) createListener(factory listenerFactory, uri *url.URL) bool {
// must be called with listenerMut held
l.Debugln("Starting listener", uri)
listener := factory.New(uri, s.cfg, s.tlsCfg, s.conns, s.natService)
listener.OnAddressesChanged(s.logListenAddressesChangedEvent)
s.listeners[uri.String()] = listener
s.listenerTokens[uri.String()] = s.Add(listener)
return true
}
func (s *Service) logListenAddressesChangedEvent(l genericListener) {
events.Default.Log(events.ListenAddressesChanged, map[string]interface{}{
"address": l.URI(),
"lan": l.LANAddresses(),
"wan": l.WANAddresses(),
})
}
func (s *Service) VerifyConfiguration(from, to config.Configuration) error {
return nil
}
func (s *Service) CommitConfiguration(from, to config.Configuration) bool {
// We require a restart if a device as been removed.
restart := false
newDevices := make(map[protocol.DeviceID]bool, len(to.Devices))
for _, dev := range to.Devices {
newDevices[dev.DeviceID] = true
}
for _, dev := range from.Devices {
if !newDevices[dev.DeviceID] {
restart = true
}
}
s.listenersMut.Lock()
seen := make(map[string]struct{})
for _, addr := range config.Wrap("", to).ListenAddresses() {
if _, ok := s.listeners[addr]; ok {
seen[addr] = struct{}{}
continue
}
uri, err := url.Parse(addr)
if err != nil {
l.Infof("Listener for %s: %v", addr, err)
continue
}
factory, err := s.getListenerFactory(to, uri)
if err == errDisabled {
l.Debugln("Listener for", uri, "is disabled")
continue
}
if err != nil {
l.Infof("Listener for %v: %v", uri, err)
continue
}
s.createListener(factory, uri)
seen[addr] = struct{}{}
}
for addr, listener := range s.listeners {
if _, ok := seen[addr]; !ok || !listener.Factory().Enabled(to) {
l.Debugln("Stopping listener", addr)
s.Remove(s.listenerTokens[addr])
delete(s.listenerTokens, addr)
delete(s.listeners, addr)
}
}
s.listenersMut.Unlock()
if to.Options.NATEnabled && s.natServiceToken == nil {
l.Debugln("Starting NAT service")
token := s.Add(s.natService)
s.natServiceToken = &token
} else if !to.Options.NATEnabled && s.natServiceToken != nil {
l.Debugln("Stopping NAT service")
s.Remove(*s.natServiceToken)
s.natServiceToken = nil
}
return !restart
}
func (s *Service) AllAddresses() []string {
s.listenersMut.RLock()
var addrs []string
for _, listener := range s.listeners {
for _, lanAddr := range listener.LANAddresses() {
addrs = append(addrs, lanAddr.String())
}
for _, wanAddr := range listener.WANAddresses() {
addrs = append(addrs, wanAddr.String())
}
}
s.listenersMut.RUnlock()
return util.UniqueStrings(addrs)
}
func (s *Service) ExternalAddresses() []string {
s.listenersMut.RLock()
var addrs []string
for _, listener := range s.listeners {
for _, wanAddr := range listener.WANAddresses() {
addrs = append(addrs, wanAddr.String())
}
}
s.listenersMut.RUnlock()
return util.UniqueStrings(addrs)
}
func (s *Service) Status() map[string]interface{} {
s.listenersMut.RLock()
result := make(map[string]interface{})
for addr, listener := range s.listeners {
status := make(map[string]interface{})
err := listener.Error()
if err != nil {
status["error"] = err.Error()
}
status["lanAddresses"] = urlsToStrings(listener.LANAddresses())
status["wanAddresses"] = urlsToStrings(listener.WANAddresses())
result[addr] = status
}
s.listenersMut.RUnlock()
return result
}
func (s *Service) getDialerFactory(cfg config.Configuration, uri *url.URL) (dialerFactory, error) {
dialerFactory, ok := dialers[uri.Scheme]
if !ok {
return nil, fmt.Errorf("unknown address scheme %q", uri.Scheme)
}
if !dialerFactory.Enabled(cfg) {
return nil, errDisabled
}
return dialerFactory, nil
}
func (s *Service) getListenerFactory(cfg config.Configuration, uri *url.URL) (listenerFactory, error) {
listenerFactory, ok := listeners[uri.Scheme]
if !ok {
return nil, fmt.Errorf("unknown address scheme %q", uri.Scheme)
}
if !listenerFactory.Enabled(cfg) {
return nil, errDisabled
}
return listenerFactory, nil
}
func filterAndFindSleepDuration(nextDial map[string]time.Time, seen []string, now time.Time) (map[string]time.Time, time.Duration) {
newNextDial := make(map[string]time.Time)
for _, addr := range seen {
nextDialAt, ok := nextDial[addr]
if ok {
newNextDial[addr] = nextDialAt
}
}
min := time.Minute
for _, next := range newNextDial {
cur := next.Sub(now)
if cur < min {
min = cur
}
}
return newNextDial, min
}
func urlsToStrings(urls []*url.URL) []string {
strings := make([]string, len(urls))
for i, url := range urls {
strings[i] = url.String()
}
return strings
}