Add 'lib/relaysrv/' from commit '6e126fb97e2ff566d35f8d8824e86793d22b2147'

git-subtree-dir: lib/relaysrv
git-subtree-mainline: 4316992d95
git-subtree-split: 6e126fb97e
This commit is contained in:
Jakob Borg 2015-09-22 19:34:52 +02:00
commit 633e888ba7
18 changed files with 2409 additions and 0 deletions

27
lib/relaysrv/.gitignore vendored Normal file
View File

@ -0,0 +1,27 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so
# Folders
_obj
_test
# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out
*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*
_testmain.go
*.exe
*.test
*.prof
*.tar.gz
*.zip
relaysrv

View File

@ -0,0 +1 @@
Jakob Borg <jakob@nym.se>

22
lib/relaysrv/LICENSE Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2015 The Syncthing Project
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

6
lib/relaysrv/README.md Normal file
View File

@ -0,0 +1,6 @@
relaysrv
========
This is the relay server for the `syncthing` project.
`go get github.com/syncthing/relaysrv`

39
lib/relaysrv/build.sh Executable file
View File

@ -0,0 +1,39 @@
#!/bin/bash
set -euo pipefail
set nullglob
echo Get dependencies
go get -d
rm -rf relaysrv-*-*
build() {
export GOOS="$1"
export GOARCH="$2"
target="relaysrv-$GOOS-$GOARCH"
go build -i -v -ldflags -w
mkdir "$target"
if [ -f relaysrv ] ; then
mv relaysrv "$target"
tar zcvf "$target.tar.gz" "$target"
rm -r "$target"
fi
if [ -f relaysrv.exe ] ; then
mv relaysrv.exe "$target"
zip -r "$target.zip" "$target"
rm -r "$target"
fi
}
for goos in linux darwin windows freebsd openbsd netbsd solaris ; do
build "$goos" amd64
done
for goos in linux windows freebsd openbsd netbsd ; do
build "$goos" 386
done
build linux arm
# Hack used because we run as root under Docker
if [[ ${CHOWN_USER:-} != "" ]] ; then
chown -R $CHOWN_USER .
fi

View File

@ -0,0 +1,298 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"crypto/tls"
"fmt"
"log"
"net"
"net/url"
"time"
syncthingprotocol "github.com/syncthing/protocol"
"github.com/syncthing/relaysrv/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
type ProtocolClient struct {
URI *url.URL
Invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
config *tls.Config
timeout time.Duration
stop chan struct{}
stopped chan struct{}
conn *tls.Conn
mut sync.RWMutex
connected bool
latency time.Duration
}
func NewProtocolClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) *ProtocolClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
}
return &ProtocolClient{
URI: uri,
Invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
config: configForCerts(certs),
timeout: time.Minute * 2,
stop: make(chan struct{}),
stopped: make(chan struct{}),
mut: sync.NewRWMutex(),
connected: false,
}
}
func (c *ProtocolClient) Serve() {
c.stop = make(chan struct{})
c.stopped = make(chan struct{})
defer close(c.stopped)
if err := c.connect(); err != nil {
if debug {
l.Debugln("Relay connect:", err)
}
return
}
if debug {
l.Debugln(c, "connected", c.conn.RemoteAddr())
}
if err := c.join(); err != nil {
c.conn.Close()
l.Infoln("Relay join:", err)
return
}
if err := c.conn.SetDeadline(time.Time{}); err != nil {
l.Infoln("Relay set deadline:", err)
return
}
if debug {
l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr())
}
defer c.cleanup()
c.mut.Lock()
c.connected = true
c.mut.Unlock()
messages := make(chan interface{})
errors := make(chan error, 1)
go messageReader(c.conn, messages, errors)
timeout := time.NewTimer(c.timeout)
for {
select {
case message := <-messages:
timeout.Reset(c.timeout)
if debug {
log.Printf("%s received message %T", c, message)
}
switch msg := message.(type) {
case protocol.Ping:
if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil {
l.Infoln("Relay write:", err)
return
}
if debug {
l.Debugln(c, "sent pong")
}
case protocol.SessionInvitation:
ip := net.IP(msg.Address)
if len(ip) == 0 || ip.IsUnspecified() {
msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:]
}
c.Invitations <- msg
default:
l.Infoln("Relay: protocol error: unexpected message %v", msg)
return
}
case <-c.stop:
if debug {
l.Debugln(c, "stopping")
}
return
case err := <-errors:
l.Infoln("Relay received:", err)
return
case <-timeout.C:
if debug {
l.Debugln(c, "timed out")
}
return
}
}
}
func (c *ProtocolClient) Stop() {
if c.stop == nil {
return
}
close(c.stop)
<-c.stopped
}
func (c *ProtocolClient) StatusOK() bool {
c.mut.RLock()
con := c.connected
c.mut.RUnlock()
return con
}
func (c *ProtocolClient) Latency() time.Duration {
c.mut.RLock()
lat := c.latency
c.mut.RUnlock()
return lat
}
func (c *ProtocolClient) String() string {
return fmt.Sprintf("ProtocolClient@%p", c)
}
func (c *ProtocolClient) connect() error {
if c.URI.Scheme != "relay" {
return fmt.Errorf("Unsupported relay schema:", c.URI.Scheme)
}
t0 := time.Now()
tcpConn, err := net.Dial("tcp", c.URI.Host)
if err != nil {
return err
}
c.mut.Lock()
c.latency = time.Since(t0)
c.mut.Unlock()
conn := tls.Client(tcpConn, c.config)
if err = conn.Handshake(); err != nil {
return err
}
if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
conn.Close()
return err
}
if err := performHandshakeAndValidation(conn, c.URI); err != nil {
conn.Close()
return err
}
c.conn = conn
return nil
}
func (c *ProtocolClient) cleanup() {
if c.closeInvitationsOnFinish {
close(c.Invitations)
c.Invitations = make(chan protocol.SessionInvitation)
}
if debug {
l.Debugln(c, "cleaning up")
}
c.mut.Lock()
c.connected = false
c.mut.Unlock()
c.conn.Close()
}
func (c *ProtocolClient) join() error {
if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil {
return err
}
message, err := protocol.ReadMessage(c.conn)
if err != nil {
return err
}
switch msg := message.(type) {
case protocol.Response:
if msg.Code != 0 {
return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
}
default:
return fmt.Errorf("protocol error: expecting response got %v", msg)
}
return nil
}
func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error {
if err := conn.Handshake(); err != nil {
return err
}
cs := conn.ConnectionState()
if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName {
return fmt.Errorf("protocol negotiation error")
}
q := uri.Query()
relayIDs := q.Get("id")
if relayIDs != "" {
relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs)
if err != nil {
return fmt.Errorf("relay address contains invalid verification id: %s", err)
}
certs := cs.PeerCertificates
if cl := len(certs); cl != 1 {
return fmt.Errorf("unexpected certificate count: %d", cl)
}
remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw)
if remoteID != relayID {
return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID)
}
}
return nil
}
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) {
for {
msg, err := protocol.ReadMessage(conn)
if err != nil {
errors <- err
return
}
messages <- msg
}
}

