From 4d9ca822a7f84e971f1f234c4b09e693b9208923 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Tue, 21 Jul 2015 23:56:27 +0100 Subject: [PATCH] Add relay support, add ql support --- cmd/discosrv/README.md | 28 ++++++--- cmd/discosrv/clean.go | 20 +++++-- cmd/discosrv/db.go | 93 +++++++---------------------- cmd/discosrv/main.go | 25 ++++---- cmd/discosrv/psql.go | 123 +++++++++++++++++++++++++++++++++++++++ cmd/discosrv/ql.go | 98 +++++++++++++++++++++++++++++++ cmd/discosrv/querysrv.go | 118 ++++++++++++++++++++++++------------- 7 files changed, 366 insertions(+), 139 deletions(-) create mode 100644 cmd/discosrv/psql.go create mode 100644 cmd/discosrv/ql.go diff --git a/cmd/discosrv/README.md b/cmd/discosrv/README.md index f60f2d628..c57d4b673 100644 --- a/cmd/discosrv/README.md +++ b/cmd/discosrv/README.md @@ -12,17 +12,29 @@ from the build server. Usage ----- -The discovery server requires a postgresql backend server. You will need -to create a database and a user with permissions to create tables in it. -Set the database URL in the environment variable `DISCOSRV_DB` before -starting discosrv. +The discovery server supports `ql` and `postgres` backends. +Specify the backend via `-db-backend` and the database DSN via `-db-dsn`. + +By default it will use in-memory `ql` backend. If you wish to persist the +information on disk between restarts in `ql`, specify a file DSN: ```bash -$ export DISCOSRV_DB="postgres://user:password@localhost/databasename" -$ discosrv +$ discosrv -db-dsn="file://var/run/discosrv.db" ``` -The appropriate tables and indexes will be created at first startup. If -it doesn't exit with an error, you're fine. +For `postgres`, you will need to create a database and a user with permissions +to create tables in it, then start the discosrv as follows: + +```bash +$ export DISCOSRV_DB_DSN="postgres://user:password@localhost/databasename" +$ discosrv -db-backend="postgres" +``` + +You can pass the DSN as command line option, but the value what you pass in will +be visible in most process managers, potentially exposing the database password +to other users. + +In all cases, the appropriate tables and indexes will be created at first +startup. If it doesn't exit with an error, you're fine. See `discosrv -help` for other options. diff --git a/cmd/discosrv/clean.go b/cmd/discosrv/clean.go index a2115190c..7871bca01 100644 --- a/cmd/discosrv/clean.go +++ b/cmd/discosrv/clean.go @@ -18,7 +18,7 @@ func (s *cleansrv) Serve() { for { time.Sleep(next(s.intv)) - err := s.cleanOldAddresses() + err := s.cleanOldEntries() if err != nil { log.Println("Clean:", err) } @@ -29,7 +29,7 @@ func (s *cleansrv) Stop() { panic("stop unimplemented") } -func (s *cleansrv) cleanOldAddresses() (err error) { +func (s *cleansrv) cleanOldEntries() (err error) { var tx *sql.Tx tx, err = s.db.Begin() if err != nil { @@ -52,6 +52,14 @@ func (s *cleansrv) cleanOldAddresses() (err error) { log.Printf("Clean: %d old addresses", rows) } + res, err = tx.Stmt(s.prep["cleanRelay"]).Exec() + if err != nil { + return err + } + if rows, _ := res.RowsAffected(); rows > 0 { + log.Printf("Clean: %d old relays", rows) + } + res, err = tx.Stmt(s.prep["cleanDevice"]).Exec() if err != nil { return err @@ -60,7 +68,7 @@ func (s *cleansrv) cleanOldAddresses() (err error) { log.Printf("Clean: %d old devices", rows) } - var devs, addrs int + var devs, addrs, relays int row := tx.Stmt(s.prep["countDevice"]).QueryRow() if err = row.Scan(&devs); err != nil { return err @@ -69,7 +77,11 @@ func (s *cleansrv) cleanOldAddresses() (err error) { if err = row.Scan(&addrs); err != nil { return err } + row = tx.Stmt(s.prep["countRelay"]).QueryRow() + if err = row.Scan(&relays); err != nil { + return err + } - log.Printf("Database: %d devices, %d addresses", devs, addrs) + log.Printf("Database: %d devices, %d addresses, %d relays", devs, addrs, relays) return nil } diff --git a/cmd/discosrv/db.go b/cmd/discosrv/db.go index 84f558133..34162d588 100644 --- a/cmd/discosrv/db.go +++ b/cmd/discosrv/db.go @@ -2,84 +2,31 @@ package main -import "database/sql" +import ( + "database/sql" + "fmt" +) -func setupDB(db *sql.DB) error { - var err error +type setupFunc func(db *sql.DB) error +type compileFunc func(db *sql.DB) (map[string]*sql.Stmt, error) - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices ( - DeviceID CHAR(63) NOT NULL PRIMARY KEY, - Seen TIMESTAMP NOT NULL - )`) - if err != nil { - return err - } +var ( + setupFuncs = make(map[string]setupFunc) + compileFuncs = make(map[string]compileFunc) +) - row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`) - if err := row.Scan(nil); err != nil { - _, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`) - } - if err != nil { - return err - } - - row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`) - if err := row.Scan(nil); err != nil { - _, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`) - } - if err != nil { - return err - } - - _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses ( - DeviceID CHAR(63) NOT NULL, - Seen TIMESTAMP NOT NULL, - Address VARCHAR(42) NOT NULL, - Port INTEGER NOT NULL - )`) - if err != nil { - return err - } - - row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`) - if err := row.Scan(nil); err != nil { - _, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`) - } - if err != nil { - return err - } - - row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressPortIndex'::regclass`) - if err := row.Scan(nil); err != nil { - _, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressPortIndex ON Addresses (DeviceID, Address, Port)`) - } - if err != nil { - return err - } - - return nil +func register(name string, setup setupFunc, compile compileFunc) { + setupFuncs[name] = setup + compileFuncs[name] = compile } -func compileStatements(db *sql.DB) (map[string]*sql.Stmt, error) { - stmts := map[string]string{ - "cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL", - "cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL", - "countAddress": "SELECT count(*) FROM Addresses", - "countDevice": "SELECT count(*) FROM Devices", - "insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address, Port) VALUES ($1, now(), $2, $3)", - "insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())", - "selectAddress": "SELECT Address, Port from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16", - "updateAddress": "UPDATE Addresses SET Seen=now() WHERE DeviceID=$1 AND Address=$2 AND Port=$3", - "updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1", +func setup(backend string, db *sql.DB) (map[string]*sql.Stmt, error) { + setup, ok := setupFuncs[backend] + if !ok { + return nil, fmt.Errorf("Unsupported backend") } - - res := make(map[string]*sql.Stmt, len(stmts)) - for key, stmt := range stmts { - prep, err := db.Prepare(stmt) - if err != nil { - return nil, err - } - res[key] = prep + if err := setup(db); err != nil { + return nil, err } - return res, nil + return compileFuncs[backend](db) } diff --git a/cmd/discosrv/main.go b/cmd/discosrv/main.go index 013a602af..91f16c10c 100644 --- a/cmd/discosrv/main.go +++ b/cmd/discosrv/main.go @@ -8,10 +8,8 @@ import ( "log" "net" "os" - "strings" "time" - _ "github.com/lib/pq" "github.com/thejerf/suture" ) @@ -19,9 +17,10 @@ var ( lruSize = 10240 limitAvg = 5 limitBurst = 20 - dbConn = getEnvDefault("DISCOSRV_DB", "postgres://user:password@localhost/discosrv") globalStats stats statsFile string + backend = "ql" + dsn = getEnvDefault("DISCOSRV_DB_DSN", "memory://discosrv") ) func main() { @@ -35,30 +34,26 @@ func main() { log.SetOutput(os.Stdout) log.SetFlags(0) - flag.StringVar(&listen, "listen", ":22026", "Listen address") + flag.StringVar(&listen, "listen", ":22027", "Listen address") flag.IntVar(&lruSize, "limit-cache", lruSize, "Limiter cache entries") flag.IntVar(&limitAvg, "limit-avg", limitAvg, "Allowed average package rate, per 10 s") flag.IntVar(&limitBurst, "limit-burst", limitBurst, "Allowed burst size, packets") flag.StringVar(&statsFile, "stats-file", statsFile, "File to write periodic operation stats to") + flag.StringVar(&backend, "db-backend", backend, "Database backend to use") + flag.StringVar(&dsn, "db-dsn", dsn, "Database DSN") flag.Parse() addr, _ := net.ResolveUDPAddr("udp", listen) - if !strings.Contains(dbConn, "sslmode=") { - dbConn += "?sslmode=disable" - } - var err error - db, err := sql.Open("postgres", dbConn) + db, err := sql.Open(backend, dsn) + if err != nil { + log.Fatalln("sql.Open:", err) + } + prep, err := setup(backend, db) if err != nil { log.Fatalln("Setup:", err) } - err = setupDB(db) - if err != nil { - log.Fatalln("Setup:", err) - } - - prep, err := compileStatements(db) main := suture.NewSimple("main") diff --git a/cmd/discosrv/psql.go b/cmd/discosrv/psql.go new file mode 100644 index 000000000..068547812 --- /dev/null +++ b/cmd/discosrv/psql.go @@ -0,0 +1,123 @@ +// Copyright (C) 2014-2015 Jakob Borg and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + + _ "github.com/lib/pq" +) + +func init() { + register("postgres", postgresSetup, postgresCompile) +} + +func postgresSetup(db *sql.DB) error { + var err error + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Devices ( + DeviceID CHAR(63) NOT NULL PRIMARY KEY, + Seen TIMESTAMP NOT NULL + )`) + if err != nil { + return err + } + + row := db.QueryRow(`SELECT 'DevicesDeviceIDIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DevicesDeviceIDIndex ON Devices (DeviceID)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'DevicesSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX DevicesSeenIndex ON Devices (Seen)`) + } + if err != nil { + return err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Addresses ( + DeviceID CHAR(63) NOT NULL, + Seen TIMESTAMP NOT NULL, + Address VARCHAR(256) NOT NULL + )`) + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'AddressesDeviceIDSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX AddressesDeviceIDSeenIndex ON Addresses (DeviceID, Seen)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'AddressesDeviceIDAddressIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`) + } + if err != nil { + return err + } + + _, err = db.Exec(`CREATE TABLE IF NOT EXISTS Relays ( + DeviceID CHAR(63) NOT NULL, + Seen TIMESTAMP NOT NULL, + Address VARCHAR(256) NOT NULL, + Latency INTEGER NOT NULL + )`) + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'RelaysDeviceIDSeenIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX RelaysDeviceIDSeenIndex ON Relays (DeviceID, Seen)`) + } + if err != nil { + return err + } + + row = db.QueryRow(`SELECT 'RelaysDeviceIDAddressIndex'::regclass`) + if err := row.Scan(nil); err != nil { + _, err = db.Exec(`CREATE INDEX RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`) + } + if err != nil { + return err + } + + return nil +} + +func postgresCompile(db *sql.DB) (map[string]*sql.Stmt, error) { + stmts := map[string]string{ + "cleanAddress": "DELETE FROM Addresses WHERE Seen < now() - '2 hour'::INTERVAL", + "cleanRelay": "DELETE FROM Relays WHERE Seen < now() - '2 hour'::INTERVAL", + "cleanDevice": "DELETE FROM Devices WHERE Seen < now() - '24 hour'::INTERVAL", + "countAddress": "SELECT count(*) FROM Addresses", + "countDevice": "SELECT count(*) FROM Devices", + "countRelay": "SELECT count(*) FROM Relays", + "insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)", + "insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)", + "insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())", + "selectAddress": "SELECT Address from Addresses WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16", + "selectRelay": "SELECT Address, Latency from Relays WHERE DeviceID=$1 AND Seen > now() - '1 hour'::INTERVAL ORDER BY random() LIMIT 16", + "updateRelay": "UPDATE Relays SET Seen=now(), Latency=$3 WHERE DeviceID=$1 AND Address=$2", + "updateDevice": "UPDATE Devices SET Seen=now() WHERE DeviceID=$1", + "deleteRelay": "DELETE FROM Relays WHERE DeviceID=$1", + } + + res := make(map[string]*sql.Stmt, len(stmts)) + for key, stmt := range stmts { + prep, err := db.Prepare(stmt) + if err != nil { + return nil, err + } + res[key] = prep + } + return res, nil +} diff --git a/cmd/discosrv/ql.go b/cmd/discosrv/ql.go new file mode 100644 index 000000000..850eac74f --- /dev/null +++ b/cmd/discosrv/ql.go @@ -0,0 +1,98 @@ +// Copyright (C) 2015 Audrius Butkevicius and Contributors (see the CONTRIBUTORS file). + +package main + +import ( + "database/sql" + "log" + + "github.com/cznic/ql" +) + +func init() { + ql.RegisterDriver() + register("ql", qlSetup, qlCompile) +} + +func qlSetup(db *sql.DB) (err error) { + tx, err := db.Begin() + if err != nil { + return + } + + defer func() { + if err == nil { + err = tx.Commit() + } else { + tx.Rollback() + } + }() + + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Devices ( + DeviceID STRING NOT NULL, + Seen TIME NOT NULL + )`) + if err != nil { + return + } + + if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS DevicesDeviceIDIndex ON Devices (DeviceID)`); err != nil { + return + } + + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Addresses ( + DeviceID STRING NOT NULL, + Seen TIME NOT NULL, + Address STRING NOT NULL, + )`) + if err != nil { + return + } + + if _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS AddressesDeviceIDAddressIndex ON Addresses (DeviceID, Address)`); err != nil { + return + } + + _, err = tx.Exec(`CREATE TABLE IF NOT EXISTS Relays ( + DeviceID STRING NOT NULL, + Seen TIME NOT NULL, + Address STRING NOT NULL, + Latency INT NOT NULL, + )`) + if err != nil { + return + } + + _, err = tx.Exec(`CREATE INDEX IF NOT EXISTS RelaysDeviceIDAddressIndex ON Relays (DeviceID, Address)`) + return +} + +func qlCompile(db *sql.DB) (map[string]*sql.Stmt, error) { + stmts := map[string]string{ + "cleanAddress": `DELETE FROM Addresses WHERE Seen < now() - duration("2h")`, + "cleanRelay": `DELETE FROM Relays WHERE Seen < now() - duration("2h")`, + "cleanDevice": `DELETE FROM Devices WHERE Seen < now() - duration("24h")`, + "countAddress": "SELECT count(*) FROM Addresses", + "countDevice": "SELECT count(*) FROM Devices", + "countRelay": "SELECT count(*) FROM Relays", + "insertAddress": "INSERT INTO Addresses (DeviceID, Seen, Address) VALUES ($1, now(), $2)", + "insertRelay": "INSERT INTO Relays (DeviceID, Seen, Address, Latency) VALUES ($1, now(), $2, $3)", + "insertDevice": "INSERT INTO Devices (DeviceID, Seen) VALUES ($1, now())", + "selectAddress": `SELECT Address from Addresses WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`, + "selectRelay": `SELECT Address, Latency from Relays WHERE DeviceID==$1 AND Seen > now() - duration("1h") LIMIT 16`, + "updateAddress": "UPDATE Addresses Seen=now()WHERE DeviceID==$1 AND Address==$2", + "updateDevice": "UPDATE Devices Seen=now() WHERE DeviceID==$1", + "deleteRelay": "DELETE FROM Relays WHERE DeviceID==$1", + } + + res := make(map[string]*sql.Stmt, len(stmts)) + for key, stmt := range stmts { + prep, err := db.Prepare(stmt) + if err != nil { + log.Println("Failed to compile", stmt) + return nil, err + } + res[key] = prep + } + return res, nil +} diff --git a/cmd/discosrv/querysrv.go b/cmd/discosrv/querysrv.go index 410c90386..c4abd17f3 100644 --- a/cmd/discosrv/querysrv.go +++ b/cmd/discosrv/querysrv.go @@ -9,6 +9,7 @@ import ( "io" "log" "net" + "net/url" "time" "github.com/golang/groupcache/lru" @@ -70,7 +71,7 @@ func (s *querysrv) Serve() { switch magic { case discover.AnnouncementMagic: - err := s.handleAnnounceV2(addr, buf) + err := s.handleAnnounce(addr, buf) globalStats.Announce() if err != nil { log.Println("Announce:", err) @@ -78,7 +79,7 @@ func (s *querysrv) Serve() { } case discover.QueryMagic: - err := s.handleQueryV2(conn, addr, buf) + err := s.handleQuery(conn, addr, buf) globalStats.Query() if err != nil { log.Println("Query:", err) @@ -95,7 +96,7 @@ func (s *querysrv) Stop() { panic("stop unimplemented") } -func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error { +func (s *querysrv) handleAnnounce(addr *net.UDPAddr, buf []byte) error { var pkt discover.Announce err := pkt.UnmarshalXDR(buf) if err != nil && err != io.EOF { @@ -103,15 +104,7 @@ func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error { } var id protocol.DeviceID - if len(pkt.This.ID) == 32 { - // Raw node ID - copy(id[:], pkt.This.ID) - } else { - err = id.UnmarshalText(pkt.This.ID) - if err != nil { - return err - } - } + copy(id[:], pkt.This.ID) if id == protocol.LocalDeviceID { return fmt.Errorf("Rejecting announce for local device ID from %v", addr) @@ -123,11 +116,40 @@ func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error { } for _, annAddr := range pkt.This.Addresses { - tip := annAddr.IP - if len(tip) == 0 { - tip = addr.IP + uri, err := url.Parse(annAddr) + if err != nil { + continue } - if err := s.updateAddress(tx, id, tip, annAddr.Port); err != nil { + + host, port, err := net.SplitHostPort(uri.Host) + if err != nil { + continue + } + + if len(host) == 0 { + uri.Host = net.JoinHostPort(addr.IP.String(), port) + } + + if err := s.updateAddress(tx, id, uri.String()); err != nil { + tx.Rollback() + return err + } + } + + _, err = tx.Stmt(s.prep["deleteRelay"]).Exec(id.String()) + if err != nil { + tx.Rollback() + return err + } + + for _, relay := range pkt.This.Relays { + uri, err := url.Parse(relay.Address) + if err != nil { + continue + } + + _, err = tx.Stmt(s.prep["insertRelay"]).Exec(id.String(), uri, relay.Latency) + if err != nil { tx.Rollback() return err } @@ -141,7 +163,7 @@ func (s *querysrv) handleAnnounceV2(addr *net.UDPAddr, buf []byte) error { return tx.Commit() } -func (s *querysrv) handleQueryV2(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error { +func (s *querysrv) handleQuery(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) error { var pkt discover.Query err := pkt.UnmarshalXDR(buf) if err != nil { @@ -149,27 +171,25 @@ func (s *querysrv) handleQueryV2(conn *net.UDPConn, addr *net.UDPAddr, buf []byt } var id protocol.DeviceID - if len(pkt.DeviceID) == 32 { - // Raw node ID - copy(id[:], pkt.DeviceID) - } else { - err = id.UnmarshalText(pkt.DeviceID) - if err != nil { - return err - } - } + copy(id[:], pkt.DeviceID) addrs, err := s.getAddresses(id) if err != nil { return err } + relays, err := s.getRelays(id) + if err != nil { + return err + } + if len(addrs) > 0 { ann := discover.Announce{ Magic: discover.AnnouncementMagic, This: discover.Device{ ID: pkt.DeviceID, Addresses: addrs, + Relays: relays, }, } @@ -222,14 +242,14 @@ func (s *querysrv) updateDevice(tx *sql.Tx, device protocol.DeviceID) error { return nil } -func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, ip net.IP, port uint16) error { - res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), ip.String(), port) +func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, uri string) error { + res, err := tx.Stmt(s.prep["updateAddress"]).Exec(device.String(), uri) if err != nil { return err } if rows, _ := res.RowsAffected(); rows == 0 { - _, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), ip.String(), port) + _, err := tx.Stmt(s.prep["insertAddress"]).Exec(device.String(), uri) if err != nil { return err } @@ -238,27 +258,47 @@ func (s *querysrv) updateAddress(tx *sql.Tx, device protocol.DeviceID, ip net.IP return nil } -func (s *querysrv) getAddresses(device protocol.DeviceID) ([]discover.Address, error) { +func (s *querysrv) getAddresses(device protocol.DeviceID) ([]string, error) { rows, err := s.prep["selectAddress"].Query(device.String()) if err != nil { return nil, err } - var res []discover.Address + var res []string for rows.Next() { var addr string - var port int - err := rows.Scan(&addr, &port) + + err := rows.Scan(&addr) if err != nil { log.Println("Scan:", err) continue } - ip := net.ParseIP(addr) - bs := ip.To4() - if bs == nil { - bs = ip.To16() - } - res = append(res, discover.Address{IP: []byte(bs), Port: uint16(port)}) + res = append(res, addr) + } + + return res, nil +} + +func (s *querysrv) getRelays(device protocol.DeviceID) ([]discover.Relay, error) { + rows, err := s.prep["selectRelay"].Query(device.String()) + if err != nil { + return nil, err + } + + var res []discover.Relay + for rows.Next() { + var addr string + var latency int32 + + err := rows.Scan(&addr, &latency) + if err != nil { + log.Println("Scan:", err) + continue + } + res = append(res, discover.Relay{ + Address: addr, + Latency: latency, + }) } return res, nil