From bef9ccfa713695f3b75f41cda5ae3887980aa97d Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 15 Dec 2013 16:19:45 +0100 Subject: [PATCH] Do ping check after 5 minute inactivity --- protocol/protocol.go | 59 +++++++++++++++++++++++++++++++++++--------- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/protocol/protocol.go b/protocol/protocol.go index 20a605f93..2c35b8453 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -5,6 +5,7 @@ import ( "errors" "io" "sync" + "time" "github.com/calmh/syncthing/buffers" ) @@ -40,17 +41,19 @@ type Model interface { } type Connection struct { - receiver Model - reader io.Reader - mreader *marshalReader - writer io.Writer - mwriter *marshalWriter - wLock sync.RWMutex - closed bool - closedLock sync.RWMutex - awaiting map[int]chan asyncResult - nextId int - ID string + ID string + receiver Model + reader io.Reader + mreader *marshalReader + writer io.Writer + mwriter *marshalWriter + wLock sync.RWMutex + closed bool + closedLock sync.RWMutex + awaiting map[int]chan asyncResult + nextId int + lastReceive time.Time + peerLatency time.Duration } var ErrClosed = errors.New("Connection closed") @@ -60,6 +63,9 @@ type asyncResult struct { err error } +const pingTimeout = 30 * time.Second +const pingIdleTime = 5 * time.Minute + func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection { flrd := flate.NewReader(reader) flwr, err := flate.NewWriter(writer, flate.BestSpeed) @@ -78,6 +84,7 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M } go c.readerLoop() + go c.pingerLoop() return &c } @@ -166,6 +173,10 @@ func (c *Connection) readerLoop() { break } + c.wLock.Lock() + c.lastReceive = time.Now() + c.wLock.Unlock() + switch hdr.msgType { case messageTypeIndex: files := c.mreader.readIndex() @@ -237,3 +248,29 @@ func (c *Connection) processRequest(msgID int) { }() } } + +func (c *Connection) pingerLoop() { + var rc = make(chan time.Duration) + for !c.isClosed() { + c.wLock.RLock() + lr := c.lastReceive + c.wLock.RUnlock() + + if time.Since(lr) > pingIdleTime { + go func() { + t0 := time.Now() + c.Ping() + rc <- time.Since(t0) + }() + select { + case lat := <-rc: + c.wLock.Lock() + c.peerLatency = (c.peerLatency + lat) / 2 + c.wLock.Unlock() + case <-time.After(pingTimeout): + c.close() + } + } + time.Sleep(time.Second) + } +}