Don't ping timeout during long transfers (fixes #280)

This commit is contained in:
Jakob Borg 2014-05-28 12:30:47 +02:00
parent 9f76c87880
commit b44016ff70
3 changed files with 49 additions and 11 deletions

View File

@ -92,8 +92,8 @@ type asyncResult struct {
} }
const ( const (
pingTimeout = 4 * time.Minute pingTimeout = 300 * time.Second
pingIdleTime = 5 * time.Minute pingIdleTime = 600 * time.Second
) )
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) Connection { func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) Connection {
@ -476,11 +476,27 @@ func (c *rawConnection) pingerLoop() {
for { for {
select { select {
case <-ticker: case <-ticker:
if d := time.Since(c.xr.LastRead()); d < pingIdleTime {
if debug {
l.Debugln(c.id, "ping skipped after rd", d)
}
continue
}
if d := time.Since(c.xw.LastWrite()); d < pingIdleTime {
if debug {
l.Debugln(c.id, "ping skipped after wr", d)
}
continue
}
go func() { go func() {
if debug {
l.Debugln(c.id, "ping ->")
}
rc <- c.ping() rc <- c.ping()
}() }()
select { select {
case ok := <-rc: case ok := <-rc:
l.Debugln(c.id, "<- pong")
if !ok { if !ok {
c.close(fmt.Errorf("ping failure")) c.close(fmt.Errorf("ping failure"))
} }

View File

@ -3,15 +3,17 @@ package xdr
import ( import (
"errors" "errors"
"io" "io"
"time"
) )
var ErrElementSizeExceeded = errors.New("element size exceeded") var ErrElementSizeExceeded = errors.New("element size exceeded")
type Reader struct { type Reader struct {
r io.Reader r io.Reader
tot int tot int
err error err error
b [8]byte b [8]byte
last time.Time
} }
func NewReader(r io.Reader) *Reader { func NewReader(r io.Reader) *Reader {
@ -44,6 +46,7 @@ func (r *Reader) ReadBytesMaxInto(max int, dst []byte) []byte {
if r.err != nil { if r.err != nil {
return nil return nil
} }
r.last = time.Now()
l := int(r.ReadUint32()) l := int(r.ReadUint32())
if r.err != nil { if r.err != nil {
return nil return nil
@ -74,6 +77,7 @@ func (r *Reader) ReadUint16() uint16 {
if r.err != nil { if r.err != nil {
return 0 return 0
} }
r.last = time.Now()
_, r.err = io.ReadFull(r.r, r.b[:4]) _, r.err = io.ReadFull(r.r, r.b[:4])
r.tot += 4 r.tot += 4
v := uint16(r.b[1]) | uint16(r.b[0])<<8 v := uint16(r.b[1]) | uint16(r.b[0])<<8
@ -88,6 +92,7 @@ func (r *Reader) ReadUint32() uint32 {
if r.err != nil { if r.err != nil {
return 0 return 0
} }
r.last = time.Now()
n, r.err = io.ReadFull(r.r, r.b[:4]) n, r.err = io.ReadFull(r.r, r.b[:4])
if n < 4 { if n < 4 {
return 0 return 0
@ -105,6 +110,7 @@ func (r *Reader) ReadUint64() uint64 {
if r.err != nil { if r.err != nil {
return 0 return 0
} }
r.last = time.Now()
n, r.err = io.ReadFull(r.r, r.b[:8]) n, r.err = io.ReadFull(r.r, r.b[:8])
r.tot += n r.tot += n
v := uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 | v := uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 |
@ -122,3 +128,7 @@ func (r *Reader) Tot() int {
func (r *Reader) Error() error { func (r *Reader) Error() error {
return r.err return r.err
} }
func (r *Reader) LastRead() time.Time {
return r.last
}

View File

@ -1,6 +1,9 @@
package xdr package xdr
import "io" import (
"io"
"time"
)
func pad(l int) int { func pad(l int) int {
d := l % 4 d := l % 4
@ -13,10 +16,11 @@ func pad(l int) int {
var padBytes = []byte{0, 0, 0} var padBytes = []byte{0, 0, 0}
type Writer struct { type Writer struct {
w io.Writer w io.Writer
tot int tot int
err error err error
b [8]byte b [8]byte
last time.Time
} }
func NewWriter(w io.Writer) *Writer { func NewWriter(w io.Writer) *Writer {
@ -34,6 +38,7 @@ func (w *Writer) WriteBytes(bs []byte) (int, error) {
return 0, w.err return 0, w.err
} }
w.last = time.Now()
w.WriteUint32(uint32(len(bs))) w.WriteUint32(uint32(len(bs)))
if w.err != nil { if w.err != nil {
return 0, w.err return 0, w.err
@ -65,6 +70,7 @@ func (w *Writer) WriteUint16(v uint16) (int, error) {
return 0, w.err return 0, w.err
} }
w.last = time.Now()
if debug { if debug {
dl.Debugf("wr uint16=%d", v) dl.Debugf("wr uint16=%d", v)
} }
@ -85,6 +91,7 @@ func (w *Writer) WriteUint32(v uint32) (int, error) {
return 0, w.err return 0, w.err
} }
w.last = time.Now()
if debug { if debug {
dl.Debugf("wr uint32=%d", v) dl.Debugf("wr uint32=%d", v)
} }
@ -105,6 +112,7 @@ func (w *Writer) WriteUint64(v uint64) (int, error) {
return 0, w.err return 0, w.err
} }
w.last = time.Now()
if debug { if debug {
dl.Debugf("wr uint64=%d", v) dl.Debugf("wr uint64=%d", v)
} }
@ -131,3 +139,7 @@ func (w *Writer) Tot() int {
func (w *Writer) Error() error { func (w *Writer) Error() error {
return w.err return w.err
} }
func (w *Writer) LastWrite() time.Time {
return w.last
}