syncthing/lib/discover/client_udp.go
2015-09-13 07:56:13 +02:00

262 lines
5.8 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.
package discover
import (
"encoding/hex"
"io"
"net"
"net/url"
"strconv"
"time"
"github.com/syncthing/protocol"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/sync"
)
func init() {
for _, proto := range []string{"udp", "udp4", "udp6"} {
Register(proto, func(uri *url.URL, announcer Announcer) (Client, error) {
c := &UDPClient{
announcer: announcer,
wg: sync.NewWaitGroup(),
mut: sync.NewRWMutex(),
}
err := c.Start(uri)
if err != nil {
return nil, err
}
return c, nil
})
}
}
// UDPClient announces the local device to a UDP discovery server and performs
// device lookups against that same server. A client is configured and started
// with Start and shut down with Stop.
type UDPClient struct {
// Announce server URL as passed to Start. The scheme is used as the UDP
// network name, the host as the server address and the query string for
// configuration parameters.
url *url.URL
// Closed by Stop to make the broadcast goroutine return.
stop chan struct{}
// Tracks the broadcast goroutine so that Stop can wait for it to finish.
wg sync.WaitGroup
// Local address (with port 0) used both for the announce socket and for
// lookup sockets; resolved from the "listenaddress" query parameter.
listenAddress *net.UDPAddr
// Delay before the next announcement after a successful one.
globalBroadcastInterval time.Duration
// Delay before retrying after a failed listen, resolve or announcement.
errorRetryInterval time.Duration
// Source of the announcement packet that is sent to the server.
announcer Announcer
// Outcome of the most recent announcement, reported by StatusOK.
status bool
// Guards status.
mut sync.RWMutex
}
// Start configures the client from uri and launches the background
// announcement goroutine.
//
// The URL scheme is used as the UDP network name and the host part as the
// announce server address. Recognized query parameters:
//
//   - listenaddress: local host or IP to bind to, without a port. IPv6
//     literals are accepted with or without surrounding brackets.
//   - broadcast: seconds between announcements after a successful one.
//   - retry: seconds to wait before retrying after a failure.
//
// A missing, unparsable, zero or overflowing interval falls back to the
// corresponding package default. An error is returned only when the listen
// address cannot be resolved; in that case no goroutine is started.
func (d *UDPClient) Start(uri *url.URL) error {
	d.url = uri
	d.stop = make(chan struct{})

	params := uri.Query()

	// The address must not have a port, as otherwise both announce and lookup
	// sockets would try to bind to the same port.
	addr, err := net.ResolveUDPAddr(d.url.Scheme, udpListenHostPort(params.Get("listenaddress")))
	if err != nil {
		return err
	}
	d.listenAddress = addr

	d.globalBroadcastInterval = udpIntervalParam(params.Get("broadcast"), DefaultGlobalBroadcastInterval)
	d.errorRetryInterval = udpIntervalParam(params.Get("retry"), DefaultErrorRetryInternval)

	d.wg.Add(1)
	go d.broadcast()
	return nil
}

// udpListenHostPort appends the wildcard port 0 to host. A host that already
// starts with '[' is assumed to be a bracketed IPv6 literal and only gets the
// ":0" suffix; anything else goes through net.JoinHostPort so that a bare
// IPv6 literal such as "::1" becomes "[::1]:0" instead of the unparsable
// "::1:0".
func udpListenHostPort(host string) string {
	if host != "" && host[0] == '[' {
		return host + ":0"
	}
	return net.JoinHostPort(host, "0")
}

