// Copyright (C) 2014 The Syncthing Authors. // // This Source Code Form is subject to the terms of the Mozilla Public // License, v. 2.0. If a copy of the MPL was not distributed with this file, // You can obtain one at https://mozilla.org/MPL/2.0/. // Package events provides event subscription and polling functionality. package events import ( "encoding/json" "errors" "runtime" "time" "github.com/thejerf/suture" "github.com/syncthing/syncthing/lib/sync" "github.com/syncthing/syncthing/lib/util" ) type EventType int const ( Starting EventType = 1 << iota StartupComplete DeviceDiscovered DeviceConnected DeviceDisconnected DeviceRejected DevicePaused DeviceResumed LocalChangeDetected RemoteChangeDetected LocalIndexUpdated RemoteIndexUpdated ItemStarted ItemFinished StateChanged FolderRejected ConfigSaved DownloadProgress RemoteDownloadProgress FolderSummary FolderCompletion FolderErrors FolderScanProgress FolderPaused FolderResumed FolderWatchStateChanged ListenAddressesChanged LoginAttempt AllEvents = (1 << iota) - 1 ) var ( runningTests = false errNoop = errors.New("method of a noop object called") ) const eventLogTimeout = 15 * time.Millisecond func (t EventType) String() string { switch t { case Starting: return "Starting" case StartupComplete: return "StartupComplete" case DeviceDiscovered: return "DeviceDiscovered" case DeviceConnected: return "DeviceConnected" case DeviceDisconnected: return "DeviceDisconnected" case DeviceRejected: return "DeviceRejected" case LocalChangeDetected: return "LocalChangeDetected" case RemoteChangeDetected: return "RemoteChangeDetected" case LocalIndexUpdated: return "LocalIndexUpdated" case RemoteIndexUpdated: return "RemoteIndexUpdated" case ItemStarted: return "ItemStarted" case ItemFinished: return "ItemFinished" case StateChanged: return "StateChanged" case FolderRejected: return "FolderRejected" case ConfigSaved: return "ConfigSaved" case DownloadProgress: return "DownloadProgress" case RemoteDownloadProgress: return "RemoteDownloadProgress" case FolderSummary: return "FolderSummary" case FolderCompletion: return "FolderCompletion" case FolderErrors: return "FolderErrors" case DevicePaused: return "DevicePaused" case DeviceResumed: return "DeviceResumed" case FolderScanProgress: return "FolderScanProgress" case FolderPaused: return "FolderPaused" case FolderResumed: return "FolderResumed" case ListenAddressesChanged: return "ListenAddressesChanged" case LoginAttempt: return "LoginAttempt" case FolderWatchStateChanged: return "FolderWatchStateChanged" default: return "Unknown" } } func (t EventType) MarshalText() ([]byte, error) { return []byte(t.String()), nil } func (t *EventType) UnmarshalJSON(b []byte) error { var s string if err := json.Unmarshal(b, &s); err != nil { return err } *t = UnmarshalEventType(s) return nil } func UnmarshalEventType(s string) EventType { switch s { case "Starting": return Starting case "StartupComplete": return StartupComplete case "DeviceDiscovered": return DeviceDiscovered case "DeviceConnected": return DeviceConnected case "DeviceDisconnected": return DeviceDisconnected case "DeviceRejected": return DeviceRejected case "LocalChangeDetected": return LocalChangeDetected case "RemoteChangeDetected": return RemoteChangeDetected case "LocalIndexUpdated": return LocalIndexUpdated case "RemoteIndexUpdated": return RemoteIndexUpdated case "ItemStarted": return ItemStarted case "ItemFinished": return ItemFinished case "StateChanged": return StateChanged case "FolderRejected": return FolderRejected case "ConfigSaved": return ConfigSaved case "DownloadProgress": return DownloadProgress case "RemoteDownloadProgress": return RemoteDownloadProgress case "FolderSummary": return FolderSummary case "FolderCompletion": return FolderCompletion case "FolderErrors": return FolderErrors case "DevicePaused": return DevicePaused case "DeviceResumed": return DeviceResumed case "FolderScanProgress": return FolderScanProgress case "FolderPaused": return FolderPaused case "FolderResumed": return FolderResumed case "ListenAddressesChanged": return ListenAddressesChanged case "LoginAttempt": return LoginAttempt case "FolderWatchStateChanged": return FolderWatchStateChanged default: return 0 } } const BufferSize = 64 type Logger interface { suture.Service Log(t EventType, data interface{}) Subscribe(mask EventType) Subscription } type logger struct { suture.Service subs []*subscription nextSubscriptionIDs []int nextGlobalID int timeout *time.Timer events chan Event funcs chan func() toUnsubscribe chan *subscription stop chan struct{} } type Event struct { // Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API SubscriptionID int `json:"id"` // Global ID of the event across all subscriptions GlobalID int `json:"globalID"` Time time.Time `json:"time"` Type EventType `json:"type"` Data interface{} `json:"data"` } type Subscription interface { C() <-chan Event Poll(timeout time.Duration) (Event, error) Unsubscribe() } type subscription struct { mask EventType events chan Event toUnsubscribe chan *subscription timeout *time.Timer } var ( ErrTimeout = errors.New("timeout") ErrClosed = errors.New("closed") ) func NewLogger() Logger { l := &logger{ timeout: time.NewTimer(time.Second), events: make(chan Event, BufferSize), funcs: make(chan func()), toUnsubscribe: make(chan *subscription), } l.Service = util.AsService(l.serve) // Make sure the timer is in the stopped state and hasn't fired anything // into the channel. if !l.timeout.Stop() { <-l.timeout.C } return l } func (l *logger) serve(stop chan struct{}) { loop: for { select { case e := <-l.events: // Incoming events get sent l.sendEvent(e) case fn := <-l.funcs: // Subscriptions are handled here. fn() case s := <-l.toUnsubscribe: l.unsubscribe(s) case <-stop: break loop } } // Closing the event channels corresponds to what happens when a // subscription is unsubscribed; this stops any BufferedSubscription, // makes Poll() return ErrClosed, etc. for _, s := range l.subs { close(s.events) } } func (l *logger) Log(t EventType, data interface{}) { l.events <- Event{ Time: time.Now(), Type: t, Data: data, // SubscriptionID and GlobalID are set in sendEvent } } func (l *logger) sendEvent(e Event) { l.nextGlobalID++ dl.Debugln("log", l.nextGlobalID, e.Type, e.Data) e.GlobalID = l.nextGlobalID for i, s := range l.subs { if s.mask&e.Type != 0 { e.SubscriptionID = l.nextSubscriptionIDs[i] l.nextSubscriptionIDs[i]++ l.timeout.Reset(eventLogTimeout) timedOut := false select { case s.events <- e: case <-l.timeout.C: // if s.events is not ready, drop the event timedOut = true } // If stop returns false it already sent something to the // channel. If we didn't already read it above we must do so now // or we get a spurious timeout on the next loop. if !l.timeout.Stop() && !timedOut { <-l.timeout.C } } } } func (l *logger) Subscribe(mask EventType) Subscription { res := make(chan Subscription) l.funcs <- func() { dl.Debugln("subscribe", mask) s := &subscription{ mask: mask, events: make(chan Event, BufferSize), toUnsubscribe: l.toUnsubscribe, timeout: time.NewTimer(0), } // We need to create the timeout timer in the stopped, non-fired state so // that Subscription.Poll() can safely reset it and select on the timeout // channel. This ensures the timer is stopped and the channel drained. if runningTests { // Make the behavior stable when running tests to avoid randomly // varying test coverage. This ensures, in practice if not in // theory, that the timer fires and we take the true branch of the // next if. runtime.Gosched() } if !s.timeout.Stop() { <-s.timeout.C } l.subs = append(l.subs, s) l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1) res <- s } return <-res } func (l *logger) unsubscribe(s *subscription) { dl.Debugln("unsubscribe", s.mask) for i, ss := range l.subs { if s == ss { last := len(l.subs) - 1 l.subs[i] = l.subs[last] l.subs[last] = nil l.subs = l.subs[:last] l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last] l.nextSubscriptionIDs[last] = 0 l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last] break } } close(s.events) } // Poll returns an event from the subscription or an error if the poll times // out of the event channel is closed. Poll should not be called concurrently // from multiple goroutines for a single subscription. func (s *subscription) Poll(timeout time.Duration) (Event, error) { dl.Debugln("poll", timeout) s.timeout.Reset(timeout) select { case e, ok := <-s.events: if !ok { return e, ErrClosed } if runningTests { // Make the behavior stable when running tests to avoid randomly // varying test coverage. This ensures, in practice if not in // theory, that the timer fires and we take the true branch of // the next if. s.timeout.Reset(0) runtime.Gosched() } if !s.timeout.Stop() { // The timeout must be stopped and possibly drained to be ready // for reuse in the next call. <-s.timeout.C } return e, nil case <-s.timeout.C: return Event{}, ErrTimeout } } func (s *subscription) C() <-chan Event { return s.events } func (s *subscription) Unsubscribe() { s.toUnsubscribe <- s } type bufferedSubscription struct { sub Subscription buf []Event next int cur int // Current SubscriptionID mut sync.Mutex cond *sync.TimeoutCond } type BufferedSubscription interface { Since(id int, into []Event, timeout time.Duration) []Event } func NewBufferedSubscription(s Subscription, size int) BufferedSubscription { bs := &bufferedSubscription{ sub: s, buf: make([]Event, size), mut: sync.NewMutex(), } bs.cond = sync.NewTimeoutCond(bs.mut) go bs.pollingLoop() return bs } func (s *bufferedSubscription) pollingLoop() { for ev := range s.sub.C() { s.mut.Lock() s.buf[s.next] = ev s.next = (s.next + 1) % len(s.buf) s.cur = ev.SubscriptionID s.cond.Broadcast() s.mut.Unlock() } } func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event { s.mut.Lock() defer s.mut.Unlock() // Check once first before generating the TimeoutCondWaiter if id >= s.cur { waiter := s.cond.SetupWait(timeout) defer waiter.Stop() for id >= s.cur { if eventsAvailable := waiter.Wait(); !eventsAvailable { // Timed out return into } } } for i := s.next; i < len(s.buf); i++ { if s.buf[i].SubscriptionID > id { into = append(into, s.buf[i]) } } for i := 0; i < s.next; i++ { if s.buf[i].SubscriptionID > id { into = append(into, s.buf[i]) } } return into } // Error returns a string pointer suitable for JSON marshalling errors. It // retains the "null on success" semantics, but ensures the error result is a // string regardless of the underlying concrete error type. func Error(err error) *string { if err == nil { return nil } str := err.Error() return &str } type noopLogger struct{} var NoopLogger Logger = &noopLogger{} func (*noopLogger) Serve() {} func (*noopLogger) Stop() {} func (*noopLogger) Log(t EventType, data interface{}) {} func (*noopLogger) Subscribe(mask EventType) Subscription { return &noopSubscription{} } type noopSubscription struct{} func (*noopSubscription) C() <-chan Event { return nil } func (*noopSubscription) Poll(timeout time.Duration) (Event, error) { return Event{}, errNoop } func (*noopSubscription) Unsubscribe() {}