Merge pull request #2330 from calmh/eventids

Subscribing to events should not bump event ID (fixes #2329)
This commit is contained in:
Audrius Butkevicius 2015-09-29 19:05:57 +01:00
commit b614cfffcb
2 changed files with 15 additions and 10 deletions

View File

@ -105,7 +105,7 @@ func (t EventType) MarshalText() ([]byte, error) {
const BufferSize = 64 const BufferSize = 64
type Logger struct { type Logger struct {
subs map[int]*Subscription subs []*Subscription
nextID int nextID int
mutex sync.Mutex mutex sync.Mutex
} }
@ -119,7 +119,6 @@ type Event struct {
type Subscription struct { type Subscription struct {
mask EventType mask EventType
id int
events chan Event events chan Event
timeout *time.Timer timeout *time.Timer
} }
@ -133,7 +132,6 @@ var (
func NewLogger() *Logger { func NewLogger() *Logger {
return &Logger{ return &Logger{
subs: make(map[int]*Subscription),
mutex: sync.NewMutex(), mutex: sync.NewMutex(),
} }
} }
@ -143,13 +141,13 @@ func (l *Logger) Log(t EventType, data interface{}) {
if debug { if debug {
dl.Debugln("log", l.nextID, t.String(), data) dl.Debugln("log", l.nextID, t.String(), data)
} }
l.nextID++
e := Event{ e := Event{
ID: l.nextID, ID: l.nextID,
Time: time.Now(), Time: time.Now(),
Type: t, Type: t,
Data: data, Data: data,
} }
l.nextID++
for _, s := range l.subs { for _, s := range l.subs {
if s.mask&t != 0 { if s.mask&t != 0 {
select { select {
@ -169,12 +167,10 @@ func (l *Logger) Subscribe(mask EventType) *Subscription {
} }
s := &Subscription{ s := &Subscription{
mask: mask, mask: mask,
id: l.nextID,
events: make(chan Event, BufferSize), events: make(chan Event, BufferSize),
timeout: time.NewTimer(0), timeout: time.NewTimer(0),
} }
l.nextID++ l.subs = append(l.subs, s)
l.subs[s.id] = s
l.mutex.Unlock() l.mutex.Unlock()
return s return s
} }
@ -184,7 +180,15 @@ func (l *Logger) Unsubscribe(s *Subscription) {
if debug { if debug {
dl.Debugln("unsubscribe") dl.Debugln("unsubscribe")
} }
delete(l.subs, s.id) for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1
l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]
break
}
}
close(s.events) close(s.events)
l.mutex.Unlock() l.mutex.Unlock()
} }

View File

@ -134,6 +134,7 @@ func TestIDs(t *testing.T) {
s := l.Subscribe(events.AllEvents) s := l.Subscribe(events.AllEvents)
defer l.Unsubscribe(s) defer l.Unsubscribe(s)
l.Log(events.DeviceConnected, "foo") l.Log(events.DeviceConnected, "foo")
_ = l.Subscribe(events.AllEvents)
l.Log(events.DeviceConnected, "bar") l.Log(events.DeviceConnected, "bar")
ev, err := s.Poll(timeout) ev, err := s.Poll(timeout)
@ -152,8 +153,8 @@ func TestIDs(t *testing.T) {
if ev.Data.(string) != "bar" { if ev.Data.(string) != "bar" {
t.Fatal("Incorrect event:", ev) t.Fatal("Incorrect event:", ev)
} }
if !(ev.ID > id) { if ev.ID != id+1 {
t.Fatalf("ID not incremented (%d !> %d)", ev.ID, id) t.Fatalf("ID not incremented (%d != %d)", ev.ID, id+1)
} }
} }