From f177924629396526442335c4087365065a50badf Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 22 Sep 2015 19:37:03 +0200 Subject: [PATCH] Rejiggle lib/relaysrv/* -> lib/relay/* --- lib/{relaysrv => relay}/client/client.go | 0 lib/{relaysrv => relay}/client/debug.go | 0 lib/{relaysrv => relay}/client/methods.go | 0 lib/{relaysrv => relay}/protocol/packets.go | 0 .../protocol/packets_xdr.go | 0 lib/{relaysrv => relay}/protocol/protocol.go | 0 lib/relaysrv/.gitignore | 27 -- lib/relaysrv/CONTRIBUTORS | 1 - lib/relaysrv/LICENSE | 22 -- lib/relaysrv/README.md | 6 - lib/relaysrv/build.sh | 39 --- lib/relaysrv/listener.go | 319 ------------------ lib/relaysrv/main.go | 142 -------- lib/relaysrv/pool.go | 68 ---- lib/relaysrv/session.go | 313 ----------------- lib/relaysrv/status.go | 94 ------ lib/relaysrv/testutil/main.go | 149 -------- lib/relaysrv/utils.go | 28 -- 18 files changed, 1208 deletions(-) rename lib/{relaysrv => relay}/client/client.go (100%) rename lib/{relaysrv => relay}/client/debug.go (100%) rename lib/{relaysrv => relay}/client/methods.go (100%) rename lib/{relaysrv => relay}/protocol/packets.go (100%) rename lib/{relaysrv => relay}/protocol/packets_xdr.go (100%) rename lib/{relaysrv => relay}/protocol/protocol.go (100%) delete mode 100644 lib/relaysrv/.gitignore delete mode 100644 lib/relaysrv/CONTRIBUTORS delete mode 100644 lib/relaysrv/LICENSE delete mode 100644 lib/relaysrv/README.md delete mode 100755 lib/relaysrv/build.sh delete mode 100644 lib/relaysrv/listener.go delete mode 100644 lib/relaysrv/main.go delete mode 100644 lib/relaysrv/pool.go delete mode 100644 lib/relaysrv/session.go delete mode 100644 lib/relaysrv/status.go delete mode 100644 lib/relaysrv/testutil/main.go delete mode 100644 lib/relaysrv/utils.go diff --git a/lib/relaysrv/client/client.go b/lib/relay/client/client.go similarity index 100% rename from lib/relaysrv/client/client.go rename to lib/relay/client/client.go diff --git a/lib/relaysrv/client/debug.go b/lib/relay/client/debug.go similarity index 100% rename from lib/relaysrv/client/debug.go rename to lib/relay/client/debug.go diff --git a/lib/relaysrv/client/methods.go b/lib/relay/client/methods.go similarity index 100% rename from lib/relaysrv/client/methods.go rename to lib/relay/client/methods.go diff --git a/lib/relaysrv/protocol/packets.go b/lib/relay/protocol/packets.go similarity index 100% rename from lib/relaysrv/protocol/packets.go rename to lib/relay/protocol/packets.go diff --git a/lib/relaysrv/protocol/packets_xdr.go b/lib/relay/protocol/packets_xdr.go similarity index 100% rename from lib/relaysrv/protocol/packets_xdr.go rename to lib/relay/protocol/packets_xdr.go diff --git a/lib/relaysrv/protocol/protocol.go b/lib/relay/protocol/protocol.go similarity index 100% rename from lib/relaysrv/protocol/protocol.go rename to lib/relay/protocol/protocol.go diff --git a/lib/relaysrv/.gitignore b/lib/relaysrv/.gitignore deleted file mode 100644 index 775c43cc2..000000000 --- a/lib/relaysrv/.gitignore +++ /dev/null @@ -1,27 +0,0 @@ -# Compiled Object files, Static and Dynamic libs (Shared Objects) -*.o -*.a -*.so - -# Folders -_obj -_test - -# Architecture specific extensions/prefixes -*.[568vq] -[568vq].out - -*.cgo1.go -*.cgo2.c -_cgo_defun.c -_cgo_gotypes.go -_cgo_export.* - -_testmain.go - -*.exe -*.test -*.prof -*.tar.gz -*.zip -relaysrv diff --git a/lib/relaysrv/CONTRIBUTORS b/lib/relaysrv/CONTRIBUTORS deleted file mode 100644 index 4b00258cc..000000000 --- a/lib/relaysrv/CONTRIBUTORS +++ /dev/null @@ -1 +0,0 @@ -Jakob Borg diff --git a/lib/relaysrv/LICENSE b/lib/relaysrv/LICENSE deleted file mode 100644 index 581a17054..000000000 --- a/lib/relaysrv/LICENSE +++ /dev/null @@ -1,22 +0,0 @@ -The MIT License (MIT) - -Copyright (c) 2015 The Syncthing Project - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. - diff --git a/lib/relaysrv/README.md b/lib/relaysrv/README.md deleted file mode 100644 index e88929280..000000000 --- a/lib/relaysrv/README.md +++ /dev/null @@ -1,6 +0,0 @@ -relaysrv -======== - -This is the relay server for the `syncthing` project. - -`go get github.com/syncthing/relaysrv` diff --git a/lib/relaysrv/build.sh b/lib/relaysrv/build.sh deleted file mode 100755 index 42c2681dd..000000000 --- a/lib/relaysrv/build.sh +++ /dev/null @@ -1,39 +0,0 @@ -#!/bin/bash -set -euo pipefail -set nullglob - -echo Get dependencies -go get -d - -rm -rf relaysrv-*-* - -build() { - export GOOS="$1" - export GOARCH="$2" - target="relaysrv-$GOOS-$GOARCH" - go build -i -v -ldflags -w - mkdir "$target" - if [ -f relaysrv ] ; then - mv relaysrv "$target" - tar zcvf "$target.tar.gz" "$target" - rm -r "$target" - fi - if [ -f relaysrv.exe ] ; then - mv relaysrv.exe "$target" - zip -r "$target.zip" "$target" - rm -r "$target" - fi -} - -for goos in linux darwin windows freebsd openbsd netbsd solaris ; do - build "$goos" amd64 -done -for goos in linux windows freebsd openbsd netbsd ; do - build "$goos" 386 -done -build linux arm - -# Hack used because we run as root under Docker -if [[ ${CHOWN_USER:-} != "" ]] ; then - chown -R $CHOWN_USER . -fi diff --git a/lib/relaysrv/listener.go b/lib/relaysrv/listener.go deleted file mode 100644 index 3f584535e..000000000 --- a/lib/relaysrv/listener.go +++ /dev/null @@ -1,319 +0,0 @@ -// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). - -package main - -import ( - "crypto/tls" - "encoding/hex" - "log" - "net" - "sync" - "sync/atomic" - "time" - - syncthingprotocol "github.com/syncthing/protocol" - "github.com/syncthing/syncthing/lib/tlsutil" - - "github.com/syncthing/relaysrv/protocol" -) - -var ( - outboxesMut = sync.RWMutex{} - outboxes = make(map[syncthingprotocol.DeviceID]chan interface{}) - numConnections int64 -) - -func listener(addr string, config *tls.Config) { - tcpListener, err := net.Listen("tcp", addr) - if err != nil { - log.Fatalln(err) - } - - listener := tlsutil.DowngradingListener{tcpListener, nil} - - for { - conn, isTLS, err := listener.AcceptNoWrapTLS() - if err != nil { - if debug { - log.Println("Listener failed to accept connection from", conn.RemoteAddr(), ". Possibly a TCP Ping.") - } - continue - } - - setTCPOptions(conn) - - if debug { - log.Println("Listener accepted connection from", conn.RemoteAddr(), "tls", isTLS) - } - - if isTLS { - go protocolConnectionHandler(conn, config) - } else { - go sessionConnectionHandler(conn) - } - - } -} - -func protocolConnectionHandler(tcpConn net.Conn, config *tls.Config) { - conn := tls.Server(tcpConn, config) - err := conn.Handshake() - if err != nil { - if debug { - log.Println("Protocol connection TLS handshake:", conn.RemoteAddr(), err) - } - conn.Close() - return - } - - state := conn.ConnectionState() - if (!state.NegotiatedProtocolIsMutual || state.NegotiatedProtocol != protocol.ProtocolName) && debug { - log.Println("Protocol negotiation error") - } - - certs := state.PeerCertificates - if len(certs) != 1 { - if debug { - log.Println("Certificate list error") - } - conn.Close() - return - } - - id := syncthingprotocol.NewDeviceID(certs[0].Raw) - - messages := make(chan interface{}) - errors := make(chan error, 1) - outbox := make(chan interface{}) - - // Read messages from the connection and send them on the messages - // channel. When there is an error, send it on the error channel and - // return. Applies also when the connection gets closed, so the pattern - // below is to close the connection on error, then wait for the error - // signal from messageReader to exit. - go messageReader(conn, messages, errors) - - pingTicker := time.NewTicker(pingInterval) - timeoutTicker := time.NewTimer(networkTimeout) - joined := false - - for { - select { - case message := <-messages: - timeoutTicker.Reset(networkTimeout) - if debug { - log.Printf("Message %T from %s", message, id) - } - - switch msg := message.(type) { - case protocol.JoinRelayRequest: - outboxesMut.RLock() - _, ok := outboxes[id] - outboxesMut.RUnlock() - if ok { - protocol.WriteMessage(conn, protocol.ResponseAlreadyConnected) - if debug { - log.Println("Already have a peer with the same ID", id, conn.RemoteAddr()) - } - conn.Close() - continue - } - - outboxesMut.Lock() - outboxes[id] = outbox - outboxesMut.Unlock() - joined = true - - protocol.WriteMessage(conn, protocol.ResponseSuccess) - - case protocol.ConnectRequest: - requestedPeer := syncthingprotocol.DeviceIDFromBytes(msg.ID) - outboxesMut.RLock() - peerOutbox, ok := outboxes[requestedPeer] - outboxesMut.RUnlock() - if !ok { - if debug { - log.Println(id, "is looking for", requestedPeer, "which does not exist") - } - protocol.WriteMessage(conn, protocol.ResponseNotFound) - conn.Close() - continue - } - // requestedPeer is the server, id is the client - ses := newSession(requestedPeer, id, sessionLimiter, globalLimiter) - - go ses.Serve() - - clientInvitation := ses.GetClientInvitationMessage() - serverInvitation := ses.GetServerInvitationMessage() - - if err := protocol.WriteMessage(conn, clientInvitation); err != nil { - if debug { - log.Printf("Error sending invitation from %s to client: %s", id, err) - } - conn.Close() - continue - } - - peerOutbox <- serverInvitation - - if debug { - log.Println("Sent invitation from", id, "to", requestedPeer) - } - conn.Close() - - case protocol.Ping: - if err := protocol.WriteMessage(conn, protocol.Pong{}); err != nil { - if debug { - log.Println("Error writing pong:", err) - } - conn.Close() - continue - } - - case protocol.Pong: - // Nothing - - default: - if debug { - log.Printf("Unknown message %s: %T", id, message) - } - protocol.WriteMessage(conn, protocol.ResponseUnexpectedMessage) - conn.Close() - } - - case err := <-errors: - if debug { - log.Printf("Closing connection %s: %s", id, err) - } - close(outbox) - - // Potentially closing a second time. - conn.Close() - - if joined { - // Only delete the outbox if the client is joined, as it might be - // a lookup request coming from the same client. - outboxesMut.Lock() - delete(outboxes, id) - outboxesMut.Unlock() - // Also, kill all sessions related to this node, as it probably - // went offline. This is for the other end to realize the client - // is no longer there faster. This also helps resolve - // 'already connected' errors when one of the sides is - // restarting, and connecting to the other peer before the other - // peer even realised that the node has gone away. - dropSessions(id) - } - return - - case <-pingTicker.C: - if !joined { - if debug { - log.Println(id, "didn't join within", pingInterval) - } - conn.Close() - continue - } - - if err := protocol.WriteMessage(conn, protocol.Ping{}); err != nil { - if debug { - log.Println(id, err) - } - conn.Close() - } - - case <-timeoutTicker.C: - // We should receive a error from the reader loop, which will cause - // us to quit this loop. - if debug { - log.Printf("%s timed out", id) - } - conn.Close() - - case msg := <-outbox: - if debug { - log.Printf("Sending message %T to %s", msg, id) - } - if err := protocol.WriteMessage(conn, msg); err != nil { - if debug { - log.Println(id, err) - } - conn.Close() - } - } - } -} - -func sessionConnectionHandler(conn net.Conn) { - if err := conn.SetDeadline(time.Now().Add(messageTimeout)); err != nil { - if debug { - log.Println("Weird error setting deadline:", err, "on", conn.RemoteAddr()) - } - return - } - - message, err := protocol.ReadMessage(conn) - if err != nil { - return - } - - switch msg := message.(type) { - case protocol.JoinSessionRequest: - ses := findSession(string(msg.Key)) - if debug { - log.Println(conn.RemoteAddr(), "session lookup", ses, hex.EncodeToString(msg.Key)[:5]) - } - - if ses == nil { - protocol.WriteMessage(conn, protocol.ResponseNotFound) - conn.Close() - return - } - - if !ses.AddConnection(conn) { - if debug { - log.Println("Failed to add", conn.RemoteAddr(), "to session", ses) - } - protocol.WriteMessage(conn, protocol.ResponseAlreadyConnected) - conn.Close() - return - } - - if err := protocol.WriteMessage(conn, protocol.ResponseSuccess); err != nil { - if debug { - log.Println("Failed to send session join response to ", conn.RemoteAddr(), "for", ses) - } - return - } - - if err := conn.SetDeadline(time.Time{}); err != nil { - if debug { - log.Println("Weird error setting deadline:", err, "on", conn.RemoteAddr()) - } - conn.Close() - return - } - - default: - if debug { - log.Println("Unexpected message from", conn.RemoteAddr(), message) - } - protocol.WriteMessage(conn, protocol.ResponseUnexpectedMessage) - conn.Close() - } -} - -func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) { - atomic.AddInt64(&numConnections, 1) - defer atomic.AddInt64(&numConnections, -1) - - for { - msg, err := protocol.ReadMessage(conn) - if err != nil { - errors <- err - return - } - messages <- msg - } -} diff --git a/lib/relaysrv/main.go b/lib/relaysrv/main.go deleted file mode 100644 index 614f82c7f..000000000 --- a/lib/relaysrv/main.go +++ /dev/null @@ -1,142 +0,0 @@ -// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). - -package main - -import ( - "crypto/tls" - "flag" - "fmt" - "log" - "net" - "net/url" - "path/filepath" - "strings" - "time" - - "github.com/juju/ratelimit" - "github.com/syncthing/relaysrv/protocol" - "github.com/syncthing/syncthing/lib/tlsutil" - - syncthingprotocol "github.com/syncthing/protocol" -) - -var ( - listen string - debug bool = false - - sessionAddress []byte - sessionPort uint16 - - networkTimeout time.Duration = 2 * time.Minute - pingInterval time.Duration = time.Minute - messageTimeout time.Duration = time.Minute - - sessionLimitBps int - globalLimitBps int - sessionLimiter *ratelimit.Bucket - globalLimiter *ratelimit.Bucket - - statusAddr string - poolAddrs string - defaultPoolAddrs string = "https://relays.syncthing.net" -) - -func main() { - log.SetFlags(log.Lshortfile | log.LstdFlags) - - var dir, extAddress string - - flag.StringVar(&listen, "listen", ":22067", "Protocol listen address") - flag.StringVar(&dir, "keys", ".", "Directory where cert.pem and key.pem is stored") - flag.DurationVar(&networkTimeout, "network-timeout", networkTimeout, "Timeout for network operations between the client and the relay.\n\tIf no data is received between the client and the relay in this period of time, the connection is terminated.\n\tFurthermore, if no data is sent between either clients being relayed within this period of time, the session is also terminated.") - flag.DurationVar(&pingInterval, "ping-interval", pingInterval, "How often pings are sent") - flag.DurationVar(&messageTimeout, "message-timeout", messageTimeout, "Maximum amount of time we wait for relevant messages to arrive") - flag.IntVar(&sessionLimitBps, "per-session-rate", sessionLimitBps, "Per session rate limit, in bytes/s") - flag.IntVar(&globalLimitBps, "global-rate", globalLimitBps, "Global rate limit, in bytes/s") - flag.BoolVar(&debug, "debug", debug, "Enable debug output") - flag.StringVar(&statusAddr, "status-srv", ":22070", "Listen address for status service (blank to disable)") - flag.StringVar(&poolAddrs, "pools", defaultPoolAddrs, "Comma separated list of relay pool addresses to join") - - flag.Parse() - - if extAddress == "" { - extAddress = listen - } - - addr, err := net.ResolveTCPAddr("tcp", extAddress) - if err != nil { - log.Fatal(err) - } - - sessionAddress = addr.IP[:] - sessionPort = uint16(addr.Port) - - certFile, keyFile := filepath.Join(dir, "cert.pem"), filepath.Join(dir, "key.pem") - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - log.Println("Failed to load keypair. Generating one, this might take a while...") - cert, err = tlsutil.NewCertificate(certFile, keyFile, "relaysrv", 3072) - if err != nil { - log.Fatalln("Failed to generate X509 key pair:", err) - } - } - - tlsCfg := &tls.Config{ - Certificates: []tls.Certificate{cert}, - NextProtos: []string{protocol.ProtocolName}, - ClientAuth: tls.RequestClientCert, - SessionTicketsDisabled: true, - InsecureSkipVerify: true, - MinVersion: tls.VersionTLS12, - CipherSuites: []uint16{ - tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, - tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, - tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, - tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, - tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, - }, - } - - id := syncthingprotocol.NewDeviceID(cert.Certificate[0]) - if debug { - log.Println("ID:", id) - } - - if sessionLimitBps > 0 { - sessionLimiter = ratelimit.NewBucketWithRate(float64(sessionLimitBps), int64(2*sessionLimitBps)) - } - if globalLimitBps > 0 { - globalLimiter = ratelimit.NewBucketWithRate(float64(globalLimitBps), int64(2*globalLimitBps)) - } - - if statusAddr != "" { - go statusService(statusAddr) - } - - uri, err := url.Parse(fmt.Sprintf("relay://%s/?id=%s&pingInterval=%s&networkTimeout=%s&sessionLimitBps=%d&globalLimitBps=%d&statusAddr=%s", extAddress, id, pingInterval, networkTimeout, sessionLimitBps, globalLimitBps, statusAddr)) - if err != nil { - log.Fatalln("Failed to construct URI", err) - } - - if debug { - log.Println("URI:", uri.String()) - } - - if poolAddrs == defaultPoolAddrs { - log.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - log.Println("!! Joining default relay pools, this relay will be available for public use. !!") - log.Println(`!! Use the -pools="" command line option to make the relay private. !!`) - log.Println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") - } - - pools := strings.Split(poolAddrs, ",") - for _, pool := range pools { - pool = strings.TrimSpace(pool) - if len(pool) > 0 { - go poolHandler(pool, uri) - } - } - - listener(listen, tlsCfg) -} diff --git a/lib/relaysrv/pool.go b/lib/relaysrv/pool.go deleted file mode 100644 index 0f327f690..000000000 --- a/lib/relaysrv/pool.go +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). - -package main - -import ( - "bytes" - "encoding/json" - "io/ioutil" - "log" - "net/http" - "net/url" - "time" -) - -func poolHandler(pool string, uri *url.URL) { - for { - var b bytes.Buffer - json.NewEncoder(&b).Encode(struct { - URL string `json:"url"` - }{ - uri.String(), - }) - - resp, err := http.Post(pool, "application/json", &b) - if err != nil { - if debug { - log.Println("Error joining pool", pool, err) - } - } else if resp.StatusCode == 500 { - if debug { - bs, err := ioutil.ReadAll(resp.Body) - if err != nil { - log.Println("Failed to read response body for", pool, err) - } else { - log.Println("Response for", pool, string(bs)) - } - resp.Body.Close() - } - } else if resp.StatusCode == 429 { - if debug { - log.Println(pool, "under load, will retry in a minute") - } - time.Sleep(time.Minute) - continue - } else if resp.StatusCode == 403 { - if debug { - log.Println(pool, "failed to join due to IP address not matching external address") - } - return - } else if resp.StatusCode == 200 { - var x struct { - EvictionIn time.Duration `json:"evictionIn"` - } - err := json.NewDecoder(resp.Body).Decode(&x) - if err == nil { - rejoin := x.EvictionIn - (x.EvictionIn / 5) - if debug { - log.Println("Joined", pool, "rejoining in", rejoin) - } - time.Sleep(rejoin) - continue - } else if debug { - log.Println("Failed to deserialize respnse", err) - } - } - time.Sleep(time.Hour) - } -} diff --git a/lib/relaysrv/session.go b/lib/relaysrv/session.go deleted file mode 100644 index bbd29d1f5..000000000 --- a/lib/relaysrv/session.go +++ /dev/null @@ -1,313 +0,0 @@ -// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). - -package main - -import ( - "crypto/rand" - "encoding/hex" - "fmt" - "log" - "net" - "sync" - "sync/atomic" - "time" - - "github.com/juju/ratelimit" - "github.com/syncthing/relaysrv/protocol" - - syncthingprotocol "github.com/syncthing/protocol" -) - -var ( - sessionMut = sync.RWMutex{} - activeSessions = make([]*session, 0) - pendingSessions = make(map[string]*session, 0) - numProxies int64 - bytesProxied int64 -) - -func newSession(serverid, clientid syncthingprotocol.DeviceID, sessionRateLimit, globalRateLimit *ratelimit.Bucket) *session { - serverkey := make([]byte, 32) - _, err := rand.Read(serverkey) - if err != nil { - return nil - } - - clientkey := make([]byte, 32) - _, err = rand.Read(clientkey) - if err != nil { - return nil - } - - ses := &session{ - serverkey: serverkey, - serverid: serverid, - clientkey: clientkey, - clientid: clientid, - rateLimit: makeRateLimitFunc(sessionRateLimit, globalRateLimit), - connsChan: make(chan net.Conn), - conns: make([]net.Conn, 0, 2), - } - - if debug { - log.Println("New session", ses) - } - - sessionMut.Lock() - pendingSessions[string(ses.serverkey)] = ses - pendingSessions[string(ses.clientkey)] = ses - sessionMut.Unlock() - - return ses -} - -func findSession(key string) *session { - sessionMut.Lock() - defer sessionMut.Unlock() - ses, ok := pendingSessions[key] - if !ok { - return nil - - } - delete(pendingSessions, key) - return ses -} - -func dropSessions(id syncthingprotocol.DeviceID) { - sessionMut.RLock() - for _, session := range activeSessions { - if session.HasParticipant(id) { - if debug { - log.Println("Dropping session", session, "involving", id) - } - session.CloseConns() - } - } - sessionMut.RUnlock() -} - -type session struct { - mut sync.Mutex - - serverkey []byte - serverid syncthingprotocol.DeviceID - - clientkey []byte - clientid syncthingprotocol.DeviceID - - rateLimit func(bytes int64) - - connsChan chan net.Conn - conns []net.Conn -} - -func (s *session) AddConnection(conn net.Conn) bool { - if debug { - log.Println("New connection for", s, "from", conn.RemoteAddr()) - } - - select { - case s.connsChan <- conn: - return true - default: - } - return false -} - -func (s *session) Serve() { - timedout := time.After(messageTimeout) - - if debug { - log.Println("Session", s, "serving") - } - - for { - select { - case conn := <-s.connsChan: - s.mut.Lock() - s.conns = append(s.conns, conn) - s.mut.Unlock() - // We're the only ones mutating% s.conns, hence we are free to read it. - if len(s.conns) < 2 { - continue - } - - close(s.connsChan) - - if debug { - log.Println("Session", s, "starting between", s.conns[0].RemoteAddr(), "and", s.conns[1].RemoteAddr()) - } - - wg := sync.WaitGroup{} - wg.Add(2) - - var err0 error - go func() { - err0 = s.proxy(s.conns[0], s.conns[1]) - wg.Done() - }() - - var err1 error - go func() { - err1 = s.proxy(s.conns[1], s.conns[0]) - wg.Done() - }() - - sessionMut.Lock() - activeSessions = append(activeSessions, s) - sessionMut.Unlock() - - wg.Wait() - - if debug { - log.Println("Session", s, "ended, outcomes:", err0, "and", err1) - } - goto done - - case <-timedout: - if debug { - log.Println("Session", s, "timed out") - } - goto done - } - } -done: - // We can end up here in 3 cases: - // 1. Timeout joining, in which case there are potentially entries in pendingSessions - // 2. General session end/timeout, in which case there are entries in activeSessions - // 3. Protocol handler calls dropSession as one of it's clients disconnects. - - sessionMut.Lock() - delete(pendingSessions, string(s.serverkey)) - delete(pendingSessions, string(s.clientkey)) - - for i, session := range activeSessions { - if session == s { - l := len(activeSessions) - 1 - activeSessions[i] = activeSessions[l] - activeSessions[l] = nil - activeSessions = activeSessions[:l] - } - } - sessionMut.Unlock() - - // If we are here because of case 2 or 3, we are potentially closing some or - // all connections a second time. - s.CloseConns() - - if debug { - log.Println("Session", s, "stopping") - } -} - -func (s *session) GetClientInvitationMessage() protocol.SessionInvitation { - return protocol.SessionInvitation{ - From: s.serverid[:], - Key: []byte(s.clientkey), - Address: sessionAddress, - Port: sessionPort, - ServerSocket: false, - } -} - -func (s *session) GetServerInvitationMessage() protocol.SessionInvitation { - return protocol.SessionInvitation{ - From: s.clientid[:], - Key: []byte(s.serverkey), - Address: sessionAddress, - Port: sessionPort, - ServerSocket: true, - } -} - -func (s *session) HasParticipant(id syncthingprotocol.DeviceID) bool { - return s.clientid == id || s.serverid == id -} - -func (s *session) CloseConns() { - s.mut.Lock() - for _, conn := range s.conns { - conn.Close() - } - s.mut.Unlock() -} - -func (s *session) proxy(c1, c2 net.Conn) error { - if debug { - log.Println("Proxy", c1.RemoteAddr(), "->", c2.RemoteAddr()) - } - - atomic.AddInt64(&numProxies, 1) - defer atomic.AddInt64(&numProxies, -1) - - buf := make([]byte, 65536) - for { - c1.SetReadDeadline(time.Now().Add(networkTimeout)) - n, err := c1.Read(buf) - if err != nil { - return err - } - - atomic.AddInt64(&bytesProxied, int64(n)) - - if debug { - log.Printf("%d bytes from %s to %s", n, c1.RemoteAddr(), c2.RemoteAddr()) - } - - if s.rateLimit != nil { - s.rateLimit(int64(n)) - } - - c2.SetWriteDeadline(time.Now().Add(networkTimeout)) - _, err = c2.Write(buf[:n]) - if err != nil { - return err - } - } -} - -func (s *session) String() string { - return fmt.Sprintf("<%s/%s>", hex.EncodeToString(s.clientkey)[:5], hex.EncodeToString(s.serverkey)[:5]) -} - -func makeRateLimitFunc(sessionRateLimit, globalRateLimit *ratelimit.Bucket) func(int64) { - // This may be a case of super duper premature optimization... We build an - // optimized function to do the rate limiting here based on what we need - // to do and then use it in the loop. - - if sessionRateLimit == nil && globalRateLimit == nil { - // No limiting needed. We could equally well return a func(int64){} and - // not do a nil check were we use it, but I think the nil check there - // makes it clear that there will be no limiting if none is - // configured... - return nil - } - - if sessionRateLimit == nil { - // We only have a global limiter - return func(bytes int64) { - globalRateLimit.Wait(bytes) - } - } - - if globalRateLimit == nil { - // We only have a session limiter - return func(bytes int64) { - sessionRateLimit.Wait(bytes) - } - } - - // We have both. Queue the bytes on both the global and session specific - // rate limiters. Wait for both in parallell, so that the actual send - // happens when both conditions are satisfied. In practice this just means - // wait the longer of the two times. - return func(bytes int64) { - t0 := sessionRateLimit.Take(bytes) - t1 := globalRateLimit.Take(bytes) - if t0 > t1 { - time.Sleep(t0) - } else { - time.Sleep(t1) - } - } -} diff --git a/lib/relaysrv/status.go b/lib/relaysrv/status.go deleted file mode 100644 index b18cf3ea7..000000000 --- a/lib/relaysrv/status.go +++ /dev/null @@ -1,94 +0,0 @@ -package main - -import ( - "encoding/json" - "log" - "net/http" - "runtime" - "sync/atomic" - "time" -) - -var rc *rateCalculator - -func statusService(addr string) { - rc = newRateCalculator(360, 10*time.Second, &bytesProxied) - - http.HandleFunc("/status", getStatus) - if err := http.ListenAndServe(addr, nil); err != nil { - log.Fatal(err) - } -} - -func getStatus(w http.ResponseWriter, r *http.Request) { - status := make(map[string]interface{}) - - sessionMut.Lock() - // This can potentially be double the number of pending sessions, as each session has two keys, one for each side. - status["numPendingSessionKeys"] = len(pendingSessions) - status["numActiveSessions"] = len(activeSessions) - sessionMut.Unlock() - status["numConnections"] = atomic.LoadInt64(&numConnections) - status["numProxies"] = atomic.LoadInt64(&numProxies) - status["bytesProxied"] = atomic.LoadInt64(&bytesProxied) - status["goVersion"] = runtime.Version() - status["goOS"] = runtime.GOOS - status["goAarch"] = runtime.GOARCH - status["goMaxProcs"] = runtime.GOMAXPROCS(-1) - status["kbps10s1m5m15m30m60m"] = []int64{ - rc.rate(10/10) * 8 / 1000, - rc.rate(60/10) * 8 / 1000, - rc.rate(5*60/10) * 8 / 1000, - rc.rate(15*60/10) * 8 / 1000, - rc.rate(30*60/10) * 8 / 1000, - rc.rate(60*60/10) * 8 / 1000, - } - - bs, err := json.MarshalIndent(status, "", " ") - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.Write(bs) -} - -type rateCalculator struct { - rates []int64 - prev int64 - counter *int64 -} - -func newRateCalculator(keepIntervals int, interval time.Duration, counter *int64) *rateCalculator { - r := &rateCalculator{ - rates: make([]int64, keepIntervals), - counter: counter, - } - - go r.updateRates(interval) - - return r -} - -func (r *rateCalculator) updateRates(interval time.Duration) { - for { - now := time.Now() - next := now.Truncate(interval).Add(interval) - time.Sleep(next.Sub(now)) - - cur := atomic.LoadInt64(r.counter) - rate := int64(float64(cur-r.prev) / interval.Seconds()) - copy(r.rates[1:], r.rates) - r.rates[0] = rate - r.prev = cur - } -} - -func (r *rateCalculator) rate(periods int) int64 { - var tot int64 - for i := 0; i < periods; i++ { - tot += r.rates[i] - } - return tot / int64(periods) -} diff --git a/lib/relaysrv/testutil/main.go b/lib/relaysrv/testutil/main.go deleted file mode 100644 index ffeb9942d..000000000 --- a/lib/relaysrv/testutil/main.go +++ /dev/null @@ -1,149 +0,0 @@ -// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). - -package main - -import ( - "bufio" - "crypto/tls" - "flag" - "log" - "net" - "net/url" - "os" - "path/filepath" - "time" - - syncthingprotocol "github.com/syncthing/protocol" - "github.com/syncthing/relaysrv/client" - "github.com/syncthing/relaysrv/protocol" -) - -func main() { - log.SetOutput(os.Stdout) - log.SetFlags(log.LstdFlags | log.Lshortfile) - - var connect, relay, dir string - var join, test bool - - flag.StringVar(&connect, "connect", "", "Device ID to which to connect to") - flag.BoolVar(&join, "join", false, "Join relay") - flag.BoolVar(&test, "test", false, "Generic relay test") - flag.StringVar(&relay, "relay", "relay://127.0.0.1:22067", "Relay address") - flag.StringVar(&dir, "keys", ".", "Directory where cert.pem and key.pem is stored") - - flag.Parse() - - certFile, keyFile := filepath.Join(dir, "cert.pem"), filepath.Join(dir, "key.pem") - cert, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - log.Fatalln("Failed to load X509 key pair:", err) - } - - id := syncthingprotocol.NewDeviceID(cert.Certificate[0]) - log.Println("ID:", id) - - uri, err := url.Parse(relay) - if err != nil { - log.Fatal(err) - } - - stdin := make(chan string) - - go stdinReader(stdin) - - if join { - log.Println("Creating client") - relay := client.NewProtocolClient(uri, []tls.Certificate{cert}, nil) - log.Println("Created client") - - go relay.Serve() - - recv := make(chan protocol.SessionInvitation) - - go func() { - log.Println("Starting invitation receiver") - for invite := range relay.Invitations { - select { - case recv <- invite: - log.Println("Received invitation", invite) - default: - log.Println("Discarding invitation", invite) - } - } - }() - - for { - conn, err := client.JoinSession(<-recv) - if err != nil { - log.Fatalln("Failed to join", err) - } - log.Println("Joined", conn.RemoteAddr(), conn.LocalAddr()) - connectToStdio(stdin, conn) - log.Println("Finished", conn.RemoteAddr(), conn.LocalAddr()) - } - } else if connect != "" { - id, err := syncthingprotocol.DeviceIDFromString(connect) - if err != nil { - log.Fatal(err) - } - - invite, err := client.GetInvitationFromRelay(uri, id, []tls.Certificate{cert}) - if err != nil { - log.Fatal(err) - } - - log.Println("Received invitation", invite) - conn, err := client.JoinSession(invite) - if err != nil { - log.Fatalln("Failed to join", err) - } - log.Println("Joined", conn.RemoteAddr(), conn.LocalAddr()) - connectToStdio(stdin, conn) - log.Println("Finished", conn.RemoteAddr(), conn.LocalAddr()) - } else if test { - if client.TestRelay(uri, []tls.Certificate{cert}) { - log.Println("OK") - } else { - log.Println("FAIL") - } - } else { - log.Fatal("Requires either join or connect") - } -} - -func stdinReader(c chan<- string) { - scanner := bufio.NewScanner(os.Stdin) - for scanner.Scan() { - c <- scanner.Text() - c <- "\n" - } -} - -func connectToStdio(stdin <-chan string, conn net.Conn) { - go func() { - - }() - - buf := make([]byte, 1024) - for { - conn.SetReadDeadline(time.Now().Add(time.Millisecond)) - n, err := conn.Read(buf[0:]) - if err != nil { - nerr, ok := err.(net.Error) - if !ok || !nerr.Timeout() { - log.Println(err) - return - } - } - os.Stdout.Write(buf[:n]) - - select { - case msg := <-stdin: - _, err := conn.Write([]byte(msg)) - if err != nil { - return - } - default: - } - } -} diff --git a/lib/relaysrv/utils.go b/lib/relaysrv/utils.go deleted file mode 100644 index 7d1f6bfa4..000000000 --- a/lib/relaysrv/utils.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). - -package main - -import ( - "errors" - "net" -) - -func setTCPOptions(conn net.Conn) error { - tcpConn, ok := conn.(*net.TCPConn) - if !ok { - return errors.New("Not a TCP connection") - } - if err := tcpConn.SetLinger(0); err != nil { - return err - } - if err := tcpConn.SetNoDelay(true); err != nil { - return err - } - if err := tcpConn.SetKeepAlivePeriod(networkTimeout); err != nil { - return err - } - if err := tcpConn.SetKeepAlive(true); err != nil { - return err - } - return nil -}