From d657bc4e3db1a625619f8954d3b92c58fe8ea795 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 17 Aug 2014 15:01:48 +0200 Subject: [PATCH] Implement IPv6 multicast again (fixes #346) --- auto/gui.files.go | 2 +- beacon/beacon.go | 46 ++++++++++++++++++++++++++++ beacon/broadcast.go | 36 +--------------------- beacon/multicast.go | 70 +++++++++++++++++++++++++++++++++++++++++++ cmd/syncthing/main.go | 2 +- config/config.go | 1 + config/config_test.go | 3 ++ discover/discover.go | 39 +++++++++++++++++------- gui/index.html | 4 --- 9 files changed, 151 insertions(+), 52 deletions(-) create mode 100644 beacon/beacon.go create mode 100644 beacon/multicast.go diff --git a/auto/gui.files.go b/auto/gui.files.go index 28a9ea0b7..24312cdbd 100644 --- a/auto/gui.files.go +++ b/auto/gui.files.go @@ -73,7 +73,7 @@ func init() { bs, _ = ioutil.ReadAll(gr) Assets["favicon.png"] = bs - bs, _ = hex.DecodeString("") + bs, _ = hex.DecodeString("") gr, _ = gzip.NewReader(bytes.NewBuffer(bs)) bs, _ = ioutil.ReadAll(gr) Assets["index.html"] = bs diff --git a/beacon/beacon.go b/beacon/beacon.go new file mode 100644 index 000000000..fd4bc162c --- /dev/null +++ b/beacon/beacon.go @@ -0,0 +1,46 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package beacon + +import "net" + +type recv struct { + data []byte + src net.Addr +} + +type dst struct { + intf string + conn *net.UDPConn +} + +type Interface interface { + Send(data []byte) + Recv() ([]byte, net.Addr) +} + +func genericReader(conn *net.UDPConn, outbox chan<- recv) { + bs := make([]byte, 65536) + for { + n, addr, err := conn.ReadFrom(bs) + if err != nil { + l.Warnln("multicast read:", err) + return + } + if debug { + l.Debugf("recv %d bytes from %s", n, addr) + } + + c := make([]byte, n) + copy(c, bs) + select { + case outbox <- recv{c, addr}: + default: + if debug { + l.Debugln("dropping message") + } + } + } +} diff --git a/beacon/broadcast.go b/beacon/broadcast.go index 11d7d723d..94067892e 100644 --- a/beacon/broadcast.go +++ b/beacon/broadcast.go @@ -6,16 +6,6 @@ package beacon import "net" -type recv struct { - data []byte - src net.Addr -} - -type dst struct { - intf string - conn *net.UDPConn -} - type Broadcast struct { conn *net.UDPConn port int @@ -36,7 +26,7 @@ func NewBroadcast(port int) (*Broadcast, error) { outbox: make(chan recv, 16), } - go b.reader() + go genericReader(b.conn, b.outbox) go b.writer() return b, nil @@ -51,30 +41,6 @@ func (b *Broadcast) Recv() ([]byte, net.Addr) { return recv.data, recv.src } -func (b *Broadcast) reader() { - bs := make([]byte, 65536) - for { - n, addr, err := b.conn.ReadFrom(bs) - if err != nil { - l.Warnln("Broadcast read:", err) - return - } - if debug { - l.Debugf("recv %d bytes from %s", n, addr) - } - - c := make([]byte, n) - copy(c, bs) - select { - case b.outbox <- recv{c, addr}: - default: - if debug { - l.Debugln("dropping message") - } - } - } -} - func (b *Broadcast) writer() { for bs := range b.inbox { diff --git a/beacon/multicast.go b/beacon/multicast.go new file mode 100644 index 000000000..586393231 --- /dev/null +++ b/beacon/multicast.go @@ -0,0 +1,70 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package beacon + +import "net" + +type Multicast struct { + conn *net.UDPConn + addr *net.UDPAddr + conns []dst + inbox chan []byte + outbox chan recv +} + +func NewMulticast(addr string) (*Multicast, error) { + gaddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.ListenMulticastUDP("udp", nil, gaddr) + if err != nil { + return nil, err + } + b := &Multicast{ + conn: conn, + addr: gaddr, + inbox: make(chan []byte), + outbox: make(chan recv, 16), + } + + go genericReader(b.conn, b.outbox) + go b.writer() + + return b, nil +} + +func (b *Multicast) Send(data []byte) { + b.inbox <- data +} + +func (b *Multicast) Recv() ([]byte, net.Addr) { + recv := <-b.outbox + return recv.data, recv.src +} + +func (b *Multicast) writer() { + for bs := range b.inbox { + intfs, err := net.Interfaces() + if err != nil { + l.Warnln("multicast interfaces:", err) + continue + } + for _, intf := range intfs { + if intf.Flags&net.FlagUp != 0 && intf.Flags&net.FlagMulticast != 0 { + addr := *b.addr + addr.Zone = intf.Name + _, err = b.conn.WriteTo(bs, &addr) + if err != nil { + if debug { + l.Debugln(err, "on write to", addr) + } + } else if debug { + l.Debugf("sent %d bytes to %s", len(bs), addr.String()) + } + } + } + } +} diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index c86c7ad39..c57d64775 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -985,7 +985,7 @@ func setTCPOptions(conn *net.TCPConn) { } func discovery(extPort int) *discover.Discoverer { - disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.LocalAnnPort) + disc, err := discover.NewDiscoverer(myID, cfg.Options.ListenAddress, cfg.Options.LocalAnnPort, cfg.Options.LocalAnnMCAddr) if err != nil { l.Warnf("No discovery possible (%v)", err) return nil diff --git a/config/config.go b/config/config.go index 7b9a29158..61a19eb5b 100644 --- a/config/config.go +++ b/config/config.go @@ -113,6 +113,7 @@ type OptionsConfiguration struct { GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"` LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"` LocalAnnPort int `xml:"localAnnouncePort" default:"21025"` + LocalAnnMCAddr string `xml:"localAnnounceMCAddr" default:"[ff32::5222]:21026"` ParallelRequests int `xml:"parallelRequests" default:"16"` MaxSendKbps int `xml:"maxSendKbps"` RescanIntervalS int `xml:"rescanIntervalS" default:"60"` diff --git a/config/config_test.go b/config/config_test.go index ee21a59b3..582276e0f 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -30,6 +30,7 @@ func TestDefaultValues(t *testing.T) { GlobalAnnEnabled: true, LocalAnnEnabled: true, LocalAnnPort: 21025, + LocalAnnMCAddr: "[ff32::5222]:21026", ParallelRequests: 16, MaxSendKbps: 0, RescanIntervalS: 60, @@ -200,6 +201,7 @@ func TestOverriddenValues(t *testing.T) { false false 42123 + quux:3232 32 1234 600 @@ -218,6 +220,7 @@ func TestOverriddenValues(t *testing.T) { GlobalAnnEnabled: false, LocalAnnEnabled: false, LocalAnnPort: 42123, + LocalAnnMCAddr: "quux:3232", ParallelRequests: 32, MaxSendKbps: 1234, RescanIntervalS: 600, diff --git a/discover/discover.go b/discover/discover.go index c9287ec16..901dc6e35 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -26,7 +26,8 @@ type Discoverer struct { globalBcastIntv time.Duration errorRetryIntv time.Duration cacheLifetime time.Duration - broadcastBeacon *beacon.Broadcast + broadcastBeacon beacon.Interface + multicastBeacon beacon.Interface registry map[protocol.NodeID][]cacheEntry registryLock sync.RWMutex extServer string @@ -54,11 +55,7 @@ var ( // When we hit this many errors in succession, we stop. const maxErrors = 30 -func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Discoverer, error) { - b, err := beacon.NewBroadcast(localPort) - if err != nil { - return nil, err - } +func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int, localMCAddr string) (*Discoverer, error) { disc := &Discoverer{ myID: id, listenAddrs: addresses, @@ -66,11 +63,26 @@ func NewDiscoverer(id protocol.NodeID, addresses []string, localPort int) (*Disc globalBcastIntv: 1800 * time.Second, errorRetryIntv: 60 * time.Second, cacheLifetime: 5 * time.Minute, - broadcastBeacon: b, registry: make(map[protocol.NodeID][]cacheEntry), } - go disc.recvAnnouncements() + if localPort > 0 { + bb, err := beacon.NewBroadcast(localPort) + if err != nil { + return nil, err + } + disc.broadcastBeacon = bb + go disc.recvAnnouncements(bb) + } + + if len(localMCAddr) > 0 { + mb, err := beacon.NewMulticast(localMCAddr) + if err != nil { + return nil, err + } + disc.multicastBeacon = mb + go disc.recvAnnouncements(mb) + } return disc, nil } @@ -187,7 +199,12 @@ func (d *Discoverer) sendLocalAnnouncements() { msg := pkt.MarshalXDR() for { - d.broadcastBeacon.Send(msg) + if d.multicastBeacon != nil { + d.multicastBeacon.Send(msg) + } + if d.broadcastBeacon != nil { + d.broadcastBeacon.Send(msg) + } select { case <-d.localBcastTick: @@ -284,9 +301,9 @@ loop: } } -func (d *Discoverer) recvAnnouncements() { +func (d *Discoverer) recvAnnouncements(b beacon.Interface) { for { - buf, addr := d.broadcastBeacon.Recv() + buf, addr := b.Recv() if debug { l.Debugf("discover: read announcement from %s:\n%s", addr, hex.Dump(buf)) diff --git a/gui/index.html b/gui/index.html index 2c37892ed..9a23771d9 100644 --- a/gui/index.html +++ b/gui/index.html @@ -614,10 +614,6 @@ -
- - -