From b0f46beffb6c8ee2288468e4a5057ec0b397e00d Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Sun, 13 Jul 2014 21:07:24 +0200 Subject: [PATCH] Basic events interface --- cmd/stevents/main.go | 54 +++++++++++ cmd/syncthing/gui.go | 13 +++ cmd/syncthing/main.go | 16 ++++ events/events.go | 203 ++++++++++++++++++++++++++++++++++++++++++ events/events_test.go | 174 ++++++++++++++++++++++++++++++++++++ model/model.go | 16 ++++ model/puller.go | 11 +++ 7 files changed, 487 insertions(+) create mode 100644 cmd/stevents/main.go create mode 100644 events/events.go create mode 100644 events/events_test.go diff --git a/cmd/stevents/main.go b/cmd/stevents/main.go new file mode 100644 index 000000000..85ed37398 --- /dev/null +++ b/cmd/stevents/main.go @@ -0,0 +1,54 @@ +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "net/http" + "time" +) + +type event struct { + ID int + Type string + Time time.Time + Data map[string]interface{} +} + +func main() { + target := flag.String("target", "localhost:8080", "Target Syncthing instance") + apikey := flag.String("apikey", "", "Syncthing API key") + flag.Parse() + + if *apikey == "" { + log.Fatal("Must give -apikey argument") + } + + since := 0 + for { + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/rest/events?since=%d", *target, since), nil) + if err != nil { + log.Fatal(err) + } + req.Header.Set("X-API-Key", *apikey) + res, err := http.DefaultClient.Do(req) + if err != nil { + log.Fatal(err) + } + + var events []event + err = json.NewDecoder(res.Body).Decode(&events) + if err != nil { + log.Fatal(err) + } + + for _, event := range events { + log.Printf("%d: %v", event.ID, event.Type) + for k, v := range event.Data { + log.Printf("\t%s: %v", k, v) + } + since = event.ID + } + } +} diff --git a/cmd/syncthing/gui.go b/cmd/syncthing/gui.go index bf04809ef..fd26ddb79 100644 --- a/cmd/syncthing/gui.go +++ b/cmd/syncthing/gui.go @@ -18,6 +18,7 @@ import ( "path/filepath" "reflect" "runtime" + "strconv" "strings" "sync" "time" @@ -26,6 +27,7 @@ import ( "code.google.com/p/go.crypto/bcrypt" "github.com/calmh/syncthing/auto" "github.com/calmh/syncthing/config" + "github.com/calmh/syncthing/events" "github.com/calmh/syncthing/logger" "github.com/calmh/syncthing/model" "github.com/vitrun/qart/qr" @@ -43,6 +45,7 @@ var ( static func(http.ResponseWriter, *http.Request, *log.Logger) apiKey string modt = time.Now().UTC().Format(http.TimeFormat) + eventSub = events.NewBufferedSubscription(events.Default.Subscribe(events.AllEvents), 1000) ) const ( @@ -98,6 +101,7 @@ func startGUI(cfg config.GUIConfiguration, assetDir string, m *model.Model) erro getRestMux.HandleFunc("/rest/errors", restGetErrors) getRestMux.HandleFunc("/rest/discovery", restGetDiscovery) getRestMux.HandleFunc("/rest/report", withModel(m, restGetReport)) + getRestMux.HandleFunc("/rest/events", restGetEvents) // The POST handlers postRestMux := http.NewServeMux() @@ -414,6 +418,15 @@ func restGetReport(m *model.Model, w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(reportData(m)) } +func restGetEvents(w http.ResponseWriter, r *http.Request) { + qs := r.URL.Query() + ts := qs.Get("since") + since, _ := strconv.Atoi(ts) + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + json.NewEncoder(w).Encode(eventSub.Since(since, nil)) +} + func getQR(w http.ResponseWriter, r *http.Request) { r.ParseForm() text := r.FormValue("text") diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 00bb93d6f..e9b1ea7e3 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -28,6 +28,7 @@ import ( "github.com/calmh/syncthing/config" "github.com/calmh/syncthing/discover" + "github.com/calmh/syncthing/events" "github.com/calmh/syncthing/logger" "github.com/calmh/syncthing/model" "github.com/calmh/syncthing/osutil" @@ -454,10 +455,21 @@ nextRepo: }() } + events.Default.Log(events.StartupComplete, nil) + go generateEvents() + <-stop + l.Okln("Exiting") } +func generateEvents() { + for { + time.Sleep(300 * time.Second) + events.Default.Log(events.Ping, nil) + } +} + func waitForParentExit() { l.Infoln("Waiting for parent to exit...") // Wait for the listen address to become free, indicating that the parent has exited. @@ -723,6 +735,10 @@ next: protoConn := protocol.NewConnection(remoteID, conn, wr, m) l.Infof("Established secure connection to %s at %v", remoteID, conn.RemoteAddr()) + events.Default.Log(events.NodeConnected, map[string]string{ + "id": remoteID.String(), + "addr": conn.RemoteAddr().String(), + }) m.AddConnection(conn, protoConn) continue next diff --git a/events/events.go b/events/events.go new file mode 100644 index 000000000..4b9c33aec --- /dev/null +++ b/events/events.go @@ -0,0 +1,203 @@ +// Package events provides event subscription and polling functionality. +package events + +import ( + "errors" + "sync" + "time" +) + +type EventType uint64 + +const ( + Ping = 1 << iota + StartupComplete + NodeConnected + NodeDisconnected + LocalIndexUpdated + RemoteIndexUpdated + ItemStarted + ItemCompleted + + AllEvents = ^EventType(0) +) + +func (t EventType) String() string { + switch t { + case Ping: + return "Ping" + case StartupComplete: + return "StartupComplete" + case NodeConnected: + return "NodeConnected" + case NodeDisconnected: + return "NodeDisconnected" + case LocalIndexUpdated: + return "LocalIndexUpdated" + case RemoteIndexUpdated: + return "RemoteIndexUpdated" + case ItemStarted: + return "ItemStarted" + default: + return "Unknown" + } +} + +func (t EventType) MarshalText() ([]byte, error) { + return []byte(t.String()), nil +} + +const BufferSize = 64 + +type Logger struct { + subs map[int]*Subscription + nextId int + mutex sync.Mutex +} + +type Event struct { + ID int `json:"id"` + Time time.Time `json:"time"` + Type EventType `json:"type"` + Data interface{} `json:"data"` +} + +type Subscription struct { + mask EventType + id int + events chan Event + mutex sync.Mutex +} + +var Default = NewLogger() + +var ( + ErrTimeout = errors.New("timeout") + ErrClosed = errors.New("closed") +) + +func NewLogger() *Logger { + return &Logger{ + subs: make(map[int]*Subscription), + } +} + +func (l *Logger) Log(t EventType, data interface{}) { + l.mutex.Lock() + e := Event{ + ID: l.nextId, + Time: time.Now(), + Type: t, + Data: data, + } + l.nextId++ + for _, s := range l.subs { + if s.mask&t != 0 { + select { + case s.events <- e: + default: + //log.Println("Dropping event:", e) + } + } + } + l.mutex.Unlock() +} + +func (l *Logger) Subscribe(mask EventType) *Subscription { + l.mutex.Lock() + s := &Subscription{ + mask: mask, + id: l.nextId, + events: make(chan Event, BufferSize), + } + l.nextId++ + l.subs[s.id] = s + l.mutex.Unlock() + return s +} + +func (l *Logger) Unsubscribe(s *Subscription) { + l.mutex.Lock() + delete(l.subs, s.id) + close(s.events) + l.mutex.Unlock() +} + +func (s *Subscription) Poll(timeout time.Duration) (Event, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + to := time.After(timeout) + select { + case e, ok := <-s.events: + if !ok { + return e, ErrClosed + } + return e, nil + case <-to: + return Event{}, ErrTimeout + } +} + +type BufferedSubscription struct { + sub *Subscription + buf []Event + next int + cur int + mut sync.Mutex + cond *sync.Cond +} + +func NewBufferedSubscription(s *Subscription, size int) *BufferedSubscription { + bs := &BufferedSubscription{ + sub: s, + buf: make([]Event, size), + } + bs.cond = sync.NewCond(&bs.mut) + go bs.pollingLoop() + return bs +} + +func (s *BufferedSubscription) pollingLoop() { + for { + ev, err := s.sub.Poll(60 * time.Second) + if err == ErrTimeout { + continue + } + if err == ErrClosed { + return + } + if err != nil { + panic("unexpected error: " + err.Error()) + } + + s.mut.Lock() + s.buf[s.next] = ev + s.next = (s.next + 1) % len(s.buf) + s.cur = ev.ID + s.cond.Broadcast() + s.mut.Unlock() + } +} + +func (s *BufferedSubscription) Since(id int, into []Event) []Event { + s.mut.Lock() + defer s.mut.Unlock() + + for id >= s.cur { + s.cond.Wait() + } + + for i := s.next; i < len(s.buf); i++ { + if s.buf[i].ID > id { + into = append(into, s.buf[i]) + } + } + for i := 0; i < s.next; i++ { + if s.buf[i].ID > id { + into = append(into, s.buf[i]) + } + } + + return into +} diff --git a/events/events_test.go b/events/events_test.go new file mode 100644 index 000000000..ffc4adfe4 --- /dev/null +++ b/events/events_test.go @@ -0,0 +1,174 @@ +package events_test + +import ( + "fmt" + "testing" + "time" + + "github.com/calmh/syncthing/events" +) + +var timeout = 100 * time.Millisecond + +func TestNewLogger(t *testing.T) { + l := events.NewLogger() + if l == nil { + t.Fatal("Unexpected nil Logger") + } +} + +func TestSubscriber(t *testing.T) { + l := events.NewLogger() + s := l.Subscribe(0) + if s == nil { + t.Fatal("Unexpected nil Subscription") + } +} + +func TestTimeout(t *testing.T) { + l := events.NewLogger() + s := l.Subscribe(0) + _, err := s.Poll(timeout) + if err != events.ErrTimeout { + t.Fatal("Unexpected non-Timeout error:", err) + } +} + +func TestEventBeforeSubscribe(t *testing.T) { + l := events.NewLogger() + + l.Log(events.NodeConnected, "foo") + s := l.Subscribe(0) + + _, err := s.Poll(timeout) + if err != events.ErrTimeout { + t.Fatal("Unexpected non-Timeout error:", err) + } +} + +func TestEventAfterSubscribe(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.AllEvents) + l.Log(events.NodeConnected, "foo") + + ev, err := s.Poll(timeout) + + if err != nil { + t.Fatal("Unexpected error:", err) + } + if ev.Type != events.NodeConnected { + t.Error("Incorrect event type", ev.Type) + } + switch v := ev.Data.(type) { + case string: + if v != "foo" { + t.Error("Incorrect Data string", v) + } + default: + t.Errorf("Incorrect Data type %#v", v) + } +} + +func TestEventAfterSubscribeIgnoreMask(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.NodeDisconnected) + l.Log(events.NodeConnected, "foo") + + _, err := s.Poll(timeout) + if err != events.ErrTimeout { + t.Fatal("Unexpected non-Timeout error:", err) + } +} + +func TestBufferOverflow(t *testing.T) { + l := events.NewLogger() + + _ = l.Subscribe(events.AllEvents) + + t0 := time.Now() + for i := 0; i < events.BufferSize*2; i++ { + l.Log(events.NodeConnected, "foo") + } + if time.Since(t0) > timeout { + t.Fatalf("Logging took too long") + } +} + +func TestUnsubscribe(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.AllEvents) + l.Log(events.NodeConnected, "foo") + + _, err := s.Poll(timeout) + if err != nil { + t.Fatal("Unexpected error:", err) + } + + l.Unsubscribe(s) + l.Log(events.NodeConnected, "foo") + + _, err = s.Poll(timeout) + if err != events.ErrClosed { + t.Fatal("Unexpected non-Closed error:", err) + } +} + +func TestIDs(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.AllEvents) + l.Log(events.NodeConnected, "foo") + l.Log(events.NodeConnected, "bar") + + ev, err := s.Poll(timeout) + if err != nil { + t.Fatal("Unexpected error:", err) + } + if ev.Data.(string) != "foo" { + t.Fatal("Incorrect event:", ev) + } + id := ev.ID + + ev, err = s.Poll(timeout) + if err != nil { + t.Fatal("Unexpected error:", err) + } + if ev.Data.(string) != "bar" { + t.Fatal("Incorrect event:", ev) + } + if !(ev.ID > id) { + t.Fatalf("ID not incremented (%d !> %d)", ev.ID, id) + } +} + +func TestBufferedSub(t *testing.T) { + l := events.NewLogger() + + s := l.Subscribe(events.AllEvents) + bs := events.NewBufferedSubscription(s, 10*events.BufferSize) + + go func() { + for i := 0; i < 10*events.BufferSize; i++ { + l.Log(events.NodeConnected, fmt.Sprintf("event-%d", i)) + if i%30 == 0 { + // Give the buffer routine time to pick up the events + time.Sleep(20 * time.Millisecond) + } + } + }() + + recv := 0 + for recv < 10*events.BufferSize { + evs := bs.Since(recv, nil) + for _, ev := range evs { + if ev.ID != recv+1 { + t.Fatalf("Incorrect ID; %d != %d", ev.ID, recv+1) + } + recv = ev.ID + } + } + +} diff --git a/model/model.go b/model/model.go index b5f85ae25..eef00af6c 100644 --- a/model/model.go +++ b/model/model.go @@ -18,6 +18,7 @@ import ( "time" "github.com/calmh/syncthing/config" + "github.com/calmh/syncthing/events" "github.com/calmh/syncthing/files" "github.com/calmh/syncthing/lamport" "github.com/calmh/syncthing/osutil" @@ -315,6 +316,11 @@ func (m *Model) Index(nodeID protocol.NodeID, repo string, fs []protocol.FileInf l.Fatalf("Index for nonexistant repo %q", repo) } m.rmut.RUnlock() + + events.Default.Log(events.RemoteIndexUpdated, map[string]string{ + "node": nodeID.String(), + "repo": repo, + }) } // IndexUpdate is called for incremental updates to connected nodes' indexes. @@ -336,6 +342,11 @@ func (m *Model) IndexUpdate(nodeID protocol.NodeID, repo string, fs []protocol.F l.Fatalf("IndexUpdate for nonexistant repo %q", repo) } m.rmut.RUnlock() + + events.Default.Log(events.RemoteIndexUpdated, map[string]string{ + "node": nodeID.String(), + "repo": repo, + }) } func (m *Model) repoSharedWith(repo string, nodeID protocol.NodeID) bool { @@ -376,6 +387,10 @@ func (m *Model) ClusterConfig(nodeID protocol.NodeID, config protocol.ClusterCon // Implements the protocol.Model interface. func (m *Model) Close(node protocol.NodeID, err error) { l.Infof("Connection to %s closed: %v", node, err) + events.Default.Log(events.NodeDisconnected, map[string]string{ + "id": node.String(), + "error": err.Error(), + }) m.rmut.RLock() for _, repo := range m.nodeRepos[node] { @@ -541,6 +556,7 @@ func (m *Model) updateLocal(repo string, f protocol.FileInfo) { m.rmut.RLock() m.repoFiles[repo].Update(protocol.LocalNodeID, []protocol.FileInfo{f}) m.rmut.RUnlock() + events.Default.Log(events.LocalIndexUpdated, map[string]string{"repo": repo}) } func (m *Model) requestGlobal(nodeID protocol.NodeID, repo, name string, offset int64, size int, hash []byte) ([]byte, error) { diff --git a/model/puller.go b/model/puller.go index 313cb34b8..d5e28f380 100644 --- a/model/puller.go +++ b/model/puller.go @@ -13,6 +13,7 @@ import ( "time" "github.com/calmh/syncthing/config" + "github.com/calmh/syncthing/events" "github.com/calmh/syncthing/osutil" "github.com/calmh/syncthing/protocol" "github.com/calmh/syncthing/scanner" @@ -395,6 +396,11 @@ func (p *puller) handleBlock(b bqBlock) bool { } } + events.Default.Log(events.ItemStarted, map[string]string{ + "repo": p.repoCfg.ID, + "item": f.Name, + }) + p.model.updateLocal(p.repoCfg.ID, f) return true } @@ -407,6 +413,11 @@ func (p *puller) handleBlock(b bqBlock) bool { l.Debugf("pull: %q: opening file %q", p.repoCfg.ID, f.Name) } + events.Default.Log(events.ItemStarted, map[string]string{ + "repo": p.repoCfg.ID, + "item": f.Name, + }) + of.availability = p.model.repoFiles[p.repoCfg.ID].Availability(f.Name) of.filepath = filepath.Join(p.repoCfg.Directory, f.Name) of.temp = filepath.Join(p.repoCfg.Directory, defTempNamer.TempName(f.Name))