From ceea5ebeb31d47385ff39b542ed4e65a8576cd80 Mon Sep 17 00:00:00 2001 From: Audrius Butkevicius Date: Thu, 9 Mar 2017 14:01:07 +0100 Subject: [PATCH] lib/connections, vendor: Change KCP mux to SMUX Closes #4032 --- lib/connections/config.go | 15 +- lib/connections/kcp_dial.go | 4 +- lib/connections/kcp_listen.go | 8 +- lib/connections/kcp_misc.go | 6 +- vendor/ | 362 ----------- vendor/ | 60 -- vendor/ | 157 ----- vendor/ | 87 --- vendor/ | 623 ------------------- vendor/ | 457 -------------- vendor/ | 28 - vendor/ | 14 +- vendor/ | 19 +- vendor/ | 140 +++-- vendor/ | 40 +- vendor/ | 2 + vendor/ | 21 + vendor/ | 60 ++ vendor/ | 80 +++ vendor/ | 335 ++++++++++ vendor/ | 261 ++++++++ vendor/manifest | 18 +- 22 files changed, 908 insertions(+), 1889 deletions(-) delete mode 100644 vendor/ delete mode 100644 vendor/ delete mode 100644 vendor/ delete mode 100644 vendor/ delete mode 100644 vendor/ delete mode 100644 vendor/ delete mode 100644 vendor/ create mode 100644 vendor/ create mode 100644 vendor/ create mode 100644 vendor/ create mode 100644 vendor/ create mode 100644 vendor/ diff --git a/lib/connections/config.go b/lib/connections/config.go index 837c181fa..7824ae7b8 100644 --- a/lib/connections/config.go +++ b/lib/connections/config.go @@ -7,10 +7,9 @@ package connections import ( - "io/ioutil" "time" - "" + "" ) const ( @@ -25,12 +24,10 @@ const ( ) var ( - yamuxConfig = &yamux.Config{ - AcceptBacklog: 256, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, - ConnectionWriteTimeout: 10 * time.Second, - MaxStreamWindowSize: 256 * 1024, - LogOutput: ioutil.Discard, + smuxConfig = &smux.Config{ + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 4096, + MaxReceiveBuffer: 4 * 1024 * 1024, } ) diff --git a/lib/connections/kcp_dial.go b/lib/connections/kcp_dial.go index 702b433e8..280641c95 100644 --- a/lib/connections/kcp_dial.go +++ b/lib/connections/kcp_dial.go @@ -11,10 +11,10 @@ import ( "net/url" "time" - "" "" "" "" + "" ) func init() { @@ -56,7 +56,7 @@ func (d *kcpDialer) Dial(id protocol.DeviceID, uri *url.URL) (internalConn, erro conn.SetWindowSize(opts.KCPSendWindowSize, opts.KCPReceiveWindowSize) conn.SetNoDelay(boolInt(opts.KCPNoDelay), opts.KCPUpdateIntervalMs, boolInt(opts.KCPFastResend), boolInt(!opts.KCPCongestionControl)) - ses, err := yamux.Client(conn, yamuxConfig) + ses, err := smux.Client(conn, smuxConfig) if err != nil { conn.Close() return internalConn{}, err diff --git a/lib/connections/kcp_listen.go b/lib/connections/kcp_listen.go index 5b44ca39e..99cbf7b7a 100644 --- a/lib/connections/kcp_listen.go +++ b/lib/connections/kcp_listen.go @@ -16,10 +16,10 @@ import ( "" "" - "" "" "" "" + "" ) func init() { @@ -116,16 +116,16 @@ func (t *kcpListener) Serve() { l.Debugln("connect from", conn.RemoteAddr()) - ses, err := yamux.Server(conn, yamuxConfig) + ses, err := smux.Server(conn, smuxConfig) if err != nil { - l.Debugln("yamux server:", err) + l.Debugln("smux server:", err) conn.Close() continue } stream, err := ses.AcceptStream() if err != nil { - l.Debugln("yamux accept:", err) + l.Debugln("smux accept:", err) ses.Close() continue } diff --git a/lib/connections/kcp_misc.go b/lib/connections/kcp_misc.go index 0e50acff7..c11cfe957 100644 --- a/lib/connections/kcp_misc.go +++ b/lib/connections/kcp_misc.go @@ -16,7 +16,7 @@ import ( "time" "" - "" + "" ) var ( @@ -162,8 +162,8 @@ func (f *stunFilter) reap() { } type sessionClosingStream struct { - *yamux.Stream - session *yamux.Session + *smux.Stream + session *smux.Session } func (w *sessionClosingStream) Close() error { diff --git a/vendor/ b/vendor/ deleted file mode 100644 index f0e5c79e1..000000000 --- a/vendor/ +++ /dev/null @@ -1,362 +0,0 @@ -Mozilla Public License, version 2.0 - -1. This can happen - // if the backlog is exceeded, or if there was a remote GoAway. - ErrConnectionReset = fmt.Errorf("connection reset") - - // ErrConnectionWriteTimeout indicates that we hit the "safety valve" - // timeout writing to the underlying stream connection. - ErrConnectionWriteTimeout = fmt.Errorf("connection write timeout") - - // ErrKeepAliveTimeout is sent if a missed keepalive caused the stream close - ErrKeepAliveTimeout = fmt.Errorf("keepalive timeout") -) - -const ( - // protoVersion is the only version we support - protoVersion uint8 = 0 -) - -const ( - // Data is used for data frames. They are followed - // by length bytes worth of payload. - typeData uint8 = iota - - // WindowUpdate is used to change the window of - // a given stream. The length indicates the delta - // update to the window. - typeWindowUpdate - - // Ping is sent as a keep-alive or to measure - // the RTT. The StreamID and Length value are echoed - // back in the response. - typePing - - // GoAway is sent to terminate a session. The StreamID - // should be 0 and the length is an error code. - typeGoAway -) - -const ( - // SYN is sent to signal a new stream. May - // be sent with a data payload - flagSYN uint16 = 1 << iota - - // ACK is sent to acknowledge a new stream. May - // be sent with a data payload - flagACK - - // FIN is sent to half-close the given stream. - // May be sent with a data payload. - flagFIN - - // RST is used to hard close a given stream. - flagRST -) - -const ( - // initialStreamWindow is the initial stream window size - initialStreamWindow uint32 = 256 * 1024 -) - -const ( - // goAwayNormal is sent on a normal termination - goAwayNormal uint32 = iota - - // goAwayProtoErr sent on a protocol error - goAwayProtoErr - - // goAwayInternalErr sent on an internal error - goAwayInternalErr -) - -const ( - sizeOfVersion = 1 - sizeOfType = 1 - sizeOfFlags = 2 - sizeOfStreamID = 4 - sizeOfLength = 4 - headerSize = sizeOfVersion + sizeOfType + sizeOfFlags + - sizeOfStreamID + sizeOfLength -) - -type header []byte - -func (h header) Version() uint8 { - return h[0] -} - -func (h header) MsgType() uint8 { - return h[1] -} - -func (h header) Flags() uint16 { - return binary.BigEndian.Uint16(h[2:4]) -} - -func (h header) StreamID() uint32 { - return binary.BigEndian.Uint32(h[4:8]) -} - -func (h header) Length() uint32 { - return binary.BigEndian.Uint32(h[8:12]) -} - -func (h header) String() string { - return fmt.Sprintf("Vsn:%d Type:%d Flags:%d StreamID:%d Length:%d", - h.Version(), h.MsgType(), h.Flags(), h.StreamID(), h.Length()) -} - -func (h header) encode(msgType uint8, flags uint16, streamID uint32, length uint32) { - h[0] = protoVersion - h[1] = msgType - binary.BigEndian.PutUint16(h[2:4], flags) - binary.BigEndian.PutUint32(h[4:8], streamID) - binary.BigEndian.PutUint32(h[8:12], length) -} diff --git a/vendor/ b/vendor/ deleted file mode 100644 index 7abc7c744..000000000 --- a/vendor/ +++ /dev/null @@ -1,87 +0,0 @@ -package yamux - -import ( - "fmt" - "io" - "os" - "time" -) - -// Config is used to tune the Yamux session -type Config struct { - // AcceptBacklog is used to limit how many streams may be - // waiting an accept. - AcceptBacklog int - - // EnableKeepalive is used to do a period keep alive - // messages using a ping. - EnableKeepAlive bool - - // KeepAliveInterval is how often to perform the keep alive - KeepAliveInterval time.Duration - - // ConnectionWriteTimeout is meant to be a "safety valve" timeout after - // we which will suspect a problem with the underlying connection and - // close it. This is only applied to writes, where's there's generally - // an expectation that things will move along quickly. - ConnectionWriteTimeout time.Duration - - // MaxStreamWindowSize is used to control the maximum - // window size that we allow for a stream. - MaxStreamWindowSize uint32 - - // LogOutput is used to control the log destination - LogOutput io.Writer -} - -// DefaultConfig is used to return a default configuration -func DefaultConfig() *Config { - return &Config{ - AcceptBacklog: 256, - EnableKeepAlive: true, - KeepAliveInterval: 30 * time.Second, - ConnectionWriteTimeout: 10 * time.Second, - MaxStreamWindowSize: initialStreamWindow, - LogOutput: os.Stderr, - } -} - -// VerifyConfig is used to verify the sanity of configuration -func VerifyConfig(config *Config) error { - if config.AcceptBacklog <= 0 { - return fmt.Errorf("backlog must be positive") - } - if config.KeepAliveInterval == 0 { - return fmt.Errorf("keep-alive interval must be positive") - } - if config.MaxStreamWindowSize < initialStreamWindow { - return fmt.Errorf("MaxStreamWindowSize must be larger than %d", initialStreamWindow) - } - return nil -} - -// Server is used to initialize a new server-side connection. -// There must be at most one server-side connection. If a nil config is -// provided, the DefaultConfiguration will be used. -func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { - if config == nil { - config = DefaultConfig() - } - if err := VerifyConfig(config); err != nil { - return nil, err - } - return newSession(config, conn, false), nil -} - -// Client is used to initialize a new client-side connection. -// There must be at most one client-side connection. -func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { - if config == nil { - config = DefaultConfig() - } - - if err := VerifyConfig(config); err != nil { - return nil, err - } - return newSession(config, conn, true), nil -} diff --git a/vendor/ b/vendor/ deleted file mode 100644 index e17981839..000000000 --- a/vendor/ +++ /dev/null @@ -1,623 +0,0 @@ -package yamux - -import ( - "bufio" - "fmt" - "io" - "io/ioutil" - "log" - "math" - "net" - "strings" - "sync" - "sync/atomic" - "time" -) - -// Session is used to wrap a reliable ordered connection and to -// multiplex it into multiple streams. -type Session struct { - // remoteGoAway indicates the remote side does - // not want futher connections. Must be first for alignment. - remoteGoAway int32 - - // localGoAway indicates that we should stop - // accepting futher connections. Must be first for alignment. - localGoAway int32 - - // nextStreamID is the next stream we should - // send. This depends if we are a client/server. - nextStreamID uint32 - - // config holds our configuration - config *Config - - // logger is used for our logs - logger *log.Logger - - // conn is the underlying connection - conn io.ReadWriteCloser - - // bufRead is a buffered reader - bufRead *bufio.Reader - - // pings is used to track inflight pings - pings map[uint32]chan struct{} - pingID uint32 - pingLock sync.Mutex - - // streams maps a stream id to a stream, and inflight has an entry - // for any outgoing stream that has not yet been established. Both are - // protected by streamLock. - streams map[uint32]*Stream - inflight map[uint32]struct{} - streamLock sync.Mutex - - // synCh acts like a semaphore. It is sized to the AcceptBacklog which - // is assumed to be symmetric between the client and server. This allows - // the client to avoid exceeding the backlog and instead blocks the open. - synCh chan struct{} - - // acceptCh is used to pass ready streams to the client - acceptCh chan *Stream - - // sendCh is used to mark a stream as ready to send, - // or to send a header out directly. - sendCh chan sendReady - - // recvDoneCh is closed when recv() exits to avoid a race - // between stream registration and stream shutdown - recvDoneCh chan struct{} - - // shutdown is used to safely close a session - shutdown bool - shutdownErr error - shutdownCh chan struct{} - shutdownLock sync.Mutex -} - -// sendReady is used to either mark a stream as ready -// or to directly send a header -type sendReady struct { - Hdr []byte - Body io.Reader - Err chan error -} - -// newSession is used to construct a new session -func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { - s := &Session{ - config: config, - logger: log.New(config.LogOutput, "", log.LstdFlags), - conn: conn, - bufRead: bufio.NewReader(conn), - pings: make(map[uint32]chan struct{}), - streams: make(map[uint32]*Stream), - inflight: make(map[uint32]struct{}), - synCh: make(chan struct{}, config.AcceptBacklog), - acceptCh: make(chan *Stream, config.AcceptBacklog), - sendCh: make(chan sendReady, 64), - recvDoneCh: make(chan struct{}), - shutdownCh: make(chan struct{}), - } - if client { - s.nextStreamID = 1 - } else { - s.nextStreamID = 2 - } - go s.recv() - go s.send() - if config.EnableKeepAlive { - go s.keepalive() - } - return s -} - -// IsClosed does a safe check to see if we have shutdown -func (s *Session) IsClosed() bool { - select { - case <-s.shutdownCh: - return true - default: - return false - } -} - -// NumStreams returns the number of currently open streams -func (s *Session) NumStreams() int { - s.streamLock.Lock() - num := len(s.streams) - s.streamLock.Unlock() - return num -} - -// Open is used to create a new stream as a net.Conn -func (s *Session) Open() (net.Conn, error) { - conn, err := s.OpenStream() - if err != nil { - return nil, err - } - return conn, nil -} - -// OpenStream is used to create a new stream -func (s *Session) OpenStream() (*Stream, error) { - if s.IsClosed() { - return nil, ErrSessionShutdown - } - if atomic.LoadInt32(&s.remoteGoAway) == 1 { - return nil, ErrRemoteGoAway - } - - // Block if we have too many inflight SYNs - select { - case s.synCh <- struct{}{}: - case <-s.shutdownCh: - return nil, ErrSessionShutdown - } - -GET_ID: - // Get an ID, and check for stream exhaustion - id := atomic.LoadUint32(&s.nextStreamID) - if id >= math.MaxUint32-1 { - return nil, ErrStreamsExhausted - } - if !atomic.CompareAndSwapUint32(&s.nextStreamID, id, id+2) { - goto GET_ID - } - - // Register the stream - stream := newStream(s, id, streamInit) - s.streamLock.Lock() - s.streams[id] = stream - s.inflight[id] = struct{}{} - s.streamLock.Unlock() - - // Send the window update to create - if err := stream.sendWindowUpdate(); err != nil { - select { - case <-s.synCh: - default: - s.logger.Printf("[ERR] yamux: aborted stream open without inflight syn semaphore") - } - return nil, err - } - return stream, nil -} - -// Accept is used to block until the next available stream -// is ready to be accepted. -func (s *Session) Accept() (net.Conn, error) { - conn, err := s.AcceptStream() - if err != nil { - return nil, err - } - return conn, err -} - -// AcceptStream is used to block until the next available stream -// is ready to be accepted. -func (s *Session) AcceptStream() (*Stream, error) { - select { - case stream := <-s.acceptCh: - if err := stream.sendWindowUpdate(); err != nil { - return nil, err - } - return stream, nil - case <-s.shutdownCh: - return nil, s.shutdownErr - } -} - -// Close is used to close the session and all streams. -// Attempts to send a GoAway before closing the connection. -func (s *Session) Close() error { - s.shutdownLock.Lock() - defer s.shutdownLock.Unlock() - - if s.shutdown { - return nil - } - s.shutdown = true - if s.shutdownErr == nil { - s.shutdownErr = ErrSessionShutdown - } - close(s.shutdownCh) - s.conn.Close() - <-s.recvDoneCh - - s.streamLock.Lock() - defer s.streamLock.Unlock() - for _, stream := range s.streams { - stream.forceClose() - } - return nil -} - -// exitErr is used to handle an error that is causing the -// session to terminate. -func (s *Session) exitErr(err error) { - s.shutdownLock.Lock() - if s.shutdownErr == nil { - s.shutdownErr = err - } - s.shutdownLock.Unlock() - s.Close() -} - -// GoAway can be used to prevent accepting further -// connections. It does not close the underlying conn. -func (s *Session) GoAway() error { - return s.waitForSend(s.goAway(goAwayNormal), nil) -} - -// goAway is used to send a goAway message -func (s *Session) goAway(reason uint32) header { - atomic.SwapInt32(&s.localGoAway, 1) - hdr := header(make([]byte, headerSize)) - hdr.encode(typeGoAway, 0, 0, reason) - return hdr -} - -// Ping is used to measure the RTT response time -func (s *Session) Ping() (time.Duration, error) { - // Get a channel for the ping - ch := make(chan struct{}) - - // Get a new ping id, mark as pending - s.pingLock.Lock() - id := s.pingID - s.pingID++ - s.pings[id] = ch - s.pingLock.Unlock() - - // Send the ping request - hdr := header(make([]byte, headerSize)) - hdr.encode(typePing, flagSYN, 0, id) - if err := s.waitForSend(hdr, nil); err != nil { - return 0, err - } - - // Wait for a response - start := time.Now() - select { - case <-ch: - case <-time.After(s.config.ConnectionWriteTimeout): - s.pingLock.Lock() - delete(s.pings, id) // Ignore it if a response comes later. - s.pingLock.Unlock() - return 0, ErrTimeout - case <-s.shutdownCh: - return 0, ErrSessionShutdown - } - - // Compute the RTT - return time.Now().Sub(start), nil -} - -// keepalive is a long running goroutine that periodically does -// a ping to keep the connection alive. -func (s *Session) keepalive() { - for { - select { - case <-time.After(s.config.KeepAliveInterval): - _, err := s.Ping() - if err != nil { - s.logger.Printf("[ERR] yamux: keepalive failed: %v", err) - s.exitErr(ErrKeepAliveTimeout) - return - } - case <-s.shutdownCh: - return - } - } -} - -// waitForSendErr waits to send a header, checking for a potential shutdown -func (s *Session) waitForSend(hdr header, body io.Reader) error { - errCh := make(chan error, 1) - return s.waitForSendErr(hdr, body, errCh) -} - -// waitForSendErr waits to send a header with optional data, checking for a -// potential shutdown. Since there's the expectation that sends can happen -// in a timely manner, we enforce the connection write timeout here. -func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error { - timer := time.NewTimer(s.config.ConnectionWriteTimeout) - defer timer.Stop() - - ready := sendReady{Hdr: hdr, Body: body, Err: errCh} - select { - case s.sendCh <- ready: - case <-s.shutdownCh: - return ErrSessionShutdown - case <-timer.C: - return ErrConnectionWriteTimeout - } - - select { - case err := <-errCh: - return err - case <-s.shutdownCh: - return ErrSessionShutdown - case <-timer.C: - return ErrConnectionWriteTimeout - } -} - -// sendNoWait does a send without waiting. Since there's the expectation that -// the send happens right here, we enforce the connection write timeout if we -// can't queue the header to be sent. -func (s *Session) sendNoWait(hdr header) error { - timer := time.NewTimer(s.config.ConnectionWriteTimeout) - defer timer.Stop() - - select { - case s.sendCh <- sendReady{Hdr: hdr}: - return nil - case <-s.shutdownCh: - return ErrSessionShutdown - case <-timer.C: - return ErrConnectionWriteTimeout - } -} - -// send is a long running goroutine that sends data -func (s *Session) send() { - for { - select { - case ready := <-s.sendCh: - // Send a header if ready - if ready.Hdr != nil { - sent := 0 - for sent < len(ready.Hdr) { - n, err := s.conn.Write(ready.Hdr[sent:]) - if err != nil { - s.logger.Printf("[ERR] yamux: Failed to write header: %v", err) - asyncSendErr(ready.Err, err) - s.exitErr(err) - return - } - sent += n - } - } - - // Send data from a body if given - if ready.Body != nil { - _, err := io.Copy(s.conn, ready.Body) - if err != nil { - s.logger.Printf("[ERR] yamux: Failed to write body: %v", err) - asyncSendErr(ready.Err, err) - s.exitErr(err) - return - } - } - - // No error, successful send - asyncSendErr(ready.Err, nil) - case <-s.shutdownCh: - return - } - } -} - -// recv is a long running goroutine that accepts new data -func (s *Session) recv() { - if err := s.recvLoop(); err != nil { - s.exitErr(err) - } -} - -// recvLoop continues to receive data until a fatal error is encountered -func (s *Session) recvLoop() error { - defer close(s.recvDoneCh) - hdr := header(make([]byte, headerSize)) - var handler func(header) error - for { - // Read the header - if _, err := io.ReadFull(s.bufRead, hdr); err != nil { - if err != io.EOF && !strings.Contains(err.Error(), "closed") && !strings.Contains(err.Error(), "reset by peer") { - s.logger.Printf("[ERR] yamux: Failed to read header: %v", err) - } - return err - } - - // Verify the version - if hdr.Version() != protoVersion { - s.logger.Printf("[ERR] yamux: Invalid protocol version: %d", hdr.Version()) - return ErrInvalidVersion - } - - // Switch on the type - switch hdr.MsgType() { - case typeData: - handler = s.handleStreamMessage - case typeWindowUpdate: - handler = s.handleStreamMessage - case typeGoAway: - handler = s.handleGoAway - case typePing: - handler = s.handlePing - default: - return ErrInvalidMsgType - } - - // Invoke the handler - if err := handler(hdr); err != nil { - return err - } - } -} - -// handleStreamMessage handles either a data or window update frame -func (s *Session) handleStreamMessage(hdr header) error { - // Check for a new stream creation - id := hdr.StreamID() - flags := hdr.Flags() - if flags&flagSYN == flagSYN { - if err := s.incomingStream(id); err != nil { - return err - } - } - - // Get the stream - s.streamLock.Lock() - stream := s.streams[id] - s.streamLock.Unlock() - - // If we do not have a stream, likely we sent a RST - if stream == nil { - // Drain any data on the wire - if hdr.MsgType() == typeData && hdr.Length() > 0 { - s.logger.Printf("[WARN] yamux: Discarding data for stream: %d", id) - if _, err := io.CopyN(ioutil.Discard, s.bufRead, int64(hdr.Length())); err != nil { - s.logger.Printf("[ERR] yamux: Failed to discard data: %v", err) - return nil - } - } else { - s.logger.Printf("[WARN] yamux: frame for missing stream: %v", hdr) - } - return nil - } - - // Check if this is a window update - if hdr.MsgType() == typeWindowUpdate { - if err := stream.incrSendWindow(hdr, flags); err != nil { - if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { - s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) - } - return err - } - return nil - } - - // Read the new data - if err := stream.readData(hdr, flags, s.bufRead); err != nil { - if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { - s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) - } - return err - } - return nil -} - -// handlePing is invokde for a typePing frame -func (s *Session) handlePing(hdr header) error { - flags := hdr.Flags() - pingID := hdr.Length() - - // Check if this is a query, respond back in a separate context so we - // don't interfere with the receiving thread blocking for the write. - if flags&flagSYN == flagSYN { - go func() { - hdr := header(make([]byte, headerSize)) - hdr.encode(typePing, flagACK, 0, pingID) - if err := s.sendNoWait(hdr); err != nil { - s.logger.Printf("[WARN] yamux: failed to send ping reply: %v", err) - } - }() - return nil - } - - // Handle a response - s.pingLock.Lock() - ch := s.pings[pingID] - if ch != nil { - delete(s.pings, pingID) - close(ch) - } - s.pingLock.Unlock() - return nil -} - -// handleGoAway is invokde for a typeGoAway frame -func (s *Session) handleGoAway(hdr header) error { - code := hdr.Length() - switch code { - case goAwayNormal: - atomic.SwapInt32(&s.remoteGoAway, 1) - case goAwayProtoErr: - s.logger.Printf("[ERR] yamux: received protocol error go away") - return fmt.Errorf("yamux protocol error") - case goAwayInternalErr: - s.logger.Printf("[ERR] yamux: received internal error go away") - return fmt.Errorf("remote yamux internal error") - default: - s.logger.Printf("[ERR] yamux: received unexpected go away") - return fmt.Errorf("unexpected go away received") - } - return nil -} - -// incomingStream is used to create a new incoming stream -func (s *Session) incomingStream(id uint32) error { - // Reject immediately if we are doing a go away - if atomic.LoadInt32(&s.localGoAway) == 1 { - hdr := header(make([]byte, headerSize)) - hdr.encode(typeWindowUpdate, flagRST, id, 0) - return s.sendNoWait(hdr) - } - - // Allocate a new stream - stream := newStream(s, id, streamSYNReceived) - - s.streamLock.Lock() - defer s.streamLock.Unlock() - - // Check if stream already exists - if _, ok := s.streams[id]; ok { - s.logger.Printf("[ERR] yamux: duplicate stream declared") - if sendErr := s.sendNoWait(s.goAway(goAwayProtoErr)); sendErr != nil { - s.logger.Printf("[WARN] yamux: failed to send go away: %v", sendErr) - } - return ErrDuplicateStream - } - - // Register the stream - s.streams[id] = stream - - // Check if we've exceeded the backlog - select { - case s.acceptCh <- stream: - return nil - default: - // Backlog exceeded! RST the stream - s.logger.Printf("[WARN] yamux: backlog exceeded, forcing connection reset") - delete(s.streams, id) - stream.sendHdr.encode(typeWindowUpdate, flagRST, id, 0) - return s.sendNoWait(stream.sendHdr) - } -} - -// closeStream is used to close a stream once both sides have -// issued a close. If there was an in-flight SYN and the stream -// was not yet established, then this will give the credit back. -func (s *Session) closeStream(id uint32) { - s.streamLock.Lock() - if _, ok := s.inflight[id]; ok { - select { - case <-s.synCh: - default: - s.logger.Printf("[ERR] yamux: SYN tracking out of sync") - } - } - delete(s.streams, id) - s.streamLock.Unlock() -} - -// establishStream is used to mark a stream that was in the -// SYN Sent state as established. -func (s *Session) establishStream(id uint32) { - s.streamLock.Lock() - if _, ok := s.inflight[id]; ok { - delete(s.inflight, id) - } else { - s.logger.Printf("[ERR] yamux: established stream without inflight SYN (no tracking entry)") - } - select { - case <-s.synCh: - default: - s.logger.Printf("[ERR] yamux: established stream without inflight SYN (didn't have semaphore)") - } - s.streamLock.Unlock() -} diff --git a/vendor/ b/vendor/ deleted file mode 100644 index d216e281c..000000000 --- a/vendor/ +++ /dev/null @@ -1,457 +0,0 @@ -package yamux - -import ( - "bytes" - "io" - "sync" - "sync/atomic" - "time" -) - -type streamState int - -const ( - streamInit streamState = iota - streamSYNSent - streamSYNReceived - streamEstablished - streamLocalClose - streamRemoteClose - streamClosed - streamReset -) - -// Stream is used to represent a logical stream -// within a session. -type Stream struct { - recvWindow uint32 - sendWindow uint32 - - id uint32 - session *Session - - state streamState - stateLock sync.Mutex - - recvBuf *bytes.Buffer - recvLock sync.Mutex - - controlHdr header - controlErr chan error - controlHdrLock sync.Mutex - - sendHdr header - sendErr chan error - sendLock sync.Mutex - - recvNotifyCh chan struct{} - sendNotifyCh chan struct{} - - readDeadline time.Time - writeDeadline time.Time -} - -// newStream is used to construct a new stream within -// a given session for an ID -func newStream(session *Session, id uint32, state streamState) *Stream { - s := &Stream{ - id: id, - session: session, - state: state, - controlHdr: header(make([]byte, headerSize)), - controlErr: make(chan error, 1), - sendHdr: header(make([]byte, headerSize)), - sendErr: make(chan error, 1), - recvWindow: initialStreamWindow, - sendWindow: initialStreamWindow, - recvNotifyCh: make(chan struct{}, 1), - sendNotifyCh: make(chan struct{}, 1), - } - return s -} - -// Session returns the associated stream session -func (s *Stream) Session() *Session { - return s.session -} - -// StreamID returns the ID of this stream -func (s *Stream) StreamID() uint32 { - return -} - -// Read is used to read from the stream -func (s *Stream) Read(b []byte) (n int, err error) { - defer asyncNotify(s.recvNotifyCh) -START: - s.stateLock.Lock() - switch s.state { - case streamLocalClose: - fallthrough - case streamRemoteClose: - fallthrough - case streamClosed: - s.recvLock.Lock() - if s.recvBuf == nil || s.recvBuf.Len() == 0 { - s.recvLock.Unlock() - s.stateLock.Unlock() - return 0, io.EOF - } - s.recvLock.Unlock() - case streamReset: - s.stateLock.Unlock() - return 0, ErrConnectionReset - } - s.stateLock.Unlock() - - // If there is no data available, block - s.recvLock.Lock() - if s.recvBuf == nil || s.recvBuf.Len() == 0 { - s.recvLock.Unlock() - goto WAIT - } - - // Read any bytes - n, _ = s.recvBuf.Read(b) - s.recvLock.Unlock() - - // Send a window update potentially - err = s.sendWindowUpdate() - return n, err - -WAIT: - var timeout <-chan time.Time - var timer *time.Timer - if !s.readDeadline.IsZero() { - delay := s.readDeadline.Sub(time.Now()) - timer = time.NewTimer(delay) - timeout = timer.C - } - select { - case <-s.recvNotifyCh: - if timer != nil { - timer.Stop() - } - goto START - case <-timeout: - return 0, ErrTimeout - } -} - -// Write is used to write to the stream -func (s *Stream) Write(b []byte) (n int, err error) { - s.sendLock.Lock() - defer s.sendLock.Unlock() - total := 0 - for total < len(b) { - n, err := s.write(b[total:]) - total += n - if err != nil { - return total, err - } - } - return total, nil -} - -// write is used to write to the stream, may return on -// a short write. -func (s *Stream) write(b []byte) (n int, err error) { - var flags uint16 - var max uint32 - var body io.Reader -START: - s.stateLock.Lock() - switch s.state { - case streamLocalClose: - fallthrough - case streamClosed: - s.stateLock.Unlock() - return 0, ErrStreamClosed - case streamReset: - s.stateLock.Unlock() - return 0, ErrConnectionReset - } - s.stateLock.Unlock() - - // If there is no data available, block - window := atomic.LoadUint32(&s.sendWindow) - if window == 0 { - goto WAIT - } - - // Determine the flags if any - flags = s.sendFlags() - - // Send up to our send window - max = min(window, uint32(len(b))) - body = bytes.NewReader(b[:max]) - - // Send the header - s.sendHdr.encode(typeData, flags,, max) - if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil { - return 0, err - } - - // Reduce our send window - atomic.AddUint32(&s.sendWindow, ^uint32(max-1)) - - // Unlock - return int(max), err - -WAIT: - var timeout <-chan time.Time - if !s.writeDeadline.IsZero() { - delay := s.writeDeadline.Sub(time.Now()) - timeout = time.After(delay) - } - select { - case <-s.sendNotifyCh: - goto START - case <-timeout: - return 0, ErrTimeout - } - return 0, nil -} - -// sendFlags determines any flags that are appropriate -// based on the current stream state -func (s *Stream) sendFlags() uint16 { - s.stateLock.Lock() - defer s.stateLock.Unlock() - var flags uint16 - switch s.state { - case streamInit: - flags |= flagSYN - s.state = streamSYNSent - case streamSYNReceived: - flags |= flagACK - s.state = streamEstablished - } - return flags -} - -// sendWindowUpdate potentially sends a window update enabling -// further writes to take place. Must be invoked with the lock. -func (s *Stream) sendWindowUpdate() error { - s.controlHdrLock.Lock() - defer s.controlHdrLock.Unlock() - - // Determine the delta update - max := s.session.config.MaxStreamWindowSize - delta := max - atomic.LoadUint32(&s.recvWindow) - - // Determine the flags if any - flags := s.sendFlags() - - // Check if we can omit the update - if delta < (max/2) && flags == 0 { - return nil - } - - // Update our window - atomic.AddUint32(&s.recvWindow, delta) - - // Send the header - s.controlHdr.encode(typeWindowUpdate, flags,, delta) - if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { - return err - } - return nil -} - -// sendClose is used to send a FIN -func (s *Stream) sendClose() error { - s.controlHdrLock.Lock() - defer s.controlHdrLock.Unlock() - - flags := s.sendFlags() - flags |= flagFIN - s.controlHdr.encode(typeWindowUpdate, flags,, 0) - if err := s.session.waitForSendErr(s.controlHdr, nil, s.controlErr); err != nil { - return err - } - return nil -} - -// Close is used to close the stream -func (s *Stream) Close() error { - closeStream := false - s.stateLock.Lock() - switch s.state { - // Opened means we need to signal a close - case streamSYNSent: - fallthrough - case streamSYNReceived: - fallthrough - case streamEstablished: - s.state = streamLocalClose - goto SEND_CLOSE - - case streamLocalClose: - case streamRemoteClose: - s.state = streamClosed - closeStream = true - goto SEND_CLOSE - - case streamClosed: - case streamReset: - default: - panic("unhandled state") - } - s.stateLock.Unlock() - return nil -SEND_CLOSE: - s.stateLock.Unlock() - s.sendClose() - s.notifyWaiting() - if closeStream { - s.session.closeStream( - } - return nil -} - -// forceClose is used for when the session is exiting -func (s *Stream) forceClose() { - s.stateLock.Lock() - s.state = streamClosed - s.stateLock.Unlock() - s.notifyWaiting() -} - -// processFlags is used to update the state of the stream -// based on set flags, if any. Lock must be held -func (s *Stream) processFlags(flags uint16) error { - // Close the stream without holding the state lock - closeStream := false - defer func() { - if closeStream { - s.session.closeStream( - } - }() - - s.stateLock.Lock() - defer s.stateLock.Unlock() - if flags&flagACK == flagACK { - if s.state == streamSYNSent { - s.state = streamEstablished - } - s.session.establishStream( - } - if flags&flagFIN == flagFIN { - switch s.state { - case streamSYNSent: - fallthrough - case streamSYNReceived: - fallthrough - case streamEstablished: - s.state = streamRemoteClose - s.notifyWaiting() - case streamLocalClose: - s.state = streamClosed - closeStream = true - s.notifyWaiting() - default: - s.session.logger.Printf("[ERR] yamux: unexpected FIN flag in state %d", s.state) - return ErrUnexpectedFlag - } - } - if flags&flagRST == flagRST { - s.state = streamReset - closeStream = true - s.notifyWaiting() - } - return nil -} - -// notifyWaiting notifies all the waiting channels -func (s *Stream) notifyWaiting() { - asyncNotify(s.recvNotifyCh) - asyncNotify(s.sendNotifyCh) -} - -// incrSendWindow updates the size of our send window -func (s *Stream) incrSendWindow(hdr header, flags uint16) error { - if err := s.processFlags(flags); err != nil { - return err - } - - // Increase window, unblock a sender - atomic.AddUint32(&s.sendWindow, hdr.Length()) - asyncNotify(s.sendNotifyCh) - return nil -} - -// readData is used to handle a data frame -func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error { - if err := s.processFlags(flags); err != nil { - return err - } - - // Check that our recv window is not exceeded - length := hdr.Length() - if length == 0 { - return nil - } - if remain := atomic.LoadUint32(&s.recvWindow); length > remain { - s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)",, remain, length) - return ErrRecvWindowExceeded - } - - // Wrap in a limited reader - conn = &io.LimitedReader{R: conn, N: int64(length)} - - // Copy into buffer - s.recvLock.Lock() - if s.recvBuf == nil { - // Allocate the receive buffer just-in-time to fit the full data frame. - // This way we can read in the whole packet without further allocations. - s.recvBuf = bytes.NewBuffer(make([]byte, 0, length)) - } - if _, err := io.Copy(s.recvBuf, conn); err != nil { - s.session.logger.Printf("[ERR] yamux: Failed to read stream data: %v", err) - s.recvLock.Unlock() - return err - } - - // Decrement the receive window - atomic.AddUint32(&s.recvWindow, ^uint32(length-1)) - s.recvLock.Unlock() - - // Unblock any readers - asyncNotify(s.recvNotifyCh) - return nil -} - -// SetDeadline sets the read and write deadlines -func (s *Stream) SetDeadline(t time.Time) error { - if err := s.SetReadDeadline(t); err != nil { - return err - } - if err := s.SetWriteDeadline(t); err != nil { - return err - } - return nil -} - -// SetReadDeadline sets the deadline for future Read calls. -func (s *Stream) SetReadDeadline(t time.Time) error { - s.readDeadline = t - return nil -} - -// SetWriteDeadline sets the deadline for future Write calls -func (s *Stream) SetWriteDeadline(t time.Time) error { - s.writeDeadline = t - return nil -} - -// Shrink is used to compact the amount of buffers utilized -// This is useful when using Yamux in a connection pool to reduce -// the idle memory utilization. -func (s *Stream) Shrink() { - s.recvLock.Lock() - if s.recvBuf != nil && s.recvBuf.Len() == 0 { - s.recvBuf = nil - } - s.recvLock.Unlock() -} diff --git a/vendor/ b/vendor/ deleted file mode 100644 index 5fe45afcd..000000000 --- a/vendor/ +++ /dev/null @@ -1,28 +0,0 @@ -package yamux - -// asyncSendErr is used to try an async send of an error -func asyncSendErr(ch chan error, err error) { - if ch == nil { - return - } - select { - case ch <- err: - default: - } -} - -// asyncNotify is used to signal a waiting goroutine -func asyncNotify(ch chan struct{}) { - select { - case ch <- struct{}{}: - default: - } -} - -// min computes the minimum of two values -func min(a, b uint32) uint32 { - if a < b { - return a - } - return b -} diff --git a/vendor/ b/vendor/ index 8c13118d8..778151ae4 100644 --- a/vendor/ +++ b/vendor/ @@ -7,17 +7,17 @@ import ( var defaultEmitter Emitter -const emitQueue = 8192 - func init() { defaultEmitter.init() } type ( + // packet emit request emitPacket struct { - conn net.PacketConn - to net.Addr - data []byte + conn net.PacketConn + to net.Addr + data []byte + // mark this packet should recycle to global xmitBuf recycle bool } @@ -28,7 +28,7 @@ type ( ) func (e *Emitter) init() { - = make(chan emitPacket, emitQueue) + = make(chan emitPacket) go e.emitTask() } @@ -36,7 +36,7 @@ func (e *Emitter) init() { func (e *Emitter) emitTask() { for p := range { if n, err := p.conn.WriteTo(,; err == nil { - atomic.AddUint64(&DefaultSnmp.OutSegs, 1) + atomic.AddUint64(&DefaultSnmp.OutPkts, 1) atomic.AddUint64(&DefaultSnmp.OutBytes, uint64(n)) } if p.recycle { diff --git a/vendor/ b/vendor/ index 4ad88aa24..86efc3ab4 100644 --- a/vendor/ +++ b/vendor/ @@ -117,6 +117,7 @@ func (seg *Segment) encode(ptr []byte) []byte { ptr = ikcp_encode32u(ptr, ptr = ikcp_encode32u(ptr, seg.una) ptr = ikcp_encode32u(ptr, uint32(len( + atomic.AddUint64(&DefaultSnmp.OutSegs, 1) return ptr } @@ -484,9 +485,10 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { } var maxack uint32 + var lastackts uint32 var flag int + var inSegs uint64 - current := currentMs() for { var ts, sn, length, una, conv uint32 var wnd uint16 @@ -525,10 +527,6 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { kcp.shrink_buf() if cmd == IKCP_CMD_ACK { - if _itimediff(current, ts) >= 0 { - kcp.update_ack(_itimediff(current, ts)) - } - kcp.parse_ack(sn) kcp.shrink_buf() if flag == 0 { @@ -537,6 +535,7 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { } else if _itimediff(sn, maxack) > 0 { maxack = sn } + lastackts = ts } else if cmd == IKCP_CMD_PUSH { if _itimediff(sn, kcp.rcv_nxt+kcp.rcv_wnd) < 0 { kcp.ack_push(sn, ts) @@ -567,11 +566,17 @@ func (kcp *KCP) Input(data []byte, regular, ackNoDelay bool) int { return -3 } + inSegs++ data = data[length:] } + atomic.AddUint64(&DefaultSnmp.InSegs, inSegs) if flag != 0 && regular { kcp.parse_fastack(maxack) + current := currentMs() + if _itimediff(current, lastackts) >= 0 { + kcp.update_ack(_itimediff(current, lastackts)) + } } if _itimediff(kcp.snd_una, una) > 0 { @@ -660,9 +665,9 @@ func (kcp *KCP) flush(ackOnly bool) { return } - current := currentMs() // probe window size (if remote window size equals zero) if kcp.rmt_wnd == 0 { + current := currentMs() if kcp.probe_wait == 0 { kcp.probe_wait = IKCP_PROBE_INIT kcp.ts_probe = current + kcp.probe_wait @@ -742,6 +747,7 @@ func (kcp *KCP) flush(ackOnly bool) { // send new segments for k := len(kcp.snd_buf) - newSegsCount; k < len(kcp.snd_buf); k++ { + current := currentMs() segment := &kcp.snd_buf[k] segment.xmit++ segment.rto = kcp.rx_rto @@ -765,6 +771,7 @@ func (kcp *KCP) flush(ackOnly bool) { // check for retransmissions for k := 0; k < len(kcp.snd_buf)-newSegsCount; k++ { + current := currentMs() segment := &kcp.snd_buf[k] needsend := false if _itimediff(current, segment.resendts) >= 0 { // RTO diff --git a/vendor/ b/vendor/ index f5df379de..7ddfa797f 100644 --- a/vendor/ +++ b/vendor/ @@ -23,13 +23,23 @@ func (errTimeout) Temporary() bool { return true } func (errTimeout) Error() string { return "i/o timeout" } const ( - defaultWndSize = 128 // default window size, in packet - nonceSize = 16 // magic number - crcSize = 4 // 4bytes packet checksum + // 16-bytes magic number for each packet + nonceSize = 16 + + // 4-bytes packet checksum + crcSize = 4 + + // overall crypto header size cryptHeaderSize = nonceSize + crcSize - mtuLimit = 2048 - rxQueueLimit = 8192 - rxFECMulti = 3 // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory + + // maximum packet size + mtuLimit = 2048 + + // packet receiving channel limit + rxQueueLimit = 2048 + + // FEC keeps rxFECMulti* (dataShard+parityShard) ordered packets in memory + rxFECMulti = 3 ) const ( @@ -38,8 +48,12 @@ const ( ) var ( + // global packet buffer + // shared among sending/receiving/FEC xmitBuf sync.Pool - sid uint32 + + // monotonic session id + sid uint32 ) func init() { @@ -51,36 +65,39 @@ func init() { type ( // UDPSession defines a KCP session implemented by UDP UDPSession struct { - // core - sid uint32 - conn net.PacketConn // the underlying packet socket - kcp *KCP // the core ARQ - l *Listener // point to server listener if it's a server socket - block BlockCrypt // encryption - sockbuff []byte // kcp receiving is based on packet, I turn it into stream + sid uint32 // session id(monotonic) + conn net.PacketConn // the underlying packet connection + kcp *KCP // KCP ARQ protocol + l *Listener // point to the Listener if it's accepted by Listener + block BlockCrypt // block encryption - // forward error correction - fec *FEC - fecDataShards [][]byte - fecHeaderOffset int - fecPayloadOffset int - fecCnt int // count datashard - fecMaxSize int // record maximum data length in datashard + // kcp receiving is based on packets + // sockbuff turns packets into stream + sockbuff []byte + + fec *FEC // forward error correction + fecDataShards [][]byte // data shards cache + fecShardCount int // count the number of datashards collected + fecMaxSize int // record maximum data length in datashard + + fecHeaderOffset int // FEC header offset in packet + fecPayloadOffset int // FEC payload offset in packet // settings - remote net.Addr + remote net.Addr // remote peer address rd time.Time // read deadline wd time.Time // write deadline - headerSize int - updateInterval int32 - ackNoDelay bool + headerSize int // the overall header size added before KCP frame + updateInterval int32 // interval in seconds to call kcp.flush() + ackNoDelay bool // send ack immediately for each incoming packet // notifications - die chan struct{} - chReadEvent chan struct{} - chWriteEvent chan struct{} - isClosed bool - mu sync.Mutex + die chan struct{} // notify session has Closed + chReadEvent chan struct{} // notify Read() can be called without blocking + chWriteEvent chan struct{} // notify Write() can be called without blocking + + isClosed bool // flag the session has Closed + mu sync.Mutex } setReadBuffer interface { @@ -132,7 +149,6 @@ func newUDPSession(conv uint32, dataShards, parityShards int, l *Listener, conn sess.output(buf[:size]) } }) - sess.kcp.WndSize(defaultWndSize, defaultWndSize) sess.kcp.SetMtu(IKCP_MTU_DEF - sess.headerSize) sess.kcp.setFEC(dataShards, parityShards) @@ -324,11 +340,16 @@ func (s *UDPSession) SetWindowSize(sndwnd, rcvwnd int) { s.kcp.WndSize(sndwnd, rcvwnd) } -// SetMtu sets the maximum transmission unit -func (s *UDPSession) SetMtu(mtu int) { +// SetMtu sets the maximum transmission unit(not including UDP header) +func (s *UDPSession) SetMtu(mtu int) bool { + if mtu > mtuLimit { + return false + } + defer s.kcp.SetMtu(mtu - s.headerSize) + return true } // SetStreamMode toggles the stream mode on/off @@ -416,9 +437,9 @@ func (s *UDPSession) output(buf []byte) { // copy data to fec datashards sz := len(ext) - s.fecDataShards[s.fecCnt] = s.fecDataShards[s.fecCnt][:sz] - copy(s.fecDataShards[s.fecCnt], ext) - s.fecCnt++ + s.fecDataShards[s.fecShardCount] = s.fecDataShards[s.fecShardCount][:sz] + copy(s.fecDataShards[s.fecShardCount], ext) + s.fecShardCount++ // record max datashard length if sz > s.fecMaxSize { @@ -426,7 +447,7 @@ func (s *UDPSession) output(buf []byte) { } // calculate Reed-Solomon Erasure Code - if s.fecCnt == s.fec.dataShards { + if s.fecShardCount == s.fec.dataShards { // bzero each datashard's tail for i := 0; i < s.fec.dataShards; i++ { shard := s.fecDataShards[i] @@ -442,7 +463,7 @@ func (s *UDPSession) output(buf []byte) { } // reset counters to zero - s.fecCnt = 0 + s.fecShardCount = 0 s.fecMaxSize = 0 } } @@ -557,7 +578,7 @@ func (s *UDPSession) kcpInput(data []byte) { } - atomic.AddUint64(&DefaultSnmp.InSegs, 1) + atomic.AddUint64(&DefaultSnmp.InPkts, 1) atomic.AddUint64(&DefaultSnmp.InBytes, uint64(len(data))) if fecParityShards > 0 { atomic.AddUint64(&DefaultSnmp.FECParityShards, fecParityShards) @@ -626,21 +647,23 @@ func (s *UDPSession) readLoop() { type ( // Listener defines a server listening for connections Listener struct { - block BlockCrypt - dataShards, parityShards int - fec *FEC // for fec init test - conn net.PacketConn - sessions map[string]*UDPSession - chAccepts chan *UDPSession - chDeadlinks chan net.Addr - headerSize int - die chan struct{} - rxbuf sync.Pool - rd atomic.Value - wd atomic.Value + block BlockCrypt // block encryption + dataShards int // FEC data shard + parityShards int // FEC parity shard + fec *FEC // FEC mock initialization + conn net.PacketConn // the underlying packet connection + + sessions map[string]*UDPSession // all sessions accepted by this Listener + chAccepts chan *UDPSession // Listen() backlog + chDeadlinks chan net.Addr // session close queue + headerSize int // the overall header size added before KCP frame + die chan struct{} // notify the listener has closed + rd atomic.Value // read deadline for Accept() + wd atomic.Value } - packet struct { + // incoming packet + inPacket struct { from net.Addr data []byte } @@ -648,7 +671,7 @@ type ( // monitor incoming data for all connections of server func (l *Listener) monitor() { - chPacket := make(chan packet, rxQueueLimit) + chPacket := make(chan inPacket, rxQueueLimit) go l.receiver(chPacket) for { select { @@ -699,7 +722,7 @@ func (l *Listener) monitor() { } } - l.rxbuf.Put(raw) + xmitBuf.Put(raw) case deadlink := <-l.chDeadlinks: delete(l.sessions, deadlink.String()) case <-l.die: @@ -708,11 +731,11 @@ func (l *Listener) monitor() { } } -func (l *Listener) receiver(ch chan packet) { +func (l *Listener) receiver(ch chan inPacket) { for { - data := l.rxbuf.Get().([]byte)[:mtuLimit] + data := xmitBuf.Get().([]byte)[:mtuLimit] if n, from, err := l.conn.ReadFrom(data); IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/vendor/ b/vendor/ new file mode 100644 index 000000000..36062d7be --- /dev/null +++ b/vendor/ @@ -0,0 +1,60 @@ +package smux + +import ( + "encoding/binary" + "fmt" +) + +const ( + version = 1 +) + +const ( // cmds + cmdSYN byte = iota // stream open + cmdFIN // stream close, a.k.a EOF mark + cmdPSH // data push + cmdNOP // no operation +) + +const ( + sizeOfVer = 1 + sizeOfCmd = 1 + sizeOfLength = 2 + sizeOfSid = 4 + headerSize = sizeOfVer + sizeOfCmd + sizeOfSid + sizeOfLength +) + +// Frame defines a packet from or to be multiplexed into a single connection +type Frame struct { + ver byte + cmd byte + sid uint32 + data []byte +} + +func newFrame(cmd byte, sid uint32) Frame { + return Frame{ver: version, cmd: cmd, sid: sid} +} + +type rawHeader []byte + +func (h rawHeader) Version() byte { + return h[0] +} + +func (h rawHeader) Cmd() byte { + return h[1] +} + +func (h rawHeader) Length() uint16 { + return binary.LittleEndian.Uint16(h[2:]) +} + +func (h rawHeader) StreamID() uint32 { + return binary.LittleEndian.Uint32(h[4:]) +} + +func (h rawHeader) String() string { + return fmt.Sprintf("Version:%d Cmd:%d StreamID:%d Length:%d", + h.Version(), h.Cmd(), h.StreamID(), h.Length()) +} diff --git a/vendor/ b/vendor/ new file mode 100644 index 000000000..afcf58b49 --- /dev/null +++ b/vendor/ @@ -0,0 +1,80 @@ +package smux + +import ( + "fmt" + "io" + "time" + + "" +) + +// Config is used to tune the Smux session +type Config struct { + // KeepAliveInterval is how often to send a NOP command to the remote + KeepAliveInterval time.Duration + + // KeepAliveTimeout is how long the session + // will be closed if no data has arrived + KeepAliveTimeout time.Duration + + // MaxFrameSize is used to control the maximum + // frame size to sent to the remote + MaxFrameSize int + + // MaxReceiveBuffer is used to control the maximum + // number of data in the buffer pool + MaxReceiveBuffer int +} + +// DefaultConfig is used to return a default configuration +func DefaultConfig() *Config { + return &Config{ + KeepAliveInterval: 10 * time.Second, + KeepAliveTimeout: 30 * time.Second, + MaxFrameSize: 4096, + MaxReceiveBuffer: 4194304, + } +} + +// VerifyConfig is used to verify the sanity of configuration +func VerifyConfig(config *Config) error { + if config.KeepAliveInterval == 0 { + return errors.New("keep-alive interval must be positive") + } + if config.KeepAliveTimeout < config.KeepAliveInterval { + return fmt.Errorf("keep-alive timeout must be larger than keep-alive interval") + } + if config.MaxFrameSize <= 0 { + return errors.New("max frame size must be positive") + } + if config.MaxFrameSize > 65535 { + return errors.New("max frame size must not be larger than 65535") + } + if config.MaxReceiveBuffer <= 0 { + return errors.New("max receive buffer must be positive") + } + return nil +} + +// Server is used to initialize a new server-side connection. +func Server(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, false), nil +} + +// Client is used to initialize a new client-side connection. +func Client(conn io.ReadWriteCloser, config *Config) (*Session, error) { + if config == nil { + config = DefaultConfig() + } + + if err := VerifyConfig(config); err != nil { + return nil, err + } + return newSession(config, conn, true), nil +} diff --git a/vendor/ b/vendor/ new file mode 100644 index 000000000..1a2cf25e6 --- /dev/null +++ b/vendor/ @@ -0,0 +1,335 @@ +package smux + +import ( + "encoding/binary" + "io" + "sync" + "sync/atomic" + "time" + + "" +) + +const ( + defaultAcceptBacklog = 1024 +) + +const ( + errBrokenPipe = "broken pipe" + errInvalidProtocol = "invalid protocol version" +) + +type writeRequest struct { + frame Frame + result chan writeResult +} + +type writeResult struct { + n int + err error +} + +// Session defines a multiplexed connection for streams +type Session struct { + conn io.ReadWriteCloser + + config *Config + nextStreamID uint32 // next stream identifier + + bucket int32 // token bucket + bucketCond *sync.Cond // used for waiting for tokens + + streams map[uint32]*Stream // all streams in this session + streamLock sync.Mutex // locks streams + + die chan struct{} // flag session has died + dieLock sync.Mutex + chAccepts chan *Stream + + dataReady int32 // flag data has arrived + + deadline atomic.Value + + writes chan writeRequest +} + +func newSession(config *Config, conn io.ReadWriteCloser, client bool) *Session { + s := new(Session) + s.die = make(chan struct{}) + s.conn = conn + s.config = config + s.streams = make(map[uint32]*Stream) + s.chAccepts = make(chan *Stream, defaultAcceptBacklog) + s.bucket = int32(config.MaxReceiveBuffer) + s.bucketCond = sync.NewCond(&sync.Mutex{}) + s.writes = make(chan writeRequest) + + if client { + s.nextStreamID = 1 + } else { + s.nextStreamID = 2 + } + go s.recvLoop() + go s.sendLoop() + go s.keepalive() + return s +} + +// OpenStream is used to create a new stream +func (s *Session) OpenStream() (*Stream, error) { + if s.IsClosed() { + return nil, errors.New(errBrokenPipe) + } + + sid := atomic.AddUint32(&s.nextStreamID, 2) + stream := newStream(sid, s.config.MaxFrameSize, s) + + if _, err := s.writeFrame(newFrame(cmdSYN, sid)); err != nil { + return nil, errors.Wrap(err, "writeFrame") + } + + s.streamLock.Lock() + s.streams[sid] = stream + s.streamLock.Unlock() + return stream, nil +} + +// AcceptStream is used to block until the next available stream +// is ready to be accepted. +func (s *Session) AcceptStream() (*Stream, error) { + var deadline <-chan time.Time + if d, ok := s.deadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(d.Sub(time.Now())) + defer timer.Stop() + deadline = timer.C + } + select { + case stream := <-s.chAccepts: + return stream, nil + case <-deadline: + return nil, errTimeout + case <-s.die: + return nil, errors.New(errBrokenPipe) + } +} + +// Close is used to close the session and all streams. +func (s *Session) Close() (err error) { + s.dieLock.Lock() + + select { + case <-s.die: + s.dieLock.Unlock() + return errors.New(errBrokenPipe) + default: + close(s.die) + s.dieLock.Unlock() + s.streamLock.Lock() + for k := range s.streams { + s.streams[k].sessionClose() + } + s.streamLock.Unlock() + s.bucketCond.Signal() + return s.conn.Close() + } +} + +// IsClosed does a safe check to see if we have shutdown +func (s *Session) IsClosed() bool { + select { + case <-s.die: + return true + default: + return false + } +} + +// NumStreams returns the number of currently open streams +func (s *Session) NumStreams() int { + if s.IsClosed() { + return 0 + } + s.streamLock.Lock() + defer s.streamLock.Unlock() + return len(s.streams) +} + +// SetDeadline sets a deadline used by Accept* calls. +// A zero time value disables the deadline. +func (s *Session) SetDeadline(t time.Time) error { + s.deadline.Store(t) + return nil +} + +// notify the session that a stream has closed +func (s *Session) streamClosed(sid uint32) { + s.streamLock.Lock() + if n := s.streams[sid].recycleTokens(); n > 0 { // return remaining tokens to the bucket + if atomic.AddInt32(&s.bucket, int32(n)) > 0 { + s.bucketCond.Signal() + } + } + delete(s.streams, sid) + s.streamLock.Unlock() +} + +// returnTokens is called by stream to return token after read +func (s *Session) returnTokens(n int) { + oldvalue := atomic.LoadInt32(&s.bucket) + newvalue := atomic.AddInt32(&s.bucket, int32(n)) + if oldvalue <= 0 && newvalue > 0 { + s.bucketCond.Signal() + } + +} + +// session read a frame from underlying connection +// it's data is pointed to the input buffer +func (s *Session) readFrame(buffer []byte) (f Frame, err error) { + if _, err := io.ReadFull(s.conn, buffer[:headerSize]); err != nil { + return f, errors.Wrap(err, "readFrame") + } + + dec := rawHeader(buffer) + if dec.Version() != version { + return f, errors.New(errInvalidProtocol) + } + + f.ver = dec.Version() + f.cmd = dec.Cmd() + f.sid = dec.StreamID() + if length := dec.Length(); length > 0 { + if _, err := io.ReadFull(s.conn, buffer[headerSize:headerSize+length]); err != nil { + return f, errors.Wrap(err, "readFrame") + } + = buffer[headerSize : headerSize+length] + } + return f, nil +} + +// recvLoop keeps on reading from underlying connection if tokens are available +func (s *Session) recvLoop() { + buffer := make([]byte, (1<<16)+headerSize) + for { + s.bucketCond.L.Lock() + for atomic.LoadInt32(&s.bucket) <= 0 && !s.IsClosed() { + s.bucketCond.Wait() + } + s.bucketCond.L.Unlock() + + if s.IsClosed() { + return + } + + if f, err := s.readFrame(buffer); err == nil { + atomic.StoreInt32(&s.dataReady, 1) + + switch f.cmd { + case cmdNOP: + case cmdSYN: + s.streamLock.Lock() + if _, ok := s.streams[f.sid]; !ok { + stream := newStream(f.sid, s.config.MaxFrameSize, s) + s.streams[f.sid] = stream + select { + case s.chAccepts <- stream: + case <-s.die: + } + } + s.streamLock.Unlock() + case cmdFIN: + s.streamLock.Lock() + if stream, ok := s.streams[f.sid]; ok { + stream.markRST() + stream.notifyReadEvent() + } + s.streamLock.Unlock() + case cmdPSH: + s.streamLock.Lock() + if stream, ok := s.streams[f.sid]; ok { + atomic.AddInt32(&s.bucket, -int32(len( + stream.pushBytes( + stream.notifyReadEvent() + } + s.streamLock.Unlock() + default: + s.Close() + return + } + } else { + s.Close() + return + } + } +} + +func (s *Session) keepalive() { + tickerPing := time.NewTicker(s.config.KeepAliveInterval) + tickerTimeout := time.NewTicker(s.config.KeepAliveTimeout) + defer tickerPing.Stop() + defer tickerTimeout.Stop() + for { + select { + case <-tickerPing.C: + s.writeFrame(newFrame(cmdNOP, 0)) + s.bucketCond.Signal() // force a signal to the recvLoop + case <-tickerTimeout.C: + if !atomic.CompareAndSwapInt32(&s.dataReady, 1, 0) { + s.Close() + return + } + case <-s.die: + return + } + } +} + +func (s *Session) sendLoop() { + buf := make([]byte, (1<<16)+headerSize) + for { + select { + case <-s.die: + return + case request, ok := <-s.writes: + if !ok { + continue + } + buf[0] = request.frame.ver + buf[1] = request.frame.cmd + binary.LittleEndian.PutUint16(buf[2:], uint16(len( + binary.LittleEndian.PutUint32(buf[4:], request.frame.sid) + copy(buf[headerSize:], + n, err := s.conn.Write(buf[:headerSize+len(]) + + n -= headerSize + if n < 0 { + n = 0 + } + + result := writeResult{ + n: n, + err: err, + } + + request.result <- result + close(request.result) + } + } +} + +// writeFrame writes the frame to the underlying connection +// and returns the number of bytes written if successful +func (s *Session) writeFrame(f Frame) (n int, err error) { + req := writeRequest{ + frame: f, + result: make(chan writeResult, 1), + } + select { + case <-s.die: + return 0, errors.New(errBrokenPipe) + case s.writes <- req: + } + + result := <-req.result + return result.n, result.err +} diff --git a/vendor/ b/vendor/ new file mode 100644 index 000000000..cf61b451b --- /dev/null +++ b/vendor/ @@ -0,0 +1,261 @@ +package smux + +import ( + "bytes" + "io" + "net" + "sync" + "sync/atomic" + "time" + + "" +) + +// Stream implements net.Conn +type Stream struct { + id uint32 + rstflag int32 + sess *Session + buffer bytes.Buffer + bufferLock sync.Mutex + frameSize int + chReadEvent chan struct{} // notify a read event + die chan struct{} // flag the stream has closed + dieLock sync.Mutex + readDeadline atomic.Value + writeDeadline atomic.Value +} + +// newStream initiates a Stream struct +func newStream(id uint32, frameSize int, sess *Session) *Stream { + s := new(Stream) + = id + s.chReadEvent = make(chan struct{}, 1) + s.frameSize = frameSize + s.sess = sess + s.die = make(chan struct{}) + return s +} + +// ID returns the unique stream ID. +func (s *Stream) ID() uint32 { + return +} + +// Read implements net.Conn +func (s *Stream) Read(b []byte) (n int, err error) { + var deadline <-chan time.Time + if d, ok := s.readDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(d.Sub(time.Now())) + defer timer.Stop() + deadline = timer.C + } + +READ: + select { + case <-s.die: + return 0, errors.New(errBrokenPipe) + case <-deadline: + return n, errTimeout + default: + } + + s.bufferLock.Lock() + n, err = s.buffer.Read(b) + s.bufferLock.Unlock() + + if n > 0 { + s.sess.returnTokens(n) + return n, nil + } else if atomic.LoadInt32(&s.rstflag) == 1 { + _ = s.Close() + return 0, io.EOF + } + + select { + case <-s.chReadEvent: + goto READ + case <-deadline: + return n, errTimeout + case <-s.die: + return 0, errors.New(errBrokenPipe) + } +} + +// Write implements net.Conn +func (s *Stream) Write(b []byte) (n int, err error) { + var deadline <-chan time.Time + if d, ok := s.writeDeadline.Load().(time.Time); ok && !d.IsZero() { + timer := time.NewTimer(d.Sub(time.Now())) + defer timer.Stop() + deadline = timer.C + } + + select { + case <-s.die: + return 0, errors.New(errBrokenPipe) + default: + } + + frames := s.split(b, cmdPSH, + sent := 0 + for k := range frames { + req := writeRequest{ + frame: frames[k], + result: make(chan writeResult, 1), + } + + select { + case s.sess.writes <- req: + case <-s.die: + return sent, errors.New(errBrokenPipe) + case <-deadline: + return sent, errTimeout + } + + select { + case result := <-req.result: + sent += result.n + if result.err != nil { + return sent, result.err + } + case <-s.die: + return sent, errors.New(errBrokenPipe) + case <-deadline: + return sent, errTimeout + } + } + return sent, nil +} + +// Close implements net.Conn +func (s *Stream) Close() error { + s.dieLock.Lock() + + select { + case <-s.die: + s.dieLock.Unlock() + return errors.New(errBrokenPipe) + default: + close(s.die) + s.dieLock.Unlock() + s.sess.streamClosed( + _, err := s.sess.writeFrame(newFrame(cmdFIN, + return err + } +} + +// SetReadDeadline sets the read deadline as defined by +// net.Conn.SetReadDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetReadDeadline(t time.Time) error { + s.readDeadline.Store(t) + return nil +} + +// SetWriteDeadline sets the write deadline as defined by +// net.Conn.SetWriteDeadline. +// A zero time value disables the deadline. +func (s *Stream) SetWriteDeadline(t time.Time) error { + s.writeDeadline.Store(t) + return nil +} + +// SetDeadline sets both read and write deadlines as defined by +// net.Conn.SetDeadline. +// A zero time value disables the deadlines. +func (s *Stream) SetDeadline(t time.Time) error { + if err := s.SetReadDeadline(t); err != nil { + return err + } + if err := s.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +// session closes the stream +func (s *Stream) sessionClose() { + s.dieLock.Lock() + defer s.dieLock.Unlock() + + select { + case <-s.die: + default: + close(s.die) + } +} + +// LocalAddr satisfies net.Conn interface +func (s *Stream) LocalAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + LocalAddr() net.Addr + }); ok { + return ts.LocalAddr() + } + return nil +} + +// RemoteAddr satisfies net.Conn interface +func (s *Stream) RemoteAddr() net.Addr { + if ts, ok := s.sess.conn.(interface { + RemoteAddr() net.Addr + }); ok { + return ts.RemoteAddr() + } + return nil +} + +// pushBytes a slice into buffer +func (s *Stream) pushBytes(p []byte) { + s.bufferLock.Lock() + s.buffer.Write(p) + s.bufferLock.Unlock() +} + +// recycleTokens transform remaining bytes to tokens(will truncate buffer) +func (s *Stream) recycleTokens() (n int) { + s.bufferLock.Lock() + n = s.buffer.Len() + s.buffer.Reset() + s.bufferLock.Unlock() + return +} + +// split large byte buffer into smaller frames, reference only +func (s *Stream) split(bts []byte, cmd byte, sid uint32) []Frame { + var frames []Frame + for len(bts) > s.frameSize { + frame := newFrame(cmd, sid) + = bts[:s.frameSize] + bts = bts[s.frameSize:] + frames = append(frames, frame) + } + if len(bts) > 0 { + frame := newFrame(cmd, sid) + = bts + frames = append(frames, frame) + } + return frames +} + +// notify read event +func (s *Stream) notifyReadEvent() { + select { + case s.chReadEvent <- struct{}{}: + default: + } +} + +// mark this stream has been reset +func (s *Stream) markRST() { + atomic.StoreInt32(&s.rstflag, 1) +} + +var errTimeout error = &timeoutError{} + +type timeoutError struct{} + +func (e *timeoutError) Error() string { return "i/o timeout" } +func (e *timeoutError) Timeout() bool { return true } +func (e *timeoutError) Temporary() bool { return true } diff --git a/vendor/manifest b/vendor/manifest index 4805ea189..c61edb430 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -199,14 +199,6 @@ "revision": "5f1c01d9f64b941dd9582c638279d046eda6ca31", "branch": "master" }, - { - "importpath": "", - "repository": "", - "vcs": "git", - "revision": "d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd", - "branch": "master", - "notests": true - }, { "importpath": "", "repository": "", @@ -354,7 +346,15 @@ "importpath": "", "repository": "", "vcs": "git", - "revision": "0ca962cb10f29ee0735ff7dec69ec7283af47f65", + "revision": "f918e6d43cb5e8398d91e1767ec61bed7b7b4d49", + "branch": "master", + "notests": true + }, + { + "importpath": "", + "repository": "", + "vcs": "git", + "revision": "bfc89bc3f7f7791e35a10b24496cc7454a9b4a64", "branch": "master", "notests": true },