diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 6a85ddc69..699c61821 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -250,20 +250,12 @@ func main() { "clusterHash": clusterHash(cfg.Repositories[0].Nodes), } - // Routine to listen for incoming connections - if verbose { - infoln("Listening for incoming connections") - } - for _, addr := range cfg.Options.ListenAddress { - go listen(myID, addr, m, tlsCfg, connOpts) - } - // Routine to connect out to configured nodes if verbose { infoln("Attempting to connect to other nodes") } disc := discovery() - go connect(myID, disc, m, tlsCfg, connOpts) + go listenConnect(myID, disc, m, tlsCfg, connOpts) // Routine to pull blocks from other nodes to synchronize the local // repository. Does not run when we are in read only (publish only) mode. @@ -401,50 +393,106 @@ func printStatsLoop(m *Model) { } } -func listen(myID string, addr string, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { - if debugNet { - dlog.Println("listening on", addr) +func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { + var conns = make(chan *tls.Conn) + + // Listen + for _, addr := range cfg.Options.ListenAddress { + addr := addr + go func() { + if debugNet { + dlog.Println("listening on", addr) + } + l, err := tls.Listen("tcp", addr, tlsCfg) + fatalErr(err) + + for { + conn, err := l.Accept() + if err != nil { + warnln(err) + continue + } + + if debugNet { + dlog.Println("connect from", conn.RemoteAddr()) + } + + tc := conn.(*tls.Conn) + err = tc.Handshake() + if err != nil { + warnln(err) + tc.Close() + continue + } + + conns <- tc + } + }() } - l, err := tls.Listen("tcp", addr, tlsCfg) - fatalErr(err) -listen: - for { - conn, err := l.Accept() - if err != nil { - warnln(err) - continue + // Connect + go func() { + for { + nextNode: + for _, nodeCfg := range cfg.Repositories[0].Nodes { + if nodeCfg.NodeID == myID { + continue + } + if m.ConnectedTo(nodeCfg.NodeID) { + continue + } + for _, addr := range nodeCfg.Addresses { + if addr == "dynamic" { + if disc != nil { + t := disc.Lookup(nodeCfg.NodeID) + if len(t) == 0 { + continue + } + addr = t[0] //XXX: Handle all of them + } + } + + if debugNet { + dlog.Println("dial", nodeCfg.NodeID, addr) + } + conn, err := tls.Dial("tcp", addr, tlsCfg) + if err != nil { + if debugNet { + dlog.Println(err) + } + continue + } + + conns <- conn + continue nextNode + } + } + + time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second) } + }() - if debugNet { - dlog.Println("connect from", conn.RemoteAddr()) - } - - tc := conn.(*tls.Conn) - err = tc.Handshake() - if err != nil { - warnln(err) - tc.Close() - continue - } - - remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw) +next: + for conn := range conns { + remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw) if remoteID == myID { - warnf("Connect from myself (%s) - should not happen", remoteID) + warnf("Connected to myself (%s) - should not happen", remoteID) conn.Close() continue } if m.ConnectedTo(remoteID) { - warnf("Connect from connected node (%s)", remoteID) + warnf("Connected to already connected node (%s)", remoteID) + conn.Close() + continue } for _, nodeCfg := range cfg.Repositories[0].Nodes { if nodeCfg.NodeID == remoteID { protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) m.AddConnection(conn, protoConn) - continue listen + continue next } } conn.Close() @@ -473,55 +521,6 @@ func discovery() *discover.Discoverer { return disc } -func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { - for { - nextNode: - for _, nodeCfg := range cfg.Repositories[0].Nodes { - if nodeCfg.NodeID == myID { - continue - } - if m.ConnectedTo(nodeCfg.NodeID) { - continue - } - for _, addr := range nodeCfg.Addresses { - if addr == "dynamic" { - if disc != nil { - t := disc.Lookup(nodeCfg.NodeID) - if len(t) == 0 { - continue - } - addr = t[0] //XXX: Handle all of them - } - } - - if debugNet { - dlog.Println("dial", nodeCfg.NodeID, addr) - } - conn, err := tls.Dial("tcp", addr, tlsCfg) - if err != nil { - if debugNet { - dlog.Println(err) - } - continue - } - - remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw) - if remoteID != nodeCfg.NodeID { - warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID) - conn.Close() - continue - } - - protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) - m.AddConnection(conn, protoConn) - continue nextNode - } - } - - time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second) - } -} - func updateLocalModel(m *Model, w *scanner.Walker) { files, _ := w.Walk() m.ReplaceLocal(files) diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index db6dd29d8..93ecbf724 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -507,7 +507,13 @@ func (m *Model) RepoID() string { func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { nodeID := protoConn.ID() m.pmut.Lock() + if _, ok := m.protoConn[nodeID]; ok { + panic("add existing node") + } m.protoConn[nodeID] = protoConn + if _, ok := m.rawConn[nodeID]; ok { + panic("add existing node") + } m.rawConn[nodeID] = rawConn m.pmut.Unlock()