diff --git a/protocol/marshal.go b/protocol/marshal.go index 728b81845..c1bce9270 100644 --- a/protocol/marshal.go +++ b/protocol/marshal.go @@ -3,6 +3,7 @@ package protocol import ( "errors" "io" + "sync/atomic" "github.com/calmh/syncthing/buffers" ) @@ -19,7 +20,7 @@ var padBytes = []byte{0, 0, 0} type marshalWriter struct { w io.Writer - tot int + tot uint64 err error b [8]byte } @@ -51,7 +52,7 @@ func (w *marshalWriter) writeBytes(bs []byte) { if p := pad(len(bs)); w.err == nil && p > 0 { _, w.err = w.w.Write(padBytes[:p]) } - w.tot += len(bs) + pad(len(bs)) + atomic.AddUint64(&w.tot, uint64(len(bs)+pad(len(bs)))) } func (w *marshalWriter) writeUint32(v uint32) { @@ -63,7 +64,7 @@ func (w *marshalWriter) writeUint32(v uint32) { w.b[2] = byte(v >> 8) w.b[3] = byte(v) _, w.err = w.w.Write(w.b[:4]) - w.tot += 4 + atomic.AddUint64(&w.tot, 4) } func (w *marshalWriter) writeUint64(v uint64) { @@ -79,12 +80,16 @@ func (w *marshalWriter) writeUint64(v uint64) { w.b[6] = byte(v >> 8) w.b[7] = byte(v) _, w.err = w.w.Write(w.b[:8]) - w.tot += 8 + atomic.AddUint64(&w.tot, 8) +} + +func (w *marshalWriter) getTot() uint64 { + return atomic.LoadUint64(&w.tot) } type marshalReader struct { r io.Reader - tot int + tot uint64 err error b [8]byte } @@ -109,7 +114,7 @@ func (r *marshalReader) readBytes() []byte { } b := buffers.Get(l + pad(l)) _, r.err = io.ReadFull(r.r, b) - r.tot += int(l + pad(l)) + atomic.AddUint64(&r.tot, uint64(l+pad(l))) return b[:l] } @@ -118,7 +123,7 @@ func (r *marshalReader) readUint32() uint32 { return 0 } _, r.err = io.ReadFull(r.r, r.b[:4]) - r.tot += 4 + atomic.AddUint64(&r.tot, 8) return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24 } @@ -127,7 +132,11 @@ func (r *marshalReader) readUint64() uint64 { return 0 } _, r.err = io.ReadFull(r.r, r.b[:8]) - r.tot += 8 + atomic.AddUint64(&r.tot, 8) return uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 | uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56 } + +func (r *marshalReader) getTot() uint64 { + return atomic.LoadUint64(&r.tot) +} diff --git a/protocol/messages.go b/protocol/messages.go index 3328bddc5..9b273a222 100644 --- a/protocol/messages.go +++ b/protocol/messages.go @@ -50,7 +50,7 @@ func (w *marshalWriter) writeIndex(idx []FileInfo) { func WriteIndex(w io.Writer, idx []FileInfo) (int, error) { mw := marshalWriter{w: w} mw.writeIndex(idx) - return mw.tot, mw.err + return int(mw.getTot()), mw.err } func (w *marshalWriter) writeRequest(r request) { diff --git a/protocol/protocol.go b/protocol/protocol.go index d659cb849..cc8f6e776 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -46,18 +46,20 @@ type Model interface { type Connection struct { sync.RWMutex - ID string - receiver Model - reader io.Reader - mreader *marshalReader - writer io.Writer - mwriter *marshalWriter - closed bool - awaiting map[int]chan asyncResult - nextId int - peerLatency time.Duration + ID string + receiver Model + reader io.Reader + mreader *marshalReader + writer io.Writer + mwriter *marshalWriter + closed bool + awaiting map[int]chan asyncResult + nextId int + peerLatency time.Duration + indexSent map[string]int64 + lastStatistics Statistics - indexSent map[string]int64 + statisticsLock sync.Mutex lastReceive time.Time lastReceiveLock sync.RWMutex @@ -371,16 +373,18 @@ type Statistics struct { } func (c *Connection) Statistics() Statistics { - c.Lock() - defer c.Unlock() + c.statisticsLock.Lock() + defer c.statisticsLock.Unlock() secs := time.Since(c.lastStatistics.At).Seconds() + rt := int(c.mreader.getTot()) + wt := int(c.mwriter.getTot()) stats := Statistics{ At: time.Now(), - InBytesTotal: c.mreader.tot, - InBytesPerSec: int(float64(c.mreader.tot-c.lastStatistics.InBytesTotal) / secs), - OutBytesTotal: c.mwriter.tot, - OutBytesPerSec: int(float64(c.mwriter.tot-c.lastStatistics.OutBytesTotal) / secs), + InBytesTotal: rt, + InBytesPerSec: int(float64(rt-c.lastStatistics.InBytesTotal) / secs), + OutBytesTotal: wt, + OutBytesPerSec: int(float64(wt-c.lastStatistics.OutBytesTotal) / secs), Latency: c.peerLatency, } c.lastStatistics = stats