diff --git a/main.go b/main.go index c49b7652f..f2054f4fa 100644 --- a/main.go +++ b/main.go @@ -215,9 +215,7 @@ listen: for nodeID := range nodeAddrs { if nodeID == remoteID { - nc := protocol.NewConnection(remoteID, conn, conn, m) - m.AddNode(nc) - infoln("Connected to node", remoteID, "(in)") + m.AddConnection(conn, remoteID) continue listen } } @@ -286,9 +284,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, continue } - nc := protocol.NewConnection(nodeID, conn, conn, m) - m.AddNode(nc) - infoln("Connected to node", remoteID, "(out)") + m.AddConnection(conn, remoteID) continue nextNode } } diff --git a/model.go b/model.go index 0a8ab068f..dacaa0cf5 100644 --- a/model.go +++ b/model.go @@ -13,6 +13,7 @@ acquire locks, but document what locks they require. import ( "fmt" + "io" "os" "path" "sync" @@ -26,11 +27,12 @@ type Model struct { sync.RWMutex dir string - global map[string]File // the latest version of each file as it exists in the cluster - local map[string]File // the files we currently have locally on disk - remote map[string]map[string]File - need map[string]bool // the files we need to update - nodes map[string]*protocol.Connection + global map[string]File // the latest version of each file as it exists in the cluster + local map[string]File // the files we currently have locally on disk + remote map[string]map[string]File + need map[string]bool // the files we need to update + nodes map[string]*protocol.Connection + rawConn map[string]io.ReadWriteCloser updatedLocal int64 // timestamp of last update to local updateGlobal int64 // timestamp of last update to remote @@ -54,6 +56,7 @@ func NewModel(dir string) *Model { remote: make(map[string]map[string]File), need: make(map[string]bool), nodes: make(map[string]*protocol.Connection), + rawConn: make(map[string]io.ReadWriteCloser), lastIdxBcast: time.Now(), } @@ -192,6 +195,12 @@ func (m *Model) Close(node string, err error) { m.Lock() defer m.Unlock() + conn, ok := m.rawConn[node] + if !ok { + return + } + conn.Close() + if err != nil { warnf("Disconnected from node %s: %v", node, err) } else { @@ -200,6 +209,7 @@ func (m *Model) Close(node string, err error) { delete(m.remote, node) delete(m.nodes, node) + delete(m.rawConn, node) m.recomputeGlobal() m.recomputeNeed() @@ -460,24 +470,24 @@ func (m *Model) protocolIndex() []protocol.FileInfo { return index } -func (m *Model) AddNode(node *protocol.Connection) { +func (m *Model) AddConnection(conn io.ReadWriteCloser, nodeID string) { + node := protocol.NewConnection(nodeID, conn, conn, m) + m.Lock() - m.nodes[node.ID] = node + m.nodes[nodeID] = node + m.rawConn[nodeID] = conn m.Unlock() + + infoln("Connected to node", nodeID) + m.RLock() idx := m.protocolIndex() m.RUnlock() - for i := 0; i < len(idx); i += 1000 { - s := i + 1000 - if s > len(idx) { - s = len(idx) - } - if opts.Debug.TraceNet { - debugf("NET IDX(out/add): %s: %d:%d", node.ID, i, s) - } - node.Index(idx[i:s]) - } + go func() { + node.Index(idx) + infoln("Sent initial index to node", nodeID) + }() } func fileFromFileInfo(f protocol.FileInfo) File { diff --git a/model_puller.go b/model_puller.go index 7cbc10c75..c79f9868e 100644 --- a/model_puller.go +++ b/model_puller.go @@ -129,7 +129,6 @@ func (m *Model) pullFile(name string) error { } func (m *Model) puller() { - for { time.Sleep(time.Second) diff --git a/protocol/protocol.go b/protocol/protocol.go index 2ee21b6ff..9fc2bb463 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -57,6 +57,9 @@ type Connection struct { nextId int indexSent map[string]int64 + hasSentIndex bool + hasRecvdIndex bool + lastStatistics Statistics statisticsLock sync.Mutex } @@ -126,7 +129,9 @@ func (c *Connection) Index(idx []FileInfo) { c.mwriter.writeIndex(idx) err := c.flush() c.nextId = (c.nextId + 1) & 0xfff + c.hasSentIndex = true c.Unlock() + if err != nil { c.Close(err) return @@ -252,6 +257,9 @@ loop: } else { c.receiver.Index(c.ID, files) } + c.Lock() + c.hasRecvdIndex = true + c.Unlock() case messageTypeIndexUpdate: files := c.mreader.readIndex() @@ -343,16 +351,23 @@ func (c *Connection) pingerLoop() { var rc = make(chan bool, 1) for { time.Sleep(pingIdleTime / 2) - go func() { - rc <- c.Ping() - }() - select { - case ok := <-rc: - if !ok { - c.Close(fmt.Errorf("Ping failure")) + + c.RLock() + ready := c.hasRecvdIndex && c.hasSentIndex + c.RUnlock() + + if ready { + go func() { + rc <- c.Ping() + }() + select { + case ok := <-rc: + if !ok { + c.Close(fmt.Errorf("Ping failure")) + } + case <-time.After(pingTimeout): + c.Close(fmt.Errorf("Ping timeout")) } - case <-time.After(pingTimeout): - c.Close(fmt.Errorf("Ping timeout")) } } }