diff --git a/lib/relaysrv/.gitignore b/lib/relaysrv/.gitignore new file mode 100644 index 000000000..775c43cc2 --- /dev/null +++ b/lib/relaysrv/.gitignore @@ -0,0 +1,27 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof +*.tar.gz +*.zip +relaysrv diff --git a/lib/relaysrv/CONTRIBUTORS b/lib/relaysrv/CONTRIBUTORS new file mode 100644 index 000000000..4b00258cc --- /dev/null +++ b/lib/relaysrv/CONTRIBUTORS @@ -0,0 +1 @@ +Jakob Borg diff --git a/lib/relaysrv/LICENSE b/lib/relaysrv/LICENSE new file mode 100644 index 000000000..581a17054 --- /dev/null +++ b/lib/relaysrv/LICENSE @@ -0,0 +1,22 @@ +The MIT License (MIT) + +Copyright (c) 2015 The Syncthing Project + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/lib/relaysrv/README.md b/lib/relaysrv/README.md new file mode 100644 index 000000000..e88929280 --- /dev/null +++ b/lib/relaysrv/README.md @@ -0,0 +1,6 @@ +relaysrv +======== + +This is the relay server for the `syncthing` project. + +`go get github.com/syncthing/relaysrv` diff --git a/lib/relaysrv/build.sh b/lib/relaysrv/build.sh new file mode 100755 index 000000000..42c2681dd --- /dev/null +++ b/lib/relaysrv/build.sh @@ -0,0 +1,39 @@ +#!/bin/bash +set -euo pipefail +set nullglob + +echo Get dependencies +go get -d + +rm -rf relaysrv-*-* + +build() { + export GOOS="$1" + export GOARCH="$2" + target="relaysrv-$GOOS-$GOARCH" + go build -i -v -ldflags -w + mkdir "$target" + if [ -f relaysrv ] ; then + mv relaysrv "$target" + tar zcvf "$target.tar.gz" "$target" + rm -r "$target" + fi + if [ -f relaysrv.exe ] ; then + mv relaysrv.exe "$target" + zip -r "$target.zip" "$target" + rm -r "$target" + fi +} + +for goos in linux darwin windows freebsd openbsd netbsd solaris ; do + build "$goos" amd64 +done +for goos in linux windows freebsd openbsd netbsd ; do + build "$goos" 386 +done +build linux arm + +# Hack used because we run as root under Docker +if [[ ${CHOWN_USER:-} != "" ]] ; then + chown -R $CHOWN_USER . +fi diff --git a/lib/relaysrv/client/client.go b/lib/relaysrv/client/client.go new file mode 100644 index 000000000..89b16e000 --- /dev/null +++ b/lib/relaysrv/client/client.go @@ -0,0 +1,298 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package client + +import ( + "crypto/tls" + "fmt" + "log" + "net" + "net/url" + "time" + + syncthingprotocol "github.com/syncthing/protocol" + "github.com/syncthing/relaysrv/protocol" + "github.com/syncthing/syncthing/lib/sync" +) + +type ProtocolClient struct { + URI *url.URL + Invitations chan protocol.SessionInvitation + + closeInvitationsOnFinish bool + + config *tls.Config + + timeout time.Duration + + stop chan struct{} + stopped chan struct{} + + conn *tls.Conn + + mut sync.RWMutex + connected bool + latency time.Duration +} + +func NewProtocolClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) *ProtocolClient { + closeInvitationsOnFinish := false + if invitations == nil { + closeInvitationsOnFinish = true + invitations = make(chan protocol.SessionInvitation) + } + + return &ProtocolClient{ + URI: uri, + Invitations: invitations, + + closeInvitationsOnFinish: closeInvitationsOnFinish, + + config: configForCerts(certs), + + timeout: time.Minute * 2, + + stop: make(chan struct{}), + stopped: make(chan struct{}), + + mut: sync.NewRWMutex(), + connected: false, + } +} + +func (c *ProtocolClient) Serve() { + c.stop = make(chan struct{}) + c.stopped = make(chan struct{}) + defer close(c.stopped) + + if err := c.connect(); err != nil { + if debug { + l.Debugln("Relay connect:", err) + } + return + } + + if debug { + l.Debugln(c, "connected", c.conn.RemoteAddr()) + } + + if err := c.join(); err != nil { + c.conn.Close() + l.Infoln("Relay join:", err) + return + } + + if err := c.conn.SetDeadline(time.Time{}); err != nil { + l.Infoln("Relay set deadline:", err) + return + } + + if debug { + l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr()) + } + + defer c.cleanup() + c.mut.Lock() + c.connected = true + c.mut.Unlock() + + messages := make(chan interface{}) + errors := make(chan error, 1) + + go messageReader(c.conn, messages, errors) + + timeout := time.NewTimer(c.timeout) + + for { + select { + case message := <-messages: + timeout.Reset(c.timeout) + if debug { + log.Printf("%s received message %T", c, message) + } + + switch msg := message.(type) { + case protocol.Ping: + if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil { + l.Infoln("Relay write:", err) + return + + } + if debug { + l.Debugln(c, "sent pong") + } + + case protocol.SessionInvitation: + ip := net.IP(msg.Address) + if len(ip) == 0 || ip.IsUnspecified() { + msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:] + } + c.Invitations <- msg + + default: + l.Infoln("Relay: protocol error: unexpected message %v", msg) + return + } + + case <-c.stop: + if debug { + l.Debugln(c, "stopping") + } + return + + case err := <-errors: + l.Infoln("Relay received:", err) + return + + case <-timeout.C: + if debug { + l.Debugln(c, "timed out") + } + return + } + } +} + +func (c *ProtocolClient) Stop() { + if c.stop == nil { + return + } + + close(c.stop) + <-c.stopped +} + +func (c *ProtocolClient) StatusOK() bool { + c.mut.RLock() + con := c.connected + c.mut.RUnlock() + return con +} + +func (c *ProtocolClient) Latency() time.Duration { + c.mut.RLock() + lat := c.latency + c.mut.RUnlock() + return lat +} + +func (c *ProtocolClient) String() string { + return fmt.Sprintf("ProtocolClient@%p", c) +} + +func (c *ProtocolClient) connect() error { + if c.URI.Scheme != "relay" { + return fmt.Errorf("Unsupported relay schema:", c.URI.Scheme) + } + + t0 := time.Now() + tcpConn, err := net.Dial("tcp", c.URI.Host) + if err != nil { + return err + } + + c.mut.Lock() + c.latency = time.Since(t0) + c.mut.Unlock() + + conn := tls.Client(tcpConn, c.config) + if err = conn.Handshake(); err != nil { + return err + } + + if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil { + conn.Close() + return err + } + + if err := performHandshakeAndValidation(conn, c.URI); err != nil { + conn.Close() + return err + } + + c.conn = conn + return nil +} + +func (c *ProtocolClient) cleanup() { + if c.closeInvitationsOnFinish { + close(c.Invitations) + c.Invitations = make(chan protocol.SessionInvitation) + } + + if debug { + l.Debugln(c, "cleaning up") + } + + c.mut.Lock() + c.connected = false + c.mut.Unlock() + + c.conn.Close() +} + +func (c *ProtocolClient) join() error { + if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil { + return err + } + + message, err := protocol.ReadMessage(c.conn) + if err != nil { + return err + } + + switch msg := message.(type) { + case protocol.Response: + if msg.Code != 0 { + return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message) + } + + default: + return fmt.Errorf("protocol error: expecting response got %v", msg) + } + + return nil +} + +func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error { + if err := conn.Handshake(); err != nil { + return err + } + + cs := conn.ConnectionState() + if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName { + return fmt.Errorf("protocol negotiation error") + } + + q := uri.Query() + relayIDs := q.Get("id") + if relayIDs != "" { + relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs) + if err != nil { + return fmt.Errorf("relay address contains invalid verification id: %s", err) + } + + certs := cs.PeerCertificates + if cl := len(certs); cl != 1 { + return fmt.Errorf("unexpected certificate count: %d", cl) + } + + remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw) + if remoteID != relayID { + return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID) + } + } + + return nil +} + +func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { + for { + msg, err := protocol.ReadMessage(conn) + if err != nil { + errors <- err + return + } + messages <- msg + } +} diff --git a/lib/relaysrv/client/debug.go b/lib/relaysrv/client/debug.go new file mode 100644 index 000000000..935e9fe62 --- /dev/null +++ b/lib/relaysrv/client/debug.go @@ -0,0 +1,15 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package client + +import ( + "os" + "strings" + + "github.com/calmh/logger" +) + +var ( + debug = strings.Contains(os.Getenv("STTRACE"), "relay") || os.Getenv("STTRACE") == "all" + l = logger.DefaultLogger +) diff --git a/lib/relaysrv/client/methods.go b/lib/relaysrv/client/methods.go new file mode 100644 index 000000000..67a9a71c1 --- /dev/null +++ b/lib/relaysrv/client/methods.go @@ -0,0 +1,141 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package client + +import ( + "crypto/tls" + "fmt" + "net" + "net/url" + "strconv" + "strings" + "time" + + syncthingprotocol "github.com/syncthing/protocol" + "github.com/syncthing/relaysrv/protocol" +) + +func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs []tls.Certificate) (protocol.SessionInvitation, error) { + if uri.Scheme != "relay" { + return protocol.SessionInvitation{}, fmt.Errorf("Unsupported relay scheme:", uri.Scheme) + } + + conn, err := tls.Dial("tcp", uri.Host, configForCerts(certs)) + if err != nil { + return protocol.SessionInvitation{}, err + } + conn.SetDeadline(time.Now().Add(10 * time.Second)) + + if err := performHandshakeAndValidation(conn, uri); err != nil { + return protocol.SessionInvitation{}, err + } + + defer conn.Close() + + request := protocol.ConnectRequest{ + ID: id[:], + } + + if err := protocol.WriteMessage(conn, request); err != nil { + return protocol.SessionInvitation{}, err + } + + message, err := protocol.ReadMessage(conn) + if err != nil { + return protocol.SessionInvitation{}, err + } + + switch msg := message.(type) { + case protocol.Response: + return protocol.SessionInvitation{}, fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message) + case protocol.SessionInvitation: + if debug { + l.Debugln("Received invitation", msg, "via", conn.LocalAddr()) + } + ip := net.IP(msg.Address) + if len(ip) == 0 || ip.IsUnspecified() { + msg.Address = conn.RemoteAddr().(*net.TCPAddr).IP[:] + } + return msg, nil + default: + return protocol.SessionInvitation{}, fmt.Errorf("protocol error: unexpected message %v", msg) + } +} + +func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) { + addr := net.JoinHostPort(net.IP(invitation.Address).String(), strconv.Itoa(int(invitation.Port))) + + conn, err := net.Dial("tcp", addr) + if err != nil { + return nil, err + } + + request := protocol.JoinSessionRequest{ + Key: invitation.Key, + } + + conn.SetDeadline(time.Now().Add(10 * time.Second)) + err = protocol.WriteMessage(conn, request) + if err != nil { + return nil, err + } + + message, err := protocol.ReadMessage(conn) + if err != nil { + return nil, err + } + + conn.SetDeadline(time.Time{}) + + switch msg := message.(type) { + case protocol.Response: + if msg.Code != 0 { + return nil, fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message) + } + return conn, nil + default: + return nil, fmt.Errorf("protocol error: expecting response got %v", msg) + } +} + +func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool { + id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0]) + invs := make(chan protocol.SessionInvitation, 1) + c := NewProtocolClient(uri, certs, invs) + go c.Serve() + defer func() { + close(invs) + c.Stop() + }() + + for i := 0; i < times; i++ { + _, err := GetInvitationFromRelay(uri, id, certs) + if err == nil { + return true + } + if !strings.Contains(err.Error(), "Incorrect response code") { + return false + } + time.Sleep(sleep) + } + return false +} + +func configForCerts(certs []tls.Certificate) *tls.Config { + return &tls.Config{ + Certificates: certs, + NextProtos: []string{protocol.ProtocolName}, + ClientAuth: tls.RequestClientCert, + SessionTicketsDisabled: true, + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS12, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + }, + } +} diff --git a/lib/relaysrv/listener.go b/lib/relaysrv/listener.go new file mode 100644 index 000000000..3f584535e --- /dev/null +++ b/lib/relaysrv/listener.go @@ -0,0 +1,319 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "crypto/tls" + "encoding/hex" + "log" + "net" + "sync" + "sync/atomic" + "time" + + syncthingprotocol "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/lib/tlsutil" + + "github.com/syncthing/relaysrv/protocol" +) + +var ( + outboxesMut = sync.RWMutex{} + outboxes = make(map[syncthingprotocol.DeviceID]chan interface{}) + numConnections int64 +) + +func listener(addr string, config *tls.Config) { + tcpListener, err := net.Listen("tcp", addr) + if err != nil { + log.Fatalln(err) + } + + listener := tlsutil.DowngradingListener{tcpListener, nil} + + for { + conn, isTLS, err := listener.AcceptNoWrapTLS() + if err != nil { + if debug { + log.Println("Listener failed to accept connection from", conn.RemoteAddr(), ". Possibly a TCP Ping.") + } + continue + } + + setTCPOptions(conn) + + if debug { + log.Println("Listener accepted connection from", conn.RemoteAddr(), "tls", isTLS) + } + + if isTLS { + go protocolConnectionHandler(conn, config) + } else { + go sessionConnectionHandler(conn) + } + + } +} + +func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { + conn := tls.Server(tcpConn, config) + err := conn.Handshake() + if err != nil { + if debug { + log.Println("Protocol connection TLS handshake:", conn.RemoteAddr(), err) + } + conn.Close() + return + } + + state := conn.ConnectionState() + if (!state.NegotiatedProtocolIsMutual || state.NegotiatedProtocol != protocol.ProtocolName) && debug { + log.Println("Protocol negotiation error") + } + + certs := state.PeerCertificates + if len(certs) != 1 { + if debug { + log.Println("Certificate list error") + } + conn.Close() + return + } + + id := syncthingprotocol.NewDeviceID(certs[0].Raw) + + messages := make(chan interface{}) + errors := make(chan error, 1) + outbox := make(chan interface{}) + + // Read messages from the connection and send them on the messages + // channel. When there is an error, send it on the error channel and + // return. Applies also when the connection gets closed, so the pattern + // below is to close the connection on error, then wait for the error + // signal from messageReader to exit. + go messageReader(conn, messages, errors) + + pingTicker := time.NewTicker(pingInterval) + timeoutTicker := time.NewTimer(networkTimeout) + joined := false + + for { + select { + case message := <-messages: + timeoutTicker.Reset(networkTimeout) + if debug { + log.Printf("Message %T from %s", message, id) + } + + switch msg := message.(type) { + case protocol.JoinRelayRequest: + outboxesMut.RLock() + _, ok := outboxes[id] + outboxesMut.RUnlock() + if ok { + protocol.WriteMessage(conn, protocol.ResponseAlreadyConnected) + if debug { + log.Println("Already have a peer with the same ID", id, conn.RemoteAddr()) + } + conn.Close() + continue + } + + outboxesMut.Lock() + outboxes[id] = outbox + outboxesMut.Unlock() + joined = true + + protocol.WriteMessage(conn, protocol.ResponseSuccess) + + case protocol.ConnectRequest: + requestedPeer := syncthingprotocol.DeviceIDFromBytes(msg.ID) + outboxesMut.RLock() + peerOutbox, ok := outboxes[requestedPeer] + outboxesMut.RUnlock() + if !ok { + if debug { + log.Println(id, "is looking for", requestedPeer, "which does not exist") + } + protocol.WriteMessage(conn, protocol.ResponseNotFound) + conn.Close() + continue + } + // requestedPeer is the server, id is the client + ses := newSession(requestedPeer, id, sessionLimiter, globalLimiter) + + go ses.Serve() + + clientInvitation := ses.GetClientInvitationMessage() + serverInvitation := ses.GetServerInvitationMessage() + + if err := protocol.WriteMessage(conn, clientInvitation); err != nil { + if debug { + log.Printf("Error sending invitation from %s to client: %s", id, err) + } + conn.Close() + continue + } + + peerOutbox <- serverInvitation + + if debug { + log.Println("Sent invitation from", id, "to", requestedPeer) + } + conn.Close() + + case protocol.Ping: + if err := protocol.WriteMessage(conn, protocol.Pong{}); err != nil { + if debug { + log.Println("Error writing pong:", err) + } + conn.Close() + continue + } + + case protocol.Pong: + // Nothing + + default: + if debug { + log.Printf("Unknown message %s: %T", id, message) + } + protocol.WriteMessage(conn, protocol.ResponseUnexpectedMessage) + conn.Close() + } + + case err := <-errors: + if debug { + log.Printf("Closing connection %s: %s", id, err) + } + close(outbox) + + // Potentially closing a second time. + conn.Close() + + if joined { + // Only delete the outbox if the client is joined, as it might be + // a lookup request coming from the same client. + outboxesMut.Lock() + delete(outboxes, id) + outboxesMut.Unlock() + // Also, kill all sessions related to this node, as it probably + // went offline. This is for the other end to realize the client + // is no longer there faster. This also helps resolve + // 'already connected' errors when one of the sides is + // restarting, and connecting to the other peer before the other + // peer even realised that the node has gone away. + dropSessions(id) + } + return + + case <-pingTicker.C: + if !joined { + if debug { + log.Println(id, "didn't join within", pingInterval) + } + conn.Close() + continue + } + + if err := protocol.WriteMessage(conn, protocol.Ping{}); err != nil { + if debug { + log.Println(id, err) + } + conn.Close() + } + + case <-timeoutTicker.C: + // We should receive a error from the reader loop, which will cause + // us to quit this loop. + if debug { + log.Printf("%s timed out", id) + } + conn.Close() + + case msg := <-outbox: + if debug { + log.Printf("Sending message %T to %s", msg, id) + } + if err := protocol.WriteMessage(conn, msg); err != nil { + if debug { + log.Println(id, err) + } + conn.Close() + } + } + } +} + +func sessionConnectionHandler(conn net.Conn) { + if err := conn.SetDeadline(time.Now().Add(messageTimeout)); err != nil { + if debug { + log.Println("Weird error setting deadline:", err, "on", conn.RemoteAddr()) + } + return + } + + message, err := protocol.ReadMessage(conn) + if err != nil { + return + } + + switch msg := message.(type) { + case protocol.JoinSessionRequest: + ses := findSession(string(msg.Key)) + if debug { + log.Println(conn.RemoteAddr(), "session lookup", ses, hex.EncodeToString(msg.Key)[:5]) + } + + if ses == nil { + protocol.WriteMessage(conn, protocol.ResponseNotFound) + conn.Close() + return + } + + if !ses.AddConnection(conn) { + if debug { + log.Println("Failed to add", conn.RemoteAddr(), "to session", ses) + } + protocol.WriteMessage(conn, protocol.ResponseAlreadyConnected) + conn.Close() + return + } + + if err := protocol.WriteMessage(conn, protocol.ResponseSuccess); err != nil { + if debug { + log.Println("Failed to send session join response to ", conn.RemoteAddr(), "for", ses) + } + return + } + + if err := conn.SetDeadline(time.Time{}); err != nil { + if debug { + log.Println("Weird error setting deadline:", err, "on", conn.RemoteAddr()) + } + conn.Close() + return + } + + default: + if debug { + log.Println("Unexpected message from", conn.RemoteAddr(), message) + } + protocol.WriteMessage(conn, protocol.ResponseUnexpectedMessage) + conn.Close() + } +} + +func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { + atomic.AddInt64(&numConnections, 1) + defer atomic.AddInt64(&numConnections, -1) + + for { + msg, err := protocol.ReadMessage(conn) + if err != nil { + errors <- err + return + } + messages <- msg + } +} diff --git a/lib/relaysrv/main.go b/lib/relaysrv/main.go new file mode 100644 index 000000000..614f82c7f --- /dev/null +++ b/lib/relaysrv/main.go @@ -0,0 +1,142 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "crypto/tls" + "flag" + "fmt" + "log" + "net" + "net/url" + "path/filepath" + "strings" + "time" + + "github.com/juju/ratelimit" + "github.com/syncthing/relaysrv/protocol" + "github.com/syncthing/syncthing/lib/tlsutil" + + syncthingprotocol "github.com/syncthing/protocol" +) + +var ( + listen string + debug bool = false + + sessionAddress []byte + sessionPort uint16 + + networkTimeout time.Duration = 2 * time.Minute + pingInterval time.Duration = time.Minute + messageTimeout time.Duration = time.Minute + + sessionLimitBps int + globalLimitBps int + sessionLimiter *ratelimit.Bucket + globalLimiter *ratelimit.Bucket + + statusAddr string + poolAddrs string + defaultPoolAddrs string = "https://relays.syncthing.net" +) + +func main() { + log.SetFlags(log.Lshortfile | log.LstdFlags) + + var dir, extAddress string + + flag.StringVar(&listen, "listen", ":22067", "Protocol listen address") + flag.StringVar(&dir, "keys", ".", "Directory where cert.pem and key.pem is stored") + flag.DurationVar(&networkTimeout, "network-timeout", networkTimeout, "Timeout for network operations between the client and the relay.\n\tIf no data is received between the client and the relay in this period of time, the connection is terminated.\n\tFurthermore, if no data is sent between either clients being relayed within this period of time, the session is also terminated.") + flag.DurationVar(&pingInterval, "ping-interval", pingInterval, "How often pings are sent") + flag.DurationVar(&messageTimeout, "message-timeout", messageTimeout, "Maximum amount of time we wait for relevant messages to arrive") + flag.IntVar(&sessionLimitBps, "per-session-rate", sessionLimitBps, "Per session rate limit, in bytes/s") + flag.IntVar(&globalLimitBps, "global-rate", globalLimitBps, "Global rate limit, in bytes/s") + flag.BoolVar(&debug, "debug", debug, "Enable debug output") + flag.StringVar(&statusAddr, "status-srv", ":22070", "Listen address for status service (blank to disable)") + flag.StringVar(&poolAddrs, "pools", defaultPoolAddrs, "Comma separated list of relay pool addresses to join") + + flag.Parse() + + if extAddress == "" { + extAddress = listen + } + + addr, err := net.ResolveTCPAddr("tcp", extAddress) + if err != nil { + log.Fatal(err) + } + + sessionAddress = addr.IP[:] + sessionPort = uint16(addr.Port) + + certFile, keyFile := filepath.Join(dir, "cert.pem"), filepath.Join(dir, "key.pem") + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + log.Println("Failed to load keypair. Generating one, this might take a while...") + cert, err = tlsutil.NewCertificate(certFile, keyFile, "relaysrv", 3072) + if err != nil { + log.Fatalln("Failed to generate X509 key pair:", err) + } + } + + tlsCfg := &tls.Config{ + Certificates: []tls.Certificate{cert}, + NextProtos: []string{protocol.ProtocolName}, + ClientAuth: tls.RequestClientCert, + SessionTicketsDisabled: true, + InsecureSkipVerify: true, + MinVersion: tls.VersionTLS12, + CipherSuites: []uint16{ + tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, + tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, + tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, + tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, + }, + } + + id := syncthingprotocol.NewDeviceID(cert.Certificate[0]) + if debug { + log.Println("ID:", id) + } + + if sessionLimitBps > 0 { + sessionLimiter = ratelimit.NewBucketWithRate(float64(sessionLimitBps), int64(2*sessionLimitBps)) + } + if globalLimitBps > 0 { + globalLimiter = ratelimit.NewBucketWithRate(float64(globalLimitBps), int64(2*globalLimitBps)) + } + + if statusAddr != "" { + go statusService(statusAddr) + } + + uri, err := url.Parse(fmt.Sprintf("relay://%s/?id=%s&pingInterval=%s&networkTimeout=%s&sessionLimitBps=%d&globalLimitBps=%d&statusAddr=%s", extAddress, id, pingInterval, networkTimeout, sessionLimitBps, globalLimitBps, statusAddr)) + if err != nil { + log.Fatalln("Failed to construct URI", err) + } + + if debug { + log.Println("URI:", uri.String()) + } + + if poolAddrs == defaultPoolAddrs { + log.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + log.Println("!! Joining default relay pools, this relay will be available for public use. !!") + log.Println(`!! Use the -pools="" command line option to make the relay private. !!`) + log.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") + } + + pools := strings.Split(poolAddrs, ",") + for _, pool := range pools { + pool = strings.TrimSpace(pool) + if len(pool) > 0 { + go poolHandler(pool, uri) + } + } + + listener(listen, tlsCfg) +} diff --git a/lib/relaysrv/pool.go b/lib/relaysrv/pool.go new file mode 100644 index 000000000..0f327f690 --- /dev/null +++ b/lib/relaysrv/pool.go @@ -0,0 +1,68 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "bytes" + "encoding/json" + "io/ioutil" + "log" + "net/http" + "net/url" + "time" +) + +func poolHandler(pool string, uri *url.URL) { + for { + var b bytes.Buffer + json.NewEncoder(&b).Encode(struct { + URL string `json:"url"` + }{ + uri.String(), + }) + + resp, err := http.Post(pool, "application/json", &b) + if err != nil { + if debug { + log.Println("Error joining pool", pool, err) + } + } else if resp.StatusCode == 500 { + if debug { + bs, err := ioutil.ReadAll(resp.Body) + if err != nil { + log.Println("Failed to read response body for", pool, err) + } else { + log.Println("Response for", pool, string(bs)) + } + resp.Body.Close() + } + } else if resp.StatusCode == 429 { + if debug { + log.Println(pool, "under load, will retry in a minute") + } + time.Sleep(time.Minute) + continue + } else if resp.StatusCode == 403 { + if debug { + log.Println(pool, "failed to join due to IP address not matching external address") + } + return + } else if resp.StatusCode == 200 { + var x struct { + EvictionIn time.Duration `json:"evictionIn"` + } + err := json.NewDecoder(resp.Body).Decode(&x) + if err == nil { + rejoin := x.EvictionIn - (x.EvictionIn / 5) + if debug { + log.Println("Joined", pool, "rejoining in", rejoin) + } + time.Sleep(rejoin) + continue + } else if debug { + log.Println("Failed to deserialize respnse", err) + } + } + time.Sleep(time.Hour) + } +} diff --git a/lib/relaysrv/protocol/packets.go b/lib/relaysrv/protocol/packets.go new file mode 100644 index 000000000..1b21eba24 --- /dev/null +++ b/lib/relaysrv/protocol/packets.go @@ -0,0 +1,66 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +//go:generate -command genxdr go run ../../syncthing/Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go +//go:generate genxdr -o packets_xdr.go packets.go + +package protocol + +import ( + "fmt" + "net" + + syncthingprotocol "github.com/syncthing/protocol" +) + +const ( + messageTypePing int32 = iota + messageTypePong + messageTypeJoinRelayRequest + messageTypeJoinSessionRequest + messageTypeResponse + messageTypeConnectRequest + messageTypeSessionInvitation +) + +type header struct { + magic uint32 + messageType int32 + messageLength int32 +} + +type Ping struct{} +type Pong struct{} +type JoinRelayRequest struct{} + +type JoinSessionRequest struct { + Key []byte // max:32 +} + +type Response struct { + Code int32 + Message string +} + +type ConnectRequest struct { + ID []byte // max:32 +} + +type SessionInvitation struct { + From []byte // max:32 + Key []byte // max:32 + Address []byte // max:32 + Port uint16 + ServerSocket bool +} + +func (i SessionInvitation) String() string { + return fmt.Sprintf("%s@%s", syncthingprotocol.DeviceIDFromBytes(i.From), i.AddressString()) +} + +func (i SessionInvitation) GoString() string { + return i.String() +} + +func (i SessionInvitation) AddressString() string { + return fmt.Sprintf("%s:%d", net.IP(i.Address), i.Port) +} diff --git a/lib/relaysrv/protocol/packets_xdr.go b/lib/relaysrv/protocol/packets_xdr.go new file mode 100644 index 000000000..f18e18c18 --- /dev/null +++ b/lib/relaysrv/protocol/packets_xdr.go @@ -0,0 +1,567 @@ +// ************************************************************ +// This file is automatically generated by genxdr. Do not edit. +// ************************************************************ + +package protocol + +import ( + "bytes" + "io" + + "github.com/calmh/xdr" +) + +/* + +header Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| magic | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| message Type | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| message Length | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct header { + unsigned int magic; + int messageType; + int messageLength; +} + +*/ + +func (o header) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o header) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o header) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o header) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o header) EncodeXDRInto(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.magic) + xw.WriteUint32(uint32(o.messageType)) + xw.WriteUint32(uint32(o.messageLength)) + return xw.Tot(), xw.Error() +} + +func (o *header) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *header) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *header) DecodeXDRFrom(xr *xdr.Reader) error { + o.magic = xr.ReadUint32() + o.messageType = int32(xr.ReadUint32()) + o.messageLength = int32(xr.ReadUint32()) + return xr.Error() +} + +/* + +Ping Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Ping { +} + +*/ + +func (o Ping) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o Ping) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Ping) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o Ping) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o Ping) EncodeXDRInto(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *Ping) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *Ping) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *Ping) DecodeXDRFrom(xr *xdr.Reader) error { + return xr.Error() +} + +/* + +Pong Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Pong { +} + +*/ + +func (o Pong) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o Pong) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Pong) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o Pong) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o Pong) EncodeXDRInto(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *Pong) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *Pong) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *Pong) DecodeXDRFrom(xr *xdr.Reader) error { + return xr.Error() +} + +/* + +JoinRelayRequest Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct JoinRelayRequest { +} + +*/ + +func (o JoinRelayRequest) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o JoinRelayRequest) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o JoinRelayRequest) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o JoinRelayRequest) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o JoinRelayRequest) EncodeXDRInto(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *JoinRelayRequest) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *JoinRelayRequest) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *JoinRelayRequest) DecodeXDRFrom(xr *xdr.Reader) error { + return xr.Error() +} + +/* + +JoinSessionRequest Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Key | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Key (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct JoinSessionRequest { + opaque Key<32>; +} + +*/ + +func (o JoinSessionRequest) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o JoinSessionRequest) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o JoinSessionRequest) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o JoinSessionRequest) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o JoinSessionRequest) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.Key); l > 32 { + return xw.Tot(), xdr.ElementSizeExceeded("Key", l, 32) + } + xw.WriteBytes(o.Key) + return xw.Tot(), xw.Error() +} + +func (o *JoinSessionRequest) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *JoinSessionRequest) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *JoinSessionRequest) DecodeXDRFrom(xr *xdr.Reader) error { + o.Key = xr.ReadBytesMax(32) + return xr.Error() +} + +/* + +Response Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Code | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Message | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Message (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Response { + int Code; + string Message<>; +} + +*/ + +func (o Response) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o Response) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Response) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o Response) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o Response) EncodeXDRInto(xw *xdr.Writer) (int, error) { + xw.WriteUint32(uint32(o.Code)) + xw.WriteString(o.Message) + return xw.Tot(), xw.Error() +} + +func (o *Response) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *Response) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *Response) DecodeXDRFrom(xr *xdr.Reader) error { + o.Code = int32(xr.ReadUint32()) + o.Message = xr.ReadString() + return xr.Error() +} + +/* + +ConnectRequest Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of ID | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ID (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct ConnectRequest { + opaque ID<32>; +} + +*/ + +func (o ConnectRequest) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o ConnectRequest) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o ConnectRequest) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o ConnectRequest) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o ConnectRequest) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.ID); l > 32 { + return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 32) + } + xw.WriteBytes(o.ID) + return xw.Tot(), xw.Error() +} + +func (o *ConnectRequest) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *ConnectRequest) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *ConnectRequest) DecodeXDRFrom(xr *xdr.Reader) error { + o.ID = xr.ReadBytesMax(32) + return xr.Error() +} + +/* + +SessionInvitation Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of From | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ From (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Key | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Key (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Address | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Address (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| 0x0000 | Port | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Server Socket (V=0 or 1) |V| ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct SessionInvitation { + opaque From<32>; + opaque Key<32>; + opaque Address<32>; + unsigned int Port; + bool ServerSocket; +} + +*/ + +func (o SessionInvitation) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.EncodeXDRInto(xw) +} + +func (o SessionInvitation) MarshalXDR() ([]byte, error) { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o SessionInvitation) MustMarshalXDR() []byte { + bs, err := o.MarshalXDR() + if err != nil { + panic(err) + } + return bs +} + +func (o SessionInvitation) AppendXDR(bs []byte) ([]byte, error) { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + _, err := o.EncodeXDRInto(xw) + return []byte(aw), err +} + +func (o SessionInvitation) EncodeXDRInto(xw *xdr.Writer) (int, error) { + if l := len(o.From); l > 32 { + return xw.Tot(), xdr.ElementSizeExceeded("From", l, 32) + } + xw.WriteBytes(o.From) + if l := len(o.Key); l > 32 { + return xw.Tot(), xdr.ElementSizeExceeded("Key", l, 32) + } + xw.WriteBytes(o.Key) + if l := len(o.Address); l > 32 { + return xw.Tot(), xdr.ElementSizeExceeded("Address", l, 32) + } + xw.WriteBytes(o.Address) + xw.WriteUint16(o.Port) + xw.WriteBool(o.ServerSocket) + return xw.Tot(), xw.Error() +} + +func (o *SessionInvitation) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.DecodeXDRFrom(xr) +} + +func (o *SessionInvitation) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.DecodeXDRFrom(xr) +} + +func (o *SessionInvitation) DecodeXDRFrom(xr *xdr.Reader) error { + o.From = xr.ReadBytesMax(32) + o.Key = xr.ReadBytesMax(32) + o.Address = xr.ReadBytesMax(32) + o.Port = xr.ReadUint16() + o.ServerSocket = xr.ReadBool() + return xr.Error() +} diff --git a/lib/relaysrv/protocol/protocol.go b/lib/relaysrv/protocol/protocol.go new file mode 100644 index 000000000..57a967ac8 --- /dev/null +++ b/lib/relaysrv/protocol/protocol.go @@ -0,0 +1,114 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package protocol + +import ( + "fmt" + "io" +) + +const ( + magic = 0x9E79BC40 + ProtocolName = "bep-relay" +) + +var ( + ResponseSuccess = Response{0, "success"} + ResponseNotFound = Response{1, "not found"} + ResponseAlreadyConnected = Response{2, "already connected"} + ResponseInternalError = Response{99, "internal error"} + ResponseUnexpectedMessage = Response{100, "unexpected message"} +) + +func WriteMessage(w io.Writer, message interface{}) error { + header := header{ + magic: magic, + } + + var payload []byte + var err error + + switch msg := message.(type) { + case Ping: + payload, err = msg.MarshalXDR() + header.messageType = messageTypePing + case Pong: + payload, err = msg.MarshalXDR() + header.messageType = messageTypePong + case JoinRelayRequest: + payload, err = msg.MarshalXDR() + header.messageType = messageTypeJoinRelayRequest + case JoinSessionRequest: + payload, err = msg.MarshalXDR() + header.messageType = messageTypeJoinSessionRequest + case Response: + payload, err = msg.MarshalXDR() + header.messageType = messageTypeResponse + case ConnectRequest: + payload, err = msg.MarshalXDR() + header.messageType = messageTypeConnectRequest + case SessionInvitation: + payload, err = msg.MarshalXDR() + header.messageType = messageTypeSessionInvitation + default: + err = fmt.Errorf("Unknown message type") + } + + if err != nil { + return err + } + + header.messageLength = int32(len(payload)) + + headerpayload, err := header.MarshalXDR() + if err != nil { + return err + } + + _, err = w.Write(append(headerpayload, payload...)) + return err +} + +func ReadMessage(r io.Reader) (interface{}, error) { + var header header + if err := header.DecodeXDR(r); err != nil { + return nil, err + } + + if header.magic != magic { + return nil, fmt.Errorf("magic mismatch") + } + + switch header.messageType { + case messageTypePing: + var msg Ping + err := msg.DecodeXDR(r) + return msg, err + case messageTypePong: + var msg Pong + err := msg.DecodeXDR(r) + return msg, err + case messageTypeJoinRelayRequest: + var msg JoinRelayRequest + err := msg.DecodeXDR(r) + return msg, err + case messageTypeJoinSessionRequest: + var msg JoinSessionRequest + err := msg.DecodeXDR(r) + return msg, err + case messageTypeResponse: + var msg Response + err := msg.DecodeXDR(r) + return msg, err + case messageTypeConnectRequest: + var msg ConnectRequest + err := msg.DecodeXDR(r) + return msg, err + case messageTypeSessionInvitation: + var msg SessionInvitation + err := msg.DecodeXDR(r) + return msg, err + } + + return nil, fmt.Errorf("Unknown message type") +} diff --git a/lib/relaysrv/session.go b/lib/relaysrv/session.go new file mode 100644 index 000000000..bbd29d1f5 --- /dev/null +++ b/lib/relaysrv/session.go @@ -0,0 +1,313 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + "log" + "net" + "sync" + "sync/atomic" + "time" + + "github.com/juju/ratelimit" + "github.com/syncthing/relaysrv/protocol" + + syncthingprotocol "github.com/syncthing/protocol" +) + +var ( + sessionMut = sync.RWMutex{} + activeSessions = make([]*session, 0) + pendingSessions = make(map[string]*session, 0) + numProxies int64 + bytesProxied int64 +) + +func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *ratelimit.Bucket) *session { + serverkey := make([]byte, 32) + _, err := rand.Read(serverkey) + if err != nil { + return nil + } + + clientkey := make([]byte, 32) + _, err = rand.Read(clientkey) + if err != nil { + return nil + } + + ses := &session{ + serverkey: serverkey, + serverid: serverid, + clientkey: clientkey, + clientid: clientid, + rateLimit: makeRateLimitFunc(sessionRateLimit, globalRateLimit), + connsChan: make(chan net.Conn), + conns: make([]net.Conn, 0, 2), + } + + if debug { + log.Println("New session", ses) + } + + sessionMut.Lock() + pendingSessions[string(ses.serverkey)] = ses + pendingSessions[string(ses.clientkey)] = ses + sessionMut.Unlock() + + return ses +} + +func findSession(key string) *session { + sessionMut.Lock() + defer sessionMut.Unlock() + ses, ok := pendingSessions[key] + if !ok { + return nil + + } + delete(pendingSessions, key) + return ses +} + +func dropSessions(id syncthingprotocol.DeviceID) { + sessionMut.RLock() + for _, session := range activeSessions { + if session.HasParticipant(id) { + if debug { + log.Println("Dropping session", session, "involving", id) + } + session.CloseConns() + } + } + sessionMut.RUnlock() +} + +type session struct { + mut sync.Mutex + + serverkey []byte + serverid syncthingprotocol.DeviceID + + clientkey []byte + clientid syncthingprotocol.DeviceID + + rateLimit func(bytes int64) + + connsChan chan net.Conn + conns []net.Conn +} + +func (s *session) AddConnection(conn net.Conn) bool { + if debug { + log.Println("New connection for", s, "from", conn.RemoteAddr()) + } + + select { + case s.connsChan <- conn: + return true + default: + } + return false +} + +func (s *session) Serve() { + timedout := time.After(messageTimeout) + + if debug { + log.Println("Session", s, "serving") + } + + for { + select { + case conn := <-s.connsChan: + s.mut.Lock() + s.conns = append(s.conns, conn) + s.mut.Unlock() + // We're the only ones mutating% s.conns, hence we are free to read it. + if len(s.conns) < 2 { + continue + } + + close(s.connsChan) + + if debug { + log.Println("Session", s, "starting between", s.conns[0].RemoteAddr(), "and", s.conns[1].RemoteAddr()) + } + + wg := sync.WaitGroup{} + wg.Add(2) + + var err0 error + go func() { + err0 = s.proxy(s.conns[0], s.conns[1]) + wg.Done() + }() + + var err1 error + go func() { + err1 = s.proxy(s.conns[1], s.conns[0]) + wg.Done() + }() + + sessionMut.Lock() + activeSessions = append(activeSessions, s) + sessionMut.Unlock() + + wg.Wait() + + if debug { + log.Println("Session", s, "ended, outcomes:", err0, "and", err1) + } + goto done + + case <-timedout: + if debug { + log.Println("Session", s, "timed out") + } + goto done + } + } +done: + // We can end up here in 3 cases: + // 1. Timeout joining, in which case there are potentially entries in pendingSessions + // 2. General session end/timeout, in which case there are entries in activeSessions + // 3. Protocol handler calls dropSession as one of it's clients disconnects. + + sessionMut.Lock() + delete(pendingSessions, string(s.serverkey)) + delete(pendingSessions, string(s.clientkey)) + + for i, session := range activeSessions { + if session == s { + l := len(activeSessions) - 1 + activeSessions[i] = activeSessions[l] + activeSessions[l] = nil + activeSessions = activeSessions[:l] + } + } + sessionMut.Unlock() + + // If we are here because of case 2 or 3, we are potentially closing some or + // all connections a second time. + s.CloseConns() + + if debug { + log.Println("Session", s, "stopping") + } +} + +func (s *session) GetClientInvitationMessage() protocol.SessionInvitation { + return protocol.SessionInvitation{ + From: s.serverid[:], + Key: []byte(s.clientkey), + Address: sessionAddress, + Port: sessionPort, + ServerSocket: false, + } +} + +func (s *session) GetServerInvitationMessage() protocol.SessionInvitation { + return protocol.SessionInvitation{ + From: s.clientid[:], + Key: []byte(s.serverkey), + Address: sessionAddress, + Port: sessionPort, + ServerSocket: true, + } +} + +func (s *session) HasParticipant(id syncthingprotocol.DeviceID) bool { + return s.clientid == id || s.serverid == id +} + +func (s *session) CloseConns() { + s.mut.Lock() + for _, conn := range s.conns { + conn.Close() + } + s.mut.Unlock() +} + +func (s *session) proxy(c1, c2 net.Conn) error { + if debug { + log.Println("Proxy", c1.RemoteAddr(), "->", c2.RemoteAddr()) + } + + atomic.AddInt64(&numProxies, 1) + defer atomic.AddInt64(&numProxies, -1) + + buf := make([]byte, 65536) + for { + c1.SetReadDeadline(time.Now().Add(networkTimeout)) + n, err := c1.Read(buf) + if err != nil { + return err + } + + atomic.AddInt64(&bytesProxied, int64(n)) + + if debug { + log.Printf("%d bytes from %s to %s", n, c1.RemoteAddr(), c2.RemoteAddr()) + } + + if s.rateLimit != nil { + s.rateLimit(int64(n)) + } + + c2.SetWriteDeadline(time.Now().Add(networkTimeout)) + _, err = c2.Write(buf[:n]) + if err != nil { + return err + } + } +} + +func (s *session) String() string { + return fmt.Sprintf("<%s/%s>", hex.EncodeToString(s.clientkey)[:5], hex.EncodeToString(s.serverkey)[:5]) +} + +func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func(int64) { + // This may be a case of super duper premature optimization... We build an + // optimized function to do the rate limiting here based on what we need + // to do and then use it in the loop. + + if sessionRateLimit == nil && globalRateLimit == nil { + // No limiting needed. We could equally well return a func(int64){} and + // not do a nil check were we use it, but I think the nil check there + // makes it clear that there will be no limiting if none is + // configured... + return nil + } + + if sessionRateLimit == nil { + // We only have a global limiter + return func(bytes int64) { + globalRateLimit.Wait(bytes) + } + } + + if globalRateLimit == nil { + // We only have a session limiter + return func(bytes int64) { + sessionRateLimit.Wait(bytes) + } + } + + // We have both. Queue the bytes on both the global and session specific + // rate limiters. Wait for both in parallell, so that the actual send + // happens when both conditions are satisfied. In practice this just means + // wait the longer of the two times. + return func(bytes int64) { + t0 := sessionRateLimit.Take(bytes) + t1 := globalRateLimit.Take(bytes) + if t0 > t1 { + time.Sleep(t0) + } else { + time.Sleep(t1) + } + } +} diff --git a/lib/relaysrv/status.go b/lib/relaysrv/status.go new file mode 100644 index 000000000..b18cf3ea7 --- /dev/null +++ b/lib/relaysrv/status.go @@ -0,0 +1,94 @@ +package main + +import ( + "encoding/json" + "log" + "net/http" + "runtime" + "sync/atomic" + "time" +) + +var rc *rateCalculator + +func statusService(addr string) { + rc = newRateCalculator(360, 10*time.Second, &bytesProxied) + + http.HandleFunc("/status", getStatus) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatal(err) + } +} + +func getStatus(w http.ResponseWriter, r *http.Request) { + status := make(map[string]interface{}) + + sessionMut.Lock() + // This can potentially be double the number of pending sessions, as each session has two keys, one for each side. + status["numPendingSessionKeys"] = len(pendingSessions) + status["numActiveSessions"] = len(activeSessions) + sessionMut.Unlock() + status["numConnections"] = atomic.LoadInt64(&numConnections) + status["numProxies"] = atomic.LoadInt64(&numProxies) + status["bytesProxied"] = atomic.LoadInt64(&bytesProxied) + status["goVersion"] = runtime.Version() + status["goOS"] = runtime.GOOS + status["goAarch"] = runtime.GOARCH + status["goMaxProcs"] = runtime.GOMAXPROCS(-1) + status["kbps10s1m5m15m30m60m"] = []int64{ + rc.rate(10/10) * 8 / 1000, + rc.rate(60/10) * 8 / 1000, + rc.rate(5*60/10) * 8 / 1000, + rc.rate(15*60/10) * 8 / 1000, + rc.rate(30*60/10) * 8 / 1000, + rc.rate(60*60/10) * 8 / 1000, + } + + bs, err := json.MarshalIndent(status, "", " ") + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(bs) +} + +type rateCalculator struct { + rates []int64 + prev int64 + counter *int64 +} + +func newRateCalculator(keepIntervals int, interval time.Duration, counter *int64) *rateCalculator { + r := &rateCalculator{ + rates: make([]int64, keepIntervals), + counter: counter, + } + + go r.updateRates(interval) + + return r +} + +func (r *rateCalculator) updateRates(interval time.Duration) { + for { + now := time.Now() + next := now.Truncate(interval).Add(interval) + time.Sleep(next.Sub(now)) + + cur := atomic.LoadInt64(r.counter) + rate := int64(float64(cur-r.prev) / interval.Seconds()) + copy(r.rates[1:], r.rates) + r.rates[0] = rate + r.prev = cur + } +} + +func (r *rateCalculator) rate(periods int) int64 { + var tot int64 + for i := 0; i < periods; i++ { + tot += r.rates[i] + } + return tot / int64(periods) +} diff --git a/lib/relaysrv/testutil/main.go b/lib/relaysrv/testutil/main.go new file mode 100644 index 000000000..ffeb9942d --- /dev/null +++ b/lib/relaysrv/testutil/main.go @@ -0,0 +1,149 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "bufio" + "crypto/tls" + "flag" + "log" + "net" + "net/url" + "os" + "path/filepath" + "time" + + syncthingprotocol "github.com/syncthing/protocol" + "github.com/syncthing/relaysrv/client" + "github.com/syncthing/relaysrv/protocol" +) + +func main() { + log.SetOutput(os.Stdout) + log.SetFlags(log.LstdFlags | log.Lshortfile) + + var connect, relay, dir string + var join, test bool + + flag.StringVar(&connect, "connect", "", "Device ID to which to connect to") + flag.BoolVar(&join, "join", false, "Join relay") + flag.BoolVar(&test, "test", false, "Generic relay test") + flag.StringVar(&relay, "relay", "relay://127.0.0.1:22067", "Relay address") + flag.StringVar(&dir, "keys", ".", "Directory where cert.pem and key.pem is stored") + + flag.Parse() + + certFile, keyFile := filepath.Join(dir, "cert.pem"), filepath.Join(dir, "key.pem") + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + log.Fatalln("Failed to load X509 key pair:", err) + } + + id := syncthingprotocol.NewDeviceID(cert.Certificate[0]) + log.Println("ID:", id) + + uri, err := url.Parse(relay) + if err != nil { + log.Fatal(err) + } + + stdin := make(chan string) + + go stdinReader(stdin) + + if join { + log.Println("Creating client") + relay := client.NewProtocolClient(uri, []tls.Certificate{cert}, nil) + log.Println("Created client") + + go relay.Serve() + + recv := make(chan protocol.SessionInvitation) + + go func() { + log.Println("Starting invitation receiver") + for invite := range relay.Invitations { + select { + case recv <- invite: + log.Println("Received invitation", invite) + default: + log.Println("Discarding invitation", invite) + } + } + }() + + for { + conn, err := client.JoinSession(<-recv) + if err != nil { + log.Fatalln("Failed to join", err) + } + log.Println("Joined", conn.RemoteAddr(), conn.LocalAddr()) + connectToStdio(stdin, conn) + log.Println("Finished", conn.RemoteAddr(), conn.LocalAddr()) + } + } else if connect != "" { + id, err := syncthingprotocol.DeviceIDFromString(connect) + if err != nil { + log.Fatal(err) + } + + invite, err := client.GetInvitationFromRelay(uri, id, []tls.Certificate{cert}) + if err != nil { + log.Fatal(err) + } + + log.Println("Received invitation", invite) + conn, err := client.JoinSession(invite) + if err != nil { + log.Fatalln("Failed to join", err) + } + log.Println("Joined", conn.RemoteAddr(), conn.LocalAddr()) + connectToStdio(stdin, conn) + log.Println("Finished", conn.RemoteAddr(), conn.LocalAddr()) + } else if test { + if client.TestRelay(uri, []tls.Certificate{cert}) { + log.Println("OK") + } else { + log.Println("FAIL") + } + } else { + log.Fatal("Requires either join or connect") + } +} + +func stdinReader(c chan<- string) { + scanner := bufio.NewScanner(os.Stdin) + for scanner.Scan() { + c <- scanner.Text() + c <- "\n" + } +} + +func connectToStdio(stdin <-chan string, conn net.Conn) { + go func() { + + }() + + buf := make([]byte, 1024) + for { + conn.SetReadDeadline(time.Now().Add(time.Millisecond)) + n, err := conn.Read(buf[0:]) + if err != nil { + nerr, ok := err.(net.Error) + if !ok || !nerr.Timeout() { + log.Println(err) + return + } + } + os.Stdout.Write(buf[:n]) + + select { + case msg := <-stdin: + _, err := conn.Write([]byte(msg)) + if err != nil { + return + } + default: + } + } +} diff --git a/lib/relaysrv/utils.go b/lib/relaysrv/utils.go new file mode 100644 index 000000000..7d1f6bfa4 --- /dev/null +++ b/lib/relaysrv/utils.go @@ -0,0 +1,28 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "errors" + "net" +) + +func setTCPOptions(conn net.Conn) error { + tcpConn, ok := conn.(*net.TCPConn) + if !ok { + return errors.New("Not a TCP connection") + } + if err := tcpConn.SetLinger(0); err != nil { + return err + } + if err := tcpConn.SetNoDelay(true); err != nil { + return err + } + if err := tcpConn.SetKeepAlivePeriod(networkTimeout); err != nil { + return err + } + if err := tcpConn.SetKeepAlive(true); err != nil { + return err + } + return nil +}