Merge pull request #25 from syncthing/fixes

Fix ALL THE BUGS
This commit is contained in:
Jakob Borg 2015-12-07 13:36:49 +01:00
commit ad418abf91
4 changed files with 113 additions and 10 deletions

View File

@ -9,7 +9,7 @@ To get it, run `go get github.com/syncthing/relaysrv` or download the
[latest build](http://build.syncthing.net/job/relaysrv/lastSuccessfulBuild/artifact/) [latest build](http://build.syncthing.net/job/relaysrv/lastSuccessfulBuild/artifact/)
from the build server. from the build server.
:exclamation:Warnings:exclamation: - Read or regret :exclamation:Warnings:exclamation: - Read or regret
----- -----
By default, all relay servers will join the default public relay pool, which means that the relay server will be availble for public use, and **will consume your bandwidth** helping others to connect. By default, all relay servers will join the default public relay pool, which means that the relay server will be availble for public use, and **will consume your bandwidth** helping others to connect.
@ -27,7 +27,7 @@ Make sure you have a public IP with port 22067 open, or make sure you have port-
Run the `relaysrv` with no arguments (or `-debug` if you want more output), and that should be enough for the server to join the public relay pool. Run the `relaysrv` with no arguments (or `-debug` if you want more output), and that should be enough for the server to join the public relay pool.
You should see a message saying: You should see a message saying:
``` ```
2015/09/21 22:45:46 pool.go:60: Joined https://relays.syncthing.net rejoining in 48m0s 2015/09/21 22:45:46 pool.go:60: Joined https://relays.syncthing.net/endpoint rejoining in 48m0s
``` ```
See `relaysrv -help` for other options, such as rate limits, timeout intervals, etc. See `relaysrv -help` for other options, such as rate limits, timeout intervals, etc.
@ -60,8 +60,8 @@ See `relaysrv -help` for other options, such as rate limits, timeout intervals,
Other items available in this repo Other items available in this repo
---- ----
##### testutil ##### testutil
A test utility which can be used to test connectivity of a relay server. A test utility which can be used to test connectivity of a relay server.
You need to generate two x509 key pairs (key.pem and cert.pem), one for the client, another one for the server, in separate directories. You need to generate two x509 key pairs (key.pem and cert.pem), one for the client, another one for the server, in separate directories.
Afterwards, start the client: Afterwards, start the client:
```bash ```bash
@ -81,10 +81,17 @@ In the other terminal run the following:
Which should then give you an interactive prompt, where you can type things in one terminal, and they get relayed to the other terminal. Which should then give you an interactive prompt, where you can type things in one terminal, and they get relayed to the other terminal.
##### client Relay related libraries used by this repo
----
##### Relay protocol definition.
A client library which is used by `syncthing` [Available here](https://github.com/syncthing/syncthing/tree/master/lib/relay/protocol)
##### Relay client
Only used by the testutil.
[Available here](https://github.com/syncthing/syncthing/tree/master/lib/relay/client)
##### protocol
Go files which define the protocol and it's messages, and a few utility functions which make it easier to consume.

View File

@ -107,6 +107,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) {
switch msg := message.(type) { switch msg := message.(type) {
case protocol.JoinRelayRequest: case protocol.JoinRelayRequest:
if atomic.LoadInt32(&overLimit) > 0 {
protocol.WriteMessage(conn, protocol.RelayFull{})
if debug {
log.Println("Refusing join request from", id, "due to being over limits")
}
conn.Close()
limitCheckTimer.Reset(time.Second)
continue
}
outboxesMut.RLock() outboxesMut.RLock()
_, ok := outboxes[id] _, ok := outboxes[id]
outboxesMut.RUnlock() outboxesMut.RUnlock()
@ -223,6 +233,16 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) {
conn.Close() conn.Close()
} }
if atomic.LoadInt32(&overLimit) > 0 && !hasSessions(id) {
if debug {
log.Println("Dropping", id, "as it has no sessions and we are over our limits")
}
protocol.WriteMessage(conn, protocol.RelayFull{})
conn.Close()
limitCheckTimer.Reset(time.Second)
}
case <-timeoutTicker.C: case <-timeoutTicker.C:
// We should receive a error from the reader loop, which will cause // We should receive a error from the reader loop, which will cause
// us to quit this loop. // us to quit this loop.
@ -232,6 +252,10 @@ func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) {
conn.Close() conn.Close()
case msg := <-outbox: case msg := <-outbox:
if msg == nil {
conn.Close()
return
}
if debug { if debug {
log.Printf("Sending message %T to %s", msg, id) log.Printf("Sending message %T to %s", msg, id)
} }

View File

@ -9,11 +9,17 @@ import (
"log" "log"
"net" "net"
"net/url" "net/url"
"os"
"os/signal"
"path/filepath" "path/filepath"
"runtime"
"strings" "strings"
"sync/atomic"
"syscall"
"time" "time"
"github.com/juju/ratelimit" "github.com/juju/ratelimit"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/tlsutil" "github.com/syncthing/syncthing/lib/tlsutil"
@ -31,8 +37,12 @@ var (
pingInterval time.Duration = time.Minute pingInterval time.Duration = time.Minute
messageTimeout time.Duration = time.Minute messageTimeout time.Duration = time.Minute
limitCheckTimer *time.Timer
sessionLimitBps int sessionLimitBps int
globalLimitBps int globalLimitBps int
overLimit int32
descriptorLimit int64
sessionLimiter *ratelimit.Bucket sessionLimiter *ratelimit.Bucket
globalLimiter *ratelimit.Bucket globalLimiter *ratelimit.Bucket
@ -72,6 +82,17 @@ func main() {
log.Fatal(err) log.Fatal(err)
} }
maxDescriptors, err := osutil.MaximizeOpenFileLimit()
if maxDescriptors > 0 {
// Assume that 20% of FD's are leaked/unaccounted for.
descriptorLimit = int64(maxDescriptors*80) / 100
log.Println("Connection limit", descriptorLimit)
go monitorLimits()
} else if err != nil && runtime.GOOS != "windows" {
log.Println("Assuming no connection limit, due to error retrievign rlimits:", err)
}
sessionAddress = addr.IP[:] sessionAddress = addr.IP[:]
sessionPort = uint16(addr.Port) sessionPort = uint16(addr.Port)
@ -140,5 +161,43 @@ func main() {
} }
} }
listener(listen, tlsCfg) go listener(listen, tlsCfg)
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
<-sigs
// Gracefully close all connections, hoping that clients will be faster
// to realize that the relay is now gone.
sessionMut.RLock()
for _, session := range activeSessions {
session.CloseConns()
}
for _, session := range pendingSessions {
session.CloseConns()
}
sessionMut.RUnlock()
outboxesMut.RLock()
for _, outbox := range outboxes {
close(outbox)
}
outboxesMut.RUnlock()
time.Sleep(500 * time.Millisecond)
}
func monitorLimits() {
limitCheckTimer = time.NewTimer(time.Minute)
for range limitCheckTimer.C {
if atomic.LoadInt64(&numConnections)+atomic.LoadInt64(&numProxies) > descriptorLimit {
atomic.StoreInt32(&overLimit, 1)
log.Println("Gone past our connection limits. Starting to refuse new/drop idle connections.")
} else if atomic.CompareAndSwapInt32(&overLimit, 1, 0) {
log.Println("Dropped below our connection limits. Accepting new connections.")
}
limitCheckTimer.Reset(time.Minute)
}
} }

View File

@ -86,6 +86,19 @@ func dropSessions(id syncthingprotocol.DeviceID) {
sessionMut.RUnlock() sessionMut.RUnlock()
} }
func hasSessions(id syncthingprotocol.DeviceID) bool {
sessionMut.RLock()
has := false
for _, session := range activeSessions {
if session.HasParticipant(id) {
has = true
break
}
}
sessionMut.RUnlock()
return has
}
type session struct { type session struct {
mut sync.Mutex mut sync.Mutex
@ -127,7 +140,7 @@ func (s *session) Serve() {
s.mut.Lock() s.mut.Lock()
s.conns = append(s.conns, conn) s.conns = append(s.conns, conn)
s.mut.Unlock() s.mut.Unlock()
// We're the only ones mutating% s.conns, hence we are free to read it. // We're the only ones mutating s.conns, hence we are free to read it.
if len(s.conns) < 2 { if len(s.conns) < 2 {
continue continue
} }