View File

@ -0,0 +1,15 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"os"
"strings"
"github.com/calmh/logger"
)
var (
debug = strings.Contains(os.Getenv("STTRACE"), "relay") || os.Getenv("STTRACE") == "all"
l = logger.DefaultLogger
)

View File

@ -0,0 +1,141 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"crypto/tls"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"
syncthingprotocol "github.com/syncthing/protocol"
"github.com/syncthing/relaysrv/protocol"
)
func GetInvitationFromRelay(uri *url.URL, id syncthingprotocol.DeviceID, certs []tls.Certificate) (protocol.SessionInvitation, error) {
if uri.Scheme != "relay" {
return protocol.SessionInvitation{}, fmt.Errorf("Unsupported relay scheme:", uri.Scheme)
}
conn, err := tls.Dial("tcp", uri.Host, configForCerts(certs))
if err != nil {
return protocol.SessionInvitation{}, err
}
conn.SetDeadline(time.Now().Add(10 * time.Second))
if err := performHandshakeAndValidation(conn, uri); err != nil {
return protocol.SessionInvitation{}, err
}
defer conn.Close()
request := protocol.ConnectRequest{
ID: id[:],
}
if err := protocol.WriteMessage(conn, request); err != nil {
return protocol.SessionInvitation{}, err
}
message, err := protocol.ReadMessage(conn)
if err != nil {
return protocol.SessionInvitation{}, err
}
switch msg := message.(type) {
case protocol.Response:
return protocol.SessionInvitation{}, fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
case protocol.SessionInvitation:
if debug {
l.Debugln("Received invitation", msg, "via", conn.LocalAddr())
}
ip := net.IP(msg.Address)
if len(ip) == 0 || ip.IsUnspecified() {
msg.Address = conn.RemoteAddr().(*net.TCPAddr).IP[:]
}
return msg, nil
default:
return protocol.SessionInvitation{}, fmt.Errorf("protocol error: unexpected message %v", msg)
}
}
func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) {
addr := net.JoinHostPort(net.IP(invitation.Address).String(), strconv.Itoa(int(invitation.Port)))
conn, err := net.Dial("tcp", addr)
if err != nil {
return nil, err
}
request := protocol.JoinSessionRequest{
Key: invitation.Key,
}
conn.SetDeadline(time.Now().Add(10 * time.Second))
err = protocol.WriteMessage(conn, request)
if err != nil {
return nil, err
}
message, err := protocol.ReadMessage(conn)
if err != nil {
return nil, err
}
conn.SetDeadline(time.Time{})
switch msg := message.(type) {
case protocol.Response:
if msg.Code != 0 {
return nil, fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
}
return conn, nil
default:
return nil, fmt.Errorf("protocol error: expecting response got %v", msg)
}
}
func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool {
id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0])
invs := make(chan protocol.SessionInvitation, 1)
c := NewProtocolClient(uri, certs, invs)
go c.Serve()
defer func() {
close(invs)
c.Stop()
}()
for i := 0; i < times; i++ {
_, err := GetInvitationFromRelay(uri, id, certs)
if err == nil {
return true
}
if !strings.Contains(err.Error(), "Incorrect response code") {
return false
}
time.Sleep(sleep)
}
return false
}
func configForCerts(certs []tls.Certificate) *tls.Config {
return &tls.Config{
Certificates: certs,
NextProtos: []string{protocol.ProtocolName},
ClientAuth: tls.RequestClientCert,
SessionTicketsDisabled: true,
InsecureSkipVerify: true,
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
},
}
}

319
lib/relaysrv/listener.go Normal file
View File

