From 454e688c3d626f8e2413d8c0d8acc65c204d8981 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 26 Mar 2015 23:26:51 +0100 Subject: [PATCH] Push model data instead of pull (fixes #1434) --- cmd/syncthing/gui.go | 25 ++- cmd/syncthing/main.go | 11 +- cmd/syncthing/summarysvc.go | 159 ++++++++++++++++++ .../core/controllers/eventController.js | 6 +- .../core/controllers/syncthingController.js | 80 +++++---- internal/auto/gui.files.go | 4 +- internal/events/events.go | 6 + 7 files changed, 244 insertions(+), 47 deletions(-) create mode 100644 cmd/syncthing/summarysvc.go diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index 9a4b6f930..7966668ec 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -49,6 +49,11 @@ var ( eventSub *events.BufferedSubscription ) +var ( + lastEventRequest time.Time + lastEventRequestMut sync.Mutex +) + func init() { l.AddHandler(logger.LevelWarn, showGuiError) sub := events.Default.Subscribe(events.AllEvents) @@ -179,6 +184,9 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro ReadTimeout: 10 * time.Second, } + csrv := &folderSummarySvc{model: m} + go csrv.Serve() + go func() { err := srv.Serve(listener) if err != nil { @@ -293,8 +301,14 @@ func restGetCompletion(m *model.Model, w http.ResponseWriter, r *http.Request) { } func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) { - var qs = r.URL.Query() - var folder = qs.Get("folder") + qs := r.URL.Query() + folder := qs.Get("folder") + res := folderSummary(m, folder) + w.Header().Set("Content-Type", "application/json; charset=utf-8") + json.NewEncoder(w).Encode(res) +} + +func folderSummary(m *model.Model, folder string) map[string]interface{} { var res = make(map[string]interface{}) res["invalid"] = cfg.Folders()[folder].Invalid @@ -322,8 +336,7 @@ func restGetModel(m *model.Model, w http.ResponseWriter, r *http.Request) { } } - w.Header().Set("Content-Type", "application/json; charset=utf-8") - json.NewEncoder(w).Encode(res) + return res } func restPostOverride(m *model.Model, w http.ResponseWriter, r *http.Request) { @@ -598,6 +611,10 @@ func restGetEvents(w http.ResponseWriter, r *http.Request) { since, _ := strconv.Atoi(sinceStr) limit, _ := strconv.Atoi(limitStr) + lastEventRequestMut.Lock() + lastEventRequest = time.Now() + lastEventRequestMut.Unlock() + w.Header().Set("Content-Type", "application/json; charset=utf-8") // Flush before blocking, to indicate that we've received the request diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index f15e847bb..94b65336d 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -61,7 +61,10 @@ const ( exitUpgrading = 4 ) -const bepProtocolName = "bep/1.0" +const ( + bepProtocolName = "bep/1.0" + pingEventInterval = time.Minute +) var l = logger.DefaultLogger @@ -613,7 +616,7 @@ func syncthingMain() { } events.Default.Log(events.StartupComplete, nil) - go generateEvents() + go generatePingEvents() code := <-stop @@ -701,9 +704,9 @@ func defaultConfig(myName string) config.Configuration { return newCfg } -func generateEvents() { +func generatePingEvents() { for { - time.Sleep(300 * time.Second) + time.Sleep(pingEventInterval) events.Default.Log(events.Ping, nil) } } diff --git a/cmd/syncthing/summarysvc.go b/cmd/syncthing/summarysvc.go new file mode 100644 index 000000000..9120e524e --- /dev/null +++ b/cmd/syncthing/summarysvc.go @@ -0,0 +1,159 @@ +// Copyright (C) 2015 The Syncthing Authors. +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this file, +// You can obtain one at http://mozilla.org/MPL/2.0/. + +package main + +import ( + "sync" + "time" + + "github.com/syncthing/syncthing/internal/events" + "github.com/syncthing/syncthing/internal/model" + "github.com/thejerf/suture" +) + +// The folderSummarySvc adds summary information events (FolderSummary and +// FolderCompletion) into the event stream at certain intervals. +type folderSummarySvc struct { + model *model.Model + srv suture.Service + stop chan struct{} + + // For keeping track of folders to recalculate for + foldersMut sync.Mutex + folders map[string]struct{} +} + +func (c *folderSummarySvc) Serve() { + srv := suture.NewSimple("folderSummarySvc") + srv.Add(serviceFunc(c.listenForUpdates)) + srv.Add(serviceFunc(c.calculateSummaries)) + + c.stop = make(chan struct{}) + c.folders = make(map[string]struct{}) + c.srv = srv + + srv.Serve() +} + +func (c *folderSummarySvc) Stop() { + // c.srv.Stop() is mostly a no-op here, but we need to call it anyway so + // c.srv doesn't try to restart the serviceFuncs when they exit after we + // close the stop channel. + c.srv.Stop() + close(c.stop) +} + +// listenForUpdates subscribes to the event bus and makes note of folders that +// need their data recalculated. +func (c *folderSummarySvc) listenForUpdates() { + sub := events.Default.Subscribe(events.LocalIndexUpdated | events.RemoteIndexUpdated) + defer events.Default.Unsubscribe(sub) + + for { + // This loop needs to be fast so we don't miss too many events. + + select { + case ev := <-sub.C(): + // Whenever the local or remote index is updated for a given + // folder we make a note of it. + + data := ev.Data.(map[string]interface{}) + folder := data["folder"].(string) + c.foldersMut.Lock() + c.folders[folder] = struct{}{} + c.foldersMut.Unlock() + + case <-c.stop: + return + } + } +} + +// calculateSummaries periodically recalculates folder summaries and +// completion percentage, and sends the results on the event bus. +func (c *folderSummarySvc) calculateSummaries() { + const pumpInterval = 2 * time.Second + pump := time.NewTimer(pumpInterval) + + for { + select { + case <-pump.C: + // We only recalculate sumamries if someone is listening to events + // (a request to /rest/events has been made within the last + // pingEventInterval). + + lastEventRequestMut.Lock() + // XXX: Reaching out to a global var here is very ugly :( Should + // we make the gui stuff a proper object with methods on it that + // we can query about this kind of thing? + last := lastEventRequest + lastEventRequestMut.Unlock() + + t0 := time.Now() + if time.Since(last) < pingEventInterval { + for _, folder := range c.foldersToHandle() { + // The folder summary contains how many bytes, files etc + // are in the folder and how in sync we are. + data := folderSummary(c.model, folder) + events.Default.Log(events.FolderSummary, map[string]interface{}{ + "folder": folder, + "summary": data, + }) + + for _, devCfg := range cfg.Folders()[folder].Devices { + if devCfg.DeviceID.Equals(myID) { + // We already know about ourselves. + continue + } + if !c.model.ConnectedTo(devCfg.DeviceID) { + // We're not interested in disconnected devices. + continue + } + + // Get completion percentage of this folder for the + // remote device. + comp := c.model.Completion(devCfg.DeviceID, folder) + events.Default.Log(events.FolderCompletion, map[string]interface{}{ + "folder": folder, + "device": devCfg.DeviceID.String(), + "completion": comp, + }) + } + } + } + + // We don't want to spend all our time calculating summaries. Lets + // set an arbitrary limit at not spending more than about 30% of + // our time here... + wait := 2*time.Since(t0) + pumpInterval + pump.Reset(wait) + + case <-c.stop: + return + } + } +} + +// foldersToHandle returns the list of folders needing a summary update, and +// clears the list. +func (c *folderSummarySvc) foldersToHandle() []string { + c.foldersMut.Lock() + res := make([]string, 0, len(c.folders)) + for folder := range c.folders { + res = append(res, folder) + delete(c.folders, folder) + } + c.foldersMut.Unlock() + return res +} + +// serviceFunc wraps a function to create a suture.Service without stop +// functionality. +type serviceFunc func() + +func (f serviceFunc) Serve() { f() } +func (f serviceFunc) Stop() {} diff --git a/gui/scripts/syncthing/core/controllers/eventController.js b/gui/scripts/syncthing/core/controllers/eventController.js index 85c30763d..f72da69ba 100644 --- a/gui/scripts/syncthing/core/controllers/eventController.js +++ b/gui/scripts/syncthing/core/controllers/eventController.js @@ -1,3 +1,5 @@ +var debugEvents = false; + angular.module('syncthing.core') .controller('EventController', function ($scope, $http) { 'use strict'; @@ -20,7 +22,9 @@ angular.module('syncthing.core') if (lastID > 0) { data.forEach(function (event) { - console.log("event", event.id, event.type, event.data); + if (debugEvents) { + console.log("event", event.id, event.type, event.data); + } $scope.$emit(event.type, event); }); } diff --git a/gui/scripts/syncthing/core/controllers/syncthingController.js b/gui/scripts/syncthing/core/controllers/syncthingController.js index 610f830ba..e7615d82e 100644 --- a/gui/scripts/syncthing/core/controllers/syncthingController.js +++ b/gui/scripts/syncthing/core/controllers/syncthingController.js @@ -140,19 +140,11 @@ angular.module('syncthing.core') $scope.$on('LocalIndexUpdated', function (event, arg) { var data = arg.data; - refreshFolder(data.folder); refreshFolderStats(); - - // Update completion status for all devices that we share this folder with. - $scope.folders[data.folder].devices.forEach(function (deviceCfg) { - refreshCompletion(deviceCfg.deviceID, data.folder); - }); }); $scope.$on('RemoteIndexUpdated', function (event, arg) { - var data = arg.data; - refreshFolder(data.folder); - refreshCompletion(data.device, data.folder); + // Nothing }); $scope.$on('DeviceDisconnected', function (event, arg) { @@ -215,7 +207,6 @@ angular.module('syncthing.core') var stats = arg.data; var progress = {}; for (var folder in stats) { - refreshFolder(folder); progress[folder] = {}; for (var file in stats[folder]) { var s = stats[folder][file]; @@ -241,7 +232,6 @@ angular.module('syncthing.core') } for (var folder in $scope.progress) { if (!(folder in progress)) { - refreshFolder(folder); if ($scope.neededFolder == folder) { refreshNeed(folder); } @@ -258,6 +248,30 @@ angular.module('syncthing.core') console.log("DownloadProgress", $scope.progress); }); + $scope.$on('FolderSummary', function (event, arg) { + var data = arg.data; + $scope.model[data.folder] = data.summary; + }); + + $scope.$on('FolderCompletion', function (event, arg) { + var data = arg.data; + if (!$scope.completion[data.device]) { + $scope.completion[data.device] = {}; + } + $scope.completion[data.device][data.folder] = data.completion; + + var tot = 0, + cnt = 0; + for (var cmp in $scope.completion[data.device]) { + if (cmp === "_total") { + continue; + } + tot += $scope.completion[data.device][cmp]; + cnt += 1; + } + $scope.completion[data.device]._total = tot / cnt; + }); + $scope.emitHTTPError = function (data, status, headers, config) { $scope.$emit('HTTPError', {data: data, status: status, headers: headers, config: config}); }; @@ -325,31 +339,25 @@ angular.module('syncthing.core') return; } - var key = "refreshCompletion" + device + folder; - if (!debouncedFuncs[key]) { - debouncedFuncs[key] = debounce(function () { - $http.get(urlbase + '/completion?device=' + device + '&folder=' + encodeURIComponent(folder)).success(function (data) { - if (!$scope.completion[device]) { - $scope.completion[device] = {}; - } - $scope.completion[device][folder] = data.completion; + $http.get(urlbase + '/completion?device=' + device + '&folder=' + encodeURIComponent(folder)).success(function (data) { + if (!$scope.completion[device]) { + $scope.completion[device] = {}; + } + $scope.completion[device][folder] = data.completion; - var tot = 0, - cnt = 0; - for (var cmp in $scope.completion[device]) { - if (cmp === "_total") { - continue; - } - tot += $scope.completion[device][cmp]; - cnt += 1; - } - $scope.completion[device]._total = tot / cnt; + var tot = 0, + cnt = 0; + for (var cmp in $scope.completion[device]) { + if (cmp === "_total") { + continue; + } + tot += $scope.completion[device][cmp]; + cnt += 1; + } + $scope.completion[device]._total = tot / cnt; - console.log("refreshCompletion", device, folder, $scope.completion[device]); - }).error($scope.emitHTTPError); - }, 1000, true); - } - debouncedFuncs[key](); + console.log("refreshCompletion", device, folder, $scope.completion[device]); + }).error($scope.emitHTTPError); } function refreshConnectionStats() { @@ -412,7 +420,7 @@ angular.module('syncthing.core') } console.log("refreshDeviceStats", data); }).error($scope.emitHTTPError); - }, 500); + }, 2500); var refreshFolderStats = debounce(function () { $http.get(urlbase + "/stats/folder").success(function (data) { @@ -424,7 +432,7 @@ angular.module('syncthing.core') } console.log("refreshfolderStats", data); }).error($scope.emitHTTPError); - }, 500); + }, 2500); $scope.refresh = function () { refreshSystem(); diff --git a/internal/auto/gui.files.go b/internal/auto/gui.files.go index 5da44d1a5..1eca14ea3 100644 --- a/internal/auto/gui.files.go +++ b/internal/auto/gui.files.go @@ -187,12 +187,12 @@ func Assets() map[string][]byte { bs, _ = ioutil.ReadAll(gr) assets["scripts/syncthing/app.js"] = bs - bs, _ = base64.StdEncoding.DecodeString("H4sIAAAJbogA/7RUT4/aPhC98ynmt0JK0C8NUKknRHtod6U99dCteqh6MMkkseTYyB4vQhXfvWOHDRCCtqpaXwjz/z2/sdC1V8LmrSm9wjRxe11QI3WdF8ZiMpsAH/7WZI1SaNPk/hk1fewNSQaV5xxpNKRTV5gtZjBtiLYz+Bmzw0m8Q3BkZUHJatKbu/hcCUexLKxBe6VWfcCzsBC8j5/YtTjLDA7niwKde9DsO81QChLnrcOZz+Fbgxq+vKADi46EJQe7RioEahCUYfuWQQU/I9bYVZQOpB6W21pTcw0XMzfW7Bxa4GBnWoStElQZ2zpuQ95qBwLeLhaQOqmL2GxYrkFRonXQCAcb5Ekr5V2DJewkNbFHV4nnKjHUmmWdSxsIgPNhwaeGx94IJwuh1B5aFDrMKigWO0PXd2Q0SBkIXXYhnDcsuuNYbQhEQT6W5RtgDiqvLvvLCtL/xu4hHLTW2Afd3dPqyt3hvLQfJhd/j6qZYispTb4+ftZ8ZazV1eRqiqN23sNibJTIHF/UvSia9KQgDFIciw+HqXNGsWZNnd7FyLsM4m8uy5cv2oct6L5v4LwCcpU5knSY/Q4x5+sUun+POBXqmgXzBpY/Lov0+zVMZ0ADSlkhT7JF4+mMrjGm4gOQ10ipt4pliPA/JPMIy32IW7BO2NS1no2Skx/XO+3X/EZcFFR6lNWQoAze8bKcjIfBG3JMe/UFudJcVY2K7u8wpCS3WS+Tf8vMcnGTmj+Y7rWpRqcJgv4FAAD//wEAAP//QKHce4MGAAA=") + bs, _ = base64.StdEncoding.DecodeString("H4sIAAAJbogA/7RUQYvbPBC951fMtyzY4XO9SaGnkPbQ7sKeeuiWHkoPij22BbIUNNKGUPLfO5ITr+M4bCmtLnFGM2/ePD3pWVgocePr+2fUjmANlVCEq9lM6NorYfPWlF5hmtBeF66Rus4LYzGZz4AXf2tnjVJo0yRCfOwDSQaV5xppNKS3VJgtZnDbOLedw89YHVbiCYGclYVLuOsp3OXnSpCLsExMe6VWfUIgHnYfP/HWYlAZNsgXBRI96DBPz6EUTgxbh3V3B98a1PDlNB1YJCcsS7FrpEJwDYIyHN/yUGGfJ9bYIUoCqcdwW2tqxqBYubFmR2iBk8m0CFslXGVsS9zGeasJBLxdLCAlqYvYbAzXoCjREjSCYIPMtFKeGixhJ10Te3RIzKvEgDXPui1tIAycjwGfGqa9ESQLodQeWhQ6cBUugg2m6zvyNOgyELrsUrhuDLrjXG0ciML5CMsnwBpUXp33lxWk/02dQ1horbEPujun1cV2N+d5/DA7+3t0zS220qXJ18fPmo+MvbqaXbA4euc9LKaoROX4oO5F0aQvDsJgxan8E+rgKl1LC4tVJqPY3qZObyLoTQbxN5fl6cvtw4Xpvq9I0kkwFT1T4gJvAuow/x1lh/cxcPoehVKoa3bcG1j+OAfpL+i4nMccnQlb7Em2aLwb6D2lYXxB8hpd6q1iHyP8D8ldHIs+xGu0TjjUtZ5PipMf34e0fyeu5EVHpkdfjgXK4B3ftpfgYfQIHctefYIuTFtVk679OwopyW3Wy+TfKrNcXJXmD9i9xmqSTTD0LwAAAP//AQAA//8bMA983gYAAA==") gr, _ = gzip.NewReader(bytes.NewReader(bs)) bs, _ = ioutil.ReadAll(gr) assets["scripts/syncthing/core/controllers/eventController.js"] = bs - bs, _ = base64.StdEncoding.DecodeString("") + bs, _ = base64.StdEncoding.DecodeString("") gr, _ = gzip.NewReader(bytes.NewReader(bs)) bs, _ = ioutil.ReadAll(gr) assets["scripts/syncthing/core/controllers/syncthingController.js"] = bs diff --git a/internal/events/events.go b/internal/events/events.go index 7ccadd3ca..3f22c4f41 100644 --- a/internal/events/events.go +++ b/internal/events/events.go @@ -31,6 +31,8 @@ const ( FolderRejected ConfigSaved DownloadProgress + FolderSummary + FolderCompletion AllEvents = (1 << iota) - 1 ) @@ -67,6 +69,10 @@ func (t EventType) String() string { return "ConfigSaved" case DownloadProgress: return "DownloadProgress" + case FolderSummary: + return "FolderSummary" + case FolderCompletion: + return "FolderCompletion" default: return "Unknown" }