From f407ff8861b91d996131c20eca54a967decaac30 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 20 Aug 2015 14:02:52 +0200 Subject: [PATCH] Improve status reporter --- cmd/relaysrv/protocol_listener.go | 11 +++++-- cmd/relaysrv/session.go | 13 ++++---- cmd/relaysrv/status.go | 53 +++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/cmd/relaysrv/protocol_listener.go b/cmd/relaysrv/protocol_listener.go index a7243ff69..0ff0154d9 100644 --- a/cmd/relaysrv/protocol_listener.go +++ b/cmd/relaysrv/protocol_listener.go @@ -7,6 +7,7 @@ import ( "log" "net" "sync" + "sync/atomic" "time" syncthingprotocol "github.com/syncthing/protocol" @@ -15,8 +16,9 @@ import ( ) var ( - outboxesMut = sync.RWMutex{} - outboxes = make(map[syncthingprotocol.DeviceID]chan interface{}) + outboxesMut = sync.RWMutex{} + outboxes = make(map[syncthingprotocol.DeviceID]chan interface{}) + numConnections int64 ) func protocolListener(addr string, config *tls.Config) { @@ -122,7 +124,7 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { outboxesMut.RUnlock() if !ok { if debug { - log.Println(id, "is looking", requestedPeer, "which does not exist") + log.Println(id, "is looking for", requestedPeer, "which does not exist") } protocol.WriteMessage(conn, protocol.ResponseNotFound) conn.Close() @@ -219,6 +221,9 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { } func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { + atomic.AddInt64(&numConnections, 1) + defer atomic.AddInt64(&numConnections, -1) + for { msg, err := protocol.ReadMessage(conn) if err != nil { diff --git a/cmd/relaysrv/session.go b/cmd/relaysrv/session.go index a189ab403..c94cdc2a0 100644 --- a/cmd/relaysrv/session.go +++ b/cmd/relaysrv/session.go @@ -110,30 +110,31 @@ func (s *session) Serve() { close(s.conns) if debug { - log.Println("Session", s, "starting between", conns[0].RemoteAddr(), conns[1].RemoteAddr()) + log.Println("Session", s, "starting between", conns[0].RemoteAddr(), "and", conns[1].RemoteAddr()) } wg := sync.WaitGroup{} wg.Add(2) - errors := make(chan error, 2) - + var err0 error go func() { - errors <- s.proxy(conns[0], conns[1]) + err0 = s.proxy(conns[0], conns[1]) wg.Done() }() + var err1 error go func() { - errors <- s.proxy(conns[1], conns[0]) + err1 = s.proxy(conns[1], conns[0]) wg.Done() }() wg.Wait() if debug { - log.Println("Session", s, "ended, outcomes:", <-errors, <-errors) + log.Println("Session", s, "ended, outcomes:", err0, "and", err1) } goto done + case <-timedout: if debug { log.Println("Session", s, "timed out") diff --git a/cmd/relaysrv/status.go b/cmd/relaysrv/status.go index d0391e17d..53cea572c 100644 --- a/cmd/relaysrv/status.go +++ b/cmd/relaysrv/status.go @@ -6,9 +6,14 @@ import ( "net/http" "runtime" "sync/atomic" + "time" ) +var rc *rateCalculator + func statusService(addr string) { + rc = newRateCalculator(360, 10*time.Second, &bytesProxied) + http.HandleFunc("/status", getStatus) if err := http.ListenAndServe(addr, nil); err != nil { log.Fatal(err) @@ -21,12 +26,21 @@ func getStatus(w http.ResponseWriter, r *http.Request) { sessionMut.Lock() status["numSessions"] = len(sessions) sessionMut.Unlock() + status["numConnections"] = atomic.LoadInt64(&numConnections) status["numProxies"] = atomic.LoadInt64(&numProxies) status["bytesProxied"] = atomic.LoadInt64(&bytesProxied) status["goVersion"] = runtime.Version() status["goOS"] = runtime.GOOS status["goAarch"] = runtime.GOARCH status["goMaxProcs"] = runtime.GOMAXPROCS(-1) + status["kbps10s1m5m15m30m60m"] = []int64{ + rc.rate(10/10) * 8 / 1000, + rc.rate(60/10) * 8 / 1000, + rc.rate(5*60/10) * 8 / 1000, + rc.rate(15*60/10) * 8 / 1000, + rc.rate(30*60/10) * 8 / 1000, + rc.rate(60*60/10) * 8 / 1000, + } bs, err := json.MarshalIndent(status, "", " ") if err != nil { @@ -37,3 +51,42 @@ func getStatus(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.Write(bs) } + +type rateCalculator struct { + rates []int64 + prev int64 + counter *int64 +} + +func newRateCalculator(keepIntervals int, interval time.Duration, counter *int64) *rateCalculator { + r := &rateCalculator{ + rates: make([]int64, keepIntervals), + counter: counter, + } + + go r.updateRates(interval) + + return r +} + +func (r *rateCalculator) updateRates(interval time.Duration) { + for { + now := time.Now() + next := now.Truncate(interval).Add(interval) + time.Sleep(next.Sub(now)) + + cur := atomic.LoadInt64(r.counter) + rate := int64(float64(cur-r.prev) / interval.Seconds()) + copy(r.rates[1:], r.rates) + r.rates[0] = rate + r.prev = cur + } +} + +func (r *rateCalculator) rate(periods int) int64 { + var tot int64 + for i := 0; i < periods; i++ { + tot += r.rates[i] + } + return tot / int64(periods) +}