@ -0,0 +1,319 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"crypto/tls"
"encoding/hex"
"log"
"net"
"sync"
"sync/atomic"
"time"
syncthingprotocol "github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
"github.com/syncthing/relaysrv/protocol"
)
var (
outboxesMut = sync.RWMutex{}
outboxes = make(map[syncthingprotocol.DeviceID]chan interface{})
numConnections int64
)
func listener(addr string, config *tls.Config) {
tcpListener, err := net.Listen("tcp", addr)
if err != nil {
log.Fatalln(err)
}
listener := tlsutil.DowngradingListener{tcpListener, nil}
for {
conn, isTLS, err := listener.AcceptNoWrapTLS()
if err != nil {
if debug {
log.Println("Listener failed to accept connection from", conn.RemoteAddr(), ". Possibly a TCP Ping.")
}
continue
}
setTCPOptions(conn)
if debug {
log.Println("Listener accepted connection from", conn.RemoteAddr(), "tls", isTLS)
}
if isTLS {
go protocolConnectionHandler(conn, config)
} else {
go sessionConnectionHandler(conn)
}
}
}
func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) {
conn := tls.Server(tcpConn, config)
err := conn.Handshake()
if err != nil {
if debug {
log.Println("Protocol connection TLS handshake:", conn.RemoteAddr(), err)
}
conn.Close()
return
}
state := conn.ConnectionState()
if (!state.NegotiatedProtocolIsMutual || state.NegotiatedProtocol != protocol.ProtocolName) && debug {
log.Println("Protocol negotiation error")
}
certs := state.PeerCertificates
if len(certs) != 1 {
if debug {
log.Println("Certificate list error")
}
conn.Close()
return
}
id := syncthingprotocol.NewDeviceID(certs[0].Raw)
messages := make(chan interface{})
errors := make(chan error, 1)
outbox := make(chan interface{})
// Read messages from the connection and send them on the messages
// channel. When there is an error, send it on the error channel and
// return. Applies also when the connection gets closed, so the pattern
// below is to close the connection on error, then wait for the error
// signal from messageReader to exit.
go messageReader(conn, messages, errors)
pingTicker := time.NewTicker(pingInterval)
timeoutTicker := time.NewTimer(networkTimeout)
joined := false
for {
select {
case message := <-messages:
timeoutTicker.Reset(networkTimeout)
if debug {
log.Printf("Message %T from %s", message, id)
}
switch msg := message.(type) {
case protocol.JoinRelayRequest:
outboxesMut.RLock()
_, ok := outboxes[id]
outboxesMut.RUnlock()
if ok {
protocol.WriteMessage(conn, protocol.ResponseAlreadyConnected)
if debug {
log.Println("Already have a peer with the same ID", id, conn.RemoteAddr())
}
conn.Close()
continue
}
outboxesMut.Lock()
outboxes[id] = outbox
outboxesMut.Unlock()
joined = true
protocol.WriteMessage(conn, protocol.ResponseSuccess)
case protocol.ConnectRequest:
requestedPeer := syncthingprotocol.DeviceIDFromBytes(msg.ID)
outboxesMut.RLock()
peerOutbox, ok := outboxes[requestedPeer]
outboxesMut.RUnlock()
if !ok {
if debug {
log.Println(id, "is looking for", requestedPeer, "which does not exist")
}
protocol.WriteMessage(conn, protocol.ResponseNotFound)
conn.Close()
continue
}
// requestedPeer is the server, id is the client
ses := newSession(requestedPeer, id, sessionLimiter, globalLimiter)
go ses.Serve()
clientInvitation := ses.GetClientInvitationMessage()
serverInvitation := ses.GetServerInvitationMessage()
if err := protocol.WriteMessage(conn, clientInvitation); err != nil {
if debug {
log.Printf("Error sending invitation from %s to client: %s", id, err)
}
conn.Close()
continue
}
peerOutbox <- serverInvitation
if debug {
log.Println("Sent invitation from", id, "to", requestedPeer)
}
conn.Close()
case protocol.Ping:
if err := protocol.WriteMessage(conn, protocol.Pong{}); err != nil {
if debug {
log.Println("Error writing pong:", err)
}
conn.Close()
continue
}
case protocol.Pong:
// Nothing
default:
if debug {
log.Printf("Unknown message %s: %T", id, message)
}
protocol.WriteMessage(conn, protocol.ResponseUnexpectedMessage)
conn.Close()
}
case err := <-errors:
if debug {
log.Printf("Closing connection %s: %s", id, err)
}
close(outbox)
// Potentially closing a second time.
conn.Close()
if joined {
// Only delete the outbox if the client is joined, as it might be
// a lookup request coming from the same client.
outboxesMut.Lock()
delete(outboxes, id)
outboxesMut.Unlock()
// Also, kill all sessions related to this node, as it probably
// went offline. This is for the other end to realize the client
// is no longer there faster. This also helps resolve
// 'already connected' errors when one of the sides is
// restarting, and connecting to the other peer before the other
// peer even realised that the node has gone away.
dropSessions(id)
}
return
case <-pingTicker.C:
if !joined {
if debug {
log.Println(id, "didn't join within", pingInterval)
}
conn.Close()
continue
}
if err := protocol.WriteMessage(conn, protocol.Ping{}); err != nil {
if debug {
log.Println(id, err)
}
conn.Close()
}
case <-timeoutTicker.C:
// We should receive a error from the reader loop, which will cause
// us to quit this loop.
if debug {
log.Printf("%s timed out", id)
}
conn.Close()
case msg := <-outbox:
if debug {
log.Printf("Sending message %T to %s", msg, id)
}
if err := protocol.WriteMessage(conn, msg); err != nil {
if debug {
log.Println(id, err)
}
conn.Close()
}
}
}
}
func sessionConnectionHandler(conn net.Conn) {
if err := conn.SetDeadline(time.Now().Add(messageTimeout)); err != nil {
if debug {
log.Println("Weird error setting deadline:", err, "on", conn.RemoteAddr())
}
return
}
message, err := protocol.ReadMessage(conn)
if err != nil {
return
}
switch msg := message.(type) {
case protocol.JoinSessionRequest:
ses := findSession(string(msg.Key))
if debug {
log.Println(conn.RemoteAddr(), "session lookup", ses, hex.EncodeToString(msg.Key)[:5])
}
if ses == nil {
protocol.WriteMessage(conn, protocol.ResponseNotFound)
conn.Close()
return
}
if !ses.AddConnection(conn) {
if debug {
log.Println("Failed to add", conn.RemoteAddr(), "to session", ses)
}
protocol.WriteMessage(conn, protocol.ResponseAlreadyConnected)
conn.Close()
return
}
if err := protocol.WriteMessage(conn, protocol.ResponseSuccess); err != nil {
if debug {
log.Println("Failed to send session join response to ", conn.RemoteAddr(), "for", ses)
}
return
}
if err := conn.SetDeadline(time.Time{}); err != nil {
if debug {
log.Println("Weird error setting deadline:", err, "on", conn.RemoteAddr())
}
conn.Close()
return
}
default:
if debug {
log.Println("Unexpected message from", conn.RemoteAddr(), message)
}
protocol.WriteMessage(conn, protocol.ResponseUnexpectedMessage)
conn.Close()
}
}
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) {
atomic.AddInt64(&numConnections, 1)
defer atomic.AddInt64(&numConnections, -1)
for {
msg, err := protocol.ReadMessage(conn)
if err != nil {
errors <- err
return
}
messages <- msg
}
}

142
lib/relaysrv/main.go Normal file
View File

