This commit is contained in:
Jakob Borg 2023-05-23 22:37:48 +02:00
parent c5c9b4a7f6
commit a7a3dd08ad
6 changed files with 92 additions and 55 deletions

View File

@ -434,7 +434,7 @@ func (s *service) handleHellos(ctx context.Context) error {
s.dialNowDevicesMut.Unlock() s.dialNowDevicesMut.Unlock()
}() }()
l.Infof("Established secure connection to %s at %s", remoteID, c) l.Infof("Established secure connection to %s at %s", remoteID.Short(), c)
s.model.AddConnection(protoConn, hello) s.model.AddConnection(protoConn, hello)
continue continue

View File

@ -9,7 +9,9 @@ package connections
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"math/rand"
"net/url" "net/url"
"os"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
@ -62,6 +64,15 @@ func (d *tcpDialer) Dial(ctx context.Context, _ protocol.DeviceID, uri *url.URL)
if isLocal { if isLocal {
priority = d.lanPriority priority = d.lanPriority
} }
// XXX: Induced flakyness
if dur, _ := time.ParseDuration(os.Getenv("TCP_FLAKY_LIFETIME")); dur > 0 {
dur = dur/2 + time.Duration(rand.Intn(int(dur)))
time.AfterFunc(dur, func() {
tc.Close()
})
}
return newInternalConn(tc, connTypeTCPClient, isLocal, priority), nil return newInternalConn(tc, connTypeTCPClient, isLocal, priority), nil
} }

View File

