From 795cbff55db41571355f408f9f4fffce5adc74c4 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 25 May 2023 09:04:14 +0200 Subject: [PATCH] wip --- lib/connections/service.go | 23 +++++++++++------------ lib/connections/structs.go | 19 +++++++++++++------ lib/model/model.go | 38 ++++++++++++++++++++++++++++++++++---- test/h1/config.xml | 6 +++--- test/h2/config.xml | 7 +++---- 5 files changed, 64 insertions(+), 29 deletions(-) diff --git a/lib/connections/service.go b/lib/connections/service.go index f832c5685..4786eec54 100644 --- a/lib/connections/service.go +++ b/lib/connections/service.go @@ -68,7 +68,6 @@ var ( const ( perDeviceWarningIntv = 15 * time.Minute tlsHandshakeTimeout = 10 * time.Second - minConnectionReplaceAge = 10 * time.Second minConnectionLoopSleep = 5 * time.Second stdConnectionLoopSleep = time.Minute worstDialerPriority = math.MaxInt32 @@ -318,17 +317,17 @@ func (s *service) connectionCheckEarly(remoteID protocol.DeviceID, c internalCon return errNetworkNotAllowed } - // Lower priority is better, just like nice etc. - if ct, ok := s.model.Connection(remoteID); ok { - if ct.Priority() > c.priority || time.Since(ct.Statistics().StartedAt) > minConnectionReplaceAge { - l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c) - } else if cfg.MultipleConnections <= s.connectionsForDevice(cfg.DeviceID) { - // We should not already be connected to the other party. TODO: This - // could use some better handling. If the old connection is dead but - // hasn't timed out yet we may want to drop *that* connection and keep - // this one. But in case we are two devices connecting to each other - // in parallel we don't want to do that or we end up with no - // connections still established... + if existing := s.connectionsForDevice(cfg.DeviceID); existing > 0 { + // Check if the new connection is better than an existing one. Lower + // priority is better, just like `nice` etc. + if ct, ok := s.model.Connection(remoteID); ok { + if ct.Priority() > c.priority { + l.Debugf("Switching connections %s (existing: %s new: %s)", remoteID, ct, c) + return nil + } + } + if existing >= cfg.MultipleConnections { + // We're not allowed to accept any more connections to this device. return errDeviceAlreadyConnected } } diff --git a/lib/connections/structs.go b/lib/connections/structs.go index 2040fcb08..10feadaa3 100644 --- a/lib/connections/structs.go +++ b/lib/connections/structs.go @@ -93,21 +93,28 @@ func (t connType) Transport() string { func newInternalConn(tc tlsConn, connType connType, isLocal bool, priority int) internalConn { now := time.Now() - buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano())) - hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr()))) - buf = append(buf, hash[:]...) - connectionID := base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf) - return internalConn{ tlsConn: tc, connType: connType, isLocal: isLocal, priority: priority, establishedAt: now.Truncate(time.Second), - connectionID: connectionID, + connectionID: newConnectionID(tc, connType, now), } } +// newConnection generates a connection ID. The connection ID is designed to +// be 1) unique for each connection (even those reusing the same socket +// address on both sides), 2) sortable so that the connection with the +// lowest ID will be the primary one. This also coincides with being the +// oldest connection. +func newConnectionID(tc tlsConn, connType connType, now time.Time) string { + buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano())) + hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr()))) + buf = append(buf, hash[:]...) + return base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf) +} + func (c internalConn) Close() error { // *tls.Conn.Close() does more than it says on the tin. Specifically, it // sends a TLS alert message, which might block forever if the diff --git a/lib/model/model.go b/lib/model/model.go index 967a9f629..7d15f9130 100644 --- a/lib/model/model.go +++ b/lib/model/model.go @@ -163,6 +163,7 @@ type model struct { pmut sync.RWMutex conns map[string]protocol.Connection // connection ID -> connection deviceConns map[protocol.DeviceID][]string // device -> connection IDs (invariant: if the key exists, the value is len >= 1, with the primary connection at the start of the slice) + promotedConn map[protocol.DeviceID]string // device -> last promoted connection ID connRequestLimiters map[protocol.DeviceID]*util.Semaphore closed map[string]chan struct{} // connection ID -> closed channel helloMessages map[protocol.DeviceID]protocol.Hello @@ -248,6 +249,7 @@ func NewModel(cfg config.Wrapper, id protocol.DeviceID, clientName, clientVersio pmut: sync.NewRWMutex(), conns: make(map[string]protocol.Connection), deviceConns: make(map[protocol.DeviceID][]string), + promotedConn: make(map[protocol.DeviceID]string), connRequestLimiters: make(map[protocol.DeviceID]*util.Semaphore), closed: make(map[string]chan struct{}), helloMessages: make(map[protocol.DeviceID]protocol.Hello), @@ -1912,6 +1914,7 @@ func (m *model) Closed(conn protocol.Connection, err error) { if len(remainingConns) == 0 { // All device connections closed delete(m.deviceConns, deviceID) + delete(m.promotedConn, deviceID) delete(m.connRequestLimiters, deviceID) delete(m.helloMessages, deviceID) delete(m.remoteFolderStates, deviceID) @@ -2433,10 +2436,33 @@ func (m *model) promoteConnections() { defer m.pmut.Unlock() for deviceID, connIDs := range m.deviceConns { + // Figure out the best current connection priority for this device. + bestPriority := m.conns[connIDs[0]].Priority() + for _, connID := range connIDs[1:] { + priority := m.conns[connID].Priority() + if priority < bestPriority { + bestPriority = priority + } + } + + // Close connections with a worse connection priority than best. + closing := make(map[string]bool) + for _, connID := range connIDs { + if m.conns[connID].Priority() > bestPriority { + l.Infoln("Closing connection", connID, "to", deviceID.Short(), "because it has a worse connection priority than the best connection") + go m.conns[connID].Close(errReplacingConnection) + closing[connID] = true + } + } + cm, passwords := m.generateClusterConfigFRLocked(deviceID) - if idxh, ok := m.indexHandlers[deviceID]; !ok || idxh.conn.ConnectionID() != connIDs[0] { - // Primary device lacks an index handler. We should promote the - // primary connection to be the index handling one. + if !closing[connIDs[0]] && m.promotedConn[deviceID] != connIDs[0] { + // The last promoted connection is not the current primary; we + // should promote the primary connection to be the index + // handling one. We do this by sending a ClusterConfig on it, + // which will cause the other side to start sending us index + // messages there. (On our side, we manage index handlers based + // on where we get ClusterConfigs from the peer.) l.Infoln("Promoting connection", connIDs[0], "to", deviceID.Short()) conn := m.conns[connIDs[0]] if conn.Statistics().StartedAt.IsZero() { @@ -2444,11 +2470,15 @@ func (m *model) promoteConnections() { conn.Start() } conn.ClusterConfig(cm) + m.promotedConn[deviceID] = connIDs[0] } - // Make sure any new connections also get started, and a + // Make sure any other new connections also get started, and a // secondary-marked ClusterConfig. for _, connID := range connIDs[1:] { + if closing[connID] { + continue + } conn := m.conns[connID] if conn.Statistics().StartedAt.IsZero() { conn.SetFolderPasswords(passwords) diff --git a/test/h1/config.xml b/test/h1/config.xml index 8dce0e261..7e2f87944 100644 --- a/test/h1/config.xml +++ b/test/h1/config.xml @@ -1,5 +1,5 @@ - + fake @@ -123,9 +123,9 @@ 0 0 false - 20 + 10 20 - 40 + 30 40 50 0 diff --git a/test/h2/config.xml b/test/h2/config.xml index 704dafbd2..a56c6335e 100644 --- a/test/h2/config.xml +++ b/test/h2/config.xml @@ -44,8 +44,7 @@ -
tcp://127.0.0.1:22001
-
quic://127.0.0.1:22001
+
dynamic
false false 0 @@ -121,9 +120,9 @@ 0 0 false - 20 + 10 20 - 40 + 30 40 50 0