@ -0,0 +1,142 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"crypto/tls"
"flag"
"fmt"
"log"
"net"
"net/url"
"path/filepath"
"strings"
"time"
"github.com/juju/ratelimit"
"github.com/syncthing/relaysrv/protocol"
"github.com/syncthing/syncthing/lib/tlsutil"
syncthingprotocol "github.com/syncthing/protocol"
)
var (
listen string
debug bool = false
sessionAddress []byte
sessionPort uint16
networkTimeout time.Duration = 2 * time.Minute
pingInterval time.Duration = time.Minute
messageTimeout time.Duration = time.Minute
sessionLimitBps int
globalLimitBps int
sessionLimiter *ratelimit.Bucket
globalLimiter *ratelimit.Bucket
statusAddr string
poolAddrs string
defaultPoolAddrs string = "https://relays.syncthing.net"
)
func main() {
log.SetFlags(log.Lshortfile | log.LstdFlags)
var dir, extAddress string
flag.StringVar(&listen, "listen", ":22067", "Protocol listen address")
flag.StringVar(&dir, "keys", ".", "Directory where cert.pem and key.pem is stored")
flag.DurationVar(&networkTimeout, "network-timeout", networkTimeout, "Timeout for network operations between the client and the relay.\n\tIf no data is received between the client and the relay in this period of time, the connection is terminated.\n\tFurthermore, if no data is sent between either clients being relayed within this period of time, the session is also terminated.")
flag.DurationVar(&pingInterval, "ping-interval", pingInterval, "How often pings are sent")
flag.DurationVar(&messageTimeout, "message-timeout", messageTimeout, "Maximum amount of time we wait for relevant messages to arrive")
flag.IntVar(&sessionLimitBps, "per-session-rate", sessionLimitBps, "Per session rate limit, in bytes/s")
flag.IntVar(&globalLimitBps, "global-rate", globalLimitBps, "Global rate limit, in bytes/s")
flag.BoolVar(&debug, "debug", debug, "Enable debug output")
flag.StringVar(&statusAddr, "status-srv", ":22070", "Listen address for status service (blank to disable)")
flag.StringVar(&poolAddrs, "pools", defaultPoolAddrs, "Comma separated list of relay pool addresses to join")
flag.Parse()
if extAddress == "" {
extAddress = listen
}
addr, err := net.ResolveTCPAddr("tcp", extAddress)
if err != nil {
log.Fatal(err)
}
sessionAddress = addr.IP[:]
sessionPort = uint16(addr.Port)
certFile, keyFile := filepath.Join(dir, "cert.pem"), filepath.Join(dir, "key.pem")
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Println("Failed to load keypair. Generating one, this might take a while...")
cert, err = tlsutil.NewCertificate(certFile, keyFile, "relaysrv", 3072)
if err != nil {
log.Fatalln("Failed to generate X509 key pair:", err)
}
}
tlsCfg := &tls.Config{
Certificates: []tls.Certificate{cert},
NextProtos: []string{protocol.ProtocolName},
ClientAuth: tls.RequestClientCert,
SessionTicketsDisabled: true,
InsecureSkipVerify: true,
MinVersion: tls.VersionTLS12,
CipherSuites: []uint16{
tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,
tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA,
tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
},
}
id := syncthingprotocol.NewDeviceID(cert.Certificate[0])
if debug {
log.Println("ID:", id)
}
if sessionLimitBps > 0 {
sessionLimiter = ratelimit.NewBucketWithRate(float64(sessionLimitBps), int64(2*sessionLimitBps))
}
if globalLimitBps > 0 {
globalLimiter = ratelimit.NewBucketWithRate(float64(globalLimitBps), int64(2*globalLimitBps))
}
if statusAddr != "" {
go statusService(statusAddr)
}
uri, err := url.Parse(fmt.Sprintf("relay://%s/?id=%s&pingInterval=%s&networkTimeout=%s&sessionLimitBps=%d&globalLimitBps=%d&statusAddr=%s", extAddress, id, pingInterval, networkTimeout, sessionLimitBps, globalLimitBps, statusAddr))
if err != nil {
log.Fatalln("Failed to construct URI", err)
}
if debug {
log.Println("URI:", uri.String())
}
if poolAddrs == defaultPoolAddrs {
log.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
log.Println("!! Joining default relay pools, this relay will be available for public use. !!")
log.Println(`!! Use the -pools="" command line option to make the relay private. !!`)
log.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!")
}
pools := strings.Split(poolAddrs, ",")
for _, pool := range pools {
pool = strings.TrimSpace(pool)
if len(pool) > 0 {
go poolHandler(pool, uri)
}
}
listener(listen, tlsCfg)
}

68
lib/relaysrv/pool.go Normal file
View File

@ -0,0 +1,68 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"bytes"
"encoding/json"
"io/ioutil"
"log"
"net/http"
"net/url"
"time"
)
func poolHandler(pool string, uri *url.URL) {
for {
var b bytes.Buffer
json.NewEncoder(&b).Encode(struct {
URL string `json:"url"`
}{
uri.String(),
})
resp, err := http.Post(pool, "application/json", &b)
if err != nil {
if debug {
log.Println("Error joining pool", pool, err)
}
} else if resp.StatusCode == 500 {
if debug {
bs, err := ioutil.ReadAll(resp.Body)
if err != nil {
log.Println("Failed to read response body for", pool, err)
} else {
log.Println("Response for", pool, string(bs))
}
resp.Body.Close()
}
} else if resp.StatusCode == 429 {
if debug {
log.Println(pool, "under load, will retry in a minute")
}
time.Sleep(time.Minute)
continue
} else if resp.StatusCode == 403 {
if debug {
log.Println(pool, "failed to join due to IP address not matching external address")
}
return
} else if resp.StatusCode == 200 {
var x struct {
EvictionIn time.Duration `json:"evictionIn"`
}
err := json.NewDecoder(resp.Body).Decode(&x)
if err == nil {
rejoin := x.EvictionIn - (x.EvictionIn / 5)
if debug {
log.Println("Joined", pool, "rejoining in", rejoin)
}
time.Sleep(rejoin)
continue
} else if debug {
log.Println("Failed to deserialize respnse", err)
}
}
time.Sleep(time.Hour)
}
}

View File

