diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index ca431e6be..05a2b19f2 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -45,6 +45,12 @@ var ( startTime = time.Now() ) +const ( + defaultEventMask = events.AllEvents &^ events.LocalChangeDetected &^ events.RemoteChangeDetected + diskEventMask = events.LocalChangeDetected | events.RemoteChangeDetected + eventSubBufferSize = 1000 +) + type apiService struct { id protocol.DeviceID cfg configIntf @@ -52,8 +58,8 @@ type apiService struct { httpsKeyFile string statics *staticsServer model modelIntf - eventSub events.BufferedSubscription - diskEventSub events.BufferedSubscription + eventSubs map[events.EventType]events.BufferedSubscription + eventSubsMut sync.Mutex discoverer discover.CachingMux connectionsService connectionsIntf fss *folderSummaryService @@ -114,16 +120,19 @@ type connectionsIntf interface { Status() map[string]interface{} } -func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, eventSub events.BufferedSubscription, diskEventSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connectionsIntf, errors, systemLog logger.Recorder) *apiService { +func newAPIService(id protocol.DeviceID, cfg configIntf, httpsCertFile, httpsKeyFile, assetDir string, m modelIntf, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService connectionsIntf, errors, systemLog logger.Recorder) *apiService { service := &apiService{ - id: id, - cfg: cfg, - httpsCertFile: httpsCertFile, - httpsKeyFile: httpsKeyFile, - statics: newStaticsServer(cfg.GUI().Theme, assetDir), - model: m, - eventSub: eventSub, - diskEventSub: diskEventSub, + id: id, + cfg: cfg, + httpsCertFile: httpsCertFile, + httpsKeyFile: httpsKeyFile, + statics: newStaticsServer(cfg.GUI().Theme, assetDir), + model: m, + eventSubs: map[events.EventType]events.BufferedSubscription{ + defaultEventMask: defaultSub, + diskEventMask: diskSub, + }, + eventSubsMut: sync.NewMutex(), discoverer: discoverer, connectionsService: connectionsService, systemConfigMut: sync.NewMutex(), @@ -234,7 +243,7 @@ func (s *apiService) Serve() { getRestMux.HandleFunc("/rest/db/need", s.getDBNeed) // folder [perpage] [page] getRestMux.HandleFunc("/rest/db/status", s.getDBStatus) // folder getRestMux.HandleFunc("/rest/db/browse", s.getDBBrowse) // folder [prefix] [dirsonly] [levels] - getRestMux.HandleFunc("/rest/events", s.getIndexEvents) // [since] [limit] [timeout] + getRestMux.HandleFunc("/rest/events", s.getIndexEvents) // [since] [limit] [timeout] [events] getRestMux.HandleFunc("/rest/events/disk", s.getDiskEvents) // [since] [limit] [timeout] getRestMux.HandleFunc("/rest/stats/device", s.getDeviceStats) // - getRestMux.HandleFunc("/rest/stats/folder", s.getFolderStats) // - @@ -1011,11 +1020,14 @@ func (s *apiService) postDBIgnores(w http.ResponseWriter, r *http.Request) { func (s *apiService) getIndexEvents(w http.ResponseWriter, r *http.Request) { s.fss.gotEventRequest() - s.getEvents(w, r, s.eventSub) + mask := s.getEventMask(r.URL.Query().Get("events")) + sub := s.getEventSub(mask) + s.getEvents(w, r, sub) } func (s *apiService) getDiskEvents(w http.ResponseWriter, r *http.Request) { - s.getEvents(w, r, s.diskEventSub) + sub := s.getEventSub(diskEventMask) + s.getEvents(w, r, sub) } func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub events.BufferedSubscription) { @@ -1047,6 +1059,31 @@ func (s *apiService) getEvents(w http.ResponseWriter, r *http.Request, eventSub sendJSON(w, evs) } +func (s *apiService) getEventMask(evs string) events.EventType { + eventMask := defaultEventMask + if evs != "" { + eventList := strings.Split(evs, ",") + eventMask = 0 + for _, ev := range eventList { + eventMask |= events.UnmarshalEventType(strings.TrimSpace(ev)) + } + } + return eventMask +} + +func (s *apiService) getEventSub(mask events.EventType) events.BufferedSubscription { + s.eventSubsMut.Lock() + bufsub, ok := s.eventSubs[mask] + if !ok { + evsub := events.Default.Subscribe(mask) + bufsub = events.NewBufferedSubscription(evsub, eventSubBufferSize) + s.eventSubs[mask] = bufsub + } + s.eventSubsMut.Unlock() + + return bufsub +} + func (s *apiService) getSystemUpgrade(w http.ResponseWriter, r *http.Request) { if noUpgradeFromEnv { http.Error(w, upgrade.ErrUpgradeUnsupported.Error(), 500) diff --git a/cmd/syncthing/gui_test.go b/cmd/syncthing/gui_test.go index bd2919f82..92e2df7f9 100644 --- a/cmd/syncthing/gui_test.go +++ b/cmd/syncthing/gui_test.go @@ -23,6 +23,7 @@ import ( "github.com/d4l3k/messagediff" "github.com/syncthing/syncthing/lib/config" + "github.com/syncthing/syncthing/lib/events" "github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/sync" "github.com/thejerf/suture" @@ -924,3 +925,34 @@ func TestOptionsRequest(t *testing.T) { t.Fatal("OPTIONS on /rest/system/status should return a 'Access-Control-Allow-Headers: Content-Type, X-API-KEY' header") } } + +func TestEventMasks(t *testing.T) { + cfg := new(mockedConfig) + defSub := new(mockedEventSub) + diskSub := new(mockedEventSub) + svc := newAPIService(protocol.LocalDeviceID, cfg, "", "", "", nil, defSub, diskSub, nil, nil, nil, nil) + + if mask := svc.getEventMask(""); mask != defaultEventMask { + t.Errorf("incorrect default mask %x != %x", int64(mask), int64(defaultEventMask)) + } + + expected := events.FolderSummary | events.LocalChangeDetected + if mask := svc.getEventMask("FolderSummary,LocalChangeDetected"); mask != expected { + t.Errorf("incorrect parsed mask %x != %x", int64(mask), int64(expected)) + } + + expected = 0 + if mask := svc.getEventMask("WeirdEvent,something else that doens't exist"); mask != expected { + t.Errorf("incorrect parsed mask %x != %x", int64(mask), int64(expected)) + } + + if res := svc.getEventSub(defaultEventMask); res != defSub { + t.Errorf("should have returned the given default event sub") + } + if res := svc.getEventSub(diskEventMask); res != diskSub { + t.Errorf("should have returned the given disk event sub") + } + if res := svc.getEventSub(events.LocalIndexUpdated); res == nil || res == defSub || res == diskSub { + t.Errorf("should have returned a valid, non-default event sub") + } +} diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 26faaaefb..13ba2a559 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -637,8 +637,8 @@ func syncthingMain(runtimeOptions RuntimeOptions) { // Event subscription for the API; must start early to catch the early // events. The LocalChangeDetected event might overwhelm the event // receiver in some situations so we will not subscribe to it here. - apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents&^events.LocalChangeDetected&^events.RemoteChangeDetected), 1000) - diskSub := events.NewBufferedSubscription(events.Default.Subscribe(events.LocalChangeDetected|events.RemoteChangeDetected), 1000) + defaultSub := events.NewBufferedSubscription(events.Default.Subscribe(defaultEventMask), eventSubBufferSize) + diskSub := events.NewBufferedSubscription(events.Default.Subscribe(diskEventMask), eventSubBufferSize) if len(os.Getenv("GOMAXPROCS")) == 0 { runtime.GOMAXPROCS(runtime.NumCPU()) @@ -868,7 +868,7 @@ func syncthingMain(runtimeOptions RuntimeOptions) { // GUI - setupGUI(mainService, cfg, m, apiSub, diskSub, cachedDiscovery, connectionsService, errors, systemLog, runtimeOptions) + setupGUI(mainService, cfg, m, defaultSub, diskSub, cachedDiscovery, connectionsService, errors, systemLog, runtimeOptions) if runtimeOptions.cpuProfile { f, err := os.Create(fmt.Sprintf("cpu-%d.pprof", os.Getpid())) @@ -1086,7 +1086,7 @@ func startAuditing(mainService *suture.Supervisor, auditFile string) { l.Infoln("Audit log in", auditDest) } -func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, apiSub events.BufferedSubscription, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService *connections.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) { +func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Model, defaultSub, diskSub events.BufferedSubscription, discoverer discover.CachingMux, connectionsService *connections.Service, errors, systemLog logger.Recorder, runtimeOptions RuntimeOptions) { guiCfg := cfg.GUI() if !guiCfg.Enabled { @@ -1097,7 +1097,7 @@ func setupGUI(mainService *suture.Supervisor, cfg *config.Wrapper, m *model.Mode l.Warnln("Insecure admin access is enabled.") } - api := newAPIService(myID, cfg, locations[locHTTPSCertFile], locations[locHTTPSKeyFile], runtimeOptions.assetDir, m, apiSub, diskSub, discoverer, connectionsService, errors, systemLog) + api := newAPIService(myID, cfg, locations[locHTTPSCertFile], locations[locHTTPSKeyFile], runtimeOptions.assetDir, m, defaultSub, diskSub, discoverer, connectionsService, errors, systemLog) cfg.Subscribe(api) mainService.Add(api) diff --git a/lib/events/events.go b/lib/events/events.go index 3eca53c18..6d4033e89 100644 --- a/lib/events/events.go +++ b/lib/events/events.go @@ -118,6 +118,67 @@ func (t EventType) MarshalText() ([]byte, error) { return []byte(t.String()), nil } +func UnmarshalEventType(s string) EventType { + switch s { + case "Starting": + return Starting + case "StartupComplete": + return StartupComplete + case "DeviceDiscovered": + return DeviceDiscovered + case "DeviceConnected": + return DeviceConnected + case "DeviceDisconnected": + return DeviceDisconnected + case "DeviceRejected": + return DeviceRejected + case "LocalChangeDetected": + return LocalChangeDetected + case "RemoteChangeDetected": + return RemoteChangeDetected + case "LocalIndexUpdated": + return LocalIndexUpdated + case "RemoteIndexUpdated": + return RemoteIndexUpdated + case "ItemStarted": + return ItemStarted + case "ItemFinished": + return ItemFinished + case "StateChanged": + return StateChanged + case "FolderRejected": + return FolderRejected + case "ConfigSaved": + return ConfigSaved + case "DownloadProgress": + return DownloadProgress + case "RemoteDownloadProgress": + return RemoteDownloadProgress + case "FolderSummary": + return FolderSummary + case "FolderCompletion": + return FolderCompletion + case "FolderErrors": + return FolderErrors + case "DevicePaused": + return DevicePaused + case "DeviceResumed": + return DeviceResumed + case "FolderScanProgress": + return FolderScanProgress + case "FolderPaused": + return FolderPaused + case "FolderResumed": + return FolderResumed + case "ListenAddressesChanged": + return ListenAddressesChanged + case "LoginAttempt": + return LoginAttempt + default: + return 0 + } +} + const BufferSize = 64 type Logger struct {