2015-07-15 13:45:33 +02:00

212 lines
4.6 KiB
Go

package main
import (
"database/sql"
"log"
"os"
"time"
_ "github.com/lib/pq"
)
var dbConn = getEnvDefault("UR_DB_URL", "postgres://user:password@localhost/ur?sslmode=disable")
func getEnvDefault(key, def string) string {
if val := os.Getenv(key); val != "" {
return val
}
return def
}
func main() {
log.SetFlags(log.Ltime | log.Ldate)
log.SetOutput(os.Stdout)
db, err := sql.Open("postgres", dbConn)
if err != nil {
log.Fatalln("database:", err)
}
err = setupDB(db)
if err != nil {
log.Fatalln("database:", err)
}
for {
runAggregation(db)
// Sleep until one minute past next midnight
sleepUntilNext(24*time.Hour, 1*time.Minute)
}
}
func runAggregation(db *sql.DB) {
since := maxIndexedDay(db, "VersionSummary")
log.Println("Aggregating VersionSummary data since", since)
rows, err := aggregateVersionSummary(db, since)
if err != nil {
log.Fatalln("aggregate:", err)
}
log.Println("Inserted", rows, "rows")
log.Println("Aggregating UserMovement data")
rows, err = aggregateUserMovement(db)
if err != nil {
log.Fatalln("aggregate:", err)
}
log.Println("Inserted", rows, "rows")
}
func sleepUntilNext(intv, margin time.Duration) {
now := time.Now().UTC()
next := now.Truncate(intv).Add(intv).Add(margin)
log.Println("Sleeping until", next)
time.Sleep(next.Sub(now))
}
func setupDB(db *sql.DB) error {
_, err := db.Exec(`CREATE TABLE IF NOT EXISTS VersionSummary (
Day TIMESTAMP NOT NULL,
Version VARCHAR(8) NOT NULL,
Count INTEGER NOT NULL
)`)
if err != nil {
return err
}
_, err = db.Exec(`CREATE TABLE IF NOT EXISTS UserMovement (
Day TIMESTAMP NOT NULL,
Added INTEGER NOT NULL,
Bounced INTEGER NOT NULL,
Removed INTEGER NOT NULL
)`)
if err != nil {
return err
}
row := db.QueryRow(`SELECT 'UniqueDayVersionIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`)
}
row = db.QueryRow(`SELECT 'DayIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX DayIndex ON VerionSummary (Day)`)
}
row = db.QueryRow(`SELECT 'MovementDayIndex'::regclass`)
if err := row.Scan(nil); err != nil {
_, err = db.Exec(`CREATE INDEX MovementDayIndex ON UserMovement (Day)`)
}
return err
}
func maxIndexedDay(db *sql.DB, table string) time.Time {
var t time.Time
row := db.QueryRow("SELECT MAX(Day) FROM " + table)
err := row.Scan(&t)
if err != nil {
return time.Time{}
}
return t
}
func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
res, err := db.Exec(`INSERT INTO VersionSummary (
SELECT
DATE_TRUNC('day', Received) AS Day,
SUBSTRING(Version FROM '^v\d.\d+') AS Ver,
COUNT(*) AS Count
FROM Reports
WHERE
DATE_TRUNC('day', Received) > $1
AND DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
GROUP BY Day, Ver
);
`, since)
if err != nil {
return 0, err
}
return res.RowsAffected()
}
func aggregateUserMovement(db *sql.DB) (int64, error) {
rows, err := db.Query(`SELECT
DATE_TRUNC('day', Received) AS Day,
UniqueID
FROM Reports
WHERE
DATE_TRUNC('day', Received) < DATE_TRUNC('day', NOW())
AND Version like 'v0.%'
ORDER BY Day
`)
if err != nil {
return 0, err
}
defer rows.Close()
firstSeen := make(map[string]time.Time)
lastSeen := make(map[string]time.Time)
var minTs time.Time
for rows.Next() {
var ts time.Time
var id string
if err := rows.Scan(&ts, &id); err != nil {
return 0, err
}
if minTs.IsZero() {
minTs = ts
}
if _, ok := firstSeen[id]; !ok {
firstSeen[id] = ts
}
lastSeen[id] = ts
}
type sumRow struct {
day time.Time
added int
removed int
bounced int
}
var sumRows []sumRow
for t := minTs; t.Before(time.Now().Truncate(24 * time.Hour)); t = t.AddDate(0, 0, 1) {
var added, removed, bounced int
old := t.Before(time.Now().AddDate(0, 0, -14))
for id, first := range firstSeen {
last := lastSeen[id]
if first.Equal(t) && last.Equal(t) && old {
bounced++
continue
}
if first.Equal(t) {
added++
}
if last == t && old {
removed++
}
}
sumRows = append(sumRows, sumRow{t, added, removed, bounced})
}
tx, err := db.Begin()
if err != nil {
return 0, err
}
if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
tx.Rollback()
return 0, err
}
for _, r := range sumRows {
if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", r.day, r.added, r.removed, r.bounced); err != nil {
tx.Rollback()
return 0, err
}
}
return int64(len(sumRows)), tx.Commit()
}