From f28367bcfc94f2648c99ca05403993c5d56ec3cd Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Mon, 22 Sep 2014 21:42:11 +0200 Subject: [PATCH] Move top level packages to internal. --- .gitignore | 1 + common_test.go | 76 ++++ counting.go | 64 +++ debug.go | 17 + doc.go | 6 + header.go | 45 ++ message.go | 139 ++++++ message_xdr.go | 964 +++++++++++++++++++++++++++++++++++++++++ nativemodel_darwin.go | 42 ++ nativemodel_unix.go | 33 ++ nativemodel_windows.go | 72 +++ nodeid.go | 159 +++++++ nodeid_test.go | 78 ++++ protocol.go | 640 +++++++++++++++++++++++++++ protocol_test.go | 383 ++++++++++++++++ wireformat.go | 58 +++ 16 files changed, 2777 insertions(+) create mode 100644 .gitignore create mode 100644 common_test.go create mode 100644 counting.go create mode 100644 debug.go create mode 100644 doc.go create mode 100644 header.go create mode 100644 message.go create mode 100644 message_xdr.go create mode 100644 nativemodel_darwin.go create mode 100644 nativemodel_unix.go create mode 100644 nativemodel_windows.go create mode 100644 nodeid.go create mode 100644 nodeid_test.go create mode 100644 protocol.go create mode 100644 protocol_test.go create mode 100644 wireformat.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..2211df63d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +*.txt diff --git a/common_test.go b/common_test.go new file mode 100644 index 000000000..9d387825c --- /dev/null +++ b/common_test.go @@ -0,0 +1,76 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "io" + "time" +) + +type TestModel struct { + data []byte + repo string + name string + offset int64 + size int + closedCh chan bool +} + +func newTestModel() *TestModel { + return &TestModel{ + closedCh: make(chan bool), + } +} + +func (t *TestModel) Index(nodeID NodeID, repo string, files []FileInfo) { +} + +func (t *TestModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) { +} + +func (t *TestModel) Request(nodeID NodeID, repo, name string, offset int64, size int) ([]byte, error) { + t.repo = repo + t.name = name + t.offset = offset + t.size = size + return t.data, nil +} + +func (t *TestModel) Close(nodeID NodeID, err error) { + close(t.closedCh) +} + +func (t *TestModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) { +} + +func (t *TestModel) isClosed() bool { + select { + case <-t.closedCh: + return true + case <-time.After(1 * time.Second): + return false // Timeout + } +} + +type ErrPipe struct { + io.PipeWriter + written int + max int + err error + closed bool +} + +func (e *ErrPipe) Write(data []byte) (int, error) { + if e.closed { + return 0, e.err + } + if e.written+len(data) > e.max { + n, _ := e.PipeWriter.Write(data[:e.max-e.written]) + e.PipeWriter.CloseWithError(e.err) + e.closed = true + return n, e.err + } + return e.PipeWriter.Write(data) +} diff --git a/counting.go b/counting.go new file mode 100644 index 000000000..512774fba --- /dev/null +++ b/counting.go @@ -0,0 +1,64 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "io" + "sync/atomic" + "time" +) + +type countingReader struct { + io.Reader + tot uint64 // bytes + last int64 // unix nanos +} + +var ( + totalIncoming uint64 + totalOutgoing uint64 +) + +func (c *countingReader) Read(bs []byte) (int, error) { + n, err := c.Reader.Read(bs) + atomic.AddUint64(&c.tot, uint64(n)) + atomic.AddUint64(&totalIncoming, uint64(n)) + atomic.StoreInt64(&c.last, time.Now().UnixNano()) + return n, err +} + +func (c *countingReader) Tot() uint64 { + return atomic.LoadUint64(&c.tot) +} + +func (c *countingReader) Last() time.Time { + return time.Unix(0, atomic.LoadInt64(&c.last)) +} + +type countingWriter struct { + io.Writer + tot uint64 // bytes + last int64 // unix nanos +} + +func (c *countingWriter) Write(bs []byte) (int, error) { + n, err := c.Writer.Write(bs) + atomic.AddUint64(&c.tot, uint64(n)) + atomic.AddUint64(&totalOutgoing, uint64(n)) + atomic.StoreInt64(&c.last, time.Now().UnixNano()) + return n, err +} + +func (c *countingWriter) Tot() uint64 { + return atomic.LoadUint64(&c.tot) +} + +func (c *countingWriter) Last() time.Time { + return time.Unix(0, atomic.LoadInt64(&c.last)) +} + +func TotalInOut() (uint64, uint64) { + return atomic.LoadUint64(&totalIncoming), atomic.LoadUint64(&totalOutgoing) +} diff --git a/debug.go b/debug.go new file mode 100644 index 000000000..6c586b90e --- /dev/null +++ b/debug.go @@ -0,0 +1,17 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "os" + "strings" + + "github.com/syncthing/syncthing/internal/logger" +) + +var ( + debug = strings.Contains(os.Getenv("STTRACE"), "protocol") || os.Getenv("STTRACE") == "all" + l = logger.DefaultLogger +) diff --git a/doc.go b/doc.go new file mode 100644 index 000000000..8c6b524e6 --- /dev/null +++ b/doc.go @@ -0,0 +1,6 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// Package protocol implements the Block Exchange Protocol. +package protocol diff --git a/header.go b/header.go new file mode 100644 index 000000000..6fd2ebb8f --- /dev/null +++ b/header.go @@ -0,0 +1,45 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import "github.com/calmh/xdr" + +type header struct { + version int + msgID int + msgType int + compression bool +} + +func (h header) encodeXDR(xw *xdr.Writer) (int, error) { + u := encodeHeader(h) + return xw.WriteUint32(u) +} + +func (h *header) decodeXDR(xr *xdr.Reader) error { + u := xr.ReadUint32() + *h = decodeHeader(u) + return xr.Error() +} + +func encodeHeader(h header) uint32 { + var isComp uint32 + if h.compression { + isComp = 1 << 0 // the zeroth bit is the compression bit + } + return uint32(h.version&0xf)<<28 + + uint32(h.msgID&0xfff)<<16 + + uint32(h.msgType&0xff)<<8 + + isComp +} + +func decodeHeader(u uint32) header { + return header{ + version: int(u>>28) & 0xf, + msgID: int(u>>16) & 0xfff, + msgType: int(u>>8) & 0xff, + compression: u&1 == 1, + } +} diff --git a/message.go b/message.go new file mode 100644 index 000000000..779817a7b --- /dev/null +++ b/message.go @@ -0,0 +1,139 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import "fmt" + +type IndexMessage struct { + Repository string // max:64 + Files []FileInfo +} + +type FileInfo struct { + Name string // max:8192 + Flags uint32 + Modified int64 + Version uint64 + LocalVersion uint64 + Blocks []BlockInfo +} + +func (f FileInfo) String() string { + return fmt.Sprintf("File{Name:%q, Flags:0%o, Modified:%d, Version:%d, Size:%d, Blocks:%v}", + f.Name, f.Flags, f.Modified, f.Version, f.Size(), f.Blocks) +} + +func (f FileInfo) Size() (bytes int64) { + if IsDeleted(f.Flags) || IsDirectory(f.Flags) { + return 128 + } + for _, b := range f.Blocks { + bytes += int64(b.Size) + } + return +} + +func (f FileInfo) IsDeleted() bool { + return IsDeleted(f.Flags) +} + +func (f FileInfo) IsInvalid() bool { + return IsInvalid(f.Flags) +} + +// Used for unmarshalling a FileInfo structure but skipping the actual block list +type FileInfoTruncated struct { + Name string // max:8192 + Flags uint32 + Modified int64 + Version uint64 + LocalVersion uint64 + NumBlocks uint32 +} + +// Returns a statistical guess on the size, not the exact figure +func (f FileInfoTruncated) Size() int64 { + if IsDeleted(f.Flags) || IsDirectory(f.Flags) { + return 128 + } + if f.NumBlocks < 2 { + return BlockSize / 2 + } else { + return int64(f.NumBlocks-1)*BlockSize + BlockSize/2 + } +} + +func (f FileInfoTruncated) IsDeleted() bool { + return IsDeleted(f.Flags) +} + +func (f FileInfoTruncated) IsInvalid() bool { + return IsInvalid(f.Flags) +} + +type FileIntf interface { + Size() int64 + IsDeleted() bool + IsInvalid() bool +} + +type BlockInfo struct { + Offset int64 // noencode (cache only) + Size uint32 + Hash []byte // max:64 +} + +func (b BlockInfo) String() string { + return fmt.Sprintf("Block{%d/%d/%x}", b.Offset, b.Size, b.Hash) +} + +type RequestMessage struct { + Repository string // max:64 + Name string // max:8192 + Offset uint64 + Size uint32 +} + +type ResponseMessage struct { + Data []byte +} + +type ClusterConfigMessage struct { + ClientName string // max:64 + ClientVersion string // max:64 + Repositories []Repository // max:64 + Options []Option // max:64 +} + +func (o *ClusterConfigMessage) GetOption(key string) string { + for _, option := range o.Options { + if option.Key == key { + return option.Value + } + } + return "" +} + +type Repository struct { + ID string // max:64 + Nodes []Node // max:64 +} + +type Node struct { + ID []byte // max:32 + Flags uint32 + MaxLocalVersion uint64 +} + +type Option struct { + Key string // max:64 + Value string // max:1024 +} + +type CloseMessage struct { + Reason string // max:1024 +} + +type EmptyMessage struct{} diff --git a/message_xdr.go b/message_xdr.go new file mode 100644 index 000000000..11a11da34 --- /dev/null +++ b/message_xdr.go @@ -0,0 +1,964 @@ +// ************************************************************ +// This file is automatically generated by genxdr. Do not edit. +// ************************************************************ + +package protocol + +import ( + "bytes" + "io" + + "github.com/calmh/xdr" +) + +/* + +IndexMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Repository | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Repository (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Files | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more FileInfo Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct IndexMessage { + string Repository<64>; + FileInfo Files<>; +} + +*/ + +func (o IndexMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o IndexMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o IndexMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o IndexMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Repository) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Repository) + xw.WriteUint32(uint32(len(o.Files))) + for i := range o.Files { + _, err := o.Files[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *IndexMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *IndexMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *IndexMessage) decodeXDR(xr *xdr.Reader) error { + o.Repository = xr.ReadStringMax(64) + _FilesSize := int(xr.ReadUint32()) + o.Files = make([]FileInfo, _FilesSize) + for i := range o.Files { + (&o.Files[i]).decodeXDR(xr) + } + return xr.Error() +} + +/* + +FileInfo Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Modified (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Blocks | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more BlockInfo Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct FileInfo { + string Name<8192>; + unsigned int Flags; + hyper Modified; + unsigned hyper Version; + unsigned hyper LocalVersion; + BlockInfo Blocks<>; +} + +*/ + +func (o FileInfo) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o FileInfo) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o FileInfo) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o FileInfo) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Name) > 8192 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Name) + xw.WriteUint32(o.Flags) + xw.WriteUint64(uint64(o.Modified)) + xw.WriteUint64(o.Version) + xw.WriteUint64(o.LocalVersion) + xw.WriteUint32(uint32(len(o.Blocks))) + for i := range o.Blocks { + _, err := o.Blocks[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *FileInfo) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *FileInfo) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *FileInfo) decodeXDR(xr *xdr.Reader) error { + o.Name = xr.ReadStringMax(8192) + o.Flags = xr.ReadUint32() + o.Modified = int64(xr.ReadUint64()) + o.Version = xr.ReadUint64() + o.LocalVersion = xr.ReadUint64() + _BlocksSize := int(xr.ReadUint32()) + o.Blocks = make([]BlockInfo, _BlocksSize) + for i := range o.Blocks { + (&o.Blocks[i]).decodeXDR(xr) + } + return xr.Error() +} + +/* + +FileInfoTruncated Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Modified (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Num Blocks | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct FileInfoTruncated { + string Name<8192>; + unsigned int Flags; + hyper Modified; + unsigned hyper Version; + unsigned hyper LocalVersion; + unsigned int NumBlocks; +} + +*/ + +func (o FileInfoTruncated) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o FileInfoTruncated) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o FileInfoTruncated) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o FileInfoTruncated) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Name) > 8192 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Name) + xw.WriteUint32(o.Flags) + xw.WriteUint64(uint64(o.Modified)) + xw.WriteUint64(o.Version) + xw.WriteUint64(o.LocalVersion) + xw.WriteUint32(o.NumBlocks) + return xw.Tot(), xw.Error() +} + +func (o *FileInfoTruncated) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *FileInfoTruncated) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *FileInfoTruncated) decodeXDR(xr *xdr.Reader) error { + o.Name = xr.ReadStringMax(8192) + o.Flags = xr.ReadUint32() + o.Modified = int64(xr.ReadUint64()) + o.Version = xr.ReadUint64() + o.LocalVersion = xr.ReadUint64() + o.NumBlocks = xr.ReadUint32() + return xr.Error() +} + +/* + +BlockInfo Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Size | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Hash | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Hash (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct BlockInfo { + unsigned int Size; + opaque Hash<64>; +} + +*/ + +func (o BlockInfo) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o BlockInfo) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o BlockInfo) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o BlockInfo) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteUint32(o.Size) + if len(o.Hash) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteBytes(o.Hash) + return xw.Tot(), xw.Error() +} + +func (o *BlockInfo) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *BlockInfo) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *BlockInfo) decodeXDR(xr *xdr.Reader) error { + o.Size = xr.ReadUint32() + o.Hash = xr.ReadBytesMax(64) + return xr.Error() +} + +/* + +RequestMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Repository | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Repository (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Offset (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Size | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct RequestMessage { + string Repository<64>; + string Name<8192>; + unsigned hyper Offset; + unsigned int Size; +} + +*/ + +func (o RequestMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o RequestMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o RequestMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o RequestMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Repository) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Repository) + if len(o.Name) > 8192 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Name) + xw.WriteUint64(o.Offset) + xw.WriteUint32(o.Size) + return xw.Tot(), xw.Error() +} + +func (o *RequestMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *RequestMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error { + o.Repository = xr.ReadStringMax(64) + o.Name = xr.ReadStringMax(8192) + o.Offset = xr.ReadUint64() + o.Size = xr.ReadUint32() + return xr.Error() +} + +/* + +ResponseMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Data | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Data (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct ResponseMessage { + opaque Data<>; +} + +*/ + +func (o ResponseMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o ResponseMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o ResponseMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o ResponseMessage) encodeXDR(xw *xdr.Writer) (int, error) { + xw.WriteBytes(o.Data) + return xw.Tot(), xw.Error() +} + +func (o *ResponseMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *ResponseMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *ResponseMessage) decodeXDR(xr *xdr.Reader) error { + o.Data = xr.ReadBytes() + return xr.Error() +} + +/* + +ClusterConfigMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Client Name | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Client Name (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Client Version | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Client Version (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Repositories | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Repository Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Options | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Option Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct ClusterConfigMessage { + string ClientName<64>; + string ClientVersion<64>; + Repository Repositories<64>; + Option Options<64>; +} + +*/ + +func (o ClusterConfigMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o ClusterConfigMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o ClusterConfigMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o ClusterConfigMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.ClientName) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ClientName) + if len(o.ClientVersion) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ClientVersion) + if len(o.Repositories) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Repositories))) + for i := range o.Repositories { + _, err := o.Repositories[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } + if len(o.Options) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Options))) + for i := range o.Options { + _, err := o.Options[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *ClusterConfigMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *ClusterConfigMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *ClusterConfigMessage) decodeXDR(xr *xdr.Reader) error { + o.ClientName = xr.ReadStringMax(64) + o.ClientVersion = xr.ReadStringMax(64) + _RepositoriesSize := int(xr.ReadUint32()) + if _RepositoriesSize > 64 { + return xdr.ErrElementSizeExceeded + } + o.Repositories = make([]Repository, _RepositoriesSize) + for i := range o.Repositories { + (&o.Repositories[i]).decodeXDR(xr) + } + _OptionsSize := int(xr.ReadUint32()) + if _OptionsSize > 64 { + return xdr.ErrElementSizeExceeded + } + o.Options = make([]Option, _OptionsSize) + for i := range o.Options { + (&o.Options[i]).decodeXDR(xr) + } + return xr.Error() +} + +/* + +Repository Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of ID | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ID (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Number of Nodes | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Zero or more Node Structures \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Repository { + string ID<64>; + Node Nodes<64>; +} + +*/ + +func (o Repository) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Repository) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Repository) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o Repository) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.ID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ID) + if len(o.Nodes) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Nodes))) + for i := range o.Nodes { + _, err := o.Nodes[i].encodeXDR(xw) + if err != nil { + return xw.Tot(), err + } + } + return xw.Tot(), xw.Error() +} + +func (o *Repository) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Repository) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *Repository) decodeXDR(xr *xdr.Reader) error { + o.ID = xr.ReadStringMax(64) + _NodesSize := int(xr.ReadUint32()) + if _NodesSize > 64 { + return xdr.ErrElementSizeExceeded + } + o.Nodes = make([]Node, _NodesSize) + for i := range o.Nodes { + (&o.Nodes[i]).decodeXDR(xr) + } + return xr.Error() +} + +/* + +Node Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of ID | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ ID (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Flags | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| | ++ Max Local Version (64 bits) + +| | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Node { + opaque ID<32>; + unsigned int Flags; + unsigned hyper MaxLocalVersion; +} + +*/ + +func (o Node) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Node) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Node) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o Node) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.ID) > 32 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteBytes(o.ID) + xw.WriteUint32(o.Flags) + xw.WriteUint64(o.MaxLocalVersion) + return xw.Tot(), xw.Error() +} + +func (o *Node) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Node) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *Node) decodeXDR(xr *xdr.Reader) error { + o.ID = xr.ReadBytesMax(32) + o.Flags = xr.ReadUint32() + o.MaxLocalVersion = xr.ReadUint64() + return xr.Error() +} + +/* + +Option Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Key | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Key (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Value | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Value (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct Option { + string Key<64>; + string Value<1024>; +} + +*/ + +func (o Option) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Option) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o Option) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o Option) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Key) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Key) + if len(o.Value) > 1024 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Value) + return xw.Tot(), xw.Error() +} + +func (o *Option) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Option) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *Option) decodeXDR(xr *xdr.Reader) error { + o.Key = xr.ReadStringMax(64) + o.Value = xr.ReadStringMax(1024) + return xr.Error() +} + +/* + +CloseMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +| Length of Reason | ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +/ / +\ Reason (variable length) \ +/ / ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct CloseMessage { + string Reason<1024>; +} + +*/ + +func (o CloseMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o CloseMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o CloseMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o CloseMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.Reason) > 1024 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.Reason) + return xw.Tot(), xw.Error() +} + +func (o *CloseMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *CloseMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *CloseMessage) decodeXDR(xr *xdr.Reader) error { + o.Reason = xr.ReadStringMax(1024) + return xr.Error() +} + +/* + +EmptyMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 ++-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + +struct EmptyMessage { +} + +*/ + +func (o EmptyMessage) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o EmptyMessage) MarshalXDR() []byte { + return o.AppendXDR(make([]byte, 0, 128)) +} + +func (o EmptyMessage) AppendXDR(bs []byte) []byte { + var aw = xdr.AppendWriter(bs) + var xw = xdr.NewWriter(&aw) + o.encodeXDR(xw) + return []byte(aw) +} + +func (o EmptyMessage) encodeXDR(xw *xdr.Writer) (int, error) { + return xw.Tot(), xw.Error() +} + +func (o *EmptyMessage) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *EmptyMessage) UnmarshalXDR(bs []byte) error { + var br = bytes.NewReader(bs) + var xr = xdr.NewReader(br) + return o.decodeXDR(xr) +} + +func (o *EmptyMessage) decodeXDR(xr *xdr.Reader) error { + return xr.Error() +} diff --git a/nativemodel_darwin.go b/nativemodel_darwin.go new file mode 100644 index 000000000..9ac402fe6 --- /dev/null +++ b/nativemodel_darwin.go @@ -0,0 +1,42 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build darwin + +package protocol + +// Darwin uses NFD normalization + +import "code.google.com/p/go.text/unicode/norm" + +type nativeModel struct { + next Model +} + +func (m nativeModel) Index(nodeID NodeID, repo string, files []FileInfo) { + for i := range files { + files[i].Name = norm.NFD.String(files[i].Name) + } + m.next.Index(nodeID, repo, files) +} + +func (m nativeModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) { + for i := range files { + files[i].Name = norm.NFD.String(files[i].Name) + } + m.next.IndexUpdate(nodeID, repo, files) +} + +func (m nativeModel) Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) { + name = norm.NFD.String(name) + return m.next.Request(nodeID, repo, name, offset, size) +} + +func (m nativeModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) { + m.next.ClusterConfig(nodeID, config) +} + +func (m nativeModel) Close(nodeID NodeID, err error) { + m.next.Close(nodeID, err) +} diff --git a/nativemodel_unix.go b/nativemodel_unix.go new file mode 100644 index 000000000..23fbe0b6b --- /dev/null +++ b/nativemodel_unix.go @@ -0,0 +1,33 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build !windows,!darwin + +package protocol + +// Normal Unixes uses NFC and slashes, which is the wire format. + +type nativeModel struct { + next Model +} + +func (m nativeModel) Index(nodeID NodeID, repo string, files []FileInfo) { + m.next.Index(nodeID, repo, files) +} + +func (m nativeModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) { + m.next.IndexUpdate(nodeID, repo, files) +} + +func (m nativeModel) Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) { + return m.next.Request(nodeID, repo, name, offset, size) +} + +func (m nativeModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) { + m.next.ClusterConfig(nodeID, config) +} + +func (m nativeModel) Close(nodeID NodeID, err error) { + m.next.Close(nodeID, err) +} diff --git a/nativemodel_windows.go b/nativemodel_windows.go new file mode 100644 index 000000000..9841d63f7 --- /dev/null +++ b/nativemodel_windows.go @@ -0,0 +1,72 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +// +build windows + +package protocol + +// Windows uses backslashes as file separator and disallows a bunch of +// characters in the filename + +import ( + "path/filepath" + "strings" +) + +var disallowedCharacters = string([]rune{ + '<', '>', ':', '"', '|', '?', '*', + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, + 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, + 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, + 31, +}) + +type nativeModel struct { + next Model +} + +func (m nativeModel) Index(nodeID NodeID, repo string, files []FileInfo) { + for i, f := range files { + if strings.ContainsAny(f.Name, disallowedCharacters) { + if f.IsDeleted() { + // Don't complain if the file is marked as deleted, since it + // can't possibly exist here anyway. + continue + } + files[i].Flags |= FlagInvalid + l.Warnf("File name %q contains invalid characters; marked as invalid.", f.Name) + } + files[i].Name = filepath.FromSlash(f.Name) + } + m.next.Index(nodeID, repo, files) +} + +func (m nativeModel) IndexUpdate(nodeID NodeID, repo string, files []FileInfo) { + for i, f := range files { + if strings.ContainsAny(f.Name, disallowedCharacters) { + if f.IsDeleted() { + // Don't complain if the file is marked as deleted, since it + // can't possibly exist here anyway. + continue + } + files[i].Flags |= FlagInvalid + l.Warnf("File name %q contains invalid characters; marked as invalid.", f.Name) + } + files[i].Name = filepath.FromSlash(files[i].Name) + } + m.next.IndexUpdate(nodeID, repo, files) +} + +func (m nativeModel) Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) { + name = filepath.FromSlash(name) + return m.next.Request(nodeID, repo, name, offset, size) +} + +func (m nativeModel) ClusterConfig(nodeID NodeID, config ClusterConfigMessage) { + m.next.ClusterConfig(nodeID, config) +} + +func (m nativeModel) Close(nodeID NodeID, err error) { + m.next.Close(nodeID, err) +} diff --git a/nodeid.go b/nodeid.go new file mode 100644 index 000000000..9079781b9 --- /dev/null +++ b/nodeid.go @@ -0,0 +1,159 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "bytes" + "crypto/sha256" + "encoding/base32" + "errors" + "fmt" + "regexp" + "strings" + + "github.com/syncthing/syncthing/internal/luhn" +) + +type NodeID [32]byte + +var LocalNodeID = NodeID{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff} + +// NewNodeID generates a new node ID from the raw bytes of a certificate +func NewNodeID(rawCert []byte) NodeID { + var n NodeID + hf := sha256.New() + hf.Write(rawCert) + hf.Sum(n[:0]) + return n +} + +func NodeIDFromString(s string) (NodeID, error) { + var n NodeID + err := n.UnmarshalText([]byte(s)) + return n, err +} + +func NodeIDFromBytes(bs []byte) NodeID { + var n NodeID + if len(bs) != len(n) { + panic("incorrect length of byte slice representing node ID") + } + copy(n[:], bs) + return n +} + +// String returns the canonical string representation of the node ID +func (n NodeID) String() string { + id := base32.StdEncoding.EncodeToString(n[:]) + id = strings.Trim(id, "=") + id, err := luhnify(id) + if err != nil { + // Should never happen + panic(err) + } + id = chunkify(id) + return id +} + +func (n NodeID) GoString() string { + return n.String() +} + +func (n NodeID) Compare(other NodeID) int { + return bytes.Compare(n[:], other[:]) +} + +func (n NodeID) Equals(other NodeID) bool { + return bytes.Compare(n[:], other[:]) == 0 +} + +func (n *NodeID) MarshalText() ([]byte, error) { + return []byte(n.String()), nil +} + +func (n *NodeID) UnmarshalText(bs []byte) error { + id := string(bs) + id = strings.Trim(id, "=") + id = strings.ToUpper(id) + id = untypeoify(id) + id = unchunkify(id) + + var err error + switch len(id) { + case 56: + // New style, with check digits + id, err = unluhnify(id) + if err != nil { + return err + } + fallthrough + case 52: + // Old style, no check digits + dec, err := base32.StdEncoding.DecodeString(id + "====") + if err != nil { + return err + } + copy(n[:], dec) + return nil + default: + return errors.New("node ID invalid: incorrect length") + } +} + +func luhnify(s string) (string, error) { + if len(s) != 52 { + panic("unsupported string length") + } + + res := make([]string, 0, 4) + for i := 0; i < 4; i++ { + p := s[i*13 : (i+1)*13] + l, err := luhn.Base32.Generate(p) + if err != nil { + return "", err + } + res = append(res, fmt.Sprintf("%s%c", p, l)) + } + return res[0] + res[1] + res[2] + res[3], nil +} + +func unluhnify(s string) (string, error) { + if len(s) != 56 { + return "", fmt.Errorf("unsupported string length %d", len(s)) + } + + res := make([]string, 0, 4) + for i := 0; i < 4; i++ { + p := s[i*14 : (i+1)*14-1] + l, err := luhn.Base32.Generate(p) + if err != nil { + return "", err + } + if g := fmt.Sprintf("%s%c", p, l); g != s[i*14:(i+1)*14] { + return "", errors.New("check digit incorrect") + } + res = append(res, p) + } + return res[0] + res[1] + res[2] + res[3], nil +} + +func chunkify(s string) string { + s = regexp.MustCompile("(.{7})").ReplaceAllString(s, "$1-") + s = strings.Trim(s, "-") + return s +} + +func unchunkify(s string) string { + s = strings.Replace(s, "-", "", -1) + s = strings.Replace(s, " ", "", -1) + return s +} + +func untypeoify(s string) string { + s = strings.Replace(s, "0", "O", -1) + s = strings.Replace(s, "1", "I", -1) + s = strings.Replace(s, "8", "B", -1) + return s +} diff --git a/nodeid_test.go b/nodeid_test.go new file mode 100644 index 000000000..5b861b6de --- /dev/null +++ b/nodeid_test.go @@ -0,0 +1,78 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import "testing" + +var formatted = "P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2" +var formatCases = []string{ + "P56IOI-7MZJNU-2IQGDR-EYDM2M-GTMGL3-BXNPQ6-W5BTBB-Z4TJXZ-WICQ", + "P56IOI-7MZJNU2Y-IQGDR-EYDM2M-GTI-MGL3-BXNPQ6-W5BM-TBB-Z4TJXZ-WICQ2", + "P56IOI7 MZJNU2I QGDREYD M2MGTMGL 3BXNPQ6W 5BTB BZ4T JXZWICQ", + "P56IOI7 MZJNU2Y IQGDREY DM2MGTI MGL3BXN PQ6W5BM TBBZ4TJ XZWICQ2", + "P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQ", + "p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicq", + "P56IOI7MZJNU2YIQGDREYDM2MGTIMGL3BXNPQ6W5BMTBBZ4TJXZWICQ2", + "P561017MZJNU2YIQGDREYDM2MGTIMGL3BXNPQ6W5BMT88Z4TJXZWICQ2", + "p56ioi7mzjnu2yiqgdreydm2mgtimgl3bxnpq6w5bmtbbz4tjxzwicq2", + "p561017mzjnu2yiqgdreydm2mgtimgl3bxnpq6w5bmt88z4tjxzwicq2", +} + +func TestFormatNodeID(t *testing.T) { + for i, tc := range formatCases { + var id NodeID + err := id.UnmarshalText([]byte(tc)) + if err != nil { + t.Errorf("#%d UnmarshalText(%q); %v", i, tc, err) + } else if f := id.String(); f != formatted { + t.Errorf("#%d FormatNodeID(%q)\n\t%q !=\n\t%q", i, tc, f, formatted) + } + } +} + +var validateCases = []struct { + s string + ok bool +}{ + {"", false}, + {"P56IOI7-MZJNU2Y-IQGDREY-DM2MGTI-MGL3BXN-PQ6W5BM-TBBZ4TJ-XZWICQ2", true}, + {"P56IOI7-MZJNU2-IQGDREY-DM2MGT-MGL3BXN-PQ6W5B-TBBZ4TJ-XZWICQ", true}, + {"P56IOI7 MZJNU2I QGDREYD M2MGTMGL 3BXNPQ6W 5BTB BZ4T JXZWICQ", true}, + {"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQ", true}, + {"P56IOI7MZJNU2IQGDREYDM2MGTMGL3BXNPQ6W5BTBBZ4TJXZWICQCCCC", false}, + {"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicq", true}, + {"p56ioi7mzjnu2iqgdreydm2mgtmgl3bxnpq6w5btbbz4tjxzwicqCCCC", false}, +} + +func TestValidateNodeID(t *testing.T) { + for _, tc := range validateCases { + var id NodeID + err := id.UnmarshalText([]byte(tc.s)) + if (err == nil && !tc.ok) || (err != nil && tc.ok) { + t.Errorf("ValidateNodeID(%q); %v != %v", tc.s, err, tc.ok) + } + } +} + +func TestMarshallingNodeID(t *testing.T) { + n0 := NodeID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 10, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32} + n1 := NodeID{} + n2 := NodeID{} + + bs, _ := n0.MarshalText() + n1.UnmarshalText(bs) + bs, _ = n1.MarshalText() + n2.UnmarshalText(bs) + + if n2.String() != n0.String() { + t.Errorf("String marshalling error; %q != %q", n2.String(), n0.String()) + } + if !n2.Equals(n0) { + t.Error("Equals error") + } + if n2.Compare(n0) != 0 { + t.Error("Compare error") + } +} diff --git a/protocol.go b/protocol.go new file mode 100644 index 000000000..86fd6199f --- /dev/null +++ b/protocol.go @@ -0,0 +1,640 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "encoding/binary" + "encoding/hex" + "errors" + "fmt" + "io" + "sync" + "time" + + lz4 "github.com/bkaradzic/go-lz4" +) + +const ( + BlockSize = 128 * 1024 +) + +const ( + messageTypeClusterConfig = 0 + messageTypeIndex = 1 + messageTypeRequest = 2 + messageTypeResponse = 3 + messageTypePing = 4 + messageTypePong = 5 + messageTypeIndexUpdate = 6 + messageTypeClose = 7 +) + +const ( + stateInitial = iota + stateCCRcvd + stateIdxRcvd +) + +const ( + FlagDeleted uint32 = 1 << 12 + FlagInvalid = 1 << 13 + FlagDirectory = 1 << 14 + FlagNoPermBits = 1 << 15 +) + +const ( + FlagShareTrusted uint32 = 1 << 0 + FlagShareReadOnly = 1 << 1 + FlagIntroducer = 1 << 2 + FlagShareBits = 0x000000ff +) + +var ( + ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash") + ErrClosed = errors.New("connection closed") +) + +type Model interface { + // An index was received from the peer node + Index(nodeID NodeID, repo string, files []FileInfo) + // An index update was received from the peer node + IndexUpdate(nodeID NodeID, repo string, files []FileInfo) + // A request was made by the peer node + Request(nodeID NodeID, repo string, name string, offset int64, size int) ([]byte, error) + // A cluster configuration message was received + ClusterConfig(nodeID NodeID, config ClusterConfigMessage) + // The peer node closed the connection + Close(nodeID NodeID, err error) +} + +type Connection interface { + ID() NodeID + Name() string + Index(repo string, files []FileInfo) error + IndexUpdate(repo string, files []FileInfo) error + Request(repo string, name string, offset int64, size int) ([]byte, error) + ClusterConfig(config ClusterConfigMessage) + Statistics() Statistics +} + +type rawConnection struct { + id NodeID + name string + receiver Model + state int + + cr *countingReader + cw *countingWriter + + awaiting [4096]chan asyncResult + awaitingMut sync.Mutex + + idxMut sync.Mutex // ensures serialization of Index calls + + nextID chan int + outbox chan hdrMsg + closed chan struct{} + once sync.Once + + compressionThreshold int // compress messages larger than this many bytes + + rdbuf0 []byte // used & reused by readMessage + rdbuf1 []byte // used & reused by readMessage +} + +type asyncResult struct { + val []byte + err error +} + +type hdrMsg struct { + hdr header + msg encodable +} + +type encodable interface { + AppendXDR([]byte) []byte +} + +const ( + pingTimeout = 30 * time.Second + pingIdleTime = 60 * time.Second +) + +func NewConnection(nodeID NodeID, reader io.Reader, writer io.Writer, receiver Model, name string, compress bool) Connection { + cr := &countingReader{Reader: reader} + cw := &countingWriter{Writer: writer} + + compThres := 1<<31 - 1 // compression disabled + if compress { + compThres = 128 // compress messages that are 128 bytes long or larger + } + c := rawConnection{ + id: nodeID, + name: name, + receiver: nativeModel{receiver}, + state: stateInitial, + cr: cr, + cw: cw, + outbox: make(chan hdrMsg), + nextID: make(chan int), + closed: make(chan struct{}), + compressionThreshold: compThres, + } + + go c.readerLoop() + go c.writerLoop() + go c.pingerLoop() + go c.idGenerator() + + return wireFormatConnection{&c} +} + +func (c *rawConnection) ID() NodeID { + return c.id +} + +func (c *rawConnection) Name() string { + return c.name +} + +// Index writes the list of file information to the connected peer node +func (c *rawConnection) Index(repo string, idx []FileInfo) error { + select { + case <-c.closed: + return ErrClosed + default: + } + c.idxMut.Lock() + c.send(-1, messageTypeIndex, IndexMessage{repo, idx}) + c.idxMut.Unlock() + return nil +} + +// IndexUpdate writes the list of file information to the connected peer node as an update +func (c *rawConnection) IndexUpdate(repo string, idx []FileInfo) error { + select { + case <-c.closed: + return ErrClosed + default: + } + c.idxMut.Lock() + c.send(-1, messageTypeIndexUpdate, IndexMessage{repo, idx}) + c.idxMut.Unlock() + return nil +} + +// Request returns the bytes for the specified block after fetching them from the connected peer. +func (c *rawConnection) Request(repo string, name string, offset int64, size int) ([]byte, error) { + var id int + select { + case id = <-c.nextID: + case <-c.closed: + return nil, ErrClosed + } + + c.awaitingMut.Lock() + if ch := c.awaiting[id]; ch != nil { + panic("id taken") + } + rc := make(chan asyncResult, 1) + c.awaiting[id] = rc + c.awaitingMut.Unlock() + + ok := c.send(id, messageTypeRequest, RequestMessage{repo, name, uint64(offset), uint32(size)}) + if !ok { + return nil, ErrClosed + } + + res, ok := <-rc + if !ok { + return nil, ErrClosed + } + return res.val, res.err +} + +// ClusterConfig send the cluster configuration message to the peer and returns any error +func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { + c.send(-1, messageTypeClusterConfig, config) +} + +func (c *rawConnection) ping() bool { + var id int + select { + case id = <-c.nextID: + case <-c.closed: + return false + } + + rc := make(chan asyncResult, 1) + c.awaitingMut.Lock() + c.awaiting[id] = rc + c.awaitingMut.Unlock() + + ok := c.send(id, messageTypePing, nil) + if !ok { + return false + } + + res, ok := <-rc + return ok && res.err == nil +} + +func (c *rawConnection) readerLoop() (err error) { + defer func() { + c.close(err) + }() + + for { + select { + case <-c.closed: + return ErrClosed + default: + } + + hdr, msg, err := c.readMessage() + if err != nil { + return err + } + + switch hdr.msgType { + case messageTypeIndex: + if c.state < stateCCRcvd { + return fmt.Errorf("protocol error: index message in state %d", c.state) + } + c.handleIndex(msg.(IndexMessage)) + c.state = stateIdxRcvd + + case messageTypeIndexUpdate: + if c.state < stateIdxRcvd { + return fmt.Errorf("protocol error: index update message in state %d", c.state) + } + c.handleIndexUpdate(msg.(IndexMessage)) + + case messageTypeRequest: + if c.state < stateIdxRcvd { + return fmt.Errorf("protocol error: request message in state %d", c.state) + } + // Requests are handled asynchronously + go c.handleRequest(hdr.msgID, msg.(RequestMessage)) + + case messageTypeResponse: + if c.state < stateIdxRcvd { + return fmt.Errorf("protocol error: response message in state %d", c.state) + } + c.handleResponse(hdr.msgID, msg.(ResponseMessage)) + + case messageTypePing: + c.send(hdr.msgID, messageTypePong, EmptyMessage{}) + + case messageTypePong: + c.handlePong(hdr.msgID) + + case messageTypeClusterConfig: + if c.state != stateInitial { + return fmt.Errorf("protocol error: cluster config message in state %d", c.state) + } + go c.receiver.ClusterConfig(c.id, msg.(ClusterConfigMessage)) + c.state = stateCCRcvd + + case messageTypeClose: + return errors.New(msg.(CloseMessage).Reason) + + default: + return fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) + } + } +} + +func (c *rawConnection) readMessage() (hdr header, msg encodable, err error) { + if cap(c.rdbuf0) < 8 { + c.rdbuf0 = make([]byte, 8) + } else { + c.rdbuf0 = c.rdbuf0[:8] + } + _, err = io.ReadFull(c.cr, c.rdbuf0) + if err != nil { + return + } + + hdr = decodeHeader(binary.BigEndian.Uint32(c.rdbuf0[0:4])) + msglen := int(binary.BigEndian.Uint32(c.rdbuf0[4:8])) + + if debug { + l.Debugf("read header %v (msglen=%d)", hdr, msglen) + } + + if cap(c.rdbuf0) < msglen { + c.rdbuf0 = make([]byte, msglen) + } else { + c.rdbuf0 = c.rdbuf0[:msglen] + } + _, err = io.ReadFull(c.cr, c.rdbuf0) + if err != nil { + return + } + + if debug { + l.Debugf("read %d bytes", len(c.rdbuf0)) + } + + msgBuf := c.rdbuf0 + if hdr.compression { + c.rdbuf1 = c.rdbuf1[:cap(c.rdbuf1)] + c.rdbuf1, err = lz4.Decode(c.rdbuf1, c.rdbuf0) + if err != nil { + return + } + msgBuf = c.rdbuf1 + if debug { + l.Debugf("decompressed to %d bytes", len(msgBuf)) + } + } + + if debug { + if len(msgBuf) > 1024 { + l.Debugf("message data:\n%s", hex.Dump(msgBuf[:1024])) + } else { + l.Debugf("message data:\n%s", hex.Dump(msgBuf)) + } + } + + switch hdr.msgType { + case messageTypeIndex, messageTypeIndexUpdate: + var idx IndexMessage + err = idx.UnmarshalXDR(msgBuf) + msg = idx + + case messageTypeRequest: + var req RequestMessage + err = req.UnmarshalXDR(msgBuf) + msg = req + + case messageTypeResponse: + var resp ResponseMessage + err = resp.UnmarshalXDR(msgBuf) + msg = resp + + case messageTypePing, messageTypePong: + msg = EmptyMessage{} + + case messageTypeClusterConfig: + var cc ClusterConfigMessage + err = cc.UnmarshalXDR(msgBuf) + msg = cc + + case messageTypeClose: + var cm CloseMessage + err = cm.UnmarshalXDR(msgBuf) + msg = cm + + default: + err = fmt.Errorf("protocol error: %s: unknown message type %#x", c.id, hdr.msgType) + } + + return +} + +func (c *rawConnection) handleIndex(im IndexMessage) { + if debug { + l.Debugf("Index(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) + } + c.receiver.Index(c.id, im.Repository, im.Files) +} + +func (c *rawConnection) handleIndexUpdate(im IndexMessage) { + if debug { + l.Debugf("queueing IndexUpdate(%v, %v, %d files)", c.id, im.Repository, len(im.Files)) + } + c.receiver.IndexUpdate(c.id, im.Repository, im.Files) +} + +func (c *rawConnection) handleRequest(msgID int, req RequestMessage) { + data, _ := c.receiver.Request(c.id, req.Repository, req.Name, int64(req.Offset), int(req.Size)) + + c.send(msgID, messageTypeResponse, ResponseMessage{data}) +} + +func (c *rawConnection) handleResponse(msgID int, resp ResponseMessage) { + c.awaitingMut.Lock() + if rc := c.awaiting[msgID]; rc != nil { + c.awaiting[msgID] = nil + rc <- asyncResult{resp.Data, nil} + close(rc) + } + c.awaitingMut.Unlock() +} + +func (c *rawConnection) handlePong(msgID int) { + c.awaitingMut.Lock() + if rc := c.awaiting[msgID]; rc != nil { + c.awaiting[msgID] = nil + rc <- asyncResult{} + close(rc) + } + c.awaitingMut.Unlock() +} + +func (c *rawConnection) send(msgID int, msgType int, msg encodable) bool { + if msgID < 0 { + select { + case id := <-c.nextID: + msgID = id + case <-c.closed: + return false + } + } + + hdr := header{ + version: 0, + msgID: msgID, + msgType: msgType, + } + + select { + case c.outbox <- hdrMsg{hdr, msg}: + return true + case <-c.closed: + return false + } +} + +func (c *rawConnection) writerLoop() { + var msgBuf = make([]byte, 8) // buffer for wire format message, kept and reused + var uncBuf []byte // buffer for uncompressed message, kept and reused + for { + var tempBuf []byte + var err error + + select { + case hm := <-c.outbox: + if hm.msg != nil { + // Uncompressed message in uncBuf + uncBuf = hm.msg.AppendXDR(uncBuf[:0]) + + if len(uncBuf) >= c.compressionThreshold { + // Use compression for large messages + hm.hdr.compression = true + + // Make sure we have enough space for the compressed message plus header in msgBug + msgBuf = msgBuf[:cap(msgBuf)] + if maxLen := lz4.CompressBound(len(uncBuf)) + 8; maxLen > len(msgBuf) { + msgBuf = make([]byte, maxLen) + } + + // Compressed is written to msgBuf, we keep tb for the length only + tempBuf, err = lz4.Encode(msgBuf[8:], uncBuf) + binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(tempBuf))) + msgBuf = msgBuf[0 : len(tempBuf)+8] + + if debug { + l.Debugf("write compressed message; %v (len=%d)", hm.hdr, len(tempBuf)) + } + } else { + // No point in compressing very short messages + hm.hdr.compression = false + + msgBuf = msgBuf[:cap(msgBuf)] + if l := len(uncBuf) + 8; l > len(msgBuf) { + msgBuf = make([]byte, l) + } + + binary.BigEndian.PutUint32(msgBuf[4:8], uint32(len(uncBuf))) + msgBuf = msgBuf[0 : len(uncBuf)+8] + copy(msgBuf[8:], uncBuf) + + if debug { + l.Debugf("write uncompressed message; %v (len=%d)", hm.hdr, len(uncBuf)) + } + } + } else { + if debug { + l.Debugf("write empty message; %v", hm.hdr) + } + binary.BigEndian.PutUint32(msgBuf[4:8], 0) + msgBuf = msgBuf[:8] + } + + binary.BigEndian.PutUint32(msgBuf[0:4], encodeHeader(hm.hdr)) + + if err == nil { + var n int + n, err = c.cw.Write(msgBuf) + if debug { + l.Debugf("wrote %d bytes on the wire", n) + } + } + if err != nil { + c.close(err) + return + } + case <-c.closed: + return + } + } +} + +func (c *rawConnection) close(err error) { + c.once.Do(func() { + close(c.closed) + + c.awaitingMut.Lock() + for i, ch := range c.awaiting { + if ch != nil { + close(ch) + c.awaiting[i] = nil + } + } + c.awaitingMut.Unlock() + + go c.receiver.Close(c.id, err) + }) +} + +func (c *rawConnection) idGenerator() { + nextID := 0 + for { + nextID = (nextID + 1) & 0xfff + select { + case c.nextID <- nextID: + case <-c.closed: + return + } + } +} + +func (c *rawConnection) pingerLoop() { + var rc = make(chan bool, 1) + ticker := time.Tick(pingIdleTime / 2) + for { + select { + case <-ticker: + if d := time.Since(c.cr.Last()); d < pingIdleTime { + if debug { + l.Debugln(c.id, "ping skipped after rd", d) + } + continue + } + if d := time.Since(c.cw.Last()); d < pingIdleTime { + if debug { + l.Debugln(c.id, "ping skipped after wr", d) + } + continue + } + go func() { + if debug { + l.Debugln(c.id, "ping ->") + } + rc <- c.ping() + }() + select { + case ok := <-rc: + if debug { + l.Debugln(c.id, "<- pong") + } + if !ok { + c.close(fmt.Errorf("ping failure")) + } + case <-time.After(pingTimeout): + c.close(fmt.Errorf("ping timeout")) + case <-c.closed: + return + } + + case <-c.closed: + return + } + } +} + +type Statistics struct { + At time.Time + InBytesTotal uint64 + OutBytesTotal uint64 +} + +func (c *rawConnection) Statistics() Statistics { + return Statistics{ + At: time.Now(), + InBytesTotal: c.cr.Tot(), + OutBytesTotal: c.cw.Tot(), + } +} + +func IsDeleted(bits uint32) bool { + return bits&FlagDeleted != 0 +} + +func IsInvalid(bits uint32) bool { + return bits&FlagInvalid != 0 +} + +func IsDirectory(bits uint32) bool { + return bits&FlagDirectory != 0 +} + +func HasPermissionBits(bits uint32) bool { + return bits&FlagNoPermBits == 0 +} diff --git a/protocol_test.go b/protocol_test.go new file mode 100644 index 000000000..56a46f2ec --- /dev/null +++ b/protocol_test.go @@ -0,0 +1,383 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "bytes" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "os" + "reflect" + "testing" + "testing/quick" + + "github.com/calmh/xdr" +) + +var ( + c0ID = NewNodeID([]byte{1}) + c1ID = NewNodeID([]byte{2}) +) + +func TestHeaderFunctions(t *testing.T) { + f := func(ver, id, typ int) bool { + ver = int(uint(ver) % 16) + id = int(uint(id) % 4096) + typ = int(uint(typ) % 256) + h0 := header{version: ver, msgID: id, msgType: typ} + h1 := decodeHeader(encodeHeader(h0)) + return h0 == h1 + } + if err := quick.Check(f, nil); err != nil { + t.Error(err) + } +} + +func TestHeaderLayout(t *testing.T) { + var e, a uint32 + + // Version are the first four bits + e = 0xf0000000 + a = encodeHeader(header{version: 0xf}) + if a != e { + t.Errorf("Header layout incorrect; %08x != %08x", a, e) + } + + // Message ID are the following 12 bits + e = 0x0fff0000 + a = encodeHeader(header{msgID: 0xfff}) + if a != e { + t.Errorf("Header layout incorrect; %08x != %08x", a, e) + } + + // Type are the last 8 bits before reserved + e = 0x0000ff00 + a = encodeHeader(header{msgType: 0xff}) + if a != e { + t.Errorf("Header layout incorrect; %08x != %08x", a, e) + } +} + +func TestPing(t *testing.T) { + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, nil, "name", true).(wireFormatConnection).next.(*rawConnection) + c1 := NewConnection(c1ID, br, aw, nil, "name", true).(wireFormatConnection).next.(*rawConnection) + + if ok := c0.ping(); !ok { + t.Error("c0 ping failed") + } + if ok := c1.ping(); !ok { + t.Error("c1 ping failed") + } +} + +func TestPingErr(t *testing.T) { + e := errors.New("something broke") + + for i := 0; i < 16; i++ { + for j := 0; j < 16; j++ { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} + ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} + + c0 := NewConnection(c0ID, ar, ebw, m0, "name", true).(wireFormatConnection).next.(*rawConnection) + NewConnection(c1ID, br, eaw, m1, "name", true) + + res := c0.ping() + if (i < 8 || j < 8) && res { + t.Errorf("Unexpected ping success; i=%d, j=%d", i, j) + } else if (i >= 12 && j >= 12) && !res { + t.Errorf("Unexpected ping fail; i=%d, j=%d", i, j) + } + } + } +} + +// func TestRequestResponseErr(t *testing.T) { +// e := errors.New("something broke") + +// var pass bool +// for i := 0; i < 48; i++ { +// for j := 0; j < 38; j++ { +// m0 := newTestModel() +// m0.data = []byte("response data") +// m1 := newTestModel() + +// ar, aw := io.Pipe() +// br, bw := io.Pipe() +// eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} +// ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} + +// NewConnection(c0ID, ar, ebw, m0, nil) +// c1 := NewConnection(c1ID, br, eaw, m1, nil).(wireFormatConnection).next.(*rawConnection) + +// d, err := c1.Request("default", "tn", 1234, 5678) +// if err == e || err == ErrClosed { +// t.Logf("Error at %d+%d bytes", i, j) +// if !m1.isClosed() { +// t.Fatal("c1 not closed") +// } +// if !m0.isClosed() { +// t.Fatal("c0 not closed") +// } +// continue +// } +// if err != nil { +// t.Fatal(err) +// } +// if string(d) != "response data" { +// t.Fatalf("Incorrect response data %q", string(d)) +// } +// if m0.repo != "default" { +// t.Fatalf("Incorrect repo %q", m0.repo) +// } +// if m0.name != "tn" { +// t.Fatalf("Incorrect name %q", m0.name) +// } +// if m0.offset != 1234 { +// t.Fatalf("Incorrect offset %d", m0.offset) +// } +// if m0.size != 5678 { +// t.Fatalf("Incorrect size %d", m0.size) +// } +// t.Logf("Pass at %d+%d bytes", i, j) +// pass = true +// } +// } +// if !pass { +// t.Fatal("Never passed") +// } +// } + +func TestVersionErr(t *testing.T) { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "name", true).(wireFormatConnection).next.(*rawConnection) + NewConnection(c1ID, br, aw, m1, "name", true) + + w := xdr.NewWriter(c0.cw) + w.WriteUint32(encodeHeader(header{ + version: 2, + msgID: 0, + msgType: 0, + })) + w.WriteUint32(0) + + if !m1.isClosed() { + t.Error("Connection should close due to unknown version") + } +} + +func TestTypeErr(t *testing.T) { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "name", true).(wireFormatConnection).next.(*rawConnection) + NewConnection(c1ID, br, aw, m1, "name", true) + + w := xdr.NewWriter(c0.cw) + w.WriteUint32(encodeHeader(header{ + version: 0, + msgID: 0, + msgType: 42, + })) + w.WriteUint32(0) + + if !m1.isClosed() { + t.Error("Connection should close due to unknown message type") + } +} + +func TestClose(t *testing.T) { + m0 := newTestModel() + m1 := newTestModel() + + ar, aw := io.Pipe() + br, bw := io.Pipe() + + c0 := NewConnection(c0ID, ar, bw, m0, "name", true).(wireFormatConnection).next.(*rawConnection) + NewConnection(c1ID, br, aw, m1, "name", true) + + c0.close(nil) + + <-c0.closed + if !m0.isClosed() { + t.Fatal("Connection should be closed") + } + + // None of these should panic, some should return an error + + if c0.ping() { + t.Error("Ping should not return true") + } + + c0.Index("default", nil) + c0.Index("default", nil) + + if _, err := c0.Request("default", "foo", 0, 0); err == nil { + t.Error("Request should return an error") + } +} + +func TestElementSizeExceededNested(t *testing.T) { + m := ClusterConfigMessage{ + Repositories: []Repository{ + {ID: "longstringlongstringlongstringinglongstringlongstringlonlongstringlongstringlon"}, + }, + } + _, err := m.EncodeXDR(ioutil.Discard) + if err == nil { + t.Errorf("ID length %d > max 64, but no error", len(m.Repositories[0].ID)) + } +} + +func TestMarshalIndexMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 IndexMessage) bool { + for _, f := range m1.Files { + for i := range f.Blocks { + f.Blocks[i].Offset = 0 + if len(f.Blocks[i].Hash) == 0 { + f.Blocks[i].Hash = nil + } + } + } + + return testMarshal(t, "index", &m1, &IndexMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalRequestMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 RequestMessage) bool { + return testMarshal(t, "request", &m1, &RequestMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalResponseMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 ResponseMessage) bool { + if len(m1.Data) == 0 { + m1.Data = nil + } + return testMarshal(t, "response", &m1, &ResponseMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalClusterConfigMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 ClusterConfigMessage) bool { + return testMarshal(t, "clusterconfig", &m1, &ClusterConfigMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +func TestMarshalCloseMessage(t *testing.T) { + var quickCfg = &quick.Config{MaxCountScale: 10} + if testing.Short() { + quickCfg = nil + } + + f := func(m1 CloseMessage) bool { + return testMarshal(t, "close", &m1, &CloseMessage{}) + } + + if err := quick.Check(f, quickCfg); err != nil { + t.Error(err) + } +} + +type message interface { + EncodeXDR(io.Writer) (int, error) + DecodeXDR(io.Reader) error +} + +func testMarshal(t *testing.T, prefix string, m1, m2 message) bool { + var buf bytes.Buffer + + failed := func(bc []byte) { + bs, _ := json.MarshalIndent(m1, "", " ") + ioutil.WriteFile(prefix+"-1.txt", bs, 0644) + bs, _ = json.MarshalIndent(m2, "", " ") + ioutil.WriteFile(prefix+"-2.txt", bs, 0644) + if len(bc) > 0 { + f, _ := os.Create(prefix + "-data.txt") + fmt.Fprint(f, hex.Dump(bc)) + f.Close() + } + } + + _, err := m1.EncodeXDR(&buf) + if err == xdr.ErrElementSizeExceeded { + return true + } + if err != nil { + failed(nil) + t.Fatal(err) + } + + bc := make([]byte, len(buf.Bytes())) + copy(bc, buf.Bytes()) + + err = m2.DecodeXDR(&buf) + if err != nil { + failed(bc) + t.Fatal(err) + } + + ok := reflect.DeepEqual(m1, m2) + if !ok { + failed(bc) + } + return ok +} diff --git a/wireformat.go b/wireformat.go new file mode 100644 index 000000000..987c03eff --- /dev/null +++ b/wireformat.go @@ -0,0 +1,58 @@ +// Copyright (C) 2014 Jakob Borg and Contributors (see the CONTRIBUTORS file). +// All rights reserved. Use of this source code is governed by an MIT-style +// license that can be found in the LICENSE file. + +package protocol + +import ( + "path/filepath" + + "code.google.com/p/go.text/unicode/norm" +) + +type wireFormatConnection struct { + next Connection +} + +func (c wireFormatConnection) ID() NodeID { + return c.next.ID() +} + +func (c wireFormatConnection) Name() string { + return c.next.Name() +} + +func (c wireFormatConnection) Index(repo string, fs []FileInfo) error { + var myFs = make([]FileInfo, len(fs)) + copy(myFs, fs) + + for i := range fs { + myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + } + + return c.next.Index(repo, myFs) +} + +func (c wireFormatConnection) IndexUpdate(repo string, fs []FileInfo) error { + var myFs = make([]FileInfo, len(fs)) + copy(myFs, fs) + + for i := range fs { + myFs[i].Name = norm.NFC.String(filepath.ToSlash(myFs[i].Name)) + } + + return c.next.IndexUpdate(repo, myFs) +} + +func (c wireFormatConnection) Request(repo, name string, offset int64, size int) ([]byte, error) { + name = norm.NFC.String(filepath.ToSlash(name)) + return c.next.Request(repo, name, offset, size) +} + +func (c wireFormatConnection) ClusterConfig(config ClusterConfigMessage) { + c.next.ClusterConfig(config) +} + +func (c wireFormatConnection) Statistics() Statistics { + return c.next.Statistics() +}