mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 22:58:25 +00:00
Add discovery source priorities (fixes #2339)
Sources are given a priority, lower being better, when added to a CachingMux.
This commit is contained in:
parent
96b3d31b42
commit
89928ca8e4
@ -73,6 +73,13 @@ const (
|
|||||||
pingEventInterval = time.Minute
|
pingEventInterval = time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// The discovery results are sorted by their source priority.
|
||||||
|
const (
|
||||||
|
ipv6LocalDiscoveryPriority = iota
|
||||||
|
ipv4LocalDiscoveryPriority
|
||||||
|
globalDiscoveryPriority
|
||||||
|
)
|
||||||
|
|
||||||
var l = logger.DefaultLogger
|
var l = logger.DefaultLogger
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -703,7 +710,7 @@ func syncthingMain() {
|
|||||||
// Each global discovery server gets its results cached for five
|
// Each global discovery server gets its results cached for five
|
||||||
// minutes, and is not asked again for a minute when it's returned
|
// minutes, and is not asked again for a minute when it's returned
|
||||||
// unsuccessfully.
|
// unsuccessfully.
|
||||||
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute)
|
cachedDiscovery.Add(gd, 5*time.Minute, time.Minute, globalDiscoveryPriority)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -713,14 +720,14 @@ func syncthingMain() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
l.Warnln("IPv4 local discovery:", err)
|
l.Warnln("IPv4 local discovery:", err)
|
||||||
} else {
|
} else {
|
||||||
cachedDiscovery.Add(bcd, 0, 0)
|
cachedDiscovery.Add(bcd, 0, 0, ipv4LocalDiscoveryPriority)
|
||||||
}
|
}
|
||||||
// v6 multicasts
|
// v6 multicasts
|
||||||
mcd, err := discover.NewLocal(myID, cfg.Options().LocalAnnMCAddr, addrList, relaySvc)
|
mcd, err := discover.NewLocal(myID, cfg.Options().LocalAnnMCAddr, addrList, relaySvc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
l.Warnln("IPv6 local discovery:", err)
|
l.Warnln("IPv6 local discovery:", err)
|
||||||
} else {
|
} else {
|
||||||
cachedDiscovery.Add(mcd, 0, 0)
|
cachedDiscovery.Add(mcd, 0, 0, ipv6LocalDiscoveryPriority)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -34,6 +34,14 @@ type cachedFinder struct {
|
|||||||
Finder
|
Finder
|
||||||
cacheTime time.Duration
|
cacheTime time.Duration
|
||||||
negCacheTime time.Duration
|
negCacheTime time.Duration
|
||||||
|
priority int
|
||||||
|
}
|
||||||
|
|
||||||
|
// A prioritizedAddress is what we use to sort addresses returned from
|
||||||
|
// different sources with different priorities.
|
||||||
|
type prioritizedAddress struct {
|
||||||
|
priority int
|
||||||
|
addr string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCachingMux() *CachingMux {
|
func NewCachingMux() *CachingMux {
|
||||||
@ -44,9 +52,9 @@ func NewCachingMux() *CachingMux {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Add registers a new Finder, with associated cache timeouts.
|
// Add registers a new Finder, with associated cache timeouts.
|
||||||
func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
|
func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration, priority int) {
|
||||||
m.mut.Lock()
|
m.mut.Lock()
|
||||||
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime})
|
m.finders = append(m.finders, cachedFinder{finder, cacheTime, negCacheTime, priority})
|
||||||
m.caches = append(m.caches, newCache())
|
m.caches = append(m.caches, newCache())
|
||||||
m.mut.Unlock()
|
m.mut.Unlock()
|
||||||
|
|
||||||
@ -58,6 +66,8 @@ func (m *CachingMux) Add(finder Finder, cacheTime, negCacheTime time.Duration) {
|
|||||||
// Lookup attempts to resolve the device ID using any of the added Finders,
|
// Lookup attempts to resolve the device ID using any of the added Finders,
|
||||||
// while obeying the cache settings.
|
// while obeying the cache settings.
|
||||||
func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
|
func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays []Relay, err error) {
|
||||||
|
var pdirect []prioritizedAddress
|
||||||
|
|
||||||
m.mut.Lock()
|
m.mut.Lock()
|
||||||
for i, finder := range m.finders {
|
for i, finder := range m.finders {
|
||||||
if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
|
if cacheEntry, ok := m.caches[i].Get(deviceID); ok {
|
||||||
@ -67,9 +77,11 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
|
|||||||
// It's a positive, valid entry. Use it.
|
// It's a positive, valid entry. Use it.
|
||||||
if debug {
|
if debug {
|
||||||
l.Debugln("cached discovery entry for", deviceID, "at", finder.String())
|
l.Debugln("cached discovery entry for", deviceID, "at", finder.String())
|
||||||
l.Debugln(" ", cacheEntry)
|
l.Debugln(" cache:", cacheEntry)
|
||||||
|
}
|
||||||
|
for _, addr := range cacheEntry.Direct {
|
||||||
|
pdirect = append(pdirect, prioritizedAddress{finder.priority, addr})
|
||||||
}
|
}
|
||||||
direct = append(direct, cacheEntry.Direct...)
|
|
||||||
relays = append(relays, cacheEntry.Relays...)
|
relays = append(relays, cacheEntry.Relays...)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -90,10 +102,12 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
|
|||||||
if td, tr, err := finder.Lookup(deviceID); err == nil {
|
if td, tr, err := finder.Lookup(deviceID); err == nil {
|
||||||
if debug {
|
if debug {
|
||||||
l.Debugln("lookup for", deviceID, "at", finder.String())
|
l.Debugln("lookup for", deviceID, "at", finder.String())
|
||||||
l.Debugln(" ", td)
|
l.Debugln(" direct:", td)
|
||||||
l.Debugln(" ", tr)
|
l.Debugln(" relays:", tr)
|
||||||
|
}
|
||||||
|
for _, addr := range td {
|
||||||
|
pdirect = append(pdirect, prioritizedAddress{finder.priority, addr})
|
||||||
}
|
}
|
||||||
direct = append(direct, td...)
|
|
||||||
relays = append(relays, tr...)
|
relays = append(relays, tr...)
|
||||||
m.caches[i].Set(deviceID, CacheEntry{
|
m.caches[i].Set(deviceID, CacheEntry{
|
||||||
Direct: td,
|
Direct: td,
|
||||||
@ -105,13 +119,15 @@ func (m *CachingMux) Lookup(deviceID protocol.DeviceID) (direct []string, relays
|
|||||||
}
|
}
|
||||||
m.mut.Unlock()
|
m.mut.Unlock()
|
||||||
|
|
||||||
|
direct = uniqueSortedAddrs(pdirect)
|
||||||
|
relays = uniqueSortedRelays(relays)
|
||||||
if debug {
|
if debug {
|
||||||
l.Debugln("lookup results for", deviceID)
|
l.Debugln("lookup results for", deviceID)
|
||||||
l.Debugln(" ", direct)
|
l.Debugln(" direct: ", direct)
|
||||||
l.Debugln(" ", relays)
|
l.Debugln(" relays: ", relays)
|
||||||
}
|
}
|
||||||
|
|
||||||
return uniqueSortedStrings(direct), uniqueSortedRelays(relays), nil
|
return direct, relays, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *CachingMux) String() string {
|
func (m *CachingMux) String() string {
|
||||||
@ -198,20 +214,19 @@ func (c *cache) Cache() map[protocol.DeviceID]CacheEntry {
|
|||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
func uniqueSortedStrings(ss []string) []string {
|
func uniqueSortedAddrs(ss []prioritizedAddress) []string {
|
||||||
m := make(map[string]struct{}, len(ss))
|
// We sort the addresses by priority, then filter them based on seen
|
||||||
|
// (first time seen is the on kept, so we retain priority).
|
||||||
|
sort.Sort(prioritizedAddressList(ss))
|
||||||
|
filtered := make([]string, 0, len(ss))
|
||||||
|
seen := make(map[string]struct{}, len(ss))
|
||||||
for _, s := range ss {
|
for _, s := range ss {
|
||||||
m[s] = struct{}{}
|
if _, ok := seen[s.addr]; !ok {
|
||||||
|
filtered = append(filtered, s.addr)
|
||||||
|
seen[s.addr] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return filtered
|
||||||
var us = make([]string, 0, len(m))
|
|
||||||
for k := range m {
|
|
||||||
us = append(us, k)
|
|
||||||
}
|
|
||||||
|
|
||||||
sort.Strings(us)
|
|
||||||
|
|
||||||
return us
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func uniqueSortedRelays(rs []Relay) []Relay {
|
func uniqueSortedRelays(rs []Relay) []Relay {
|
||||||
@ -243,3 +258,20 @@ func (l relayList) Swap(a, b int) {
|
|||||||
func (l relayList) Less(a, b int) bool {
|
func (l relayList) Less(a, b int) bool {
|
||||||
return l[a].URL < l[b].URL
|
return l[a].URL < l[b].URL
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type prioritizedAddressList []prioritizedAddress
|
||||||
|
|
||||||
|
func (l prioritizedAddressList) Len() int {
|
||||||
|
return len(l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l prioritizedAddressList) Swap(a, b int) {
|
||||||
|
l[a], l[b] = l[b], l[a]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l prioritizedAddressList) Less(a, b int) bool {
|
||||||
|
if l[a].priority != l[b].priority {
|
||||||
|
return l[a].priority < l[b].priority
|
||||||
|
}
|
||||||
|
return l[a].addr < l[b].addr
|
||||||
|
}
|
||||||
|
@ -15,7 +15,21 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func TestCacheUnique(t *testing.T) {
|
func TestCacheUnique(t *testing.T) {
|
||||||
direct := []string{"tcp://192.0.2.42:22000", "tcp://192.0.2.43:22000"}
|
direct0 := []string{"tcp://192.0.2.44:22000", "tcp://192.0.2.42:22000"} // prio 0
|
||||||
|
direct1 := []string{"tcp://192.0.2.43:22000", "tcp://192.0.2.42:22000"} // prio 1
|
||||||
|
|
||||||
|
// what we expect from just direct0
|
||||||
|
direct0Sorted := []string{"tcp://192.0.2.42:22000", "tcp://192.0.2.44:22000"}
|
||||||
|
|
||||||
|
// what we expect from direct0+direct1
|
||||||
|
totalSorted := []string{
|
||||||
|
// first prio 0, sorted
|
||||||
|
"tcp://192.0.2.42:22000", "tcp://192.0.2.44:22000",
|
||||||
|
// then prio 1
|
||||||
|
"tcp://192.0.2.43:22000",
|
||||||
|
// no duplicate .42
|
||||||
|
}
|
||||||
|
|
||||||
relays := []Relay{{URL: "relay://192.0.2.44:443"}, {URL: "tcp://192.0.2.45:443"}}
|
relays := []Relay{{URL: "relay://192.0.2.44:443"}, {URL: "tcp://192.0.2.45:443"}}
|
||||||
|
|
||||||
c := NewCachingMux()
|
c := NewCachingMux()
|
||||||
@ -25,15 +39,15 @@ func TestCacheUnique(t *testing.T) {
|
|||||||
// Add a fake discovery service and verify we get it's answers through the
|
// Add a fake discovery service and verify we get it's answers through the
|
||||||
// cache.
|
// cache.
|
||||||
|
|
||||||
f1 := &fakeDiscovery{direct, relays}
|
f1 := &fakeDiscovery{direct0, relays}
|
||||||
c.Add(f1, time.Minute, 0)
|
c.Add(f1, time.Minute, 0, 0)
|
||||||
|
|
||||||
dir, rel, err := c.Lookup(protocol.LocalDeviceID)
|
dir, rel, err := c.Lookup(protocol.LocalDeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(dir, direct) {
|
if !reflect.DeepEqual(dir, direct0Sorted) {
|
||||||
t.Errorf("Incorrect direct; %+v != %+v", dir, direct)
|
t.Errorf("Incorrect direct; %+v != %+v", dir, direct0Sorted)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(rel, relays) {
|
if !reflect.DeepEqual(rel, relays) {
|
||||||
t.Errorf("Incorrect relays; %+v != %+v", rel, relays)
|
t.Errorf("Incorrect relays; %+v != %+v", rel, relays)
|
||||||
@ -42,15 +56,15 @@ func TestCacheUnique(t *testing.T) {
|
|||||||
// Add one more that answers in the same way and check that we don't
|
// Add one more that answers in the same way and check that we don't
|
||||||
// duplicate or otherwise mess up the responses now.
|
// duplicate or otherwise mess up the responses now.
|
||||||
|
|
||||||
f2 := &fakeDiscovery{direct, relays}
|
f2 := &fakeDiscovery{direct1, relays}
|
||||||
c.Add(f2, time.Minute, 0)
|
c.Add(f2, time.Minute, 0, 1)
|
||||||
|
|
||||||
dir, rel, err = c.Lookup(protocol.LocalDeviceID)
|
dir, rel, err = c.Lookup(protocol.LocalDeviceID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(dir, direct) {
|
if !reflect.DeepEqual(dir, totalSorted) {
|
||||||
t.Errorf("Incorrect direct; %+v != %+v", dir, direct)
|
t.Errorf("Incorrect direct; %+v != %+v", dir, totalSorted)
|
||||||
}
|
}
|
||||||
if !reflect.DeepEqual(rel, relays) {
|
if !reflect.DeepEqual(rel, relays) {
|
||||||
t.Errorf("Incorrect relays; %+v != %+v", rel, relays)
|
t.Errorf("Incorrect relays; %+v != %+v", rel, relays)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user