mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-10 15:20:56 +00:00
c8b6e6fd9b
Upgrade FileInfo up to 10000000 blocks. 1310 GB files can be shared. Increase limit when unmarshaling XDR. Increase the size of message.
784 lines
18 KiB
Go
784 lines
18 KiB
Go
// Copyright (C) 2014 The Protocol Authors.
|
|
|
|
package protocol
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"time"
|
|
|
|
lz4 "github.com/bkaradzic/go-lz4"
|
|
"github.com/calmh/xdr"
|
|
)
|
|
|
|
const (
|
|
// BlockSize is the standard ata block size (128 KiB)
|
|
BlockSize = 128 << 10
|
|
|
|
// MaxMessageLen is the largest message size allowed on the wire. (512 MiB)
|
|
MaxMessageLen = 64 << 23
|
|
)
|
|
|
|
const (
|
|
messageTypeClusterConfig = 0
|
|
messageTypeIndex = 1
|
|
messageTypeRequest = 2
|
|
messageTypeResponse = 3
|
|
messageTypePing = 4
|
|
messageTypeIndexUpdate = 6
|
|
messageTypeClose = 7
|
|
)
|
|
|
|
const (
|
|
stateInitial = iota
|
|
stateReady
|
|
)
|
|
|
|
// FileInfo flags
|
|
const (
|
|
FlagDeleted uint32 = 1 << 12
|
|
FlagInvalid = 1 << 13
|
|
FlagDirectory = 1 << 14
|
|
FlagNoPermBits = 1 << 15
|
|
FlagSymlink = 1 << 16
|
|
FlagSymlinkMissingTarget = 1 << 17
|
|
|
|
FlagsAll = (1 << 18) - 1
|
|
|
|
SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
|
|
)
|
|
|
|
// IndexMessage message flags (for IndexUpdate)
|
|
const (
|
|
FlagIndexTemporary uint32 = 1 << iota
|
|
)
|
|
|
|
// Request message flags
|
|
const (
|
|
FlagRequestTemporary uint32 = 1 << iota
|
|
)
|
|
|
|
// ClusterConfigMessage.Folders flags
|
|
const (
|
|
FlagFolderReadOnly uint32 = 1 << 0
|
|
FlagFolderIgnorePerms = 1 << 1
|
|
FlagFolderIgnoreDelete = 1 << 2
|
|
FlagFolderAll = 1<<3 - 1
|
|
)
|
|
|
|
// ClusterConfigMessage.Folders.Devices flags
|
|
const (
|
|
FlagShareTrusted uint32 = 1 << 0
|
|
FlagShareReadOnly = 1 << 1
|
|
FlagIntroducer = 1 << 2
|
|
FlagShareBits = 0x000000ff
|
|
)
|
|
|
|
var (
|
|
ErrClosed = errors.New("connection closed")
|
|
ErrTimeout = errors.New("read timeout")
|
|
)
|
|
|
|
// Specific variants of empty messages...
|
|
type pingMessage struct{ EmptyMessage }
|
|
|
|
type Model interface {
|
|
// An index was received from the peer device
|
|
Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
|
|
// An index update was received from the peer device
|
|
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
|
|
// A request was made by the peer device
|
|
Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
|
|
// A cluster configuration message was received
|
|
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
|
|
// The peer device closed the connection
|
|
Close(deviceID DeviceID, err error)
|
|
}
|
|
|
|
type Connection interface {
|
|
Start()
|
|
ID() DeviceID
|
|
Name() string
|
|
Index(folder string, files []FileInfo, flags uint32, options []Option) error
|
|
IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
|
|
Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error)
|
|
ClusterConfig(config ClusterConfigMessage)
|
|
Statistics() Statistics
|
|
Closed() bool
|
|
}
|
|
|
|
type rawConnection struct {
|
|
id DeviceID
|
|
name string
|
|
receiver Model
|
|
|
|
cr *countingReader
|
|
cw *countingWriter
|
|
|
|
awaiting [4096]chan asyncResult
|
|
awaitingMut sync.Mutex
|
|
|
|
idxMut sync.Mutex // ensures serialization of Index calls
|
|
|
|
nextID chan int
|
|
outbox chan hdrMsg
|
|
closed chan struct{}
|
|
once sync.Once
|
|
pool sync.Pool
|
|
compression Compression
|
|
|
|
readerBuf []byte // used & reused by readMessage
|
|
}
|
|
|
|
type asyncResult struct {
|
|
val []byte
|
|
err error
|
|
}
|
|
|
|
type hdrMsg struct {
|
|
hdr header
|
|
msg encodable
|
|
done chan struct{}
|
|
}
|
|
|
|
type encodable interface {
|
|
MarshalXDRInto(m *xdr.Marshaller) error
|
|
XDRSize() int
|
|
}
|
|
|
|
type isEofer interface {
|
|
IsEOF() bool
|
|
}
|
|
|
|
const (
|
|
// PingSendInterval is how often we make sure to send a message, by
|
|
// triggering pings if necessary.
|
|
PingSendInterval = 90 * time.Second
|
|
// ReceiveTimeout is the longest we'll wait for a message from the other
|
|
// side before closing the connection.
|
|
ReceiveTimeout = 300 * time.Second
|
|
)
|
|
|
|
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
|
|
cr := &countingReader{Reader: reader}
|
|
cw := &countingWriter{Writer: writer}
|
|
|
|
c := rawConnection{
|
|
id: deviceID,
|
|
name: name,
|
|
receiver: nativeModel{receiver},
|
|
cr: cr,
|
|
cw: cw,
|
|
outbox: make(chan hdrMsg),
|
|
nextID: make(chan int),
|
|
closed: make(chan struct{}),
|
|
pool: sync.Pool{
|
|
New: func() interface{} {
|
|
return make([]byte, BlockSize)
|
|
},
|
|
},
|
|
compression: compress,
|
|
}
|
|
|
|
return wireFormatConnection{&c}
|
|
}
|
|
|
|
// Start creates the goroutines for sending and receiving of messages. It must
|
|
// be called exactly once after creating a connection.
|
|
func (c *rawConnection) Start() {
|
|
go c.readerLoop()
|
|
go c.writerLoop()
|
|
go c.pingSender()
|
|
go c.pingReceiver()
|
|
go c.idGenerator()
|
|
}
|
|
|
|
func (c *rawConnection) ID() DeviceID {
|
|
return c.id
|
|
}
|
|
|
|
func (c *rawConnection) Name() string {
|
|
return c.name
|
|
}
|
|
|
|
// Index writes the list of file information to the connected peer device
|
|
func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error {
|
|
select {
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
default:
|
|
}
|
|
c.idxMut.Lock()
|
|
c.send(-1, messageTypeIndex, IndexMessage{
|
|
Folder: folder,
|
|
Files: idx,
|
|
Flags: flags,
|
|
Options: options,
|
|
}, nil)
|
|
c.idxMut.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// IndexUpdate writes the list of file information to the connected peer device as an update
|
|
func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error {
|
|
select {
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
default:
|
|
}
|
|
c.idxMut.Lock()
|
|
c.send(-1, messageTypeIndexUpdate, IndexMessage{
|
|
Folder: folder,
|
|
Files: idx,
|
|
Flags: flags,
|
|
Options: options,
|
|
}, nil)
|
|
c.idxMut.Unlock()
|
|
return nil
|
|
}
|
|
|
|
// Request returns the bytes for the specified block after fetching them from the connected peer.
|
|
func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, flags uint32, options []Option) ([]byte, error) {
|
|
var id int
|
|
select {
|
|
case id = <-c.nextID:
|
|
case <-c.closed:
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
c.awaitingMut.Lock()
|
|
if ch := c.awaiting[id]; ch != nil {
|
|
panic("id taken")
|
|
}
|
|
rc := make(chan asyncResult, 1)
|
|
c.awaiting[id] = rc
|
|
c.awaitingMut.Unlock()
|
|
|
|
ok := c.send(id, messageTypeRequest, RequestMessage{
|
|
Folder: folder,
|
|
Name: name,
|
|
Offset: offset,
|
|
Size: int32(size),
|
|
Hash: hash,
|
|
Flags: flags,
|
|
Options: options,
|
|
}, nil)
|
|
if !ok {
|
|
return nil, ErrClosed
|
|
}
|
|
|
|
res, ok := <-rc
|
|
if !ok {
|
|
return nil, ErrClosed
|
|
}
|
|
return res.val, res.err
|
|
}
|
|
|
|
// ClusterConfig send the cluster configuration message to the peer and returns any error
|
|
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
|
|
c.send(-1, messageTypeClusterConfig, config, nil)
|
|
}
|
|
|
|
func (c *rawConnection) Closed() bool {
|
|
select {
|
|
case <-c.closed:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) ping() bool {
|
|
var id int
|
|
select {
|
|
case id = <-c.nextID:
|
|
case <-c.closed:
|
|
return false
|
|
}
|
|
|
|
return c.send(id, messageTypePing, nil, nil)
|
|
}
|
|
|
|
func (c *rawConnection) readerLoop() (err error) {
|
|
defer func() {
|
|
c.close(err)
|
|
}()
|
|
|
|
state := stateInitial
|
|
for {
|
|
select {
|
|
case <-c.closed:
|
|
return ErrClosed
|
|
default:
|
|
}
|
|
|
|
hdr, msg, err := c.readMessage()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
switch msg := msg.(type) {
|
|
case ClusterConfigMessage:
|
|
if state != stateInitial {
|
|
return fmt.Errorf("protocol error: cluster config message in state %d", state)
|
|
}
|
|
go c.receiver.ClusterConfig(c.id, msg)
|
|
state = stateReady
|
|
|
|
case IndexMessage:
|
|
switch hdr.msgType {
|
|
case messageTypeIndex:
|
|
if state != stateReady {
|
|
return fmt.Errorf("protocol error: index message in state %d", state)
|
|
}
|
|
c.handleIndex(msg)
|
|
state = stateReady
|
|
|
|
case messageTypeIndexUpdate:
|
|
if state != stateReady {
|
|
return fmt.Errorf("protocol error: index update message in state %d", state)
|
|
}
|
|
c.handleIndexUpdate(msg)
|
|
state = stateReady
|
|
}
|
|
|
|
case RequestMessage:
|
|
if state != stateReady {
|
|
return fmt.Errorf("protocol error: request message in state %d", state)
|
|
}
|
|
// Requests are handled asynchronously
|
|
go c.handleRequest(hdr.msgID, msg)
|
|
|
|
case ResponseMessage:
|
|
if state != stateReady {
|
|
return fmt.Errorf("protocol error: response message in state %d", state)
|
|
}
|
|
c.handleResponse(hdr.msgID, msg)
|
|
|
|
case pingMessage:
|
|
if state != stateReady {
|
|
return fmt.Errorf("protocol error: ping message in state %d", state)
|
|
}
|
|
// Nothing
|
|
|
|
case CloseMessage:
|
|
return errors.New(msg.Reason)
|
|
|
|
default:
|
|
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
|
|
hdrBuf := make([]byte, 8)
|
|
_, err = io.ReadFull(c.cr, hdrBuf)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
hdr = decodeHeader(binary.BigEndian.Uint32(hdrBuf[:4]))
|
|
msglen := int(binary.BigEndian.Uint32(hdrBuf[4:]))
|
|
|
|
l.Debugf("read header %v (msglen=%d)", hdr, msglen)
|
|
|
|
if msglen > MaxMessageLen {
|
|
err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen)
|
|
return
|
|
}
|
|
|
|
if hdr.version != 0 {
|
|
err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
|
|
return
|
|
}
|
|
|
|
// c.readerBuf contains a buffer we can reuse. But once we've unmarshalled
|
|
// a message from the buffer we can't reuse it again as the unmarshalled
|
|
// message refers to the contents of the buffer. The only case we a buffer
|
|
// ends up in readerBuf for reuse is when the message is compressed, as we
|
|
// then decompress into a new buffer instead.
|
|
|
|
var msgBuf []byte
|
|
if cap(c.readerBuf) >= msglen {
|
|
// If we have a buffer ready in rdbuf we just use that.
|
|
msgBuf = c.readerBuf[:msglen]
|
|
} else {
|
|
// Otherwise we allocate a new buffer.
|
|
msgBuf = make([]byte, msglen)
|
|
}
|
|
|
|
_, err = io.ReadFull(c.cr, msgBuf)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
l.Debugf("read %d bytes", len(msgBuf))
|
|
|
|
if hdr.compression && msglen > 0 {
|
|
// We're going to decompress msgBuf into a different newly allocated
|
|
// buffer, so keep msgBuf around for reuse on the next message.
|
|
c.readerBuf = msgBuf
|
|
|
|
msgBuf, err = lz4.Decode(nil, msgBuf)
|
|
if err != nil {
|
|
return
|
|
}
|
|
l.Debugf("decompressed to %d bytes", len(msgBuf))
|
|
} else {
|
|
c.readerBuf = nil
|
|
}
|
|
|
|
if shouldDebug() {
|
|
if len(msgBuf) > 1024 {
|
|
l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
|
|
} else {
|
|
l.Debugf("message data:\n%s", hex.Dump(msgBuf))
|
|
}
|
|
}
|
|
|
|
switch hdr.msgType {
|
|
case messageTypeIndex, messageTypeIndexUpdate:
|
|
var idx IndexMessage
|
|
err = idx.UnmarshalXDR(msgBuf)
|
|
msg = idx
|
|
|
|
case messageTypeRequest:
|
|
var req RequestMessage
|
|
err = req.UnmarshalXDR(msgBuf)
|
|
msg = req
|
|
|
|
case messageTypeResponse:
|
|
var resp ResponseMessage
|
|
err = resp.UnmarshalXDR(msgBuf)
|
|
msg = resp
|
|
|
|
case messageTypePing:
|
|
msg = pingMessage{}
|
|
|
|
case messageTypeClusterConfig:
|
|
var cc ClusterConfigMessage
|
|
err = cc.UnmarshalXDR(msgBuf)
|
|
msg = cc
|
|
|
|
case messageTypeClose:
|
|
var cm CloseMessage
|
|
err = cm.UnmarshalXDR(msgBuf)
|
|
msg = cm
|
|
|
|
default:
|
|
err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
|
|
}
|
|
|
|
// We check the returned error for the XDRError.IsEOF() method.
|
|
// IsEOF()==true here means that the message contained fewer fields than
|
|
// expected. It does not signify an EOF on the socket, because we've
|
|
// successfully read a size value and then that many bytes from the wire.
|
|
// New fields we expected but the other peer didn't send should be
|
|
// interpreted as zero/nil, and if that's not valid we'll verify it
|
|
// somewhere else.
|
|
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
|
|
err = nil
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (c *rawConnection) handleIndex(im IndexMessage) {
|
|
l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
|
|
c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
|
|
}
|
|
|
|
func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
|
|
l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
|
|
c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
|
|
}
|
|
|
|
func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
|
|
var out []FileInfo
|
|
for i, f := range fs {
|
|
switch f.Name {
|
|
case "", ".", "..", "/": // A few obviously invalid filenames
|
|
l.Infof("Dropping invalid filename %q from incoming index", f.Name)
|
|
if out == nil {
|
|
// Most incoming updates won't contain anything invalid, so we
|
|
// delay the allocation and copy to output slice until we
|
|
// really need to do it, then copy all the so var valid files
|
|
// to it.
|
|
out = make([]FileInfo, i, len(fs)-1)
|
|
copy(out, fs)
|
|
}
|
|
default:
|
|
if out != nil {
|
|
out = append(out, f)
|
|
}
|
|
}
|
|
}
|
|
if out != nil {
|
|
return out
|
|
}
|
|
return fs
|
|
}
|
|
|
|
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
|
|
size := int(req.Size)
|
|
usePool := size <= BlockSize
|
|
|
|
var buf []byte
|
|
var done chan struct{}
|
|
|
|
if usePool {
|
|
buf = c.pool.Get().([]byte)[:size]
|
|
done = make(chan struct{})
|
|
} else {
|
|
buf = make([]byte, size)
|
|
}
|
|
|
|
err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
|
|
if err != nil {
|
|
c.send(msgID, messageTypeResponse, ResponseMessage{
|
|
Data: nil,
|
|
Code: errorToCode(err),
|
|
}, done)
|
|
} else {
|
|
c.send(msgID, messageTypeResponse, ResponseMessage{
|
|
Data: buf,
|
|
Code: errorToCode(err),
|
|
}, done)
|
|
}
|
|
|
|
if usePool {
|
|
<-done
|
|
c.pool.Put(buf)
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
|
|
c.awaitingMut.Lock()
|
|
if rc := c.awaiting[msgID]; rc != nil {
|
|
c.awaiting[msgID] = nil
|
|
rc <- asyncResult{resp.Data, codeToError(resp.Code)}
|
|
close(rc)
|
|
}
|
|
c.awaitingMut.Unlock()
|
|
}
|
|
|
|
func (c *rawConnection) handlePong(msgID int) {
|
|
c.awaitingMut.Lock()
|
|
if rc := c.awaiting[msgID]; rc != nil {
|
|
c.awaiting[msgID] = nil
|
|
rc <- asyncResult{}
|
|
close(rc)
|
|
}
|
|
c.awaitingMut.Unlock()
|
|
}
|
|
|
|
func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
|
|
if msgID < 0 {
|
|
select {
|
|
case id := <-c.nextID:
|
|
msgID = id
|
|
case <-c.closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
hdr := header{
|
|
version: 0,
|
|
msgID: msgID,
|
|
msgType: msgType,
|
|
}
|
|
|
|
select {
|
|
case c.outbox <- hdrMsg{hdr, msg, done}:
|
|
return true
|
|
case <-c.closed:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) writerLoop() {
|
|
var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
|
|
var uncBuf []byte // buffer for uncompressed message, kept and reused
|
|
for {
|
|
var tempBuf []byte
|
|
var err error
|
|
|
|
select {
|
|
case hm := <-c.outbox:
|
|
if hm.msg != nil {
|
|
// Uncompressed message in uncBuf
|
|
msgLen := hm.msg.XDRSize()
|
|
if cap(uncBuf) >= msgLen {
|
|
uncBuf = uncBuf[:msgLen]
|
|
} else {
|
|
uncBuf = make([]byte, msgLen)
|
|
}
|
|
m := &xdr.Marshaller{Data: uncBuf}
|
|
err = hm.msg.MarshalXDRInto(m)
|
|
if hm.done != nil {
|
|
close(hm.done)
|
|
}
|
|
if err != nil {
|
|
c.close(err)
|
|
return
|
|
}
|
|
|
|
compress := false
|
|
switch c.compression {
|
|
case CompressAlways:
|
|
compress = true
|
|
case CompressMetadata:
|
|
compress = hm.hdr.msgType != messageTypeResponse
|
|
}
|
|
|
|
if compress && len(uncBuf) >= compressionThreshold {
|
|
// Use compression for large messages
|
|
hm.hdr.compression = true
|
|
|
|
// Make sure we have enough space for the compressed message plus header in msgBug
|
|
msgBuf = msgBuf[:cap(msgBuf)]
|
|
if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
|
|
msgBuf = make([]byte, maxLen)
|
|
}
|
|
|
|
// Compressed is written to msgBuf, we keep tb for the length only
|
|
tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
|
|
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
|
|
msgBuf = msgBuf[0 : len(tempBuf)+8]
|
|
|
|
l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
|
|
} else {
|
|
// No point in compressing very short messages
|
|
hm.hdr.compression = false
|
|
|
|
msgBuf = msgBuf[:cap(msgBuf)]
|
|
if l := len(uncBuf) + 8; l > len(msgBuf) {
|
|
msgBuf = make([]byte, l)
|
|
}
|
|
|
|
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
|
|
msgBuf = msgBuf[0 : len(uncBuf)+8]
|
|
copy(msgBuf[8:], uncBuf)
|
|
|
|
l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
|
|
}
|
|
} else {
|
|
l.Debugf("write empty message; %v", hm.hdr)
|
|
binary.BigEndian.PutUint32(msgBuf[4:8], 0)
|
|
msgBuf = msgBuf[:8]
|
|
}
|
|
|
|
binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
|
|
|
|
if err == nil {
|
|
var n int
|
|
n, err = c.cw.Write(msgBuf)
|
|
l.Debugf("wrote %d bytes on the wire", n)
|
|
}
|
|
if err != nil {
|
|
c.close(err)
|
|
return
|
|
}
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *rawConnection) close(err error) {
|
|
c.once.Do(func() {
|
|
l.Debugln("close due to", err)
|
|
close(c.closed)
|
|
|
|
c.awaitingMut.Lock()
|
|
for i, ch := range c.awaiting {
|
|
if ch != nil {
|
|
close(ch)
|
|
c.awaiting[i] = nil
|
|
}
|
|
}
|
|
c.awaitingMut.Unlock()
|
|
|
|
go c.receiver.Close(c.id, err)
|
|
})
|
|
}
|
|
|
|
func (c *rawConnection) idGenerator() {
|
|
nextID := 0
|
|
for {
|
|
nextID = (nextID + 1) & 0xfff
|
|
select {
|
|
case c.nextID <- nextID:
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// The pingSender makes sure that we've sent a message within the last
|
|
// PingSendInterval. If we already have something sent in the last
|
|
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
|
|
// results in an effecting ping interval of somewhere between
|
|
// PingSendInterval/2 and PingSendInterval.
|
|
func (c *rawConnection) pingSender() {
|
|
ticker := time.Tick(PingSendInterval / 2)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker:
|
|
d := time.Since(c.cw.Last())
|
|
if d < PingSendInterval/2 {
|
|
l.Debugln(c.id, "ping skipped after wr", d)
|
|
continue
|
|
}
|
|
|
|
l.Debugln(c.id, "ping -> after", d)
|
|
c.ping()
|
|
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// The pingReciever checks that we've received a message (any message will do,
|
|
// but we expect pings in the absence of other messages) within the last
|
|
// ReceiveTimeout. If not, we close the connection with an ErrTimeout.
|
|
func (c *rawConnection) pingReceiver() {
|
|
ticker := time.Tick(ReceiveTimeout / 2)
|
|
|
|
for {
|
|
select {
|
|
case <-ticker:
|
|
d := time.Since(c.cr.Last())
|
|
if d > ReceiveTimeout {
|
|
l.Debugln(c.id, "ping timeout", d)
|
|
c.close(ErrTimeout)
|
|
}
|
|
|
|
l.Debugln(c.id, "last read within", d)
|
|
|
|
case <-c.closed:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
type Statistics struct {
|
|
At time.Time
|
|
InBytesTotal int64
|
|
OutBytesTotal int64
|
|
}
|
|
|
|
func (c *rawConnection) Statistics() Statistics {
|
|
return Statistics{
|
|
At: time.Now(),
|
|
InBytesTotal: c.cr.Tot(),
|
|
OutBytesTotal: c.cw.Tot(),
|
|
}
|
|
}
|