diff --git a/cmd/stcli/main.go b/cmd/stcli/main.go index 6e7ec11fe..f9cbecdc6 100644 --- a/cmd/stcli/main.go +++ b/cmd/stcli/main.go @@ -61,7 +61,7 @@ func connect(target string) { remoteID := certID(conn.ConnectionState().PeerCertificates[0].Raw) - pc = protocol.NewConnection(remoteID, conn, conn, Model{}, nil) + pc = protocol.NewConnection(remoteID, conn, conn, Model{}) select {} } @@ -127,6 +127,11 @@ func (m Model) IndexUpdate(nodeID string, repo string, files []protocol.FileInfo } } +func (m Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessage) { + log.Println("Received cluster config") + log.Printf("%#v", config) +} + func (m Model) Request(nodeID, repo string, name string, offset int64, size int) ([]byte, error) { log.Println("Received request") return nil, io.EOF diff --git a/cmd/syncthing/config.go b/cmd/syncthing/config.go index fc86ab390..77d33e300 100644 --- a/cmd/syncthing/config.go +++ b/cmd/syncthing/config.go @@ -1,9 +1,7 @@ package main import ( - "crypto/sha256" "encoding/xml" - "fmt" "io" "reflect" "sort" @@ -231,15 +229,6 @@ func (l NodeConfigurationList) Len() int { return len(l) } -func clusterHash(nodes []NodeConfiguration) string { - sort.Sort(NodeConfigurationList(nodes)) - h := sha256.New() - for _, n := range nodes { - h.Write([]byte(n.NodeID)) - } - return fmt.Sprintf("%x", h.Sum(nil)) -} - func cleanNodeList(nodes []NodeConfiguration, myID string) []NodeConfiguration { var myIDExists bool for _, node := range nodes { diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 707189411..15076aa1d 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -219,15 +219,9 @@ func main() { m.ScanRepos() m.SaveIndexes(confDir) - connOpts := map[string]string{ - "clientId": "syncthing", - "clientVersion": Version, - "clusterHash": clusterHash(cfg.Repositories[0].Nodes), - } - // Routine to connect out to configured nodes disc := discovery() - go listenConnect(myID, disc, m, tlsCfg, connOpts) + go listenConnect(myID, disc, m, tlsCfg) for _, repo := range cfg.Repositories { // Routine to pull blocks from other nodes to synchronize the local @@ -325,7 +319,7 @@ func saveConfig() { saveConfigCh <- struct{}{} } -func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config, connOpts map[string]string) { +func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls.Config) { var conns = make(chan *tls.Conn) // Listen @@ -438,7 +432,7 @@ next: if rateBucket != nil { wr = &limitedWriter{conn, rateBucket} } - protoConn := protocol.NewConnection(remoteID, conn, wr, m, connOpts) + protoConn := protocol.NewConnection(remoteID, conn, wr, m) m.AddConnection(conn, protoConn) continue next } diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index cfaa06165..8a896021e 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -31,6 +31,7 @@ type Model struct { protoConn map[string]protocol.Connection rawConn map[string]io.Closer + nodeVer map[string]string pmut sync.RWMutex // protects protoConn and rawConn sup suppressor @@ -56,6 +57,7 @@ func NewModel(maxChangeBw int) *Model { cm: cid.NewMap(), protoConn: make(map[string]protocol.Connection), rawConn: make(map[string]io.Closer), + nodeVer: make(map[string]string), sup: suppressor{threshold: int64(maxChangeBw)}, } @@ -87,7 +89,6 @@ func (m *Model) StartRepoRO(repo string) { type ConnectionInfo struct { protocol.Statistics Address string - ClientID string ClientVersion string Completion int } @@ -105,8 +106,7 @@ func (m *Model) ConnectionStats() map[string]ConnectionInfo { for node, conn := range m.protoConn { ci := ConnectionInfo{ Statistics: conn.Statistics(), - ClientID: conn.Option("clientId"), - ClientVersion: conn.Option("clientVersion"), + ClientVersion: m.nodeVer[node], } if nc, ok := m.rawConn[node].(remoteAddrer); ok { ci.Address = nc.RemoteAddr().String() @@ -245,15 +245,37 @@ func (m *Model) IndexUpdate(nodeID string, repo string, fs []protocol.FileInfo) m.rmut.RUnlock() } +func (m *Model) ClusterConfig(nodeID string, config protocol.ClusterConfigMessage) { + compErr := compareClusterConfig(m.clusterConfig(nodeID), config) + if debugNet { + dlog.Printf("ClusterConfig: %s: %#v", nodeID, config) + dlog.Printf(" ... compare: %s: %v", nodeID, compErr) + } + + if compErr != nil { + warnf("%s: %v", nodeID, compErr) + m.Close(nodeID, compErr) + } + + m.pmut.Lock() + if config.ClientName == "syncthing" { + m.nodeVer[nodeID] = config.ClientVersion + } else { + m.nodeVer[nodeID] = config.ClientName + " " + config.ClientVersion + } + m.pmut.Unlock() +} + // Close removes the peer from the model and closes the underlying connection if possible. // Implements the protocol.Model interface. func (m *Model) Close(node string, err error) { if debugNet { dlog.Printf("%s: %v", node, err) } - if err == protocol.ErrClusterHash { - warnf("Connection to %s closed due to mismatched cluster hash. Ensure that the configured cluster members are identical on both nodes.", node) - } else if err != io.EOF { + + if err != io.EOF { + warnf("Connection to %s closed: %v", node, err) + } else if _, ok := err.(ClusterConfigMismatch); ok { warnf("Connection to %s closed: %v", node, err) } @@ -272,6 +294,7 @@ func (m *Model) Close(node string, err error) { } delete(m.protoConn, node) delete(m.rawConn, node) + delete(m.nodeVer, node) m.pmut.Unlock() } @@ -386,6 +409,9 @@ func (m *Model) AddConnection(rawConn io.Closer, protoConn protocol.Connection) m.rawConn[nodeID] = rawConn m.pmut.Unlock() + cm := m.clusterConfig(nodeID) + protoConn.ClusterConfig(cm) + go func() { m.rmut.RLock() repos := m.nodeRepos[nodeID] @@ -596,46 +622,28 @@ func (m *Model) loadIndex(repo string, dir string) []protocol.FileInfo { return im.Files } -func fileFromFileInfo(f protocol.FileInfo) scanner.File { - var blocks = make([]scanner.Block, len(f.Blocks)) - var offset int64 - for i, b := range f.Blocks { - blocks[i] = scanner.Block{ - Offset: offset, - Size: b.Size, - Hash: b.Hash, - } - offset += int64(b.Size) +// clusterConfig returns a ClusterConfigMessage that is correct for the given peer node +func (m *Model) clusterConfig(node string) protocol.ClusterConfigMessage { + cm := protocol.ClusterConfigMessage{ + ClientName: "syncthing", + ClientVersion: Version, } - return scanner.File{ - // Name is with native separator and normalization - Name: filepath.FromSlash(f.Name), - Size: offset, - Flags: f.Flags &^ protocol.FlagInvalid, - Modified: f.Modified, - Version: f.Version, - Blocks: blocks, - Suppressed: f.Flags&protocol.FlagInvalid != 0, - } -} -func fileInfoFromFile(f scanner.File) protocol.FileInfo { - var blocks = make([]protocol.BlockInfo, len(f.Blocks)) - for i, b := range f.Blocks { - blocks[i] = protocol.BlockInfo{ - Size: b.Size, - Hash: b.Hash, + m.rmut.Lock() + for _, repo := range m.nodeRepos[node] { + cr := protocol.Repository{ + ID: repo, } + for _, node := range m.repoNodes[repo] { + // TODO: Set read only bit when relevant + cr.Nodes = append(cr.Nodes, protocol.Node{ + ID: node, + Flags: protocol.FlagShareTrusted, + }) + } + cm.Repositories = append(cm.Repositories, cr) } - pf := protocol.FileInfo{ - Name: filepath.ToSlash(f.Name), - Flags: f.Flags, - Modified: f.Modified, - Version: f.Version, - Blocks: blocks, - } - if f.Suppressed { - pf.Flags |= protocol.FlagInvalid - } - return pf + m.rmut.Unlock() + + return cm } diff --git a/cmd/syncthing/model_test.go b/cmd/syncthing/model_test.go index dc682fcbe..a62a32a43 100644 --- a/cmd/syncthing/model_test.go +++ b/cmd/syncthing/model_test.go @@ -170,6 +170,8 @@ func (f FakeConnection) Request(repo, name string, offset int64, size int) ([]by return f.requestData, nil } +func (FakeConnection) ClusterConfig(protocol.ClusterConfigMessage) {} + func (FakeConnection) Ping() bool { return true } diff --git a/cmd/syncthing/util.go b/cmd/syncthing/util.go index 6d284ec54..4fe2ef947 100644 --- a/cmd/syncthing/util.go +++ b/cmd/syncthing/util.go @@ -3,7 +3,11 @@ package main import ( "fmt" "os" + "path/filepath" "runtime" + + "github.com/calmh/syncthing/protocol" + "github.com/calmh/syncthing/scanner" ) func MetricPrefix(n int64) string { @@ -41,3 +45,99 @@ func Rename(from, to string) error { } return os.Rename(from, to) } + +func fileFromFileInfo(f protocol.FileInfo) scanner.File { + var blocks = make([]scanner.Block, len(f.Blocks)) + var offset int64 + for i, b := range f.Blocks { + blocks[i] = scanner.Block{ + Offset: offset, + Size: b.Size, + Hash: b.Hash, + } + offset += int64(b.Size) + } + return scanner.File{ + // Name is with native separator and normalization + Name: filepath.FromSlash(f.Name), + Size: offset, + Flags: f.Flags &^ protocol.FlagInvalid, + Modified: f.Modified, + Version: f.Version, + Blocks: blocks, + Suppressed: f.Flags&protocol.FlagInvalid != 0, + } +} + +func fileInfoFromFile(f scanner.File) protocol.FileInfo { + var blocks = make([]protocol.BlockInfo, len(f.Blocks)) + for i, b := range f.Blocks { + blocks[i] = protocol.BlockInfo{ + Size: b.Size, + Hash: b.Hash, + } + } + pf := protocol.FileInfo{ + Name: filepath.ToSlash(f.Name), + Flags: f.Flags, + Modified: f.Modified, + Version: f.Version, + Blocks: blocks, + } + if f.Suppressed { + pf.Flags |= protocol.FlagInvalid + } + return pf +} + +func cmMap(cm protocol.ClusterConfigMessage) map[string]map[string]uint32 { + m := make(map[string]map[string]uint32) + for _, repo := range cm.Repositories { + m[repo.ID] = make(map[string]uint32) + for _, node := range repo.Nodes { + m[repo.ID][node.ID] = node.Flags + } + } + return m +} + +type ClusterConfigMismatch error + +// compareClusterConfig returns nil for two equivalent configurations, +// otherwise a decriptive error +func compareClusterConfig(local, remote protocol.ClusterConfigMessage) error { + lm := cmMap(local) + rm := cmMap(remote) + + for repo, lnodes := range lm { + _ = lnodes + if rnodes, ok := rm[repo]; ok { + for node, lflags := range lnodes { + if rflags, ok := rnodes[node]; ok { + if lflags&protocol.FlagShareBits != rflags&protocol.FlagShareBits { + return ClusterConfigMismatch(fmt.Errorf("remote has different sharing flags for node %q in repository %q", node, repo)) + } + } else { + return ClusterConfigMismatch(fmt.Errorf("remote is missing node %q in repository %q", node, repo)) + } + } + } else { + return ClusterConfigMismatch(fmt.Errorf("remote is missing repository %q", repo)) + } + } + + for repo, rnodes := range rm { + if lnodes, ok := lm[repo]; ok { + for node := range rnodes { + if _, ok := lnodes[node]; !ok { + return ClusterConfigMismatch(fmt.Errorf("remote has extra node %q in repository %q", node, repo)) + } + } + } else { + return ClusterConfigMismatch(fmt.Errorf("remote has extra repository %q", repo)) + } + + } + + return nil +} diff --git a/cmd/syncthing/util_test.go b/cmd/syncthing/util_test.go new file mode 100644 index 000000000..448abd230 --- /dev/null +++ b/cmd/syncthing/util_test.go @@ -0,0 +1,183 @@ +package main + +import ( + "testing" + + "github.com/calmh/syncthing/protocol" +) + +var testcases = []struct { + local, remote protocol.ClusterConfigMessage + err string +}{ + { + local: protocol.ClusterConfigMessage{}, + remote: protocol.ClusterConfigMessage{}, + err: "", + }, + { + local: protocol.ClusterConfigMessage{ClientName: "a", ClientVersion: "b"}, + remote: protocol.ClusterConfigMessage{ClientName: "c", ClientVersion: "d"}, + err: "", + }, + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "foo"}, + }, + }, + remote: protocol.ClusterConfigMessage{ClientName: "c", ClientVersion: "d"}, + err: `remote is missing repository "foo"`, + }, + { + local: protocol.ClusterConfigMessage{ClientName: "c", ClientVersion: "d"}, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "foo"}, + }, + }, + err: `remote has extra repository "foo"`, + }, + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "foo"}, + {ID: "bar"}, + }, + }, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "foo"}, + {ID: "bar"}, + }, + }, + err: "", + }, + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "quux"}, + {ID: "foo"}, + {ID: "bar"}, + }, + }, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "bar"}, + {ID: "quux"}, + }, + }, + err: `remote is missing repository "foo"`, + }, + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "quux"}, + {ID: "bar"}, + }, + }, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "bar"}, + {ID: "foo"}, + {ID: "quux"}, + }, + }, + err: `remote has extra repository "foo"`, + }, + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + { + ID: "foo", + Nodes: []protocol.Node{ + {ID: "a"}, + }, + }, + {ID: "bar"}, + }, + }, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + {ID: "foo"}, + {ID: "bar"}, + }, + }, + err: `remote is missing node "a" in repository "foo"`, + }, + + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + { + ID: "foo", + Nodes: []protocol.Node{ + {ID: "a"}, + }, + }, + {ID: "bar"}, + }, + }, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + { + ID: "foo", + Nodes: []protocol.Node{ + {ID: "a"}, + {ID: "b"}, + }, + }, + {ID: "bar"}, + }, + }, + err: `remote has extra node "b" in repository "foo"`, + }, + + { + local: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + { + ID: "foo", + Nodes: []protocol.Node{ + { + ID: "a", + Flags: protocol.FlagShareReadOnly, + }, + }, + }, + {ID: "bar"}, + }, + }, + remote: protocol.ClusterConfigMessage{ + Repositories: []protocol.Repository{ + { + ID: "foo", + Nodes: []protocol.Node{ + { + ID: "a", + Flags: protocol.FlagShareTrusted, + }, + }, + }, + {ID: "bar"}, + }, + }, + err: `remote has different sharing flags for node "a" in repository "foo"`, + }, +} + +func TestCompareClusterConfig(t *testing.T) { + for i, tc := range testcases { + err := compareClusterConfig(tc.local, tc.remote) + switch { + case tc.err == "" && err != nil: + t.Errorf("#%d: unexpected error: %v", i, err) + + case tc.err != "" && err == nil: + t.Errorf("#%d: unexpected nil error", i) + + case tc.err != "" && err != nil && tc.err != err.Error(): + t.Errorf("#%d: incorrect error: %q != %q", i, err, tc.err) + } + } +} diff --git a/protocol/PROTOCOL.md b/protocol/PROTOCOL.md index 460e9b84c..1acea0f9e 100644 --- a/protocol/PROTOCOL.md +++ b/protocol/PROTOCOL.md @@ -79,10 +79,14 @@ version, type and ID. +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ For BEP v1 the Version field is set to zero. Future versions with -incompatible message formats will increment the Version field. +incompatible message formats will increment the Version field. A message +with an unknown version is a protocol error and MUST result in the +connection being terminated. A client supporting multiple versions MAY +retry with a different protcol version upon disconnection. The Type field indicates the type of data following the message header -and is one of the integers defined below. +and is one of the integers defined below. A message of an unknown type +is a protocol error and MUST result in the connection being terminated. The Message ID is set to a unique value for each transmitted message. In request messages the Reply To is set to zero. In response messages it is @@ -110,6 +114,183 @@ Opaque data should not be interpreted but can be compared bytewise to other opaque data. All strings MUST use the Unicode UTF-8 encoding, normalization form C. +### Cluster Config (Type = 0) + +This informational message provides information about the cluster +configuration, as it pertains to the current connection. It is sent by +both sides after connection establishment. + +#### Graphical Representation + + ClusterConfigMessage Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of ClientName | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ClientName (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of ClientVersion | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ClientVersion (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Repositories | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more Repository Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Options | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more Option Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + + Repository Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ID (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Number of Nodes | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Zero or more Node Structures \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + + Node Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of ID | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ ID (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Flags | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + + Option Structure: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Key | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Key (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Length of Value | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + / / + \ Value (variable length) \ + / / + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +#### Fields + +The ClientName and ClientVersion fields identify the implementation. The +values SHOULD be simple strings identifying the implementation name, as +a user would expect to see it, and the version string in the same +manner. An example ClientName is "syncthing" and an example +ClientVersion is "v0.7.2". The ClientVersion field SHOULD follow the +patterns laid out in the [Semantic Versioning](http://semver.org/) +standard. + +The Repositories field lists all repositories that will be synchronized +over the current connection. Each repository has a list of participating +Nodes. Each node has an associated Flags field to indicate the sharing +mode of that node for the repository in question. See the discussion on +Sharing Modes. + +The Node Flags field contains the following single bit flags: + + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Reserved |Pri| Reserved |R|T| + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + + - Bit 31 ("T", Trusted) is set for nodes that participate in trusted + mode. + + - Bit 30 ("R", Read Only) is set for nodes that participate in read + only mode. + + - Bits 16 through 28 are reserved and MUST be set to zero. + + - Bits 14-15 ("Pri) indicate the node's upload priority for this + repository. Possible values are: + + - 00: The default. Normal priority. + + - 01: High priority. Other nodes SHOULD favour requesting files from + this node over nodes with normal or low priority. + + - 10: Low priority. Other nodes SHOULD avoid requesting files from + this node when they are available from other nodes. + + - 11: Sharing disabled. Other nodes SHOULD NOT request files from + this node. + + - Bits 0 through 14 are reserved and MUST be set to zero. + +Exactly one of the T, R or S bits MUST be set. + +The Options field contain option values to be used in an implementation +specific manner. The options list is conceptually a map of Key => Value +items, although it is transmitted in the form of a list of (Key, Value) +pairs, both of string type. Key ID:s are implementation specific. An +implementation MUST ignore unknown keys. An implementation MAY impose +limits on the length keys and values. The options list may be used to +inform nodes of relevant local configuration options such as rate +limiting or make recommendations about request parallellism, node +priorities, etc. An empty options list is valid for nodes not having any +such information to share. Nodes MAY NOT make any assumptions about +peers acting in a specific manner as a result of sent options. + +#### XDR + + struct ClusterConfigMessage { + string ClientName<>; + string ClientVersion<>; + Repository Repositories<>; + Option Options<>; + } + + struct Repository { + string ID<>; + Node Nodes<>; + } + + struct Node { + string ID<>; + unsigned int Flags; + } + + struct Option { + string Key<>; + string Value<>; + } + ### Index (Type = 1) The Index message defines the contents of the senders repository. An @@ -356,65 +537,32 @@ model, the Index Update merely amends it with new or updated file information. Any files not mentioned in an Index Update are left unchanged. -### Options (Type = 7) +Sharing Modes +------------- -This informational message provides information about the client -configuration, version, etc. It is sent at connection initiation and, -optionally, when any of the sent parameters have changed. The message is -in the form of a list of (key, value) pairs, both of string type. +### Trusted -Key ID:s apart from the well known ones are implementation specific. An -implementation is expected to ignore unknown keys. An implementation may -impose limits on key and value size. +Trusted mode is the default sharing mode. Updates are exchanged in both +directions. -Well known keys: + +------------+ Updates /---------\ + | | -----------> / \ + | Node | | Cluster | + | | <----------- \ / + +------------+ Updates \---------/ - - "clientId" -- The name of the implementation. Example: "syncthing". +### Read Only - - "clientVersion" -- The version of the client. Example: "v1.0.33-47". - The Following the SemVer 2.0 specification for version strings is - encouraged but not enforced. +In read only mode a node does not synchronize the local repository to +the cluster, but publishes changes to it's local repository contents as +usual. The local repository can be seen as a "master copy" that is never +affected by the actions of other cluster nodes. -#### Graphical Representation - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Number of Options | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ Zero or more KeyValue Structures \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - - KeyValue Structure: - - 0 1 2 3 - 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Length of Key | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ Key (variable length) \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - | Length of Value | - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - / / - \ Value (variable length) \ - / / - +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ - -#### XDR - - struct OptionsMessage { - KeyValue Options<>; - } - - struct KeyValue { - string Key<>; - string Value<>; - } + +------------+ Updates /---------\ + | | -----------> / \ + | Node | | Cluster | + | | \ / + +------------+ \---------/ Message Limits -------------- diff --git a/protocol/common_test.go b/protocol/common_test.go index 246d2b8fb..143039ea4 100644 --- a/protocol/common_test.go +++ b/protocol/common_test.go @@ -38,6 +38,9 @@ func (t *TestModel) Close(nodeID string, err error) { close(t.closedCh) } +func (t *TestModel) ClusterConfig(nodeID string, config ClusterConfigMessage) { +} + func (t *TestModel) isClosed() bool { select { case <-t.closedCh: diff --git a/protocol/message_types.go b/protocol/message_types.go index 18a04cded..a70ad9b6f 100644 --- a/protocol/message_types.go +++ b/protocol/message_types.go @@ -25,8 +25,21 @@ type RequestMessage struct { Size uint32 } -type OptionsMessage struct { - Options []Option // max:64 +type ClusterConfigMessage struct { + ClientName string // max:64 + ClientVersion string // max:64 + Repositories []Repository // max:64 + Options []Option // max:64 +} + +type Repository struct { + ID string // max:64 + Nodes []Node // max:64 +} + +type Node struct { + ID string // max:64 + Flags uint32 } type Option struct { diff --git a/protocol/message_xdr.go b/protocol/message_xdr.go index 290e1c0e1..3af3b4357 100644 --- a/protocol/message_xdr.go +++ b/protocol/message_xdr.go @@ -198,19 +198,34 @@ func (o *RequestMessage) decodeXDR(xr *xdr.Reader) error { return xr.Error() } -func (o OptionsMessage) EncodeXDR(w io.Writer) (int, error) { +func (o ClusterConfigMessage) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) return o.encodeXDR(xw) } -func (o OptionsMessage) MarshalXDR() []byte { +func (o ClusterConfigMessage) MarshalXDR() []byte { var buf bytes.Buffer var xw = xdr.NewWriter(&buf) o.encodeXDR(xw) return buf.Bytes() } -func (o OptionsMessage) encodeXDR(xw *xdr.Writer) (int, error) { +func (o ClusterConfigMessage) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.ClientName) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ClientName) + if len(o.ClientVersion) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ClientVersion) + if len(o.Repositories) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Repositories))) + for i := range o.Repositories { + o.Repositories[i].encodeXDR(xw) + } if len(o.Options) > 64 { return xw.Tot(), xdr.ErrElementSizeExceeded } @@ -221,18 +236,28 @@ func (o OptionsMessage) encodeXDR(xw *xdr.Writer) (int, error) { return xw.Tot(), xw.Error() } -func (o *OptionsMessage) DecodeXDR(r io.Reader) error { +func (o *ClusterConfigMessage) DecodeXDR(r io.Reader) error { xr := xdr.NewReader(r) return o.decodeXDR(xr) } -func (o *OptionsMessage) UnmarshalXDR(bs []byte) error { +func (o *ClusterConfigMessage) UnmarshalXDR(bs []byte) error { var buf = bytes.NewBuffer(bs) var xr = xdr.NewReader(buf) return o.decodeXDR(xr) } -func (o *OptionsMessage) decodeXDR(xr *xdr.Reader) error { +func (o *ClusterConfigMessage) decodeXDR(xr *xdr.Reader) error { + o.ClientName = xr.ReadStringMax(64) + o.ClientVersion = xr.ReadStringMax(64) + _RepositoriesSize := int(xr.ReadUint32()) + if _RepositoriesSize > 64 { + return xdr.ErrElementSizeExceeded + } + o.Repositories = make([]Repository, _RepositoriesSize) + for i := range o.Repositories { + (&o.Repositories[i]).decodeXDR(xr) + } _OptionsSize := int(xr.ReadUint32()) if _OptionsSize > 64 { return xdr.ErrElementSizeExceeded @@ -244,6 +269,95 @@ func (o *OptionsMessage) decodeXDR(xr *xdr.Reader) error { return xr.Error() } +func (o Repository) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Repository) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o Repository) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.ID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ID) + if len(o.Nodes) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteUint32(uint32(len(o.Nodes))) + for i := range o.Nodes { + o.Nodes[i].encodeXDR(xw) + } + return xw.Tot(), xw.Error() +} + +func (o *Repository) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Repository) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *Repository) decodeXDR(xr *xdr.Reader) error { + o.ID = xr.ReadStringMax(64) + _NodesSize := int(xr.ReadUint32()) + if _NodesSize > 64 { + return xdr.ErrElementSizeExceeded + } + o.Nodes = make([]Node, _NodesSize) + for i := range o.Nodes { + (&o.Nodes[i]).decodeXDR(xr) + } + return xr.Error() +} + +func (o Node) EncodeXDR(w io.Writer) (int, error) { + var xw = xdr.NewWriter(w) + return o.encodeXDR(xw) +} + +func (o Node) MarshalXDR() []byte { + var buf bytes.Buffer + var xw = xdr.NewWriter(&buf) + o.encodeXDR(xw) + return buf.Bytes() +} + +func (o Node) encodeXDR(xw *xdr.Writer) (int, error) { + if len(o.ID) > 64 { + return xw.Tot(), xdr.ErrElementSizeExceeded + } + xw.WriteString(o.ID) + xw.WriteUint32(o.Flags) + return xw.Tot(), xw.Error() +} + +func (o *Node) DecodeXDR(r io.Reader) error { + xr := xdr.NewReader(r) + return o.decodeXDR(xr) +} + +func (o *Node) UnmarshalXDR(bs []byte) error { + var buf = bytes.NewBuffer(bs) + var xr = xdr.NewReader(buf) + return o.decodeXDR(xr) +} + +func (o *Node) decodeXDR(xr *xdr.Reader) error { + o.ID = xr.ReadStringMax(64) + o.Flags = xr.ReadUint32() + return xr.Error() +} + func (o Option) EncodeXDR(w io.Writer) (int, error) { var xw = xdr.NewWriter(w) return o.encodeXDR(xw) diff --git a/protocol/nativemodel_darwin.go b/protocol/nativemodel_darwin.go index f89a26941..c8028c772 100644 --- a/protocol/nativemodel_darwin.go +++ b/protocol/nativemodel_darwin.go @@ -29,6 +29,10 @@ func (m nativeModel) Request(nodeID, repo string, name string, offset int64, siz return m.next.Request(nodeID, repo, name, offset, size) } +func (m nativeModel) ClusterConfig(nodeID string, config ClusterConfigMessage) { + m.next.ClusterConfig(nodeID, config) +} + func (m nativeModel) Close(nodeID string, err error) { m.next.Close(nodeID, err) } diff --git a/protocol/nativemodel_unix.go b/protocol/nativemodel_unix.go index 9739874fc..f01d7c6a5 100644 --- a/protocol/nativemodel_unix.go +++ b/protocol/nativemodel_unix.go @@ -20,6 +20,10 @@ func (m nativeModel) Request(nodeID, repo string, name string, offset int64, siz return m.next.Request(nodeID, repo, name, offset, size) } +func (m nativeModel) ClusterConfig(nodeID string, config ClusterConfigMessage) { + m.next.ClusterConfig(nodeID, config) +} + func (m nativeModel) Close(nodeID string, err error) { m.next.Close(nodeID, err) } diff --git a/protocol/nativemodel_windows.go b/protocol/nativemodel_windows.go index a7aeedf6f..c35831307 100644 --- a/protocol/nativemodel_windows.go +++ b/protocol/nativemodel_windows.go @@ -29,6 +29,10 @@ func (m nativeModel) Request(nodeID, repo string, name string, offset int64, siz return m.next.Request(nodeID, repo, name, offset, size) } +func (m nativeModel) ClusterConfig(nodeID string, config ClusterConfigMessage) { + m.next.ClusterConfig(nodeID, config) +} + func (m nativeModel) Close(nodeID string, err error) { m.next.Close(nodeID, err) } diff --git a/protocol/protocol.go b/protocol/protocol.go index 59c44baaa..9b190c02b 100644 --- a/protocol/protocol.go +++ b/protocol/protocol.go @@ -6,7 +6,6 @@ import ( "errors" "fmt" "io" - "log" "sync" "time" @@ -17,13 +16,13 @@ import ( const BlockSize = 128 * 1024 const ( - messageTypeIndex = 1 - messageTypeRequest = 2 - messageTypeResponse = 3 - messageTypePing = 4 - messageTypePong = 5 - messageTypeIndexUpdate = 6 - messageTypeOptions = 7 + messageTypeClusterConfig = 0 + messageTypeIndex = 1 + messageTypeRequest = 2 + messageTypeResponse = 3 + messageTypePing = 4 + messageTypePong = 5 + messageTypeIndexUpdate = 6 ) const ( @@ -32,6 +31,12 @@ const ( FlagDirectory = 1 << 14 ) +const ( + FlagShareTrusted uint32 = 1 << 0 + FlagShareReadOnly = 1 << 1 + FlagShareBits = 0x000000ff +) + var ( ErrClusterHash = fmt.Errorf("configuration error: mismatched cluster hash") ErrClosed = errors.New("connection closed") @@ -44,6 +49,8 @@ type Model interface { IndexUpdate(nodeID string, repo string, files []FileInfo) // A request was made by the peer node Request(nodeID string, repo string, name string, offset int64, size int) ([]byte, error) + // A cluster configuration message was received + ClusterConfig(nodeID string, config ClusterConfigMessage) // The peer node closed the connection Close(nodeID string, err error) } @@ -52,27 +59,24 @@ type Connection interface { ID() string Index(repo string, files []FileInfo) Request(repo string, name string, offset int64, size int) ([]byte, error) + ClusterConfig(config ClusterConfigMessage) Statistics() Statistics - Option(key string) string } type rawConnection struct { sync.RWMutex - id string - receiver Model - reader io.ReadCloser - xr *xdr.Reader - writer io.WriteCloser - wb *bufio.Writer - xw *xdr.Writer - closed chan struct{} - awaiting map[int]chan asyncResult - nextID int - indexSent map[string]map[string][2]int64 - peerOptions map[string]string - myOptions map[string]string - optionsLock sync.Mutex + id string + receiver Model + reader io.ReadCloser + xr *xdr.Reader + writer io.WriteCloser + wb *bufio.Writer + xw *xdr.Writer + closed chan struct{} + awaiting map[int]chan asyncResult + nextID int + indexSent map[string]map[string][2]int64 hasSentIndex bool hasRecvdIndex bool @@ -88,7 +92,7 @@ const ( pingIdleTime = 5 * time.Minute ) -func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model, options map[string]string) Connection { +func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver Model) Connection { flrd := flate.NewReader(reader) flwr, err := flate.NewWriter(writer, flate.BestSpeed) if err != nil { @@ -112,28 +116,6 @@ func NewConnection(nodeID string, reader io.Reader, writer io.Writer, receiver M go c.readerLoop() go c.pingerLoop() - if options != nil { - c.myOptions = options - go func() { - c.Lock() - header{0, c.nextID, messageTypeOptions}.encodeXDR(c.xw) - var om OptionsMessage - for k, v := range options { - om.Options = append(om.Options, Option{k, v}) - } - om.encodeXDR(c.xw) - err := c.xw.Error() - if err == nil { - err = c.flush() - } - if err != nil { - log.Println("Warning: Write error during initial handshake:", err) - } - c.nextID++ - c.Unlock() - }() - } - return wireFormatConnection{&c} } @@ -217,6 +199,27 @@ func (c *rawConnection) Request(repo string, name string, offset int64, size int return res.val, res.err } +// ClusterConfig send the cluster configuration message to the peer and returns any error +func (c *rawConnection) ClusterConfig(config ClusterConfigMessage) { + c.Lock() + defer c.Unlock() + + if c.isClosed() { + return + } + + header{0, c.nextID, messageTypeClusterConfig}.encodeXDR(c.xw) + c.nextID = (c.nextID + 1) & 0xfff + + _, err := config.encodeXDR(c.xw) + if err == nil { + err = c.flush() + } + if err != nil { + c.close(err) + } +} + func (c *rawConnection) ping() bool { c.Lock() if c.isClosed() { @@ -386,24 +389,14 @@ loop: c.Unlock() } - case messageTypeOptions: - var om OptionsMessage - om.decodeXDR(c.xr) + case messageTypeClusterConfig: + var cm ClusterConfigMessage + cm.decodeXDR(c.xr) if c.xr.Error() != nil { c.close(c.xr.Error()) break loop - } - - c.optionsLock.Lock() - c.peerOptions = make(map[string]string, len(om.Options)) - for _, opt := range om.Options { - c.peerOptions[opt.Key] = opt.Value - } - c.optionsLock.Unlock() - - if mh, rh := c.myOptions["clusterHash"], c.peerOptions["clusterHash"]; len(mh) > 0 && len(rh) > 0 && mh != rh { - c.close(ErrClusterHash) - break loop + } else { + go c.receiver.ClusterConfig(c.id, cm) } default: @@ -472,9 +465,3 @@ func (c *rawConnection) Statistics() Statistics { OutBytesTotal: int(c.xw.Tot()), } } - -func (c *rawConnection) Option(key string) string { - c.optionsLock.Lock() - defer c.optionsLock.Unlock() - return c.peerOptions[key] -} diff --git a/protocol/protocol_test.go b/protocol/protocol_test.go index a4d89cf35..80c4f83a3 100644 --- a/protocol/protocol_test.go +++ b/protocol/protocol_test.go @@ -25,8 +25,8 @@ func TestPing(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, nil, nil).(wireFormatConnection).next.(*rawConnection) - c1 := NewConnection("c1", br, aw, nil, nil).(wireFormatConnection).next.(*rawConnection) + c0 := NewConnection("c0", ar, bw, nil).(wireFormatConnection).next.(*rawConnection) + c1 := NewConnection("c1", br, aw, nil).(wireFormatConnection).next.(*rawConnection) if ok := c0.ping(); !ok { t.Error("c0 ping failed") @@ -49,8 +49,8 @@ func TestPingErr(t *testing.T) { eaw := &ErrPipe{PipeWriter: *aw, max: i, err: e} ebw := &ErrPipe{PipeWriter: *bw, max: j, err: e} - c0 := NewConnection("c0", ar, ebw, m0, nil).(wireFormatConnection).next.(*rawConnection) - NewConnection("c1", br, eaw, m1, nil) + c0 := NewConnection("c0", ar, ebw, m0).(wireFormatConnection).next.(*rawConnection) + NewConnection("c1", br, eaw, m1) res := c0.ping() if (i < 4 || j < 4) && res { @@ -125,8 +125,8 @@ func TestVersionErr(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, m0, nil).(wireFormatConnection).next.(*rawConnection) - NewConnection("c1", br, aw, m1, nil) + c0 := NewConnection("c0", ar, bw, m0).(wireFormatConnection).next.(*rawConnection) + NewConnection("c1", br, aw, m1) c0.xw.WriteUint32(encodeHeader(header{ version: 2, @@ -147,8 +147,8 @@ func TestTypeErr(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, m0, nil).(wireFormatConnection).next.(*rawConnection) - NewConnection("c1", br, aw, m1, nil) + c0 := NewConnection("c0", ar, bw, m0).(wireFormatConnection).next.(*rawConnection) + NewConnection("c1", br, aw, m1) c0.xw.WriteUint32(encodeHeader(header{ version: 0, @@ -169,8 +169,8 @@ func TestClose(t *testing.T) { ar, aw := io.Pipe() br, bw := io.Pipe() - c0 := NewConnection("c0", ar, bw, m0, nil).(wireFormatConnection).next.(*rawConnection) - NewConnection("c1", br, aw, m1, nil) + c0 := NewConnection("c0", ar, bw, m0).(wireFormatConnection).next.(*rawConnection) + NewConnection("c1", br, aw, m1) c0.close(nil) diff --git a/protocol/wireformat.go b/protocol/wireformat.go index d3963ccf3..643e5fb9a 100644 --- a/protocol/wireformat.go +++ b/protocol/wireformat.go @@ -30,10 +30,10 @@ func (c wireFormatConnection) Request(repo, name string, offset int64, size int) return c.next.Request(repo, name, offset, size) } +func (c wireFormatConnection) ClusterConfig(config ClusterConfigMessage) { + c.next.ClusterConfig(config) +} + func (c wireFormatConnection) Statistics() Statistics { return c.next.Statistics() } - -func (c wireFormatConnection) Option(key string) string { - return c.next.Option(key) -}