diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 6c1f1821e..cc336bf85 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -23,7 +23,7 @@ }, { "ImportPath": "github.com/calmh/xdr", - "Rev": "45c46b7db7ff83b8b9ee09bbd95f36ab50043ece" + "Rev": "214788d8fedfc310c18eca9ed12be408a5054cd5" }, { "ImportPath": "github.com/juju/ratelimit", diff --git a/Godeps/_workspace/src/github.com/calmh/xdr/reader.go b/Godeps/_workspace/src/github.com/calmh/xdr/reader.go index c21b9ce67..e669322c9 100644 --- a/Godeps/_workspace/src/github.com/calmh/xdr/reader.go +++ b/Godeps/_workspace/src/github.com/calmh/xdr/reader.go @@ -154,6 +154,10 @@ func (e XDRError) Error() string { return "xdr " + e.op + ": " + e.err.Error() } +func (e XDRError) IsEOF() bool { + return e.err == io.EOF +} + func (r *Reader) Error() error { if r.err == nil { return nil diff --git a/internal/protocol/message.go b/internal/protocol/message.go index ae04a9da8..e0c66b9dc 100644 --- a/internal/protocol/message.go +++ b/internal/protocol/message.go @@ -21,8 +21,10 @@ package protocol import "fmt" type IndexMessage struct { - Folder string // max:64 - Files []FileInfo + Folder string // max:64 + Files []FileInfo + Flags uint32 + Options []Option // max:64 } type FileInfo struct { @@ -150,14 +152,18 @@ func (b BlockInfo) String() string { } type RequestMessage struct { - Folder string // max:64 - Name string // max:8192 - Offset uint64 - Size uint32 + Folder string // max:64 + Name string // max:8192 + Offset uint64 + Size uint32 + Hash []byte // max:64 + Flags uint32 + Options []Option // max:64 } type ResponseMessage struct { - Data []byte + Data []byte + Error uint32 } type ClusterConfigMessage struct { @@ -194,6 +200,7 @@ type Option struct { type CloseMessage struct { Reason string // max:1024 + Code uint32 } type EmptyMessage struct{} diff --git a/internal/protocol/message_xdr.go b/internal/protocol/message_xdr.go index 948e63c32..da13111c2 100644 --- a/internal/protocol/message_xdr.go +++ b/internal/protocol/message_xdr.go @@ -30,11 +30,21 @@ IndexMessage Structure: \ Zero or more FileInfo Structures \ / / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ struct IndexMessage { string Folder<64>; FileInfo Files<>; + unsigned int Flags; + Option Options<64>; } */ @@ -75,6 +85,17 @@ func (o IndexMessage) encodeXDR(xw *xdr.Writer) (int, error) { return xw.Tot(), err } } + xw.WriteUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } return xw.Tot(), xw.Error() } @@ -96,6 +117,15 @@ func (o *IndexMessage) decodeXDR(xr *xdr.Reader) error { for i := range o.Files { (&o.Files[i]).decodeXDR(xr) } + o.Flags = xr.ReadUint32() + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).decodeXDR(xr) + } return xr.Error() } @@ -412,6 +442,20 @@ RequestMessage Structure: +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ | Size | +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Hash | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Hash (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ struct RequestMessage { @@ -419,6 +463,9 @@ struct RequestMessage { string Name<8192>; unsigned hyper Offset; unsigned int Size; + opaque Hash<64>; + unsigned int Flags; + Option Options<64>; } */ @@ -458,6 +505,21 @@ func (o RequestMessage) encodeXDR(xw *xdr.Writer) (int, error) { xw.WriteString(o.Name) xw.WriteUint64(o.Offset) xw.WriteUint32(o.Size) + if l := len(o.Hash); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Hash", l, 64) + } + xw.WriteBytes(o.Hash) + xw.WriteUint32(o.Flags) + if l := len(o.Options); l > 64 { + return xw.Tot(), xdr.ElementSizeExceeded("Options", l, 64) + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } return xw.Tot(), xw.Error() } @@ -477,6 +539,16 @@ func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error { o.Name = xr.ReadStringMax(8192) o.Offset = xr.ReadUint64() o.Size = xr.ReadUint32() + o.Hash = xr.ReadBytesMax(64) + o.Flags = xr.ReadUint32() + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize > 64 { + return xdr.ElementSizeExceeded("Options", _OptionsSize, 64) + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).decodeXDR(xr) + } return xr.Error() } @@ -493,10 +565,13 @@ ResponseMessage Structure: \ Data (variable length) \ / / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Error | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ struct ResponseMessage { opaque Data<>; + unsigned int Error; } */ @@ -527,6 +602,7 @@ func (o ResponseMessage) AppendXDR(bs []byte) ([]byte, error) { func (o ResponseMessage) encodeXDR(xw *xdr.Writer) (int, error) { xw.WriteBytes(o.Data) + xw.WriteUint32(o.Error) return xw.Tot(), xw.Error() } @@ -543,6 +619,7 @@ func (o *ResponseMessage) UnmarshalXDR(bs []byte) error { func (o *ResponseMessage) decodeXDR(xr *xdr.Reader) error { o.Data = xr.ReadBytes() + o.Error = xr.ReadUint32() return xr.Error() } @@ -940,10 +1017,13 @@ CloseMessage Structure: \ Reason (variable length) \ / / +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Code | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ struct CloseMessage { string Reason<1024>; + unsigned int Code; } */ @@ -977,6 +1057,7 @@ func (o CloseMessage) encodeXDR(xw *xdr.Writer) (int, error) { return xw.Tot(), xdr.ElementSizeExceeded("Reason", l, 1024) } xw.WriteString(o.Reason) + xw.WriteUint32(o.Code) return xw.Tot(), xw.Error() } @@ -993,6 +1074,7 @@ func (o *CloseMessage) UnmarshalXDR(bs []byte) error { func (o *CloseMessage) decodeXDR(xr *xdr.Reader) error { o.Reason = xr.ReadStringMax(1024) + o.Code = xr.ReadUint32() return xr.Error() } diff --git a/internal/protocol/protocol.go b/internal/protocol/protocol.go index fb51a1c01..a55256799 100644 --- a/internal/protocol/protocol.go +++ b/internal/protocol/protocol.go @@ -133,6 +133,10 @@ type encodable interface { AppendXDR([]byte) ([]byte, error) } +type isEofer interface { + IsEOF() bool +} + const ( pingTimeout = 30 * time.Second pingIdleTime = 60 * time.Second @@ -183,7 +187,10 @@ func (c *rawConnection) Index(folder string, idx []FileInfo) error { default: } c.idxMut.Lock() - c.send(-1, messageTypeIndex, IndexMessage{folder, idx}) + c.send(-1, messageTypeIndex, IndexMessage{ + Folder: folder, + Files: idx, + }) c.idxMut.Unlock() return nil } @@ -196,7 +203,10 @@ func (c *rawConnection) IndexUpdate(folder string, idx []FileInfo) error { default: } c.idxMut.Lock() - c.send(-1, messageTypeIndexUpdate, IndexMessage{folder, idx}) + c.send(-1, messageTypeIndexUpdate, IndexMessage{ + Folder: folder, + Files: idx, + }) c.idxMut.Unlock() return nil } @@ -218,7 +228,12 @@ func (c *rawConnection) Request(folder string, name string, offset int64, size i c.awaiting[id] = rc c.awaitingMut.Unlock() - ok := c.send(id, messageTypeRequest, RequestMessage{folder, name, uint64(offset), uint32(size)}) + ok := c.send(id, messageTypeRequest, RequestMessage{ + Folder: folder, + Name: name, + Offset: uint64(offset), + Size: uint32(size), + }) if !ok { return nil, ErrClosed } @@ -341,6 +356,11 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { l.Debugf("read header %v (msglen=%d)", hdr, msglen) } + if hdr.version != 0 { + err = fmt.Errorf("unknown protocol version 0x%x", hdr.version) + return + } + if cap(c.rdbuf0) < msglen { c.rdbuf0 = make([]byte, msglen) } else { @@ -376,20 +396,36 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { } } + // We check each returned error for the XDRError.IsEOF() method. + // IsEOF()==true here means that the message contained fewer fields than + // expected. It does not signify an EOF on the socket, because we've + // successfully read a size value and that many bytes already. New fields + // we expected but the other peer didn't send should be interpreted as + // zero/nil, and if that's not valid we'll verify it somewhere else. + switch hdr.msgType { case messageTypeIndex, messageTypeIndexUpdate: var idx IndexMessage err = idx.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } msg = idx case messageTypeRequest: var req RequestMessage err = req.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } msg = req case messageTypeResponse: var resp ResponseMessage err = resp.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } msg = resp case messageTypePing, messageTypePong: @@ -398,11 +434,17 @@ func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { case messageTypeClusterConfig: var cc ClusterConfigMessage err = cc.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } msg = cc case messageTypeClose: var cm CloseMessage err = cm.UnmarshalXDR(msgBuf) + if xdrErr, ok := err.(isEofer); ok && xdrErr.IsEOF() { + err = nil + } msg = cm default: @@ -429,7 +471,9 @@ func (c *rawConnection) handleIndexUpdate(im IndexMessage) { func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { data, _ := c.receiver.Request(c.id, req.Folder, req.Name, int64(req.Offset), int(req.Size)) - c.send(msgID, messageTypeResponse, ResponseMessage{data}) + c.send(msgID, messageTypeResponse, ResponseMessage{ + Data: data, + }) } func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { diff --git a/internal/protocol/protocol_test.go b/internal/protocol/protocol_test.go index 9da9422c1..75e0086ee 100644 --- a/internal/protocol/protocol_test.go +++ b/internal/protocol/protocol_test.go @@ -189,7 +189,7 @@ func TestVersionErr(t *testing.T) { msgID: 0, msgType: 0, })) - w.WriteUint32(0) + w.WriteUint32(0) // Avoids reader closing due to EOF if !m1.isClosed() { t.Error("Connection should close due to unknown version") @@ -212,7 +212,7 @@ func TestTypeErr(t *testing.T) { msgID: 0, msgType: 42, })) - w.WriteUint32(0) + w.WriteUint32(0) // Avoids reader closing due to EOF if !m1.isClosed() { t.Error("Connection should close due to unknown message type")