This commit is contained in:
Jakob Borg 2023-05-25 09:04:14 +02:00
parent 962fedc378
commit 795cbff55d
5 changed files with 64 additions and 29 deletions

View File

@ -68,7 +68,6 @@ var (
const (
perDeviceWarningIntv = 15 * time.Minute
tlsHandshakeTimeout = 10 * time.Second
minConnectionReplaceAge = 10 * time.Second
minConnectionLoopSleep = 5 * time.Second
stdConnectionLoopSleep = time.Minute
worstDialerPriority = math.MaxInt32
@ -318,17 +317,17 @@ func (s *service) connectionCheckEarly(remoteID protocol.DeviceID, c internalCon
return errNetworkNotAllowed
}
// Lower priority is better, just like nice etc.
if ct, ok := s.model.Connection(remoteID); ok {
if ct.Priority() > c.priority || time.Since(ct.Statistics().StartedAt) > minConnectionReplaceAge {
l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c)
} else if cfg.MultipleConnections <= s.connectionsForDevice(cfg.DeviceID) {
// We should not already be connected to the other party. TODO: This
// could use some better handling. If the old connection is dead but
// hasn't timed out yet we may want to drop *that* connection and keep
// this one. But in case we are two devices connecting to each other
// in parallel we don't want to do that or we end up with no
// connections still established...
if existing := s.connectionsForDevice(cfg.DeviceID); existing > 0 {
// Check if the new connection is better than an existing one. Lower
// priority is better, just like `nice` etc.
if ct, ok := s.model.Connection(remoteID); ok {
if ct.Priority() > c.priority {
l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c)
return nil
}
}
if existing >= cfg.MultipleConnections {
// We're not allowed to accept any more connections to this device.
return errDeviceAlreadyConnected
}
}

View File

@ -93,21 +93,28 @@ func (t connType) Transport() string {
func newInternalConn(tc tlsConn, connType connType, isLocal bool, priority int) internalConn {
now := time.Now()
buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano()))
hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr())))
buf = append(buf, hash[:]...)
connectionID := base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf)
return internalConn{
tlsConn: tc,
connType: connType,
isLocal: isLocal,
priority: priority,
establishedAt: now.Truncate(time.Second),
connectionID: connectionID,
connectionID: newConnectionID(tc, connType, now),
}
}
// newConnection generates a connection ID. The connection ID is designed to
// be 1) unique for each connection (even those reusing the same socket
// address on both sides), 2) sortable so that the connection with the
// lowest ID will be the primary one. This also coincides with being the
// oldest connection.
func newConnectionID(tc tlsConn, connType connType, now time.Time) string {
buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano()))
hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr())))
buf = append(buf, hash[:]...)
return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf)
}
func (c internalConn) Close() error {
// *tls.Conn.Close() does more than it says on the tin. Specifically, it
// sends a TLS alert message, which might block forever if the

View File

@ -163,6 +163,7 @@ type model struct {
pmut sync.RWMutex
conns map[string]protocol.Connection // connection ID -> connection
deviceConns map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice)
promotedConn map[protocol.DeviceID]string // device -> last promoted connection ID
connRequestLimiters map[protocol.DeviceID]*util.Semaphore
closed map[string]chan struct{} // connection ID -> closed channel
helloMessages map[protocol.DeviceID]protocol.Hello
@ -248,6 +249,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio
pmut: sync.NewRWMutex(),
conns: make(map[string]protocol.Connection),
deviceConns: make(map[protocol.DeviceID][]string),
promotedConn: make(map[protocol.DeviceID]string),
connRequestLimiters: make(map[protocol.DeviceID]*util.Semaphore),
closed: make(map[string]chan struct{}),
helloMessages: make(map[protocol.DeviceID]protocol.Hello),
@ -1912,6 +1914,7 @@ func (m *model) Closed(conn protocol.Connection, err error) {
if len(remainingConns) == 0 {
// All device connections closed
delete(m.deviceConns, deviceID)
delete(m.promotedConn, deviceID)
delete(m.connRequestLimiters, deviceID)
delete(m.helloMessages, deviceID)
delete(m.remoteFolderStates, deviceID)
@ -2433,10 +2436,33 @@ func (m *model) promoteConnections() {
defer m.pmut.Unlock()
for deviceID, connIDs := range m.deviceConns {
// Figure out the best current connection priority for this device.
bestPriority := m.conns[connIDs[0]].Priority()
for _, connID := range connIDs[1:] {
priority := m.conns[connID].Priority()
if priority < bestPriority {
bestPriority = priority
}
}
// Close connections with a worse connection priority than best.
closing := make(map[string]bool)
for _, connID := range connIDs {
if m.conns[connID].Priority() > bestPriority {
l.Infoln("Closing connection", connID, "to", deviceID.Short(), "because it has a worse connection priority than the best connection")
go m.conns[connID].Close(errReplacingConnection)
closing[connID] = true
}
}
cm, passwords := m.generateClusterConfigFRLocked(deviceID)
if idxh, ok := m.indexHandlers[deviceID]; !ok || idxh.conn.ConnectionID() != connIDs[0] {
// Primary device lacks an index handler. We should promote the
// primary connection to be the index handling one.
if !closing[connIDs[0]] && m.promotedConn[deviceID] != connIDs[0] {
// The last promoted connection is not the current primary; we
// should promote the primary connection to be the index
// handling one. We do this by sending a ClusterConfig on it,
// which will cause the other side to start sending us index
// messages there. (On our side, we manage index handlers based
// on where we get ClusterConfigs from the peer.)
l.Infoln("Promoting connection", connIDs[0], "to", deviceID.Short())
conn := m.conns[connIDs[0]]
if conn.Statistics().StartedAt.IsZero() {
@ -2444,11 +2470,15 @@ func (m *model) promoteConnections() {
conn.Start()
}
conn.ClusterConfig(cm)
m.promotedConn[deviceID] = connIDs[0]
}
// Make sure any new connections also get started, and a
// Make sure any other new connections also get started, and a
// secondary-marked ClusterConfig.
for _, connID := range connIDs[1:] {
if closing[connID] {
continue
}
conn := m.conns[connID]
if conn.Statistics().StartedAt.IsZero() {
conn.SetFolderPasswords(passwords)

View File

@ -1,5 +1,5 @@
<configuration version="37">
<folder id="default" label="" path="s1?files=10000&amp;sizeavg=25000000" type="sendreceive" rescanIntervalS="3600" fsWatcherEnabled="false" fsWatcherDelayS="10" ignorePerms="false" autoNormalize="true">
<folder id="default" label="" path="s1" type="sendreceive" rescanIntervalS="3600" fsWatcherEnabled="false" fsWatcherDelayS="10" ignorePerms="false" autoNormalize="true">
<filesystemType>fake</filesystemType>
<device id="I6KAH76-66SLLLB-5PFXSOA-UFJCDZC-YAOMLEK-CP2GB32-BV5RQST-3PSROAU" introducedBy="">
<encryptionPassword></encryptionPassword>
@ -123,9 +123,9 @@
<connectionLimitEnough>0</connectionLimitEnough>
<connectionLimitMax>0</connectionLimitMax>
<insecureAllowOldTLSVersions>false</insecureAllowOldTLSVersions>
<connectionPriorityTcpLan>20</connectionPriorityTcpLan>
<connectionPriorityTcpLan>10</connectionPriorityTcpLan>
<connectionPriorityQuicLan>20</connectionPriorityQuicLan>
<connectionPriorityTcpWan>40</connectionPriorityTcpWan>
<connectionPriorityTcpWan>30</connectionPriorityTcpWan>
<connectionPriorityQuicWan>40</connectionPriorityQuicWan>
<connectionPriorityRelay>50</connectionPriorityRelay>
<connectionPriorityUpgradeThreshold>0</connectionPriorityUpgradeThreshold>

View File

@ -44,8 +44,7 @@
</xattrFilter>
</folder>
<device id="I6KAH76-66SLLLB-5PFXSOA-UFJCDZC-YAOMLEK-CP2GB32-BV5RQST-3PSROAU" name="s1" compression="metadata" introducer="false" skipIntroductionRemovals="false" introducedBy="">
<address>tcp://127.0.0.1:22001</address>
<address>quic://127.0.0.1:22001</address>
<address>dynamic</address>
<paused>false</paused>
<autoAcceptFolders>false</autoAcceptFolders>
<maxSendKbps>0</maxSendKbps>
@ -121,9 +120,9 @@
<connectionLimitEnough>0</connectionLimitEnough>
<connectionLimitMax>0</connectionLimitMax>
<insecureAllowOldTLSVersions>false</insecureAllowOldTLSVersions>
<connectionPriorityTcpLan>20</connectionPriorityTcpLan>
<connectionPriorityTcpLan>10</connectionPriorityTcpLan>
<connectionPriorityQuicLan>20</connectionPriorityQuicLan>
<connectionPriorityTcpWan>40</connectionPriorityTcpWan>
<connectionPriorityTcpWan>30</connectionPriorityTcpWan>
<connectionPriorityQuicWan>40</connectionPriorityQuicWan>
<connectionPriorityRelay>50</connectionPriorityRelay>
<connectionPriorityUpgradeThreshold>0</connectionPriorityUpgradeThreshold>