This commit is contained in:
Jakob Borg 2023-05-24 16:29:35 +02:00
parent 2597ac5c25
commit 6207609fb6
2 changed files with 38 additions and 4 deletions

View File

@ -8,7 +8,10 @@ package connections
import (
"context"
"crypto/sha256"
"crypto/tls"
"encoding/base32"
"encoding/binary"
"fmt"
"io"
"net"
@ -89,13 +92,19 @@ func (t connType) Transport() string {
}
func newInternalConn(tc tlsConn, connType connType, isLocal bool, priority int) internalConn {
now := time.Now()
buf := binary.BigEndian.AppendUint64(nil, uint64(now.UnixNano()))
hash := sha256.Sum224([]byte(fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr())))
buf = append(buf, hash[:]...)
connectionID := base32.HexEncoding.WithPadding(base32.NoPadding).EncodeToString(buf)
return internalConn{
tlsConn: tc,
connType: connType,
isLocal: isLocal,
priority: priority,
establishedAt: time.Now().Truncate(time.Second),
connectionID: fmt.Sprintf("%v-%v-%v", tc.LocalAddr(), connType.Transport(), tc.RemoteAddr()),
establishedAt: now.Truncate(time.Second),
connectionID: connectionID,
}
}

View File

@ -281,10 +281,13 @@ func (m *model) serve(ctx context.Context) error {
for {
select {
case <-ctx.Done():
l.Infoln("context closed, stopping", ctx.Err())
return ctx.Err()
case err := <-m.fatalChan:
l.Infoln("fatal error, stopping", err)
return svcutil.AsFatalErr(err, svcutil.ExitError)
case <-m.promotionTimer.C:
l.Infoln("promotion timer fired")
m.promoteConnections()
}
}
@ -1192,7 +1195,11 @@ func (m *model) handleIndex(conn protocol.Connection, folder string, fs []protoc
return fmt.Errorf("%s: %w", folder, ErrFolderPaused)
}
indexHandler := m.ensureIndexHandler(conn)
indexHandler, ok := m.getIndexHandler(conn)
if !ok {
l.Infof("%v for folder %s sent from %s (connection %s), but no index handler is registered for this connection.", op, folder, deviceID.Short(), conn.ConnectionID())
return fmt.Errorf("%s: %w", folder, ErrFolderNotRunning)
}
return indexHandler.ReceiveIndex(folder, fs, update, op)
}
@ -1208,6 +1215,7 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
if cm.Secondary {
// No handling of secondary connection ClusterConfigs; they merely
// indicate the connection is ready to start.
l.Infoln("Ignoring secondary connection cluster-config on connection", conn.ConnectionID())
return nil
}
@ -1218,7 +1226,7 @@ func (m *model) ClusterConfig(conn protocol.Connection, cm protocol.ClusterConfi
deviceID := conn.DeviceID()
connID := conn.ConnectionID()
l.Debugf("Handling ClusterConfig from %v/%s", deviceID.Short(), connID)
l.Infof("Handling ClusterConfig from %v/%s", deviceID.Short(), connID)
indexHandlerRegistry := m.ensureIndexHandler(conn)
@ -1374,6 +1382,23 @@ func (m *model) ensureIndexHandler(conn protocol.Connection) *indexHandlerRegist
return indexHandlerRegistry
}
func (m *model) getIndexHandler(conn protocol.Connection) (*indexHandlerRegistry, bool) {
deviceID := conn.DeviceID()
connID := conn.ConnectionID()
m.pmut.RLock()
defer m.pmut.RUnlock()
indexHandlerRegistry, ok := m.indexHandlers[deviceID]
if ok && indexHandlerRegistry.conn.ConnectionID() == connID {
// This is an existing and proper index handler for this connection.
return indexHandlerRegistry, true
}
// There is no index handler, or it's not registered for this connection.
return nil, false
}
func (m *model) ccHandleFolders(folders []protocol.Folder, deviceCfg config.DeviceConfiguration, ccDeviceInfos map[string]*clusterConfigDeviceInfo, indexHandlers *indexHandlerRegistry) ([]string, map[string]remoteFolderState, error) {
var folderDevice config.FolderDeviceConfiguration
tempIndexFolders := make([]string, 0, len(folders))