Prevent zombie connections due to simultaneous connect

This commit is contained in:
Jakob Borg 2014-03-23 08:45:05 +01:00
parent 804cce7ba0
commit 589244f39e
2 changed files with 90 additions and 85 deletions

View File

@ -250,20 +250,12 @@ func main() {
"clusterHash": clusterHash(cfg.Repositories[0].Nodes), "clusterHash": clusterHash(cfg.Repositories[0].Nodes),
} }
// Routine to listen for incoming connections
if verbose {
infoln("Listening for incoming connections")
}
for _, addr := range cfg.Options.ListenAddress {
go listen(myID, addr, m, tlsCfg, connOpts)
}
// Routine to connect out to configured nodes // Routine to connect out to configured nodes
if verbose { if verbose {
infoln("Attempting to connect to other nodes") infoln("Attempting to connect to other nodes")
} }
disc := discovery() disc := discovery()
go connect(myID, disc, m, tlsCfg, connOpts) go listenConnect(myID, disc, m, tlsCfg, connOpts)
// Routine to pull blocks from other nodes to synchronize the local // Routine to pull blocks from other nodes to synchronize the local
// repository. Does not run when we are in read only (publish only) mode. // repository. Does not run when we are in read only (publish only) mode.
@ -401,50 +393,106 @@ func printStatsLoop(m *Model) {
} }
} }
func listen(myID string, addr string, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
if debugNet { var conns = make(chan *tls.Conn)
dlog.Println("listening on", addr)
// Listen
for _, addr := range cfg.Options.ListenAddress {
addr := addr
go func() {
if debugNet {
dlog.Println("listening on", addr)
}
l, err := tls.Listen("tcp", addr, tlsCfg)
fatalErr(err)
for {
conn, err := l.Accept()
if err != nil {
warnln(err)
continue
}
if debugNet {
dlog.Println("connect from", conn.RemoteAddr())
}
tc := conn.(*tls.Conn)
err = tc.Handshake()
if err != nil {
warnln(err)
tc.Close()
continue
}
conns <- tc
}
}()
} }
l, err := tls.Listen("tcp", addr, tlsCfg)
fatalErr(err)
listen: // Connect
for { go func() {
conn, err := l.Accept() for {
if err != nil { nextNode:
warnln(err) for _, nodeCfg := range cfg.Repositories[0].Nodes {
continue if nodeCfg.NodeID == myID {
continue
}
if m.ConnectedTo(nodeCfg.NodeID) {
continue
}
for _, addr := range nodeCfg.Addresses {
if addr == "dynamic" {
if disc != nil {
t := disc.Lookup(nodeCfg.NodeID)
if len(t) == 0 {
continue
}
addr = t[0] //XXX: Handle all of them
}
}
if debugNet {
dlog.Println("dial", nodeCfg.NodeID, addr)
}
conn, err := tls.Dial("tcp", addr, tlsCfg)
if err != nil {
if debugNet {
dlog.Println(err)
}
continue
}
conns <- conn
continue nextNode
}
}
time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second)
} }
}()
if debugNet { next:
dlog.Println("connect from", conn.RemoteAddr()) for conn := range conns {
} remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
tc := conn.(*tls.Conn)
err = tc.Handshake()
if err != nil {
warnln(err)
tc.Close()
continue
}
remoteID := certID(tc.ConnectionState().PeerCertificates[0].Raw)
if remoteID == myID { if remoteID == myID {
warnf("Connect from myself (%s) - should not happen", remoteID) warnf("Connected to myself (%s) - should not happen", remoteID)
conn.Close() conn.Close()
continue continue
} }
if m.ConnectedTo(remoteID) { if m.ConnectedTo(remoteID) {
warnf("Connect from connected node (%s)", remoteID) warnf("Connected to already connected node (%s)", remoteID)
conn.Close()
continue
} }
for _, nodeCfg := range cfg.Repositories[0].Nodes { for _, nodeCfg := range cfg.Repositories[0].Nodes {
if nodeCfg.NodeID == remoteID { if nodeCfg.NodeID == remoteID {
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts) protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn) m.AddConnection(conn, protoConn)
continue listen continue next
} }
} }
conn.Close() conn.Close()
@ -473,55 +521,6 @@ func discovery() *discover.Discoverer {
return disc return disc
} }
func connect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) {
for {
nextNode:
for _, nodeCfg := range cfg.Repositories[0].Nodes {
if nodeCfg.NodeID == myID {
continue
}
if m.ConnectedTo(nodeCfg.NodeID) {
continue
}
for _, addr := range nodeCfg.Addresses {
if addr == "dynamic" {
if disc != nil {
t := disc.Lookup(nodeCfg.NodeID)
if len(t) == 0 {
continue
}
addr = t[0] //XXX: Handle all of them
}
}
if debugNet {
dlog.Println("dial", nodeCfg.NodeID, addr)
}
conn, err := tls.Dial("tcp", addr, tlsCfg)
if err != nil {
if debugNet {
dlog.Println(err)
}
continue
}
remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw)
if remoteID != nodeCfg.NodeID {
warnln("Unexpected nodeID", remoteID, "!=", nodeCfg.NodeID)
conn.Close()
continue
}
protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn)
continue nextNode
}
}
time.Sleep(time.Duration(cfg.Options.ReconnectIntervalS) * time.Second)
}
}
func updateLocalModel(m *Model, w *scanner.Walker) { func updateLocalModel(m *Model, w *scanner.Walker) {
files, _ := w.Walk() files, _ := w.Walk()
m.ReplaceLocal(files) m.ReplaceLocal(files)

View File

@ -507,7 +507,13 @@ func (m *Model) RepoID() string {
func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) { func (m *Model) AddConnection(rawConn io.Closer, protoConn Connection) {
nodeID := protoConn.ID() nodeID := protoConn.ID()
m.pmut.Lock() m.pmut.Lock()
if _, ok := m.protoConn[nodeID]; ok {
panic("add existing node")
}
m.protoConn[nodeID] = protoConn m.protoConn[nodeID] = protoConn
if _, ok := m.rawConn[nodeID]; ok {
panic("add existing node")
}
m.rawConn[nodeID] = rawConn m.rawConn[nodeID] = rawConn
m.pmut.Unlock() m.pmut.Unlock()