Merge pull request #2385 from AudriusButkevicius/you-are-next

Add separate client for dynamic relays (fixes #2368)
This commit is contained in:
Jakob Borg 2015-10-22 08:01:22 +02:00
commit 9a1922fdc6
5 changed files with 551 additions and 375 deletions

View File

@ -5,279 +5,37 @@ package client
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"net"
"net/url" "net/url"
"time" "time"
"github.com/syncthing/syncthing/lib/dialer"
syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
) )
type ProtocolClient struct { type relayClientFactory func(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient
URI *url.URL
Invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool var (
supportedSchemes = map[string]relayClientFactory{
"relay": newStaticClient,
"dynamic+http": newDynamicClient,
"dynamic+https": newDynamicClient,
}
)
config *tls.Config type RelayClient interface {
Serve()
timeout time.Duration Stop()
StatusOK() bool
stop chan struct{} Latency() time.Duration
stopped chan struct{} String() string
Invitations() chan protocol.SessionInvitation
conn *tls.Conn URI() *url.URL
mut sync.RWMutex
connected bool
latency time.Duration
} }
func NewProtocolClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) *ProtocolClient { func NewClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) (RelayClient, error) {
closeInvitationsOnFinish := false factory, ok := supportedSchemes[uri.Scheme]
if invitations == nil { if !ok {
closeInvitationsOnFinish = true return nil, fmt.Errorf("Unsupported scheme: %s", uri.Scheme)
invitations = make(chan protocol.SessionInvitation)
} }
return &ProtocolClient{ return factory(uri, certs, invitations), nil
URI: uri,
Invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
config: configForCerts(certs),
timeout: time.Minute * 2,
stop: make(chan struct{}),
stopped: make(chan struct{}),
mut: sync.NewRWMutex(),
connected: false,
}
}
func (c *ProtocolClient) Serve() {
c.stop = make(chan struct{})
c.stopped = make(chan struct{})
defer close(c.stopped)
if err := c.connect(); err != nil {
l.Debugln("Relay connect:", err)
return
}
l.Debugln(c, "connected", c.conn.RemoteAddr())
if err := c.join(); err != nil {
c.conn.Close()
l.Infoln("Relay join:", err)
return
}
if err := c.conn.SetDeadline(time.Time{}); err != nil {
c.conn.Close()
l.Infoln("Relay set deadline:", err)
return
}
l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr())
defer c.cleanup()
c.mut.Lock()
c.connected = true
c.mut.Unlock()
messages := make(chan interface{})
errors := make(chan error, 1)
go messageReader(c.conn, messages, errors)
timeout := time.NewTimer(c.timeout)
for {
select {
case message := <-messages:
timeout.Reset(c.timeout)
l.Debugf("%s received message %T", c, message)
switch msg := message.(type) {
case protocol.Ping:
if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil {
l.Infoln("Relay write:", err)
return
}
l.Debugln(c, "sent pong")
case protocol.SessionInvitation:
ip := net.IP(msg.Address)
if len(ip) == 0 || ip.IsUnspecified() {
msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:]
}
c.Invitations <- msg
default:
l.Infoln("Relay: protocol error: unexpected message %v", msg)
return
}
case <-c.stop:
l.Debugln(c, "stopping")
return
case err := <-errors:
l.Infoln("Relay received:", err)
return
case <-timeout.C:
l.Debugln(c, "timed out")
return
}
}
}
func (c *ProtocolClient) Stop() {
if c.stop == nil {
return
}
close(c.stop)
<-c.stopped
}
func (c *ProtocolClient) StatusOK() bool {
c.mut.RLock()
con := c.connected
c.mut.RUnlock()
return con
}
func (c *ProtocolClient) Latency() time.Duration {
c.mut.RLock()
lat := c.latency
c.mut.RUnlock()
return lat
}
func (c *ProtocolClient) String() string {
return fmt.Sprintf("ProtocolClient@%p", c)
}
func (c *ProtocolClient) connect() error {
if c.URI.Scheme != "relay" {
return fmt.Errorf("Unsupported relay schema: %v", c.URI.Scheme)
}
t0 := time.Now()
tcpConn, err := dialer.Dial("tcp", c.URI.Host)
if err != nil {
return err
}
c.mut.Lock()
c.latency = time.Since(t0)
c.mut.Unlock()
conn := tls.Client(tcpConn, c.config)
if err = conn.Handshake(); err != nil {
return err
}
if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
conn.Close()
return err
}
if err := performHandshakeAndValidation(conn, c.URI); err != nil {
conn.Close()
return err
}
c.conn = conn
return nil
}
func (c *ProtocolClient) cleanup() {
if c.closeInvitationsOnFinish {
close(c.Invitations)
c.Invitations = make(chan protocol.SessionInvitation)
}
l.Debugln(c, "cleaning up")
c.mut.Lock()
c.connected = false
c.mut.Unlock()
c.conn.Close()
}
func (c *ProtocolClient) join() error {
if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil {
return err
}
message, err := protocol.ReadMessage(c.conn)
if err != nil {
return err
}
switch msg := message.(type) {
case protocol.Response:
if msg.Code != 0 {
return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
}
default:
return fmt.Errorf("protocol error: expecting response got %v", msg)
}
return nil
}
func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error {
if err := conn.Handshake(); err != nil {
return err
}
cs := conn.ConnectionState()
if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName {
return fmt.Errorf("protocol negotiation error")
}
q := uri.Query()
relayIDs := q.Get("id")
if relayIDs != "" {
relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs)
if err != nil {
return fmt.Errorf("relay address contains invalid verification id: %s", err)
}
certs := cs.PeerCertificates
if cl := len(certs); cl != 1 {
return fmt.Errorf("unexpected certificate count: %d", cl)
}
remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw)
if remoteID != relayID {
return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID)
}
}
return nil
}
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) {
for {
msg, err := protocol.ReadMessage(conn)
if err != nil {
errors <- err
return
}
messages <- msg
}
} }