// udpIntervalParam interprets value as a whole number of seconds and returns
// it as a duration. It returns def when value cannot be parsed, is zero (a
// zero interval would make the retry loops spin without pause), or is too
// large to be represented as a time.Duration.
func udpIntervalParam(value string, def time.Duration) time.Duration {
	// Largest number of whole seconds that fits in a time.Duration.
	const maxSeconds = uint64(1<<63-1) / uint64(time.Second)

	secs, err := strconv.ParseUint(value, 0, 0)
	if err != nil || secs == 0 || secs > maxSeconds {
		return def
	}
	return time.Duration(secs) * time.Second
}
// broadcast is the body of the background goroutine started by Start. It
// opens the announce socket, resolves the server address and then sends an
// announcement whenever the timer fires or an ExternalPortMappingChanged
// event arrives, until the stop channel is closed.
//
// Listening and resolving are retried every errorRetryInterval until they
// succeed or the client is stopped. After a timer-driven announcement the
// timer is re-armed with globalBroadcastInterval on success and
// errorRetryInterval on failure; event-driven announcements leave the timer
// untouched.
func (d *UDPClient) broadcast() {
	defer d.wg.Done()

	conn, err := net.ListenUDP(d.url.Scheme, d.listenAddress)
	for err != nil {
		if debug {
			l.Debugf("discover %s: broadcast listen: %v; trying again in %v", d.url, err, d.errorRetryInterval)
		}
		select {
		case <-d.stop:
			return
		case <-time.After(d.errorRetryInterval):
		}
		conn, err = net.ListenUDP(d.url.Scheme, d.listenAddress)
	}
	defer conn.Close()

	remote, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
	for err != nil {
		if debug {
			l.Debugf("discover %s: broadcast resolve: %v; trying again in %v", d.url, err, d.errorRetryInterval)
		}
		select {
		case <-d.stop:
			return
		case <-time.After(d.errorRetryInterval):
		}
		remote, err = net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
	}

	// A zero duration makes the first announcement happen right away.
	timer := time.NewTimer(0)
	// Release the pending timer when the goroutine exits; otherwise it stays
	// scheduled for up to a full broadcast interval after Stop.
	defer timer.Stop()

	eventSub := events.Default.Subscribe(events.ExternalPortMappingChanged)
	defer events.Default.Unsubscribe(eventSub)

	// announce sends one announcement and records its outcome as the
	// client's current status.
	announce := func() bool {
		ok := d.sendAnnouncement(remote, conn)
		d.mut.Lock()
		d.status = ok
		d.mut.Unlock()
		return ok
	}

	for {
		select {
		case <-d.stop:
			return

		case <-eventSub.C():
			announce()

		case <-timer.C:
			if announce() {
				timer.Reset(d.globalBroadcastInterval)
			} else {
				timer.Reset(d.errorRetryInterval)
			}
		}
	}
}
// sendAnnouncement sends the announcer's current announcement to remote over
// conn and then, after a one second pause, looks up the local device ID on
// the server to verify that the announcement was accepted.
//
// It returns true only when that self-lookup yields at least one address.
// Marshalling errors, send errors and failed lookups all result in false.
func (d *UDPClient) sendAnnouncement(remote net.Addr, conn *net.UDPConn) bool {
	if debug {
		l.Debugf("discover %s: broadcast: Sending self announcement to %v", d.url, remote)
	}

	local := d.announcer.Announcement()
	payload, marshalErr := local.MarshalXDR()
	if marshalErr != nil {
		return false
	}
	myID := protocol.DeviceIDFromBytes(local.This.ID)

	if _, sendErr := conn.WriteTo(payload, remote); sendErr != nil {
		if debug {
			l.Debugf("discover %s: broadcast: Failed to send self announcement: %s", d.url, sendErr)
		}
		return false
	}

	// Verify that the announce server responds positively for our device ID
	time.Sleep(time.Second)

	found, lookupErr := d.Lookup(myID)
	if debug {
		if lookupErr != nil {
			l.Debugf("discover %s: broadcast: Self-lookup failed: %v", d.url, lookupErr)
		} else {
			l.Debugf("discover %s: broadcast: Self-lookup returned: %v", d.url, found.This.Addresses)
		}
	}
	return len(found.This.Addresses) > 0
}
// Lookup queries the discovery server for the given device ID and returns the
// announcement it answers with.
//
// A fresh UDP socket bound to the client's listen address is used for each
// call, with a five second deadline covering both the query and the reply.
// On any failure an empty Announce and the error are returned. A read
// timeout is returned without being logged, since it is the expected outcome
// when the server does not know the device. An io.EOF from decoding the
// reply is not treated as an error.
func (d *UDPClient) Lookup(device protocol.DeviceID) (Announce, error) {
	// fail logs err when debugging is enabled and returns it together with
	// an empty Announce.
	fail := func(err error) (Announce, error) {
		if debug {
			l.Debugf("discover %s: Lookup(%s): %s", d.url, device, err)
		}
		return Announce{}, err
	}

	server, err := net.ResolveUDPAddr(d.url.Scheme, d.url.Host)
	if err != nil {
		return fail(err)
	}

	conn, err := net.DialUDP(d.url.Scheme, d.listenAddress, server)
	if err != nil {
		return fail(err)
	}
	defer conn.Close()

	if err := conn.SetDeadline(time.Now().Add(5 * time.Second)); err != nil {
		return fail(err)
	}

	query := Query{QueryMagic, device[:]}.MustMarshalXDR()
	if _, err := conn.Write(query); err != nil {
		return fail(err)
	}

	reply := make([]byte, 2048)
	n, err := conn.Read(reply)
	if err != nil {
		if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
			// Expected if the server doesn't know about requested device ID
			return Announce{}, err
		}
		return fail(err)
	}

	var result Announce
	if err := result.UnmarshalXDR(reply[:n]); err != nil && err != io.EOF {
		if debug {
			l.Debugf("discover %s: Lookup(%s): %s\n%s", d.url, device, err, hex.Dump(reply[:n]))
		}
		return Announce{}, err
	}

	if debug {
		l.Debugf("discover %s: Lookup(%s) result: %v relays: %v", d.url, device, result.This.Addresses, result.This.Relays)
	}
	return result, nil
}
// Stop signals the broadcast goroutine to exit and waits until it has done
// so. It is a no-op on a client whose stop channel was never created, and it
// may be called again after a previous Stop without panicking.
//
// Stop is not safe to call from several goroutines at the same time.
func (d *UDPClient) Stop() {
	if d.stop == nil {
		return
	}

	// Nothing is ever sent on the stop channel, so a successful receive means
	// it has already been closed by an earlier Stop. Closing it a second time
	// would panic.
	select {
	case <-d.stop:
	default:
		close(d.stop)
	}
	d.wg.Wait()
}
// StatusOK reports the outcome of the most recent announcement. The value is
// read under the client's read lock.
func (d *UDPClient) StatusOK() bool {
	d.mut.RLock()
	ok := d.status
	d.mut.RUnlock()
	return ok
}
// Address returns the string form of the URL the client was started with.
func (d *UDPClient) Address() string {
	serverURL := d.url
	return serverURL.String()
}