From 746d52930db77f8506111b108c9b533ac6341703 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 23 Dec 2013 12:12:44 -0500 Subject: [PATCH] Report transfer stats --- discover/discover.go | 1 - main.go | 4 +-- model.go | 31 ++++++++++++++++++++ protocol/protocol.go | 68 +++++++++++++++++++++++++++++++------------- 4 files changed, 81 insertions(+), 23 deletions(-) diff --git a/discover/discover.go b/discover/discover.go index 1aa067f15..b0a816c8c 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -229,7 +229,6 @@ func (d *Discoverer) externalLookup(node string) (string, bool) { log.Printf("discover/external: %v; no external lookup", err) return "", false } - log.Println("query", extIP) go func() { var buf = make([]byte, 1024) diff --git a/main.go b/main.go index 9c2f34329..45d171493 100644 --- a/main.go +++ b/main.go @@ -162,7 +162,7 @@ func main() { // XXX: Should use some fsnotify mechanism. go func() { for { - time.Sleep(time.Duration(opts.ScanInterval) * time.Second) + time.Sleep(opts.ScanInterval) updateLocalModel(m) } }() @@ -291,7 +291,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *Model, } } - time.Sleep(time.Duration(opts.ConnInterval) * time.Second) + time.Sleep(opts.ConnInterval) } } diff --git a/model.go b/model.go index 16705659d..31111edf2 100644 --- a/model.go +++ b/model.go @@ -14,6 +14,7 @@ TODO(jb): Keep global and per node transfer and performance statistics. */ import ( + "fmt" "os" "path" "sync" @@ -49,6 +50,7 @@ func NewModel(dir string) *Model { nodes: make(map[string]*protocol.Connection), } + go m.printStats() return m } @@ -56,6 +58,35 @@ func (m *Model) Start() { go m.puller() } +func (m *Model) printStats() { + for { + time.Sleep(15 * time.Second) + m.RLock() + for node, conn := range m.nodes { + stats := conn.Statistics() + if (stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0) && stats.Latency > 0 { + infof("%s: %sB/s in, %sB/s out, %0.02f ms", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec), stats.Latency.Seconds()*1000) + } else if stats.InBytesPerSec > 0 || stats.OutBytesPerSec > 0 { + infof("%s: %sB/s in, %sB/s out", node, toSI(stats.InBytesPerSec), toSI(stats.OutBytesPerSec)) + } + } + m.RUnlock() + } +} + +func toSI(n int) string { + if n > 1<<30 { + return fmt.Sprintf("%.02f G", float64(n)/(1<<30)) + } + if n > 1<<20 { + return fmt.Sprintf("%.02f M", float64(n)/(1<<20)) + } + if n > 1<<10 { + return fmt.Sprintf("%.01f K", float64(n)/(1<<10)) + } + return fmt.Sprintf("%d ", n) +} + func (m *Model) Index(nodeID string, fs []protocol.FileInfo) { m.Lock() defer m.Unlock() diff --git a/protocol/protocol.go b/protocol/protocol.go index 2cd57135c..c350b7e1b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -42,18 +42,19 @@ type Model interface { } type Connection struct { - ID string - receiver Model - reader io.Reader - mreader *marshalReader - writer io.Writer - mwriter *marshalWriter - wLock sync.RWMutex - closed bool - awaiting map[int]chan asyncResult - nextId int - lastReceive time.Time - peerLatency time.Duration + ID string + receiver Model + reader io.Reader + mreader *marshalReader + writer io.Writer + mwriter *marshalWriter + wLock sync.RWMutex + closed bool + awaiting map[int]chan asyncResult + nextId int + lastReceive time.Time + peerLatency time.Duration + lastStatistics Statistics } var ErrClosed = errors.New("Connection closed") @@ -74,14 +75,15 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M } c := Connection{ - receiver: receiver, - reader: flrd, - mreader: &marshalReader{flrd, 0, nil}, - writer: flwr, - mwriter: &marshalWriter{flwr, 0, nil}, - awaiting: make(map[int]chan asyncResult), - lastReceive: time.Now(), - ID: nodeID, + receiver: receiver, + reader: flrd, + mreader: &marshalReader{flrd, 0, nil}, + writer: flwr, + mwriter: &marshalWriter{flwr, 0, nil}, + awaiting: make(map[int]chan asyncResult), + lastReceive: time.Now(), + ID: nodeID, + lastStatistics: Statistics{At: time.Now()}, } go c.readerLoop() @@ -313,3 +315,29 @@ func (c *Connection) pingerLoop() { time.Sleep(time.Second) } } + +type Statistics struct { + At time.Time + InBytesTotal int + InBytesPerSec int + OutBytesTotal int + OutBytesPerSec int + Latency time.Duration +} + +func (c *Connection) Statistics() Statistics { + c.wLock.Lock() + defer c.wLock.Unlock() + + secs := time.Since(c.lastStatistics.At).Seconds() + stats := Statistics{ + At: time.Now(), + InBytesTotal: c.mreader.tot, + InBytesPerSec: int(float64(c.mreader.tot-c.lastStatistics.InBytesTotal) / secs), + OutBytesTotal: c.mwriter.tot, + OutBytesPerSec: int(float64(c.mwriter.tot-c.lastStatistics.OutBytesTotal) / secs), + Latency: c.peerLatency, + } + c.lastStatistics = stats + return stats +}