syncthing/lib/events/events.go

345 lines
7.3 KiB
Go
Raw Normal View History

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at http://mozilla.org/MPL/2.0/.

// Package events provides event subscription and polling functionality.
package events
import (
"errors"
"runtime"
2015-04-22 22:54:31 +00:00
stdsync "sync"
2014-07-13 19:07:24 +00:00
"time"
2015-04-22 22:54:31 +00:00
2015-08-06 09:29:25 +00:00
"github.com/syncthing/syncthing/lib/sync"
2014-07-13 19:07:24 +00:00
)
// EventType identifies the kind of an event. Every type occupies its own
// bit, so several types can be OR-ed together into a subscription mask.
type EventType int

// The known event types. Each value is derived from the constant's position
// in this block via iota, so inserting or reordering entries changes the
// numeric value of every entry that follows.
const (
	Ping EventType = 1 << iota
	Starting
	StartupComplete
	DeviceDiscovered
	DeviceConnected
	DeviceDisconnected
	DeviceRejected
	DevicePaused
	DeviceResumed
	LocalChangeDetected
	LocalIndexUpdated
	RemoteIndexUpdated
	ItemStarted
	ItemFinished
	StateChanged
	FolderRejected
	ConfigSaved
	DownloadProgress
	RemoteDownloadProgress
	FolderSummary
	FolderCompletion
	FolderErrors
	FolderScanProgress
	ListenAddressesChanged
	LoginAttempt

	// AllEvents is a mask with the bit of every event type above set. It
	// must remain the last entry so that iota counts all preceding types.
	AllEvents = (1 << iota) - 1
)
// runningTests, when true, makes Subscribe and Poll yield to the scheduler
// so that their zero-duration timers get a chance to fire, which keeps the
// branch taken (and thus test coverage) stable. Nothing in the visible part
// of this file sets it; presumably the package's tests do.
var runningTests bool
2014-07-13 19:07:24 +00:00
func (t EventType) String() string {
switch t {
case Ping:
return "Ping"
2014-07-17 11:38:36 +00:00
case Starting:
return "Starting"
2014-07-13 19:07:24 +00:00
case StartupComplete:
return "StartupComplete"
case DeviceDiscovered:
return "DeviceDiscovered"
case DeviceConnected:
return "DeviceConnected"
case DeviceDisconnected:
return "DeviceDisconnected"
case DeviceRejected:
return "DeviceRejected"
case LocalChangeDetected:
return "LocalChangeDetected"
2014-07-13 19:07:24 +00:00
case LocalIndexUpdated:
return "LocalIndexUpdated"
case RemoteIndexUpdated:
return "RemoteIndexUpdated"
case ItemStarted:
return "ItemStarted"
2015-02-01 17:31:19 +00:00
case ItemFinished:
return "ItemFinished"
2014-07-17 11:38:36 +00:00
case StateChanged:
return "StateChanged"
case FolderRejected:
return "FolderRejected"
2014-09-06 15:31:23 +00:00
case ConfigSaved:
return "ConfigSaved"
case DownloadProgress:
return "DownloadProgress"
case RemoteDownloadProgress:
return "RemoteDownloadProgress"
case FolderSummary:
return "FolderSummary"
case FolderCompletion:
return "FolderCompletion"
case FolderErrors:
return "FolderErrors"
2015-08-23 19:56:10 +00:00
case DevicePaused:
return "DevicePaused"
case DeviceResumed:
return "DeviceResumed"
2015-08-26 22:49:06 +00:00
case FolderScanProgress:
return "FolderScanProgress"
case ListenAddressesChanged:
return "ListenAddressesChanged"
case LoginAttempt:
return "LoginAttempt"
2014-07-13 19:07:24 +00:00
default:
return "Unknown"
}
}
// MarshalText encodes the event type as the name produced by String, so that
// text-based encoders emit the type by name rather than by number. It never
// returns an error.
func (t EventType) MarshalText() ([]byte, error) {
	name := t.String()
	return []byte(name), nil
}
// BufferSize is the capacity of the event channel that Subscribe creates for
// each subscription.
const BufferSize = 64
type Logger struct {
subs []*Subscription
nextSubscriptionIDs []int
nextGlobalID int
mutex sync.Mutex
2014-07-13 19:07:24 +00:00
}
type Event struct {
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
SubscriptionID int `json:"id"`
// Global ID of the event across all subscriptions
GlobalID int `json:"globalID"`
Time time.Time `json:"time"`
Type EventType `json:"type"`
Data interface{} `json:"data"`
2014-07-13 19:07:24 +00:00
}
type Subscription struct {
mask EventType
events chan Event
timeout *time.Timer
2014-07-13 19:07:24 +00:00
}
// Default is a package-level Logger created with NewLogger when the package
// is initialized.
var Default = NewLogger()
// Errors returned by Subscription.Poll.
var (
// ErrTimeout is returned when no event arrived before the timeout expired.
ErrTimeout = errors.New("timeout")
// ErrClosed is returned when the subscription's event channel is closed.
ErrClosed = errors.New("closed")
)
func NewLogger() *Logger {
return &Logger{
2015-04-22 22:54:31 +00:00
mutex: sync.NewMutex(),
2014-07-13 19:07:24 +00:00
}
}
func (l *Logger) Log(t EventType, data interface{}) {
l.mutex.Lock()
dl.Debugln("log", l.nextGlobalID, t, data)
l.nextGlobalID++
2014-07-13 19:07:24 +00:00
e := Event{
GlobalID: l.nextGlobalID,
Time: time.Now(),
Type: t,
Data: data,
2014-07-13 19:07:24 +00:00
}
for i, s := range l.subs {
2014-07-13 19:07:24 +00:00
if s.mask&t != 0 {
e.SubscriptionID = l.nextSubscriptionIDs[i]
l.nextSubscriptionIDs[i]++
2014-07-13 19:07:24 +00:00
select {
case s.events <- e:
default:
// if s.events is not ready, drop the event
2014-07-13 19:07:24 +00:00
}
}
}
l.mutex.Unlock()
}
func (l *Logger) Subscribe(mask EventType) *Subscription {
l.mutex.Lock()
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
dl.Debugln("subscribe", mask)
2014-07-13 19:07:24 +00:00
s := &Subscription{
mask: mask,
events: make(chan Event, BufferSize),
timeout: time.NewTimer(0),
2014-07-13 19:07:24 +00:00
}
// We need to create the timeout timer in the stopped, non-fired state so
// that Subscription.Poll() can safely reset it and select on the timeout
// channel. This ensures the timer is stopped and the channel drained.
if runningTests {
// Make the behavior stable when running tests to avoid randomly
// varying test coverage. This ensures, in practice if not in
// theory, that the timer fires and we take the true branch of the
// next if.
runtime.Gosched()
}
if !s.timeout.Stop() {
<-s.timeout.C
}
l.subs = append(l.subs, s)
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
2014-07-13 19:07:24 +00:00
l.mutex.Unlock()
return s
}
func (l *Logger) Unsubscribe(s *Subscription) {
l.mutex.Lock()
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
dl.Debugln("unsubscribe")
for i, ss := range l.subs {
if s == ss {
last := len(l.subs) - 1
l.subs[i] = l.subs[last]
l.subs[last] = nil
l.subs = l.subs[:last]
l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
l.nextSubscriptionIDs[last] = 0
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
break
}
}
2014-07-13 19:07:24 +00:00
close(s.events)
l.mutex.Unlock()
}
// Poll returns an event from the subscription or an error if the poll times
// out of the event channel is closed. Poll should not be called concurrently
// from multiple goroutines for a single subscription.
2014-07-13 19:07:24 +00:00
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
dl.Debugln("poll", timeout)
2014-07-25 12:50:14 +00:00
s.timeout.Reset(timeout)
2014-07-13 19:07:24 +00:00
select {
case e, ok := <-s.events:
if !ok {
return e, ErrClosed
}
if runningTests {
// Make the behavior stable when running tests to avoid randomly
// varying test coverage. This ensures, in practice if not in
// theory, that the timer fires and we take the true branch of
// the next if.
s.timeout.Reset(0)
runtime.Gosched()
}
if !s.timeout.Stop() {
// The timeout must be stopped and possibly drained to be ready
// for reuse in the next call.
<-s.timeout.C
}
2014-07-13 19:07:24 +00:00
return e, nil
case <-s.timeout.C:
2014-07-13 19:07:24 +00:00
return Event{}, ErrTimeout
}
}
// C returns the subscription's event channel as a receive-only channel. The
// channel is closed when the subscription is passed to Logger.Unsubscribe.
func (s *Subscription) C() <-chan Event {
return s.events
}
type bufferedSubscription struct {
2014-07-13 19:07:24 +00:00
sub *Subscription
buf []Event
next int
cur int // Current SubscriptionID
2014-07-13 19:07:24 +00:00
mut sync.Mutex
2015-04-22 22:54:31 +00:00
cond *stdsync.Cond
2014-07-13 19:07:24 +00:00
}
// BufferedSubscription gives access to the recently buffered events of a
// subscription.
type BufferedSubscription interface {
// Since appends the buffered events whose SubscriptionID is greater than
// id to into and returns the result. The implementation in this file
// blocks until at least one such event has been seen.
Since(id int, into []Event) []Event
}
func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
bs := &bufferedSubscription{
2014-07-13 19:07:24 +00:00
sub: s,
buf: make([]Event, size),
2015-04-22 22:54:31 +00:00
mut: sync.NewMutex(),
2014-07-13 19:07:24 +00:00
}
2015-04-22 22:54:31 +00:00
bs.cond = stdsync.NewCond(bs.mut)
2014-07-13 19:07:24 +00:00
go bs.pollingLoop()
return bs
}
func (s *bufferedSubscription) pollingLoop() {
2014-07-13 19:07:24 +00:00
for {
ev, err := s.sub.Poll(60 * time.Second)
if err == ErrTimeout {
continue
}
if err == ErrClosed {
return
}
if err != nil {
panic("unexpected error: " + err.Error())
}
s.mut.Lock()
s.buf[s.next] = ev
s.next = (s.next + 1) % len(s.buf)
s.cur = ev.SubscriptionID
2014-07-13 19:07:24 +00:00
s.cond.Broadcast()
s.mut.Unlock()
}
}
func (s *bufferedSubscription) Since(id int, into []Event) []Event {
2014-07-13 19:07:24 +00:00
s.mut.Lock()
defer s.mut.Unlock()
for id >= s.cur {
s.cond.Wait()
}
for i := s.next; i < len(s.buf); i++ {
if s.buf[i].SubscriptionID > id {
2014-07-13 19:07:24 +00:00
into = append(into, s.buf[i])
}
}
for i := 0; i < s.next; i++ {
if s.buf[i].SubscriptionID > id {
2014-07-13 19:07:24 +00:00
into = append(into, s.buf[i])
}
}
return into
}
// Error returns a string pointer suitable for JSON marshalling errors. It
// retains the "null on success" semantics, but ensures the error result is a
// string regardless of the underlying concrete error type.
func Error(err error) *string {
	if err == nil {
		return nil
	}
	str := err.Error()
	return &str
}