syncthing/lib/protocol/protocol.go

811 lines
20 KiB
Go
Raw Normal View History

2015-01-13 12:31:14 +00:00
// Copyright (C) 2014 The Protocol Authors.
2014-09-22 19:42:11 +00:00
package protocol
import (
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"sync"
"time"
lz4 "github.com/bkaradzic/go-lz4"
2016-02-02 11:48:09 +00:00
"github.com/calmh/xdr"
2014-09-22 19:42:11 +00:00
)
const (
2015-09-22 18:34:24 +00:00
// BlockSize is the standard ata block size (128 KiB)
BlockSize = 128 << 10
// MaxMessageLen is the largest message size allowed on the wire. (512 MiB)
MaxMessageLen = 64 << 23
2014-09-22 19:42:11 +00:00
)
const (
messageTypeClusterConfig = 0
messageTypeIndex = 1
messageTypeRequest = 2
messageTypeResponse = 3
messageTypePing = 4
messageTypeIndexUpdate = 6
messageTypeClose = 7
messageTypeDownloadProgress = 8
2014-09-22 19:42:11 +00:00
)
const (
stateInitial = iota
stateReady
2014-09-22 19:42:11 +00:00
)
// FileInfo flags
2014-09-22 19:42:11 +00:00
const (
FlagDeleted uint32 = 1 << 12 // bit 19 in MSB order with the first bit being #0
FlagInvalid = 1 << 13 // bit 18
FlagDirectory = 1 << 14 // bit 17
FlagNoPermBits = 1 << 15 // bit 16
FlagSymlink = 1 << 16 // bit 15
FlagSymlinkMissingTarget = 1 << 17 // bit 14
2015-02-12 21:59:33 +00:00
FlagsAll = (1 << 18) - 1
2015-01-23 22:25:34 +00:00
SymlinkTypeMask = FlagDirectory | FlagSymlinkMissingTarget
2014-09-22 19:42:11 +00:00
)
// Request message flags
const (
FlagFromTemporary uint32 = 1 << iota
)
// FileDownloadProgressUpdate update types
const (
UpdateTypeAppend uint32 = iota
UpdateTypeForget
)
// ClusterConfigMessage.Folders flags
const (
FlagFolderReadOnly uint32 = 1 << 0
FlagFolderIgnorePerms = 1 << 1
FlagFolderIgnoreDelete = 1 << 2
FlagFolderDisabledTempIndexes = 1 << 3
FlagFolderAll = 1<<4 - 1
)
// ClusterConfigMessage.Folders.Devices flags
2014-09-22 19:42:11 +00:00
const (
FlagShareTrusted uint32 = 1 << 0
FlagShareReadOnly = 1 << 1
FlagIntroducer = 1 << 2
FlagShareBits = 0x000000ff
)
var (
ErrClosed = errors.New("connection closed")
ErrTimeout = errors.New("read timeout")
ErrSwitchingConnections = errors.New("switching connections")
2014-09-22 19:42:11 +00:00
)
// Specific variants of empty messages...
type pingMessage struct{ EmptyMessage }
2014-09-22 19:42:11 +00:00
type Model interface {
// An index was received from the peer device
Index(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// An index update was received from the peer device
IndexUpdate(deviceID DeviceID, folder string, files []FileInfo, flags uint32, options []Option)
// A request was made by the peer device
2015-07-29 20:23:43 +00:00
Request(deviceID DeviceID, folder string, name string, offset int64, hash []byte, flags uint32, options []Option, buf []byte) error
2014-09-22 19:42:11 +00:00
// A cluster configuration message was received
ClusterConfig(deviceID DeviceID, config ClusterConfigMessage)
// The peer device closed the connection
Close(deviceID DeviceID, err error)
// The peer device sent progress updates for the files it is currently downloading
DownloadProgress(deviceID DeviceID, folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
2014-09-22 19:42:11 +00:00
}
type Connection interface {
2015-07-10 06:34:54 +00:00
Start()
ID() DeviceID
2014-09-22 19:42:11 +00:00
Name() string
Index(folder string, files []FileInfo, flags uint32, options []Option) error
IndexUpdate(folder string, files []FileInfo, flags uint32, options []Option) error
Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error)
2014-09-22 19:42:11 +00:00
ClusterConfig(config ClusterConfigMessage)
DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option)
2014-09-22 19:42:11 +00:00
Statistics() Statistics
Closed() bool
2014-09-22 19:42:11 +00:00
}
type rawConnection struct {
id DeviceID
2014-09-22 19:42:11 +00:00
name string
receiver Model
cr *countingReader
cw *countingWriter
awaiting [4096]chan asyncResult
awaitingMut sync.Mutex
idxMut sync.Mutex // ensures serialization of Index calls
2015-07-29 20:23:43 +00:00
nextID chan int
outbox chan hdrMsg
closed chan struct{}
once sync.Once
pool sync.Pool
compression Compression
2014-09-22 19:42:11 +00:00
2016-02-02 11:48:09 +00:00
readerBuf []byte // used & reused by readMessage
2014-09-22 19:42:11 +00:00
}
type asyncResult struct {
val []byte
err error
}
type hdrMsg struct {
2015-07-29 20:23:43 +00:00
hdr header
msg encodable
done chan struct{}
2014-09-22 19:42:11 +00:00
}
type encodable interface {
2016-02-02 11:48:09 +00:00
MarshalXDRInto(m *xdr.Marshaller) error
XDRSize() int
2014-09-22 19:42:11 +00:00
}
type isEofer interface {
IsEOF() bool
}
const (
2015-09-22 18:34:24 +00:00
// PingSendInterval is how often we make sure to send a message, by
// triggering pings if necessary.
PingSendInterval = 90 * time.Second
2015-09-22 18:34:24 +00:00
// ReceiveTimeout is the longest we'll wait for a message from the other
// side before closing the connection.
ReceiveTimeout = 300 * time.Second
2014-09-22 19:42:11 +00:00
)
func NewConnection(deviceID DeviceID, reader io.Reader, writer io.Writer, receiver Model, name string, compress Compression) Connection {
2014-09-22 19:42:11 +00:00
cr := &countingReader{Reader: reader}
cw := &countingWriter{Writer: writer}
c := rawConnection{
2015-07-29 20:23:43 +00:00
id: deviceID,
name: name,
receiver: nativeModel{receiver},
cr: cr,
cw: cw,
outbox: make(chan hdrMsg),
nextID: make(chan int),
closed: make(chan struct{}),
pool: sync.Pool{
New: func() interface{} {
return make([]byte, BlockSize)
},
},
compression: compress,
2014-09-22 19:42:11 +00:00
}
2015-07-10 06:34:54 +00:00
return wireFormatConnection{&c}
}
// Start creates the goroutines for sending and receiving of messages. It must
// be called exactly once after creating a connection.
func (c *rawConnection) Start() {
2014-09-22 19:42:11 +00:00
go c.readerLoop()
go c.writerLoop()
go c.pingSender()
go c.pingReceiver()
2014-09-22 19:42:11 +00:00
go c.idGenerator()
}
func (c *rawConnection) ID() DeviceID {
2014-09-22 19:42:11 +00:00
return c.id
}
func (c *rawConnection) Name() string {
return c.name
}
// Index writes the list of file information to the connected peer device
func (c *rawConnection) Index(folder string, idx []FileInfo, flags uint32, options []Option) error {
2014-09-22 19:42:11 +00:00
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(-1, messageTypeIndex, IndexMessage{
Folder: folder,
Files: idx,
Flags: flags,
Options: options,
2015-07-29 20:23:43 +00:00
}, nil)
2014-09-22 19:42:11 +00:00
c.idxMut.Unlock()
return nil
}
// IndexUpdate writes the list of file information to the connected peer device as an update
func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo, flags uint32, options []Option) error {
2014-09-22 19:42:11 +00:00
select {
case <-c.closed:
return ErrClosed
default:
}
c.idxMut.Lock()
c.send(-1, messageTypeIndexUpdate, IndexMessage{
Folder: folder,
Files: idx,
Flags: flags,
Options: options,
2015-07-29 20:23:43 +00:00
}, nil)
2014-09-22 19:42:11 +00:00
c.idxMut.Unlock()
return nil
}
// Request returns the bytes for the specified block after fetching them from the connected peer.
func (c *rawConnection) Request(folder string, name string, offset int64, size int, hash []byte, fromTemporary bool) ([]byte, error) {
2014-09-22 19:42:11 +00:00
var id int
select {
case id = <-c.nextID:
case <-c.closed:
return nil, ErrClosed
}
var flags uint32
if fromTemporary {
flags |= FlagFromTemporary
}
2014-09-22 19:42:11 +00:00
c.awaitingMut.Lock()
if ch := c.awaiting[id]; ch != nil {
panic("id taken")
}
rc := make(chan asyncResult, 1)
c.awaiting[id] = rc
c.awaitingMut.Unlock()
ok := c.send(id, messageTypeRequest, RequestMessage{
2015-01-24 21:56:12 +00:00
Folder: folder,
Name: name,
Offset: offset,
Size: int32(size),
Hash: hash,
Flags: flags,
Options: nil,
2015-07-29 20:23:43 +00:00
}, nil)
2014-09-22 19:42:11 +00:00
if !ok {
return nil, ErrClosed
}
res, ok := <-rc
if !ok {
return nil, ErrClosed
}
return res.val, res.err
}
// ClusterConfig send the cluster configuration message to the peer and returns any error
func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) {
2015-07-29 20:23:43 +00:00
c.send(-1, messageTypeClusterConfig, config, nil)
2014-09-22 19:42:11 +00:00
}
func (c *rawConnection) Closed() bool {
select {
case <-c.closed:
return true
default:
return false
}
}
// DownloadProgress sends the progress updates for the files that are currently being downloaded.
func (c *rawConnection) DownloadProgress(folder string, updates []FileDownloadProgressUpdate, flags uint32, options []Option) {
c.send(-1, messageTypeDownloadProgress, DownloadProgressMessage{
Folder: folder,
Updates: updates,
Flags: flags,
Options: options,
}, nil)
}
2014-09-22 19:42:11 +00:00
func (c *rawConnection) ping() bool {
var id int
select {
case id = <-c.nextID:
case <-c.closed:
return false
}
return c.send(id, messageTypePing, nil, nil)
2014-09-22 19:42:11 +00:00
}
func (c *rawConnection) readerLoop() (err error) {
defer func() {
c.close(err)
}()
state := stateInitial
2014-09-22 19:42:11 +00:00
for {
select {
case <-c.closed:
return ErrClosed
default:
}
hdr, msg, err := c.readMessage()
if err != nil {
return err
}
switch msg := msg.(type) {
case ClusterConfigMessage:
if state != stateInitial {
return fmt.Errorf("protocol error: cluster config message in state %d", state)
}
go c.receiver.ClusterConfig(c.id, msg)
state = stateReady
case IndexMessage:
switch hdr.msgType {
case messageTypeIndex:
if state != stateReady {
return fmt.Errorf("protocol error: index message in state %d", state)
}
c.handleIndex(msg)
state = stateReady
case messageTypeIndexUpdate:
if state != stateReady {
return fmt.Errorf("protocol error: index update message in state %d", state)
}
c.handleIndexUpdate(msg)
state = stateReady
2014-09-22 19:42:11 +00:00
}
case RequestMessage:
if state != stateReady {
return fmt.Errorf("protocol error: request message in state %d", state)
2014-09-22 19:42:11 +00:00
}
// Requests are handled asynchronously
go c.handleRequest(hdr.msgID, msg)
2014-09-22 19:42:11 +00:00
case ResponseMessage:
if state != stateReady {
return fmt.Errorf("protocol error: response message in state %d", state)
2014-09-22 19:42:11 +00:00
}
c.handleResponse(hdr.msgID, msg)
2014-09-22 19:42:11 +00:00
case DownloadProgressMessage:
if state != stateReady {
return fmt.Errorf("protocol error: response message in state %d", state)
}
c.receiver.DownloadProgress(c.id, msg.Folder, msg.Updates, msg.Flags, msg.Options)
case pingMessage:
if state != stateReady {
return fmt.Errorf("protocol error: ping message in state %d", state)
}
// Nothing
2014-09-22 19:42:11 +00:00
case CloseMessage:
return errors.New(msg.Reason)
2014-09-22 19:42:11 +00:00
default:
return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
}
}
}
func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) {
2016-02-02 11:48:09 +00:00
hdrBuf := make([]byte, 8)
_, err = io.ReadFull(c.cr, hdrBuf)
2014-09-22 19:42:11 +00:00
if err != nil {
return
}
2016-02-02 11:48:09 +00:00
hdr = decodeHeader(binary.BigEndian.Uint32(hdrBuf[:4]))
msglen := int(binary.BigEndian.Uint32(hdrBuf[4:]))
2014-09-22 19:42:11 +00:00
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("read header %v (msglen=%d)", hdr, msglen)
2014-09-22 19:42:11 +00:00
if msglen > MaxMessageLen {
err = fmt.Errorf("message length %d exceeds maximum %d", msglen, MaxMessageLen)
return
}
if hdr.version != 0 {
err = fmt.Errorf("unknown protocol version 0x%x", hdr.version)
return
}
2016-02-02 11:48:09 +00:00
// c.readerBuf contains a buffer we can reuse. But once we've unmarshalled
// a message from the buffer we can't reuse it again as the unmarshalled
// message refers to the contents of the buffer. The only case we a buffer
// ends up in readerBuf for reuse is when the message is compressed, as we
// then decompress into a new buffer instead.
var msgBuf []byte
if cap(c.readerBuf) >= msglen {
// If we have a buffer ready in rdbuf we just use that.
msgBuf = c.readerBuf[:msglen]
2014-09-22 19:42:11 +00:00
} else {
2016-02-02 11:48:09 +00:00
// Otherwise we allocate a new buffer.
msgBuf = make([]byte, msglen)
2014-09-22 19:42:11 +00:00
}
2016-02-02 11:48:09 +00:00
_, err = io.ReadFull(c.cr, msgBuf)
2014-09-22 19:42:11 +00:00
if err != nil {
return
}
2016-02-02 11:48:09 +00:00
l.Debugf("read %d bytes", len(msgBuf))
2014-09-22 19:42:11 +00:00
if hdr.compression && msglen > 0 {
2016-02-02 11:48:09 +00:00
// We're going to decompress msgBuf into a different newly allocated
// buffer, so keep msgBuf around for reuse on the next message.
c.readerBuf = msgBuf
msgBuf, err = lz4.Decode(nil, msgBuf)
2014-09-22 19:42:11 +00:00
if err != nil {
return
}
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("decompressed to %d bytes", len(msgBuf))
2016-02-02 11:48:09 +00:00
} else {
c.readerBuf = nil
2014-09-22 19:42:11 +00:00
}
if shouldDebug() {
if len(msgBuf) > 1024 {
l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024]))
} else {
l.Debugf("message data:\n%s", hex.Dump(msgBuf))
}
2014-09-22 19:42:11 +00:00
}
switch hdr.msgType {
case messageTypeIndex, messageTypeIndexUpdate:
var idx IndexMessage
err = idx.UnmarshalXDR(msgBuf)
msg = idx
case messageTypeRequest:
var req RequestMessage
err = req.UnmarshalXDR(msgBuf)
msg = req
case messageTypeResponse:
var resp ResponseMessage
err = resp.UnmarshalXDR(msgBuf)
msg = resp
case messageTypePing:
msg = pingMessage{}
2014-09-22 19:42:11 +00:00
case messageTypeClusterConfig:
var cc ClusterConfigMessage
err = cc.UnmarshalXDR(msgBuf)
msg = cc
case messageTypeClose:
var cm CloseMessage
err = cm.UnmarshalXDR(msgBuf)
msg = cm
case messageTypeDownloadProgress:
var dp DownloadProgressMessage
err := dp.UnmarshalXDR(msgBuf)
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
msg = dp
2014-09-22 19:42:11 +00:00
default:
err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType)
}
// We check the returned error for the XDRError.IsEOF() method.
// IsEOF()==true here means that the message contained fewer fields than
// expected. It does not signify an EOF on the socket, because we've
// successfully read a size value and then that many bytes from the wire.
// New fields we expected but the other peer didn't send should be
// interpreted as zero/nil, and if that's not valid we'll verify it
// somewhere else.
if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() {
err = nil
}
2014-09-22 19:42:11 +00:00
return
}
func (c *rawConnection) handleIndex(im IndexMessage) {
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("Index(%v, %v, %d file, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
c.receiver.Index(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
2014-09-22 19:42:11 +00:00
}
func (c *rawConnection) handleIndexUpdate(im IndexMessage) {
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("queueing IndexUpdate(%v, %v, %d files, flags %x, opts: %s)", c.id, im.Folder, len(im.Files), im.Flags, im.Options)
c.receiver.IndexUpdate(c.id, im.Folder, filterIndexMessageFiles(im.Files), im.Flags, im.Options)
}
func filterIndexMessageFiles(fs []FileInfo) []FileInfo {
var out []FileInfo
for i, f := range fs {
switch f.Name {
case "", ".", "..", "/": // A few obviously invalid filenames
l.Infof("Dropping invalid filename %q from incoming index", f.Name)
if out == nil {
// Most incoming updates won't contain anything invalid, so we
// delay the allocation and copy to output slice until we
// really need to do it, then copy all the so var valid files
// to it.
out = make([]FileInfo, i, len(fs)-1)
copy(out, fs)
}
default:
if out != nil {
out = append(out, f)
}
}
}
if out != nil {
return out
}
return fs
2014-09-22 19:42:11 +00:00
}
func (c *rawConnection) handleRequest(msgID int, req RequestMessage) {
2015-07-29 20:23:43 +00:00
size := int(req.Size)
usePool := size <= BlockSize
var buf []byte
var done chan struct{}
if usePool {
buf = c.pool.Get().([]byte)[:size]
done = make(chan struct{})
} else {
buf = make([]byte, size)
}
2014-09-22 19:42:11 +00:00
2015-07-29 20:23:43 +00:00
err := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), req.Hash, req.Flags, req.Options, buf)
if err != nil {
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: nil,
Code: errorToCode(err),
}, done)
} else {
c.send(msgID, messageTypeResponse, ResponseMessage{
Data: buf,
Code: errorToCode(err),
}, done)
}
if usePool {
<-done
c.pool.Put(buf)
}
2014-09-22 19:42:11 +00:00
}
func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) {
c.awaitingMut.Lock()
if rc := c.awaiting[msgID]; rc != nil {
c.awaiting[msgID] = nil
2015-02-08 11:04:01 +00:00
rc <- asyncResult{resp.Data, codeToError(resp.Code)}
2014-09-22 19:42:11 +00:00
close(rc)
}
c.awaitingMut.Unlock()
}
2015-07-29 20:23:43 +00:00
func (c *rawConnection) send(msgID int, msgType int, msg encodable, done chan struct{}) bool {
2014-09-22 19:42:11 +00:00
if msgID < 0 {
select {
case id := <-c.nextID:
msgID = id
case <-c.closed:
return false
}
}
hdr := header{
version: 0,
msgID: msgID,
msgType: msgType,
}
select {
2015-07-29 20:23:43 +00:00
case c.outbox <- hdrMsg{hdr, msg, done}:
2014-09-22 19:42:11 +00:00
return true
case <-c.closed:
return false
}
}
func (c *rawConnection) writerLoop() {
var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused
var uncBuf []byte // buffer for uncompressed message, kept and reused
for {
var tempBuf []byte
var err error
select {
case hm := <-c.outbox:
if hm.msg != nil {
// Uncompressed message in uncBuf
2016-02-02 11:48:09 +00:00
msgLen := hm.msg.XDRSize()
if cap(uncBuf) >= msgLen {
uncBuf = uncBuf[:msgLen]
} else {
uncBuf = make([]byte, msgLen)
}
m := &xdr.Marshaller{Data: uncBuf}
err = hm.msg.MarshalXDRInto(m)
2015-07-29 20:23:43 +00:00
if hm.done != nil {
close(hm.done)
}
2014-10-21 06:40:05 +00:00
if err != nil {
c.close(err)
return
}
2014-09-22 19:42:11 +00:00
compress := false
switch c.compression {
case CompressAlways:
compress = true
case CompressMetadata:
compress = hm.hdr.msgType != messageTypeResponse
}
if compress && len(uncBuf) >= compressionThreshold {
2014-09-22 19:42:11 +00:00
// Use compression for large messages
hm.hdr.compression = true
// Make sure we have enough space for the compressed message plus header in msgBug
msgBuf = msgBuf[:cap(msgBuf)]
if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) {
msgBuf = make([]byte, maxLen)
}
// Compressed is written to msgBuf, we keep tb for the length only
tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf)
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf)))
msgBuf = msgBuf[0 : len(tempBuf)+8]
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf))
2014-09-22 19:42:11 +00:00
} else {
// No point in compressing very short messages
hm.hdr.compression = false
msgBuf = msgBuf[:cap(msgBuf)]
if l := len(uncBuf) + 8; l > len(msgBuf) {
msgBuf = make([]byte, l)
}
binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf)))
msgBuf = msgBuf[0 : len(uncBuf)+8]
copy(msgBuf[8:], uncBuf)
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf))
2014-09-22 19:42:11 +00:00
}
} else {
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("write empty message; %v", hm.hdr)
2014-09-22 19:42:11 +00:00
binary.BigEndian.PutUint32(msgBuf[4:8], 0)
msgBuf = msgBuf[:8]
}
binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr))
if err == nil {
var n int
n, err = c.cw.Write(msgBuf)
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugf("wrote %d bytes on the wire", n)
2014-09-22 19:42:11 +00:00
}
if err != nil {
c.close(err)
return
}
case <-c.closed:
return
}
}
}
func (c *rawConnection) close(err error) {
c.once.Do(func() {
2016-01-12 08:19:44 +00:00
l.Debugln("close due to", err)
2014-09-22 19:42:11 +00:00
close(c.closed)
c.awaitingMut.Lock()
for i, ch := range c.awaiting {
if ch != nil {
close(ch)
c.awaiting[i] = nil
}
}
c.awaitingMut.Unlock()
go c.receiver.Close(c.id, err)
})
}
func (c *rawConnection) idGenerator() {
nextID := 0
for {
nextID = (nextID + 1) & 0xfff
select {
case c.nextID <- nextID:
case <-c.closed:
return
}
}
}
// The pingSender makes sure that we've sent a message within the last
// PingSendInterval. If we already have something sent in the last
// PingSendInterval/2, we do nothing. Otherwise we send a ping message. This
// results in an effecting ping interval of somewhere between
// PingSendInterval/2 and PingSendInterval.
func (c *rawConnection) pingSender() {
ticker := time.Tick(PingSendInterval / 2)
2014-09-22 19:42:11 +00:00
for {
select {
case <-ticker:
d := time.Since(c.cw.Last())
if d < PingSendInterval/2 {
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugln(c.id, "ping skipped after wr", d)
2014-09-22 19:42:11 +00:00
continue
}
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugln(c.id, "ping -> after", d)
c.ping()
case <-c.closed:
return
}
}
}
// The pingReceiver checks that we've received a message (any message will do,
// but we expect pings in the absence of other messages) within the last
// ReceiveTimeout. If not, we close the connection with an ErrTimeout.
func (c *rawConnection) pingReceiver() {
ticker := time.Tick(ReceiveTimeout / 2)
for {
select {
case <-ticker:
d := time.Since(c.cr.Last())
if d > ReceiveTimeout {
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugln(c.id, "ping timeout", d)
c.close(ErrTimeout)
}
Implement facility based logger, debugging via REST API This implements a new debug/trace infrastructure based on a slightly hacked up logger. Instead of the traditional "if debug { ... }" I've rewritten the logger to have no-op Debugln and Debugf, unless debugging has been enabled for a given "facility". The "facility" is just a string, typically a package name. This will be slightly slower than before; but not that much as it's mostly a function call that returns immediately. For the cases where it matters (the Debugln takes a hex.Dump() of something for example, and it's not in a very occasional "if err != nil" branch) there is an l.ShouldDebug(facility) that is fast enough to be used like the old "if debug". The point of all this is that we can now toggle debugging for the various packages on and off at runtime. There's a new method /rest/system/debug that can be POSTed a set of facilities to enable and disable debug for, or GET from to get a list of facilities with descriptions and their current debug status. Similarly a /rest/system/log?since=... can grab the latest log entries, up to 250 of them (hardcoded constant in main.go) plus the initial few. Not implemented in this commit (but planned) is a simple debug GUI available on /debug that shows the current log in an easily pasteable format and has checkboxes to enable the various debug facilities. The debug instructions to a user then becomes "visit this URL, check these boxes, reproduce your problem, copy and paste the log". The actual log viewer on the hypothetical /debug URL can poll regularly for new log entries and this bypass the 250 line limit. The existing STTRACE=foo variable is still obeyed and just sets the start state of the system.
2015-10-03 15:25:21 +00:00
l.Debugln(c.id, "last read within", d)
2014-09-22 19:42:11 +00:00
case <-c.closed:
return
}
}
}
type Statistics struct {
At time.Time
InBytesTotal int64
OutBytesTotal int64
2014-09-22 19:42:11 +00:00
}
func (c *rawConnection) Statistics() Statistics {
return Statistics{
At: time.Now(),
InBytesTotal: c.cr.Tot(),
OutBytesTotal: c.cw.Tot(),
}
}