Merge pull request #1956 from calmh/earlyevents

Fix API event subscription
This commit is contained in:
Audrius Butkevicius 2015-06-16 08:46:29 +01:00
commit 08647f1267
2 changed files with 10 additions and 9 deletions

View File

@ -49,7 +49,6 @@ var (
guiErrors = []guiError{} guiErrors = []guiError{}
guiErrorsMut = sync.NewMutex() guiErrorsMut = sync.NewMutex()
startTime = time.Now() startTime = time.Now()
eventSub *events.BufferedSubscription
) )
type apiSvc struct { type apiSvc struct {
@ -60,14 +59,16 @@ type apiSvc struct {
fss *folderSummarySvc fss *folderSummarySvc
stop chan struct{} stop chan struct{}
systemConfigMut sync.Mutex systemConfigMut sync.Mutex
eventSub *events.BufferedSubscription
} }
func newAPISvc(cfg config.GUIConfiguration, assetDir string, m *model.Model) (*apiSvc, error) { func newAPISvc(cfg config.GUIConfiguration, assetDir string, m *model.Model, eventSub *events.BufferedSubscription) (*apiSvc, error) {
svc := &apiSvc{ svc := &apiSvc{
cfg: cfg, cfg: cfg,
assetDir: assetDir, assetDir: assetDir,
model: m, model: m,
systemConfigMut: sync.NewMutex(), systemConfigMut: sync.NewMutex(),
eventSub: eventSub,
} }
var err error var err error
@ -125,9 +126,6 @@ func (s *apiSvc) Serve() {
s.stop = make(chan struct{}) s.stop = make(chan struct{})
l.AddHandler(logger.LevelWarn, s.showGuiError) l.AddHandler(logger.LevelWarn, s.showGuiError)
sub := events.Default.Subscribe(events.AllEvents)
eventSub = events.NewBufferedSubscription(sub, 1000)
defer events.Default.Unsubscribe(sub)
// The GET handlers // The GET handlers
getRestMux := http.NewServeMux() getRestMux := http.NewServeMux()
@ -743,7 +741,7 @@ func (s *apiSvc) getEvents(w http.ResponseWriter, r *http.Request) {
f := w.(http.Flusher) f := w.(http.Flusher)
f.Flush() f.Flush()
evs := eventSub.Since(since, nil) evs := s.eventSub.Since(since, nil)
if 0 < limit && limit < len(evs) { if 0 < limit && limit < len(evs) {
evs = evs[len(evs)-limit:] evs = evs[len(evs)-limit:]
} }

View File

@ -447,6 +447,9 @@ func syncthingMain() {
mainSvc.Add(newVerboseSvc()) mainSvc.Add(newVerboseSvc())
} }
// Event subscription for the API; must start early to catch the early events.
apiSub := events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents), 1000)
if len(os.Getenv("GOMAXPROCS")) == 0 { if len(os.Getenv("GOMAXPROCS")) == 0 {
runtime.GOMAXPROCS(runtime.NumCPU()) runtime.GOMAXPROCS(runtime.NumCPU())
} }
@ -628,7 +631,7 @@ func syncthingMain() {
// GUI // GUI
setupGUI(mainSvc, cfg, m) setupGUI(mainSvc, cfg, m, apiSub)
// The default port we announce, possibly modified by setupUPnP next. // The default port we announce, possibly modified by setupUPnP next.
@ -768,7 +771,7 @@ func startAuditing(mainSvc *suture.Supervisor) {
l.Infoln("Audit log in", auditFile) l.Infoln("Audit log in", auditFile)
} }
func setupGUI(mainSvc *suture.Supervisor, cfg *config.Wrapper, m *model.Model) { func setupGUI(mainSvc *suture.Supervisor, cfg *config.Wrapper, m *model.Model, apiSub *events.BufferedSubscription) {
opts := cfg.Options() opts := cfg.Options()
guiCfg := overrideGUIConfig(cfg.GUI(), guiAddress, guiAuthentication, guiAPIKey) guiCfg := overrideGUIConfig(cfg.GUI(), guiAddress, guiAuthentication, guiAPIKey)
@ -797,7 +800,7 @@ func setupGUI(mainSvc *suture.Supervisor, cfg *config.Wrapper, m *model.Model) {
urlShow := fmt.Sprintf("%s://%s/", proto, net.JoinHostPort(hostShow, strconv.Itoa(addr.Port))) urlShow := fmt.Sprintf("%s://%s/", proto, net.JoinHostPort(hostShow, strconv.Itoa(addr.Port)))
l.Infoln("Starting web GUI on", urlShow) l.Infoln("Starting web GUI on", urlShow)
api, err := newAPISvc(guiCfg, guiAssets, m) api, err := newAPISvc(guiCfg, guiAssets, m, apiSub)
if err != nil { if err != nil {
l.Fatalln("Cannot start GUI:", err) l.Fatalln("Cannot start GUI:", err)
} }