@ -0,0 +1,66 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
//go:generate -command genxdr go run ../../syncthing/Godeps/_workspace/src/github.com/calmh/xdr/cmd/genxdr/main.go
//go:generate genxdr -o packets_xdr.go packets.go
package protocol
import (
"fmt"
"net"
syncthingprotocol "github.com/syncthing/protocol"
)
const (
messageTypePing int32 = iota
messageTypePong
messageTypeJoinRelayRequest
messageTypeJoinSessionRequest
messageTypeResponse
messageTypeConnectRequest
messageTypeSessionInvitation
)
type header struct {
magic uint32
messageType int32
messageLength int32
}
type Ping struct{}
type Pong struct{}
type JoinRelayRequest struct{}
type JoinSessionRequest struct {
Key []byte // max:32
}
type Response struct {
Code int32
Message string
}
type ConnectRequest struct {
ID []byte // max:32
}
type SessionInvitation struct {
From []byte // max:32
Key []byte // max:32
Address []byte // max:32
Port uint16
ServerSocket bool
}
func (i SessionInvitation) String() string {
return fmt.Sprintf("%s@%s", syncthingprotocol.DeviceIDFromBytes(i.From), i.AddressString())
}
func (i SessionInvitation) GoString() string {
return i.String()
}
func (i SessionInvitation) AddressString() string {
return fmt.Sprintf("%s:%d", net.IP(i.Address), i.Port)
}

View File

@ -0,0 +1,567 @@
// ************************************************************
// This file is automatically generated by genxdr. Do not edit.
// ************************************************************
package protocol
import (
"bytes"
"io"
"github.com/calmh/xdr"
)
/*
header Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| magic |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message Type |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| message Length |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct header {
unsigned int magic;
int messageType;
int messageLength;
}
*/
func (o header) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o header) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o header) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o header) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o header) EncodeXDRInto(xw *xdr.Writer) (int, error) {
xw.WriteUint32(o.magic)
xw.WriteUint32(uint32(o.messageType))
xw.WriteUint32(uint32(o.messageLength))
return xw.Tot(), xw.Error()
}
func (o *header) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *header) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *header) DecodeXDRFrom(xr *xdr.Reader) error {
o.magic = xr.ReadUint32()
o.messageType = int32(xr.ReadUint32())
o.messageLength = int32(xr.ReadUint32())
return xr.Error()
}
/*
Ping Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct Ping {
}
*/
func (o Ping) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o Ping) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o Ping) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o Ping) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o Ping) EncodeXDRInto(xw *xdr.Writer) (int, error) {
return xw.Tot(), xw.Error()
}
func (o *Ping) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *Ping) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *Ping) DecodeXDRFrom(xr *xdr.Reader) error {
return xr.Error()
}
/*
Pong Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct Pong {
}
*/
func (o Pong) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o Pong) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o Pong) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o Pong) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o Pong) EncodeXDRInto(xw *xdr.Writer) (int, error) {
return xw.Tot(), xw.Error()
}
func (o *Pong) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *Pong) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *Pong) DecodeXDRFrom(xr *xdr.Reader) error {
return xr.Error()
}
/*
JoinRelayRequest Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct JoinRelayRequest {
}
*/
func (o JoinRelayRequest) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o JoinRelayRequest) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o JoinRelayRequest) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o JoinRelayRequest) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o JoinRelayRequest) EncodeXDRInto(xw *xdr.Writer) (int, error) {
return xw.Tot(), xw.Error()
}
func (o *JoinRelayRequest) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *JoinRelayRequest) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *JoinRelayRequest) DecodeXDRFrom(xr *xdr.Reader) error {
return xr.Error()
}
/*
JoinSessionRequest Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Key |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Key (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct JoinSessionRequest {
opaque Key<32>;
}
*/
func (o JoinSessionRequest) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o JoinSessionRequest) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o JoinSessionRequest) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o JoinSessionRequest) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o JoinSessionRequest) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.Key); l > 32 {
return xw.Tot(), xdr.ElementSizeExceeded("Key", l, 32)
}
xw.WriteBytes(o.Key)
return xw.Tot(), xw.Error()
}
func (o *JoinSessionRequest) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *JoinSessionRequest) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *JoinSessionRequest) DecodeXDRFrom(xr *xdr.Reader) error {
o.Key = xr.ReadBytesMax(32)
return xr.Error()
}
/*
Response Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Code |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Message |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Message (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct Response {
int Code;
string Message<>;
}
*/
func (o Response) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o Response) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o Response) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o Response) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o Response) EncodeXDRInto(xw *xdr.Writer) (int, error) {
xw.WriteUint32(uint32(o.Code))
xw.WriteString(o.Message)
return xw.Tot(), xw.Error()
}
func (o *Response) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *Response) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *Response) DecodeXDRFrom(xr *xdr.Reader) error {
o.Code = int32(xr.ReadUint32())
o.Message = xr.ReadString()
return xr.Error()
}
/*
ConnectRequest Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of ID |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ ID (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct ConnectRequest {
opaque ID<32>;
}
*/
func (o ConnectRequest) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o ConnectRequest) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o ConnectRequest) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o ConnectRequest) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o ConnectRequest) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.ID); l > 32 {
return xw.Tot(), xdr.ElementSizeExceeded("ID", l, 32)
}
xw.WriteBytes(o.ID)
return xw.Tot(), xw.Error()
}
func (o *ConnectRequest) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *ConnectRequest) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *ConnectRequest) DecodeXDRFrom(xr *xdr.Reader) error {
o.ID = xr.ReadBytesMax(32)
return xr.Error()
}
/*
SessionInvitation Structure:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of From |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ From (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Key |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Key (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Length of Address |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
/ /
\ Address (variable length) \
/ /
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| 0x0000 | Port |
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
| Server Socket (V=0 or 1) |V|
+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
struct SessionInvitation {
opaque From<32>;
opaque Key<32>;
opaque Address<32>;
unsigned int Port;
bool ServerSocket;
}
*/
func (o SessionInvitation) EncodeXDR(w io.Writer) (int, error) {
var xw = xdr.NewWriter(w)
return o.EncodeXDRInto(xw)
}
func (o SessionInvitation) MarshalXDR() ([]byte, error) {
return o.AppendXDR(make([]byte, 0, 128))
}
func (o SessionInvitation) MustMarshalXDR() []byte {
bs, err := o.MarshalXDR()
if err != nil {
panic(err)
}
return bs
}
func (o SessionInvitation) AppendXDR(bs []byte) ([]byte, error) {
var aw = xdr.AppendWriter(bs)
var xw = xdr.NewWriter(&aw)
_, err := o.EncodeXDRInto(xw)
return []byte(aw), err
}
func (o SessionInvitation) EncodeXDRInto(xw *xdr.Writer) (int, error) {
if l := len(o.From); l > 32 {
return xw.Tot(), xdr.ElementSizeExceeded("From", l, 32)
}
xw.WriteBytes(o.From)
if l := len(o.Key); l > 32 {
return xw.Tot(), xdr.ElementSizeExceeded("Key", l, 32)
}
xw.WriteBytes(o.Key)
if l := len(o.Address); l > 32 {
return xw.Tot(), xdr.ElementSizeExceeded("Address", l, 32)
}
xw.WriteBytes(o.Address)
xw.WriteUint16(o.Port)
xw.WriteBool(o.ServerSocket)
return xw.Tot(), xw.Error()
}
func (o *SessionInvitation) DecodeXDR(r io.Reader) error {
xr := xdr.NewReader(r)
return o.DecodeXDRFrom(xr)
}
func (o *SessionInvitation) UnmarshalXDR(bs []byte) error {
var br = bytes.NewReader(bs)
var xr = xdr.NewReader(br)
return o.DecodeXDRFrom(xr)
}
func (o *SessionInvitation) DecodeXDRFrom(xr *xdr.Reader) error {
o.From = xr.ReadBytesMax(32)
o.Key = xr.ReadBytesMax(32)
o.Address = xr.ReadBytesMax(32)
o.Port = xr.ReadUint16()
o.ServerSocket = xr.ReadBool()
return xr.Error()
}

