From 3e1a5624660ddcc8f177dbb1bf98f366f9181d52 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 21 May 2015 08:52:19 +0200 Subject: [PATCH] Reorganize, add aggregation --- build.sh | 4 + cmd/aggregate/main.go | 109 +++++++++++++++++++ analytics.go => cmd/ursrv/analytics.go | 0 formatting.go => cmd/ursrv/formatting.go | 0 main.go => cmd/ursrv/main.go | 133 ++++++++++++++++++++++- 5 files changed, 245 insertions(+), 1 deletion(-) create mode 100755 build.sh create mode 100644 cmd/aggregate/main.go rename analytics.go => cmd/ursrv/analytics.go (100%) rename formatting.go => cmd/ursrv/formatting.go (100%) rename main.go => cmd/ursrv/main.go (76%) diff --git a/build.sh b/build.sh new file mode 100755 index 000000000..4b3fb88d0 --- /dev/null +++ b/build.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +go install -v ./cmd/... + diff --git a/cmd/aggregate/main.go b/cmd/aggregate/main.go new file mode 100644 index 000000000..44b0eeb40 --- /dev/null +++ b/cmd/aggregate/main.go @@ -0,0 +1,109 @@ +package main + +import ( + "database/sql" + _ "github.com/lib/pq" + "log" + "os" + "time" +) + +var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable") + +func getEnvDefault(key, def string) string { + if val := os.Getenv(key); val != "" { + return val + } + return def +} + +func main() { + log.SetFlags(log.Ltime | log.Ldate) + log.SetOutput(os.Stdout) + + db, err := sql.Open("postgres", dbConn) + if err != nil { + log.Fatalln("database:", err) + } + err = setupDB(db) + if err != nil { + log.Fatalln("database:", err) + } + + for { + runAggregation(db) + // Sleep until one minute past next midnight + sleepUntilNext(24*time.Hour, 1*time.Minute) + } +} + +func runAggregation(db *sql.DB) { + since := maxIndexedDay(db) + log.Println("Aggregating data since", since) + rows, err := aggregate(db, since) + if err != nil { + log.Fatalln("aggregate:", err) + } + log.Println("Inserted", rows, "rows") +} + +func sleepUntilNext(intv, margin time.Duration) { + now := time.Now().UTC() + next := now.Truncate(intv).Add(intv).Add(margin) + log.Println("Sleeping until", next) + time.Sleep(next.Sub(now)) +} + +func setupDB(db *sql.DB) error { + _, err := db.Exec(`CREATE TABLE IF NOT EXISTS VersionSummary ( + Day TIMESTAMP NOT NULL, + Version VARCHAR(8) NOT NULL, + Count INTEGER NOT NULL + )`) + if err != nil { + return err + } + + row := db.QueryRow(`SELECT 'UniqueDayVersionIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`) + } + + row = db.QueryRow(`SELECT 'DayIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DayIndex ON VerionSummary (Day)`) + } + + return err +} + +func maxIndexedDay(db *sql.DB) time.Time { + var t time.Time + row := db.QueryRow("SELECT MAX(Day) FROM VersionSummary") + err := row.Scan(&t) + if err != nil { + return time.Time{} + } + return t +} + +func aggregate(db *sql.DB, since time.Time) (int64, error) { + res, err := db.Exec(`INSERT INTO VersionSummary ( + SELECT + DATE_TRUNC('day', Received) AS Day, + SUBSTRING(Version FROM '^v\d.\d+') AS Ver, + COUNT(*) AS Count + FROM Reports + WHERE + DATE_TRUNC('day', Received) > $1 + AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW() - '1 day'::INTERVAL) + AND Version like 'v0.%' + GROUP BY Day, Ver + ); + `, since) + if err != nil { + return 0, err + } + + return res.RowsAffected() +} diff --git a/analytics.go b/cmd/ursrv/analytics.go similarity index 100% rename from analytics.go rename to cmd/ursrv/analytics.go diff --git a/formatting.go b/cmd/ursrv/formatting.go similarity index 100% rename from formatting.go rename to cmd/ursrv/formatting.go diff --git a/main.go b/cmd/ursrv/main.go similarity index 76% rename from main.go rename to cmd/ursrv/main.go index d9c9d249e..8176b5502 100644 --- a/main.go +++ b/cmd/ursrv/main.go @@ -13,6 +13,7 @@ import ( "net/http" "os" "regexp" + "sort" "strings" "sync" "time" @@ -26,6 +27,7 @@ var ( dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable") listenAddr = getEnvDefault("UR_LISTEN", "0.0.0.0:8443") tpl *template.Template + compilerRe = regexp.MustCompile(`\(([A-Za-z0-9()., -]+) [\w-]+ \w+\) ([\w@-]+)`) ) var funcs = map[string]interface{}{ @@ -106,7 +108,7 @@ func setupDB(db *sql.DB) error { } func insertReport(db *sql.DB, r report) error { - _, err := db.Exec(`INSERT INTO Reports VALUES (now(), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`, + _, err := db.Exec(`INSERT INTO Reports VALUES (TIMEZONE('UTC', NOW()), $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)`, r.UniqueID, r.Version, r.LongVersion, r.Platform, r.NumFolders, r.NumDevices, r.TotFiles, r.FolderMaxFiles, r.TotMiB, r.FolderMaxMiB, r.MemoryUsageMiB, r.SHA256Perf, r.MemorySize, r.Date) @@ -176,6 +178,7 @@ func main() { http.HandleFunc("/", withDB(db, rootHandler)) http.HandleFunc("/newdata", withDB(db, newDataHandler)) + http.HandleFunc("/summary.json", withDB(db, summaryHandler)) http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.Dir("static")))) err = srv.Serve(listener) @@ -246,6 +249,25 @@ func newDataHandler(db *sql.DB, w http.ResponseWriter, r *http.Request) { } } +func summaryHandler(db *sql.DB, w http.ResponseWriter, r *http.Request) { + s, err := getSummary(db) + if err != nil { + log.Println("summaryHandler:", err) + http.Error(w, "Database Error", http.StatusInternalServerError) + return + } + + bs, err := s.MarshalJSON() + if err != nil { + log.Println("summaryHandler:", err) + http.Error(w, "JSON Encode Error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/json") + w.Write(bs) +} + type category struct { Values [4]float64 Key string @@ -268,6 +290,8 @@ func getReport(db *sql.DB) map[string]interface{} { var memoryUsage []int var sha256Perf []float64 var memorySize []int + var compilers []string + var builders []string rows, err := db.Query(`SELECT * FROM Reports WHERE Received > now() - '1 day'::INTERVAL`) if err != nil { @@ -294,6 +318,10 @@ func getReport(db *sql.DB) map[string]interface{} { platforms = append(platforms, rep.Platform) ps := strings.Split(rep.Platform, "-") oses = append(oses, ps[0]) + if m := compilerRe.FindStringSubmatch(rep.LongVersion); len(m) == 3 { + compilers = append(compilers, m[1]) + builders = append(builders, m[2]) + } if rep.NumFolders > 0 { numFolders = append(numFolders, rep.NumFolders) } @@ -385,6 +413,8 @@ func getReport(db *sql.DB) map[string]interface{} { r["versions"] = analyticsFor(versions, 10) r["platforms"] = analyticsFor(platforms, 0) r["os"] = analyticsFor(oses, 0) + r["compilers"] = analyticsFor(compilers, 12) + r["builders"] = analyticsFor(builders, 12) return r } @@ -426,3 +456,104 @@ func transformVersion(v string) string { return v } + +type summary struct { + versions map[string]int // version string to count index + rows map[string][]int // date to list of counts +} + +func newSummary() summary { + return summary{ + versions: make(map[string]int), + rows: make(map[string][]int), + } +} + +func (s *summary) setCount(date, version string, count int) { + idx, ok := s.versions[version] + if !ok { + idx = len(s.versions) + s.versions[version] = idx + } + + row := s.rows[date] + if len(row) <= idx { + old := row + row = make([]int, idx+1) + copy(row, old) + s.rows[date] = row + } + + row[idx] = count +} + +func (s *summary) MarshalJSON() ([]byte, error) { + var versions []string + for v := range s.versions { + versions = append(versions, v) + } + sort.Strings(versions) + + headerRow := []interface{}{"Day"} + for _, v := range versions { + headerRow = append(headerRow, v) + } + + var table [][]interface{} + table = append(table, headerRow) + + var dates []string + for k := range s.rows { + dates = append(dates, k) + } + sort.Strings(dates) + + for _, date := range dates { + row := []interface{}{date} + for _, ver := range versions { + idx := s.versions[ver] + if len(s.rows[date]) > idx { + row = append(row, s.rows[date][idx]) + } else { + row = append(row, 0) + } + } + table = append(table, row) + } + + return json.Marshal(table) +} + +func getSummary(db *sql.DB) (summary, error) { + s := newSummary() + + rows, err := db.Query(`SELECT Day, Version, Count FROM VersionSummary;`) + if err != nil { + return summary{}, err + } + defer rows.Close() + + for rows.Next() { + var day time.Time + var ver string + var num int + err := rows.Scan(&day, &ver, &num) + if err != nil { + return summary{}, err + } + + if ver == "v0.0" { + // ? + continue + } + + // SUPER UGLY HACK to avoid having to do sorting properly + if len(ver) == 4 { // v0.x + ver = ver[:3] + "0" + ver[3:] // now v0.0x + } + + s.setCount(day.Format("2006-01-02"), ver, num) + } + + return s, nil +}