lib/connections: React to listeners going up and down faster (#6590)

This commit is contained in:
Audrius Butkevicius 2020-05-11 14:02:22 +01:00 committed by GitHub
parent 50faa8f7ef
commit 7dc290c3ed
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 57 additions and 17 deletions

View File

@ -102,7 +102,9 @@ func (t *quicListener) serve(ctx context.Context) error {
l.Infoln("Listen (BEP/quic):", err) l.Infoln("Listen (BEP/quic):", err)
return err return err
} }
t.notifyAddressesChanged(t)
defer listener.Close() defer listener.Close()
defer t.clearAddresses(t)
l.Infof("QUIC listener (%v) starting", packetConn.LocalAddr()) l.Infof("QUIC listener (%v) starting", packetConn.LocalAddr())
defer l.Infof("QUIC listener (%v) shutting down", packetConn.LocalAddr()) defer l.Infof("QUIC listener (%v) shutting down", packetConn.LocalAddr())

View File

@ -57,10 +57,12 @@ func (t *relayListener) serve(ctx context.Context) error {
defer clnt.Stop() defer clnt.Stop()
t.mut.Unlock() t.mut.Unlock()
oldURI := clnt.URI() // Start with nil, so that we send a addresses changed notification as soon as we connect somewhere.
var oldURI *url.URL
l.Infof("Relay listener (%v) starting", t) l.Infof("Relay listener (%v) starting", t)
defer l.Infof("Relay listener (%v) shutting down", t) defer l.Infof("Relay listener (%v) shutting down", t)
defer t.clearAddresses(t)
for { for {
select { select {

View File

@ -565,11 +565,11 @@ func (s *service) createListener(factory listenerFactory, uri *url.URL) bool {
return true return true
} }
func (s *service) logListenAddressesChangedEvent(l genericListener) { func (s *service) logListenAddressesChangedEvent(l ListenerAddresses) {
s.evLogger.Log(events.ListenAddressesChanged, map[string]interface{}{ s.evLogger.Log(events.ListenAddressesChanged, map[string]interface{}{
"address": l.URI(), "address": l.URI,
"lan": l.LANAddresses(), "lan": l.LANAddresses,
"wan": l.WANAddresses(), "wan": l.WANAddresses,
}) })
} }

View File

@ -174,6 +174,12 @@ type listenerFactory interface {
Valid(config.Configuration) error Valid(config.Configuration) error
} }
type ListenerAddresses struct {
URI *url.URL
WANAddresses []*url.URL
LANAddresses []*url.URL
}
type genericListener interface { type genericListener interface {
Serve() Serve()
Stop() Stop()
@ -188,7 +194,7 @@ type genericListener interface {
WANAddresses() []*url.URL WANAddresses() []*url.URL
LANAddresses() []*url.URL LANAddresses() []*url.URL
Error() error Error() error
OnAddressesChanged(func(genericListener)) OnAddressesChanged(func(ListenerAddresses))
String() string String() string
Factory() listenerFactory Factory() listenerFactory
NATType() string NATType() string
@ -203,14 +209,28 @@ type Model interface {
} }
type onAddressesChangedNotifier struct { type onAddressesChangedNotifier struct {
callbacks []func(genericListener) callbacks []func(ListenerAddresses)
} }
func (o *onAddressesChangedNotifier) OnAddressesChanged(callback func(genericListener)) { func (o *onAddressesChangedNotifier) OnAddressesChanged(callback func(ListenerAddresses)) {
o.callbacks = append(o.callbacks, callback) o.callbacks = append(o.callbacks, callback)
} }
func (o *onAddressesChangedNotifier) notifyAddressesChanged(l genericListener) { func (o *onAddressesChangedNotifier) notifyAddressesChanged(l genericListener) {
o.notifyAddresses(ListenerAddresses{
URI: l.URI(),
WANAddresses: l.WANAddresses(),
LANAddresses: l.LANAddresses(),
})
}
func (o *onAddressesChangedNotifier) clearAddresses(l genericListener) {
o.notifyAddresses(ListenerAddresses{
URI: l.URI(),
})
}
func (o *onAddressesChangedNotifier) notifyAddresses(l ListenerAddresses) {
for _, callback := range o.callbacks { for _, callback := range o.callbacks {
callback(l) callback(l)
} }

View File

@ -55,7 +55,9 @@ func (t *tcpListener) serve(ctx context.Context) error {
l.Infoln("Listen (BEP/tcp):", err) l.Infoln("Listen (BEP/tcp):", err)
return err return err
} }
t.notifyAddressesChanged(t)
defer listener.Close() defer listener.Close()
defer t.clearAddresses(t)
l.Infof("TCP listener (%v) starting", listener.Addr()) l.Infof("TCP listener (%v) starting", listener.Addr())
defer l.Infof("TCP listener (%v) shutting down", listener.Addr()) defer l.Infof("TCP listener (%v) shutting down", listener.Addr())

View File

@ -46,9 +46,10 @@ type httpClient interface {
} }
const ( const (
defaultReannounceInterval = 30 * time.Minute defaultReannounceInterval = 30 * time.Minute
announceErrorRetryInterval = 5 * time.Minute announceErrorRetryInterval = 5 * time.Minute
requestTimeout = 5 * time.Second requestTimeout = 5 * time.Second
maxAddressChangesBetweenAnnouncements = 10
) )
type announcement struct { type announcement struct {
@ -197,20 +198,33 @@ func (c *globalClient) serve(ctx context.Context) {
return return
} }
timer := time.NewTimer(0) timer := time.NewTimer(5 * time.Second)
defer timer.Stop() defer timer.Stop()
eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged) eventSub := c.evLogger.Subscribe(events.ListenAddressesChanged)
defer eventSub.Unsubscribe() defer eventSub.Unsubscribe()
timerResetCount := 0
for { for {
select { select {
case <-eventSub.C(): case <-eventSub.C():
// Defer announcement by 2 seconds, essentially debouncing if timerResetCount < maxAddressChangesBetweenAnnouncements {
// if we have a stream of events incoming in quick succession. // Defer announcement by 2 seconds, essentially debouncing
timer.Reset(2 * time.Second) // if we have a stream of events incoming in quick succession.
timer.Reset(2 * time.Second)
} else if timerResetCount == maxAddressChangesBetweenAnnouncements {
// Yet only do it if we haven't had to reset maxAddressChangesBetweenAnnouncements times in a row,
// so if something is flip-flopping within 2 seconds, we don't end up in a permanent reset loop.
l.Warnf("Detected a flip-flopping listener")
c.setError(errors.New("flip flopping listener"))
// Incrementing the count above 10 will prevent us from warning or setting the error again
// It will also suppress event based resets until we've had a proper round after announceErrorRetryInterval
timer.Reset(announceErrorRetryInterval)
}
timerResetCount++
case <-timer.C: case <-timer.C:
timerResetCount = 0
c.sendAnnouncement(ctx, timer) c.sendAnnouncement(ctx, timer)
case <-ctx.Done(): case <-ctx.Done():
@ -237,7 +251,7 @@ func (c *globalClient) sendAnnouncement(ctx context.Context, timer *time.Timer)
// The marshal doesn't fail, I promise. // The marshal doesn't fail, I promise.
postData, _ := json.Marshal(ann) postData, _ := json.Marshal(ann)
l.Debugf("Announcement: %s", postData) l.Debugf("Announcement: %v", ann)
resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData)) resp, err := c.announceClient.Post(ctx, c.server, "application/json", bytes.NewReader(postData))
if err != nil { if err != nil {