Jakob Borg 297769ef57 More resilient broadcast handling (fixes #1907)
My theory is that some error condition on the socket results in it
blocking for writes, which maybe also blocks reads... This separates the
two into separate services with their own socket, with restarts and
retries as appropriates on write timeouts and read/write errors. It
should be more robust, hopefully, but I have a hard time testing the
actual error conditions...
2015-06-11 15:06:05 +02:00

454 lines
10 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"bytes"
"encoding/hex"
"errors"
"io"
"net"
"runtime"
"strconv"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/internal/beacon"
"github.com/syncthing/syncthing/internal/events"
"github.com/syncthing/syncthing/internal/sync"
)
type Discoverer struct {
myID protocol.DeviceID
listenAddrs []string
localBcastIntv time.Duration
localBcastStart time.Time
cacheLifetime time.Duration
negCacheCutoff time.Duration
beacons []beacon.Interface
extPort uint16
localBcastTick <-chan time.Time
forcedBcastTick chan time.Time
registryLock sync.RWMutex
registry map[protocol.DeviceID][]CacheEntry
lastLookup map[protocol.DeviceID]time.Time
clients []Client
mut sync.RWMutex
}
type CacheEntry struct {
Address string
Seen time.Time
}
var (
ErrIncorrectMagic = errors.New("incorrect magic number")
)
func NewDiscoverer(id protocol.DeviceID, addresses []string) *Discoverer {
return &Discoverer{
myID: id,
listenAddrs: addresses,
localBcastIntv: 30 * time.Second,
cacheLifetime: 5 * time.Minute,
negCacheCutoff: 3 * time.Minute,
registry: make(map[protocol.DeviceID][]CacheEntry),
lastLookup: make(map[protocol.DeviceID]time.Time),
registryLock: sync.NewRWMutex(),
mut: sync.NewRWMutex(),
}
}
func (d *Discoverer) StartLocal(localPort int, localMCAddr string) {
if localPort > 0 {
d.startLocalIPv4Broadcasts(localPort)
}
if len(localMCAddr) > 0 {
d.startLocalIPv6Multicasts(localMCAddr)
}
if len(d.beacons) == 0 {
l.Warnln("Local discovery unavailable")
return
}
d.localBcastTick = time.Tick(d.localBcastIntv)
d.forcedBcastTick = make(chan time.Time)
d.localBcastStart = time.Now()
go d.sendLocalAnnouncements()
}
func (d *Discoverer) startLocalIPv4Broadcasts(localPort int) {
bb := beacon.NewBroadcast(localPort)
d.beacons = append(d.beacons, bb)
go d.recvAnnouncements(bb)
bb.ServeBackground()
}
func (d *Discoverer) startLocalIPv6Multicasts(localMCAddr string) {
intfs, err := net.Interfaces()
if err != nil {
if debug {
l.Debugln("discover: interfaces:", err)
}
l.Infoln("Local discovery over IPv6 unavailable")
return
}
v6Intfs := 0
for _, intf := range intfs {
// Interface flags seem to always be 0 on Windows
if runtime.GOOS != "windows" && (intf.Flags&net.FlagUp == 0 || intf.Flags&net.FlagMulticast == 0) {
continue
}
mb, err := beacon.NewMulticast(localMCAddr, intf.Name)
if err != nil {
if debug {
l.Debugln("discover: Start local v6:", err)
}
continue
}
d.beacons = append(d.beacons, mb)
go d.recvAnnouncements(mb)
v6Intfs++
}
if v6Intfs == 0 {
l.Infoln("Local discovery over IPv6 unavailable")
}
}
func (d *Discoverer) StartGlobal(servers []string, extPort uint16) {
d.mut.Lock()
defer d.mut.Unlock()
if len(d.clients) > 0 {
d.stopGlobal()
}
d.extPort = extPort
pkt := d.announcementPkt()
wg := sync.NewWaitGroup()
clients := make(chan Client, len(servers))
for _, address := range servers {
wg.Add(1)
go func(addr string) {
defer wg.Done()
client, err := New(addr, pkt)
if err != nil {
l.Infoln("Error creating discovery client", addr, err)
return
}
clients <- client
}(address)
}
wg.Wait()
close(clients)
for client := range clients {
d.clients = append(d.clients, client)
}
}
func (d *Discoverer) StopGlobal() {
d.mut.Lock()
defer d.mut.Unlock()
d.stopGlobal()
}
func (d *Discoverer) stopGlobal() {
for _, client := range d.clients {
client.Stop()
}
d.clients = []Client{}
}
func (d *Discoverer) ExtAnnounceOK() map[string]bool {
d.mut.RLock()
defer d.mut.RUnlock()
ret := make(map[string]bool)
for _, client := range d.clients {
ret[client.Address()] = client.StatusOK()
}
return ret
}
func (d *Discoverer) Lookup(device protocol.DeviceID) []string {
d.registryLock.RLock()
cached := d.filterCached(d.registry[device])
lastLookup := d.lastLookup[device]
d.registryLock.RUnlock()
d.mut.RLock()
defer d.mut.RUnlock()
if len(cached) > 0 {
// There are cached address entries.
addrs := make([]string, len(cached))
for i := range cached {
addrs[i] = cached[i].Address
}
return addrs
}
if time.Since(lastLookup) < d.negCacheCutoff {
// We have recently tried to lookup this address and failed. Lets
// chill for a while.
return nil
}
if len(d.clients) != 0 && time.Since(d.localBcastStart) > d.localBcastIntv {
// Only perform external lookups if we have at least one external
// server client and one local announcement interval has passed. This is
// to avoid finding local peers on their remote address at startup.
results := make(chan []string, len(d.clients))
wg := sync.NewWaitGroup()
for _, client := range d.clients {
wg.Add(1)
go func(c Client) {
defer wg.Done()
results <- c.Lookup(device)
}(client)
}
wg.Wait()
close(results)
cached := []CacheEntry{}
seen := make(map[string]struct{})
now := time.Now()
var addrs []string
for result := range results {
for _, addr := range result {
_, ok := seen[addr]
if !ok {
cached = append(cached, CacheEntry{
Address: addr,
Seen: now,
})
seen[addr] = struct{}{}
addrs = append(addrs, addr)
}
}
}
d.registryLock.Lock()
d.registry[device] = cached
d.lastLookup[device] = time.Now()
d.registryLock.Unlock()
return addrs
}
return nil
}
func (d *Discoverer) Hint(device string, addrs []string) {
resAddrs := resolveAddrs(addrs)
var id protocol.DeviceID
id.UnmarshalText([]byte(device))
d.registerDevice(nil, Device{
Addresses: resAddrs,
ID: id[:],
})
}
func (d *Discoverer) All() map[protocol.DeviceID][]CacheEntry {
d.registryLock.RLock()
devices := make(map[protocol.DeviceID][]CacheEntry, len(d.registry))
for device, addrs := range d.registry {
addrsCopy := make([]CacheEntry, len(addrs))
copy(addrsCopy, addrs)
devices[device] = addrsCopy
}
d.registryLock.RUnlock()
return devices
}
func (d *Discoverer) announcementPkt() *Announce {
var addrs []Address
if d.extPort != 0 {
addrs = []Address{{Port: d.extPort}}
} else {
for _, astr := range d.listenAddrs {
addr, err := net.ResolveTCPAddr("tcp", astr)
if err != nil {
l.Warnln("discover: %v: not announcing %s", err, astr)
continue
} else if debug {
l.Debugf("discover: resolved %s as %#v", astr, addr)
}
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
addrs = append(addrs, Address{Port: uint16(addr.Port)})
} else if bs := addr.IP.To4(); bs != nil {
addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
} else if bs := addr.IP.To16(); bs != nil {
addrs = append(addrs, Address{IP: bs, Port: uint16(addr.Port)})
}
}
}
return &Announce{
Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs},
}
}
func (d *Discoverer) sendLocalAnnouncements() {
var addrs = resolveAddrs(d.listenAddrs)
var pkt = Announce{
Magic: AnnouncementMagic,
This: Device{d.myID[:], addrs},
}
msg := pkt.MustMarshalXDR()
for {
for _, b := range d.beacons {
b.Send(msg)
}
select {
case <-d.localBcastTick:
case <-d.forcedBcastTick:
}
}
}
func (d *Discoverer) recvAnnouncements(b beacon.Interface) {
for {
buf, addr := b.Recv()
var pkt Announce
err := pkt.UnmarshalXDR(buf)
if err != nil && err != io.EOF {
if debug {
l.Debugf("discover: Failed to unmarshal local announcement from %s:\n%s", addr, hex.Dump(buf))
}
continue
}
if debug {
l.Debugf("discover: Received local announcement from %s for %s", addr, protocol.DeviceIDFromBytes(pkt.This.ID))
}
var newDevice bool
if bytes.Compare(pkt.This.ID, d.myID[:]) != 0 {
newDevice = d.registerDevice(addr, pkt.This)
}
if newDevice {
select {
case d.forcedBcastTick <- time.Now():
}
}
}
}
func (d *Discoverer) registerDevice(addr net.Addr, device Device) bool {
var id protocol.DeviceID
copy(id[:], device.ID)
d.registryLock.Lock()
defer d.registryLock.Unlock()
current := d.filterCached(d.registry[id])
orig := current
for _, a := range device.Addresses {
var deviceAddr string
if len(a.IP) > 0 {
deviceAddr = net.JoinHostPort(net.IP(a.IP).String(), strconv.Itoa(int(a.Port)))
} else if addr != nil {
ua := addr.(*net.UDPAddr)
ua.Port = int(a.Port)
deviceAddr = ua.String()
}
for i := range current {
if current[i].Address == deviceAddr {
current[i].Seen = time.Now()
goto done
}
}
current = append(current, CacheEntry{
Address: deviceAddr,
Seen: time.Now(),
})
done:
}
if debug {
l.Debugf("discover: Caching %s addresses: %v", id, current)
}
d.registry[id] = current
if len(current) > len(orig) {
addrs := make([]string, len(current))
for i := range current {
addrs[i] = current[i].Address
}
events.Default.Log(events.DeviceDiscovered, map[string]interface{}{
"device": id.String(),
"addrs": addrs,
})
}
return len(current) > len(orig)
}
func (d *Discoverer) filterCached(c []CacheEntry) []CacheEntry {
for i := 0; i < len(c); {
if ago := time.Since(c[i].Seen); ago > d.cacheLifetime {
if debug {
l.Debugf("discover: Removing cached address %s - seen %v ago", c[i].Address, ago)
}
c[i] = c[len(c)-1]
c = c[:len(c)-1]
} else {
i++
}
}
return c
}
func addrToAddr(addr *net.TCPAddr) Address {
if len(addr.IP) == 0 || addr.IP.IsUnspecified() {
return Address{Port: uint16(addr.Port)}
} else if bs := addr.IP.To4(); bs != nil {
return Address{IP: bs, Port: uint16(addr.Port)}
} else if bs := addr.IP.To16(); bs != nil {
return Address{IP: bs, Port: uint16(addr.Port)}
}
return Address{}
}
func resolveAddrs(addrs []string) []Address {
var raddrs []Address
for _, addrStr := range addrs {
addrRes, err := net.ResolveTCPAddr("tcp", addrStr)
if err != nil {
continue
}
addr := addrToAddr(addrRes)
if len(addr.IP) > 0 {
raddrs = append(raddrs, addr)
} else {
raddrs = append(raddrs, Address{Port: addr.Port})
}
}
return raddrs
}