discosrv: Source based rate limiting

This commit is contained in:
Jakob Borg 2014-04-03 23:38:32 +02:00
parent aa3d73d322
commit 671d5cace6

View File

@ -11,6 +11,8 @@ import (
"time"
"github.com/calmh/syncthing/discover"
"github.com/golang/groupcache/lru"
"github.com/juju/ratelimit"
)
type Node struct {
@ -28,7 +30,9 @@ var (
lock sync.Mutex
queries = 0
answered = 0
limited = 0
debug = false
limiter = lru.New(1024)
)
func main() {
@ -57,9 +61,16 @@ func main() {
for {
buf = buf[:cap(buf)]
n, addr, err := conn.ReadFromUDP(buf)
if limit(addr) {
// Rate limit in effect for source
continue
}
if err != nil {
log.Fatal(err)
}
if n < 4 {
log.Printf("Received short packet (%d bytes)", n)
continue
@ -84,6 +95,36 @@ func main() {
}
}
func limit(addr *net.UDPAddr) bool {
key := addr.IP.String()
lock.Lock()
defer lock.Unlock()
bkt, ok := limiter.Get(key)
if ok {
bkt := bkt.(*ratelimit.Bucket)
if bkt.TakeAvailable(1) != 1 {
// Rate limit exceeded; ignore packet
if debug {
log.Printf("Rate limit exceeded for", key)
}
limited++
return true
} else if debug {
log.Printf("Rate limit OK for", key)
}
} else {
if debug {
log.Printf("New limiter for", key)
}
// One packet per ten seconds average rate, burst ten packets
limiter.Add(key, ratelimit.NewBucket(10*time.Second, 10))
}
return false
}
func handleAnnounceV1(addr *net.UDPAddr, buf []byte) {
var pkt discover.AnnounceV1
err := pkt.UnmarshalXDR(buf)
@ -246,9 +287,11 @@ func logStats() {
deleted++
}
}
log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered)", deleted, len(nodes), queries, answered)
log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered); %d entries in limiter cache", deleted, len(nodes), queries, answered, limiter.Len())
log.Printf("Limited %d queries; %d entries in limiter cache", limited, limiter.Len())
queries = 0
answered = 0
limited = 0
lock.Unlock()
}