Pick a single relay (fixes #2182)

This commit is contained in:
AudriusButkevicius 2015-09-02 17:56:44 +01:00
parent cf802dc67e
commit 876d7ac85e
3 changed files with 70 additions and 43 deletions

View File

@ -21,22 +21,21 @@ import (
"github.com/syncthing/syncthing/lib/beacon" "github.com/syncthing/syncthing/lib/beacon"
"github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/relay"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
) )
type Discoverer struct { type Discoverer struct {
myID protocol.DeviceID myID protocol.DeviceID
listenAddrs []string listenAddrs []string
relaySvc *relay.Svc relayStatusProvider relayStatusProvider
localBcastIntv time.Duration localBcastIntv time.Duration
localBcastStart time.Time localBcastStart time.Time
cacheLifetime time.Duration cacheLifetime time.Duration
negCacheCutoff time.Duration negCacheCutoff time.Duration
beacons []beacon.Interface beacons []beacon.Interface
extPort uint16 extPort uint16
localBcastTick <-chan time.Time localBcastTick <-chan time.Time
forcedBcastTick chan time.Time forcedBcastTick chan time.Time
registryLock sync.RWMutex registryLock sync.RWMutex
addressRegistry map[protocol.DeviceID][]CacheEntry addressRegistry map[protocol.DeviceID][]CacheEntry
@ -47,6 +46,10 @@ type Discoverer struct {
mut sync.RWMutex mut sync.RWMutex
} }
type relayStatusProvider interface {
ClientStatus() map[string]bool
}
type CacheEntry struct { type CacheEntry struct {
Address string Address string
Seen time.Time Seen time.Time
@ -56,19 +59,19 @@ var (
ErrIncorrectMagic = errors.New("incorrect magic number") ErrIncorrectMagic = errors.New("incorrect magic number")
) )
func NewDiscoverer(id protocol.DeviceID, addresses []string, relaySvc *relay.Svc) *Discoverer { func NewDiscoverer(id protocol.DeviceID, addresses []string, relayStatusProvider relayStatusProvider) *Discoverer {
return &Discoverer{ return &Discoverer{
myID: id, myID: id,
listenAddrs: addresses, listenAddrs: addresses,
relaySvc: relaySvc, relayStatusProvider: relayStatusProvider,
localBcastIntv: 30 * time.Second, localBcastIntv: 30 * time.Second,
cacheLifetime: 5 * time.Minute, cacheLifetime: 5 * time.Minute,
negCacheCutoff: 3 * time.Minute, negCacheCutoff: 3 * time.Minute,
addressRegistry: make(map[protocol.DeviceID][]CacheEntry), addressRegistry: make(map[protocol.DeviceID][]CacheEntry),
relayRegistry: make(map[protocol.DeviceID][]CacheEntry), relayRegistry: make(map[protocol.DeviceID][]CacheEntry),
lastLookup: make(map[protocol.DeviceID]time.Time), lastLookup: make(map[protocol.DeviceID]time.Time),
registryLock: sync.NewRWMutex(), registryLock: sync.NewRWMutex(),
mut: sync.NewRWMutex(), mut: sync.NewRWMutex(),
} }
} }
@ -251,7 +254,7 @@ func (d *Discoverer) Lookup(device protocol.DeviceID) ([]string, []string) {
} }
} }
relays = addressesSortedByLatency(availableRelays) relays = RelayAddressesSortedByLatency(availableRelays)
cachedRelays := make([]CacheEntry, len(relays)) cachedRelays := make([]CacheEntry, len(relays))
for i := range relays { for i := range relays {
cachedRelays[i] = CacheEntry{ cachedRelays[i] = CacheEntry{
@ -307,8 +310,8 @@ func (d *Discoverer) announcementPkt(allowExternal bool) Announce {
} }
var relayAddrs []string var relayAddrs []string
if d.relaySvc != nil { if d.relayStatusProvider != nil {
status := d.relaySvc.ClientStatus() status := d.relayStatusProvider.ClientStatus()
for uri, ok := range status { for uri, ok := range status {
if ok { if ok {
relayAddrs = append(relayAddrs, uri) relayAddrs = append(relayAddrs, uri)
@ -489,7 +492,7 @@ func measureLatency(relayAdresses []string) []Relay {
} }
relays = append(relays, relay) relays = append(relays, relay)
if latency, err := getLatencyForURL(addr); err == nil { if latency, err := osutil.GetLatencyForURL(addr); err == nil {
if debug { if debug {
l.Debugf("Relay %s latency %s", addr, latency) l.Debugf("Relay %s latency %s", addr, latency)
} }
@ -501,13 +504,13 @@ func measureLatency(relayAdresses []string) []Relay {
return relays return relays
} }
// addressesSortedByLatency adds local latency to the relay, and sorts them // RelayAddressesSortedByLatency adds local latency to the relay, and sorts them
// by sum latency, and returns the addresses. // by sum latency, and returns the addresses.
func addressesSortedByLatency(input []Relay) []string { func RelayAddressesSortedByLatency(input []Relay) []string {
relays := make([]Relay, len(input)) relays := make([]Relay, len(input))
copy(relays, input) copy(relays, input)
for i, relay := range relays { for i, relay := range relays {
if latency, err := getLatencyForURL(relay.Address); err == nil { if latency, err := osutil.GetLatencyForURL(relay.Address); err == nil {
relays[i].Latency += int32(latency / time.Millisecond) relays[i].Latency += int32(latency / time.Millisecond)
} else { } else {
relays[i].Latency += int32(time.Hour / time.Millisecond) relays[i].Latency += int32(time.Hour / time.Millisecond)
@ -523,15 +526,6 @@ func addressesSortedByLatency(input []Relay) []string {
return addresses return addresses
} }
func getLatencyForURL(addr string) (time.Duration, error) {
uri, err := url.Parse(addr)
if err != nil {
return 0, err
}
return osutil.TCPPing(uri.Host)
}
type relayList []Relay type relayList []Relay
func (l relayList) Len() int { func (l relayList) Len() int {

View File

@ -8,6 +8,7 @@ package osutil
import ( import (
"net" "net"
"net/url"
"time" "time"
) )
@ -25,3 +26,14 @@ func TCPPing(address string) (time.Duration, error) {
} }
return time.Since(start), err return time.Since(start), err
} }
// GetLatencyForURL parses the given URL, tries opening a TCP connection to it
// and returns the time it took to establish the connection.
func GetLatencyForURL(addr string) (time.Duration, error) {
uri, err := url.Parse(addr)
if err != nil {
return 0, err
}
return TCPPing(uri.Host)
}

View File

@ -17,6 +17,7 @@ import (
"github.com/syncthing/relaysrv/client" "github.com/syncthing/relaysrv/client"
"github.com/syncthing/relaysrv/protocol" "github.com/syncthing/relaysrv/protocol"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/discover"
"github.com/syncthing/syncthing/lib/model" "github.com/syncthing/syncthing/lib/model"
"github.com/syncthing/syncthing/lib/osutil" "github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/sync"
@ -29,7 +30,7 @@ func NewSvc(cfg *config.Wrapper, tlsCfg *tls.Config, conns chan<- model.Intermed
Supervisor: suture.New("Svc", suture.Spec{ Supervisor: suture.New("Svc", suture.Spec{
Log: func(log string) { Log: func(log string) {
if debug { if debug {
l.Infoln(log) l.Debugln(log)
} }
}, },
FailureBackoff: 5 * time.Minute, FailureBackoff: 5 * time.Minute,
@ -97,15 +98,20 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
existing[uri.String()] = uri existing[uri.String()] = uri
} }
// Expand dynamic addresses into a set of relays // Query dynamic addresses, and pick the closest relay from the ones they provide.
for key, uri := range existing { for key, uri := range existing {
if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" { if uri.Scheme != "dynamic+http" && uri.Scheme != "dynamic+https" {
continue continue
} }
delete(existing, key) delete(existing, key)
// Trim off the `dynamic+` prefix
uri.Scheme = uri.Scheme[8:] uri.Scheme = uri.Scheme[8:]
if debug {
l.Debugln("Looking up dynamic relays from", uri)
}
data, err := http.Get(uri.String()) data, err := http.Get(uri.String())
if err != nil { if err != nil {
if debug { if debug {
@ -124,6 +130,7 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
continue continue
} }
dynRelays := make([]discover.Relay, 0, len(ann.Relays))
for _, relayAnn := range ann.Relays { for _, relayAnn := range ann.Relays {
ruri, err := url.Parse(relayAnn.URL) ruri, err := url.Parse(relayAnn.URL)
if err != nil { if err != nil {
@ -135,7 +142,21 @@ func (s *Svc) CommitConfiguration(from, to config.Configuration) bool {
if debug { if debug {
l.Debugln("Found", ruri, "via", uri) l.Debugln("Found", ruri, "via", uri)
} }
existing[ruri.String()] = ruri dynRelays = append(dynRelays, discover.Relay{
Address: ruri.String(),
})
}
dynRelayAddrs := discover.RelayAddressesSortedByLatency(dynRelays)
if len(dynRelayAddrs) > 0 {
closestRelay := dynRelayAddrs[0]
if debug {
l.Debugln("Picking", closestRelay, "as closest dynamic relay from", uri)
}
ruri, _ := url.Parse(closestRelay)
existing[closestRelay] = ruri
} else if debug {
l.Debugln("No dynamic relay found on", uri)
} }
} }