2014-11-16 20:13:20 +00:00
|
|
|
// Copyright (C) 2014 The Syncthing Authors.
|
2014-09-29 19:43:32 +00:00
|
|
|
//
|
2015-03-07 20:36:35 +00:00
|
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
2017-02-09 06:52:18 +00:00
|
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
2014-07-25 12:50:14 +00:00
|
|
|
|
2021-05-08 10:52:06 +00:00
|
|
|
//go:generate -command counterfeiter go run github.com/maxbrunsfeld/counterfeiter/v6
|
2021-03-03 07:53:50 +00:00
|
|
|
//go:generate counterfeiter -o mocks/buffered_subscription.go --fake-name BufferedSubscription . BufferedSubscription
|
|
|
|
|
2014-07-13 19:07:24 +00:00
|
|
|
// Package events provides event subscription and polling functionality.
|
|
|
|
package events
|
|
|
|
|
|
|
|
import (
|
2019-11-21 07:41:15 +00:00
|
|
|
"context"
|
2017-11-22 23:25:55 +00:00
|
|
|
"encoding/json"
|
2014-07-13 19:07:24 +00:00
|
|
|
"errors"
|
2019-11-21 07:41:15 +00:00
|
|
|
"fmt"
|
2016-08-08 18:09:40 +00:00
|
|
|
"runtime"
|
2014-07-13 19:07:24 +00:00
|
|
|
"time"
|
2015-04-22 22:54:31 +00:00
|
|
|
|
2020-11-17 12:19:04 +00:00
|
|
|
"github.com/thejerf/suture/v4"
|
2019-08-15 14:29:37 +00:00
|
|
|
|
2015-08-06 09:29:25 +00:00
|
|
|
"github.com/syncthing/syncthing/lib/sync"
|
2014-07-13 19:07:24 +00:00
|
|
|
)
|
|
|
|
|
2020-10-07 08:05:13 +00:00
|
|
|
// EventType is a bit flag identifying one kind of event. Every event type
// occupies its own bit, so several types can be OR-ed together into a mask
// (see Logger.Subscribe).
type EventType int64

