diff --git a/cmd/discosrv/README.md b/cmd/discosrv/README.md index ec0191d3e..c57d4b673 100644 --- a/cmd/discosrv/README.md +++ b/cmd/discosrv/README.md @@ -5,7 +5,36 @@ discosrv This is the global discovery server for the `syncthing` project. -`go get github.com/syncthing/discosrv` +To get it, run `go get github.com/syncthing/discosrv` or download the +[latest build](http://build.syncthing.net/job/discosrv/lastSuccessfulBuild/artifact/) +from the build server. -Or download the latest [Linux build](http://build.syncthing.net/job/discosrv/lastSuccessfulBuild/artifact/). +Usage +----- +The discovery server supports `ql` and `postgres` backends. +Specify the backend via `-db-backend` and the database DSN via `-db-dsn`. + +By default it will use in-memory `ql` backend. If you wish to persist the +information on disk between restarts in `ql`, specify a file DSN: + +```bash +$ discosrv -db-dsn="file://var/run/discosrv.db" +``` + +For `postgres`, you will need to create a database and a user with permissions +to create tables in it, then start the discosrv as follows: + +```bash +$ export DISCOSRV_DB_DSN="postgres://user:password@localhost/databasename" +$ discosrv -db-backend="postgres" +``` + +You can pass the DSN as command line option, but the value what you pass in will +be visible in most process managers, potentially exposing the database password +to other users. + +In all cases, the appropriate tables and indexes will be created at first +startup. If it doesn't exit with an error, you're fine. + +See `discosrv -help` for other options. diff --git a/cmd/discosrv/clean.go b/cmd/discosrv/clean.go new file mode 100644 index 000000000..7871bca01 --- /dev/null +++ b/cmd/discosrv/clean.go @@ -0,0 +1,87 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "log" + "time" +) + +type cleansrv struct { + intv time.Duration + db *sql.DB + prep map[string]*sql.Stmt +} + +func (s *cleansrv) Serve() { + for { + time.Sleep(next(s.intv)) + + err := s.cleanOldEntries() + if err != nil { + log.Println("Clean:", err) + } + } +} + +func (s *cleansrv) Stop() { + panic("stop unimplemented") +} + +func (s *cleansrv) cleanOldEntries() (err error) { + var tx *sql.Tx + tx, err = s.db.Begin() + if err != nil { + return err + } + + defer func() { + if err == nil { + err = tx.Commit() + } else { + tx.Rollback() + } + }() + + res, err := tx.Stmt(s.prep["cleanAddress"]).Exec() + if err != nil { + return err + } + if rows, _ := res.RowsAffected(); rows > 0 { + log.Printf("Clean: %d old addresses", rows) + } + + res, err = tx.Stmt(s.prep["cleanRelay"]).Exec() + if err != nil { + return err + } + if rows, _ := res.RowsAffected(); rows > 0 { + log.Printf("Clean: %d old relays", rows) + } + + res, err = tx.Stmt(s.prep["cleanDevice"]).Exec() + if err != nil { + return err + } + if rows, _ := res.RowsAffected(); rows > 0 { + log.Printf("Clean: %d old devices", rows) + } + + var devs, addrs, relays int + row := tx.Stmt(s.prep["countDevice"]).QueryRow() + if err = row.Scan(&devs); err != nil { + return err + } + row = tx.Stmt(s.prep["countAddress"]).QueryRow() + if err = row.Scan(&addrs); err != nil { + return err + } + row = tx.Stmt(s.prep["countRelay"]).QueryRow() + if err = row.Scan(&relays); err != nil { + return err + } + + log.Printf("Database: %d devices, %d addresses, %d relays", devs, addrs, relays) + return nil +} diff --git a/cmd/discosrv/db.go b/cmd/discosrv/db.go new file mode 100644 index 000000000..34162d588 --- /dev/null +++ b/cmd/discosrv/db.go @@ -0,0 +1,32 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "fmt" +) + +type setupFunc func(db *sql.DB) error +type compileFunc func(db *sql.DB) (map[string]*sql.Stmt, error) + +var ( + setupFuncs = make(map[string]setupFunc) + compileFuncs = make(map[string]compileFunc) +) + +func register(name string, setup setupFunc, compile compileFunc) { + setupFuncs[name] = setup + compileFuncs[name] = compile +} + +func setup(backend string, db *sql.DB) (map[string]*sql.Stmt, error) { + setup, ok := setupFuncs[backend] + if !ok { + return nil, fmt.Errorf("Unsupported backend") + } + if err := setup(db); err != nil { + return nil, err + } + return compileFuncs[backend](db) +} diff --git a/cmd/discosrv/main.go b/cmd/discosrv/main.go index d14842cf2..91f16c10c 100644 --- a/cmd/discosrv/main.go +++ b/cmd/discosrv/main.go @@ -1,392 +1,93 @@ -// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). package main import ( - "bytes" - "encoding/binary" + "database/sql" "flag" - "fmt" - "io" "log" "net" "os" - "path/filepath" - "sync" "time" - "github.com/golang/groupcache/lru" - "github.com/juju/ratelimit" - "github.com/syncthing/protocol" - "github.com/syncthing/syncthing/lib/discover" - "github.com/syndtr/goleveldb/leveldb" - "github.com/syndtr/goleveldb/leveldb/opt" + "github.com/thejerf/suture" ) -const cacheLimitSeconds = 3600 - var ( - lock sync.Mutex - queries = 0 - announces = 0 - answered = 0 - limited = 0 - unknowns = 0 - debug = false - lruSize = 1024 - limitAvg = 1 - limitBurst = 10 - limiter *lru.Cache + lruSize = 10240 + limitAvg = 5 + limitBurst = 20 + globalStats stats + statsFile string + backend = "ql" + dsn = getEnvDefault("DISCOSRV_DB_DSN", "memory://discosrv") ) func main() { - var listen string - var timestamp bool - var statsIntv int - var statsFile string - var unknownFile string - var dbDir string + const ( + cleanIntv = 1 * time.Hour + statsIntv = 5 * time.Minute + ) - flag.StringVar(&listen, "listen", ":22026", "Listen address") - flag.BoolVar(&debug, "debug", false, "Enable debug output") - flag.BoolVar(×tamp, "timestamp", true, "Timestamp the log output") - flag.IntVar(&statsIntv, "stats-intv", 0, "Statistics output interval (s)") - flag.StringVar(&statsFile, "stats-file", "/var/discosrv/stats", "Statistics file name") - flag.StringVar(&unknownFile, "unknown-file", "", "Unknown packet log file name") + var listen string + + log.SetOutput(os.Stdout) + log.SetFlags(0) + + flag.StringVar(&listen, "listen", ":22027", "Listen address") flag.IntVar(&lruSize, "limit-cache", lruSize, "Limiter cache entries") flag.IntVar(&limitAvg, "limit-avg", limitAvg, "Allowed average package rate, per 10 s") flag.IntVar(&limitBurst, "limit-burst", limitBurst, "Allowed burst size, packets") - flag.StringVar(&dbDir, "db-dir", "/var/discosrv/db", "Database directory") + flag.StringVar(&statsFile, "stats-file", statsFile, "File to write periodic operation stats to") + flag.StringVar(&backend, "db-backend", backend, "Database backend to use") + flag.StringVar(&dsn, "db-dsn", dsn, "Database DSN") flag.Parse() - limiter = lru.New(lruSize) - - log.SetOutput(os.Stdout) - if !timestamp { - log.SetFlags(0) - } - addr, _ := net.ResolveUDPAddr("udp", listen) - conn, err := net.ListenUDP("udp", addr) + + var err error + db, err := sql.Open(backend, dsn) if err != nil { - log.Fatal(err) + log.Fatalln("sql.Open:", err) } - - parentDir := filepath.Dir(dbDir) - if _, err := os.Stat(parentDir); err != nil && os.IsNotExist(err) { - err = os.MkdirAll(parentDir, 0755) - if err != nil { - log.Fatal(err) - } - } - - db, err := leveldb.OpenFile(dbDir, &opt.Options{OpenFilesCacheCapacity: 32}) + prep, err := setup(backend, db) if err != nil { - log.Fatal(err) + log.Fatalln("Setup:", err) } - statsLog, err := os.OpenFile(statsFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Fatal(err) - } + main := suture.NewSimple("main") - var unknownLog io.Writer - if unknownFile != "" { - unknownLog, err = os.OpenFile(unknownFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Fatal(err) - } - } + main.Add(&querysrv{ + addr: addr, + db: db, + prep: prep, + }) - if statsIntv > 0 { - go logStats(statsLog, statsIntv) - } + main.Add(&cleansrv{ + intv: cleanIntv, + db: db, + prep: prep, + }) - go clean(statsLog, db) + main.Add(&statssrv{ + intv: statsIntv, + file: statsFile, + db: db, + }) - var buf = make([]byte, 1024) - for { - buf = buf[:cap(buf)] - n, addr, err := conn.ReadFromUDP(buf) - - if limit(addr) { - // Rate limit in effect for source - continue - } - - if err != nil { - log.Fatal(err) - } - - if n < 4 { - log.Printf("Received short packet (%d bytes)", n) - continue - } - - buf = buf[:n] - magic := binary.BigEndian.Uint32(buf) - - switch magic { - case discover.AnnouncementMagic: - err := handleAnnounceV2(db, addr, buf) - if err != nil && unknownLog != nil { - fmt.Fprintf(unknownLog, "AE %d %v %x\n", time.Now().Unix(), addr, buf) - } - - case discover.QueryMagic: - err := handleQueryV2(db, conn, addr, buf) - if err != nil && unknownLog != nil { - fmt.Fprintf(unknownLog, "QE %d %v %x\n", time.Now().Unix(), addr, buf) - } - - default: - lock.Lock() - unknowns++ - lock.Unlock() - if unknownLog != nil { - fmt.Fprintf(unknownLog, "UN %d %v %x\n", time.Now().Unix(), addr, buf) - } - } - } + globalStats.Reset() + main.Serve() } -func limit(addr *net.UDPAddr) bool { - key := addr.IP.String() - - lock.Lock() - defer lock.Unlock() - - bkt, ok := limiter.Get(key) - if ok { - bkt := bkt.(*ratelimit.Bucket) - if bkt.TakeAvailable(1) != 1 { - // Rate limit exceeded; ignore packet - if debug { - log.Println("Rate limit exceeded for", key) - } - limited++ - return true - } - } else { - if debug { - log.Println("New limiter for", key) - } - // One packet per ten seconds average rate, burst ten packets - limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst))) +func getEnvDefault(key, def string) string { + if val := os.Getenv(key); val != "" { + return val } - - return false + return def } -func handleAnnounceV2(db *leveldb.DB, addr *net.UDPAddr, buf []byte) error { - var pkt discover.Announce - err := pkt.UnmarshalXDR(buf) - if err != nil && err != io.EOF { - return err - } - if debug { - log.Printf("<- %v %#v", addr, pkt) - } - - lock.Lock() - announces++ - lock.Unlock() - - ip := addr.IP.To4() - if ip == nil { - ip = addr.IP.To16() - } - - var addrs []address - now := time.Now().Unix() - for _, addr := range pkt.This.Addresses { - tip := addr.IP - if len(tip) == 0 { - tip = ip - } - addrs = append(addrs, address{ - ip: tip, - port: addr.Port, - seen: now, - }) - } - - var id protocol.DeviceID - if len(pkt.This.ID) == 32 { - // Raw node ID - copy(id[:], pkt.This.ID) - } else { - err = id.UnmarshalText(pkt.This.ID) - if err != nil { - return err - } - } - - update(db, id, addrs) - return nil -} - -func handleQueryV2(db *leveldb.DB, conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error { - var pkt discover.Query - err := pkt.UnmarshalXDR(buf) - if err != nil { - return err - } - if debug { - log.Printf("<- %v %#v", addr, pkt) - } - - var id protocol.DeviceID - if len(pkt.DeviceID) == 32 { - // Raw node ID - copy(id[:], pkt.DeviceID) - } else { - err = id.UnmarshalText(pkt.DeviceID) - if err != nil { - return err - } - } - - lock.Lock() - queries++ - lock.Unlock() - - addrs := get(db, id) - - now := time.Now().Unix() - if len(addrs) > 0 { - ann := discover.Announce{ - Magic: discover.AnnouncementMagic, - This: discover.Device{ - ID: pkt.DeviceID, - }, - } - for _, addr := range addrs { - if now-addr.seen > cacheLimitSeconds { - continue - } - ann.This.Addresses = append(ann.This.Addresses, discover.Address{IP: addr.ip, Port: addr.port}) - } - if debug { - log.Printf("-> %v %#v", addr, pkt) - } - - if len(ann.This.Addresses) == 0 { - return nil - } - - tb, err := ann.MarshalXDR() - if err != nil { - log.Println("QueryV2 response marshal:", err) - return nil - } - _, err = conn.WriteToUDP(tb, addr) - if err != nil { - log.Println("QueryV2 response write:", err) - return nil - } - - lock.Lock() - answered++ - lock.Unlock() - } - return nil -} - -func next(intv int) time.Time { - d := time.Duration(intv) * time.Second +func next(intv time.Duration) time.Duration { t0 := time.Now() - t1 := t0.Add(d).Truncate(d) - time.Sleep(t1.Sub(t0)) - return t1 -} - -func logStats(statsLog io.Writer, intv int) { - for { - t := next(intv) - - lock.Lock() - - fmt.Fprintf(statsLog, "%d Queries:%d Answered:%d Announces:%d Unknown:%d Limited:%d\n", - t.Unix(), queries, answered, announces, unknowns, limited) - - queries = 0 - announces = 0 - answered = 0 - limited = 0 - unknowns = 0 - - lock.Unlock() - } -} - -func get(db *leveldb.DB, id protocol.DeviceID) []address { - var addrs addressList - val, err := db.Get(id[:], nil) - if err == nil { - addrs.UnmarshalXDR(val) - } - return addrs.addresses -} - -func update(db *leveldb.DB, id protocol.DeviceID, addrs []address) { - var newAddrs addressList - - val, err := db.Get(id[:], nil) - if err == nil { - newAddrs.UnmarshalXDR(val) - } - -nextAddr: - for _, newAddr := range addrs { - for i, exAddr := range newAddrs.addresses { - if bytes.Compare(newAddr.ip, exAddr.ip) == 0 { - newAddrs.addresses[i] = newAddr - continue nextAddr - } - } - newAddrs.addresses = append(newAddrs.addresses, newAddr) - } - - db.Put(id[:], newAddrs.MarshalXDR(), nil) -} - -func clean(statsLog io.Writer, db *leveldb.DB) { - for { - now := next(cacheLimitSeconds) - nowSecs := now.Unix() - - var kept, deleted int64 - iter := db.NewIterator(nil, nil) - for iter.Next() { - var addrs addressList - addrs.UnmarshalXDR(iter.Value()) - - // Remove expired addresses - newAddrs := addrs.addresses - for i := 0; i < len(newAddrs); i++ { - if nowSecs-newAddrs[i].seen > cacheLimitSeconds { - newAddrs[i] = newAddrs[len(newAddrs)-1] - newAddrs = newAddrs[:len(newAddrs)-1] - } - } - - // Delete empty records - if len(newAddrs) == 0 { - db.Delete(iter.Key(), nil) - deleted++ - continue - } - - // Update changed records - if len(newAddrs) != len(addrs.addresses) { - addrs.addresses = newAddrs - db.Put(iter.Key(), addrs.MarshalXDR(), nil) - } - kept++ - } - iter.Release() - - fmt.Fprintf(statsLog, "%d Kept:%d Deleted:%d Took:%0.04fs\n", nowSecs, kept, deleted, time.Since(now).Seconds()) - } + t1 := t0.Add(intv).Truncate(intv) + return t1.Sub(t0) } diff --git a/cmd/discosrv/psql.go b/cmd/discosrv/psql.go new file mode 100644 index 000000000..068547812 --- /dev/null +++ b/cmd/discosrv/psql.go @@ -0,0 +1,123 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + + _ "github.com/lib/pq" +) + +func init() { + register("postgres", postgresSetup, postgresCompile) +} + +func postgresSetup(db *sql.DB) error { + var err error + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices ( + DeviceID CHAR(63) NOT NULL PRIMARY KEY, + Seen TIMESTAMP NOT NULL + )`) + if err != nil { + return err + } + + row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`) + } + if err != nil { + return err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses ( + DeviceID CHAR(63) NOT NULL, + Seen TIMESTAMP NOT NULL, + Address VARCHAR(256) NOT NULL + )`) + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`) + } + if err != nil { + return err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Relays ( + DeviceID CHAR(63) NOT NULL, + Seen TIMESTAMP NOT NULL, + Address VARCHAR(256) NOT NULL, + Latency INTEGER NOT NULL + )`) + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'RelaysDeviceIDSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX RelaysDeviceIDSeenIndex ON Relays (DeviceID, Seen)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'RelaysDeviceIDAddressIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`) + } + if err != nil { + return err + } + + return nil +} + +func postgresCompile(db *sql.DB) (map[string]*sql.Stmt, error) { + stmts := map[string]string{ + "cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL", + "cleanRelay": "DELETE FROM Relays WHERE Seen < now() - '2 hour'::INTERVAL", + "cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL", + "countAddress": "SELECT count(*) FROM Addresses", + "countDevice": "SELECT count(*) FROM Devices", + "countRelay": "SELECT count(*) FROM Relays", + "insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)", + "insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)", + "insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())", + "selectAddress": "SELECT Address from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16", + "selectRelay": "SELECT Address, Latency from Relays WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16", + "updateRelay": "UPDATE Relays SET Seen=now(), Latency=$3 WHERE DeviceID=$1 AND Address=$2", + "updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1", + "deleteRelay": "DELETE FROM Relays WHERE DeviceID=$1", + } + + res := make(map[string]*sql.Stmt, len(stmts)) + for key, stmt := range stmts { + prep, err := db.Prepare(stmt) + if err != nil { + return nil, err + } + res[key] = prep + } + return res, nil +} diff --git a/cmd/discosrv/ql.go b/cmd/discosrv/ql.go new file mode 100644 index 000000000..850eac74f --- /dev/null +++ b/cmd/discosrv/ql.go @@ -0,0 +1,98 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "log" + + "github.com/cznic/ql" +) + +func init() { + ql.RegisterDriver() + register("ql", qlSetup, qlCompile) +} + +func qlSetup(db *sql.DB) (err error) { + tx, err := db.Begin() + if err != nil { + return + } + + defer func() { + if err == nil { + err = tx.Commit() + } else { + tx.Rollback() + } + }() + + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Devices ( + DeviceID STRING NOT NULL, + Seen TIME NOT NULL + )`) + if err != nil { + return + } + + if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS DevicesDeviceIDIndex ON Devices (DeviceID)`); err != nil { + return + } + + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Addresses ( + DeviceID STRING NOT NULL, + Seen TIME NOT NULL, + Address STRING NOT NULL, + )`) + if err != nil { + return + } + + if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`); err != nil { + return + } + + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Relays ( + DeviceID STRING NOT NULL, + Seen TIME NOT NULL, + Address STRING NOT NULL, + Latency INT NOT NULL, + )`) + if err != nil { + return + } + + _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`) + return +} + +func qlCompile(db *sql.DB) (map[string]*sql.Stmt, error) { + stmts := map[string]string{ + "cleanAddress": `DELETE FROM Addresses WHERE Seen < now() - duration("2h")`, + "cleanRelay": `DELETE FROM Relays WHERE Seen < now() - duration("2h")`, + "cleanDevice": `DELETE FROM Devices WHERE Seen < now() - duration("24h")`, + "countAddress": "SELECT count(*) FROM Addresses", + "countDevice": "SELECT count(*) FROM Devices", + "countRelay": "SELECT count(*) FROM Relays", + "insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)", + "insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)", + "insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())", + "selectAddress": `SELECT Address from Addresses WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`, + "selectRelay": `SELECT Address, Latency from Relays WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`, + "updateAddress": "UPDATE Addresses Seen=now()WHERE DeviceID==$1 AND Address==$2", + "updateDevice": "UPDATE Devices Seen=now() WHERE DeviceID==$1", + "deleteRelay": "DELETE FROM Relays WHERE DeviceID==$1", + } + + res := make(map[string]*sql.Stmt, len(stmts)) + for key, stmt := range stmts { + prep, err := db.Prepare(stmt) + if err != nil { + log.Println("Failed to compile", stmt) + return nil, err + } + res[key] = prep + } + return res, nil +} diff --git a/cmd/discosrv/querysrv.go b/cmd/discosrv/querysrv.go new file mode 100644 index 000000000..9af15ef69 --- /dev/null +++ b/cmd/discosrv/querysrv.go @@ -0,0 +1,305 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "encoding/binary" + "fmt" + "io" + "log" + "net" + "net/url" + "time" + + "github.com/golang/groupcache/lru" + "github.com/juju/ratelimit" + "github.com/syncthing/protocol" + "github.com/syncthing/syncthing/lib/discover" +) + +type querysrv struct { + addr *net.UDPAddr + db *sql.DB + prep map[string]*sql.Stmt + limiter *lru.Cache +} + +func (s *querysrv) Serve() { + s.limiter = lru.New(lruSize) + + conn, err := net.ListenUDP("udp", s.addr) + if err != nil { + log.Println("Listen:", err) + return + } + + // Attempt to set the read and write buffers to 2^24 bytes (16 MB) or as high as + // possible. + for i := 24; i >= 16; i-- { + if conn.SetReadBuffer(1<= 16; i-- { + if conn.SetWriteBuffer(1< 0 { + ann := discover.Announce{ + Magic: discover.AnnouncementMagic, + This: discover.Device{ + ID: pkt.DeviceID, + Addresses: addrs, + Relays: relays, + }, + } + + tb, err := ann.MarshalXDR() + if err != nil { + return fmt.Errorf("QueryV2 response marshal: %v", err) + } + _, err = conn.WriteToUDP(tb, addr) + if err != nil { + return fmt.Errorf("QueryV2 response write: %v", err) + } + + globalStats.Answer() + } + + return nil +} + +func (s *querysrv) limit(addr *net.UDPAddr) bool { + key := addr.IP.String() + + bkt, ok := s.limiter.Get(key) + if ok { + bkt := bkt.(*ratelimit.Bucket) + if bkt.TakeAvailable(1) != 1 { + // Rate limit exceeded; ignore packet + return true + } + } else { + // One packet per ten seconds average rate, burst ten packets + s.limiter.Add(key, ratelimit.NewBucket(10*time.Second/time.Duration(limitAvg), int64(limitBurst))) + } + + return false +} + +func (s *querysrv) updateDevice(tx *sql.Tx, device protocol.DeviceID) error { + res, err := tx.Stmt(s.prep["updateDevice"]).Exec(device.String()) + if err != nil { + return err + } + + if rows, _ := res.RowsAffected(); rows == 0 { + _, err := tx.Stmt(s.prep["insertDevice"]).Exec(device.String()) + if err != nil { + return err + } + } + + return nil +} + +func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, uri string) error { + res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), uri) + if err != nil { + return err + } + + if rows, _ := res.RowsAffected(); rows == 0 { + _, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), uri) + if err != nil { + return err + } + } + + return nil +} + +func (s *querysrv) getAddresses(device protocol.DeviceID) ([]string, error) { + rows, err := s.prep["selectAddress"].Query(device.String()) + if err != nil { + return nil, err + } + + var res []string + for rows.Next() { + var addr string + + err := rows.Scan(&addr) + if err != nil { + log.Println("Scan:", err) + continue + } + res = append(res, addr) + } + + return res, nil +} + +func (s *querysrv) getRelays(device protocol.DeviceID) ([]discover.Relay, error) { + rows, err := s.prep["selectRelay"].Query(device.String()) + if err != nil { + return nil, err + } + + var res []discover.Relay + for rows.Next() { + var addr string + var latency int32 + + err := rows.Scan(&addr, &latency) + if err != nil { + log.Println("Scan:", err) + continue + } + res = append(res, discover.Relay{ + Address: addr, + Latency: latency, + }) + } + + return res, nil +} diff --git a/cmd/discosrv/stats.go b/cmd/discosrv/stats.go new file mode 100644 index 000000000..1a0140915 --- /dev/null +++ b/cmd/discosrv/stats.go @@ -0,0 +1,136 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "bytes" + "database/sql" + "fmt" + "io/ioutil" + "log" + "os" + "sync" + "time" +) + +type stats struct { + mut sync.Mutex + reset time.Time + announces int64 + queries int64 + answers int64 + errors int64 +} + +func (s *stats) Announce() { + s.mut.Lock() + s.announces++ + s.mut.Unlock() +} + +func (s *stats) Query() { + s.mut.Lock() + s.queries++ + s.mut.Unlock() +} + +func (s *stats) Answer() { + s.mut.Lock() + s.answers++ + s.mut.Unlock() +} + +func (s *stats) Error() { + s.mut.Lock() + s.errors++ + s.mut.Unlock() +} + +func (s *stats) Reset() stats { + s.mut.Lock() + ns := *s + s.announces, s.queries, s.answers = 0, 0, 0 + s.reset = time.Now() + s.mut.Unlock() + return ns +} + +type statssrv struct { + intv time.Duration + file string + db *sql.DB +} + +func (s *statssrv) Serve() { + for { + time.Sleep(next(s.intv)) + + stats := globalStats.Reset() + d := time.Since(stats.reset).Seconds() + log.Printf("Stats: %.02f announces/s, %.02f queries/s, %.02f answers/s, %.02f errors/s", + float64(stats.announces)/d, float64(stats.queries)/d, float64(stats.answers)/d, float64(stats.errors)/d) + + if s.file != "" { + s.writeToFile(stats, d) + } + } +} + +func (s *statssrv) Stop() { + panic("stop unimplemented") +} + +func (s *statssrv) writeToFile(stats stats, secs float64) { + newLine := []byte("\n") + + var addrs int + row := s.db.QueryRow("SELECT COUNT(*) FROM Addresses") + if err := row.Scan(&addrs); err != nil { + log.Println("stats query:", err) + return + } + + fd, err := os.OpenFile(s.file, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + log.Println("stats file:", err) + return + } + + bs, err := ioutil.ReadAll(fd) + if err != nil { + log.Println("stats file:", err) + return + } + lines := bytes.Split(bytes.TrimSpace(bs), newLine) + if len(lines) > 12 { + lines = lines[len(lines)-12:] + } + + latest := fmt.Sprintf("%v: %6d addresses, %8.02f announces/s, %8.02f queries/s, %8.02f answers/s, %8.02f errors/s\n", + time.Now().UTC().Format(time.RFC3339), addrs, + float64(stats.announces)/secs, float64(stats.queries)/secs, float64(stats.answers)/secs, float64(stats.errors)/secs) + lines = append(lines, []byte(latest)) + + _, err = fd.Seek(0, 0) + if err != nil { + log.Println("stats file:", err) + return + } + err = fd.Truncate(0) + if err != nil { + log.Println("stats file:", err) + return + } + + _, err = fd.Write(bytes.Join(lines, newLine)) + if err != nil { + log.Println("stats file:", err) + return + } + + err = fd.Close() + if err != nil { + log.Println("stats file:", err) + return + } +} diff --git a/cmd/discosrv/types.go b/cmd/discosrv/types.go deleted file mode 100644 index fda44caed..000000000 --- a/cmd/discosrv/types.go +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). - -package main - -type address struct { - ip []byte - port uint16 - seen int64 // epoch seconds -} - -type addressList struct { - addresses []address -} diff --git a/cmd/discosrv/types_xdr.go b/cmd/discosrv/types_xdr.go deleted file mode 100644 index a4da8e684..000000000 --- a/cmd/discosrv/types_xdr.go +++ /dev/null @@ -1,147 +0,0 @@ -// ************************************************************ -// This file is automatically generated by genxdr. Do not edit. -// ************************************************************ - -package main - -import ( - "bytes" - "io" - - "github.com/calmh/xdr" -) - -/* - -address Structure: - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| Length of ip | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -/ / -\ ip (variable length) \ -/ / -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| 0x0000 | port | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| | -+ seen (64 bits) + -| | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - -struct address { - opaque ip<>; - unsigned int port; - hyper seen; -} - -*/ - -func (o address) EncodeXDR(w io.Writer) (int, error) { - var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) -} - -func (o address) MarshalXDR() []byte { - return o.AppendXDR(make([]byte, 0, 128)) -} - -func (o address) AppendXDR(bs []byte) []byte { - var aw = xdr.AppendWriter(bs) - var xw = xdr.NewWriter(&aw) - o.encodeXDR(xw) - return []byte(aw) -} - -func (o address) encodeXDR(xw *xdr.Writer) (int, error) { - xw.WriteBytes(o.ip) - xw.WriteUint16(o.port) - xw.WriteUint64(uint64(o.seen)) - return xw.Tot(), xw.Error() -} - -func (o *address) DecodeXDR(r io.Reader) error { - xr := xdr.NewReader(r) - return o.decodeXDR(xr) -} - -func (o *address) UnmarshalXDR(bs []byte) error { - var br = bytes.NewReader(bs) - var xr = xdr.NewReader(br) - return o.decodeXDR(xr) -} - -func (o *address) decodeXDR(xr *xdr.Reader) error { - o.ip = xr.ReadBytes() - o.port = xr.ReadUint16() - o.seen = int64(xr.ReadUint64()) - return xr.Error() -} - -/* - -addressList Structure: - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -| Number of addresses | -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ -/ / -\ Zero or more address Structures \ -/ / -+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - -struct addressList { - address addresses<>; -} - -*/ - -func (o addressList) EncodeXDR(w io.Writer) (int, error) { - var xw = xdr.NewWriter(w) - return o.encodeXDR(xw) -} - -func (o addressList) MarshalXDR() []byte { - return o.AppendXDR(make([]byte, 0, 128)) -} - -func (o addressList) AppendXDR(bs []byte) []byte { - var aw = xdr.AppendWriter(bs) - var xw = xdr.NewWriter(&aw) - o.encodeXDR(xw) - return []byte(aw) -} - -func (o addressList) encodeXDR(xw *xdr.Writer) (int, error) { - xw.WriteUint32(uint32(len(o.addresses))) - for i := range o.addresses { - o.addresses[i].encodeXDR(xw) - } - return xw.Tot(), xw.Error() -} - -func (o *addressList) DecodeXDR(r io.Reader) error { - xr := xdr.NewReader(r) - return o.decodeXDR(xr) -} - -func (o *addressList) UnmarshalXDR(bs []byte) error { - var br = bytes.NewReader(bs) - var xr = xdr.NewReader(br) - return o.decodeXDR(xr) -} - -func (o *addressList) decodeXDR(xr *xdr.Reader) error { - _addressesSize := int(xr.ReadUint32()) - o.addresses = make([]address, _addressesSize) - for i := range o.addresses { - (&o.addresses[i]).decodeXDR(xr) - } - return xr.Error() -}