From 31589625061185cb01ea2ffd296b14ec99c423cc Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 3 Apr 2014 23:38:32 +0200 Subject: [PATCH] discosrv: Source based rate limiting --- cmd/discosrv/main.go | 45 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 44 insertions(+), 1 deletion(-) diff --git a/cmd/discosrv/main.go b/cmd/discosrv/main.go index ba98a1413..94d1b20a7 100644 --- a/cmd/discosrv/main.go +++ b/cmd/discosrv/main.go @@ -11,6 +11,8 @@ import ( "time" "github.com/calmh/syncthing/discover" + "github.com/golang/groupcache/lru" + "github.com/juju/ratelimit" ) type Node struct { @@ -28,7 +30,9 @@ var ( lock sync.Mutex queries = 0 answered = 0 + limited = 0 debug = false + limiter = lru.New(1024) ) func main() { @@ -57,9 +61,16 @@ func main() { for { buf = buf[:cap(buf)] n, addr, err := conn.ReadFromUDP(buf) + + if limit(addr) { + // Rate limit in effect for source + continue + } + if err != nil { log.Fatal(err) } + if n < 4 { log.Printf("Received short packet (%d bytes)", n) continue @@ -84,6 +95,36 @@ func main() { } } +func limit(addr *net.UDPAddr) bool { + key := addr.IP.String() + + lock.Lock() + defer lock.Unlock() + + bkt, ok := limiter.Get(key) + if ok { + bkt := bkt.(*ratelimit.Bucket) + if bkt.TakeAvailable(1) != 1 { + // Rate limit exceeded; ignore packet + if debug { + log.Printf("Rate limit exceeded for", key) + } + limited++ + return true + } else if debug { + log.Printf("Rate limit OK for", key) + } + } else { + if debug { + log.Printf("New limiter for", key) + } + // One packet per ten seconds average rate, burst ten packets + limiter.Add(key, ratelimit.NewBucket(10*time.Second, 10)) + } + + return false +} + func handleAnnounceV1(addr *net.UDPAddr, buf []byte) { var pkt discover.AnnounceV1 err := pkt.UnmarshalXDR(buf) @@ -246,9 +287,11 @@ func logStats() { deleted++ } } - log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered)", deleted, len(nodes), queries, answered) + log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered); %d entries in limiter cache", deleted, len(nodes), queries, answered, limiter.Len()) + log.Printf("Limited %d queries; %d entries in limiter cache", limited, limiter.Len()) queries = 0 answered = 0 + limited = 0 lock.Unlock() }