mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-13 00:36:28 +00:00
fdcbd54cd7
Instead of just immediately dropping the event if the subscription isn't ready to receive it, give it 15 ms to catch up. The value 15 ms is grabbed out of thin air - it just seems reasonable to me. The timer juggling makes the event send pretty much exactly twice as slow as it was before, but we're still under a microsecond. I think it's negligible compared to whatever event that just happened that we're interested in logging (usually a file operation of some kind). benchmark old ns/op new ns/op delta BenchmarkBufferedSub-8 475 950 +100.00% benchmark old allocs new allocs delta BenchmarkBufferedSub-8 4 4 +0.00% benchmark old bytes new bytes delta BenchmarkBufferedSub-8 104 117 +12.50% GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3960
369 lines
8.1 KiB
Go
369 lines
8.1 KiB
Go
// Copyright (C) 2014 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
// Package events provides event subscription and polling functionality.
|
|
package events
|
|
|
|
import (
|
|
"errors"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/syncthing/syncthing/lib/sync"
|
|
)
|
|
|
|
// EventType identifies a kind of event. The event type constants are
// declared as distinct bits (1 << iota), so several types can be OR-ed
// together to form a subscription mask.
type EventType int
|
|
|
|
const (
|
|
Starting EventType = 1 << iota
|
|
StartupComplete
|
|
DeviceDiscovered
|
|
DeviceConnected
|
|
DeviceDisconnected
|
|
DeviceRejected
|
|
DevicePaused
|
|
DeviceResumed
|
|
LocalChangeDetected
|
|
RemoteChangeDetected
|
|
LocalIndexUpdated
|
|
RemoteIndexUpdated
|
|
ItemStarted
|
|
ItemFinished
|
|
StateChanged
|
|
FolderRejected
|
|
ConfigSaved
|
|
DownloadProgress
|
|
RemoteDownloadProgress
|
|
FolderSummary
|
|
FolderCompletion
|
|
FolderErrors
|
|
FolderScanProgress
|
|
FolderPaused
|
|
FolderResumed
|
|
ListenAddressesChanged
|
|
LoginAttempt
|
|
|
|
AllEvents = (1 << iota) - 1
|
|
)
|
|
|
|
// runningTests, when true, makes Subscribe and Poll yield to the scheduler
// (and Poll re-arm its timer with a zero delay) so that the "timer already
// fired" branch is taken consistently. Nothing in this file sets it to true;
// presumably the package tests do.
var runningTests bool
|
|
|
|
// eventLogTimeout is how long Log waits for a subscription's channel to
// accept an event before dropping the event for that subscription.
const eventLogTimeout = 15 * time.Millisecond
|
|
|
|
func (t EventType) String() string {
|
|
switch t {
|
|
case Starting:
|
|
return "Starting"
|
|
case StartupComplete:
|
|
return "StartupComplete"
|
|
case DeviceDiscovered:
|
|
return "DeviceDiscovered"
|
|
case DeviceConnected:
|
|
return "DeviceConnected"
|
|
case DeviceDisconnected:
|
|
return "DeviceDisconnected"
|
|
case DeviceRejected:
|
|
return "DeviceRejected"
|
|
case LocalChangeDetected:
|
|
return "LocalChangeDetected"
|
|
case RemoteChangeDetected:
|
|
return "RemoteChangeDetected"
|
|
case LocalIndexUpdated:
|
|
return "LocalIndexUpdated"
|
|
case RemoteIndexUpdated:
|
|
return "RemoteIndexUpdated"
|
|
case ItemStarted:
|
|
return "ItemStarted"
|
|
case ItemFinished:
|
|
return "ItemFinished"
|
|
case StateChanged:
|
|
return "StateChanged"
|
|
case FolderRejected:
|
|
return "FolderRejected"
|
|
case ConfigSaved:
|
|
return "ConfigSaved"
|
|
case DownloadProgress:
|
|
return "DownloadProgress"
|
|
case RemoteDownloadProgress:
|
|
return "RemoteDownloadProgress"
|
|
case FolderSummary:
|
|
return "FolderSummary"
|
|
case FolderCompletion:
|
|
return "FolderCompletion"
|
|
case FolderErrors:
|
|
return "FolderErrors"
|
|
case DevicePaused:
|
|
return "DevicePaused"
|
|
case DeviceResumed:
|
|
return "DeviceResumed"
|
|
case FolderScanProgress:
|
|
return "FolderScanProgress"
|
|
case FolderPaused:
|
|
return "FolderPaused"
|
|
case FolderResumed:
|
|
return "FolderResumed"
|
|
case ListenAddressesChanged:
|
|
return "ListenAddressesChanged"
|
|
case LoginAttempt:
|
|
return "LoginAttempt"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|
|
|
|
func (t EventType) MarshalText() ([]byte, error) {
|
|
return []byte(t.String()), nil
|
|
}
|
|
|
|
// BufferSize is the capacity of the event channel that Subscribe creates
// for each subscription.
const BufferSize = 64
|
|
|
|
type Logger struct {
|
|
subs []*Subscription
|
|
nextSubscriptionIDs []int
|
|
nextGlobalID int
|
|
timeout *time.Timer
|
|
mutex sync.Mutex
|
|
}
|
|
|
|
type Event struct {
|
|
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
|
|
SubscriptionID int `json:"id"`
|
|
// Global ID of the event across all subscriptions
|
|
GlobalID int `json:"globalID"`
|
|
Time time.Time `json:"time"`
|
|
Type EventType `json:"type"`
|
|
Data interface{} `json:"data"`
|
|
}
|
|
|
|
type Subscription struct {
|
|
mask EventType
|
|
events chan Event
|
|
timeout *time.Timer
|
|
}
|
|
|
|
// Default is a package-level Logger created at initialization time.
var Default = NewLogger()
|
|
|
|
// Errors returned by Subscription.Poll.
var (
	// ErrTimeout is returned when no event arrived within the timeout.
	ErrTimeout = errors.New("timeout")
	// ErrClosed is returned when the subscription's channel is closed.
	ErrClosed = errors.New("closed")
)
|
|
|
|
func NewLogger() *Logger {
|
|
l := &Logger{
|
|
mutex: sync.NewMutex(),
|
|
timeout: time.NewTimer(time.Second),
|
|
}
|
|
// Make sure the timer is in the stopped state and hasn't fired anything
|
|
// into the channel.
|
|
if !l.timeout.Stop() {
|
|
<-l.timeout.C
|
|
}
|
|
return l
|
|
}
|
|
|
|
func (l *Logger) Log(t EventType, data interface{}) {
|
|
l.mutex.Lock()
|
|
dl.Debugln("log", l.nextGlobalID, t, data)
|
|
l.nextGlobalID++
|
|
|
|
e := Event{
|
|
GlobalID: l.nextGlobalID,
|
|
Time: time.Now(),
|
|
Type: t,
|
|
Data: data,
|
|
}
|
|
|
|
for i, s := range l.subs {
|
|
if s.mask&t != 0 {
|
|
e.SubscriptionID = l.nextSubscriptionIDs[i]
|
|
l.nextSubscriptionIDs[i]++
|
|
|
|
l.timeout.Reset(eventLogTimeout)
|
|
timedOut := false
|
|
|
|
select {
|
|
case s.events <- e:
|
|
case <-l.timeout.C:
|
|
// if s.events is not ready, drop the event
|
|
timedOut = true
|
|
}
|
|
|
|
// If stop returns false it already sent something to the
|
|
// channel. If we didn't already read it above we must do so now
|
|
// or we get a spurious timeout on the next loop.
|
|
if !l.timeout.Stop() && !timedOut {
|
|
<-l.timeout.C
|
|
}
|
|
}
|
|
}
|
|
l.mutex.Unlock()
|
|
}
|
|
|
|
func (l *Logger) Subscribe(mask EventType) *Subscription {
|
|
l.mutex.Lock()
|
|
dl.Debugln("subscribe", mask)
|
|
|
|
s := &Subscription{
|
|
mask: mask,
|
|
events: make(chan Event, BufferSize),
|
|
timeout: time.NewTimer(0),
|
|
}
|
|
|
|
// We need to create the timeout timer in the stopped, non-fired state so
|
|
// that Subscription.Poll() can safely reset it and select on the timeout
|
|
// channel. This ensures the timer is stopped and the channel drained.
|
|
if runningTests {
|
|
// Make the behavior stable when running tests to avoid randomly
|
|
// varying test coverage. This ensures, in practice if not in
|
|
// theory, that the timer fires and we take the true branch of the
|
|
// next if.
|
|
runtime.Gosched()
|
|
}
|
|
if !s.timeout.Stop() {
|
|
<-s.timeout.C
|
|
}
|
|
|
|
l.subs = append(l.subs, s)
|
|
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
|
|
l.mutex.Unlock()
|
|
return s
|
|
}
|
|
|
|
func (l *Logger) Unsubscribe(s *Subscription) {
|
|
l.mutex.Lock()
|
|
dl.Debugln("unsubscribe")
|
|
for i, ss := range l.subs {
|
|
if s == ss {
|
|
last := len(l.subs) - 1
|
|
|
|
l.subs[i] = l.subs[last]
|
|
l.subs[last] = nil
|
|
l.subs = l.subs[:last]
|
|
|
|
l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
|
|
l.nextSubscriptionIDs[last] = 0
|
|
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
|
|
|
|
break
|
|
}
|
|
}
|
|
close(s.events)
|
|
l.mutex.Unlock()
|
|
}
|
|
|
|
// Poll returns an event from the subscription or an error if the poll times
|
|
// out of the event channel is closed. Poll should not be called concurrently
|
|
// from multiple goroutines for a single subscription.
|
|
func (s *Subscription) Poll(timeout time.Duration) (Event, error) {
|
|
dl.Debugln("poll", timeout)
|
|
|
|
s.timeout.Reset(timeout)
|
|
|
|
select {
|
|
case e, ok := <-s.events:
|
|
if !ok {
|
|
return e, ErrClosed
|
|
}
|
|
if runningTests {
|
|
// Make the behavior stable when running tests to avoid randomly
|
|
// varying test coverage. This ensures, in practice if not in
|
|
// theory, that the timer fires and we take the true branch of
|
|
// the next if.
|
|
s.timeout.Reset(0)
|
|
runtime.Gosched()
|
|
}
|
|
if !s.timeout.Stop() {
|
|
// The timeout must be stopped and possibly drained to be ready
|
|
// for reuse in the next call.
|
|
<-s.timeout.C
|
|
}
|
|
return e, nil
|
|
case <-s.timeout.C:
|
|
return Event{}, ErrTimeout
|
|
}
|
|
}
|
|
|
|
func (s *Subscription) C() <-chan Event {
|
|
return s.events
|
|
}
|
|
|
|
type bufferedSubscription struct {
|
|
sub *Subscription
|
|
buf []Event
|
|
next int
|
|
cur int // Current SubscriptionID
|
|
mut sync.Mutex
|
|
cond *sync.TimeoutCond
|
|
}
|
|
|
|
type BufferedSubscription interface {
|
|
Since(id int, into []Event, timeout time.Duration) []Event
|
|
}
|
|
|
|
func NewBufferedSubscription(s *Subscription, size int) BufferedSubscription {
|
|
bs := &bufferedSubscription{
|
|
sub: s,
|
|
buf: make([]Event, size),
|
|
mut: sync.NewMutex(),
|
|
}
|
|
bs.cond = sync.NewTimeoutCond(bs.mut)
|
|
go bs.pollingLoop()
|
|
return bs
|
|
}
|
|
|
|
func (s *bufferedSubscription) pollingLoop() {
|
|
for ev := range s.sub.C() {
|
|
s.mut.Lock()
|
|
s.buf[s.next] = ev
|
|
s.next = (s.next + 1) % len(s.buf)
|
|
s.cur = ev.SubscriptionID
|
|
s.cond.Broadcast()
|
|
s.mut.Unlock()
|
|
}
|
|
}
|
|
|
|
func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
|
|
s.mut.Lock()
|
|
defer s.mut.Unlock()
|
|
|
|
// Check once first before generating the TimeoutCondWaiter
|
|
if id >= s.cur {
|
|
waiter := s.cond.SetupWait(timeout)
|
|
defer waiter.Stop()
|
|
|
|
for id >= s.cur {
|
|
if eventsAvailable := waiter.Wait(); !eventsAvailable {
|
|
// Timed out
|
|
return into
|
|
}
|
|
}
|
|
}
|
|
|
|
for i := s.next; i < len(s.buf); i++ {
|
|
if s.buf[i].SubscriptionID > id {
|
|
into = append(into, s.buf[i])
|
|
}
|
|
}
|
|
for i := 0; i < s.next; i++ {
|
|
if s.buf[i].SubscriptionID > id {
|
|
into = append(into, s.buf[i])
|
|
}
|
|
}
|
|
|
|
return into
|
|
}
|
|
|
|
// Error returns a string pointer suitable for JSON marshalling errors. It
// retains the "null on success" semantics, but ensures the error result is a
// string regardless of the underlying concrete error type.
//
// A nil err yields a nil pointer; otherwise the result points to a copy of
// err.Error().
func Error(err error) *string {
	if err == nil {
		return nil
	}
	str := err.Error()
	return &str
}
|