2017-03-07 12:44:16 +00:00
package kcp
import (
"crypto/rand"
"encoding/binary"
"hash/crc32"
"io"
"net"
"sync"
"sync/atomic"
"time"
"github.com/pkg/errors"
"golang.org/x/net/ipv4"
)
type errTimeout struct {
error
}
func ( errTimeout ) Timeout ( ) bool { return true }
func ( errTimeout ) Temporary ( ) bool { return true }
func ( errTimeout ) Error ( ) string { return "i/o timeout" }
const (
2017-03-09 13:01:07 +00:00
// 16-bytes magic number for each packet
nonceSize = 16
// 4-bytes packet checksum
crcSize = 4
// overall crypto header size
2017-03-07 13:28:09 +00:00
cryptHeaderSize = nonceSize + crcSize
2017-03-09 13:01:07 +00:00
// maximum packet size
2017-04-05 14:34:41 +00:00
mtuLimit = 1500
2017-03-09 13:01:07 +00:00
// FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory
rxFECMulti = 3
2017-04-05 14:34:41 +00:00
// accept backlog
acceptBacklog = 128
// prerouting(to session) queue
qlen = 128
2017-03-07 12:44:16 +00:00
)
const (
errBrokenPipe = "broken pipe"
errInvalidOperation = "invalid operation"
)
var (
2017-03-09 13:01:07 +00:00
// global packet buffer
// shared among sending/receiving/FEC
2017-03-07 12:44:16 +00:00
xmitBuf sync . Pool
)
func init ( ) {
xmitBuf . New = func ( ) interface { } {
return make ( [ ] byte , mtuLimit )
}
}
type (
// UDPSession defines a KCP session implemented by UDP
UDPSession struct {
2017-09-02 06:04:35 +00:00
updaterIdx int // record slice index in updater
conn net . PacketConn // the underlying packet connection
kcp * KCP // KCP ARQ protocol
l * Listener // point to the Listener if it's accepted by Listener
block BlockCrypt // block encryption
2017-03-09 13:01:07 +00:00
// kcp receiving is based on packets
2017-04-05 14:34:41 +00:00
// recvbuf turns packets into stream
recvbuf [ ] byte
bufptr [ ] byte
// extended output buffer(with header)
ext [ ] byte
2017-03-09 13:01:07 +00:00
2017-04-05 14:34:41 +00:00
// FEC
2017-09-02 06:04:35 +00:00
fecDecoder * fecDecoder
fecEncoder * fecEncoder
2017-03-07 13:28:09 +00:00
// settings
2017-09-02 06:04:35 +00:00
remote net . Addr // remote peer address
rd time . Time // read deadline
wd time . Time // write deadline
headerSize int // the overall header size added before KCP frame
ackNoDelay bool // send ack immediately for each incoming packet
writeDelay bool // delay kcp.flush() for Write() for bulk transfer
dup int // duplicate udp packets
2017-03-07 13:28:09 +00:00
// notifications
2017-03-09 13:01:07 +00:00
die chan struct { } // notify session has Closed
chReadEvent chan struct { } // notify Read() can be called without blocking
chWriteEvent chan struct { } // notify Write() can be called without blocking
2017-09-02 06:04:35 +00:00
chErrorEvent chan error // notify Read() have an error
2017-03-09 13:01:07 +00:00
isClosed bool // flag the session has Closed
mu sync . Mutex
2017-03-07 12:44:16 +00:00
}
setReadBuffer interface {
SetReadBuffer ( bytes int ) error
}
setWriteBuffer interface {
SetWriteBuffer ( bytes int ) error
}
)
// newUDPSession create a new udp session for client or server
2017-09-02 14:11:48 +00:00
func newUDPSession ( conv uint32 , dataShards , parityShards int , l * Listener , conn net . PacketConn , remote net . Addr , block BlockCrypt ) * UDPSession {
2017-03-07 12:44:16 +00:00
sess := new ( UDPSession )
sess . die = make ( chan struct { } )
sess . chReadEvent = make ( chan struct { } , 1 )
sess . chWriteEvent = make ( chan struct { } , 1 )
2017-09-02 06:04:35 +00:00
sess . chErrorEvent = make ( chan error , 1 )
2017-03-07 12:44:16 +00:00
sess . remote = remote
sess . conn = conn
sess . l = l
sess . block = block
2017-04-05 14:34:41 +00:00
sess . recvbuf = make ( [ ] byte , mtuLimit )
2017-03-07 13:28:09 +00:00
// FEC initialization
2017-04-05 14:34:41 +00:00
sess . fecDecoder = newFECDecoder ( rxFECMulti * ( dataShards + parityShards ) , dataShards , parityShards )
if sess . block != nil {
sess . fecEncoder = newFECEncoder ( dataShards , parityShards , cryptHeaderSize )
} else {
sess . fecEncoder = newFECEncoder ( dataShards , parityShards , 0 )
2017-03-07 13:28:09 +00:00
}
2017-03-07 12:44:16 +00:00
// calculate header size
if sess . block != nil {
sess . headerSize += cryptHeaderSize
}
2017-04-05 14:34:41 +00:00
if sess . fecEncoder != nil {
2017-03-07 12:44:16 +00:00
sess . headerSize += fecHeaderSizePlus2
}
2017-04-05 14:34:41 +00:00
// only allocate extended packet buffer
// when the extra header is required
if sess . headerSize > 0 {
sess . ext = make ( [ ] byte , mtuLimit )
}
2017-03-07 12:44:16 +00:00
sess . kcp = NewKCP ( conv , func ( buf [ ] byte , size int ) {
if size >= IKCP_OVERHEAD {
2017-03-07 13:28:09 +00:00
sess . output ( buf [ : size ] )
2017-03-07 12:44:16 +00:00
}
} )
sess . kcp . SetMtu ( IKCP_MTU_DEF - sess . headerSize )
2017-10-22 12:36:36 +00:00
blacklist . add ( remote . String ( ) , conv )
2017-03-07 12:44:16 +00:00
2017-04-05 14:34:41 +00:00
// add current session to the global updater,
// which periodically calls sess.update()
2017-03-07 13:28:09 +00:00
updater . addSession ( sess )
2017-04-05 14:34:41 +00:00
2017-03-07 12:44:16 +00:00
if sess . l == nil { // it's a client connection
go sess . readLoop ( )
atomic . AddUint64 ( & DefaultSnmp . ActiveOpens , 1 )
} else {
atomic . AddUint64 ( & DefaultSnmp . PassiveOpens , 1 )
}
currestab := atomic . AddUint64 ( & DefaultSnmp . CurrEstab , 1 )
maxconn := atomic . LoadUint64 ( & DefaultSnmp . MaxConn )
if currestab > maxconn {
atomic . CompareAndSwapUint64 ( & DefaultSnmp . MaxConn , maxconn , currestab )
}
return sess
}
2017-04-05 14:34:41 +00:00
// Read implements net.Conn
2017-03-07 12:44:16 +00:00
func ( s * UDPSession ) Read ( b [ ] byte ) ( n int , err error ) {
for {
s . mu . Lock ( )
2017-04-05 14:34:41 +00:00
if len ( s . bufptr ) > 0 { // copy from buffer into b
n = copy ( b , s . bufptr )
s . bufptr = s . bufptr [ n : ]
2017-03-07 12:44:16 +00:00
s . mu . Unlock ( )
return n , nil
}
if s . isClosed {
s . mu . Unlock ( )
return 0 , errors . New ( errBrokenPipe )
}
2017-04-05 14:34:41 +00:00
if size := s . kcp . PeekSize ( ) ; size > 0 { // peek data size from kcp
atomic . AddUint64 ( & DefaultSnmp . BytesReceived , uint64 ( size ) )
if len ( b ) >= size { // direct write to b
s . kcp . Recv ( b )
2017-03-07 12:44:16 +00:00
s . mu . Unlock ( )
2017-04-05 14:34:41 +00:00
return size , nil
2017-03-07 12:44:16 +00:00
}
2017-04-05 14:34:41 +00:00
// resize kcp receive buffer
// to make sure recvbuf has enough capacity
if cap ( s . recvbuf ) < size {
s . recvbuf = make ( [ ] byte , size )
2017-03-07 12:44:16 +00:00
}
2017-04-05 14:34:41 +00:00
// resize recvbuf slice length
s . recvbuf = s . recvbuf [ : size ]
s . kcp . Recv ( s . recvbuf )
n = copy ( b , s . recvbuf ) // copy to b
s . bufptr = s . recvbuf [ n : ] // update pointer
2017-03-07 12:44:16 +00:00
s . mu . Unlock ( )
return n , nil
}
2017-04-05 14:34:41 +00:00
// read deadline
2017-03-07 12:44:16 +00:00
var timeout * time . Timer
var c <- chan time . Time
if ! s . rd . IsZero ( ) {
2017-04-05 14:34:41 +00:00
if time . Now ( ) . After ( s . rd ) {
s . mu . Unlock ( )
return 0 , errTimeout { }
}
2017-03-07 12:44:16 +00:00
delay := s . rd . Sub ( time . Now ( ) )
timeout = time . NewTimer ( delay )
c = timeout . C
}
s . mu . Unlock ( )
// wait for read event or timeout
select {
case <- s . chReadEvent :
case <- c :
case <- s . die :
2017-09-02 06:04:35 +00:00
case err = <- s . chErrorEvent :
if timeout != nil {
timeout . Stop ( )
}
return n , err
2017-03-07 12:44:16 +00:00
}
if timeout != nil {
timeout . Stop ( )
}
}
}
2017-04-05 14:34:41 +00:00
// Write implements net.Conn
2017-03-07 12:44:16 +00:00
func ( s * UDPSession ) Write ( b [ ] byte ) ( n int , err error ) {
for {
s . mu . Lock ( )
if s . isClosed {
s . mu . Unlock ( )
return 0 , errors . New ( errBrokenPipe )
}
2017-04-05 14:34:41 +00:00
// api flow control
if s . kcp . WaitSnd ( ) < int ( s . kcp . snd_wnd ) {
2017-03-07 12:44:16 +00:00
n = len ( b )
for {
2017-03-07 13:28:09 +00:00
if len ( b ) <= int ( s . kcp . mss ) {
2017-03-07 12:44:16 +00:00
s . kcp . Send ( b )
break
} else {
2017-03-07 13:28:09 +00:00
s . kcp . Send ( b [ : s . kcp . mss ] )
b = b [ s . kcp . mss : ]
2017-03-07 12:44:16 +00:00
}
}
2017-03-07 13:28:09 +00:00
2017-04-05 14:34:41 +00:00
if ! s . writeDelay {
s . kcp . flush ( false )
}
2017-03-07 12:44:16 +00:00
s . mu . Unlock ( )
atomic . AddUint64 ( & DefaultSnmp . BytesSent , uint64 ( n ) )
return n , nil
}
2017-04-05 14:34:41 +00:00
// write deadline
2017-03-07 12:44:16 +00:00
var timeout * time . Timer
var c <- chan time . Time
if ! s . wd . IsZero ( ) {
2017-04-05 14:34:41 +00:00
if time . Now ( ) . After ( s . wd ) {
s . mu . Unlock ( )
return 0 , errTimeout { }
}
2017-03-07 12:44:16 +00:00
delay := s . wd . Sub ( time . Now ( ) )
timeout = time . NewTimer ( delay )
c = timeout . C
}
s . mu . Unlock ( )
// wait for write event or timeout
select {
case <- s . chWriteEvent :
case <- c :
case <- s . die :
}
if timeout != nil {
timeout . Stop ( )
}
}
}
// Close closes the connection.
func ( s * UDPSession ) Close ( ) error {
2017-09-02 06:04:35 +00:00
// remove this session from updater & listener(if necessary)
2017-03-07 13:28:09 +00:00
updater . removeSession ( s )
2017-04-05 14:34:41 +00:00
if s . l != nil { // notify listener
2017-10-17 22:17:10 +00:00
s . l . closeSession ( sessionKey {
addr : s . remote . String ( ) ,
convID : s . kcp . conv ,
} )
2017-04-05 14:34:41 +00:00
}
2017-03-07 13:28:09 +00:00
2017-03-07 12:44:16 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
if s . isClosed {
return errors . New ( errBrokenPipe )
}
close ( s . die )
s . isClosed = true
atomic . AddUint64 ( & DefaultSnmp . CurrEstab , ^ uint64 ( 0 ) )
2017-09-02 14:11:48 +00:00
if s . l == nil { // client socket close
2017-03-07 12:44:16 +00:00
return s . conn . Close ( )
}
return nil
}
// LocalAddr returns the local network address. The Addr returned is shared by all invocations of LocalAddr, so do not modify it.
func ( s * UDPSession ) LocalAddr ( ) net . Addr { return s . conn . LocalAddr ( ) }
// RemoteAddr returns the remote network address. The Addr returned is shared by all invocations of RemoteAddr, so do not modify it.
func ( s * UDPSession ) RemoteAddr ( ) net . Addr { return s . remote }
// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
func ( s * UDPSession ) SetDeadline ( t time . Time ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . rd = t
s . wd = t
return nil
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func ( s * UDPSession ) SetReadDeadline ( t time . Time ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . rd = t
return nil
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
func ( s * UDPSession ) SetWriteDeadline ( t time . Time ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . wd = t
return nil
}
2017-04-05 14:34:41 +00:00
// SetWriteDelay delays write for bulk transfer until the next update interval
func ( s * UDPSession ) SetWriteDelay ( delay bool ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . writeDelay = delay
}
2017-03-07 12:44:16 +00:00
// SetWindowSize set maximum window size
func ( s * UDPSession ) SetWindowSize ( sndwnd , rcvwnd int ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . kcp . WndSize ( sndwnd , rcvwnd )
}
2017-03-09 13:01:07 +00:00
// SetMtu sets the maximum transmission unit(not including UDP header)
func ( s * UDPSession ) SetMtu ( mtu int ) bool {
if mtu > mtuLimit {
return false
}
2017-03-07 12:44:16 +00:00
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . kcp . SetMtu ( mtu - s . headerSize )
2017-03-09 13:01:07 +00:00
return true
2017-03-07 12:44:16 +00:00
}
// SetStreamMode toggles the stream mode on/off
func ( s * UDPSession ) SetStreamMode ( enable bool ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if enable {
s . kcp . stream = 1
} else {
s . kcp . stream = 0
}
}
// SetACKNoDelay changes ack flush option, set true to flush ack immediately,
func ( s * UDPSession ) SetACKNoDelay ( nodelay bool ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . ackNoDelay = nodelay
}
2017-09-02 06:04:35 +00:00
// SetDUP duplicates udp packets for kcp output, for testing purpose only
func ( s * UDPSession ) SetDUP ( dup int ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . dup = dup
}
2017-03-07 12:44:16 +00:00
// SetNoDelay calls nodelay() of kcp
2017-09-02 06:04:35 +00:00
// https://github.com/skywind3000/kcp/blob/master/README.en.md#protocol-configuration
2017-03-07 12:44:16 +00:00
func ( s * UDPSession ) SetNoDelay ( nodelay , interval , resend , nc int ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
s . kcp . NoDelay ( nodelay , interval , resend , nc )
}
// SetDSCP sets the 6bit DSCP field of IP header, no effect if it's accepted from Listener
func ( s * UDPSession ) SetDSCP ( dscp int ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if s . l == nil {
2017-09-02 06:04:35 +00:00
if nc , ok := s . conn . ( * connectedUDPConn ) ; ok {
return ipv4 . NewConn ( nc . UDPConn ) . SetTOS ( dscp << 2 )
2017-03-07 12:44:16 +00:00
} else if nc , ok := s . conn . ( net . Conn ) ; ok {
return ipv4 . NewConn ( nc ) . SetTOS ( dscp << 2 )
}
}
return errors . New ( errInvalidOperation )
}
// SetReadBuffer sets the socket read buffer, no effect if it's accepted from Listener
func ( s * UDPSession ) SetReadBuffer ( bytes int ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if s . l == nil {
if nc , ok := s . conn . ( setReadBuffer ) ; ok {
return nc . SetReadBuffer ( bytes )
}
}
return errors . New ( errInvalidOperation )
}
// SetWriteBuffer sets the socket write buffer, no effect if it's accepted from Listener
func ( s * UDPSession ) SetWriteBuffer ( bytes int ) error {
s . mu . Lock ( )
defer s . mu . Unlock ( )
if s . l == nil {
if nc , ok := s . conn . ( setWriteBuffer ) ; ok {
return nc . SetWriteBuffer ( bytes )
}
}
return errors . New ( errInvalidOperation )
}
2017-03-07 13:28:09 +00:00
// output pipeline entry
// steps for output data processing:
2017-04-05 14:34:41 +00:00
// 0. Header extends
2017-03-07 13:28:09 +00:00
// 1. FEC
// 2. CRC32
// 3. Encryption
2017-04-05 14:34:41 +00:00
// 4. WriteTo kernel
2017-03-07 13:28:09 +00:00
func ( s * UDPSession ) output ( buf [ ] byte ) {
var ecc [ ] [ ] byte
2017-09-02 06:04:35 +00:00
// 0. extend buf's header space(if necessary)
2017-04-05 14:34:41 +00:00
ext := buf
if s . headerSize > 0 {
ext = s . ext [ : s . headerSize + len ( buf ) ]
copy ( ext [ s . headerSize : ] , buf )
}
2017-03-07 13:28:09 +00:00
2017-09-02 06:04:35 +00:00
// 1. FEC encoding
2017-04-05 14:34:41 +00:00
if s . fecEncoder != nil {
2017-09-02 06:04:35 +00:00
ecc = s . fecEncoder . encode ( ext )
2017-03-07 13:28:09 +00:00
}
2017-03-07 12:44:16 +00:00
2017-09-02 06:04:35 +00:00
// 2&3. crc32 & encryption
2017-03-07 13:28:09 +00:00
if s . block != nil {
io . ReadFull ( rand . Reader , ext [ : nonceSize ] )
checksum := crc32 . ChecksumIEEE ( ext [ cryptHeaderSize : ] )
binary . LittleEndian . PutUint32 ( ext [ nonceSize : ] , checksum )
s . block . Encrypt ( ext , ext )
2017-09-02 06:04:35 +00:00
for k := range ecc {
io . ReadFull ( rand . Reader , ecc [ k ] [ : nonceSize ] )
checksum := crc32 . ChecksumIEEE ( ecc [ k ] [ cryptHeaderSize : ] )
binary . LittleEndian . PutUint32 ( ecc [ k ] [ nonceSize : ] , checksum )
s . block . Encrypt ( ecc [ k ] , ecc [ k ] )
2017-03-07 12:44:16 +00:00
}
}
2017-09-02 06:04:35 +00:00
// 4. WriteTo kernel
2017-04-05 14:34:41 +00:00
nbytes := 0
npkts := 0
2017-09-02 06:04:35 +00:00
for i := 0 ; i < s . dup + 1 ; i ++ {
if n , err := s . conn . WriteTo ( ext , s . remote ) ; err == nil {
nbytes += n
npkts ++
}
2017-04-05 14:34:41 +00:00
}
2017-09-02 06:04:35 +00:00
for k := range ecc {
if n , err := s . conn . WriteTo ( ecc [ k ] , s . remote ) ; err == nil {
nbytes += n
npkts ++
2017-03-07 12:44:16 +00:00
}
}
2017-04-05 14:34:41 +00:00
atomic . AddUint64 ( & DefaultSnmp . OutPkts , uint64 ( npkts ) )
atomic . AddUint64 ( & DefaultSnmp . OutBytes , uint64 ( nbytes ) )
2017-03-07 12:44:16 +00:00
}
2017-03-07 13:28:09 +00:00
// kcp update, returns interval for next calling
2017-04-05 14:34:41 +00:00
func ( s * UDPSession ) update ( ) ( interval time . Duration ) {
2017-03-07 13:28:09 +00:00
s . mu . Lock ( )
s . kcp . flush ( false )
2017-04-05 14:34:41 +00:00
if s . kcp . WaitSnd ( ) < int ( s . kcp . snd_wnd ) {
2017-03-07 13:28:09 +00:00
s . notifyWriteEvent ( )
}
2017-09-02 06:04:35 +00:00
interval = time . Duration ( s . kcp . interval ) * time . Millisecond
2017-03-07 13:28:09 +00:00
s . mu . Unlock ( )
2017-04-05 14:34:41 +00:00
return
2017-03-07 13:28:09 +00:00
}
2017-03-07 12:44:16 +00:00
// GetConv gets conversation id of a session
2017-09-02 06:04:35 +00:00
func ( s * UDPSession ) GetConv ( ) uint32 { return s . kcp . conv }
2017-03-07 12:44:16 +00:00
func ( s * UDPSession ) notifyReadEvent ( ) {
select {
case s . chReadEvent <- struct { } { } :
default :
}
}
func ( s * UDPSession ) notifyWriteEvent ( ) {
select {
case s . chWriteEvent <- struct { } { } :
default :
}
}
func ( s * UDPSession ) kcpInput ( data [ ] byte ) {
2017-03-07 13:28:09 +00:00
var kcpInErrors , fecErrs , fecRecovered , fecParityShards uint64
2017-03-07 12:44:16 +00:00
2017-04-05 14:34:41 +00:00
if s . fecDecoder != nil {
f := s . fecDecoder . decodeBytes ( data )
2017-03-07 12:44:16 +00:00
s . mu . Lock ( )
if f . flag == typeData {
2017-03-07 13:28:09 +00:00
if ret := s . kcp . Input ( data [ fecHeaderSizePlus2 : ] , true , s . ackNoDelay ) ; ret != 0 {
2017-03-07 12:44:16 +00:00
kcpInErrors ++
}
}
if f . flag == typeData || f . flag == typeFEC {
if f . flag == typeFEC {
2017-03-07 13:28:09 +00:00
fecParityShards ++
2017-03-07 12:44:16 +00:00
}
2017-09-02 06:04:35 +00:00
recovers := s . fecDecoder . decode ( f )
for _ , r := range recovers {
if len ( r ) >= 2 { // must be larger than 2bytes
sz := binary . LittleEndian . Uint16 ( r )
if int ( sz ) <= len ( r ) && sz >= 2 {
if ret := s . kcp . Input ( r [ 2 : sz ] , false , s . ackNoDelay ) ; ret == 0 {
fecRecovered ++
2017-03-07 12:44:16 +00:00
} else {
2017-09-02 06:04:35 +00:00
kcpInErrors ++
2017-03-07 12:44:16 +00:00
}
} else {
fecErrs ++
}
2017-09-02 06:04:35 +00:00
} else {
fecErrs ++
2017-03-07 12:44:16 +00:00
}
}
}
// notify reader
if n := s . kcp . PeekSize ( ) ; n > 0 {
s . notifyReadEvent ( )
}
s . mu . Unlock ( )
} else {
s . mu . Lock ( )
2017-03-07 13:28:09 +00:00
if ret := s . kcp . Input ( data , true , s . ackNoDelay ) ; ret != 0 {
2017-03-07 12:44:16 +00:00
kcpInErrors ++
}
// notify reader
if n := s . kcp . PeekSize ( ) ; n > 0 {
s . notifyReadEvent ( )
}
s . mu . Unlock ( )
}
2017-03-09 13:01:07 +00:00
atomic . AddUint64 ( & DefaultSnmp . InPkts , 1 )
2017-03-07 12:44:16 +00:00
atomic . AddUint64 ( & DefaultSnmp . InBytes , uint64 ( len ( data ) ) )
2017-03-07 13:28:09 +00:00
if fecParityShards > 0 {
atomic . AddUint64 ( & DefaultSnmp . FECParityShards , fecParityShards )
2017-03-07 12:44:16 +00:00
}
if kcpInErrors > 0 {
atomic . AddUint64 ( & DefaultSnmp . KCPInErrors , kcpInErrors )
}
if fecErrs > 0 {
atomic . AddUint64 ( & DefaultSnmp . FECErrs , fecErrs )
}
if fecRecovered > 0 {
atomic . AddUint64 ( & DefaultSnmp . FECRecovered , fecRecovered )
}
}
2017-09-02 06:04:35 +00:00
func ( s * UDPSession ) receiver ( ch chan <- [ ] byte ) {
2017-03-07 12:44:16 +00:00
for {
data := xmitBuf . Get ( ) . ( [ ] byte ) [ : mtuLimit ]
if n , _ , err := s . conn . ReadFrom ( data ) ; err == nil && n >= s . headerSize + IKCP_OVERHEAD {
select {
case ch <- data [ : n ] :
case <- s . die :
2017-04-05 14:34:41 +00:00
return
2017-03-07 12:44:16 +00:00
}
} else if err != nil {
2017-09-02 06:04:35 +00:00
s . chErrorEvent <- err
2017-03-07 12:44:16 +00:00
return
} else {
atomic . AddUint64 ( & DefaultSnmp . InErrs , 1 )
}
}
}
// read loop for client session
func ( s * UDPSession ) readLoop ( ) {
2017-04-05 14:34:41 +00:00
chPacket := make ( chan [ ] byte , qlen )
2017-03-07 12:44:16 +00:00
go s . receiver ( chPacket )
for {
select {
case data := <- chPacket :
raw := data
dataValid := false
if s . block != nil {
s . block . Decrypt ( data , data )
data = data [ nonceSize : ]
checksum := crc32 . ChecksumIEEE ( data [ crcSize : ] )
if checksum == binary . LittleEndian . Uint32 ( data ) {
data = data [ crcSize : ]
dataValid = true
} else {
atomic . AddUint64 ( & DefaultSnmp . InCsumErrors , 1 )
}
} else if s . block == nil {
dataValid = true
}
if dataValid {
s . kcpInput ( data )
}
xmitBuf . Put ( raw )
case <- s . die :
return
}
}
}
type (
2017-10-17 22:17:10 +00:00
sessionKey struct {
addr string
convID uint32
}
2017-03-07 12:44:16 +00:00
// Listener defines a server listening for connections
Listener struct {
2017-03-09 13:01:07 +00:00
block BlockCrypt // block encryption
dataShards int // FEC data shard
parityShards int // FEC parity shard
2017-09-02 06:04:35 +00:00
fecDecoder * fecDecoder // FEC mock initialization
2017-03-09 13:01:07 +00:00
conn net . PacketConn // the underlying packet connection
2017-10-17 22:17:10 +00:00
sessions map [ sessionKey ] * UDPSession // all sessions accepted by this Listener
chAccepts chan * UDPSession // Listen() backlog
chSessionClosed chan sessionKey // session close queue
headerSize int // the overall header size added before KCP frame
die chan struct { } // notify the listener has closed
rd atomic . Value // read deadline for Accept()
2017-04-05 14:34:41 +00:00
wd atomic . Value
2017-03-09 13:01:07 +00:00
}
// incoming packet
inPacket struct {
2017-03-07 12:44:16 +00:00
from net . Addr
data [ ] byte
}
)
// monitor incoming data for all connections of server
func ( l * Listener ) monitor ( ) {
2017-09-02 06:04:35 +00:00
// cache last session
2017-10-17 22:17:10 +00:00
var lastKey sessionKey
2017-09-02 06:04:35 +00:00
var lastSession * UDPSession
2017-04-05 14:34:41 +00:00
chPacket := make ( chan inPacket , qlen )
2017-03-07 12:44:16 +00:00
go l . receiver ( chPacket )
for {
select {
case p := <- chPacket :
raw := p . data
data := p . data
from := p . from
dataValid := false
if l . block != nil {
l . block . Decrypt ( data , data )
data = data [ nonceSize : ]
checksum := crc32 . ChecksumIEEE ( data [ crcSize : ] )
if checksum == binary . LittleEndian . Uint32 ( data ) {
data = data [ crcSize : ]
dataValid = true
} else {
atomic . AddUint64 ( & DefaultSnmp . InCsumErrors , 1 )
}
} else if l . block == nil {
dataValid = true
}
if dataValid {
2017-09-02 06:04:35 +00:00
var conv uint32
convValid := false
if l . fecDecoder != nil {
isfec := binary . LittleEndian . Uint16 ( data [ 4 : ] )
if isfec == typeData {
conv = binary . LittleEndian . Uint32 ( data [ fecHeaderSizePlus2 : ] )
convValid = true
}
} else {
conv = binary . LittleEndian . Uint32 ( data )
convValid = true
}
2017-03-07 12:44:16 +00:00
2017-09-02 06:04:35 +00:00
if convValid {
2017-10-17 22:17:10 +00:00
key := sessionKey {
addr : from . String ( ) ,
convID : conv ,
}
2017-09-02 06:04:35 +00:00
var s * UDPSession
var ok bool
// packets received from an address always come in batch.
// cache the session for next packet, without querying map.
if key == lastKey {
s , ok = lastSession , true
} else if s , ok = l . sessions [ key ] ; ok {
lastSession = s
2017-10-17 22:17:10 +00:00
lastKey = key
2017-09-02 06:04:35 +00:00
}
if ! ok { // new session
2017-10-22 12:36:36 +00:00
if ! blacklist . has ( from . String ( ) , conv ) && len ( l . chAccepts ) < cap ( l . chAccepts ) && len ( l . sessions ) < 4096 { // do not let new session overwhelm accept queue and connection count
2017-09-02 14:11:48 +00:00
s := newUDPSession ( conv , l . dataShards , l . parityShards , l , l . conn , from , l . block )
2017-04-05 14:34:41 +00:00
s . kcpInput ( data )
2017-09-02 06:04:35 +00:00
l . sessions [ key ] = s
2017-04-05 14:34:41 +00:00
l . chAccepts <- s
}
2017-09-02 06:04:35 +00:00
} else {
s . kcpInput ( data )
2017-03-07 12:44:16 +00:00
}
}
}
2017-03-09 13:01:07 +00:00
xmitBuf . Put ( raw )
2017-09-02 06:04:35 +00:00
case key := <- l . chSessionClosed :
if key == lastKey {
2017-10-17 22:17:10 +00:00
lastKey = sessionKey { }
2017-09-02 06:04:35 +00:00
}
delete ( l . sessions , key )
2017-03-07 12:44:16 +00:00
case <- l . die :
return
}
}
}
2017-09-02 06:04:35 +00:00
func ( l * Listener ) receiver ( ch chan <- inPacket ) {
2017-03-07 12:44:16 +00:00
for {
2017-03-09 13:01:07 +00:00
data := xmitBuf . Get ( ) . ( [ ] byte ) [ : mtuLimit ]
2017-03-07 12:44:16 +00:00
if n , from , err := l . conn . ReadFrom ( data ) ; err == nil && n >= l . headerSize + IKCP_OVERHEAD {
2017-04-05 14:34:41 +00:00
select {
case ch <- inPacket { from , data [ : n ] } :
case <- l . die :
return
}
2017-03-07 12:44:16 +00:00
} else if err != nil {
return
} else {
atomic . AddUint64 ( & DefaultSnmp . InErrs , 1 )
}
}
}
// SetReadBuffer sets the socket read buffer for the Listener
func ( l * Listener ) SetReadBuffer ( bytes int ) error {
if nc , ok := l . conn . ( setReadBuffer ) ; ok {
return nc . SetReadBuffer ( bytes )
}
return errors . New ( errInvalidOperation )
}
// SetWriteBuffer sets the socket write buffer for the Listener
func ( l * Listener ) SetWriteBuffer ( bytes int ) error {
if nc , ok := l . conn . ( setWriteBuffer ) ; ok {
return nc . SetWriteBuffer ( bytes )
}
return errors . New ( errInvalidOperation )
}
// SetDSCP sets the 6bit DSCP field of IP header
func ( l * Listener ) SetDSCP ( dscp int ) error {
if nc , ok := l . conn . ( net . Conn ) ; ok {
return ipv4 . NewConn ( nc ) . SetTOS ( dscp << 2 )
}
return errors . New ( errInvalidOperation )
}
// Accept implements the Accept method in the Listener interface; it waits for the next call and returns a generic Conn.
func ( l * Listener ) Accept ( ) ( net . Conn , error ) {
return l . AcceptKCP ( )
}
// AcceptKCP accepts a KCP connection
func ( l * Listener ) AcceptKCP ( ) ( * UDPSession , error ) {
var timeout <- chan time . Time
if tdeadline , ok := l . rd . Load ( ) . ( time . Time ) ; ok && ! tdeadline . IsZero ( ) {
timeout = time . After ( tdeadline . Sub ( time . Now ( ) ) )
}
select {
case <- timeout :
return nil , & errTimeout { }
case c := <- l . chAccepts :
return c , nil
case <- l . die :
return nil , errors . New ( errBrokenPipe )
}
}
// SetDeadline sets the deadline associated with the listener. A zero time value disables the deadline.
func ( l * Listener ) SetDeadline ( t time . Time ) error {
l . SetReadDeadline ( t )
l . SetWriteDeadline ( t )
return nil
}
// SetReadDeadline implements the Conn SetReadDeadline method.
func ( l * Listener ) SetReadDeadline ( t time . Time ) error {
l . rd . Store ( t )
return nil
}
// SetWriteDeadline implements the Conn SetWriteDeadline method.
func ( l * Listener ) SetWriteDeadline ( t time . Time ) error {
l . wd . Store ( t )
return nil
}
// Close stops listening on the UDP address. Already Accepted connections are not closed.
func ( l * Listener ) Close ( ) error {
close ( l . die )
return l . conn . Close ( )
}
2017-04-05 14:34:41 +00:00
// closeSession notify the listener that a session has closed
2017-10-17 22:17:10 +00:00
func ( l * Listener ) closeSession ( key sessionKey ) bool {
2017-04-05 14:34:41 +00:00
select {
2017-09-02 06:04:35 +00:00
case l . chSessionClosed <- key :
2017-04-05 14:34:41 +00:00
return true
case <- l . die :
return false
}
}
2017-03-07 12:44:16 +00:00
// Addr returns the listener's network address, The Addr returned is shared by all invocations of Addr, so do not modify it.
2017-09-02 06:04:35 +00:00
func ( l * Listener ) Addr ( ) net . Addr { return l . conn . LocalAddr ( ) }
2017-03-07 12:44:16 +00:00
// Listen listens for incoming KCP packets addressed to the local address laddr on the network "udp",
2017-09-02 06:04:35 +00:00
func Listen ( laddr string ) ( net . Listener , error ) { return ListenWithOptions ( laddr , nil , 0 , 0 ) }
2017-03-07 12:44:16 +00:00
// ListenWithOptions listens for incoming KCP packets addressed to the local address laddr on the network "udp" with packet encryption,
// dataShards, parityShards defines Reed-Solomon Erasure Coding parameters
func ListenWithOptions ( laddr string , block BlockCrypt , dataShards , parityShards int ) ( * Listener , error ) {
udpaddr , err := net . ResolveUDPAddr ( "udp" , laddr )
if err != nil {
return nil , errors . Wrap ( err , "net.ResolveUDPAddr" )
}
conn , err := net . ListenUDP ( "udp" , udpaddr )
if err != nil {
return nil , errors . Wrap ( err , "net.ListenUDP" )
}
return ServeConn ( block , dataShards , parityShards , conn )
}
// ServeConn serves KCP protocol for a single packet connection.
func ServeConn ( block BlockCrypt , dataShards , parityShards int , conn net . PacketConn ) ( * Listener , error ) {
l := new ( Listener )
l . conn = conn
2017-10-17 22:17:10 +00:00
l . sessions = make ( map [ sessionKey ] * UDPSession )
2017-04-05 14:34:41 +00:00
l . chAccepts = make ( chan * UDPSession , acceptBacklog )
2017-10-17 22:17:10 +00:00
l . chSessionClosed = make ( chan sessionKey )
2017-03-07 12:44:16 +00:00
l . die = make ( chan struct { } )
l . dataShards = dataShards
l . parityShards = parityShards
l . block = block
2017-04-05 14:34:41 +00:00
l . fecDecoder = newFECDecoder ( rxFECMulti * ( dataShards + parityShards ) , dataShards , parityShards )
2017-03-07 12:44:16 +00:00
// calculate header size
if l . block != nil {
l . headerSize += cryptHeaderSize
}
2017-04-05 14:34:41 +00:00
if l . fecDecoder != nil {
2017-03-07 12:44:16 +00:00
l . headerSize += fecHeaderSizePlus2
}
go l . monitor ( )
return l , nil
}
// Dial connects to the remote address "raddr" on the network "udp"
2017-09-02 06:04:35 +00:00
func Dial ( raddr string ) ( net . Conn , error ) { return DialWithOptions ( raddr , nil , 0 , 0 ) }
2017-03-07 12:44:16 +00:00
// DialWithOptions connects to the remote address "raddr" on the network "udp" with packet encryption
func DialWithOptions ( raddr string , block BlockCrypt , dataShards , parityShards int ) ( * UDPSession , error ) {
udpaddr , err := net . ResolveUDPAddr ( "udp" , raddr )
if err != nil {
return nil , errors . Wrap ( err , "net.ResolveUDPAddr" )
}
udpconn , err := net . DialUDP ( "udp" , nil , udpaddr )
if err != nil {
return nil , errors . Wrap ( err , "net.DialUDP" )
}
2017-09-02 14:11:48 +00:00
return NewConn ( raddr , block , dataShards , parityShards , & connectedUDPConn { udpconn } )
2017-03-07 12:44:16 +00:00
}
// NewConn establishes a session and talks KCP protocol over a packet connection.
2017-09-02 14:11:48 +00:00
func NewConn ( raddr string , block BlockCrypt , dataShards , parityShards int , conn net . PacketConn ) ( * UDPSession , error ) {
2017-03-07 12:44:16 +00:00
udpaddr , err := net . ResolveUDPAddr ( "udp" , raddr )
if err != nil {
return nil , errors . Wrap ( err , "net.ResolveUDPAddr" )
}
var convid uint32
binary . Read ( rand . Reader , binary . LittleEndian , & convid )
2017-09-02 14:11:48 +00:00
return newUDPSession ( convid , dataShards , parityShards , nil , conn , udpaddr , block ) , nil
2017-03-07 12:44:16 +00:00
}
2017-09-02 06:04:35 +00:00
// returns current time in milliseconds
func currentMs ( ) uint32 { return uint32 ( time . Now ( ) . UnixNano ( ) / int64 ( time . Millisecond ) ) }
2017-03-07 12:44:16 +00:00
2017-09-02 06:04:35 +00:00
// connectedUDPConn is a wrapper for net.UDPConn which converts WriteTo syscalls
2017-03-07 12:44:16 +00:00
// to Write syscalls that are 4 times faster on some OS'es. This should only be
// used for connections that were produced by a net.Dial* call.
2017-09-02 06:04:35 +00:00
type connectedUDPConn struct { * net . UDPConn }
2017-03-07 12:44:16 +00:00
// WriteTo redirects all writes to the Write syscall, which is 4 times faster.
2017-09-02 06:04:35 +00:00
func ( c * connectedUDPConn ) WriteTo ( b [ ] byte , addr net . Addr ) ( int , error ) { return c . Write ( b ) }