Add Options message to protocol

This commit is contained in:
Jakob Borg 2014-01-23 13:12:45 +01:00
parent 20a47695fb
commit 2ea3558283
9 changed files with 156 additions and 42 deletions

14
main.go
View File

@ -295,6 +295,11 @@ func listen(myID string, addr string, m *model.Model, cfg *tls.Config) {
l, err := tls.Listen("tcp", addr, cfg) l, err := tls.Listen("tcp", addr, cfg)
fatalErr(err) fatalErr(err)
connOpts := map[string]string{
"clientId": "syncthing",
"clientVersion": Version,
}
listen: listen:
for { for {
conn, err := l.Accept() conn, err := l.Accept()
@ -329,7 +334,7 @@ listen:
for nodeID := range nodeAddrs { for nodeID := range nodeAddrs {
if nodeID == remoteID { if nodeID == remoteID {
protoConn := protocol.NewConnection(remoteID, conn, conn, m) protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn) m.AddConnection(conn, protoConn)
continue listen continue listen
} }
@ -361,6 +366,11 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
warnf("No discovery possible (%v)", err) warnf("No discovery possible (%v)", err)
} }
connOpts := map[string]string{
"clientId": "syncthing",
"clientVersion": Version,
}
for { for {
nextNode: nextNode:
for nodeID, addrs := range nodeAddrs { for nodeID, addrs := range nodeAddrs {
@ -399,7 +409,7 @@ func connect(myID string, addr string, nodeAddrs map[string][]string, m *model.M
continue continue
} }
protoConn := protocol.NewConnection(remoteID, conn, conn, m) protoConn := protocol.NewConnection(remoteID, conn, conn, m, connOpts)
m.AddConnection(conn, protoConn) m.AddConnection(conn, protoConn)
continue nextNode continue nextNode
} }

View File

@ -59,6 +59,7 @@ type Connection interface {
Index([]protocol.FileInfo) Index([]protocol.FileInfo)
Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error)
Statistics() protocol.Statistics Statistics() protocol.Statistics
Option(key string) string
} }
const ( const (
@ -155,7 +156,9 @@ func (m *Model) LocalAge() float64 {
type ConnectionInfo struct { type ConnectionInfo struct {
protocol.Statistics protocol.Statistics
Address string Address string
ClientID string
ClientVersion string
} }
// ConnectionStats returns a map with connection statistics for each connected node. // ConnectionStats returns a map with connection statistics for each connected node.
@ -169,7 +172,9 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo {
var res = make(map[string]ConnectionInfo) var res = make(map[string]ConnectionInfo)
for node, conn := range m.protoConn { for node, conn := range m.protoConn {
ci := ConnectionInfo{ ci := ConnectionInfo{
Statistics: conn.Statistics(), Statistics: conn.Statistics(),
ClientID: conn.Option("clientId"),
ClientVersion: conn.Option("clientVersion"),
} }
if nc, ok := m.rawConn[node].(remoteAddrer); ok { if nc, ok := m.rawConn[node].(remoteAddrer); ok {
ci.Address = nc.RemoteAddr().String() ci.Address = nc.RemoteAddr().String()

View File

@ -12,7 +12,7 @@ import (
) )
func TestNewModel(t *testing.T) { func TestNewModel(t *testing.T) {
m := NewModel("foo") m := NewModel("foo", 1e6)
if m == nil { if m == nil {
t.Fatalf("NewModel returned nil") t.Fatalf("NewModel returned nil")
@ -53,7 +53,7 @@ func init() {
} }
func TestUpdateLocal(t *testing.T) { func TestUpdateLocal(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -95,7 +95,7 @@ func TestUpdateLocal(t *testing.T) {
} }
func TestRemoteUpdateExisting(t *testing.T) { func TestRemoteUpdateExisting(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -112,7 +112,7 @@ func TestRemoteUpdateExisting(t *testing.T) {
} }
func TestRemoteAddNew(t *testing.T) { func TestRemoteAddNew(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -129,7 +129,7 @@ func TestRemoteAddNew(t *testing.T) {
} }
func TestRemoteUpdateOld(t *testing.T) { func TestRemoteUpdateOld(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -147,7 +147,7 @@ func TestRemoteUpdateOld(t *testing.T) {
} }
func TestRemoteIndexUpdate(t *testing.T) { func TestRemoteIndexUpdate(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -180,7 +180,7 @@ func TestRemoteIndexUpdate(t *testing.T) {
} }
func TestDelete(t *testing.T) { func TestDelete(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -282,7 +282,7 @@ func TestDelete(t *testing.T) {
} }
func TestForgetNode(t *testing.T) { func TestForgetNode(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -335,7 +335,7 @@ func TestForgetNode(t *testing.T) {
} }
func TestRequest(t *testing.T) { func TestRequest(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -357,7 +357,7 @@ func TestRequest(t *testing.T) {
} }
func TestIgnoreWithUnknownFlags(t *testing.T) { func TestIgnoreWithUnknownFlags(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)
@ -404,7 +404,7 @@ func prepareModel(n int, m *Model) []protocol.FileInfo {
} }
func BenchmarkRecomputeGlobal10k(b *testing.B) { func BenchmarkRecomputeGlobal10k(b *testing.B) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
prepareModel(10000, m) prepareModel(10000, m)
b.ResetTimer() b.ResetTimer()
@ -414,7 +414,7 @@ func BenchmarkRecomputeGlobal10k(b *testing.B) {
} }
func BenchmarkRecomputeNeed10K(b *testing.B) { func BenchmarkRecomputeNeed10K(b *testing.B) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
prepareModel(10000, m) prepareModel(10000, m)
b.ResetTimer() b.ResetTimer()
@ -424,7 +424,7 @@ func BenchmarkRecomputeNeed10K(b *testing.B) {
} }
func BenchmarkIndexUpdate10000(b *testing.B) { func BenchmarkIndexUpdate10000(b *testing.B) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
files := prepareModel(10000, m) files := prepareModel(10000, m)
b.ResetTimer() b.ResetTimer()
@ -446,6 +446,10 @@ func (f FakeConnection) ID() string {
return string(f.id) return string(f.id)
} }
func (f FakeConnection) Option(string) string {
return ""
}
func (FakeConnection) Index([]protocol.FileInfo) {} func (FakeConnection) Index([]protocol.FileInfo) {}
func (f FakeConnection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) { func (f FakeConnection) Request(name string, offset int64, size uint32, hash []byte) ([]byte, error) {
@ -461,7 +465,7 @@ func (FakeConnection) Statistics() protocol.Statistics {
} }
func BenchmarkRequest(b *testing.B) { func BenchmarkRequest(b *testing.B) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
fs, _ := m.Walk(false) fs, _ := m.Walk(false)
m.ReplaceLocal(fs) m.ReplaceLocal(fs)

View File

@ -21,7 +21,7 @@ var correctIgnores = map[string][]string{
} }
func TestWalk(t *testing.T) { func TestWalk(t *testing.T) {
m := NewModel("testdata") m := NewModel("testdata", 1e6)
files, ignores := m.Walk(false) files, ignores := m.Walk(false)
if l1, l2 := len(files), len(testdata); l1 != l2 { if l1, l2 := len(files), len(testdata); l1 != l2 {

View File

@ -193,6 +193,33 @@ model, the Index Update merely amends it with new or updated file
information. Any files not mentioned in an Index Update are left information. Any files not mentioned in an Index Update are left
unchanged. unchanged.
### Options (Type = 7)
This informational message provides information about the client
configuration, version, etc. It is sent at connection initiation and,
optionally, when any of the sent parameters have changed. The message is
in the form of a list of (key, value) pairs, both of string type.
struct OptionsMessage {
KeyValue Options<>;
}
struct KeyValue {
string Key;
string Value;
}
Key ID:s apart from the well known ones are implementation
specific. An implementation is expected to ignore unknown keys. An
implementation may impose limits on key and value size.
Well known keys:
- "clientId" -- The name of the implementation. Example: "syncthing".
- "clientVersion" -- The version of the client. Example: "v1.0.33-47". The
Following the SemVer 2.0 specification for version strings is
encouraged but not enforced.
Example Exchange Example Exchange
---------------- ----------------

View File

@ -65,6 +65,14 @@ func (w *marshalWriter) writeResponse(data []byte) {
w.writeBytes(data) w.writeBytes(data)
} }
func (w *marshalWriter) writeOptions(opts map[string]string) {
w.writeUint32(uint32(len(opts)))
for k, v := range opts {
w.writeString(k)
w.writeString(v)
}
}
func (r *marshalReader) readHeader() header { func (r *marshalReader) readHeader() header {
return decodeHeader(r.readUint32()) return decodeHeader(r.readUint32())
} }
@ -109,3 +117,14 @@ func (r *marshalReader) readRequest() request {
func (r *marshalReader) readResponse() []byte { func (r *marshalReader) readResponse() []byte {
return r.readBytes() return r.readBytes()
} }
func (r *marshalReader) readOptions() map[string]string {
n := r.readUint32()
opts := make(map[string]string, n)
for i := 0; i < int(n); i++ {
k := r.readString()
v := r.readString()
opts[k] = v
}
return opts
}

View File

@ -117,3 +117,23 @@ func BenchmarkWriteRequest(b *testing.B) {
wr.writeRequest(req) wr.writeRequest(req)
} }
} }
func TestOptions(t *testing.T) {
opts := map[string]string{
"foo": "bar",
"someKey": "otherValue",
"hello": "",
"": "42",
}
var buf = new(bytes.Buffer)
var wr = marshalWriter{w: buf}
wr.writeOptions(opts)
var rd = marshalReader{r: buf}
var ropts = rd.readOptions()
if !reflect.DeepEqual(opts, ropts) {
t.Error("Incorrect options marshal/demarshal")
}
}

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"log"
"sync" "sync"
"time" "time"
@ -18,6 +19,7 @@ const (
messageTypePing = 4 messageTypePing = 4
messageTypePong = 5 messageTypePong = 5
messageTypeIndexUpdate = 6 messageTypeIndexUpdate = 6
messageTypeOptions = 7
) )
const ( const (
@ -52,16 +54,18 @@ type Model interface {
type Connection struct { type Connection struct {
sync.RWMutex sync.RWMutex
id string id string
receiver Model receiver Model
reader io.Reader reader io.Reader
mreader *marshalReader mreader *marshalReader
writer io.Writer writer io.Writer
mwriter *marshalWriter mwriter *marshalWriter
closed bool closed bool
awaiting map[int]chan asyncResult awaiting map[int]chan asyncResult
nextId int nextId int
indexSent map[string][2]int64 indexSent map[string][2]int64
options map[string]string
optionsLock sync.Mutex
hasSentIndex bool hasSentIndex bool
hasRecvdIndex bool hasRecvdIndex bool
@ -81,7 +85,7 @@ const (
pingIdleTime = 5 * time.Minute pingIdleTime = 5 * time.Minute
) )
func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) *Connection { func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) *Connection {
flrd := flate.NewReader(reader) flrd := flate.NewReader(reader)
flwr, err := flate.NewWriter(writer, flate.BestSpeed) flwr, err := flate.NewWriter(writer, flate.BestSpeed)
if err != nil { if err != nil {
@ -101,6 +105,20 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M
go c.readerLoop() go c.readerLoop()
go c.pingerLoop() go c.pingerLoop()
if options != nil {
go func() {
c.Lock()
c.mwriter.writeHeader(header{0, c.nextId, messageTypeOptions})
c.mwriter.writeOptions(options)
err := c.flush()
if err != nil {
log.Printf("Warning:", err)
}
c.nextId++
c.Unlock()
}()
}
return &c return &c
} }
@ -328,6 +346,11 @@ loop:
c.Unlock() c.Unlock()
} }
case messageTypeOptions:
c.optionsLock.Lock()
c.options = c.mreader.readOptions()
c.optionsLock.Unlock()
default: default:
c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType)) c.close(fmt.Errorf("Protocol error: %s: unknown message type %#x", c.ID, hdr.msgType))
break loop break loop
@ -396,3 +419,9 @@ func (c *Connection) Statistics() Statistics {
return stats return stats
} }
func (c *Connection) Option(key string) string {
c.optionsLock.Lock()
defer c.optionsLock.Unlock()
return c.options[key]
}

View File

@ -43,8 +43,8 @@ func TestPing(t *testing.T) {
ar, aw := io.Pipe() ar, aw := io.Pipe()
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, nil) c0 := NewConnection("c0", ar, bw, nil, nil)
c1 := NewConnection("c1", br, aw, nil) c1 := NewConnection("c1", br, aw, nil, nil)
if ok := c0.ping(); !ok { if ok := c0.ping(); !ok {
t.Error("c0 ping failed") t.Error("c0 ping failed")
@ -67,8 +67,8 @@ func TestPingErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
c0 := NewConnection("c0", ar, ebw, m0) c0 := NewConnection("c0", ar, ebw, m0, nil)
NewConnection("c1", br, eaw, m1) NewConnection("c1", br, eaw, m1, nil)
res := c0.ping() res := c0.ping()
if (i < 4 || j < 4) && res { if (i < 4 || j < 4) && res {
@ -94,8 +94,8 @@ func TestRequestResponseErr(t *testing.T) {
eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e}
ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e}
NewConnection("c0", ar, ebw, m0) NewConnection("c0", ar, ebw, m0, nil)
c1 := NewConnection("c1", br, eaw, m1) c1 := NewConnection("c1", br, eaw, m1, nil)
d, err := c1.Request("tn", 1234, 3456, []byte("hashbytes")) d, err := c1.Request("tn", 1234, 3456, []byte("hashbytes"))
if err == e || err == ErrClosed { if err == e || err == ErrClosed {
@ -143,8 +143,8 @@ func TestVersionErr(t *testing.T) {
ar, aw := io.Pipe() ar, aw := io.Pipe()
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0) c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1) NewConnection("c1", br, aw, m1, nil)
c0.mwriter.writeHeader(header{ c0.mwriter.writeHeader(header{
version: 2, version: 2,
@ -165,8 +165,8 @@ func TestTypeErr(t *testing.T) {
ar, aw := io.Pipe() ar, aw := io.Pipe()
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0) c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1) NewConnection("c1", br, aw, m1, nil)
c0.mwriter.writeHeader(header{ c0.mwriter.writeHeader(header{
version: 0, version: 0,
@ -187,8 +187,8 @@ func TestClose(t *testing.T) {
ar, aw := io.Pipe() ar, aw := io.Pipe()
br, bw := io.Pipe() br, bw := io.Pipe()
c0 := NewConnection("c0", ar, bw, m0) c0 := NewConnection("c0", ar, bw, m0, nil)
NewConnection("c1", br, aw, m1) NewConnection("c1", br, aw, m1, nil)
c0.close(nil) c0.close(nil)