From 6c5c14f35f498b2bc239eed40455c0ef6ec015aa Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 28 Jul 2014 11:31:22 +0200 Subject: [PATCH] Refactor compression support, now at message level. --- protocol/PROTOCOL.md | 69 ++++---- protocol/counting.go | 17 +- protocol/header.go | 21 ++- protocol/message.go | 6 + protocol/message_xdr.go | 107 ++++++++++++ protocol/protocol.go | 363 ++++++++++++++++++++++++---------------- xdr/reader.go | 58 +++---- xdr/writer.go | 26 +-- 8 files changed, 418 insertions(+), 249 deletions(-) diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md index b541b2a16..de1c6b9ea 100644 --- a/protocol/PROTOCOL.md +++ b/protocol/PROTOCOL.md @@ -25,13 +25,11 @@ Transport and Authentication ---------------------------- BEP is deployed as the highest level in a protocol stack, with the lower -level protocols providing compression, encryption and authentication. +level protocols providing encryption and authentication. +-----------------------------| | Block Exchange Protocol | |-----------------------------| - | Compression (LZ4) | - |-----------------------------| | Encryption & Auth (TLS 1.2) | |-----------------------------| | TCP | @@ -62,48 +60,19 @@ requests are received. The underlying transport protocol MUST be TCP. -Compression ------------ - -All data is sent within compressed blocks. Blocks are compressed using -the LZ4 format and algorithm described in -https://code.google.com/p/lz4/. Each compressed block is preceded by a -header consisting of three 32 bit words, in network order (big endian): - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Magic (0x0x5e63b278) | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Data Length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Uncompressed Block Length | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ Compressed Data \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -The Data Length indicates the length of data following the Data Length -field until the next header, i.e. the length of the Compressed Data -section plus four bytes for the Uncompressed Block Length field. The -Uncompressed Block Length indicates the amount of data that will result -when decompressing the Compressed Data section. - -A single BEP message SHOULD be sent as a single compressed block. A -single compressed block MAY NOT contain more than one BEP message. - Messages -------- Every message starts with one 32 bit word indicating the message -version, type and ID. The header is in network byte order, i.e. big -endian. +version, type and ID, followed by the length of the message. The header +is in network byte order, i.e. big endian. 0 1 2 3 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Ver | Message ID | Type | Reserved | + | Ver | Message ID | Type | Reserved |C| + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ For BEP v1 the Version field is set to zero. Future versions with @@ -125,10 +94,28 @@ The Type field indicates the type of data following the message header and is one of the integers defined below. A message of an unknown type is a protocol error and MUST result in the connection being terminated. -All data following the message header MUST be in XDR (RFC 1014) -encoding. All fields shorter than 32 bits and all variable length data -MUST be padded to a multiple of 32 bits. The actual data types in use by -BEP, in XDR naming convention, are the following: +The Compression bit "C" indicates the compression used for the message. + +For C=1: + + * The Length field contains the length, in bytes, of the + compressed message data. + + * The message data is compressed using the LZ4 format and algorithm + described in https://code.google.com/p/lz4/. + +For C=0: + + * The Length field contains the length, in bytes, of the + uncompressed message data. + + * The message is not compressed. + +All data within the the message (post decompression, if compression is +in use) MUST be in XDR (RFC 1014) encoding. All fields shorter than 32 +bits and all variable length data MUST be padded to a multiple of 32 +bits. The actual data types in use by BEP, in XDR naming convention, are +the following: - (unsigned) int -- (unsigned) 32 bit integer - (unsigned) hyper -- (unsigned) 64 bit integer diff --git a/protocol/counting.go b/protocol/counting.go index a8c54d867..512774fba 100644 --- a/protocol/counting.go +++ b/protocol/counting.go @@ -7,11 +7,13 @@ package protocol import ( "io" "sync/atomic" + "time" ) type countingReader struct { io.Reader - tot uint64 + tot uint64 // bytes + last int64 // unix nanos } var ( @@ -23,6 +25,7 @@ func (c *countingReader) Read(bs []byte) (int, error) { n, err := c.Reader.Read(bs) atomic.AddUint64(&c.tot, uint64(n)) atomic.AddUint64(&totalIncoming, uint64(n)) + atomic.StoreInt64(&c.last, time.Now().UnixNano()) return n, err } @@ -30,15 +33,21 @@ func (c *countingReader) Tot() uint64 { return atomic.LoadUint64(&c.tot) } +func (c *countingReader) Last() time.Time { + return time.Unix(0, atomic.LoadInt64(&c.last)) +} + type countingWriter struct { io.Writer - tot uint64 + tot uint64 // bytes + last int64 // unix nanos } func (c *countingWriter) Write(bs []byte) (int, error) { n, err := c.Writer.Write(bs) atomic.AddUint64(&c.tot, uint64(n)) atomic.AddUint64(&totalOutgoing, uint64(n)) + atomic.StoreInt64(&c.last, time.Now().UnixNano()) return n, err } @@ -46,6 +55,10 @@ func (c *countingWriter) Tot() uint64 { return atomic.LoadUint64(&c.tot) } +func (c *countingWriter) Last() time.Time { + return time.Unix(0, atomic.LoadInt64(&c.last)) +} + func TotalInOut() (uint64, uint64) { return atomic.LoadUint64(&totalIncoming), atomic.LoadUint64(&totalOutgoing) } diff --git a/protocol/header.go b/protocol/header.go index d92b78891..9c0a1ccfc 100644 --- a/protocol/header.go +++ b/protocol/header.go @@ -7,9 +7,10 @@ package protocol import "github.com/calmh/syncthing/xdr" type header struct { - version int - msgID int - msgType int + version int + msgID int + msgType int + compression bool } func (h header) encodeXDR(xw *xdr.Writer) (int, error) { @@ -24,15 +25,21 @@ func (h *header) decodeXDR(xr *xdr.Reader) error { } func encodeHeader(h header) uint32 { + var isComp uint32 + if h.compression { + isComp = 1 << 0 // the zeroth bit is the compression bit + } return uint32(h.version&0xf)<<28 + uint32(h.msgID&0xfff)<<16 + - uint32(h.msgType&0xff)<<8 + uint32(h.msgType&0xff)<<8 + + isComp } func decodeHeader(u uint32) header { return header{ - version: int(u>>28) & 0xf, - msgID: int(u>>16) & 0xfff, - msgType: int(u>>8) & 0xff, + version: int(u>>28) & 0xf, + msgID: int(u>>16) & 0xfff, + msgType: int(u>>8) & 0xff, + compression: u&1 == 1, } } diff --git a/protocol/message.go b/protocol/message.go index e3b54d236..4f0899437 100644 --- a/protocol/message.go +++ b/protocol/message.go @@ -49,6 +49,10 @@ type RequestMessage struct { Size uint32 } +type ResponseMessage struct { + Data []byte +} + type ClusterConfigMessage struct { ClientName string // max:64 ClientVersion string // max:64 @@ -75,3 +79,5 @@ type Option struct { type CloseMessage struct { Reason string // max:1024 } + +type EmptyMessage struct{} diff --git a/protocol/message_xdr.go b/protocol/message_xdr.go index 899a8b426..917aab6ff 100644 --- a/protocol/message_xdr.go +++ b/protocol/message_xdr.go @@ -348,6 +348,64 @@ func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error { /* +ResponseMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Data | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Data (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct ResponseMessage { + opaque Data<>; +} + +*/ + +func (o ResponseMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o ResponseMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o ResponseMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o ResponseMessage) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteBytes(o.Data) + return xw.Tot(), xw.Error() +} + +func (o *ResponseMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *ResponseMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *ResponseMessage) decodeXDR(xr *xdr.Reader) error { + o.Data = xr.ReadBytes() + return xr.Error() +} + +/* + ClusterConfigMessage Structure: 0 1 2 3 @@ -752,3 +810,52 @@ func (o *CloseMessage) decodeXDR(xr *xdr.Reader) error { o.Reason = xr.ReadStringMax(1024) return xr.Error() } + +/* + +EmptyMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct EmptyMessage { +} + +*/ + +func (o EmptyMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o EmptyMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o EmptyMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o EmptyMessage) encodeXDR(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *EmptyMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *EmptyMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *EmptyMessage) decodeXDR(xr *xdr.Reader) error { + return xr.Error() +} diff --git a/protocol/protocol.go b/protocol/protocol.go index 195ba9e08..e900e1b25 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -6,16 +6,21 @@ package protocol import ( "bufio" + "encoding/binary" + "encoding/hex" "errors" "fmt" "io" "sync" "time" - "github.com/calmh/syncthing/xdr" + lz4 "github.com/bkaradzic/go-lz4" ) -const BlockSize = 128 * 1024 +const ( + BlockSize = 128 * 1024 + MinCompressedSize = 128 // message must be this big to enable compression +) const ( messageTypeClusterConfig = 0 @@ -82,21 +87,22 @@ type rawConnection struct { state int cr *countingReader - xr *xdr.Reader cw *countingWriter wb *bufio.Writer - xw *xdr.Writer - awaiting []chan asyncResult + awaiting [4096]chan asyncResult awaitingMut sync.Mutex idxMut sync.Mutex // ensures serialization of Index calls nextID chan int - outbox chan []encodable + outbox chan hdrMsg closed chan struct{} once sync.Once + + rdbuf0 []byte // used & reused by readMessage + rdbuf1 []byte // used & reused by readMessage } type asyncResult struct { @@ -104,36 +110,32 @@ type asyncResult struct { err error } +type hdrMsg struct { + hdr header + msg encodable +} + +type encodable interface { + AppendXDR([]byte) []byte +} + const ( pingTimeout = 30 * time.Second pingIdleTime = 60 * time.Second ) func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string) Connection { - // Byte counters are at the lowest level, counting compressed bytes cr := &countingReader{Reader: reader} cw := &countingWriter{Writer: writer} - // Compression is just above counting - zr := newLZ4Reader(cr) - zw := newLZ4Writer(cw) - - // We buffer writes on top of compression. - // The LZ4 reader is already internally buffered - wb := bufio.NewWriterSize(zw, 65536) - c := rawConnection{ id: nodeID, name: name, receiver: nativeModel{receiver}, state: stateInitial, cr: cr, - xr: xdr.NewReader(zr), cw: cw, - wb: wb, - xw: xdr.NewWriter(wb), - awaiting: make([]chan asyncResult, 0x1000), - outbox: make(chan []encodable), + outbox: make(chan hdrMsg), nextID: make(chan int), closed: make(chan struct{}), } @@ -162,7 +164,7 @@ func (c *rawConnection) Index(repo string, idx []FileInfo) error { default: } c.idxMut.Lock() - c.send(header{0, -1, messageTypeIndex}, IndexMessage{repo, idx}) + c.send(-1, messageTypeIndex, IndexMessage{repo, idx}) c.idxMut.Unlock() return nil } @@ -175,7 +177,7 @@ func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error { default: } c.idxMut.Lock() - c.send(header{0, -1, messageTypeIndexUpdate}, IndexMessage{repo, idx}) + c.send(-1, messageTypeIndexUpdate, IndexMessage{repo, idx}) c.idxMut.Unlock() return nil } @@ -197,8 +199,7 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int c.awaiting[id] = rc c.awaitingMut.Unlock() - ok := c.send(header{0, id, messageTypeRequest}, - RequestMessage{repo, name, uint64(offset), uint32(size)}) + ok := c.send(id, messageTypeRequest, RequestMessage{repo, name, uint64(offset), uint32(size)}) if !ok { return nil, ErrClosed } @@ -212,7 +213,7 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int // ClusterConfig send the cluster configuration message to the peer and returns any error func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { - c.send(header{0, -1, messageTypeClusterConfig}, config) + c.send(-1, messageTypeClusterConfig, config) } func (c *rawConnection) ping() bool { @@ -228,7 +229,7 @@ func (c *rawConnection) ping() bool { c.awaiting[id] = rc c.awaitingMut.Unlock() - ok := c.send(header{0, id, messageTypePing}) + ok := c.send(id, messageTypePing, nil) if !ok { return false } @@ -249,68 +250,53 @@ func (c *rawConnection) readerLoop() (err error) { default: } - var hdr header - hdr.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { + hdr, msg, err := c.readMessage() + if err != nil { return err } - if hdr.version != 0 { - return fmt.Errorf("protocol error: %s: unknown message version %#x", c.id, hdr.version) - } switch hdr.msgType { case messageTypeIndex: if c.state < stateCCRcvd { return fmt.Errorf("protocol error: index message in state %d", c.state) } - if err := c.handleIndex(); err != nil { - return err - } + c.handleIndex(msg.(IndexMessage)) c.state = stateIdxRcvd case messageTypeIndexUpdate: if c.state < stateIdxRcvd { return fmt.Errorf("protocol error: index update message in state %d", c.state) } - if err := c.handleIndexUpdate(); err != nil { - return err - } + c.handleIndexUpdate(msg.(IndexMessage)) case messageTypeRequest: if c.state < stateIdxRcvd { return fmt.Errorf("protocol error: request message in state %d", c.state) } - if err := c.handleRequest(hdr); err != nil { - return err - } + // Requests are handled asynchronously + go c.handleRequest(hdr.msgID, msg.(RequestMessage)) case messageTypeResponse: if c.state < stateIdxRcvd { return fmt.Errorf("protocol error: response message in state %d", c.state) } - if err := c.handleResponse(hdr); err != nil { - return err - } + c.handleResponse(hdr.msgID, msg.(ResponseMessage)) case messageTypePing: - c.send(header{0, hdr.msgID, messageTypePong}) + c.send(hdr.msgID, messageTypePong, EmptyMessage{}) case messageTypePong: - c.handlePong(hdr) + c.handlePong(hdr.msgID) case messageTypeClusterConfig: if c.state != stateInitial { return fmt.Errorf("protocol error: cluster config message in state %d", c.state) } - if err := c.handleClusterConfig(); err != nil { - return err - } + go c.receiver.ClusterConfig(c.id, msg.(ClusterConfigMessage)) c.state = stateCCRcvd case messageTypeClose: - if err := c.handleClose(); err != nil { - return err - } + return errors.New(msg.(CloseMessage).Reason) default: return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) @@ -318,114 +304,153 @@ func (c *rawConnection) readerLoop() (err error) { } } -func (c *rawConnection) handleIndex() error { - var im IndexMessage - im.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - return err +func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { + if cap(c.rdbuf0) < 8 { + c.rdbuf0 = make([]byte, 8) } else { - if debug { - l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) - } - c.receiver.Index(c.id, im.Repository, im.Files) + c.rdbuf0 = c.rdbuf0[:8] + } + _, err = io.ReadFull(c.cr, c.rdbuf0) + if err != nil { + return } - return nil -} -func (c *rawConnection) handleIndexUpdate() error { - var im IndexMessage - im.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - return err + hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4])) + msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8])) + + if debug { + l.Debugf("read header %v (msglen=%d)", hdr, msglen) + } + + if cap(c.rdbuf0) < msglen { + c.rdbuf0 = make([]byte, msglen) } else { - if debug { - l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) + c.rdbuf0 = c.rdbuf0[:msglen] + } + _, err = io.ReadFull(c.cr, c.rdbuf0) + if err != nil { + return + } + + if debug { + l.Debugf("read %d bytes", len(c.rdbuf0)) + } + + msgBuf := c.rdbuf0 + if hdr.compression { + c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)] + c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0) + if err != nil { + return + } + msgBuf = c.rdbuf1 + if debug { + l.Debugf("decompressed to %d bytes", len(msgBuf)) } - c.receiver.IndexUpdate(c.id, im.Repository, im.Files) } - return nil + + if debug { + if len(msgBuf) > 1024 { + l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024])) + } else { + l.Debugf("message data:\n%s", hex.Dump(msgBuf)) + } + } + + switch hdr.msgType { + case messageTypeIndex, messageTypeIndexUpdate: + var idx IndexMessage + err = idx.UnmarshalXDR(msgBuf) + msg = idx + + case messageTypeRequest: + var req RequestMessage + err = req.UnmarshalXDR(msgBuf) + msg = req + + case messageTypeResponse: + var resp ResponseMessage + err = resp.UnmarshalXDR(msgBuf) + msg = resp + + case messageTypePing, messageTypePong: + msg = EmptyMessage{} + + case messageTypeClusterConfig: + var cc ClusterConfigMessage + err = cc.UnmarshalXDR(msgBuf) + msg = cc + + case messageTypeClose: + var cm CloseMessage + err = cm.UnmarshalXDR(msgBuf) + msg = cm + + default: + err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) + } + + return } -func (c *rawConnection) handleRequest(hdr header) error { - var req RequestMessage - req.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - return err +func (c *rawConnection) handleIndex(im IndexMessage) { + if debug { + l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) } - go c.processRequest(hdr.msgID, req) - return nil + c.receiver.Index(c.id, im.Repository, im.Files) } -func (c *rawConnection) handleResponse(hdr header) error { - data := c.xr.ReadBytesMax(256 * 1024) // Sufficiently larger than max expected block size - - if err := c.xr.Error(); err != nil { - return err +func (c *rawConnection) handleIndexUpdate(im IndexMessage) { + if debug { + l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) } + c.receiver.IndexUpdate(c.id, im.Repository, im.Files) +} +func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { + data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) + + c.send(msgID, messageTypeResponse, ResponseMessage{data}) +} + +func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { c.awaitingMut.Lock() - if rc := c.awaiting[hdr.msgID]; rc != nil { - c.awaiting[hdr.msgID] = nil - rc <- asyncResult{data, nil} + if rc := c.awaiting[msgID]; rc != nil { + c.awaiting[msgID] = nil + rc <- asyncResult{resp.Data, nil} close(rc) } c.awaitingMut.Unlock() - - return nil } -func (c *rawConnection) handlePong(hdr header) { +func (c *rawConnection) handlePong(msgID int) { c.awaitingMut.Lock() - if rc := c.awaiting[hdr.msgID]; rc != nil { - c.awaiting[hdr.msgID] = nil + if rc := c.awaiting[msgID]; rc != nil { + c.awaiting[msgID] = nil rc <- asyncResult{} close(rc) } c.awaitingMut.Unlock() } -func (c *rawConnection) handleClusterConfig() error { - var cm ClusterConfigMessage - cm.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - return err - } else { - go c.receiver.ClusterConfig(c.id, cm) - } - return nil -} - -func (c *rawConnection) handleClose() error { - var cm CloseMessage - cm.decodeXDR(c.xr) - if err := c.xr.Error(); err != nil { - return err - } - return errors.New(cm.Reason) -} - -type encodable interface { - encodeXDR(*xdr.Writer) (int, error) -} -type encodableBytes []byte - -func (e encodableBytes) encodeXDR(xw *xdr.Writer) (int, error) { - return xw.WriteBytes(e) -} - -func (c *rawConnection) send(h header, es ...encodable) bool { - if h.msgID < 0 { +func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { + if msgID < 0 { select { case id := <-c.nextID: - h.msgID = id + msgID = id case <-c.closed: return false } } - msg := append([]encodable{h}, es...) + + hdr := header{ + version: 0, + msgID: msgID, + msgType: msgType, + } select { - case c.outbox <- msg: + case c.outbox <- hdrMsg{hdr, msg}: return true case <-c.closed: return false @@ -433,13 +458,71 @@ func (c *rawConnection) send(h header, es ...encodable) bool { } func (c *rawConnection) writerLoop() { + var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused + var uncBuf []byte // buffer for uncompressed message, kept and reused for { + var tempBuf []byte + var err error + select { - case es := <-c.outbox: - for _, e := range es { - e.encodeXDR(c.xw) + case hm := <-c.outbox: + if hm.msg != nil { + // Uncompressed message in uncBuf + uncBuf = hm.msg.AppendXDR(uncBuf[:0]) + + if len(uncBuf) >= MinCompressedSize { + // Use compression for large messages + hm.hdr.compression = true + + // Make sure we have enough space for the compressed message plus header in msgBug + msgBuf = msgBuf[:cap(msgBuf)] + if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) { + msgBuf = make([]byte, maxLen) + } + + // Compressed is written to msgBuf, we keep tb for the length only + tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf) + binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf))) + msgBuf = msgBuf[0 : len(tempBuf)+8] + + if debug { + l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf)) + } + } else { + // No point in compressing very short messages + hm.hdr.compression = false + + msgBuf = msgBuf[:cap(msgBuf)] + if l := len(uncBuf) + 8; l > len(msgBuf) { + msgBuf = make([]byte, l) + } + + binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf))) + msgBuf = msgBuf[0 : len(uncBuf)+8] + copy(msgBuf[8:], uncBuf) + + if debug { + l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf)) + } + } + } else { + if debug { + l.Debugf("write empty message; %v", hm.hdr) + } + binary.BigEndian.PutUint32(msgBuf[4:8], 0) + msgBuf = msgBuf[:8] } - if err := c.flush(); err != nil { + + binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr)) + + if err == nil { + var n int + n, err = c.cw.Write(msgBuf) + if debug { + l.Debugf("wrote %d bytes on the wire", n) + } + } + if err != nil { c.close(err) return } @@ -449,16 +532,6 @@ func (c *rawConnection) writerLoop() { } } -func (c *rawConnection) flush() error { - if err := c.xw.Error(); err != nil { - return err - } - if err := c.wb.Flush(); err != nil { - return err - } - return nil -} - func (c *rawConnection) close(err error) { c.once.Do(func() { close(c.closed) @@ -494,13 +567,13 @@ func (c *rawConnection) pingerLoop() { for { select { case <-ticker: - if d := time.Since(c.xr.LastRead()); d < pingIdleTime { + if d := time.Since(c.cr.Last()); d < pingIdleTime { if debug { l.Debugln(c.id, "ping skipped after rd", d) } continue } - if d := time.Since(c.xw.LastWrite()); d < pingIdleTime { + if d := time.Since(c.cw.Last()); d < pingIdleTime { if debug { l.Debugln(c.id, "ping skipped after wr", d) } @@ -532,12 +605,6 @@ func (c *rawConnection) pingerLoop() { } } -func (c *rawConnection) processRequest(msgID int, req RequestMessage) { - data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) - - c.send(header{0, msgID, messageTypeResponse}, encodableBytes(data)) -} - type Statistics struct { At time.Time InBytesTotal uint64 diff --git a/xdr/reader.go b/xdr/reader.go index 3f9cd0a3d..3dab0c319 100644 --- a/xdr/reader.go +++ b/xdr/reader.go @@ -7,18 +7,15 @@ package xdr import ( "errors" "io" - "time" ) var ErrElementSizeExceeded = errors.New("element size exceeded") type Reader struct { - r io.Reader - tot int - err error - b [8]byte - sb []byte - last time.Time + r io.Reader + err error + b [8]byte + sb []byte } func NewReader(r io.Reader) *Reader { @@ -63,8 +60,6 @@ func (r *Reader) ReadBytesMaxInto(max int, dst []byte) []byte { if r.err != nil { return nil } - r.last = time.Now() - s := r.tot l := int(r.ReadUint32()) if r.err != nil { @@ -85,17 +80,16 @@ func (r *Reader) ReadBytesMaxInto(max int, dst []byte) []byte { n, r.err = io.ReadFull(r.r, dst) if r.err != nil { if debug { - dl.Debugf("@0x%x: rd bytes (%d): %v", s, len(dst), r.err) + dl.Debugf("rd bytes (%d): %v", len(dst), r.err) } return nil } - r.tot += n if debug { if n > maxDebugBytes { - dl.Debugf("@0x%x: rd bytes (%d): %x...", s, len(dst), dst[:maxDebugBytes]) + dl.Debugf("rd bytes (%d): %x...", len(dst), dst[:maxDebugBytes]) } else { - dl.Debugf("@0x%x: rd bytes (%d): %x", s, len(dst), dst) + dl.Debugf("rd bytes (%d): %x", len(dst), dst) } } return dst[:l] @@ -113,15 +107,11 @@ func (r *Reader) ReadUint32() uint32 { if r.err != nil { return 0 } - r.last = time.Now() - s := r.tot - var n int - n, r.err = io.ReadFull(r.r, r.b[:4]) - r.tot += n + _, r.err = io.ReadFull(r.r, r.b[:4]) if r.err != nil { if debug { - dl.Debugf("@0x%x: rd uint32: %v", r.tot, r.err) + dl.Debugf("rd uint32: %v", r.err) } return 0 } @@ -129,7 +119,7 @@ func (r *Reader) ReadUint32() uint32 { v := uint32(r.b[3]) | uint32(r.b[2])<<8 | uint32(r.b[1])<<16 | uint32(r.b[0])<<24 if debug { - dl.Debugf("@0x%x: rd uint32=%d (0x%08x)", s, v, v) + dl.Debugf("rd uint32=%d (0x%08x)", v, v) } return v } @@ -138,15 +128,11 @@ func (r *Reader) ReadUint64() uint64 { if r.err != nil { return 0 } - r.last = time.Now() - s := r.tot - var n int - n, r.err = io.ReadFull(r.r, r.b[:8]) - r.tot += n + _, r.err = io.ReadFull(r.r, r.b[:8]) if r.err != nil { if debug { - dl.Debugf("@0x%x: rd uint64: %v", r.tot, r.err) + dl.Debugf("rd uint64: %v", r.err) } return 0 } @@ -155,19 +141,23 @@ func (r *Reader) ReadUint64() uint64 { uint64(r.b[3])<<32 | uint64(r.b[2])<<40 | uint64(r.b[1])<<48 | uint64(r.b[0])<<56 if debug { - dl.Debugf("@0x%x: rd uint64=%d (0x%016x)", s, v, v) + dl.Debugf("rd uint64=%d (0x%016x)", v, v) } return v } -func (r *Reader) Tot() int { - return r.tot +type XDRError struct { + op string + err error +} + +func (e XDRError) Error() string { + return "xdr " + e.op + ": " + e.err.Error() } func (r *Reader) Error() error { - return r.err -} - -func (r *Reader) LastRead() time.Time { - return r.last + if r.err == nil { + return nil + } + return XDRError{"read", r.err} } diff --git a/xdr/writer.go b/xdr/writer.go index 8dd827368..7c19d8f59 100644 --- a/xdr/writer.go +++ b/xdr/writer.go @@ -4,10 +4,7 @@ package xdr -import ( - "io" - "time" -) +import "io" func pad(l int) int { d := l % 4 @@ -20,11 +17,10 @@ func pad(l int) int { var padBytes = []byte{0, 0, 0} type Writer struct { - w io.Writer - tot int - err error - b [8]byte - last time.Time + w io.Writer + tot int + err error + b [8]byte } type AppendWriter []byte @@ -49,7 +45,6 @@ func (w *Writer) WriteBytes(bs []byte) (int, error) { return 0, w.err } - w.last = time.Now() w.WriteUint32(uint32(len(bs))) if w.err != nil { return 0, w.err @@ -93,7 +88,6 @@ func (w *Writer) WriteUint32(v uint32) (int, error) { return 0, w.err } - w.last = time.Now() if debug { dl.Debugf("wr uint32=%d", v) } @@ -114,7 +108,6 @@ func (w *Writer) WriteUint64(v uint64) (int, error) { return 0, w.err } - w.last = time.Now() if debug { dl.Debugf("wr uint64=%d", v) } @@ -139,9 +132,8 @@ func (w *Writer) Tot() int { } func (w *Writer) Error() error { - return w.err -} - -func (w *Writer) LastWrite() time.Time { - return w.last + if w.err == nil { + return nil + } + return XDRError{"write", w.err} }