From 3b3c0c5950f2779af1d71298ea3bea333e495ebc Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Tue, 8 Apr 2014 13:45:18 +0200 Subject: [PATCH] Configuration version 2 (nodes separate from repos) --- cmd/syncthing/config.go | 71 +++++++++++++++++++++++++------- cmd/syncthing/config_test.go | 80 ++++++++++++++++++++++++++++++++++-- cmd/syncthing/main.go | 36 ++++++++++------ cmd/syncthing/model.go | 33 +++++---------- 4 files changed, 166 insertions(+), 54 deletions(-) diff --git a/cmd/syncthing/config.go b/cmd/syncthing/config.go index b3c6d65eb..3e0878aa0 100644 --- a/cmd/syncthing/config.go +++ b/cmd/syncthing/config.go @@ -11,8 +11,9 @@ import ( ) type Configuration struct { - Version int `xml:"version,attr" default:"1"` + Version int `xml:"version,attr" default:"2"` Repositories []RepositoryConfiguration `xml:"repository"` + Nodes []NodeConfiguration `xml:"node"` Options OptionsConfiguration `xml:"options"` XMLName xml.Name `xml:"configuration" json:"-"` } @@ -21,27 +22,38 @@ type RepositoryConfiguration struct { ID string `xml:"id,attr"` Directory string `xml:"directory,attr"` Nodes []NodeConfiguration `xml:"node"` + ReadOnly bool `xml:"ro,attr"` + nodeIDs []string +} + +func (r *RepositoryConfiguration) NodeIDs() []string { + if r.nodeIDs == nil { + for _, n := range r.Nodes { + r.nodeIDs = append(r.nodeIDs, n.NodeID) + } + } + return r.nodeIDs } type NodeConfiguration struct { NodeID string `xml:"id,attr"` - Name string `xml:"name,attr"` - Addresses []string `xml:"address"` + Name string `xml:"name,attr,omitempty"` + Addresses []string `xml:"address,omitempty"` } type OptionsConfiguration struct { - ListenAddress []string `xml:"listenAddress" default:":22000" ini:"listen-address"` - ReadOnly bool `xml:"readOnly" ini:"read-only"` - GUIEnabled bool `xml:"guiEnabled" default:"true" ini:"gui-enabled"` - GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080" ini:"gui-address"` - GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025" ini:"global-announce-server"` - GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true" ini:"global-announce-enabled"` - LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true" ini:"local-announce-enabled"` - ParallelRequests int `xml:"parallelRequests" default:"16" ini:"parallel-requests"` - MaxSendKbps int `xml:"maxSendKbps" ini:"max-send-kbps"` - RescanIntervalS int `xml:"rescanIntervalS" default:"60" ini:"rescan-interval"` - ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60" ini:"reconnection-interval"` - MaxChangeKbps int `xml:"maxChangeKbps" default:"1000" ini:"max-change-bw"` + ListenAddress []string `xml:"listenAddress" default:":22000"` + ReadOnly bool `xml:"readOnly,omitempty"` + GUIEnabled bool `xml:"guiEnabled" default:"true"` + GUIAddress string `xml:"guiAddress" default:"127.0.0.1:8080"` + GlobalAnnServer string `xml:"globalAnnounceServer" default:"announce.syncthing.net:22025"` + GlobalAnnEnabled bool `xml:"globalAnnounceEnabled" default:"true"` + LocalAnnEnabled bool `xml:"localAnnounceEnabled" default:"true"` + ParallelRequests int `xml:"parallelRequests" default:"16"` + MaxSendKbps int `xml:"maxSendKbps"` + RescanIntervalS int `xml:"rescanIntervalS" default:"60"` + ReconnectIntervalS int `xml:"reconnectionIntervalS" default:"60"` + MaxChangeKbps int `xml:"maxChangeKbps" default:"1000"` StartBrowser bool `xml:"startBrowser" default:"true"` } @@ -159,9 +171,38 @@ func readConfigXML(rd io.Reader) (Configuration, error) { seenRepos[id] = true } + if cfg.Version == 1 { + convertV1V2(&cfg) + } + return cfg, err } +func convertV1V2(cfg *Configuration) { + // Collect the list of nodes. + // Replace node configs inside repositories with only a reference to the nide ID. + // Set all repositories to read only if the global read only flag is set. + var nodes = map[string]NodeConfiguration{} + for i, repo := range cfg.Repositories { + cfg.Repositories[i].ReadOnly = cfg.Options.ReadOnly + for j, node := range repo.Nodes { + if _, ok := nodes[node.NodeID]; !ok { + nodes[node.NodeID] = node + } + cfg.Repositories[i].Nodes[j] = NodeConfiguration{NodeID: node.NodeID} + } + } + + // Set and sort the list of nodes. + for _, node := range nodes { + cfg.Nodes = append(cfg.Nodes, node) + } + sort.Sort(NodeConfigurationList(cfg.Nodes)) + + cfg.Options.ReadOnly = false + cfg.Version = 2 +} + type NodeConfigurationList []NodeConfiguration func (l NodeConfigurationList) Less(a, b int) bool { diff --git a/cmd/syncthing/config_test.go b/cmd/syncthing/config_test.go index 910c37502..a81f4fc94 100644 --- a/cmd/syncthing/config_test.go +++ b/cmd/syncthing/config_test.go @@ -10,7 +10,6 @@ import ( func TestDefaultValues(t *testing.T) { expected := OptionsConfiguration{ ListenAddress: []string{":22000"}, - ReadOnly: false, GUIEnabled: true, GUIAddress: "127.0.0.1:8080", GlobalAnnServer: "announce.syncthing.net:22025", @@ -34,6 +33,81 @@ func TestDefaultValues(t *testing.T) { } } +func TestNodeConfig(t *testing.T) { + v1data := []byte(` + + + +
a
+
+ +
b
+
+
+ + true + +
+`) + + v2data := []byte(` + + + + + + +
a
+
+ +
b
+
+
+`) + + for i, data := range [][]byte{v1data, v2data} { + cfg, err := readConfigXML(bytes.NewReader(data)) + if err != nil { + t.Error(err) + } + + expectedRepos := []RepositoryConfiguration{ + { + ID: "test", + Directory: "~/Sync", + Nodes: []NodeConfiguration{{NodeID: "node1"}, {NodeID: "node2"}}, + ReadOnly: true, + }, + } + expectedNodes := []NodeConfiguration{ + { + NodeID: "node1", + Name: "node one", + Addresses: []string{"a"}, + }, + { + NodeID: "node2", + Name: "node two", + Addresses: []string{"b"}, + }, + } + expectedNodeIDs := []string{"node1", "node2"} + + if cfg.Version != 2 { + t.Errorf("%d: Incorrect version %d != 2", i, cfg.Version) + } + if !reflect.DeepEqual(cfg.Repositories, expectedRepos) { + t.Errorf("%d: Incorrect Repositories\n A: %#v\n E: %#v", i, cfg.Repositories, expectedRepos) + } + if !reflect.DeepEqual(cfg.Nodes, expectedNodes) { + t.Errorf("%d: Incorrect Nodes\n A: %#v\n E: %#v", i, cfg.Nodes, expectedNodes) + } + if !reflect.DeepEqual(cfg.Repositories[0].NodeIDs(), expectedNodeIDs) { + t.Errorf("%d: Incorrect NodeIDs\n A: %#v\n E: %#v", i, cfg.Repositories[0].NodeIDs(), expectedNodeIDs) + } + } +} + func TestNoListenAddress(t *testing.T) { data := []byte(` @@ -59,7 +133,7 @@ func TestNoListenAddress(t *testing.T) { } func TestOverriddenValues(t *testing.T) { - data := []byte(` + data := []byte(`
dynamic
@@ -67,7 +141,6 @@ func TestOverriddenValues(t *testing.T) {
:23000 - true false false 125.2.2.2:8080 @@ -86,7 +159,6 @@ func TestOverriddenValues(t *testing.T) { expected := OptionsConfiguration{ ListenAddress: []string{":23000"}, - ReadOnly: true, GUIEnabled: false, GUIAddress: "125.2.2.2:8080", GlobalAnnServer: "syncthing.nym.se:22025", diff --git a/cmd/syncthing/main.go b/cmd/syncthing/main.go index 89760db9b..73bf45d01 100644 --- a/cmd/syncthing/main.go +++ b/cmd/syncthing/main.go @@ -16,6 +16,7 @@ import ( "runtime/debug" "strings" "time" + "github.com/calmh/syncthing/discover" "github.com/calmh/syncthing/protocol" "github.com/juju/ratelimit" @@ -128,11 +129,12 @@ func main() { { ID: "default", Directory: filepath.Join(getHomeDir(), "Sync"), - Nodes: []NodeConfiguration{ - {NodeID: myID, Addresses: []string{"dynamic"}}, - }, + Nodes: []NodeConfiguration{{NodeID: myID}}, }, } + cfg.Nodes = []NodeConfiguration{ + {NodeID: myID, Addresses: []string{"dynamic"}}, + } saveConfig() infof("Edit %s to taste or use the GUI\n", cfgFile) @@ -227,14 +229,16 @@ func main() { disc := discovery() go listenConnect(myID, disc, m, tlsCfg, connOpts) - // Routine to pull blocks from other nodes to synchronize the local - // repository. Does not run when we are in read only (publish only) mode. - if cfg.Options.ReadOnly { - okln("Ready to synchronize (read only; no external updates accepted)") - m.StartRO() - } else { - okln("Ready to synchronize (read-write)") - m.StartRW(cfg.Options.ParallelRequests) + for _, repo := range cfg.Repositories { + // Routine to pull blocks from other nodes to synchronize the local + // repository. Does not run when we are in read only (publish only) mode. + if repo.ReadOnly { + okf("Ready to synchronize %s (read only; no external updates accepted)", repo.ID) + m.StartRepoRO(repo.ID) + } else { + okf("Ready to synchronize %s (read-write)", repo.ID) + m.StartRepoRW(repo.ID, cfg.Options.ParallelRequests) + } } select {} @@ -362,13 +366,15 @@ func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls go func() { for { nextNode: - for _, nodeCfg := range cfg.Repositories[0].Nodes { + for _, nodeCfg := range cfg.Nodes { if nodeCfg.NodeID == myID { continue } if m.ConnectedTo(nodeCfg.NodeID) { continue } + + var addrs []string for _, addr := range nodeCfg.Addresses { if addr == "dynamic" { if disc != nil { @@ -376,10 +382,14 @@ func listenConnect(myID string, disc *discover.Discoverer, m *Model, tlsCfg *tls if len(t) == 0 { continue } - addr = t[0] //XXX: Handle all of them + addrs = append(addrs, t...) } + } else { + addrs = append(addrs, addr) } + } + for _, addr := range addrs { if debugNet { dlog.Println("dial", nodeCfg.NodeID, addr) } diff --git a/cmd/syncthing/model.go b/cmd/syncthing/model.go index 993702dc6..b1c87b8cc 100644 --- a/cmd/syncthing/model.go +++ b/cmd/syncthing/model.go @@ -66,15 +66,13 @@ func NewModel(maxChangeBw int) *Model { // StartRW starts read/write processing on the current model. When in // read/write mode the model will attempt to keep in sync with the cluster by // pulling needed files from peer nodes. -func (m *Model) StartRW(threads int) { +func (m *Model) StartRepoRW(repo string, threads int) { m.rmut.Lock() defer m.rmut.Unlock() - if !m.addedRepo { + if dir, ok := m.repoDirs[repo]; !ok { panic("cannot start without repo") - } - m.started = true - for repo, dir := range m.repoDirs { + } else { newPuller(repo, dir, m, threads) } } @@ -82,17 +80,8 @@ func (m *Model) StartRW(threads int) { // StartRO starts read only processing on the current model. When in // read only mode the model will announce files to the cluster but not // pull in any external changes. -func (m *Model) StartRO() { - m.rmut.Lock() - defer m.rmut.Unlock() - - if !m.addedRepo { - panic("cannot start without repo") - } - m.started = true - for repo, dir := range m.repoDirs { - newPuller(repo, dir, m, 0) // zero threads => read only - } +func (m *Model) StartRepoRO(repo string) { + m.StartRepoRW(repo, 0) // zero threads => read only } type ConnectionInfo struct { @@ -555,12 +544,12 @@ func (m *Model) ScanRepos() { func (m *Model) ScanRepo(repo string) { sup := &suppressor{threshold: int64(cfg.Options.MaxChangeKbps)} w := &scanner.Walker{ - Dir: m.repoDirs[repo], - IgnoreFile: ".stignore", - BlockSize: BlockSize, - TempNamer: defTempNamer, - Suppressor: sup, - CurrentFiler: cFiler{m, repo}, + Dir: m.repoDirs[repo], + IgnoreFile: ".stignore", + BlockSize: BlockSize, + TempNamer: defTempNamer, + Suppressor: sup, + CurrentFiler: cFiler{m, repo}, } fs, _ := w.Walk() m.ReplaceLocal(repo, fs)