mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-14 09:14:10 +00:00
1bae4b7f50
* all: Use context in lib/dialer * a bit slimmer * https://github.com/syncthing/syncthing/pull/5753 * bot * missed adding debug.go * errors.Cause * simultaneous dialing * anti-leak
187 lines
3.9 KiB
Go
187 lines
3.9 KiB
Go
// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file).
|
|
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/http"
|
|
"net/url"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/syncthing/syncthing/lib/osutil"
|
|
"github.com/syncthing/syncthing/lib/rand"
|
|
"github.com/syncthing/syncthing/lib/relay/protocol"
|
|
)
|
|
|
|
type dynamicClient struct {
|
|
commonClient
|
|
|
|
pooladdr *url.URL
|
|
certs []tls.Certificate
|
|
timeout time.Duration
|
|
|
|
client RelayClient
|
|
}
|
|
|
|
func newDynamicClient(uri *url.URL, certs []tls.Certificate, invitations chan protocol.SessionInvitation, timeout time.Duration) RelayClient {
|
|
c := &dynamicClient{
|
|
pooladdr: uri,
|
|
certs: certs,
|
|
timeout: timeout,
|
|
}
|
|
c.commonClient = newCommonClient(invitations, c.serve, fmt.Sprintf("dynamicClient@%p", c))
|
|
return c
|
|
}
|
|
|
|
func (c *dynamicClient) serve(ctx context.Context) error {
|
|
uri := *c.pooladdr
|
|
|
|
// Trim off the `dynamic+` prefix
|
|
uri.Scheme = uri.Scheme[8:]
|
|
|
|
l.Debugln(c, "looking up dynamic relays")
|
|
|
|
data, err := http.Get(uri.String())
|
|
if err != nil {
|
|
l.Debugln(c, "failed to lookup dynamic relays", err)
|
|
return err
|
|
}
|
|
|
|
var ann dynamicAnnouncement
|
|
err = json.NewDecoder(data.Body).Decode(&ann)
|
|
data.Body.Close()
|
|
if err != nil {
|
|
l.Debugln(c, "failed to lookup dynamic relays", err)
|
|
return err
|
|
}
|
|
|
|
var addrs []string
|
|
for _, relayAnn := range ann.Relays {
|
|
ruri, err := url.Parse(relayAnn.URL)
|
|
if err != nil {
|
|
l.Debugln(c, "failed to parse dynamic relay address", relayAnn.URL, err)
|
|
continue
|
|
}
|
|
l.Debugln(c, "found", ruri)
|
|
addrs = append(addrs, ruri.String())
|
|
}
|
|
|
|
for _, addr := range relayAddressesOrder(ctx, addrs) {
|
|
select {
|
|
case <-ctx.Done():
|
|
l.Debugln(c, "stopping")
|
|
return nil
|
|
default:
|
|
ruri, err := url.Parse(addr)
|
|
if err != nil {
|
|
l.Debugln(c, "skipping relay", addr, err)
|
|
continue
|
|
}
|
|
client := newStaticClient(ruri, c.certs, c.invitations, c.timeout)
|
|
c.mut.Lock()
|
|
c.client = client
|
|
c.mut.Unlock()
|
|
|
|
c.client.Serve()
|
|
|
|
c.mut.Lock()
|
|
c.client = nil
|
|
c.mut.Unlock()
|
|
}
|
|
}
|
|
l.Debugln(c, "could not find a connectable relay")
|
|
return fmt.Errorf("could not find a connectable relay")
|
|
}
|
|
|
|
func (c *dynamicClient) Stop() {
|
|
c.mut.RLock()
|
|
if c.client != nil {
|
|
c.client.Stop()
|
|
}
|
|
c.mut.RUnlock()
|
|
c.commonClient.Stop()
|
|
}
|
|
|
|
func (c *dynamicClient) Error() error {
|
|
c.mut.RLock()
|
|
defer c.mut.RUnlock()
|
|
if c.client == nil {
|
|
return c.commonClient.Error()
|
|
}
|
|
return c.client.Error()
|
|
}
|
|
|
|
func (c *dynamicClient) Latency() time.Duration {
|
|
c.mut.RLock()
|
|
defer c.mut.RUnlock()
|
|
if c.client == nil {
|
|
return time.Hour
|
|
}
|
|
return c.client.Latency()
|
|
}
|
|
|
|
func (c *dynamicClient) String() string {
|
|
return fmt.Sprintf("DynamicClient:%p:%s@%s", c, c.URI(), c.pooladdr)
|
|
}
|
|
|
|
func (c *dynamicClient) URI() *url.URL {
|
|
c.mut.RLock()
|
|
defer c.mut.RUnlock()
|
|
if c.client == nil {
|
|
return nil
|
|
}
|
|
return c.client.URI()
|
|
}
|
|
|
|
// This is the announcement received from the relay server;
|
|
// {"relays": [{"url": "relay://10.20.30.40:5060"}, ...]}
|
|
type dynamicAnnouncement struct {
|
|
Relays []struct {
|
|
URL string
|
|
}
|
|
}
|
|
|
|
// relayAddressesOrder checks the latency to each relay, rounds latency down to
|
|
// the closest 50ms, and puts them in buckets of 50ms latency ranges. Then
|
|
// shuffles each bucket, and returns all addresses starting with the ones from
|
|
// the lowest latency bucket, ending with the highest latency buceket.
|
|
func relayAddressesOrder(ctx context.Context, input []string) []string {
|
|
buckets := make(map[int][]string)
|
|
|
|
for _, relay := range input {
|
|
latency, err := osutil.GetLatencyForURL(ctx, relay)
|
|
if err != nil {
|
|
latency = time.Hour
|
|
}
|
|
|
|
id := int(latency/time.Millisecond) / 50
|
|
|
|
buckets[id] = append(buckets[id], relay)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil
|
|
default:
|
|
}
|
|
}
|
|
|
|
var ids []int
|
|
for id, bucket := range buckets {
|
|
rand.Shuffle(bucket)
|
|
ids = append(ids, id)
|
|
}
|
|
|
|
sort.Ints(ids)
|
|
|
|
addresses := make([]string, 0, len(input))
|
|
for _, id := range ids {
|
|
addresses = append(addresses, buckets[id]...)
|
|
}
|
|
|
|
return addresses
|
|
}
|