lib/connections: Use own KCP fork, move listener setup earlier (ref #4446)

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4452
This commit is contained in:
Audrius Butkevicius 2017-10-22 12:36:36 +00:00
parent d65f1fb08a
commit 0d30166357
14 changed files with 75 additions and 16 deletions

View File

@ -11,9 +11,9 @@ import (
"net/url" "net/url"
"time" "time"
"github.com/AudriusButkevicius/kcp-go"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/protocol" "github.com/syncthing/syncthing/lib/protocol"
"github.com/xtaci/kcp-go"
"github.com/xtaci/smux" "github.com/xtaci/smux"
) )

View File

@ -15,9 +15,9 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/AudriusButkevicius/kcp-go"
"github.com/AudriusButkevicius/pfilter" "github.com/AudriusButkevicius/pfilter"
"github.com/ccding/go-stun/stun" "github.com/ccding/go-stun/stun"
"github.com/xtaci/kcp-go"
"github.com/xtaci/smux" "github.com/xtaci/smux"
"github.com/syncthing/syncthing/lib/config" "github.com/syncthing/syncthing/lib/config"

View File

@ -135,6 +135,12 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *
} }
cfg.Subscribe(service) cfg.Subscribe(service)
raw := cfg.RawCopy()
// Actually starts the listeners and NAT service
// Need to start this before service.connect so that any dials that
// try punch through already have a listener to cling on.
service.CommitConfiguration(raw, raw)
// There are several moving parts here; one routine per listening address // There are several moving parts here; one routine per listening address
// (handled in configuration changing) to handle incoming connections, // (handled in configuration changing) to handle incoming connections,
// one routine to periodically attempt outgoing connections, one routine to // one routine to periodically attempt outgoing connections, one routine to
@ -145,10 +151,6 @@ func NewService(cfg *config.Wrapper, myID protocol.DeviceID, mdl Model, tlsCfg *
service.Add(serviceFunc(service.handle)) service.Add(serviceFunc(service.handle))
service.Add(service.listenerSupervisor) service.Add(service.listenerSupervisor)
raw := cfg.RawCopy()
// Actually starts the listeners and NAT service
service.CommitConfiguration(raw, raw)
return service return service
} }

View File

@ -8,8 +8,8 @@ import (
"net" "net"
"testing" "testing"
"github.com/AudriusButkevicius/kcp-go"
"github.com/syncthing/syncthing/lib/dialer" "github.com/syncthing/syncthing/lib/dialer"
"github.com/xtaci/kcp-go"
) )
func BenchmarkRequestsRawTCP(b *testing.B) { func BenchmarkRequestsRawTCP(b *testing.B) {

View File

@ -0,0 +1,56 @@
package kcp
import (
"sync"
"time"
)
var (
BlacklistDuration time.Duration
blacklist = blacklistMap{
entries: make(map[sessionKey]time.Time),
}
)
// a global map for blacklisting conversations
type blacklistMap struct {
entries map[sessionKey]time.Time
reapAt time.Time
mut sync.Mutex
}
func (m *blacklistMap) add(address string, conv uint32) {
if BlacklistDuration == 0 {
return
}
m.mut.Lock()
timeout := time.Now().Add(BlacklistDuration)
m.entries[sessionKey{
addr: address,
convID: conv,
}] = timeout
m.reap()
m.mut.Unlock()
}
func (m *blacklistMap) has(address string, conv uint32) bool {
if BlacklistDuration == 0 {
return false
}
m.mut.Lock()
t, ok := m.entries[sessionKey{
addr: address,
convID: conv,
}]
m.mut.Lock()
return ok && t.After(time.Now())
}
func (m *blacklistMap) reap() {
now := time.Now()
for k, t := range m.entries {
if t.Before(now) {
delete(m.entries, k)
}
}
}

View File

@ -151,6 +151,7 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn
} }
}) })
sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize) sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize)
blacklist.add(remote.String(), conv)
// add current session to the global updater, // add current session to the global updater,
// which periodically calls sess.update() // which periodically calls sess.update()
@ -751,7 +752,7 @@ func (l *Listener) monitor() {
} }
if !ok { // new session if !ok { // new session
if len(l.chAccepts) < cap(l.chAccepts) && len(l.sessions) < 4096 { // do not let new session overwhelm accept queue and connection count if !blacklist.has(from.String(), conv) && len(l.chAccepts) < cap(l.chAccepts) && len(l.sessions) < 4096 { // do not let new session overwhelm accept queue and connection count
s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block) s := newUDPSession(conv, l.dataShards, l.parityShards, l, l.conn, from, l.block)
s.kcpInput(data) s.kcpInput(data)
l.sessions[key] = s l.sessions[key] = s

16
vendor/manifest vendored
View File

@ -17,6 +17,14 @@
"branch": "master", "branch": "master",
"notests": true "notests": true
}, },
{
"importpath": "github.com/AudriusButkevicius/kcp-go",
"repository": "https://github.com/AudriusButkevicius/kcp-go",
"vcs": "git",
"revision": "54928af49abc3a4e5b645f42466a56cebc4a941e",
"branch": "master",
"notests": true
},
{ {
"importpath": "github.com/AudriusButkevicius/pfilter", "importpath": "github.com/AudriusButkevicius/pfilter",
"repository": "https://github.com/AudriusButkevicius/pfilter", "repository": "https://github.com/AudriusButkevicius/pfilter",
@ -430,14 +438,6 @@
"path": "/qr", "path": "/qr",
"notests": true "notests": true
}, },
{
"importpath": "github.com/xtaci/kcp-go",
"repository": "https://github.com/xtaci/kcp-go",
"vcs": "git",
"revision": "21da33a6696d67c1bffb3c954366499d613097a6",
"branch": "master",
"notests": true
},
{ {
"importpath": "github.com/xtaci/smux", "importpath": "github.com/xtaci/smux",
"repository": "https://github.com/xtaci/smux", "repository": "https://github.com/xtaci/smux",