@ -129,15 +129,27 @@ func (s *indexHandler) waitForFileset(ctx context.Context) (*db.FileSet, error)
} }
func (s *indexHandler) Serve(ctx context.Context) (err error) { func (s *indexHandler) Serve(ctx context.Context) (err error) {
l.Debugf("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID(), s.conn, s.prevSequence) l.Infof("Starting index handler for %s to %s at %s (slv=%d)", s.folder, s.conn.ID().Short(), s.conn, s.prevSequence)
stop := make(chan struct{}) stop := make(chan struct{})
defer func() { defer func() {
err = svcutil.NoRestartErr(err) err = svcutil.NoRestartErr(err)
l.Debugf("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID(), s.conn, err) l.Infof("Exiting index handler for %s to %s at %s: %v", s.folder, s.conn.ID().Short(), s.conn, err)
close(stop) close(stop)
}() }()
ctx, cancel := context.WithCancel(ctx)
// Cancel the context when the connection closes
go func() {
select {
case <-s.conn.Closed():
cancel()
case <-ctx.Done():
case <-stop:
}
}()
// Broadcast the pause cond when the context quits // Broadcast the pause cond when the context quits
go func() { go func() {
select { select {

View File

@ -1165,21 +1165,22 @@ func (p *pager) done() bool {
// Index is called when a new device is connected and we receive their full index. // Index is called when a new device is connected and we receive their full index.
// Implements the protocol.Model interface. // Implements the protocol.Model interface.
func (m *model) Index(conn protocol.Connection, folder string, fs []protocol.FileInfo) error { func (m *model) Index(conn protocol.Connection, folder string, fs []protocol.FileInfo) error {
return m.handleIndex(conn.ID(), folder, fs, false) return m.handleIndex(conn, folder, fs, false)
} }
// IndexUpdate is called for incremental updates to connected devices' indexes. // IndexUpdate is called for incremental updates to connected devices' indexes.
// Implements the protocol.Model interface. // Implements the protocol.Model interface.
func (m *model) IndexUpdate(conn protocol.Connection, folder string, fs []protocol.FileInfo) error { func (m *model) IndexUpdate(conn protocol.Connection, folder string, fs []protocol.FileInfo) error {
return m.handleIndex(conn.ID(), folder, fs, true) return m.handleIndex(conn, folder, fs, true)
} }
func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []protocol.FileInfo, update bool) error { func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protocol.FileInfo, update bool) error {
op := "Index" op := "Index"
if update { if update {
op += " update" op += " update"
} }
deviceID := conn.ID()
l.Debugf("%v (in): %s / %q: %d files", op, deviceID, folder, len(fs)) l.Debugf("%v (in): %s / %q: %d files", op, deviceID, folder, len(fs))
if cfg, ok := m.cfg.Folder(folder); !ok || !cfg.SharedWith(deviceID) { if cfg, ok := m.cfg.Folder(folder); !ok || !cfg.SharedWith(deviceID) {
@ -1190,18 +1191,7 @@ func (m *model) handleIndex(deviceID protocol.DeviceID, folder string, fs []prot
return fmt.Errorf("%s: %w", folder, ErrFolderPaused) return fmt.Errorf("%s: %w", folder, ErrFolderPaused)
} }
m.pmut.RLock() indexHandler := m.ensureIndexHandler(conn)
indexHandler, ok := m.indexHandlers[deviceID]
m.pmut.RUnlock()
if !ok {
// This should be impossible, as an index handler always exists for an
// open connection, and this method can't be called on a closed
// connection
m.evLogger.Log(events.Failure, "index sender does not exist for connection on which indexes were received")
l.Debugf("%v for folder (ID %q) sent from device %q: missing index handler", op, folder, deviceID)
return fmt.Errorf("index handler missing: %s", folder)
}
return indexHandler.ReceiveIndex(folder, fs, update, op) return indexHandler.ReceiveIndex(folder, fs, update, op)
} }
@ -1226,14 +1216,10 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
// temporary indexes, subscribe the connection. // temporary indexes, subscribe the connection.
deviceID := conn.ID() deviceID := conn.ID()
l.Debugf("Handling ClusterConfig from %v/%s", deviceID.Short(), conn.ConnectionID()) connID := conn.ConnectionID()
l.Debugf("Handling ClusterConfig from %v/%s", deviceID.Short(), connID)
m.pmut.RLock() indexHandlerRegistry := m.ensureIndexHandler(conn)
indexHandlerRegistry, ok := m.indexHandlers[deviceID]
m.pmut.RUnlock()
if !ok {
panic("bug: ClusterConfig called on closed or nonexistent connection")
}
deviceCfg, ok := m.cfg.Device(deviceID) deviceCfg, ok := m.cfg.Device(deviceID)
if !ok { if !ok {
@ -1354,6 +1340,39 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
return nil return nil
} }
func (m *model) ensureIndexHandler(conn protocol.Connection) *indexHandlerRegistry {
deviceID := conn.ID()
connID := conn.ConnectionID()
m.pmut.Lock()
defer m.pmut.Unlock()
indexHandlerRegistry, ok := m.indexHandlers[deviceID]
if ok && indexHandlerRegistry.conn.ConnectionID() == connID {
// This is an existing and proper index handler for this connection.
return indexHandlerRegistry
}
if ok {
// A handler exists, but it's for another connection than the one we
// now got a ClusterConfig on. This should be unusual as it means
// the other side has decided to start using a new primary
// connection but we haven't seen it close yet. Ideally it will
// close shortly by itself...
l.Infoln("Abandoning old index handler for", deviceID)
}
// Create a new index handler for this device.
indexHandlerRegistry = newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.closed[connID], m.Supervisor, m.evLogger)
for id, fcfg := range m.folderCfgs {
indexHandlerRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
}
m.indexHandlers[deviceID] = indexHandlerRegistry
m.deviceDownloads[deviceID] = newDeviceDownloadState()
return indexHandlerRegistry
}
func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*clusterConfigDeviceInfo, indexHandlers *indexHandlerRegistry) ([]string, map[string]remoteFolderState, error) { func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*clusterConfigDeviceInfo, indexHandlers *indexHandlerRegistry) ([]string, map[string]remoteFolderState, error) {
var folderDevice config.FolderDeviceConfiguration var folderDevice config.FolderDeviceConfiguration
tempIndexFolders := make([]string, 0, len(folders)) tempIndexFolders := make([]string, 0, len(folders))
@ -1859,8 +1878,10 @@ func (m *model) Closed(conn protocol.Connection, err error) {
// XXX: all the below needs more thinking about when to remove what // XXX: all the below needs more thinking about when to remove what
if removedIsPrimary { if removedIsPrimary {
m.progressEmitter.temporaryIndexUnsubscribe(conn) m.progressEmitter.temporaryIndexUnsubscribe(conn)
if idxh, ok := m.indexHandlers[deviceID]; ok && idxh.conn.ConnectionID() == connID {
delete(m.indexHandlers, deviceID) delete(m.indexHandlers, deviceID)
delete(m.deviceDownloads, deviceID) delete(m.deviceDownloads, deviceID)
}
m.scheduleConnectionPromotion() m.scheduleConnectionPromotion()
} }
if len(remainingConns) == 0 { if len(remainingConns) == 0 {
@ -1876,11 +1897,17 @@ func (m *model) Closed(conn protocol.Connection, err error) {
} }
m.pmut.Unlock() m.pmut.Unlock()
l.Infof("Connection to %s at %s closed: %v", deviceID, conn, err) k := map[bool]string{false: "secondary", true: "primary"}[removedIsPrimary]
l.Infof("Lost %s connection to %s at %s: %v (%d remain)", k, deviceID.Short(), conn, err, len(remainingConns))
if len(remainingConns) == 0 {
l.Infof("Connection to %s at %s closed: %v", deviceID.Short(), conn, err)
m.evLogger.Log(events.DeviceDisconnected, map[string]string{ m.evLogger.Log(events.DeviceDisconnected, map[string]string{
"id": deviceID.String(), "id": deviceID.String(),
"error": err.Error(), "error": err.Error(),
}) })
} else {
}
close(closed) close(closed)
} }
@ -2338,7 +2365,7 @@ func (m *model) AddConnection(conn protocol.Connection, hello protocol.Hello) {
if len(m.deviceConns[deviceID]) == 1 { if len(m.deviceConns[deviceID]) == 1 {
l.Infof(`Device %s client is "%s %s" named "%s" at %s`, deviceID.Short(), hello.ClientName, hello.ClientVersion, hello.DeviceName, conn) l.Infof(`Device %s client is "%s %s" named "%s" at %s`, deviceID.Short(), hello.ClientName, hello.ClientVersion, hello.DeviceName, conn)
} else { } else {
l.Infof(`Additional connection #%d for device %s at %s`, len(m.deviceConns[deviceID]), deviceID.Short(), conn) l.Infof(`Additional connection (#%d) for device %s at %s`, len(m.deviceConns[deviceID])-1, deviceID.Short(), conn)
} }
m.pmut.Unlock() m.pmut.Unlock()
@ -2378,21 +2405,20 @@ func (m *model) promoteConnections() {
for deviceID, connIDs := range m.deviceConns { for deviceID, connIDs := range m.deviceConns {
cm, passwords := m.generateClusterConfigFRLocked(deviceID) cm, passwords := m.generateClusterConfigFRLocked(deviceID)
if _, ok := m.indexHandlers[deviceID]; !ok { if idxh, ok := m.indexHandlers[deviceID]; !ok || idxh.conn.ConnectionID() != connIDs[0] {
// Connected device lacks an index handler. We should promote // Primary device lacks an index handler. We should promote the
// the primary connection to be the index handling one. // primary connection to be the index handling one.
l.Infoln("Promoting connection", connIDs[0], "to", deviceID) l.Infoln("Promoting connection", connIDs[0], "to", deviceID.Short())
conn := m.conns[connIDs[0]] conn := m.conns[connIDs[0]]
m.promoteDeviceConnectionLocked(conn)
if conn.Statistics().StartedAt.IsZero() { if conn.Statistics().StartedAt.IsZero() {
conn.SetFolderPasswords(passwords) conn.SetFolderPasswords(passwords)
conn.Start() conn.Start()
}
conn.ClusterConfig(cm) conn.ClusterConfig(cm)
} }
}
// Make sure any new connections also get started, and an empty // Make sure any new connections also get started, and a
// cluster config. // secondary-marked ClusterConfig.
for _, connID := range connIDs[1:] { for _, connID := range connIDs[1:] {
conn := m.conns[connID] conn := m.conns[connID]
if conn.Statistics().StartedAt.IsZero() { if conn.Statistics().StartedAt.IsZero() {
@ -2404,18 +2430,6 @@ func (m *model) promoteConnections() {
} }
} }
func (m *model) promoteDeviceConnectionLocked(conn protocol.Connection) {
deviceID := conn.ID()
connID := conn.ConnectionID()
m.deviceDownloads[deviceID] = newDeviceDownloadState()
indexRegistry := newIndexHandlerRegistry(conn, m.deviceDownloads[deviceID], m.closed[connID], m.Supervisor, m.evLogger)
for id, fcfg := range m.folderCfgs {
indexRegistry.RegisterFolderState(fcfg, m.folderFiles[id], m.folderRunners[id])
}
m.indexHandlers[deviceID] = indexRegistry
}
func (m *model) DownloadProgress(conn protocol.Connection, folder string, updates []protocol.FileDownloadProgressUpdate) error { func (m *model) DownloadProgress(conn protocol.Connection, folder string, updates []protocol.FileDownloadProgressUpdate) error {
deviceID := conn.ID() deviceID := conn.ID()

View File

@ -63,7 +63,7 @@
<maxRequestKiB>0</maxRequestKiB> <maxRequestKiB>0</maxRequestKiB>
<untrusted>false</untrusted> <untrusted>false</untrusted>
<remoteGUIPort>0</remoteGUIPort> <remoteGUIPort>0</remoteGUIPort>
<multipleConnections>9</multipleConnections> <multipleConnections>4</multipleConnections>
</device> </device>
<gui enabled="true" tls="false" debugging="true"> <gui enabled="true" tls="false" debugging="true">
<address>127.0.0.1:8081</address> <address>127.0.0.1:8081</address>

View File

@ -55,7 +55,7 @@
<maxRequestKiB>0</maxRequestKiB> <maxRequestKiB>0</maxRequestKiB>
<untrusted>false</untrusted> <untrusted>false</untrusted>
<remoteGUIPort>0</remoteGUIPort> <remoteGUIPort>0</remoteGUIPort>
<multipleConnections>9</multipleConnections> <multipleConnections>4</multipleConnections>
</device> </device>
<device id="MRIW7OK-NETT3M4-N6SBWME-N25O76W-YJKVXPH-FUMQJ3S-P57B74J-GBITBAC" name="s2" compression="metadata" introducer="false" skipIntroductionRemovals="false" introducedBy=""> <device id="MRIW7OK-NETT3M4-N6SBWME-N25O76W-YJKVXPH-FUMQJ3S-P57B74J-GBITBAC" name="s2" compression="metadata" introducer="false" skipIntroductionRemovals="false" introducedBy="">
<address>tcp://127.0.0.1:22002</address> <address>tcp://127.0.0.1:22002</address>