View File

@ -0,0 +1,114 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package protocol
import (
"fmt"
"io"
)
const (
magic = 0x9E79BC40
ProtocolName = "bep-relay"
)
var (
ResponseSuccess = Response{0, "success"}
ResponseNotFound = Response{1, "not found"}
ResponseAlreadyConnected = Response{2, "already connected"}
ResponseInternalError = Response{99, "internal error"}
ResponseUnexpectedMessage = Response{100, "unexpected message"}
)
func WriteMessage(w io.Writer, message interface{}) error {
header := header{
magic: magic,
}
var payload []byte
var err error
switch msg := message.(type) {
case Ping:
payload, err = msg.MarshalXDR()
header.messageType = messageTypePing
case Pong:
payload, err = msg.MarshalXDR()
header.messageType = messageTypePong
case JoinRelayRequest:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeJoinRelayRequest
case JoinSessionRequest:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeJoinSessionRequest
case Response:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeResponse
case ConnectRequest:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeConnectRequest
case SessionInvitation:
payload, err = msg.MarshalXDR()
header.messageType = messageTypeSessionInvitation
default:
err = fmt.Errorf("Unknown message type")
}
if err != nil {
return err
}
header.messageLength = int32(len(payload))
headerpayload, err := header.MarshalXDR()
if err != nil {
return err
}
_, err = w.Write(append(headerpayload, payload...))
return err
}
func ReadMessage(r io.Reader) (interface{}, error) {
var header header
if err := header.DecodeXDR(r); err != nil {
return nil, err
}
if header.magic != magic {
return nil, fmt.Errorf("magic mismatch")
}
switch header.messageType {
case messageTypePing:
var msg Ping
err := msg.DecodeXDR(r)
return msg, err
case messageTypePong:
var msg Pong
err := msg.DecodeXDR(r)
return msg, err
case messageTypeJoinRelayRequest:
var msg JoinRelayRequest
err := msg.DecodeXDR(r)
return msg, err
case messageTypeJoinSessionRequest:
var msg JoinSessionRequest
err := msg.DecodeXDR(r)
return msg, err
case messageTypeResponse:
var msg Response
err := msg.DecodeXDR(r)
return msg, err
case messageTypeConnectRequest:
var msg ConnectRequest
err := msg.DecodeXDR(r)
return msg, err
case messageTypeSessionInvitation:
var msg SessionInvitation
err := msg.DecodeXDR(r)
return msg, err
}
return nil, fmt.Errorf("Unknown message type")
}

313
lib/relaysrv/session.go Normal file
View File

