diff --git a/cmd/relaysrv/README.md b/cmd/relaysrv/README.md index cfe73c805..355306508 100644 --- a/cmd/relaysrv/README.md +++ b/cmd/relaysrv/README.md @@ -9,7 +9,7 @@ To get it, run `go get github.com/syncthing/relaysrv` or download the [latest build](http://build.syncthing.net/job/relaysrv/lastSuccessfulBuild/artifact/) from the build server. -:exclamation:Warnings:exclamation: - Read or regret +:exclamation:Warnings:exclamation: - Read or regret ----- By default, all relay servers will join the default public relay pool, which means that the relay server will be availble for public use, and **will consume your bandwidth** helping others to connect. @@ -27,7 +27,7 @@ Make sure you have a public IP with port 22067 open, or make sure you have port- Run the `relaysrv` with no arguments (or `-debug` if you want more output), and that should be enough for the server to join the public relay pool. You should see a message saying: ``` -2015/09/21 22:45:46 pool.go:60: Joined https://relays.syncthing.net rejoining in 48m0s +2015/09/21 22:45:46 pool.go:60: Joined https://relays.syncthing.net/endpoint rejoining in 48m0s ``` See `relaysrv -help` for other options, such as rate limits, timeout intervals, etc. @@ -60,8 +60,8 @@ See `relaysrv -help` for other options, such as rate limits, timeout intervals, Other items available in this repo ---- -##### testutil -A test utility which can be used to test connectivity of a relay server. +##### testutil +A test utility which can be used to test connectivity of a relay server. You need to generate two x509 key pairs (key.pem and cert.pem), one for the client, another one for the server, in separate directories. Afterwards, start the client: ```bash @@ -81,10 +81,17 @@ In the other terminal run the following: Which should then give you an interactive prompt, where you can type things in one terminal, and they get relayed to the other terminal. -##### client +Relay related libraries used by this repo +---- +##### Relay protocol definition. -A client library which is used by `syncthing` +[Available here](https://github.com/syncthing/syncthing/tree/master/lib/relay/protocol) + + +##### Relay client + +Only used by the testutil. + +[Available here](https://github.com/syncthing/syncthing/tree/master/lib/relay/client) -##### protocol -Go files which define the protocol and it's messages, and a few utility functions which make it easier to consume. diff --git a/cmd/relaysrv/listener.go b/cmd/relaysrv/listener.go index 842f61a99..f73fa04c4 100644 --- a/cmd/relaysrv/listener.go +++ b/cmd/relaysrv/listener.go @@ -107,6 +107,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { switch msg := message.(type) { case protocol.JoinRelayRequest: + if atomic.LoadInt32(&overLimit) > 0 { + protocol.WriteMessage(conn, protocol.RelayFull{}) + if debug { + log.Println("Refusing join request from", id, "due to being over limits") + } + conn.Close() + limitCheckTimer.Reset(time.Second) + continue + } + outboxesMut.RLock() _, ok := outboxes[id] outboxesMut.RUnlock() @@ -223,6 +233,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { conn.Close() } + if atomic.LoadInt32(&overLimit) > 0 && !hasSessions(id) { + if debug { + log.Println("Dropping", id, "as it has no sessions and we are over our limits") + } + protocol.WriteMessage(conn, protocol.RelayFull{}) + conn.Close() + + limitCheckTimer.Reset(time.Second) + } + case <-timeoutTicker.C: // We should receive a error from the reader loop, which will cause // us to quit this loop. @@ -232,6 +252,10 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { conn.Close() case msg := <-outbox: + if msg == nil { + conn.Close() + return + } if debug { log.Printf("Sending message %T to %s", msg, id) } diff --git a/cmd/relaysrv/main.go b/cmd/relaysrv/main.go index 2acd267d0..66d452ca8 100644 --- a/cmd/relaysrv/main.go +++ b/cmd/relaysrv/main.go @@ -9,11 +9,17 @@ import ( "log" "net" "net/url" + "os" + "os/signal" "path/filepath" + "runtime" "strings" + "sync/atomic" + "syscall" "time" "github.com/juju/ratelimit" + "github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/tlsutil" @@ -31,8 +37,12 @@ var ( pingInterval time.Duration = time.Minute messageTimeout time.Duration = time.Minute + limitCheckTimer *time.Timer + sessionLimitBps int globalLimitBps int + overLimit int32 + descriptorLimit int64 sessionLimiter *ratelimit.Bucket globalLimiter *ratelimit.Bucket @@ -72,6 +82,17 @@ func main() { log.Fatal(err) } + maxDescriptors, err := osutil.MaximizeOpenFileLimit() + if maxDescriptors > 0 { + // Assume that 20% of FD's are leaked/unaccounted for. + descriptorLimit = int64(maxDescriptors*80) / 100 + log.Println("Connection limit", descriptorLimit) + + go monitorLimits() + } else if err != nil && runtime.GOOS != "windows" { + log.Println("Assuming no connection limit, due to error retrievign rlimits:", err) + } + sessionAddress = addr.IP[:] sessionPort = uint16(addr.Port) @@ -140,5 +161,43 @@ func main() { } } - listener(listen, tlsCfg) + go listener(listen, tlsCfg) + + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + <-sigs + + // Gracefully close all connections, hoping that clients will be faster + // to realize that the relay is now gone. + + sessionMut.RLock() + for _, session := range activeSessions { + session.CloseConns() + } + + for _, session := range pendingSessions { + session.CloseConns() + } + sessionMut.RUnlock() + + outboxesMut.RLock() + for _, outbox := range outboxes { + close(outbox) + } + outboxesMut.RUnlock() + + time.Sleep(500 * time.Millisecond) +} + +func monitorLimits() { + limitCheckTimer = time.NewTimer(time.Minute) + for range limitCheckTimer.C { + if atomic.LoadInt64(&numConnections)+atomic.LoadInt64(&numProxies) > descriptorLimit { + atomic.StoreInt32(&overLimit, 1) + log.Println("Gone past our connection limits. Starting to refuse new/drop idle connections.") + } else if atomic.CompareAndSwapInt32(&overLimit, 1, 0) { + log.Println("Dropped below our connection limits. Accepting new connections.") + } + limitCheckTimer.Reset(time.Minute) + } } diff --git a/cmd/relaysrv/session.go b/cmd/relaysrv/session.go index 37879f5b2..bf035ef3b 100644 --- a/cmd/relaysrv/session.go +++ b/cmd/relaysrv/session.go @@ -86,6 +86,19 @@ func dropSessions(id syncthingprotocol.DeviceID) { sessionMut.RUnlock() } +func hasSessions(id syncthingprotocol.DeviceID) bool { + sessionMut.RLock() + has := false + for _, session := range activeSessions { + if session.HasParticipant(id) { + has = true + break + } + } + sessionMut.RUnlock() + return has +} + type session struct { mut sync.Mutex @@ -127,7 +140,7 @@ func (s *session) Serve() { s.mut.Lock() s.conns = append(s.conns, conn) s.mut.Unlock() - // We're the only ones mutating% s.conns, hence we are free to read it. + // We're the only ones mutating s.conns, hence we are free to read it. if len(s.conns) < 2 { continue }