mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 14:48:30 +00:00
580 lines
13 KiB
Go
580 lines
13 KiB
Go
// Copyright (C) 2014 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
//go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
|
|
//go:generate counterfeiter -o mocks/buffered_subscription.go --fake-name BufferedSubscription . BufferedSubscription
|
|
|
|
// Package events provides event subscription and polling functionality.
|
|
package events
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"runtime"
|
|
"time"
|
|
|
|
"github.com/thejerf/suture/v4"
|
|
|
|
"github.com/syncthing/syncthing/lib/sync"
|
|
)
|
|
|
|
type EventType int64
|
|
|
|
const (
|
|
Starting EventType = 1 << iota
|
|
StartupComplete
|
|
DeviceDiscovered
|
|
DeviceConnected
|
|
DeviceDisconnected
|
|
DeviceRejected // DEPRECATED, superseded by PendingDevicesChanged
|
|
PendingDevicesChanged
|
|
DevicePaused
|
|
DeviceResumed
|
|
ClusterConfigReceived
|
|
LocalChangeDetected
|
|
RemoteChangeDetected
|
|
LocalIndexUpdated
|
|
RemoteIndexUpdated
|
|
ItemStarted
|
|
ItemFinished
|
|
StateChanged
|
|
FolderRejected // DEPRECATED, superseded by PendingFoldersChanged
|
|
PendingFoldersChanged
|
|
ConfigSaved
|
|
DownloadProgress
|
|
RemoteDownloadProgress
|
|
FolderSummary
|
|
FolderCompletion
|
|
FolderErrors
|
|
FolderScanProgress
|
|
FolderPaused
|
|
FolderResumed
|
|
FolderWatchStateChanged
|
|
ListenAddressesChanged
|
|
LoginAttempt
|
|
Failure
|
|
|
|
AllEvents = (1 << iota) - 1
|
|
)
|
|
|
|
var (
|
|
runningTests = false
|
|
errNoop = errors.New("method of a noop object called")
|
|
)
|
|
|
|
const eventLogTimeout = 15 * time.Millisecond
|
|
|
|
func (t EventType) String() string {
|
|
switch t {
|
|
case Starting:
|
|
return "Starting"
|
|
case StartupComplete:
|
|
return "StartupComplete"
|
|
case DeviceDiscovered:
|
|
return "DeviceDiscovered"
|
|
case DeviceConnected:
|
|
return "DeviceConnected"
|
|
case DeviceDisconnected:
|
|
return "DeviceDisconnected"
|
|
case DeviceRejected:
|
|
return "DeviceRejected"
|
|
case PendingDevicesChanged:
|
|
return "PendingDevicesChanged"
|
|
case LocalChangeDetected:
|
|
return "LocalChangeDetected"
|
|
case RemoteChangeDetected:
|
|
return "RemoteChangeDetected"
|
|
case LocalIndexUpdated:
|
|
return "LocalIndexUpdated"
|
|
case RemoteIndexUpdated:
|
|
return "RemoteIndexUpdated"
|
|
case ItemStarted:
|
|
return "ItemStarted"
|
|
case ItemFinished:
|
|
return "ItemFinished"
|
|
case StateChanged:
|
|
return "StateChanged"
|
|
case FolderRejected:
|
|
return "FolderRejected"
|
|
case PendingFoldersChanged:
|
|
return "PendingFoldersChanged"
|
|
case ConfigSaved:
|
|
return "ConfigSaved"
|
|
case DownloadProgress:
|
|
return "DownloadProgress"
|
|
case RemoteDownloadProgress:
|
|
return "RemoteDownloadProgress"
|
|
case FolderSummary:
|
|
return "FolderSummary"
|
|
case FolderCompletion:
|
|
return "FolderCompletion"
|
|
case FolderErrors:
|
|
return "FolderErrors"
|
|
case DevicePaused:
|
|
return "DevicePaused"
|
|
case DeviceResumed:
|
|
return "DeviceResumed"
|
|
case ClusterConfigReceived:
|
|
return "ClusterConfigReceived"
|
|
case FolderScanProgress:
|
|
return "FolderScanProgress"
|
|
case FolderPaused:
|
|
return "FolderPaused"
|
|
case FolderResumed:
|
|
return "FolderResumed"
|
|
case ListenAddressesChanged:
|
|
return "ListenAddressesChanged"
|
|
case LoginAttempt:
|
|
return "LoginAttempt"
|
|
case FolderWatchStateChanged:
|
|
return "FolderWatchStateChanged"
|
|
case Failure:
|
|
return "Failure"
|
|
default:
|
|
return "Unknown"
|
|
}
|
|
}
|
|
|
|
func (t EventType) MarshalText() ([]byte, error) {
|
|
return []byte(t.String()), nil
|
|
}
|
|
|
|
func (t *EventType) UnmarshalJSON(b []byte) error {
|
|
var s string
|
|
|
|
if err := json.Unmarshal(b, &s); err != nil {
|
|
return err
|
|
}
|
|
|
|
*t = UnmarshalEventType(s)
|
|
|
|
return nil
|
|
}
|
|
|
|
func UnmarshalEventType(s string) EventType {
|
|
switch s {
|
|
case "Starting":
|
|
return Starting
|
|
case "StartupComplete":
|
|
return StartupComplete
|
|
case "DeviceDiscovered":
|
|
return DeviceDiscovered
|
|
case "DeviceConnected":
|
|
return DeviceConnected
|
|
case "DeviceDisconnected":
|
|
return DeviceDisconnected
|
|
case "DeviceRejected":
|
|
return DeviceRejected
|
|
case "PendingDevicesChanged":
|
|
return PendingDevicesChanged
|
|
case "LocalChangeDetected":
|
|
return LocalChangeDetected
|
|
case "RemoteChangeDetected":
|
|
return RemoteChangeDetected
|
|
case "LocalIndexUpdated":
|
|
return LocalIndexUpdated
|
|
case "RemoteIndexUpdated":
|
|
return RemoteIndexUpdated
|
|
case "ItemStarted":
|
|
return ItemStarted
|
|
case "ItemFinished":
|
|
return ItemFinished
|
|
case "StateChanged":
|
|
return StateChanged
|
|
case "FolderRejected":
|
|
return FolderRejected
|
|
case "PendingFoldersChanged":
|
|
return PendingFoldersChanged
|
|
case "ConfigSaved":
|
|
return ConfigSaved
|
|
case "DownloadProgress":
|
|
return DownloadProgress
|
|
case "RemoteDownloadProgress":
|
|
return RemoteDownloadProgress
|
|
case "FolderSummary":
|
|
return FolderSummary
|
|
case "FolderCompletion":
|
|
return FolderCompletion
|
|
case "FolderErrors":
|
|
return FolderErrors
|
|
case "DevicePaused":
|
|
return DevicePaused
|
|
case "DeviceResumed":
|
|
return DeviceResumed
|
|
case "ClusterConfigReceived":
|
|
return ClusterConfigReceived
|
|
case "FolderScanProgress":
|
|
return FolderScanProgress
|
|
case "FolderPaused":
|
|
return FolderPaused
|
|
case "FolderResumed":
|
|
return FolderResumed
|
|
case "ListenAddressesChanged":
|
|
return ListenAddressesChanged
|
|
case "LoginAttempt":
|
|
return LoginAttempt
|
|
case "FolderWatchStateChanged":
|
|
return FolderWatchStateChanged
|
|
case "Failure":
|
|
return Failure
|
|
default:
|
|
return 0
|
|
}
|
|
}
|
|
|
|
const BufferSize = 64
|
|
|
|
type Logger interface {
|
|
suture.Service
|
|
Log(t EventType, data interface{})
|
|
Subscribe(mask EventType) Subscription
|
|
}
|
|
|
|
type logger struct {
|
|
subs []*subscription
|
|
nextSubscriptionIDs []int
|
|
nextGlobalID int
|
|
timeout *time.Timer
|
|
events chan Event
|
|
funcs chan func(context.Context)
|
|
toUnsubscribe chan *subscription
|
|
}
|
|
|
|
type Event struct {
|
|
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
|
|
SubscriptionID int `json:"id"`
|
|
// Global ID of the event across all subscriptions
|
|
GlobalID int `json:"globalID"`
|
|
Time time.Time `json:"time"`
|
|
Type EventType `json:"type"`
|
|
Data interface{} `json:"data"`
|
|
}
|
|
|
|
type Subscription interface {
|
|
C() <-chan Event
|
|
Poll(timeout time.Duration) (Event, error)
|
|
Mask() EventType
|
|
Unsubscribe()
|
|
}
|
|
|
|
type subscription struct {
|
|
mask EventType
|
|
events chan Event
|
|
toUnsubscribe chan *subscription
|
|
timeout *time.Timer
|
|
ctx context.Context
|
|
}
|
|
|
|
var (
|
|
ErrTimeout = errors.New("timeout")
|
|
ErrClosed = errors.New("closed")
|
|
)
|
|
|
|
func NewLogger() Logger {
|
|
l := &logger{
|
|
timeout: time.NewTimer(time.Second),
|
|
events: make(chan Event, BufferSize),
|
|
funcs: make(chan func(context.Context)),
|
|
toUnsubscribe: make(chan *subscription),
|
|
}
|
|
// Make sure the timer is in the stopped state and hasn't fired anything
|
|
// into the channel.
|
|
if !l.timeout.Stop() {
|
|
<-l.timeout.C
|
|
}
|
|
return l
|
|
}
|
|
|
|
func (l *logger) Serve(ctx context.Context) error {
|
|
loop:
|
|
for {
|
|
select {
|
|
case e := <-l.events:
|
|
// Incoming events get sent
|
|
l.sendEvent(e)
|
|
metricEvents.WithLabelValues(e.Type.String(), metricEventStateCreated).Inc()
|
|
|
|
case fn := <-l.funcs:
|
|
// Subscriptions are handled here.
|
|
fn(ctx)
|
|
|
|
case s := <-l.toUnsubscribe:
|
|
l.unsubscribe(s)
|
|
|
|
case <-ctx.Done():
|
|
break loop
|
|
}
|
|
}
|
|
|
|
// Closing the event channels corresponds to what happens when a
|
|
// subscription is unsubscribed; this stops any BufferedSubscription,
|
|
// makes Poll() return ErrClosed, etc.
|
|
for _, s := range l.subs {
|
|
close(s.events)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *logger) Log(t EventType, data interface{}) {
|
|
l.events <- Event{
|
|
Time: time.Now(), // intentionally high precision
|
|
Type: t,
|
|
Data: data,
|
|
// SubscriptionID and GlobalID are set in sendEvent
|
|
}
|
|
}
|
|
|
|
func (l *logger) sendEvent(e Event) {
|
|
l.nextGlobalID++
|
|
dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
|
|
|
|
e.GlobalID = l.nextGlobalID
|
|
|
|
for i, s := range l.subs {
|
|
if s.mask&e.Type != 0 {
|
|
e.SubscriptionID = l.nextSubscriptionIDs[i]
|
|
l.nextSubscriptionIDs[i]++
|
|
|
|
l.timeout.Reset(eventLogTimeout)
|
|
timedOut := false
|
|
|
|
select {
|
|
case s.events <- e:
|
|
metricEvents.WithLabelValues(e.Type.String(), metricEventStateDelivered).Inc()
|
|
case <-l.timeout.C:
|
|
// if s.events is not ready, drop the event
|
|
timedOut = true
|
|
metricEvents.WithLabelValues(e.Type.String(), metricEventStateDropped).Inc()
|
|
}
|
|
|
|
// If stop returns false it already sent something to the
|
|
// channel. If we didn't already read it above we must do so now
|
|
// or we get a spurious timeout on the next loop.
|
|
if !l.timeout.Stop() && !timedOut {
|
|
<-l.timeout.C
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *logger) Subscribe(mask EventType) Subscription {
|
|
res := make(chan Subscription)
|
|
l.funcs <- func(ctx context.Context) {
|
|
dl.Debugln("subscribe", mask)
|
|
|
|
s := &subscription{
|
|
mask: mask,
|
|
events: make(chan Event, BufferSize),
|
|
toUnsubscribe: l.toUnsubscribe,
|
|
timeout: time.NewTimer(0),
|
|
ctx: ctx,
|
|
}
|
|
|
|
// We need to create the timeout timer in the stopped, non-fired state so
|
|
// that Subscription.Poll() can safely reset it and select on the timeout
|
|
// channel. This ensures the timer is stopped and the channel drained.
|
|
if runningTests {
|
|
// Make the behavior stable when running tests to avoid randomly
|
|
// varying test coverage. This ensures, in practice if not in
|
|
// theory, that the timer fires and we take the true branch of the
|
|
// next if.
|
|
runtime.Gosched()
|
|
}
|
|
if !s.timeout.Stop() {
|
|
<-s.timeout.C
|
|
}
|
|
|
|
l.subs = append(l.subs, s)
|
|
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
|
|
res <- s
|
|
}
|
|
return <-res
|
|
}
|
|
|
|
func (l *logger) unsubscribe(s *subscription) {
|
|
dl.Debugln("unsubscribe", s.mask)
|
|
for i, ss := range l.subs {
|
|
if s == ss {
|
|
last := len(l.subs) - 1
|
|
|
|
l.subs[i] = l.subs[last]
|
|
l.subs[last] = nil
|
|
l.subs = l.subs[:last]
|
|
|
|
l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
|
|
l.nextSubscriptionIDs[last] = 0
|
|
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
|
|
|
|
break
|
|
}
|
|
}
|
|
close(s.events)
|
|
}
|
|
|
|
func (l *logger) String() string {
|
|
return fmt.Sprintf("events.Logger/@%p", l)
|
|
}
|
|
|
|
// Poll returns an event from the subscription or an error if the poll times
|
|
// out of the event channel is closed. Poll should not be called concurrently
|
|
// from multiple goroutines for a single subscription.
|
|
func (s *subscription) Poll(timeout time.Duration) (Event, error) {
|
|
dl.Debugln("poll", timeout)
|
|
|
|
s.timeout.Reset(timeout)
|
|
|
|
select {
|
|
case e, ok := <-s.events:
|
|
if !ok {
|
|
return e, ErrClosed
|
|
}
|
|
if runningTests {
|
|
// Make the behavior stable when running tests to avoid randomly
|
|
// varying test coverage. This ensures, in practice if not in
|
|
// theory, that the timer fires and we take the true branch of
|
|
// the next if.
|
|
s.timeout.Reset(0)
|
|
runtime.Gosched()
|
|
}
|
|
if !s.timeout.Stop() {
|
|
// The timeout must be stopped and possibly drained to be ready
|
|
// for reuse in the next call.
|
|
<-s.timeout.C
|
|
}
|
|
return e, nil
|
|
case <-s.timeout.C:
|
|
return Event{}, ErrTimeout
|
|
}
|
|
}
|
|
|
|
func (s *subscription) C() <-chan Event {
|
|
return s.events
|
|
}
|
|
|
|
func (s *subscription) Mask() EventType {
|
|
return s.mask
|
|
}
|
|
|
|
func (s *subscription) Unsubscribe() {
|
|
select {
|
|
case s.toUnsubscribe <- s:
|
|
case <-s.ctx.Done():
|
|
}
|
|
}
|
|
|
|
type bufferedSubscription struct {
|
|
sub Subscription
|
|
buf []Event
|
|
next int
|
|
cur int // Current SubscriptionID
|
|
mut sync.Mutex
|
|
cond *sync.TimeoutCond
|
|
}
|
|
|
|
type BufferedSubscription interface {
|
|
Since(id int, into []Event, timeout time.Duration) []Event
|
|
Mask() EventType
|
|
}
|
|
|
|
func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
|
|
bs := &bufferedSubscription{
|
|
sub: s,
|
|
buf: make([]Event, size),
|
|
mut: sync.NewMutex(),
|
|
}
|
|
bs.cond = sync.NewTimeoutCond(bs.mut)
|
|
go bs.pollingLoop()
|
|
return bs
|
|
}
|
|
|
|
func (s *bufferedSubscription) pollingLoop() {
|
|
for ev := range s.sub.C() {
|
|
s.mut.Lock()
|
|
s.buf[s.next] = ev
|
|
s.next = (s.next + 1) % len(s.buf)
|
|
s.cur = ev.SubscriptionID
|
|
s.cond.Broadcast()
|
|
s.mut.Unlock()
|
|
}
|
|
}
|
|
|
|
func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
|
|
s.mut.Lock()
|
|
defer s.mut.Unlock()
|
|
|
|
// Check once first before generating the TimeoutCondWaiter
|
|
if id >= s.cur {
|
|
waiter := s.cond.SetupWait(timeout)
|
|
defer waiter.Stop()
|
|
|
|
for id >= s.cur {
|
|
if eventsAvailable := waiter.Wait(); !eventsAvailable {
|
|
// Timed out
|
|
return into
|
|
}
|
|
}
|
|
}
|
|
|
|
for i := s.next; i < len(s.buf); i++ {
|
|
if s.buf[i].SubscriptionID > id {
|
|
into = append(into, s.buf[i])
|
|
}
|
|
}
|
|
for i := 0; i < s.next; i++ {
|
|
if s.buf[i].SubscriptionID > id {
|
|
into = append(into, s.buf[i])
|
|
}
|
|
}
|
|
|
|
return into
|
|
}
|
|
|
|
func (s *bufferedSubscription) Mask() EventType {
|
|
return s.sub.Mask()
|
|
}
|
|
|
|
// Error returns a string pointer suitable for JSON marshalling errors. It
|
|
// retains the "null on success" semantics, but ensures the error result is a
|
|
// string regardless of the underlying concrete error type.
|
|
func Error(err error) *string {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
str := err.Error()
|
|
return &str
|
|
}
|
|
|
|
type noopLogger struct{}
|
|
|
|
var NoopLogger Logger = &noopLogger{}
|
|
|
|
func (*noopLogger) Serve(_ context.Context) error { return nil }
|
|
|
|
func (*noopLogger) Log(_ EventType, _ interface{}) {}
|
|
|
|
func (*noopLogger) Subscribe(_ EventType) Subscription {
|
|
return &noopSubscription{}
|
|
}
|
|
|
|
type noopSubscription struct{}
|
|
|
|
func (*noopSubscription) C() <-chan Event {
|
|
return nil
|
|
}
|
|
|
|
func (*noopSubscription) Poll(_ time.Duration) (Event, error) {
|
|
return Event{}, errNoop
|
|
}
|
|
|
|
func (*noopSubscription) Mask() EventType {
|
|
return 0
|
|
}
|
|
|
|
func (*noopSubscription) Unsubscribe() {}
|