From db6d3b495bbfe0b446c8f10fb62c255be6d64f04 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 8 Sep 2014 11:48:26 +0200 Subject: [PATCH] Use persistent (leveldb) storage --- cmd/discosrv/main.go | 169 ++++++++++++++++++++++++++++---------- cmd/discosrv/types.go | 15 ++++ cmd/discosrv/types_xdr.go | 151 ++++++++++++++++++++++++++++++++++ 3 files changed, 291 insertions(+), 44 deletions(-) create mode 100644 cmd/discosrv/types.go create mode 100644 cmd/discosrv/types_xdr.go diff --git a/cmd/discosrv/main.go b/cmd/discosrv/main.go index f31179cf1..5b7a8771d 100644 --- a/cmd/discosrv/main.go +++ b/cmd/discosrv/main.go @@ -5,6 +5,7 @@ package main import ( + "bytes" "encoding/binary" "encoding/hex" "flag" @@ -13,6 +14,7 @@ import ( "log" "net" "os" + "path/filepath" "sync" "time" @@ -20,20 +22,13 @@ import ( "github.com/juju/ratelimit" "github.com/syncthing/syncthing/discover" "github.com/syncthing/syncthing/protocol" + "github.com/syndtr/goleveldb/leveldb" + "github.com/syndtr/goleveldb/leveldb/opt" ) -type node struct { - addresses []address - updated time.Time -} - -type address struct { - ip []byte - port uint16 -} +const cacheLimitSeconds = 3600 var ( - nodes = make(map[protocol.NodeID]node) lock sync.Mutex queries = 0 announces = 0 @@ -52,15 +47,17 @@ func main() { var timestamp bool var statsIntv int var statsFile string + var dbDir string flag.StringVar(&listen, "listen", ":22026", "Listen address") flag.BoolVar(&debug, "debug", false, "Enable debug output") flag.BoolVar(×tamp, "timestamp", true, "Timestamp the log output") flag.IntVar(&statsIntv, "stats-intv", 0, "Statistics output interval (s)") - flag.StringVar(&statsFile, "stats-file", "/var/log/discosrv.stats", "Statistics file name") + flag.StringVar(&statsFile, "stats-file", "/var/discosrv/stats", "Statistics file name") flag.IntVar(&lruSize, "limit-cache", lruSize, "Limiter cache entries") flag.IntVar(&limitAvg, "limit-avg", limitAvg, "Allowed average package rate, per 10 s") flag.IntVar(&limitBurst, "limit-burst", limitBurst, "Allowed burst size, packets") + flag.StringVar(&dbDir, "db-dir", "/var/discosrv/db", "Database directory") flag.Parse() limiter = lru.New(lruSize) @@ -76,10 +73,30 @@ func main() { log.Fatal(err) } - if statsIntv > 0 { - go logStats(statsFile, statsIntv) + parentDir := filepath.Dir(dbDir) + if _, err := os.Stat(parentDir); err != nil && os.IsNotExist(err) { + err = os.MkdirAll(parentDir, 0755) + if err != nil { + log.Fatal(err) + } } + db, err := leveldb.OpenFile(dbDir, &opt.Options{CachedOpenFiles: 32}) + if err != nil { + log.Fatal(err) + } + + statsLog, err := os.OpenFile(statsFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + log.Fatal(err) + } + + if statsIntv > 0 { + go logStats(statsLog, statsIntv) + } + + go clean(statsLog, db) + var buf = make([]byte, 1024) for { buf = buf[:cap(buf)] @@ -104,10 +121,10 @@ func main() { switch magic { case discover.AnnouncementMagic: - handleAnnounceV2(addr, buf) + handleAnnounceV2(db, addr, buf) case discover.QueryMagic: - handleQueryV2(conn, addr, buf) + handleQueryV2(db, conn, addr, buf) default: lock.Lock() @@ -145,7 +162,7 @@ func limit(addr *net.UDPAddr) bool { return false } -func handleAnnounceV2(addr *net.UDPAddr, buf []byte) { +func handleAnnounceV2(db *leveldb.DB, addr *net.UDPAddr, buf []byte) { var pkt discover.Announce err := pkt.UnmarshalXDR(buf) if err != nil && err != io.EOF { @@ -167,6 +184,7 @@ func handleAnnounceV2(addr *net.UDPAddr, buf []byte) { } var addrs []address + now := time.Now().Unix() for _, addr := range pkt.This.Addresses { tip := addr.IP if len(tip) == 0 { @@ -175,14 +193,10 @@ func handleAnnounceV2(addr *net.UDPAddr, buf []byte) { addrs = append(addrs, address{ ip: tip, port: addr.Port, + seen: now, }) } - node := node{ - addresses: addrs, - updated: time.Now(), - } - var id protocol.NodeID if len(pkt.This.ID) == 32 { // Raw node ID @@ -191,12 +205,10 @@ func handleAnnounceV2(addr *net.UDPAddr, buf []byte) { id.UnmarshalText(pkt.This.ID) } - lock.Lock() - nodes[id] = node - lock.Unlock() + update(db, id, addrs) } -func handleQueryV2(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) { +func handleQueryV2(db *leveldb.DB, conn *net.UDPConn, addr *net.UDPAddr, buf []byte) { var pkt discover.Query err := pkt.UnmarshalXDR(buf) if err != nil { @@ -217,24 +229,33 @@ func handleQueryV2(conn *net.UDPConn, addr *net.UDPAddr, buf []byte) { } lock.Lock() - node, ok := nodes[id] queries++ lock.Unlock() - if ok && len(node.addresses) > 0 { + addrs := get(db, id) + + now := time.Now().Unix() + if len(addrs) > 0 { ann := discover.Announce{ Magic: discover.AnnouncementMagic, This: discover.Node{ ID: pkt.NodeID, }, } - for _, addr := range node.addresses { + for _, addr := range addrs { + if now-addr.seen > cacheLimitSeconds { + continue + } ann.This.Addresses = append(ann.This.Addresses, discover.Address{IP: addr.ip, Port: addr.port}) } if debug { log.Printf("-> %v %#v", addr, pkt) } + if len(ann.This.Addresses) == 0 { + return + } + tb := ann.MarshalXDR() _, _, err = conn.WriteMsgUDP(tb, nil, addr) if err != nil { @@ -255,27 +276,14 @@ func next(intv int) time.Time { return t1 } -func logStats(file string, intv int) { - f, err := os.OpenFile(file, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) - if err != nil { - log.Fatal(err) - } +func logStats(statsLog io.Writer, intv int) { for { t := next(intv) lock.Lock() - var deleted = 0 - for id, node := range nodes { - if time.Since(node.updated) > 60*time.Minute { - delete(nodes, id) - deleted++ - } - } - - fmt.Fprintf(f, "%d Nr:%d Ne:%d Qt:%d Qa:%d A:%d U:%d Lq:%d Lc:%d\n", - t.Unix(), len(nodes), deleted, queries, answered, announces, unknowns, limited, limiter.Len()) - f.Sync() + fmt.Fprintf(statsLog, "%d Queries:%d Answered:%d Announces:%d Unknown:%d Limited:%d\n", + t.Unix(), queries, answered, announces, unknowns, limited) queries = 0 announces = 0 @@ -286,3 +294,76 @@ func logStats(file string, intv int) { lock.Unlock() } } + +func get(db *leveldb.DB, id protocol.NodeID) []address { + var addrs addressList + val, err := db.Get(id[:], nil) + if err == nil { + addrs.UnmarshalXDR(val) + } + return addrs.addresses +} + +func update(db *leveldb.DB, id protocol.NodeID, addrs []address) { + var newAddrs addressList + + val, err := db.Get(id[:], nil) + if err == nil { + newAddrs.UnmarshalXDR(val) + } + +nextAddr: + for _, newAddr := range addrs { + for i, exAddr := range newAddrs.addresses { + if bytes.Compare(newAddr.ip, exAddr.ip) == 0 { + newAddrs.addresses[i] = newAddr + continue nextAddr + } + } + newAddrs.addresses = append(newAddrs.addresses, newAddr) + } + + db.Put(id[:], newAddrs.MarshalXDR(), nil) +} + +func clean(statsLog io.Writer, db *leveldb.DB) { + for { + now := time.Now() + nowSecs := now.Unix() + + var kept, deleted int64 + iter := db.NewIterator(nil, nil) + for iter.Next() { + var addrs addressList + addrs.UnmarshalXDR(iter.Value()) + + // Remove expired addresses + newAddrs := addrs.addresses + for i := 0; i < len(newAddrs); i++ { + if nowSecs-newAddrs[i].seen > cacheLimitSeconds { + newAddrs[i] = newAddrs[len(newAddrs)-1] + newAddrs = newAddrs[:len(newAddrs)-1] + } + } + + // Delete empty records + if len(newAddrs) == 0 { + db.Delete(iter.Key(), nil) + deleted++ + continue + } + + // Update changed records + if len(newAddrs) != len(addrs.addresses) { + addrs.addresses = newAddrs + db.Put(iter.Key(), addrs.MarshalXDR(), nil) + } + kept++ + } + iter.Release() + + fmt.Fprintf(statsLog, "%d Kept:%d Deleted:%d Took:%0.04fs\n", nowSecs, kept, deleted, time.Since(now).Seconds()) + + time.Sleep(cacheLimitSeconds * time.Second / 2) + } +} diff --git a/cmd/discosrv/types.go b/cmd/discosrv/types.go new file mode 100644 index 000000000..c6a072175 --- /dev/null +++ b/cmd/discosrv/types.go @@ -0,0 +1,15 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package main + +type address struct { + ip []byte + port uint16 + seen int64 // epoch seconds +} + +type addressList struct { + addresses []address +} diff --git a/cmd/discosrv/types_xdr.go b/cmd/discosrv/types_xdr.go new file mode 100644 index 000000000..39db90b79 --- /dev/null +++ b/cmd/discosrv/types_xdr.go @@ -0,0 +1,151 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// ************************************************************ +// This file is automatically generated by genxdr. Do not edit. +// ************************************************************ + +package main + +import ( + "bytes" + "io" + + "github.com/calmh/xdr" +) + +/* + +address Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of ip | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ip (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| 0x0000 | port | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ seen (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct address { + opaque ip<>; + unsigned int port; + hyper seen; +} + +*/ + +func (o address) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o address) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o address) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o address) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteBytes(o.ip) + xw.WriteUint16(o.port) + xw.WriteUint64(uint64(o.seen)) + return xw.Tot(), xw.Error() +} + +func (o *address) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *address) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *address) decodeXDR(xr *xdr.Reader) error { + o.ip = xr.ReadBytes() + o.port = xr.ReadUint16() + o.seen = int64(xr.ReadUint64()) + return xr.Error() +} + +/* + +addressList Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of addresses | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more address Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct addressList { + address addresses<>; +} + +*/ + +func (o addressList) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o addressList) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o addressList) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o addressList) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(uint32(len(o.addresses))) + for i := range o.addresses { + o.addresses[i].encodeXDR(xw) + } + return xw.Tot(), xw.Error() +} + +func (o *addressList) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *addressList) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *addressList) decodeXDR(xr *xdr.Reader) error { + _addressesSize := int(xr.ReadUint32()) + o.addresses = make([]address, _addressesSize) + for i := range o.addresses { + (&o.addresses[i]).decodeXDR(xr) + } + return xr.Error() +}