syncthing/lib/relay/client/dynamic.go

234 lines
4.9 KiB
Go
Raw Normal View History

// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
package client
import (
"crypto/tls"
"encoding/json"
"fmt"
"math/rand"
"net/http"
"net/url"
"sort"
"time"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay/protocol"
"github.com/syncthing/syncthing/lib/sync"
)
type dynamicClient struct {
pooladdr *url.URL
certs []tls.Certificate
invitations chan protocol.SessionInvitation
closeInvitationsOnFinish bool
2015-11-23 21:14:46 +00:00
timeout time.Duration
mut sync.RWMutex
err error
client RelayClient
stop chan struct{}
}
2015-11-23 21:14:46 +00:00
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
closeInvitationsOnFinish := false
if invitations == nil {
closeInvitationsOnFinish = true
invitations = make(chan protocol.SessionInvitation)
}
return &dynamicClient{
pooladdr: uri,
certs: certs,
invitations: invitations,
closeInvitationsOnFinish: closeInvitationsOnFinish,
2015-11-23 21:14:46 +00:00
timeout: timeout,
mut: sync.NewRWMutex(),
}
}
func (c *dynamicClient) Serve() {
c.mut.Lock()
c.stop = make(chan struct{})
c.mut.Unlock()
uri := *c.pooladdr
// Trim off the `dynamic+` prefix
uri.Scheme = uri.Scheme[8:]
l.Debugln(c, "looking up dynamic relays")
data, err := http.Get(uri.String())
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
c.setError(err)
return
}
var ann dynamicAnnouncement
err = json.NewDecoder(data.Body).Decode(&ann)
data.Body.Close()
if err != nil {
l.Debugln(c, "failed to lookup dynamic relays", err)
c.setError(err)
return
}
defer c.cleanup()
var addrs []string
for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL)
if err != nil {
l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err)
continue
}
l.Debugln(c, "found", ruri)
addrs = append(addrs, ruri.String())
}
for _, addr := range relayAddressesOrder(addrs) {
select {
case <-c.stop:
l.Debugln(c, "stopping")
c.setError(nil)
return
default:
ruri, err := url.Parse(addr)
if err != nil {
l.Debugln(c, "skipping relay", addr, err)
continue
}
2015-11-23 21:14:46 +00:00
client, err := NewClient(ruri, c.certs, c.invitations, c.timeout)
if err != nil {
continue
}
c.mut.Lock()
c.client = client
c.mut.Unlock()
c.client.Serve()
c.mut.Lock()
c.client = nil
c.mut.Unlock()
}
}
l.Debugln(c, "could not find a connectable relay")
c.setError(fmt.Errorf("could not find a connectable relay"))
}
func (c *dynamicClient) Stop() {
c.mut.RLock()
defer c.mut.RUnlock()
close(c.stop)
if c.client == nil {
return
}
c.client.Stop()
}
func (c *dynamicClient) Error() error {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return c.err
}
return c.client.Error()
}
func (c *dynamicClient) Latency() time.Duration {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return time.Hour
}
return c.client.Latency()
}
func (c *dynamicClient) String() string {
return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr)
}
func (c *dynamicClient) URI() *url.URL {
c.mut.RLock()
defer c.mut.RUnlock()
if c.client == nil {
return nil
}
return c.client.URI()
}
func (c *dynamicClient) Invitations() chan protocol.SessionInvitation {
c.mut.RLock()
inv := c.invitations
c.mut.RUnlock()
return inv
}
func (c *dynamicClient) cleanup() {
c.mut.Lock()
if c.closeInvitationsOnFinish {
close(c.invitations)
c.invitations = make(chan protocol.SessionInvitation)
}
c.mut.Unlock()
}
func (c *dynamicClient) setError(err error) {
c.mut.Lock()
c.err = err
c.mut.Unlock()
}
// This is the announcement received from the relay server;
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
type dynamicAnnouncement struct {
Relays []struct {
URL string
}
}
// relayAddressesOrder checks the latency to each relay, rounds latency down to
// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
// shuffles each bucket, and returns all addresses starting with the ones from
// the lowest latency bucket, ending with the highest latency buceket.
func relayAddressesOrder(input []string) []string {
buckets := make(map[int][]string)
for _, relay := range input {
latency, err := osutil.GetLatencyForURL(relay)
if err != nil {
latency = time.Hour
}
id := int(latency/time.Millisecond) / 50
buckets[id] = append(buckets[id], relay)
}
var ids []int
for id, bucket := range buckets {
shuffle(bucket)
ids = append(ids, id)
}
sort.Ints(ids)
addresses := make([]string, len(input))
for _, id := range ids {
addresses = append(addresses, buckets[id]...)
}
return addresses
}
func shuffle(slice []string) {
for i := len(slice) - 1; i > 0; i-- {
j := rand.Intn(i + 1)
slice[i], slice[j] = slice[j], slice[i]
}
}