217
lib/relay/client/dynamic.go Normal file
View File

@ -0,0 +1,217 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sort"
"time"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
type dynamicClient struct {
pooladdr *url.URL
certs []tls.Certificate
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
mut sync.RWMutex
client RelayClient
stop chan struct{}
}
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
}
return &dynamicClient{
pooladdr: uri,
certs: certs,
invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
mut: sync.NewRWMutex(),
}
}
func (c *dynamicClient) Serve() {
c.mut.Lock()
c.stop = make(chan struct{})
c.mut.Unlock()
uri := *c.pooladdr
// Trim off the `dynamic+` prefix
uri.Scheme = uri.Scheme[8:]
l.Debugln(c, "looking up dynamic relays")
data, err := http.Get(uri.String())
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
return
}
var ann dynamicAnnouncement
err = json.NewDecoder(data.Body).Decode(&ann)
data.Body.Close()
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
return
}
defer c.cleanup()
var addrs []string
for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL)
if err != nil {
l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err)
continue
}
l.Debugln(c, "found", ruri)
addrs = append(addrs, ruri.String())
}
for _, addr := range relayAddressesSortedByLatency(addrs) {
select {
case <-c.stop:
l.Debugln(c, "stopping")
return
default:
ruri, err := url.Parse(addr)
if err != nil {
l.Debugln(c, "skipping relay", addr, err)
continue
}
client, err := NewClient(ruri, c.certs, c.invitations)
if err != nil {
continue
}
c.mut.Lock()
c.client = client
c.mut.Unlock()
c.client.Serve()
c.mut.Lock()
c.client = nil
c.mut.Unlock()
}
}
l.Debugln(c, "could not find a connectable relay")
}
func (c *dynamicClient) Stop() {
c.mut.RLock()
defer c.mut.RUnlock()
close(c.stop)
if c.client == nil {
return
}
c.client.Stop()
}
func (c *dynamicClient) StatusOK() bool {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return false
}
return c.client.StatusOK()
}
func (c *dynamicClient) Latency() time.Duration {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return time.Hour
}
return c.client.Latency()
}
func (c *dynamicClient) String() string {
return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr)
}
func (c *dynamicClient) URI() *url.URL {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return c.pooladdr
}
return c.client.URI()
}
func (c *dynamicClient) Invitations() chan protocol.SessionInvitation {
c.mut.RLock()
inv := c.invitations
c.mut.RUnlock()
return inv
}
func (c *dynamicClient) cleanup() {
c.mut.Lock()
if c.closeInvitationsOnFinish {
close(c.invitations)
c.invitations = make(chan protocol.SessionInvitation)
}
c.mut.Unlock()
}
// This is the announcement recieved from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct {
Relays []struct {
URL string
}
}
// relayAddressesSortedByLatency adds local latency to the relay, and sorts them
// by sum latency, and returns the addresses.
func relayAddressesSortedByLatency(input []string) []string {
relays := make(relayList, len(input))
for i, relay := range input {
if latency, err := osutil.GetLatencyForURL(relay); err == nil {
relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)}
} else {
relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)}
}
}
sort.Sort(relays)
addresses := make([]string, len(relays))
for i, relay := range relays {
addresses[i] = relay.relay
}
return addresses
}
type relayWithLatency struct {
relay string
latency int
}
type relayList []relayWithLatency
func (l relayList) Len() int {
return len(l)
}
func (l relayList) Less(a, b int) bool {
return l[a].latency < l[b].latency
}
func (l relayList) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
}

