From 20b925abeceee6407f5acd5b42b419ebcb73254f Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Sat, 21 Nov 2015 00:08:09 +0000 Subject: [PATCH] Limit number of connections (fixes #23) --- cmd/relaysrv/listener.go | 20 ++++++++++++++++++++ cmd/relaysrv/main.go | 31 +++++++++++++++++++++++++++++++ cmd/relaysrv/session.go | 15 ++++++++++++++- 3 files changed, 65 insertions(+), 1 deletion(-) diff --git a/cmd/relaysrv/listener.go b/cmd/relaysrv/listener.go index 842f61a99..1e6610f23 100644 --- a/cmd/relaysrv/listener.go +++ b/cmd/relaysrv/listener.go @@ -107,6 +107,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { switch msg := message.(type) { case protocol.JoinRelayRequest: + if atomic.LoadInt32(&overLimit) > 0 { + protocol.WriteMessage(conn, protocol.RelayFull{}) + if debug { + log.Println("Refusing join request from", id, "due to being over limits") + } + conn.Close() + limitCheckTimer.Reset(time.Second) + continue + } + outboxesMut.RLock() _, ok := outboxes[id] outboxesMut.RUnlock() @@ -223,6 +233,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { conn.Close() } + if atomic.LoadInt32(&overLimit) > 0 && !hasSessions(id) { + if debug { + log.Println("Dropping", id, "as it has no sessions and we are over our limits") + } + protocol.WriteMessage(conn, protocol.RelayFull{}) + conn.Close() + + limitCheckTimer.Reset(time.Second) + } + case <-timeoutTicker.C: // We should receive a error from the reader loop, which will cause // us to quit this loop. diff --git a/cmd/relaysrv/main.go b/cmd/relaysrv/main.go index 2acd267d0..023401474 100644 --- a/cmd/relaysrv/main.go +++ b/cmd/relaysrv/main.go @@ -10,10 +10,13 @@ import ( "net" "net/url" "path/filepath" + "runtime" "strings" + "sync/atomic" "time" "github.com/juju/ratelimit" + "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/tlsutil" @@ -31,8 +34,12 @@ var ( pingInterval time.Duration = time.Minute messageTimeout time.Duration = time.Minute + limitCheckTimer *time.Timer + sessionLimitBps int globalLimitBps int + overLimit int32 + descriptorLimit int64 sessionLimiter *ratelimit.Bucket globalLimiter *ratelimit.Bucket @@ -72,6 +79,17 @@ func main() { log.Fatal(err) } + maxDescriptors, err := osutil.MaximizeOpenFileLimit() + if maxDescriptors > 0 { + // Assume that 20% of FD's are leaked/unaccounted for. + descriptorLimit = int64(maxDescriptors*80) / 100 + log.Println("Connection limit", descriptorLimit) + + go monitorLimits() + } else if err != nil && runtime.GOOS != "windows" { + log.Println("Assuming no connection limit, due to error retrievign rlimits:", err) + } + sessionAddress = addr.IP[:] sessionPort = uint16(addr.Port) @@ -142,3 +160,16 @@ func main() { listener(listen, tlsCfg) } + +func monitorLimits() { + limitCheckTimer = time.NewTimer(time.Minute) + for range limitCheckTimer.C { + if atomic.LoadInt64(&numConnections)+atomic.LoadInt64(&numProxies) > descriptorLimit { + atomic.StoreInt32(&overLimit, 1) + log.Println("Gone past our connection limits. Starting to refuse new/drop idle connections.") + } else if atomic.CompareAndSwapInt32(&overLimit, 1, 0) { + log.Println("Dropped below our connection limits. Accepting new connections.") + } + limitCheckTimer.Reset(time.Minute) + } +} diff --git a/cmd/relaysrv/session.go b/cmd/relaysrv/session.go index 37879f5b2..bf035ef3b 100644 --- a/cmd/relaysrv/session.go +++ b/cmd/relaysrv/session.go @@ -86,6 +86,19 @@ func dropSessions(id syncthingprotocol.DeviceID) { sessionMut.RUnlock() } +func hasSessions(id syncthingprotocol.DeviceID) bool { + sessionMut.RLock() + has := false + for _, session := range activeSessions { + if session.HasParticipant(id) { + has = true + break + } + } + sessionMut.RUnlock() + return has +} + type session struct { mut sync.Mutex @@ -127,7 +140,7 @@ func (s *session) Serve() { s.mut.Lock() s.conns = append(s.conns, conn) s.mut.Unlock() - // We're the only ones mutating% s.conns, hence we are free to read it. + // We're the only ones mutating s.conns, hence we are free to read it. if len(s.conns) < 2 { continue }