syncthing/cmd/uraggregate/main.go

327 lines
8.2 KiB
Go
Raw Normal View History

// Copyright (C) 2018 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
2015-05-21 06:52:19 +00:00
package main
import (
"database/sql"
"log"
"os"
"time"
2015-07-15 11:23:13 +00:00
_ "github.com/lib/pq"
2015-05-21 06:52:19 +00:00
)
// dbConn is the database connection URL. It is taken from the UR_DB_URL
// environment variable, falling back to a localhost URL when that variable
// is unset or empty.
var dbConn = getEnvDefault(
	"UR_DB_URL",
	"postgres://user:password@localhost/ur?sslmode=disable",
)
// getEnvDefault returns the value of the environment variable key, or def
// when the variable is unset or set to the empty string.
func getEnvDefault(key, def string) string {
	fromEnv := os.Getenv(key)
	if fromEnv == "" {
		return def
	}
	return fromEnv
}
// main sends log output to stdout, opens the database named by dbConn,
// makes sure the aggregate tables exist, and then runs an aggregation pass
// once per day, forever. Any failure to open or set up the database is fatal.
func main() {
	log.SetOutput(os.Stdout)
	log.SetFlags(log.Ldate | log.Ltime)

	db, err := sql.Open("postgres", dbConn)
	if err != nil {
		log.Fatalln("database:", err)
	}
	if err := setupDB(db); err != nil {
		log.Fatalln("database:", err)
	}

	const (
		interval = 24 * time.Hour
		margin   = 1 * time.Minute
	)
	for {
		runAggregation(db)

		// Sleep until one minute past next midnight
		sleepUntilNext(interval, margin)
	}
}
func runAggregation(db *sql.DB) {
2015-07-15 11:23:13 +00:00
since := maxIndexedDay(db, "VersionSummary")
log.Println("Aggregating VersionSummary data since", since)
rows, err := aggregateVersionSummary(db, since.Add(24*time.Hour))
2015-07-15 11:23:13 +00:00
if err != nil {
log.Println("aggregate:", err)
2015-07-15 11:23:13 +00:00
}
log.Println("Inserted", rows, "rows")
2016-09-06 18:15:18 +00:00
since = maxIndexedDay(db, "Performance")
log.Println("Aggregating Performance data since", since)
rows, err = aggregatePerformance(db, since.Add(24*time.Hour))
2016-09-06 18:15:18 +00:00
if err != nil {
log.Println("aggregate:", err)
2016-09-06 18:15:18 +00:00
}
log.Println("Inserted", rows, "rows")
2017-11-08 14:41:42 +00:00
since = maxIndexedDay(db, "BlockStats")
log.Println("Aggregating BlockStats data since", since)
rows, err = aggregateBlockStats(db, since.Add(24*time.Hour))
2017-11-08 14:41:42 +00:00
if err != nil {
log.Println("aggregate:", err)
2017-11-08 14:41:42 +00:00
}
log.Println("Inserted", rows, "rows")
2015-05-21 06:52:19 +00:00
}
// sleepUntilNext blocks until margin past the next multiple of intv, as
// computed by time.Time.Truncate on the current UTC time. The wake-up time
// is logged before sleeping.
func sleepUntilNext(intv, margin time.Duration) {
	start := time.Now().UTC()

	boundary := start.Truncate(intv).Add(intv)
	wake := boundary.Add(margin)

	log.Println("Sleeping until", wake)
	time.Sleep(wake.Sub(start))
}
// setupDB creates the aggregate tables (VersionSummary, UserMovement,
// Performance, BlockStats) and their indexes when they do not already exist.
//
// A failure to create a table is returned to the caller. Index creation is
// best-effort: a failure there is logged and setup continues, so that a
// missing index never prevents the aggregator from starting.
func setupDB(db *sql.DB) error {
	tables := []string{
		`CREATE TABLE IF NOT EXISTS VersionSummary (
		Day TIMESTAMP NOT NULL,
		Version VARCHAR(8) NOT NULL,
		Count INTEGER NOT NULL
	)`,
		`CREATE TABLE IF NOT EXISTS UserMovement (
		Day TIMESTAMP NOT NULL,
		Added INTEGER NOT NULL,
		Bounced INTEGER NOT NULL,
		Removed INTEGER NOT NULL
	)`,
		`CREATE TABLE IF NOT EXISTS Performance (
		Day TIMESTAMP NOT NULL,
		TotFiles INTEGER NOT NULL,
		TotMiB INTEGER NOT NULL,
		SHA256Perf DOUBLE PRECISION NOT NULL,
		MemorySize INTEGER NOT NULL,
		MemoryUsageMiB INTEGER NOT NULL
	)`,
		`CREATE TABLE IF NOT EXISTS BlockStats (
		Day TIMESTAMP NOT NULL,
		Reports INTEGER NOT NULL,
		Total BIGINT NOT NULL,
		Renamed BIGINT NOT NULL,
		Reused BIGINT NOT NULL,
		Pulled BIGINT NOT NULL,
		CopyOrigin BIGINT NOT NULL,
		CopyOriginShifted BIGINT NOT NULL,
		CopyElsewhere BIGINT NOT NULL
	)`,
	}
	for _, stmt := range tables {
		if _, err := db.Exec(stmt); err != nil {
			return err
		}
	}

	indexes := []struct {
		name   string
		create string
	}{
		{"UniqueDayVersionIndex", `CREATE UNIQUE INDEX UniqueDayVersionIndex ON VersionSummary (Day, Version)`},
		{"VersionDayIndex", `CREATE INDEX VersionDayIndex ON VersionSummary (Day)`},
		{"MovementDayIndex", `CREATE INDEX MovementDayIndex ON UserMovement (Day)`},
		{"PerformanceDayIndex", `CREATE INDEX PerformanceDayIndex ON Performance (Day)`},
		{"BlockStatsDayIndex", `CREATE INDEX BlockStatsDayIndex ON BlockStats (Day)`},
	}

	var found string
	for _, idx := range indexes {
		// The index is treated as already present when its name can be
		// cast to regclass; any error from the probe triggers a create
		// attempt. The names are the fixed constants above, so splicing
		// them into the query text is safe.
		probe := db.QueryRow("SELECT '" + idx.name + "'::regclass")
		if err := probe.Scan(&found); err == nil {
			continue
		}
		if _, err := db.Exec(idx.create); err != nil {
			// Deliberately not fatal, but no longer silent.
			log.Println("database: creating index", idx.name+":", err)
		}
	}

	return nil
}
2015-07-15 11:23:13 +00:00
// maxIndexedDay returns the latest Day value stored in table, truncated to
// the day. It returns the zero time when the table is empty or when the
// query fails; query failures are logged.
//
// table is spliced directly into the query text, so it must be a trusted
// identifier. The callers in this file pass fixed table names.
func maxIndexedDay(db *sql.DB, table string) time.Time {
	// MAX() yields NULL for an empty table. Scanning into a nullable value
	// lets that case be told apart from a genuine query error, which was
	// previously swallowed without a trace.
	var latest sql.NullTime
	row := db.QueryRow("SELECT MAX(DATE_TRUNC('day', Day)) FROM " + table)
	if err := row.Scan(&latest); err != nil {
		log.Println("database: reading latest day from", table+":", err)
		return time.Time{}
	}
	if !latest.Valid {
		return time.Time{}
	}
	return latest.Time
}
2015-07-15 11:23:13 +00:00
// aggregateVersionSummary inserts one VersionSummary row per day and version
// prefix, counting the reports received from since (inclusive) up to, but not
// including, the start of the current day. The version prefix is the part of
// the report's version string matched by the regular expression in the
// query. It returns the number of rows inserted.
//
// The lower bound is inclusive because callers pass the start of the first
// day that has not been aggregated yet; a report stamped exactly on that
// boundary belongs to that day and must not be skipped.
func aggregateVersionSummary(db *sql.DB, since time.Time) (int64, error) {
	res, err := db.Exec(`INSERT INTO VersionSummary (
	SELECT
		DATE_TRUNC('day', Received) AS Day,
		SUBSTRING(Report->>'version' FROM '^v\d.\d+') AS Ver,
		COUNT(*) AS Count
		FROM ReportsJson
		WHERE
			Received >= $1
			AND Received < DATE_TRUNC('day', NOW())
			AND Report->>'version' like 'v_.%'
		GROUP BY Day, Ver
	);
	`, since)
	if err != nil {
		return 0, err
	}

	return res.RowsAffected()
}
2015-07-15 11:23:13 +00:00
// aggregateUserMovement rebuilds the UserMovement table from scratch. For
// every day from the earliest qualifying report up to, but not including,
// the current day it records how many unique IDs were
//
//   - added: first seen on that day,
//   - removed: last seen on that day, when that day is more than 30 days ago,
//   - bounced: first and last seen on that same day, when that day is more
//     than 30 days ago. Bounced IDs count as neither added nor removed.
//
// The table is replaced inside a single transaction. It returns the number
// of day rows written.
func aggregateUserMovement(db *sql.DB) (int64, error) {
	rows, err := db.Query(`SELECT
		DATE_TRUNC('day', Received) AS Day,
		Report->>'uniqueID'
		FROM ReportsJson
		WHERE
			Report->>'uniqueID' IS NOT NULL
			AND Received < DATE_TRUNC('day', NOW())
			AND Report->>'version' like 'v_.%'
		ORDER BY Day
	`)
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	// Rows arrive ordered by day, so the first row seen for an ID is its
	// first day, the most recent row is its last day, and the very first
	// row carries the earliest day overall.
	firstSeen := make(map[string]time.Time)
	lastSeen := make(map[string]time.Time)
	var minTs time.Time
	for rows.Next() {
		var ts time.Time
		var id string
		if err := rows.Scan(&ts, &id); err != nil {
			return 0, err
		}

		if minTs.IsZero() {
			minTs = ts
		}
		if _, ok := firstSeen[id]; !ok {
			firstSeen[id] = ts
		}
		lastSeen[id] = ts
	}
	// Without this check a failure part-way through the result set would
	// replace the table with figures computed from partial data.
	if err := rows.Err(); err != nil {
		return 0, err
	}

	// Tally every ID once into per-day counters instead of rescanning all
	// IDs for each day. Days are keyed by Unix time so that equal instants
	// match regardless of the Location attached to the time.Time value.
	type dayCounts struct {
		added, removed, bounced int
	}
	perDay := make(map[int64]*dayCounts)
	countsFor := func(day time.Time) *dayCounts {
		key := day.Unix()
		c, ok := perDay[key]
		if !ok {
			c = new(dayCounts)
			perDay[key] = c
		}
		return c
	}

	now := time.Now()
	cutoff := now.AddDate(0, 0, -30)
	for id, first := range firstSeen {
		last := lastSeen[id]
		if first.Equal(last) && first.Before(cutoff) {
			countsFor(first).bounced++
			continue
		}
		countsFor(first).added++
		if last.Before(cutoff) {
			countsFor(last).removed++
		}
	}

	type sumRow struct {
		day     time.Time
		added   int
		removed int
		bounced int
	}
	var sumRows []sumRow
	// With no reports at all minTs is the zero time; walking forward from
	// year 1 would produce hundreds of thousands of empty rows.
	if !minTs.IsZero() {
		today := now.Truncate(24 * time.Hour)
		for t := minTs; t.Before(today); t = t.AddDate(0, 0, 1) {
			r := sumRow{day: t}
			if c := perDay[t.Unix()]; c != nil {
				r.added, r.removed, r.bounced = c.added, c.removed, c.bounced
			}
			sumRows = append(sumRows, r)
		}
	}

	tx, err := db.Begin()
	if err != nil {
		return 0, err
	}

	if _, err := tx.Exec("DELETE FROM UserMovement"); err != nil {
		// The statement error is the one worth reporting; a rollback
		// failure on top of it adds nothing.
		_ = tx.Rollback()
		return 0, err
	}

	for _, r := range sumRows {
		if _, err := tx.Exec("INSERT INTO UserMovement (Day, Added, Removed, Bounced) VALUES ($1, $2, $3, $4)", r.day, r.added, r.removed, r.bounced); err != nil {
			_ = tx.Rollback()
			return 0, err
		}
	}

	if err := tx.Commit(); err != nil {
		return 0, err
	}
	return int64(len(sumRows)), nil
}
2016-09-06 18:15:18 +00:00
// aggregatePerformance inserts one Performance row per day holding the
// averages of the totFiles, totMiB, sha256Perf, memorySize and
// memoryUsageMiB report fields, for reports received from since (inclusive)
// up to, but not including, the start of the current day. It returns the
// number of rows inserted.
//
// The lower bound is inclusive because callers pass the start of the first
// day that has not been aggregated yet; a report stamped exactly on that
// boundary belongs to that day and must not be skipped.
func aggregatePerformance(db *sql.DB, since time.Time) (int64, error) {
	res, err := db.Exec(`INSERT INTO Performance (
	SELECT
		DATE_TRUNC('day', Received) AS Day,
		AVG((Report->>'totFiles')::numeric) As TotFiles,
		AVG((Report->>'totMiB')::numeric) As TotMiB,
		AVG((Report->>'sha256Perf')::numeric) As SHA256Perf,
		AVG((Report->>'memorySize')::numeric) As MemorySize,
		AVG((Report->>'memoryUsageMiB')::numeric) As MemoryUsageMiB
		FROM ReportsJson
		WHERE
			Received >= $1
			AND Received < DATE_TRUNC('day', NOW())
			AND Report->>'version' like 'v_.%'
			/* Some custom implementation reported bytes when we expect megabytes, cap at petabyte */
			AND (Report->>'memorySize')::numeric < 1073741824
		GROUP BY Day
	);
	`, since)
	if err != nil {
		return 0, err
	}

	return res.RowsAffected()
}
2017-11-08 14:41:42 +00:00
// aggregateBlockStats inserts one BlockStats row per day holding the number
// of contributing reports and the sums of their blockStats counters, for
// reports received from since (inclusive) up to, but not including, the
// start of the current day. Only reports with urVersion >= 3 are included.
// It returns the number of rows inserted.
//
// The lower bound is inclusive because callers pass the start of the first
// day that has not been aggregated yet; a report stamped exactly on that
// boundary belongs to that day and must not be skipped.
func aggregateBlockStats(db *sql.DB, since time.Time) (int64, error) {
	// Filter out anything prior to 0.14.41, as those versions report sum
	// aggregations which made no sense.
	res, err := db.Exec(`INSERT INTO BlockStats (
	SELECT
		DATE_TRUNC('day', Received) AS Day,
		COUNT(1) As Reports,
		SUM((Report->'blockStats'->>'total')::numeric)::bigint AS Total,
		SUM((Report->'blockStats'->>'renamed')::numeric)::bigint AS Renamed,
		SUM((Report->'blockStats'->>'reused')::numeric)::bigint AS Reused,
		SUM((Report->'blockStats'->>'pulled')::numeric)::bigint AS Pulled,
		SUM((Report->'blockStats'->>'copyOrigin')::numeric)::bigint AS CopyOrigin,
		SUM((Report->'blockStats'->>'copyOriginShifted')::numeric)::bigint AS CopyOriginShifted,
		SUM((Report->'blockStats'->>'copyElsewhere')::numeric)::bigint AS CopyElsewhere
		FROM ReportsJson
		WHERE
			Received >= $1
			AND Received < DATE_TRUNC('day', NOW())
			AND (Report->>'urVersion')::numeric >= 3
			AND Report->>'version' like 'v_.%'
			AND Report->>'version' NOT LIKE 'v0.14.40%'
			AND Report->>'version' NOT LIKE 'v0.14.39%'
			AND Report->>'version' NOT LIKE 'v0.14.38%'
		GROUP BY Day
	);
	`, since)
	if err != nil {
		return 0, err
	}

	return res.RowsAffected()
}