View File

@ -102,7 +102,11 @@ func JoinSession(invitation protocol.SessionInvitation) (net.Conn, error) {
func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool { func TestRelay(uri *url.URL, certs []tls.Certificate, sleep time.Duration, times int) bool {
id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0]) id := syncthingprotocol.NewDeviceID(certs[0].Certificate[0])
invs := make(chan protocol.SessionInvitation, 1) invs := make(chan protocol.SessionInvitation, 1)
c := NewProtocolClient(uri, certs, invs) c, err := NewClient(uri, certs, invs)
if err != nil {
close(invs)
return false
}
go c.Serve() go c.Serve()
defer func() { defer func() {
close(invs) close(invs)

291
lib/relay/client/static.go Normal file
View File

@ -0,0 +1,291 @@
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"crypto/tls"
"fmt"
"net"
"net/url"
"time"
syncthingprotocol "github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
type staticClient struct {
uri *url.URL
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
config *tls.Config
timeout time.Duration
stop chan struct{}
stopped chan struct{}
conn *tls.Conn
mut sync.RWMutex
connected bool
latency time.Duration
}
func newStaticClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation) RelayClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
}
return &staticClient{
uri: uri,
invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
config: configForCerts(certs),
timeout: time.Minute * 2,
stop: make(chan struct{}),
stopped: make(chan struct{}),
mut: sync.NewRWMutex(),
connected: false,
}
}
func (c *staticClient) Serve() {
c.stop = make(chan struct{})
c.stopped = make(chan struct{})
defer close(c.stopped)
if err := c.connect(); err != nil {
l.Debugln("Relay connect:", err)
return
}
l.Debugln(c, "connected", c.conn.RemoteAddr())
if err := c.join(); err != nil {
c.conn.Close()
l.Infoln("Relay join:", err)
return
}
if err := c.conn.SetDeadline(time.Time{}); err != nil {
c.conn.Close()
l.Infoln("Relay set deadline:", err)
return
}
l.Debugln(c, "joined", c.conn.RemoteAddr(), "via", c.conn.LocalAddr())
defer c.cleanup()
c.mut.Lock()
c.connected = true
c.mut.Unlock()
messages := make(chan interface{})
errors := make(chan error, 1)
go messageReader(c.conn, messages, errors)
timeout := time.NewTimer(c.timeout)
for {
select {
case message := <-messages:
timeout.Reset(c.timeout)
l.Debugf("%s received message %T", c, message)
switch msg := message.(type) {
case protocol.Ping:
if err := protocol.WriteMessage(c.conn, protocol.Pong{}); err != nil {
l.Infoln("Relay write:", err)
return
}
l.Debugln(c, "sent pong")
case protocol.SessionInvitation:
ip := net.IP(msg.Address)
if len(ip) == 0 || ip.IsUnspecified() {
msg.Address = c.conn.RemoteAddr().(*net.TCPAddr).IP[:]
}
c.invitations <- msg
default:
l.Infoln("Relay: protocol error: unexpected message %v", msg)
return
}
case <-c.stop:
l.Debugln(c, "stopping")
return
case err := <-errors:
l.Infoln("Relay received:", err)
return
case <-timeout.C:
l.Debugln(c, "timed out")
return
}
}
}
func (c *staticClient) Stop() {
if c.stop == nil {
return
}
close(c.stop)
<-c.stopped
}
func (c *staticClient) StatusOK() bool {
c.mut.RLock()
con := c.connected
c.mut.RUnlock()
return con
}
func (c *staticClient) Latency() time.Duration {
c.mut.RLock()
lat := c.latency
c.mut.RUnlock()
return lat
}
func (c *staticClient) String() string {
return fmt.Sprintf("StaticClient:%p@%s", c, c.URI())
}
func (c *staticClient) URI() *url.URL {
return c.uri
}
func (c *staticClient) Invitations() chan protocol.SessionInvitation {
c.mut.RLock()
inv := c.invitations
c.mut.RUnlock()
return inv
}
func (c *staticClient) connect() error {
if c.uri.Scheme != "relay" {
return fmt.Errorf("Unsupported relay schema: %v", c.uri.Scheme)
}
t0 := time.Now()
tcpConn, err := net.Dial("tcp", c.uri.Host)
if err != nil {
return err
}
c.mut.Lock()
c.latency = time.Since(t0)
c.mut.Unlock()
conn := tls.Client(tcpConn, c.config)
if err = conn.Handshake(); err != nil {
return err
}
if err := conn.SetDeadline(time.Now().Add(10 * time.Second)); err != nil {
conn.Close()
return err
}
if err := performHandshakeAndValidation(conn, c.uri); err != nil {
conn.Close()
return err
}
c.conn = conn
return nil
}
func (c *staticClient) cleanup() {
l.Debugln(c, "cleaning up")
c.mut.Lock()
if c.closeInvitationsOnFinish {
close(c.invitations)
c.invitations = make(chan protocol.SessionInvitation)
}
c.connected = false
c.mut.Unlock()
c.conn.Close()
}
func (c *staticClient) join() error {
if err := protocol.WriteMessage(c.conn, protocol.JoinRelayRequest{}); err != nil {
return err
}
message, err := protocol.ReadMessage(c.conn)
if err != nil {
return err
}
switch msg := message.(type) {
case protocol.Response:
if msg.Code != 0 {
return fmt.Errorf("Incorrect response code %d: %s", msg.Code, msg.Message)
}
default:
return fmt.Errorf("protocol error: expecting response got %v", msg)
}
return nil
}
func performHandshakeAndValidation(conn *tls.Conn, uri *url.URL) error {
if err := conn.Handshake(); err != nil {
return err
}
cs := conn.ConnectionState()
if !cs.NegotiatedProtocolIsMutual || cs.NegotiatedProtocol != protocol.ProtocolName {
return fmt.Errorf("protocol negotiation error")
}
q := uri.Query()
relayIDs := q.Get("id")
if relayIDs != "" {
relayID, err := syncthingprotocol.DeviceIDFromString(relayIDs)
if err != nil {
return fmt.Errorf("relay address contains invalid verification id: %s", err)
}
certs := cs.PeerCertificates
if cl := len(certs); cl != 1 {
return fmt.Errorf("unexpected certificate count: %d", cl)
}
remoteID := syncthingprotocol.NewDeviceID(certs[0].Raw)
if remoteID != relayID {
return fmt.Errorf("relay id does not match. Expected %v got %v", relayID, remoteID)
}
}
return nil
}
func messageReader(conn net.Conn, messages chan<- interface{}, errors chan<- error) {
for {
msg, err := protocol.ReadMessage(conn)
if err != nil {
errors <- err
return
}
messages <- msg
}
}

