mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-10 18:24:44 +00:00
discosrv: Source based rate limiting
This commit is contained in:
parent
c314f74de6
commit
3158962506
@ -11,6 +11,8 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/calmh/syncthing/discover"
|
"github.com/calmh/syncthing/discover"
|
||||||
|
"github.com/golang/groupcache/lru"
|
||||||
|
"github.com/juju/ratelimit"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Node struct {
|
type Node struct {
|
||||||
@ -28,7 +30,9 @@ var (
|
|||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
queries = 0
|
queries = 0
|
||||||
answered = 0
|
answered = 0
|
||||||
|
limited = 0
|
||||||
debug = false
|
debug = false
|
||||||
|
limiter = lru.New(1024)
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
@ -57,9 +61,16 @@ func main() {
|
|||||||
for {
|
for {
|
||||||
buf = buf[:cap(buf)]
|
buf = buf[:cap(buf)]
|
||||||
n, addr, err := conn.ReadFromUDP(buf)
|
n, addr, err := conn.ReadFromUDP(buf)
|
||||||
|
|
||||||
|
if limit(addr) {
|
||||||
|
// Rate limit in effect for source
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if n < 4 {
|
if n < 4 {
|
||||||
log.Printf("Received short packet (%d bytes)", n)
|
log.Printf("Received short packet (%d bytes)", n)
|
||||||
continue
|
continue
|
||||||
@ -84,6 +95,36 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func limit(addr *net.UDPAddr) bool {
|
||||||
|
key := addr.IP.String()
|
||||||
|
|
||||||
|
lock.Lock()
|
||||||
|
defer lock.Unlock()
|
||||||
|
|
||||||
|
bkt, ok := limiter.Get(key)
|
||||||
|
if ok {
|
||||||
|
bkt := bkt.(*ratelimit.Bucket)
|
||||||
|
if bkt.TakeAvailable(1) != 1 {
|
||||||
|
// Rate limit exceeded; ignore packet
|
||||||
|
if debug {
|
||||||
|
log.Printf("Rate limit exceeded for", key)
|
||||||
|
}
|
||||||
|
limited++
|
||||||
|
return true
|
||||||
|
} else if debug {
|
||||||
|
log.Printf("Rate limit OK for", key)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if debug {
|
||||||
|
log.Printf("New limiter for", key)
|
||||||
|
}
|
||||||
|
// One packet per ten seconds average rate, burst ten packets
|
||||||
|
limiter.Add(key, ratelimit.NewBucket(10*time.Second, 10))
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func handleAnnounceV1(addr *net.UDPAddr, buf []byte) {
|
func handleAnnounceV1(addr *net.UDPAddr, buf []byte) {
|
||||||
var pkt discover.AnnounceV1
|
var pkt discover.AnnounceV1
|
||||||
err := pkt.UnmarshalXDR(buf)
|
err := pkt.UnmarshalXDR(buf)
|
||||||
@ -246,9 +287,11 @@ func logStats() {
|
|||||||
deleted++
|
deleted++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered)", deleted, len(nodes), queries, answered)
|
log.Printf("Expired %d nodes; %d nodes in registry; %d queries (%d answered); %d entries in limiter cache", deleted, len(nodes), queries, answered, limiter.Len())
|
||||||
|
log.Printf("Limited %d queries; %d entries in limiter cache", limited, limiter.Len())
|
||||||
queries = 0
|
queries = 0
|
||||||
answered = 0
|
answered = 0
|
||||||
|
limited = 0
|
||||||
|
|
||||||
lock.Unlock()
|
lock.Unlock()
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user