Atomic connection stats updates

This commit is contained in:
Jakob Borg 2013-12-30 09:53:54 -05:00
parent bceacf04ca
commit 005b207737
3 changed files with 39 additions and 26 deletions

View File

@ -3,6 +3,7 @@ package protocol
import ( import (
"errors" "errors"
"io" "io"
"sync/atomic"
"github.com/calmh/syncthing/buffers" "github.com/calmh/syncthing/buffers"
) )
@ -19,7 +20,7 @@ var padBytes = []byte{0, 0, 0}
type marshalWriter struct { type marshalWriter struct {
w io.Writer w io.Writer
tot int tot uint64
err error err error
b [8]byte b [8]byte
} }
@ -51,7 +52,7 @@ func (w *marshalWriter) writeBytes(bs []byte) {
if p := pad(len(bs)); w.err == nil && p > 0 { if p := pad(len(bs)); w.err == nil && p > 0 {
_, w.err = w.w.Write(padBytes[:p]) _, w.err = w.w.Write(padBytes[:p])
} }
w.tot += len(bs) + pad(len(bs)) atomic.AddUint64(&w.tot, uint64(len(bs)+pad(len(bs))))
} }
func (w *marshalWriter) writeUint32(v uint32) { func (w *marshalWriter) writeUint32(v uint32) {
@ -63,7 +64,7 @@ func (w *marshalWriter) writeUint32(v uint32) {
w.b[2] = byte(v >> 8) w.b[2] = byte(v >> 8)
w.b[3] = byte(v) w.b[3] = byte(v)
_, w.err = w.w.Write(w.b[:4]) _, w.err = w.w.Write(w.b[:4])
w.tot += 4 atomic.AddUint64(&w.tot, 4)
} }
func (w *marshalWriter) writeUint64(v uint64) { func (w *marshalWriter) writeUint64(v uint64) {
@ -79,12 +80,16 @@ func (w *marshalWriter) writeUint64(v uint64) {
w.b[6] = byte(v >> 8) w.b[6] = byte(v >> 8)
w.b[7] = byte(v) w.b[7] = byte(v)
_, w.err = w.w.Write(w.b[:8]) _, w.err = w.w.Write(w.b[:8])
w.tot += 8 atomic.AddUint64(&w.tot, 8)
}
func (w *marshalWriter) getTot() uint64 {
return atomic.LoadUint64(&w.tot)
} }
type marshalReader struct { type marshalReader struct {
r io.Reader r io.Reader
tot int tot uint64
err error err error
b [8]byte b [8]byte
} }
@ -109,7 +114,7 @@ func (r *marshalReader) readBytes() []byte {
} }
b := buffers.Get(l + pad(l)) b := buffers.Get(l + pad(l))
_, r.err = io.ReadFull(r.r, b) _, r.err = io.ReadFull(r.r, b)
r.tot += int(l + pad(l)) atomic.AddUint64(&r.tot, uint64(l+pad(l)))
return b[:l] return b[:l]
} }
@ -118,7 +123,7 @@ func (r *marshalReader) readUint32() uint32 {
return 0 return 0
} }
_, r.err = io.ReadFull(r.r, r.b[:4]) _, r.err = io.ReadFull(r.r, r.b[:4])
r.tot += 4 atomic.AddUint64(&r.tot, 8)
return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24 return uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24
} }
@ -127,7 +132,11 @@ func (r *marshalReader) readUint64() uint64 {
return 0 return 0
} }
_, r.err = io.ReadFull(r.r, r.b[:8]) _, r.err = io.ReadFull(r.r, r.b[:8])
r.tot += 8 atomic.AddUint64(&r.tot, 8)
return uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 | return uint64(r.b[7]) | uint64(r.b[6])<<8 | uint64(r.b[5])<<16 | uint64(r.b[4])<<24 |
uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56 uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56
} }
func (r *marshalReader) getTot() uint64 {
return atomic.LoadUint64(&r.tot)
}

View File

@ -50,7 +50,7 @@ func (w *marshalWriter) writeIndex(idx []FileInfo) {
func WriteIndex(w io.Writer, idx []FileInfo) (int, error) { func WriteIndex(w io.Writer, idx []FileInfo) (int, error) {
mw := marshalWriter{w: w} mw := marshalWriter{w: w}
mw.writeIndex(idx) mw.writeIndex(idx)
return mw.tot, mw.err return int(mw.getTot()), mw.err
} }
func (w *marshalWriter) writeRequest(r request) { func (w *marshalWriter) writeRequest(r request) {

View File

@ -46,18 +46,20 @@ type Model interface {
type Connection struct { type Connection struct {
sync.RWMutex sync.RWMutex
ID string ID string
receiver Model receiver Model
reader io.Reader reader io.Reader
mreader *marshalReader mreader *marshalReader
writer io.Writer writer io.Writer
mwriter *marshalWriter mwriter *marshalWriter
closed bool closed bool
awaiting map[int]chan asyncResult awaiting map[int]chan asyncResult
nextId int nextId int
peerLatency time.Duration peerLatency time.Duration
indexSent map[string]int64
lastStatistics Statistics lastStatistics Statistics
indexSent map[string]int64 statisticsLock sync.Mutex
lastReceive time.Time lastReceive time.Time
lastReceiveLock sync.RWMutex lastReceiveLock sync.RWMutex
@ -371,16 +373,18 @@ type Statistics struct {
} }
func (c *Connection) Statistics() Statistics { func (c *Connection) Statistics() Statistics {
c.Lock() c.statisticsLock.Lock()
defer c.Unlock() defer c.statisticsLock.Unlock()
secs := time.Since(c.lastStatistics.At).Seconds() secs := time.Since(c.lastStatistics.At).Seconds()
rt := int(c.mreader.getTot())
wt := int(c.mwriter.getTot())
stats := Statistics{ stats := Statistics{
At: time.Now(), At: time.Now(),
InBytesTotal: c.mreader.tot, InBytesTotal: rt,
InBytesPerSec: int(float64(c.mreader.tot-c.lastStatistics.InBytesTotal) / secs), InBytesPerSec: int(float64(rt-c.lastStatistics.InBytesTotal) / secs),
OutBytesTotal: c.mwriter.tot, OutBytesTotal: wt,
OutBytesPerSec: int(float64(c.mwriter.tot-c.lastStatistics.OutBytesTotal) / secs), OutBytesPerSec: int(float64(wt-c.lastStatistics.OutBytesTotal) / secs),
Latency: c.peerLatency, Latency: c.peerLatency,
} }
c.lastStatistics = stats c.lastStatistics = stats