diff --git a/main.go b/main.go index 45d171493..5a243f7d8 100644 --- a/main.go +++ b/main.go @@ -282,11 +282,6 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, nc := protocol.NewConnection(nodeID, conn, conn, m) okln("Connected to node", remoteID, "(out)") m.AddNode(nc) - if opts.Debug.TraceNet { - t0 := time.Now() - nc.Ping() - timing("NET: Ping reply", t0) - } continue nextNode } } diff --git a/model.go b/model.go index 31111edf2..4e4d5028e 100644 --- a/model.go +++ b/model.go @@ -60,7 +60,7 @@ func (m *Model) Start() { func (m *Model) printStats() { for { - time.Sleep(15 * time.Second) + time.Sleep(60 * time.Second) m.RLock() for node, conn := range m.nodes { stats := conn.Statistics() @@ -223,7 +223,7 @@ func (m *Model) ReplaceLocal(fs []File) { m.recomputeGlobal() m.recomputeNeed() m.updated = time.Now().Unix() - go m.broadcastIndex() + m.broadcastIndex() } } @@ -231,10 +231,11 @@ func (m *Model) ReplaceLocal(fs []File) { func (m *Model) broadcastIndex() { idx := m.protocolIndex() for _, node := range m.nodes { + node := node if opts.Debug.TraceNet { debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) } - node.Index(idx) + go node.Index(idx) } } @@ -271,7 +272,7 @@ func (m *Model) UpdateLocal(f File) { m.recomputeGlobal() m.recomputeNeed() m.updated = time.Now().Unix() - go m.broadcastIndex() + m.broadcastIndex() } } @@ -400,5 +401,8 @@ func (m *Model) AddNode(node *protocol.Connection) { if opts.Debug.TraceNet { debugf("NET IDX(out): %s: %d files", node.ID, len(idx)) } - node.Index(idx) + + // Sending the index might take a while if we have many files and a slow + // uplink. Return from AddNode in the meantime. + go node.Index(idx) } diff --git a/protocol/protocol.go b/protocol/protocol.go index c350b7e1b..a34f5eb21 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -42,13 +42,13 @@ type Model interface { } type Connection struct { + sync.RWMutex ID string receiver Model reader io.Reader mreader *marshalReader writer io.Writer mwriter *marshalWriter - wLock sync.RWMutex closed bool awaiting map[int]chan asyncResult nextId int @@ -94,12 +94,12 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M // Index writes the list of file information to the connected peer node func (c *Connection) Index(idx []FileInfo) { - c.wLock.Lock() + c.Lock() c.mwriter.writeHeader(header{0, c.nextId, messageTypeIndex}) c.mwriter.writeIndex(idx) err := c.flush() c.nextId = (c.nextId + 1) & 0xfff - c.wLock.Unlock() + c.Unlock() if err != nil || c.mwriter.err != nil { c.close() return @@ -108,24 +108,24 @@ func (c *Connection) Index(idx []FileInfo) { // Request returns the bytes for the specified block after fetching them from the connected peer. func (c *Connection) Request(name string, offset uint64, size uint32, hash []byte) ([]byte, error) { - c.wLock.Lock() + c.Lock() rc := make(chan asyncResult) c.awaiting[c.nextId] = rc c.mwriter.writeHeader(header{0, c.nextId, messageTypeRequest}) c.mwriter.writeRequest(request{name, offset, size, hash}) if c.mwriter.err != nil { - c.wLock.Unlock() + c.Unlock() c.close() return nil, c.mwriter.err } err := c.flush() if err != nil { - c.wLock.Unlock() + c.Unlock() c.close() return nil, err } c.nextId = (c.nextId + 1) & 0xfff - c.wLock.Unlock() + c.Unlock() res, ok := <-rc if !ok { @@ -134,22 +134,23 @@ func (c *Connection) Request(name string, offset uint64, size uint32, hash []byt return res.val, res.err } -func (c *Connection) Ping() bool { - c.wLock.Lock() +func (c *Connection) Ping() (time.Duration, bool) { + c.Lock() rc := make(chan asyncResult) c.awaiting[c.nextId] = rc + t0 := time.Now() c.mwriter.writeHeader(header{0, c.nextId, messageTypePing}) err := c.flush() if err != nil || c.mwriter.err != nil { - c.wLock.Unlock() + c.Unlock() c.close() - return false + return 0, false } c.nextId = (c.nextId + 1) & 0xfff - c.wLock.Unlock() + c.Unlock() _, ok := <-rc - return ok + return time.Since(t0), ok } func (c *Connection) Stop() { @@ -167,9 +168,9 @@ func (c *Connection) flush() error { } func (c *Connection) close() { - c.wLock.Lock() + c.Lock() if c.closed { - c.wLock.Unlock() + c.Unlock() return } c.closed = true @@ -177,14 +178,14 @@ func (c *Connection) close() { close(ch) } c.awaiting = nil - c.wLock.Unlock() + c.Unlock() c.receiver.Close(c.ID) } func (c *Connection) isClosed() bool { - c.wLock.RLock() - defer c.wLock.RUnlock() + c.RLock() + defer c.RUnlock() return c.closed } @@ -201,9 +202,9 @@ func (c *Connection) readerLoop() { break } - c.wLock.Lock() + c.Lock() c.lastReceive = time.Now() - c.wLock.Unlock() + c.Unlock() switch hdr.msgType { case messageTypeIndex: @@ -226,41 +227,41 @@ func (c *Connection) readerLoop() { if c.mreader.err != nil { c.close() } else { - c.wLock.RLock() + c.RLock() rc, ok := c.awaiting[hdr.msgID] - c.wLock.RUnlock() + c.RUnlock() if ok { rc <- asyncResult{data, c.mreader.err} close(rc) - c.wLock.Lock() + c.Lock() delete(c.awaiting, hdr.msgID) - c.wLock.Unlock() + c.Unlock() } } case messageTypePing: - c.wLock.Lock() + c.Lock() c.mwriter.writeUint32(encodeHeader(header{0, hdr.msgID, messageTypePong})) err := c.flush() - c.wLock.Unlock() + c.Unlock() if err != nil || c.mwriter.err != nil { c.close() } case messageTypePong: - c.wLock.RLock() + c.RLock() rc, ok := c.awaiting[hdr.msgID] - c.wLock.RUnlock() + c.RUnlock() if ok { rc <- asyncResult{} close(rc) - c.wLock.Lock() + c.Lock() delete(c.awaiting, hdr.msgID) - c.wLock.Unlock() + c.Unlock() } default: @@ -277,11 +278,11 @@ func (c *Connection) processRequest(msgID int) { } else { go func() { data, _ := c.receiver.Request(c.ID, req.name, req.offset, req.size, req.hash) - c.wLock.Lock() + c.Lock() c.mwriter.writeUint32(encodeHeader(header{0, msgID, messageTypeResponse})) c.mwriter.writeResponse(data) err := c.flush() - c.wLock.Unlock() + c.Unlock() buffers.Put(data) if c.mwriter.err != nil || err != nil { c.close() @@ -291,23 +292,24 @@ func (c *Connection) processRequest(msgID int) { } func (c *Connection) pingerLoop() { - var rc = make(chan time.Duration) + var rc = make(chan time.Duration, 1) for !c.isClosed() { - c.wLock.RLock() + c.RLock() lr := c.lastReceive - c.wLock.RUnlock() + c.RUnlock() if time.Since(lr) > pingIdleTime { go func() { - t0 := time.Now() - c.Ping() - rc <- time.Since(t0) + t, ok := c.Ping() + if ok { + rc <- t + } }() select { case lat := <-rc: - c.wLock.Lock() + c.Lock() c.peerLatency = (c.peerLatency + lat) / 2 - c.wLock.Unlock() + c.Unlock() case <-time.After(pingTimeout): c.close() } @@ -326,8 +328,8 @@ type Statistics struct { } func (c *Connection) Statistics() Statistics { - c.wLock.Lock() - defer c.wLock.Unlock() + c.Lock() + defer c.Unlock() secs := time.Since(c.lastStatistics.At).Seconds() stats := Statistics{ diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index 83ca69135..1b3c8c8c1 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -46,10 +46,10 @@ func TestPing(t *testing.T) { c0 := NewConnection("c0", ar, bw, nil) c1 := NewConnection("c1", br, aw, nil) - if !c0.Ping() { + if _, ok := c0.Ping(); !ok { t.Error("c0 ping failed") } - if !c1.Ping() { + if _, ok := c1.Ping(); !ok { t.Error("c1 ping failed") } } @@ -70,7 +70,7 @@ func TestPingErr(t *testing.T) { c0 := NewConnection("c0", ar, ebw, m0) NewConnection("c1", br, eaw, m1) - res := c0.Ping() + _, res := c0.Ping() if (i < 4 || j < 4) && res { t.Errorf("Unexpected ping success; i=%d, j=%d", i, j) } else if (i >= 8 && j >= 8) && !res {