View File

@ -8,15 +8,13 @@ package relay
import ( import (
"crypto/tls" "crypto/tls"
"encoding/json"
"net/http"
"net/url" "net/url"
"sort" "sort"
"time" "time"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay/client" "github.com/syncthing/syncthing/lib/relay/client"
"github.com/syncthing/syncthing/lib/relay/protocol" "github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
@ -34,7 +32,7 @@ type Svc struct {
tlsCfg *tls.Config tlsCfg *tls.Config
tokens map[string]suture.ServiceToken tokens map[string]suture.ServiceToken
clients map[string]*client.ProtocolClient clients map[string]client.RelayClient
mut sync.RWMutex mut sync.RWMutex
invitations chan protocol.SessionInvitation invitations chan protocol.SessionInvitation
conns chan *tls.Conn conns chan *tls.Conn
@ -56,7 +54,7 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config) *Svc {
tlsCfg: tlsCfg, tlsCfg: tlsCfg,
tokens: make(map[string]suture.ServiceToken), tokens: make(map[string]suture.ServiceToken),
clients: make(map[string]*client.ProtocolClient), clients: make(map[string]client.RelayClient),
mut: sync.NewRWMutex(), mut: sync.NewRWMutex(),
invitations: make(chan protocol.SessionInvitation), invitations: make(chan protocol.SessionInvitation),
conns: conns, conns: conns,
@ -106,61 +104,17 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
existing[uri.String()] = uri existing[uri.String()] = uri
} }
// Query dynamic addresses, and pick the closest relay from the ones they provide.
for key, uri := range existing {
if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" {
continue
}
delete(existing, key)
// Trim off the `dynamic+` prefix
uri.Scheme = uri.Scheme[8:]
l.Debugln("Looking up dynamic relays from", uri)
data, err := http.Get(uri.String())
if err != nil {
l.Debugln("Failed to lookup dynamic relays", err)
continue
}
var ann dynamicAnnouncement
err = json.NewDecoder(data.Body).Decode(&ann)
data.Body.Close()
if err != nil {
l.Debugln("Failed to lookup dynamic relays", err)
continue
}
var dynRelayAddrs []string
for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL)
if err != nil {
l.Debugln("Failed to parse dynamic relay address", relayAnn.URL, err)
continue
}
l.Debugln("Found", ruri, "via", uri)
dynRelayAddrs = append(dynRelayAddrs, ruri.String())
}
if len(dynRelayAddrs) > 0 {
dynRelayAddrs = relayAddressesSortedByLatency(dynRelayAddrs)
closestRelay := dynRelayAddrs[0]
l.Debugln("Picking", closestRelay, "as closest dynamic relay from", uri)
ruri, _ := url.Parse(closestRelay)
existing[closestRelay] = ruri
} else {
l.Debugln("No dynamic relay found on", uri)
}
}
s.mut.Lock() s.mut.Lock()
for key, uri := range existing { for key, uri := range existing {
_, ok := s.tokens[key] _, ok := s.tokens[key]
if !ok { if !ok {
l.Debugln("Connecting to relay", uri) l.Debugln("Connecting to relay", uri)
c := client.NewProtocolClient(uri, s.tlsCfg.Certificates, s.invitations) c, err := client.NewClient(uri, s.tlsCfg.Certificates, s.invitations)
if err != nil {
l.Debugln("Failed to connect to relay", uri, err)
continue
}
s.tokens[key] = s.Add(c) s.tokens[key] = s.Add(c)
s.clients[key] = c s.clients[key] = c
} }
@ -197,8 +151,8 @@ func (s *Svc) Relays() []string {
s.mut.RLock() s.mut.RLock()
relays := make([]string, 0, len(s.clients)) relays := make([]string, 0, len(s.clients))
for uri := range s.clients { for _, client := range s.clients {
relays = append(relays, uri) relays = append(relays, client.URI().String())
} }
s.mut.RUnlock() s.mut.RUnlock()
@ -216,14 +170,14 @@ func (s *Svc) RelayStatus(uri string) (time.Duration, bool) {
} }
s.mut.RLock() s.mut.RLock()
client, ok := s.clients[uri] for _, client := range s.clients {
if client.URI().String() == uri {
return client.Latency(), client.StatusOK()
}
}
s.mut.RUnlock() s.mut.RUnlock()
if !ok || !client.StatusOK() { return time.Hour, false
return time.Hour, false
}
return client.Latency(), true
} }
// Accept returns a new *tls.Conn. The connection is already handshaken. // Accept returns a new *tls.Conn. The connection is already handshaken.
@ -277,7 +231,7 @@ func (r *invitationReceiver) Stop() {
// The eventBroadcaster sends a RelayStateChanged event when the relay status // The eventBroadcaster sends a RelayStateChanged event when the relay status
// changes. We need this somewhat ugly polling mechanism as there's currently // changes. We need this somewhat ugly polling mechanism as there's currently
// no way to get the event feed directly from the relay lib. This may be // no way to get the event feed directly from the relay lib. This may be
// somethign to revisit later, possibly. // something to revisit later, possibly.
type eventBroadcaster struct { type eventBroadcaster struct {
svc *Svc svc *Svc
stop chan struct{} stop chan struct{}
@ -322,51 +276,3 @@ func (e *eventBroadcaster) Serve() {
func (e *eventBroadcaster) Stop() { func (e *eventBroadcaster) Stop() {
close(e.stop) close(e.stop)
} }
// This is the announcement recieved from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct {
Relays []struct {
URL string
}
}
// relayAddressesSortedByLatency adds local latency to the relay, and sorts them
// by sum latency, and returns the addresses.
func relayAddressesSortedByLatency(input []string) []string {
relays := make(relayList, len(input))
for i, relay := range input {
if latency, err := osutil.GetLatencyForURL(relay); err == nil {
relays[i] = relayWithLatency{relay, int(latency / time.Millisecond)}
} else {
relays[i] = relayWithLatency{relay, int(time.Hour / time.Millisecond)}
}
}
sort.Sort(relays)
addresses := make([]string, len(relays))
for i, relay := range relays {
addresses[i] = relay.relay
}
return addresses
}
type relayWithLatency struct {
relay string
latency int
}
type relayList []relayWithLatency
func (l relayList) Len() int {
return len(l)
}
func (l relayList) Less(a, b int) bool {
return l[a].latency < l[b].latency
}
func (l relayList) Swap(a, b int) {
l[a], l[b] = l[b], l[a]
}