diff --git a/lib/events/events.go b/lib/events/events.go index 2abd61af0..453547099 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -51,6 +51,8 @@ const ( var runningTests = false +const eventLogTimeout = 15 * time.Millisecond + func (t EventType) String() string { switch t { case Starting: @@ -122,6 +124,7 @@ type Logger struct { subs []*Subscription nextSubscriptionIDs []int nextGlobalID int + timeout *time.Timer mutex sync.Mutex } @@ -149,9 +152,16 @@ var ( ) func NewLogger() *Logger { - return &Logger{ - mutex: sync.NewMutex(), + l := &Logger{ + mutex: sync.NewMutex(), + timeout: time.NewTimer(time.Second), } + // Make sure the timer is in the stopped state and hasn't fired anything + // into the channel. + if !l.timeout.Stop() { + <-l.timeout.C + } + return l } func (l *Logger) Log(t EventType, data interface{}) { @@ -171,10 +181,21 @@ func (l *Logger) Log(t EventType, data interface{}) { e.SubscriptionID = l.nextSubscriptionIDs[i] l.nextSubscriptionIDs[i]++ + l.timeout.Reset(eventLogTimeout) + timedOut := false + select { case s.events <- e: - default: + case <-l.timeout.C: // if s.events is not ready, drop the event + timedOut = true + } + + // If stop returns false it already sent something to the + // channel. If we didn't already read it above we must do so now + // or we get a spurious timeout on the next loop. + if !l.timeout.Stop() && !timedOut { + <-l.timeout.C } } } diff --git a/lib/events/events_test.go b/lib/events/events_test.go index ceec063e5..950fc50aa 100644 --- a/lib/events/events_test.go +++ b/lib/events/events_test.go @@ -12,7 +12,7 @@ import ( "time" ) -const timeout = 100 * time.Millisecond +const timeout = 5 * time.Second func init() { runningTests = true