@ -0,0 +1,313 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"net"
"sync"
"sync/atomic"
"time"
"github.com/juju/ratelimit"
"github.com/syncthing/relaysrv/protocol"
syncthingprotocol "github.com/syncthing/protocol"
)
var (
sessionMut = sync.RWMutex{}
activeSessions = make([]*session, 0)
pendingSessions = make(map[string]*session, 0)
numProxies int64
bytesProxied int64
)
func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *ratelimit.Bucket) *session {
serverkey := make([]byte, 32)
_, err := rand.Read(serverkey)
if err != nil {
return nil
}
clientkey := make([]byte, 32)
_, err = rand.Read(clientkey)
if err != nil {
return nil
}
ses := &session{
serverkey: serverkey,
serverid: serverid,
clientkey: clientkey,
clientid: clientid,
rateLimit: makeRateLimitFunc(sessionRateLimit, globalRateLimit),
connsChan: make(chan net.Conn),
conns: make([]net.Conn, 0, 2),
}
if debug {
log.Println("New session", ses)
}
sessionMut.Lock()
pendingSessions[string(ses.serverkey)] = ses
pendingSessions[string(ses.clientkey)] = ses
sessionMut.Unlock()
return ses
}
func findSession(key string) *session {
sessionMut.Lock()
defer sessionMut.Unlock()
ses, ok := pendingSessions[key]
if !ok {
return nil
}
delete(pendingSessions, key)
return ses
}
func dropSessions(id syncthingprotocol.DeviceID) {
sessionMut.RLock()
for _, session := range activeSessions {
if session.HasParticipant(id) {
if debug {
log.Println("Dropping session", session, "involving", id)
}
session.CloseConns()
}
}
sessionMut.RUnlock()
}
type session struct {
mut sync.Mutex
serverkey []byte
serverid syncthingprotocol.DeviceID
clientkey []byte
clientid syncthingprotocol.DeviceID
rateLimit func(bytes int64)
connsChan chan net.Conn
conns []net.Conn
}
func (s *session) AddConnection(conn net.Conn) bool {
if debug {
log.Println("New connection for", s, "from", conn.RemoteAddr())
}
select {
case s.connsChan <- conn:
return true
default:
}
return false
}
func (s *session) Serve() {
timedout := time.After(messageTimeout)
if debug {
log.Println("Session", s, "serving")
}
for {
select {
case conn := <-s.connsChan:
s.mut.Lock()
s.conns = append(s.conns, conn)
s.mut.Unlock()
// We're the only ones mutating% s.conns, hence we are free to read it.
if len(s.conns) < 2 {
continue
}
close(s.connsChan)
if debug {
log.Println("Session", s, "starting between", s.conns[0].RemoteAddr(), "and", s.conns[1].RemoteAddr())
}
wg := sync.WaitGroup{}
wg.Add(2)
var err0 error
go func() {
err0 = s.proxy(s.conns[0], s.conns[1])
wg.Done()
}()
var err1 error
go func() {
err1 = s.proxy(s.conns[1], s.conns[0])
wg.Done()
}()
sessionMut.Lock()
activeSessions = append(activeSessions, s)
sessionMut.Unlock()
wg.Wait()
if debug {
log.Println("Session", s, "ended, outcomes:", err0, "and", err1)
}
goto done
case <-timedout:
if debug {
log.Println("Session", s, "timed out")
}
goto done
}
}
done:
// We can end up here in 3 cases:
// 1. Timeout joining, in which case there are potentially entries in pendingSessions
// 2. General session end/timeout, in which case there are entries in activeSessions
// 3. Protocol handler calls dropSession as one of it's clients disconnects.
sessionMut.Lock()
delete(pendingSessions, string(s.serverkey))
delete(pendingSessions, string(s.clientkey))
for i, session := range activeSessions {
if session == s {
l := len(activeSessions) - 1
activeSessions[i] = activeSessions[l]
activeSessions[l] = nil
activeSessions = activeSessions[:l]
}
}
sessionMut.Unlock()
// If we are here because of case 2 or 3, we are potentially closing some or
// all connections a second time.
s.CloseConns()
if debug {
log.Println("Session", s, "stopping")
}
}
func (s *session) GetClientInvitationMessage() protocol.SessionInvitation {
return protocol.SessionInvitation{
From: s.serverid[:],
Key: []byte(s.clientkey),
Address: sessionAddress,
Port: sessionPort,
ServerSocket: false,
}
}
func (s *session) GetServerInvitationMessage() protocol.SessionInvitation {
return protocol.SessionInvitation{
From: s.clientid[:],
Key: []byte(s.serverkey),
Address: sessionAddress,
Port: sessionPort,
ServerSocket: true,
}
}
func (s *session) HasParticipant(id syncthingprotocol.DeviceID) bool {
return s.clientid == id || s.serverid == id
}
func (s *session) CloseConns() {
s.mut.Lock()
for _, conn := range s.conns {
conn.Close()
}
s.mut.Unlock()
}
func (s *session) proxy(c1, c2 net.Conn) error {
if debug {
log.Println("Proxy", c1.RemoteAddr(), "->", c2.RemoteAddr())
}
atomic.AddInt64(&numProxies, 1)
defer atomic.AddInt64(&numProxies, -1)
buf := make([]byte, 65536)
for {
c1.SetReadDeadline(time.Now().Add(networkTimeout))
n, err := c1.Read(buf)
if err != nil {
return err
}
atomic.AddInt64(&bytesProxied, int64(n))
if debug {
log.Printf("%d bytes from %s to %s", n, c1.RemoteAddr(), c2.RemoteAddr())
}
if s.rateLimit != nil {
s.rateLimit(int64(n))
}
c2.SetWriteDeadline(time.Now().Add(networkTimeout))
_, err = c2.Write(buf[:n])
if err != nil {
return err
}
}
}
func (s *session) String() string {
return fmt.Sprintf("<%s/%s>", hex.EncodeToString(s.clientkey)[:5], hex.EncodeToString(s.serverkey)[:5])
}
func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func(int64) {
// This may be a case of super duper premature optimization... We build an
// optimized function to do the rate limiting here based on what we need
// to do and then use it in the loop.
if sessionRateLimit == nil && globalRateLimit == nil {
// No limiting needed. We could equally well return a func(int64){} and
// not do a nil check were we use it, but I think the nil check there
// makes it clear that there will be no limiting if none is
// configured...
return nil
}
if sessionRateLimit == nil {
// We only have a global limiter
return func(bytes int64) {
globalRateLimit.Wait(bytes)
}
}
if globalRateLimit == nil {
// We only have a session limiter
return func(bytes int64) {
sessionRateLimit.Wait(bytes)
}
}
// We have both. Queue the bytes on both the global and session specific
// rate limiters. Wait for both in parallell, so that the actual send
// happens when both conditions are satisfied. In practice this just means
// wait the longer of the two times.
return func(bytes int64) {
t0 := sessionRateLimit.Take(bytes)
t1 := globalRateLimit.Take(bytes)
if t0 > t1 {
time.Sleep(t0)
} else {
time.Sleep(t1)
}
}
}

94
lib/relaysrv/status.go Normal file
View File