// The event types. Each constant's value is 1 << (its position in this
// block), so inserting or reordering entries changes the numeric value of
// every constant that follows.
const (
	Starting EventType = 1 << iota
	StartupComplete
	DeviceDiscovered
	DeviceConnected
	DeviceDisconnected
	DeviceRejected // DEPRECATED, superseded by PendingDevicesChanged
	PendingDevicesChanged
	DevicePaused
	DeviceResumed
	ClusterConfigReceived
	LocalChangeDetected
	RemoteChangeDetected
	LocalIndexUpdated
	RemoteIndexUpdated
	ItemStarted
	ItemFinished
	StateChanged
	FolderRejected // DEPRECATED, superseded by PendingFoldersChanged
	PendingFoldersChanged
	ConfigSaved
	DownloadProgress
	RemoteDownloadProgress
	FolderSummary
	FolderCompletion
	FolderErrors
	FolderScanProgress
	FolderPaused
	FolderResumed
	FolderWatchStateChanged
	ListenAddressesChanged
	LoginAttempt
	Failure

	// AllEvents has the bit of every event type declared above set. It is
	// declared without a type and is therefore an untyped constant.
	AllEvents = (1 << iota) - 1
)
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
var (
	// runningTests defaults to false. NOTE(review): presumably flipped by
	// the package's tests; its readers are outside this view, so confirm.
	runningTests = false
	// errNoop is a sentinel error for calls on a no-op object.
	// NOTE(review): the no-op implementations that return it are outside
	// this view.
	errNoop = errors.New("method of a noop object called")
)
|
2016-08-08 18:09:40 +00:00
|
|
|
|
2017-02-07 07:25:09 +00:00
|
|
|
// eventLogTimeout is the duration the logger's timer is reset to in
// sendEvent for each subscription whose mask matches the event.
// NOTE(review): presumably this bounds how long delivery to one subscriber
// may block; confirm against the remainder of sendEvent.
const eventLogTimeout = 15 * time.Millisecond
|
|
|
|
|
2014-07-13 19:07:24 +00:00
|
|
|
func (t EventType) String() string {
|
|
|
|
switch t {
|
2014-07-17 11:38:36 +00:00
|
|
|
case Starting:
|
|
|
|
return "Starting"
|
2014-07-13 19:07:24 +00:00
|
|
|
case StartupComplete:
|
|
|
|
return "StartupComplete"
|
2014-09-28 11:00:38 +00:00
|
|
|
case DeviceDiscovered:
|
|
|
|
return "DeviceDiscovered"
|
|
|
|
case DeviceConnected:
|
|
|
|
return "DeviceConnected"
|
|
|
|
case DeviceDisconnected:
|
|
|
|
return "DeviceDisconnected"
|
|
|
|
case DeviceRejected:
|
|
|
|
return "DeviceRejected"
|
lib/model: Forget pending folders no longer announced in ClusterConfig (fixes #5187) (#7205)
* lib/db: Add ExpirePendingFolders().
Use-case is to drop any no-longer-pending folders for a specific
device when parsing its ClusterConfig message where previously offered
folders are not mentioned any more.
The timestamp in ObservedFolder is stored with only second precision,
so round to seconds here as well. This allows calling the function
within the same second of adding or updating entries.
* lib/model: Weed out pending folders when receiving ClusterConfig.
Filter the entries by timestamp, which must be newer than or equal to
the reception time of the ClusterConfig. For just mentioned ones,
this assumption will hold as AddOrUpdatePendingFolder() updates the
timestamp.
* lib/model, gui: Notify when one or more pending folders expired.
Introduce new event type FolderOfferCancelled and use it to trigger a
complete refreshCluster() cycle. Listing individual entries would be
much more code and probably just as much work to answer the API
request.
* lib/model: Add comment and rename ExpirePendingFolders().
* lib/events: Rename FolderOfferCancelled to ClusterPendingChanged.
* lib/model: Reuse ClusterPendingChanged event for cleanPending()
Changing the config does not necessarily mean that the
/resut/cluster/pending endpoints need to be refreshed, but only if
something was actually removed. Detect this and indicate it through
the ClusterPendingChanged event, which is already hooked up to requery
respective endpoints within the GUI.
No more need for a separate refreshCluster() in reaction to
ConfigSaved event or calling refreshConfig().
* lib/model: Gofmt.
* lib/db: Warn instead of info log for failed removal.
* gui: Fix pending notifications not loading on GUI start.
* lib/db: Use short device ID in log message.
* lib/db: Return list of expired folder IDs after deleting them.
* lib/model: Refactor Pending...Changed events.
* lib/model: Adjust format of removed pending folders enumeration.
Use an array of objects with device / folder ID properties, matching
the other places where it's used.
* lib/db: Drop invalid entries in RemovePendingFoldersBeforeTime().
* lib/model: Gofmt.
My local gofmt did not complain here, strangely...
* gui: Handle PendingDevicesChanged event.
Even though it currently only holds one device at a time, wrap the
contents in an array under the "added" property name.
* lib/model: Fix null values in PendingFoldersChanged removed member.
* gui: Handle PendingFoldersChanged event.
* lib/model: Simplify construction of expiredPendingList.
* lib/model: Reduce code duplication in cleanPending().
Use goto and a label for the common parts of calling the DB removal
function and building the event data part.
* lib/events, gui: Mark ...Rejected events deprecated.
Extend comments explaining the conditions when the replacement event
types are emitted.
* lib/model: Wrap removed devices in array of objects as well.
* lib/db: Use iter.Value() instead of needless db.Get(iter.Key())
* lib/db: Add comment explaining RemovePendingFoldersBeforeTime().
* lib/model: Rename fields folderID and deviceID in event data.
* lib/db: Only list actually expired IDs as removed.
Skip entries where Delete() failed as well as invalid entries that got
removed automatically.
* lib/model: Gofmt
2021-01-25 10:58:10 +00:00
|
|
|
case PendingDevicesChanged:
|
|
|
|
return "PendingDevicesChanged"
|
2016-05-19 07:01:43 +00:00
|
|
|
case LocalChangeDetected:
|
|
|
|
return "LocalChangeDetected"
|
2016-12-21 16:35:20 +00:00
|
|
|
case RemoteChangeDetected:
|
|
|
|
return "RemoteChangeDetected"
|
2014-07-13 19:07:24 +00:00
|
|
|
case LocalIndexUpdated:
|
|
|
|
return "LocalIndexUpdated"
|
|
|
|
case RemoteIndexUpdated:
|
|
|
|
return "RemoteIndexUpdated"
|
|
|
|
case ItemStarted:
|
|
|
|
return "ItemStarted"
|
2015-02-01 17:31:19 +00:00
|
|
|
case ItemFinished:
|
|
|
|
return "ItemFinished"
|
2014-07-17 11:38:36 +00:00
|
|
|
case StateChanged:
|
|
|
|
return "StateChanged"
|
2014-09-28 11:00:38 +00:00
|
|
|
case FolderRejected:
|
|
|
|
return "FolderRejected"
|
lib/model: Forget pending folders no longer announced in ClusterConfig (fixes #5187) (#7205)
* lib/db: Add ExpirePendingFolders().
Use-case is to drop any no-longer-pending folders for a specific
device when parsing its ClusterConfig message where previously offered
folders are not mentioned any more.
The timestamp in ObservedFolder is stored with only second precision,
so round to seconds here as well. This allows calling the function
within the same second of adding or updating entries.
* lib/model: Weed out pending folders when receiving ClusterConfig.
Filter the entries by timestamp, which must be newer than or equal to
the reception time of the ClusterConfig. For just mentioned ones,
this assumption will hold as AddOrUpdatePendingFolder() updates the
timestamp.
* lib/model, gui: Notify when one or more pending folders expired.
Introduce new event type FolderOfferCancelled and use it to trigger a
complete refreshCluster() cycle. Listing individual entries would be
much more code and probably just as much work to answer the API
request.
* lib/model: Add comment and rename ExpirePendingFolders().
* lib/events: Rename FolderOfferCancelled to ClusterPendingChanged.
* lib/model: Reuse ClusterPendingChanged event for cleanPending()
Changing the config does not necessarily mean that the
/resut/cluster/pending endpoints need to be refreshed, but only if
something was actually removed. Detect this and indicate it through
the ClusterPendingChanged event, which is already hooked up to requery
respective endpoints within the GUI.
No more need for a separate refreshCluster() in reaction to
ConfigSaved event or calling refreshConfig().
* lib/model: Gofmt.
* lib/db: Warn instead of info log for failed removal.
* gui: Fix pending notifications not loading on GUI start.
* lib/db: Use short device ID in log message.
* lib/db: Return list of expired folder IDs after deleting them.
* lib/model: Refactor Pending...Changed events.
* lib/model: Adjust format of removed pending folders enumeration.
Use an array of objects with device / folder ID properties, matching
the other places where it's used.
* lib/db: Drop invalid entries in RemovePendingFoldersBeforeTime().
* lib/model: Gofmt.
My local gofmt did not complain here, strangely...
* gui: Handle PendingDevicesChanged event.
Even though it currently only holds one device at a time, wrap the
contents in an array under the "added" property name.
* lib/model: Fix null values in PendingFoldersChanged removed member.
* gui: Handle PendingFoldersChanged event.
* lib/model: Simplify construction of expiredPendingList.
* lib/model: Reduce code duplication in cleanPending().
Use goto and a label for the common parts of calling the DB removal
function and building the event data part.
* lib/events, gui: Mark ...Rejected events deprecated.
Extend comments explaining the conditions when the replacement event
types are emitted.
* lib/model: Wrap removed devices in array of objects as well.
* lib/db: Use iter.Value() instead of needless db.Get(iter.Key())
* lib/db: Add comment explaining RemovePendingFoldersBeforeTime().
* lib/model: Rename fields folderID and deviceID in event data.
* lib/db: Only list actually expired IDs as removed.
Skip entries where Delete() failed as well as invalid entries that got
removed automatically.
* lib/model: Gofmt
2021-01-25 10:58:10 +00:00
|
|
|
case PendingFoldersChanged:
|
|
|
|
return "PendingFoldersChanged"
|
2014-09-06 15:31:23 +00:00
|
|
|
case ConfigSaved:
|
|
|
|
return "ConfigSaved"
|
2014-11-16 23:18:59 +00:00
|
|
|
case DownloadProgress:
|
|
|
|
return "DownloadProgress"
|
2016-05-22 07:52:08 +00:00
|
|
|
case RemoteDownloadProgress:
|
|
|
|
return "RemoteDownloadProgress"
|
2015-03-26 22:26:51 +00:00
|
|
|
case FolderSummary:
|
|
|
|
return "FolderSummary"
|
|
|
|
case FolderCompletion:
|
|
|
|
return "FolderCompletion"
|
2015-06-26 11:31:30 +00:00
|
|
|
case FolderErrors:
|
|
|
|
return "FolderErrors"
|
2015-08-23 19:56:10 +00:00
|
|
|
case DevicePaused:
|
|
|
|
return "DevicePaused"
|
|
|
|
case DeviceResumed:
|
|
|
|
return "DeviceResumed"
|
2022-04-22 06:42:20 +00:00
|
|
|
case ClusterConfigReceived:
|
|
|
|
return "ClusterConfigReceived"
|
2015-08-26 22:49:06 +00:00
|
|
|
case FolderScanProgress:
|
|
|
|
return "FolderScanProgress"
|
2016-12-21 18:41:25 +00:00
|
|
|
case FolderPaused:
|
|
|
|
return "FolderPaused"
|
|
|
|
case FolderResumed:
|
|
|
|
return "FolderResumed"
|
2016-05-04 19:38:12 +00:00
|
|
|
case ListenAddressesChanged:
|
|
|
|
return "ListenAddressesChanged"
|
2015-11-08 20:05:36 +00:00
|
|
|
case LoginAttempt:
|
|
|
|
return "LoginAttempt"
|
2018-06-11 13:47:54 +00:00
|
|
|
case FolderWatchStateChanged:
|
|
|
|
return "FolderWatchStateChanged"
|
2020-10-07 08:05:13 +00:00
|
|
|
case Failure:
|
|
|
|
return "Failure"
|
2014-07-13 19:07:24 +00:00
|
|
|
default:
|
|
|
|
return "Unknown"
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (t EventType) MarshalText() ([]byte, error) {
|
|
|
|
return []byte(t.String()), nil
|
|
|
|
}
|
|
|
|
|
2017-11-22 23:25:55 +00:00
|
|
|
func (t *EventType) UnmarshalJSON(b []byte) error {
|
|
|
|
var s string
|
|
|
|
|
|
|
|
if err := json.Unmarshal(b, &s); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
*t = UnmarshalEventType(s)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-13 17:14:34 +00:00
|
|
|
func UnmarshalEventType(s string) EventType {
|
|
|
|
switch s {
|
|
|
|
case "Starting":
|
|
|
|
return Starting
|
|
|
|
case "StartupComplete":
|
|
|
|
return StartupComplete
|
|
|
|
case "DeviceDiscovered":
|
|
|
|
return DeviceDiscovered
|
|
|
|
case "DeviceConnected":
|
|
|
|
return DeviceConnected
|
|
|
|
case "DeviceDisconnected":
|
|
|
|
return DeviceDisconnected
|
|
|
|
case "DeviceRejected":
|
|
|
|
return DeviceRejected
|
lib/model: Forget pending folders no longer announced in ClusterConfig (fixes #5187) (#7205)
* lib/db: Add ExpirePendingFolders().
Use-case is to drop any no-longer-pending folders for a specific
device when parsing its ClusterConfig message where previously offered
folders are not mentioned any more.
The timestamp in ObservedFolder is stored with only second precision,
so round to seconds here as well. This allows calling the function
within the same second of adding or updating entries.
* lib/model: Weed out pending folders when receiving ClusterConfig.
Filter the entries by timestamp, which must be newer than or equal to
the reception time of the ClusterConfig. For just mentioned ones,
this assumption will hold as AddOrUpdatePendingFolder() updates the
timestamp.
* lib/model, gui: Notify when one or more pending folders expired.
Introduce new event type FolderOfferCancelled and use it to trigger a
complete refreshCluster() cycle. Listing individual entries would be
much more code and probably just as much work to answer the API
request.
* lib/model: Add comment and rename ExpirePendingFolders().
* lib/events: Rename FolderOfferCancelled to ClusterPendingChanged.
* lib/model: Reuse ClusterPendingChanged event for cleanPending()
Changing the config does not necessarily mean that the
/resut/cluster/pending endpoints need to be refreshed, but only if
something was actually removed. Detect this and indicate it through
the ClusterPendingChanged event, which is already hooked up to requery
respective endpoints within the GUI.
No more need for a separate refreshCluster() in reaction to
ConfigSaved event or calling refreshConfig().
* lib/model: Gofmt.
* lib/db: Warn instead of info log for failed removal.
* gui: Fix pending notifications not loading on GUI start.
* lib/db: Use short device ID in log message.
* lib/db: Return list of expired folder IDs after deleting them.
* lib/model: Refactor Pending...Changed events.
* lib/model: Adjust format of removed pending folders enumeration.
Use an array of objects with device / folder ID properties, matching
the other places where it's used.
* lib/db: Drop invalid entries in RemovePendingFoldersBeforeTime().
* lib/model: Gofmt.
My local gofmt did not complain here, strangely...
* gui: Handle PendingDevicesChanged event.
Even though it currently only holds one device at a time, wrap the
contents in an array under the "added" property name.
* lib/model: Fix null values in PendingFoldersChanged removed member.
* gui: Handle PendingFoldersChanged event.
* lib/model: Simplify construction of expiredPendingList.
* lib/model: Reduce code duplication in cleanPending().
Use goto and a label for the common parts of calling the DB removal
function and building the event data part.
* lib/events, gui: Mark ...Rejected events deprecated.
Extend comments explaining the conditions when the replacement event
types are emitted.
* lib/model: Wrap removed devices in array of objects as well.
* lib/db: Use iter.Value() instead of needless db.Get(iter.Key())
* lib/db: Add comment explaining RemovePendingFoldersBeforeTime().
* lib/model: Rename fields folderID and deviceID in event data.
* lib/db: Only list actually expired IDs as removed.
Skip entries where Delete() failed as well as invalid entries that got
removed automatically.
* lib/model: Gofmt
2021-01-25 10:58:10 +00:00
|
|
|
case "PendingDevicesChanged":
|
|
|
|
return PendingDevicesChanged
|
2017-04-13 17:14:34 +00:00
|
|
|
case "LocalChangeDetected":
|
|
|
|
return LocalChangeDetected
|
|
|
|
case "RemoteChangeDetected":
|
|
|
|
return RemoteChangeDetected
|
|
|
|
case "LocalIndexUpdated":
|
|
|
|
return LocalIndexUpdated
|
|
|
|
case "RemoteIndexUpdated":
|
|
|
|
return RemoteIndexUpdated
|
|
|
|
case "ItemStarted":
|
|
|
|
return ItemStarted
|
|
|
|
case "ItemFinished":
|
|
|
|
return ItemFinished
|
|
|
|
case "StateChanged":
|
|
|
|
return StateChanged
|
|
|
|
case "FolderRejected":
|
|
|
|
return FolderRejected
|
lib/model: Forget pending folders no longer announced in ClusterConfig (fixes #5187) (#7205)
* lib/db: Add ExpirePendingFolders().
Use-case is to drop any no-longer-pending folders for a specific
device when parsing its ClusterConfig message where previously offered
folders are not mentioned any more.
The timestamp in ObservedFolder is stored with only second precision,
so round to seconds here as well. This allows calling the function
within the same second of adding or updating entries.
* lib/model: Weed out pending folders when receiving ClusterConfig.
Filter the entries by timestamp, which must be newer than or equal to
the reception time of the ClusterConfig. For just mentioned ones,
this assumption will hold as AddOrUpdatePendingFolder() updates the
timestamp.
* lib/model, gui: Notify when one or more pending folders expired.
Introduce new event type FolderOfferCancelled and use it to trigger a
complete refreshCluster() cycle. Listing individual entries would be
much more code and probably just as much work to answer the API
request.
* lib/model: Add comment and rename ExpirePendingFolders().
* lib/events: Rename FolderOfferCancelled to ClusterPendingChanged.
* lib/model: Reuse ClusterPendingChanged event for cleanPending()
Changing the config does not necessarily mean that the
/resut/cluster/pending endpoints need to be refreshed, but only if
something was actually removed. Detect this and indicate it through
the ClusterPendingChanged event, which is already hooked up to requery
respective endpoints within the GUI.
No more need for a separate refreshCluster() in reaction to
ConfigSaved event or calling refreshConfig().
* lib/model: Gofmt.
* lib/db: Warn instead of info log for failed removal.
* gui: Fix pending notifications not loading on GUI start.
* lib/db: Use short device ID in log message.
* lib/db: Return list of expired folder IDs after deleting them.
* lib/model: Refactor Pending...Changed events.
* lib/model: Adjust format of removed pending folders enumeration.
Use an array of objects with device / folder ID properties, matching
the other places where it's used.
* lib/db: Drop invalid entries in RemovePendingFoldersBeforeTime().
* lib/model: Gofmt.
My local gofmt did not complain here, strangely...
* gui: Handle PendingDevicesChanged event.
Even though it currently only holds one device at a time, wrap the
contents in an array under the "added" property name.
* lib/model: Fix null values in PendingFoldersChanged removed member.
* gui: Handle PendingFoldersChanged event.
* lib/model: Simplify construction of expiredPendingList.
* lib/model: Reduce code duplication in cleanPending().
Use goto and a label for the common parts of calling the DB removal
function and building the event data part.
* lib/events, gui: Mark ...Rejected events deprecated.
Extend comments explaining the conditions when the replacement event
types are emitted.
* lib/model: Wrap removed devices in array of objects as well.
* lib/db: Use iter.Value() instead of needless db.Get(iter.Key())
* lib/db: Add comment explaining RemovePendingFoldersBeforeTime().
* lib/model: Rename fields folderID and deviceID in event data.
* lib/db: Only list actually expired IDs as removed.
Skip entries where Delete() failed as well as invalid entries that got
removed automatically.
* lib/model: Gofmt
2021-01-25 10:58:10 +00:00
|
|
|
case "PendingFoldersChanged":
|
|
|
|
return PendingFoldersChanged
|
2017-04-13 17:14:34 +00:00
|
|
|
case "ConfigSaved":
|
|
|
|
return ConfigSaved
|
|
|
|
case "DownloadProgress":
|
|
|
|
return DownloadProgress
|
|
|
|
case "RemoteDownloadProgress":
|
|
|
|
return RemoteDownloadProgress
|
|
|
|
case "FolderSummary":
|
|
|
|
return FolderSummary
|
|
|
|
case "FolderCompletion":
|
|
|
|
return FolderCompletion
|
|
|
|
case "FolderErrors":
|
|
|
|
return FolderErrors
|
|
|
|
case "DevicePaused":
|
|
|
|
return DevicePaused
|
|
|
|
case "DeviceResumed":
|
|
|
|
return DeviceResumed
|
2022-04-22 06:42:20 +00:00
|
|
|
case "ClusterConfigReceived":
|
|
|
|
return ClusterConfigReceived
|
2017-04-13 17:14:34 +00:00
|
|
|
case "FolderScanProgress":
|
|
|
|
return FolderScanProgress
|
|
|
|
case "FolderPaused":
|
|
|
|
return FolderPaused
|
|
|
|
case "FolderResumed":
|
|
|
|
return FolderResumed
|
|
|
|
case "ListenAddressesChanged":
|
|
|
|
return ListenAddressesChanged
|
|
|
|
case "LoginAttempt":
|
|
|
|
return LoginAttempt
|
2018-06-11 13:47:54 +00:00
|
|
|
case "FolderWatchStateChanged":
|
|
|
|
return FolderWatchStateChanged
|
2020-10-07 08:05:13 +00:00
|
|
|
case "Failure":
|
|
|
|
return Failure
|
2017-04-13 17:14:34 +00:00
|
|
|
default:
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2014-07-13 19:07:24 +00:00
|
|
|
// BufferSize is used by NewLogger as the capacity of the logger's incoming
// event channel.
const BufferSize = 64
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
type Logger interface {
|
|
|
|
suture.Service
|
|
|
|
Log(t EventType, data interface{})
|
|
|
|
Subscribe(mask EventType) Subscription
|
|
|
|
}
|
|
|
|
|
|
|
|
type logger struct {
|
|
|
|
subs []*subscription
|
2016-06-27 21:18:58 +00:00
|
|
|
nextSubscriptionIDs []int
|
|
|
|
nextGlobalID int
|
2017-02-07 07:25:09 +00:00
|
|
|
timeout *time.Timer
|
2018-12-13 12:42:28 +00:00
|
|
|
events chan Event
|
2020-01-11 07:14:05 +00:00
|
|
|
funcs chan func(context.Context)
|
2019-08-15 14:29:37 +00:00
|
|
|
toUnsubscribe chan *subscription
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
type Event struct {
|
2016-06-27 21:18:58 +00:00
|
|
|
// Per-subscription sequential event ID. Named "id" for backwards compatibility with the REST API
|
|
|
|
SubscriptionID int `json:"id"`
|
|
|
|
// Global ID of the event across all subscriptions
|
|
|
|
GlobalID int `json:"globalID"`
|
|
|
|
Time time.Time `json:"time"`
|
|
|
|
Type EventType `json:"type"`
|
|
|
|
Data interface{} `json:"data"`
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
type Subscription interface {
|
|
|
|
C() <-chan Event
|
|
|
|
Poll(timeout time.Duration) (Event, error)
|
2020-06-25 19:48:48 +00:00
|
|
|
Mask() EventType
|
2019-08-15 14:29:37 +00:00
|
|
|
Unsubscribe()
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
type subscription struct {
|
|
|
|
mask EventType
|
|
|
|
events chan Event
|
|
|
|
toUnsubscribe chan *subscription
|
|
|
|
timeout *time.Timer
|
2020-01-11 07:14:05 +00:00
|
|
|
ctx context.Context
|
2018-12-13 12:42:28 +00:00
|
|
|
}
|
|
|
|
|
2014-07-13 19:07:24 +00:00
|
|
|
// Sentinel errors of this package.
var (
	// ErrTimeout is the package's timeout error.
	ErrTimeout = errors.New("timeout")
	// ErrClosed is returned by Poll once the subscription's event channel
	// has been closed.
	ErrClosed = errors.New("closed")
)
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func NewLogger() Logger {
|
|
|
|
l := &logger{
|
|
|
|
timeout: time.NewTimer(time.Second),
|
|
|
|
events: make(chan Event, BufferSize),
|
2020-01-11 07:14:05 +00:00
|
|
|
funcs: make(chan func(context.Context)),
|
2019-08-15 14:29:37 +00:00
|
|
|
toUnsubscribe: make(chan *subscription),
|
2017-02-07 07:25:09 +00:00
|
|
|
}
|
|
|
|
// Make sure the timer is in the stopped state and hasn't fired anything
|
|
|
|
// into the channel.
|
|
|
|
if !l.timeout.Stop() {
|
|
|
|
<-l.timeout.C
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
2017-02-07 07:25:09 +00:00
|
|
|
return l
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
2020-11-17 12:19:04 +00:00
|
|
|
func (l *logger) Serve(ctx context.Context) error {
|
2018-12-13 12:42:28 +00:00
|
|
|
loop:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case e := <-l.events:
|
|
|
|
// Incoming events get sent
|
|
|
|
l.sendEvent(e)
|
2023-08-04 17:57:30 +00:00
|
|
|
metricEvents.WithLabelValues(e.Type.String(), metricEventStateCreated).Inc()
|
2018-12-13 12:42:28 +00:00
|
|
|
|
|
|
|
case fn := <-l.funcs:
|
2019-08-15 14:29:37 +00:00
|
|
|
// Subscriptions are handled here.
|
2020-01-11 07:14:05 +00:00
|
|
|
fn(ctx)
|
2018-12-13 12:42:28 +00:00
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
case s := <-l.toUnsubscribe:
|
|
|
|
l.unsubscribe(s)
|
|
|
|
|
2019-11-21 07:41:15 +00:00
|
|
|
case <-ctx.Done():
|
2018-12-13 12:42:28 +00:00
|
|
|
break loop
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Closing the event channels corresponds to what happens when a
|
|
|
|
// subscription is unsubscribed; this stops any BufferedSubscription,
|
|
|
|
// makes Poll() return ErrClosed, etc.
|
|
|
|
for _, s := range l.subs {
|
|
|
|
close(s.events)
|
|
|
|
}
|
2020-11-17 12:19:04 +00:00
|
|
|
|
|
|
|
return nil
|
2018-12-13 12:42:28 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (l *logger) Log(t EventType, data interface{}) {
|
2018-12-13 12:42:28 +00:00
|
|
|
l.events <- Event{
|
2021-03-12 09:35:10 +00:00
|
|
|
Time: time.Now(), // intentionally high precision
|
2018-12-13 12:42:28 +00:00
|
|
|
Type: t,
|
|
|
|
Data: data,
|
|
|
|
// SubscriptionID and GlobalID are set in sendEvent
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (l *logger) sendEvent(e Event) {
|
2016-06-27 21:18:58 +00:00
|
|
|
l.nextGlobalID++
|
2018-12-13 12:42:28 +00:00
|
|
|
dl.Debugln("log", l.nextGlobalID, e.Type, e.Data)
|
2016-06-27 21:18:58 +00:00
|
|
|
|
2018-12-13 12:42:28 +00:00
|
|
|
e.GlobalID = l.nextGlobalID
|
2016-06-27 21:18:58 +00:00
|
|
|
|
|
|
|
for i, s := range l.subs {
|
2018-12-13 12:42:28 +00:00
|
|
|
if s.mask&e.Type != 0 {
|
2016-06-27 21:18:58 +00:00
|
|
|
e.SubscriptionID = l.nextSubscriptionIDs[i]
|
|
|
|
l.nextSubscriptionIDs[i]++
|
|
|
|
|
2017-02-07 07:25:09 +00:00
|
|
|
l.timeout.Reset(eventLogTimeout)
|
|
|
|
timedOut := false
|
|
|
|
|
2014-07-13 19:07:24 +00:00
|
|
|
select {
|
|
|
|
case s.events <- e:
|
2023-08-04 17:57:30 +00:00
|
|
|
metricEvents.WithLabelValues(e.Type.String(), metricEventStateDelivered).Inc()
|
2017-02-07 07:25:09 +00:00
|
|
|
case <-l.timeout.C:
|
2014-10-06 22:03:24 +00:00
|
|
|
// if s.events is not ready, drop the event
|
2017-02-07 07:25:09 +00:00
|
|
|
timedOut = true
|
2023-08-04 17:57:30 +00:00
|
|
|
metricEvents.WithLabelValues(e.Type.String(), metricEventStateDropped).Inc()
|
2017-02-07 07:25:09 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// If stop returns false it already sent something to the
|
|
|
|
// channel. If we didn't already read it above we must do so now
|
|
|
|
// or we get a spurious timeout on the next loop.
|
|
|
|
if !l.timeout.Stop() && !timedOut {
|
|
|
|
<-l.timeout.C
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (l *logger) Subscribe(mask EventType) Subscription {
|
|
|
|
res := make(chan Subscription)
|
2020-01-11 07:14:05 +00:00
|
|
|
l.funcs <- func(ctx context.Context) {
|
2018-12-13 12:42:28 +00:00
|
|
|
dl.Debugln("subscribe", mask)
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
s := &subscription{
|
|
|
|
mask: mask,
|
|
|
|
events: make(chan Event, BufferSize),
|
|
|
|
toUnsubscribe: l.toUnsubscribe,
|
|
|
|
timeout: time.NewTimer(0),
|
2020-01-11 07:14:05 +00:00
|
|
|
ctx: ctx,
|
2018-12-13 12:42:28 +00:00
|
|
|
}
|
2015-11-17 11:03:18 +00:00
|
|
|
|
2018-12-13 12:42:28 +00:00
|
|
|
// We need to create the timeout timer in the stopped, non-fired state so
|
|
|
|
// that Subscription.Poll() can safely reset it and select on the timeout
|
|
|
|
// channel. This ensures the timer is stopped and the channel drained.
|
|
|
|
if runningTests {
|
|
|
|
// Make the behavior stable when running tests to avoid randomly
|
|
|
|
// varying test coverage. This ensures, in practice if not in
|
|
|
|
// theory, that the timer fires and we take the true branch of the
|
|
|
|
// next if.
|
|
|
|
runtime.Gosched()
|
|
|
|
}
|
|
|
|
if !s.timeout.Stop() {
|
|
|
|
<-s.timeout.C
|
|
|
|
}
|
2015-11-17 11:03:18 +00:00
|
|
|
|
2018-12-13 12:42:28 +00:00
|
|
|
l.subs = append(l.subs, s)
|
|
|
|
l.nextSubscriptionIDs = append(l.nextSubscriptionIDs, 1)
|
|
|
|
res <- s
|
2016-08-08 18:09:40 +00:00
|
|
|
}
|
2018-12-13 12:42:28 +00:00
|
|
|
return <-res
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (l *logger) unsubscribe(s *subscription) {
|
|
|
|
dl.Debugln("unsubscribe", s.mask)
|
|
|
|
for i, ss := range l.subs {
|
|
|
|
if s == ss {
|
|
|
|
last := len(l.subs) - 1
|
2016-06-27 21:18:58 +00:00
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
l.subs[i] = l.subs[last]
|
|
|
|
l.subs[last] = nil
|
|
|
|
l.subs = l.subs[:last]
|
2016-06-27 21:18:58 +00:00
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
l.nextSubscriptionIDs[i] = l.nextSubscriptionIDs[last]
|
|
|
|
l.nextSubscriptionIDs[last] = 0
|
|
|
|
l.nextSubscriptionIDs = l.nextSubscriptionIDs[:last]
|
2016-06-27 21:18:58 +00:00
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
break
|
2015-09-29 15:17:09 +00:00
|
|
|
}
|
|
|
|
}
|
2019-08-15 14:29:37 +00:00
|
|
|
close(s.events)
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
2019-11-21 07:41:15 +00:00
|
|
|
func (l *logger) String() string {
|
|
|
|
return fmt.Sprintf("events.Logger/@%p", l)
|
|
|
|
}
|
|
|
|
|
2015-05-23 18:38:41 +00:00
|
|
|
// Poll returns an event from the subscription or an error if the poll times
|
|
|
|
// out of the event channel is closed. Poll should not be called concurrently
|
|
|
|
// from multiple goroutines for a single subscription.
|
2019-08-15 14:29:37 +00:00
|
|
|
func (s *subscription) Poll(timeout time.Duration) (Event, error) {
|
2015-10-03 15:25:21 +00:00
|
|
|
dl.Debugln("poll", timeout)
|
2014-07-25 12:50:14 +00:00
|
|
|
|
2015-11-17 11:03:18 +00:00
|
|
|
s.timeout.Reset(timeout)
|
2015-08-24 07:38:39 +00:00
|
|
|
|
2014-07-13 19:07:24 +00:00
|
|
|
select {
|
|
|
|
case e, ok := <-s.events:
|
|
|
|
if !ok {
|
|
|
|
return e, ErrClosed
|
|
|
|
}
|
2016-08-08 18:09:40 +00:00
|
|
|
if runningTests {
|
|
|
|
// Make the behavior stable when running tests to avoid randomly
|
|
|
|
// varying test coverage. This ensures, in practice if not in
|
|
|
|
// theory, that the timer fires and we take the true branch of
|
|
|
|
// the next if.
|
|
|
|
s.timeout.Reset(0)
|
|
|
|
runtime.Gosched()
|
|
|
|
}
|
2015-11-17 11:03:18 +00:00
|
|
|
if !s.timeout.Stop() {
|
|
|
|
// The timeout must be stopped and possibly drained to be ready
|
|
|
|
// for reuse in the next call.
|
|
|
|
<-s.timeout.C
|
|
|
|
}
|
2014-07-13 19:07:24 +00:00
|
|
|
return e, nil
|
2015-05-23 18:38:41 +00:00
|
|
|
case <-s.timeout.C:
|
2014-07-13 19:07:24 +00:00
|
|
|
return Event{}, ErrTimeout
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (s *subscription) C() <-chan Event {
|
2014-12-26 23:12:12 +00:00
|
|
|
return s.events
|
|
|
|
}
|
|
|
|
|
2020-06-25 19:48:48 +00:00
|
|
|
func (s *subscription) Mask() EventType {
|
|
|
|
return s.mask
|
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (s *subscription) Unsubscribe() {
|
2020-01-11 07:14:05 +00:00
|
|
|
select {
|
|
|
|
case s.toUnsubscribe <- s:
|
|
|
|
case <-s.ctx.Done():
|
|
|
|
}
|
2019-08-15 14:29:37 +00:00
|
|
|
}
|
|
|
|
|
2016-03-21 19:36:08 +00:00
|
|
|
type bufferedSubscription struct {
|
2019-08-15 14:29:37 +00:00
|
|
|
sub Subscription
|
2014-07-13 19:07:24 +00:00
|
|
|
buf []Event
|
|
|
|
next int
|
2016-06-27 21:18:58 +00:00
|
|
|
cur int // Current SubscriptionID
|
2014-07-13 19:07:24 +00:00
|
|
|
mut sync.Mutex
|
2017-01-31 12:04:29 +00:00
|
|
|
cond *sync.TimeoutCond
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
2016-03-21 19:36:08 +00:00
|
|
|
type BufferedSubscription interface {
|
2017-01-31 12:04:29 +00:00
|
|
|
Since(id int, into []Event, timeout time.Duration) []Event
|
2020-06-25 19:48:48 +00:00
|
|
|
Mask() EventType
|
2016-03-21 19:36:08 +00:00
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func NewBufferedSubscription(s Subscription, size int) BufferedSubscription {
|
2016-03-21 19:36:08 +00:00
|
|
|
bs := &bufferedSubscription{
|
2014-07-13 19:07:24 +00:00
|
|
|
sub: s,
|
|
|
|
buf: make([]Event, size),
|
2015-04-22 22:54:31 +00:00
|
|
|
mut: sync.NewMutex(),
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
2017-01-31 12:04:29 +00:00
|
|
|
bs.cond = sync.NewTimeoutCond(bs.mut)
|
2014-07-13 19:07:24 +00:00
|
|
|
go bs.pollingLoop()
|
|
|
|
return bs
|
|
|
|
}
|
|
|
|
|
2016-03-21 19:36:08 +00:00
|
|
|
func (s *bufferedSubscription) pollingLoop() {
|
2017-02-04 15:53:39 +00:00
|
|
|
for ev := range s.sub.C() {
|
2014-07-13 19:07:24 +00:00
|
|
|
s.mut.Lock()
|
|
|
|
s.buf[s.next] = ev
|
|
|
|
s.next = (s.next + 1) % len(s.buf)
|
2016-06-27 21:18:58 +00:00
|
|
|
s.cur = ev.SubscriptionID
|
2014-07-13 19:07:24 +00:00
|
|
|
s.cond.Broadcast()
|
|
|
|
s.mut.Unlock()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-01-31 12:04:29 +00:00
|
|
|
func (s *bufferedSubscription) Since(id int, into []Event, timeout time.Duration) []Event {
|
2014-07-13 19:07:24 +00:00
|
|
|
s.mut.Lock()
|
|
|
|
defer s.mut.Unlock()
|
|
|
|
|
2017-01-31 12:04:29 +00:00
|
|
|
// Check once first before generating the TimeoutCondWaiter
|
|
|
|
if id >= s.cur {
|
|
|
|
waiter := s.cond.SetupWait(timeout)
|
|
|
|
defer waiter.Stop()
|
|
|
|
|
|
|
|
for id >= s.cur {
|
|
|
|
if eventsAvailable := waiter.Wait(); !eventsAvailable {
|
|
|
|
// Timed out
|
|
|
|
return into
|
|
|
|
}
|
|
|
|
}
|
2014-07-13 19:07:24 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
for i := s.next; i < len(s.buf); i++ {
|
2016-06-27 21:18:58 +00:00
|
|
|
if s.buf[i].SubscriptionID > id {
|
2014-07-13 19:07:24 +00:00
|
|
|
into = append(into, s.buf[i])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for i := 0; i < s.next; i++ {
|
2016-06-27 21:18:58 +00:00
|
|
|
if s.buf[i].SubscriptionID > id {
|
2014-07-13 19:07:24 +00:00
|
|
|
into = append(into, s.buf[i])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return into
|
|
|
|
}
|
2015-05-27 09:14:39 +00:00
|
|
|
|
2020-06-25 19:48:48 +00:00
|
|
|
func (s *bufferedSubscription) Mask() EventType {
|
|
|
|
return s.sub.Mask()
|
|
|
|
}
|
|
|
|
|
2015-05-27 09:14:39 +00:00
|
|
|
// Error converts err into a value suitable for JSON marshalling: nil for a
// nil error (so success still encodes as null), otherwise a pointer to the
// error's message. The result is thus always a string, whatever the concrete
// type of err.
func Error(err error) *string {
	if err != nil {
		msg := err.Error()
		return &msg
	}
	return nil
}
|
2019-08-15 14:29:37 +00:00
|
|
|
|
|
|
|
type noopLogger struct{}
|
|
|
|
|
|
|
|
var NoopLogger Logger = &noopLogger{}
|
|
|
|
|
2022-07-28 15:17:29 +00:00
|
|
|
func (*noopLogger) Serve(_ context.Context) error { return nil }
|
2019-08-15 14:29:37 +00:00
|
|
|
|
2022-07-28 15:17:29 +00:00
|
|
|
func (*noopLogger) Log(_ EventType, _ interface{}) {}
|
2019-08-15 14:29:37 +00:00
|
|
|
|
2022-07-28 15:17:29 +00:00
|
|
|
func (*noopLogger) Subscribe(_ EventType) Subscription {
|
2019-08-15 14:29:37 +00:00
|
|
|
return &noopSubscription{}
|
|
|
|
}
|
|
|
|
|
|
|
|
type noopSubscription struct{}
|
|
|
|
|
|
|
|
func (*noopSubscription) C() <-chan Event {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2022-07-28 15:17:29 +00:00
|
|
|
func (*noopSubscription) Poll(_ time.Duration) (Event, error) {
|
2019-08-15 14:29:37 +00:00
|
|
|
return Event{}, errNoop
|
|
|
|
}
|
|
|
|
|
2022-07-28 15:32:45 +00:00
|
|
|
func (*noopSubscription) Mask() EventType {
|
2020-06-25 19:48:48 +00:00
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
2019-08-15 14:29:37 +00:00
|
|
|
func (*noopSubscription) Unsubscribe() {}
|