From 8903825e020e51ed5cba5f105cdbad1988d3612d Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Wed, 14 May 2014 15:26:05 -0300 Subject: [PATCH] Use UDP broadcasts instead of multicast for discovery --- discover/discover.go | 16 +++-- mc/beacon.go | 153 ++++++++++++++++++++++-------------------- mc/cmd/mctest/main.go | 5 +- 3 files changed, 94 insertions(+), 80 deletions(-) diff --git a/discover/discover.go b/discover/discover.go index 6bfa95d80..ab53009e1 100644 --- a/discover/discover.go +++ b/discover/discover.go @@ -44,12 +44,16 @@ var ( const maxErrors = 30 func NewDiscoverer(id string, addresses []string) (*Discoverer, error) { + b, err := mc.NewBeacon(21025) + if err != nil { + return nil, err + } disc := &Discoverer{ myID: id, listenAddrs: addresses, localBcastIntv: 30 * time.Second, globalBcastIntv: 1800 * time.Second, - beacon: mc.NewBeacon("239.21.0.25", 21025), + beacon: b, registry: make(map[string][]string), } @@ -251,11 +255,11 @@ func (d *Discoverer) recvAnnouncements() { if pkt.This.ID != d.myID { n := d.registerNode(addr, pkt.This) newNode = newNode || n - } - for _, node := range pkt.Extra { - if node.ID != d.myID { - n := d.registerNode(nil, node) - newNode = newNode || n + for _, node := range pkt.Extra { + if node.ID != d.myID { + n := d.registerNode(nil, node) + newNode = newNode || n + } } } diff --git a/mc/beacon.go b/mc/beacon.go index c0e057f90..ceb4842c1 100644 --- a/mc/beacon.go +++ b/mc/beacon.go @@ -1,9 +1,6 @@ package mc -import ( - "log" - "net" -) +import "net" type recv struct { data []byte @@ -16,22 +13,29 @@ type dst struct { } type Beacon struct { - group string + conn *net.UDPConn port int conns []dst inbox chan []byte outbox chan recv } -func NewBeacon(group string, port int) *Beacon { +func NewBeacon(port int) (*Beacon, error) { + conn, err := net.ListenUDP("udp", &net.UDPAddr{Port: port}) + if err != nil { + return nil, err + } b := &Beacon{ - group: group, + conn: conn, port: port, inbox: make(chan []byte), - outbox: make(chan recv), + outbox: make(chan recv, 16), } - go b.run() - return b + + go b.reader() + go b.writer() + + return b, nil } func (b *Beacon) Send(data []byte) { @@ -43,72 +47,75 @@ func (b *Beacon) Recv() ([]byte, net.Addr) { return recv.data, recv.src } -func (b *Beacon) run() { - group := &net.UDPAddr{IP: net.ParseIP(b.group), Port: b.port} - - intfs, err := net.Interfaces() - if err != nil { - log.Fatal(err) - } - if debug { - dlog.Printf("trying %d interfaces", len(intfs)) - } - - for _, intf := range intfs { - intf := intf - - if debug { - dlog.Printf("trying interface %q", intf.Name) - } - conn, err := net.ListenMulticastUDP("udp4", &intf, group) +func (b *Beacon) reader() { + var bs = make([]byte, 65536) + for { + n, addr, err := b.conn.ReadFrom(bs) if err != nil { + dlog.Println(err) + return + } + if debug { + dlog.Printf("recv %d bytes from %s", n, addr) + } + select { + case b.outbox <- recv{bs[:n], addr}: + default: if debug { - dlog.Printf("failed to listen for multicast group on %q: %v", intf.Name, err) - } - } else { - b.conns = append(b.conns, dst{intf.Name, conn}) - if debug { - dlog.Printf("listening for multicast group on %q", intf.Name) + dlog.Println("Dropping message") } } } - - for _, dst := range b.conns { - dst := dst - go func() { - for { - var bs = make([]byte, 1500) - n, addr, err := dst.conn.ReadFrom(bs) - if err != nil { - dlog.Println(err) - return - } - if debug { - dlog.Printf("recv %d bytes from %s on %s", n, addr, dst.intf) - } - select { - case b.outbox <- recv{bs[:n], addr}: - default: - if debug { - dlog.Println("Dropping message") - } - } - } - }() - } - - go func() { - for bs := range b.inbox { - for _, dst := range b.conns { - _, err := dst.conn.WriteTo(bs, group) - if err != nil { - dlog.Println(err) - return - } - if debug { - dlog.Printf("sent %d bytes to %s on %s", len(bs), group, dst.intf) - } - } - } - }() +} + +func (b *Beacon) writer() { + for bs := range b.inbox { + + addrs, err := net.InterfaceAddrs() + if err != nil { + dlog.Println(err) + continue + } + + var dsts []net.IP + for _, addr := range addrs { + if iaddr, ok := addr.(*net.IPNet); ok && iaddr.IP.IsGlobalUnicast() { + baddr := bcast(iaddr) + dsts = append(dsts, baddr.IP) + } + } + + if len(dsts) == 0 { + // Fall back to the general IPv4 broadcast address + dsts = append(dsts, net.IP{0xff, 0xff, 0xff, 0xff}) + } + + for _, ip := range dsts { + dst := &net.UDPAddr{IP: ip, Port: b.port} + + _, err := b.conn.WriteTo(bs, dst) + if err != nil { + dlog.Println(err) + return + } + if debug { + dlog.Printf("sent %d bytes to %s", len(bs), dst) + } + } + } +} + +func bcast(ip *net.IPNet) *net.IPNet { + var bc = &net.IPNet{} + bc.IP = make([]byte, len(ip.IP)) + copy(bc.IP, ip.IP) + bc.Mask = ip.Mask + + offset := len(bc.IP) - len(bc.Mask) + for i := range bc.IP { + if i-offset > 0 { + bc.IP[i] = ip.IP[i] | ^ip.Mask[i-offset] + } + } + return bc } diff --git a/mc/cmd/mctest/main.go b/mc/cmd/mctest/main.go index 0adac2ea9..d334b4929 100644 --- a/mc/cmd/mctest/main.go +++ b/mc/cmd/mctest/main.go @@ -9,7 +9,10 @@ import ( ) func main() { - b := mc.NewBeacon("239.21.0.25", 21025) + b, err := mc.NewBeacon(21025) + if err != nil { + log.Fatal(err) + } go func() { for { bs, addr := b.Recv()