@ -0,0 +1,94 @@
package main
import (
"encoding/json"
"log"
"net/http"
"runtime"
"sync/atomic"
"time"
)
var rc *rateCalculator
func statusService(addr string) {
rc = newRateCalculator(360, 10*time.Second, &bytesProxied)
http.HandleFunc("/status", getStatus)
if err := http.ListenAndServe(addr, nil); err != nil {
log.Fatal(err)
}
}
func getStatus(w http.ResponseWriter, r *http.Request) {
status := make(map[string]interface{})
sessionMut.Lock()
// This can potentially be double the number of pending sessions, as each session has two keys, one for each side.
status["numPendingSessionKeys"] = len(pendingSessions)
status["numActiveSessions"] = len(activeSessions)
sessionMut.Unlock()
status["numConnections"] = atomic.LoadInt64(&numConnections)
status["numProxies"] = atomic.LoadInt64(&numProxies)
status["bytesProxied"] = atomic.LoadInt64(&bytesProxied)
status["goVersion"] = runtime.Version()
status["goOS"] = runtime.GOOS
status["goAarch"] = runtime.GOARCH
status["goMaxProcs"] = runtime.GOMAXPROCS(-1)
status["kbps10s1m5m15m30m60m"] = []int64{
rc.rate(10/10) * 8 / 1000,
rc.rate(60/10) * 8 / 1000,
rc.rate(5*60/10) * 8 / 1000,
rc.rate(15*60/10) * 8 / 1000,
rc.rate(30*60/10) * 8 / 1000,
rc.rate(60*60/10) * 8 / 1000,
}
bs, err := json.MarshalIndent(status, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(bs)
}
type rateCalculator struct {
rates []int64
prev int64
counter *int64
}
func newRateCalculator(keepIntervals int, interval time.Duration, counter *int64) *rateCalculator {
r := &rateCalculator{
rates: make([]int64, keepIntervals),
counter: counter,
}
go r.updateRates(interval)
return r
}
func (r *rateCalculator) updateRates(interval time.Duration) {
for {
now := time.Now()
next := now.Truncate(interval).Add(interval)
time.Sleep(next.Sub(now))
cur := atomic.LoadInt64(r.counter)
rate := int64(float64(cur-r.prev) / interval.Seconds())
copy(r.rates[1:], r.rates)
r.rates[0] = rate
r.prev = cur
}
}
func (r *rateCalculator) rate(periods int) int64 {
var tot int64
for i := 0; i < periods; i++ {
tot += r.rates[i]
}
return tot / int64(periods)
}

View File

@ -0,0 +1,149 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"bufio"
"crypto/tls"
"flag"
"log"
"net"
"net/url"
"os"
"path/filepath"
"time"
syncthingprotocol "github.com/syncthing/protocol"
"github.com/syncthing/relaysrv/client"
"github.com/syncthing/relaysrv/protocol"
)
func main() {
log.SetOutput(os.Stdout)
log.SetFlags(log.LstdFlags | log.Lshortfile)
var connect, relay, dir string
var join, test bool
flag.StringVar(&connect, "connect", "", "Device ID to which to connect to")
flag.BoolVar(&join, "join", false, "Join relay")
flag.BoolVar(&test, "test", false, "Generic relay test")
flag.StringVar(&relay, "relay", "relay://127.0.0.1:22067", "Relay address")
flag.StringVar(&dir, "keys", ".", "Directory where cert.pem and key.pem is stored")
flag.Parse()
certFile, keyFile := filepath.Join(dir, "cert.pem"), filepath.Join(dir, "key.pem")
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Fatalln("Failed to load X509 key pair:", err)
}
id := syncthingprotocol.NewDeviceID(cert.Certificate[0])
log.Println("ID:", id)
uri, err := url.Parse(relay)
if err != nil {
log.Fatal(err)
}
stdin := make(chan string)
go stdinReader(stdin)
if join {
log.Println("Creating client")
relay := client.NewProtocolClient(uri, []tls.Certificate{cert}, nil)
log.Println("Created client")
go relay.Serve()
recv := make(chan protocol.SessionInvitation)
go func() {
log.Println("Starting invitation receiver")
for invite := range relay.Invitations {
select {
case recv <- invite:
log.Println("Received invitation", invite)
default:
log.Println("Discarding invitation", invite)
}
}
}()
for {
conn, err := client.JoinSession(<-recv)
if err != nil {
log.Fatalln("Failed to join", err)
}
log.Println("Joined", conn.RemoteAddr(), conn.LocalAddr())
connectToStdio(stdin, conn)
log.Println("Finished", conn.RemoteAddr(), conn.LocalAddr())
}
} else if connect != "" {
id, err := syncthingprotocol.DeviceIDFromString(connect)
if err != nil {
log.Fatal(err)
}
invite, err := client.GetInvitationFromRelay(uri, id, []tls.Certificate{cert})
if err != nil {
log.Fatal(err)
}
log.Println("Received invitation", invite)
conn, err := client.JoinSession(invite)
if err != nil {
log.Fatalln("Failed to join", err)
}
log.Println("Joined", conn.RemoteAddr(), conn.LocalAddr())
connectToStdio(stdin, conn)
log.Println("Finished", conn.RemoteAddr(), conn.LocalAddr())
} else if test {
if client.TestRelay(uri, []tls.Certificate{cert}) {
log.Println("OK")
} else {
log.Println("FAIL")
}
} else {
log.Fatal("Requires either join or connect")
}
}
func stdinReader(c chan<- string) {
scanner := bufio.NewScanner(os.Stdin)
for scanner.Scan() {
c <- scanner.Text()
c <- "\n"
}
}
func connectToStdio(stdin <-chan string, conn net.Conn) {
go func() {
}()
buf := make([]byte, 1024)
for {
conn.SetReadDeadline(time.Now().Add(time.Millisecond))
n, err := conn.Read(buf[0:])
if err != nil {
nerr, ok := err.(net.Error)
if !ok || !nerr.Timeout() {
log.Println(err)
return
}
}
os.Stdout.Write(buf[:n])
select {
case msg := <-stdin:
_, err := conn.Write([]byte(msg))
if err != nil {
return
}
default:
}
}
}

28
lib/relaysrv/utils.go Normal file
View File

@ -0,0 +1,28 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package main
import (
"errors"
"net"
)
func setTCPOptions(conn net.Conn) error {
tcpConn, ok := conn.(*net.TCPConn)
if !ok {
return errors.New("Not a TCP connection")
}
if err := tcpConn.SetLinger(0); err != nil {
return err
}
if err := tcpConn.SetNoDelay(true); err != nil {
return err
}
if err := tcpConn.SetKeepAlivePeriod(networkTimeout); err != nil {
return err
}
if err := tcpConn.SetKeepAlive(true); err != nil {
return err
}
return nil
}