From f45865606aacb971036c6ad5e72b2e537b5ef8b6 Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 9 Apr 2015 12:53:57 +0200 Subject: [PATCH 1/2] Add initial merge and reset conflict tests --- test/conflict_test.go | 326 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 290 insertions(+), 36 deletions(-) diff --git a/test/conflict_test.go b/test/conflict_test.go index 608e79750..90d28c871 100644 --- a/test/conflict_test.go +++ b/test/conflict_test.go @@ -9,6 +9,7 @@ package integration import ( + "io/ioutil" "log" "os" "path/filepath" @@ -47,43 +48,8 @@ func TestConflict(t *testing.T) { t.Fatal(err) } - log.Println("Starting sender...") - sender := syncthingProcess{ // id1 - instance: "1", - argv: []string{"-home", "h1"}, - port: 8081, - apiKey: apiKey, - } - err = sender.start() - if err != nil { - t.Fatal(err) - } + sender, receiver := coSenderReceiver(t) defer sender.stop() - - // Wait for one scan to succeed, or up to 20 seconds... This is to let - // startup, UPnP etc complete and make sure the sender has the full index - // before they connect. - for i := 0; i < 20; i++ { - err := sender.rescan("default") - if err != nil { - time.Sleep(time.Second) - continue - } - break - } - - log.Println("Starting receiver...") - receiver := syncthingProcess{ // id2 - instance: "2", - argv: []string{"-home", "h2"}, - port: 8082, - apiKey: apiKey, - } - err = receiver.start() - if err != nil { - sender.stop() - t.Fatal(err) - } defer receiver.stop() if err = coCompletion(sender, receiver); err != nil { @@ -213,6 +179,294 @@ func TestConflict(t *testing.T) { } } +func TestInitialMergeConflicts(t *testing.T) { + log.Println("Cleaning...") + err := removeAll("s1", "s2", "h1/index*", "h2/index*") + if err != nil { + t.Fatal(err) + } + + err = os.Mkdir("s1", 0755) + if err != nil { + t.Fatal(err) + } + err = os.Mkdir("s2", 0755) + if err != nil { + t.Fatal(err) + } + + // File 1 is a conflict + + err = ioutil.WriteFile("s1/file1", []byte("hello\n"), 0644) + if err != nil { + t.Fatal(err) + } + + err = ioutil.WriteFile("s2/file1", []byte("goodbye\n"), 0644) + if err != nil { + t.Fatal(err) + } + + // File 2 exists on s1 only + + err = ioutil.WriteFile("s1/file2", []byte("hello\n"), 0644) + if err != nil { + t.Fatal(err) + } + + // File 3 exists on s2 only + + err = ioutil.WriteFile("s2/file3", []byte("goodbye\n"), 0644) + if err != nil { + t.Fatal(err) + } + + // Let them sync + + sender, receiver := coSenderReceiver(t) + defer sender.stop() + defer receiver.stop() + + log.Println("Syncing...") + + if err = coCompletion(sender, receiver); err != nil { + t.Fatal(err) + } + + sender.stop() + receiver.stop() + + log.Println("Verifying...") + + // s1 should have three-four files (there's a conflict from s2 which may or may not have synced yet) + + files, err := filepath.Glob("s1/file*") + if err != nil { + t.Fatal(err) + } + if len(files) < 3 || len(files) > 4 { + t.Errorf("Expected 3-4 files in s1 instead of %d", len(files)) + } + + // s2 should have four files (there's a conflict) + + files, err = filepath.Glob("s2/file*") + if err != nil { + t.Fatal(err) + } + if len(files) != 4 { + t.Errorf("Expected 4 files in s2 instead of %d", len(files)) + } + + // file1 is in conflict, so there's two versions of that one + + files, err = filepath.Glob("s2/file1*") + if err != nil { + t.Fatal(err) + } + if len(files) != 2 { + t.Errorf("Expected 2 'file1' files in s2 instead of %d", len(files)) + } +} + +func TestResetConflicts(t *testing.T) { + log.Println("Cleaning...") + err := removeAll("s1", "s2", "h1/index*", "h2/index*") + if err != nil { + t.Fatal(err) + } + + err = os.Mkdir("s1", 0755) + if err != nil { + t.Fatal(err) + } + err = os.Mkdir("s2", 0755) + if err != nil { + t.Fatal(err) + } + + // Three files on s1 + + err = ioutil.WriteFile("s1/file1", []byte("hello\n"), 0644) + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile("s1/file2", []byte("hello\n"), 0644) + if err != nil { + t.Fatal(err) + } + err = ioutil.WriteFile("s2/file3", []byte("hello\n"), 0644) + if err != nil { + t.Fatal(err) + } + + // Let them sync + + sender, receiver := coSenderReceiver(t) + defer sender.stop() + defer receiver.stop() + + log.Println("Syncing...") + + if err = coCompletion(sender, receiver); err != nil { + t.Fatal(err) + } + + log.Println("Verifying...") + + // s1 should have three files + + files, err := filepath.Glob("s1/file*") + if err != nil { + t.Fatal(err) + } + if len(files) != 3 { + t.Errorf("Expected 3 files in s1 instead of %d", len(files)) + } + + // s2 should have three + + files, err = filepath.Glob("s2/file*") + if err != nil { + t.Fatal(err) + } + if len(files) != 3 { + t.Errorf("Expected 3 files in s2 instead of %d", len(files)) + } + + log.Println("Updating...") + + // change s2/file2 a few times, so that it's version counter increases. + // This will make the file on the cluster look newer than what we have + // locally after we rest the index, unless we have a fix for that. + + err = ioutil.WriteFile("s2/file2", []byte("hello1\n"), 0644) + if err != nil { + t.Fatal(err) + } + err = receiver.rescan("default") + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + err = ioutil.WriteFile("s2/file2", []byte("hello2\n"), 0644) + if err != nil { + t.Fatal(err) + } + err = receiver.rescan("default") + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + err = ioutil.WriteFile("s2/file2", []byte("hello3\n"), 0644) + if err != nil { + t.Fatal(err) + } + err = receiver.rescan("default") + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + + if err = coCompletion(sender, receiver); err != nil { + t.Fatal(err) + } + + // Now nuke the index + + log.Println("Resetting...") + + receiver.stop() + removeAll("h2/index*") + + // s1/file1 (remote) changes while receiver is down + + err = ioutil.WriteFile("s1/file1", []byte("goodbye\n"), 0644) + if err != nil { + t.Fatal(err) + } + + // s1 must know about it + err = sender.rescan("default") + if err != nil { + t.Fatal(err) + } + + // s2/file2 (local) changes while receiver is down + + err = ioutil.WriteFile("s2/file2", []byte("goodbye\n"), 0644) + if err != nil { + t.Fatal(err) + } + + receiver.start() + + log.Println("Syncing...") + + if err = coCompletion(sender, receiver); err != nil { + t.Fatal(err) + } + + // s2 should have five files (three plus two conflicts) + + files, err = filepath.Glob("s2/file*") + if err != nil { + t.Fatal(err) + } + if len(files) != 5 { + t.Errorf("Expected 5 files in s2 instead of %d", len(files)) + } + + // file1 is in conflict, so there's two versions of that one + + files, err = filepath.Glob("s2/file1*") + if err != nil { + t.Fatal(err) + } + if len(files) != 2 { + t.Errorf("Expected 2 'file1' files in s2 instead of %d", len(files)) + } + + // file2 is in conflict, so there's two versions of that one + + files, err = filepath.Glob("s2/file2*") + if err != nil { + t.Fatal(err) + } + if len(files) != 2 { + t.Errorf("Expected 2 'file2' files in s2 instead of %d", len(files)) + } +} + +func coSenderReceiver(t *testing.T) (syncthingProcess, syncthingProcess) { + log.Println("Starting sender...") + sender := syncthingProcess{ // id1 + instance: "1", + argv: []string{"-home", "h1"}, + port: 8081, + apiKey: apiKey, + } + err := sender.start() + if err != nil { + t.Fatal(err) + } + + log.Println("Starting receiver...") + receiver := syncthingProcess{ // id2 + instance: "2", + argv: []string{"-home", "h2"}, + port: 8082, + apiKey: apiKey, + } + err = receiver.start() + if err != nil { + sender.stop() + t.Fatal(err) + } + + return sender, receiver +} + func coCompletion(p ...syncthingProcess) error { mainLoop: for { From 936c76119da2aeecf4169a95e07a1b8a26bf98ad Mon Sep 17 00:00:00 2001 From: Jakob Borg Date: Thu, 9 Apr 2015 12:53:41 +0200 Subject: [PATCH 2/2] Index reset should generate file conflicts (fixes #1613) --- Godeps/Godeps.json | 2 +- .../github.com/syncthing/protocol/vector.go | 10 +++++ .../syncthing/protocol/vector_test.go | 14 ++++++- .../src/github.com/thejerf/suture/pre-commit | 1 + internal/model/model.go | 2 +- internal/model/rwfolder.go | 39 ++++++++++++++++--- 6 files changed, 60 insertions(+), 8 deletions(-) diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 15857ef00..1886f33d4 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -31,7 +31,7 @@ }, { "ImportPath": "github.com/syncthing/protocol", - "Rev": "3d8a71fdb205fe2401a341a739208bc9d1e79a1b" + "Rev": "e7db2648034fb71b051902a02bc25d4468ed492e" }, { "ImportPath": "github.com/syndtr/goleveldb/leveldb", diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/vector.go b/Godeps/_workspace/src/github.com/syncthing/protocol/vector.go index 048594522..edd156143 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/vector.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/vector.go @@ -103,3 +103,13 @@ func (a Vector) Concurrent(b Vector) bool { comp := a.Compare(b) return comp == ConcurrentGreater || comp == ConcurrentLesser } + +// Counter returns the current value of the given counter ID. +func (v Vector) Counter(id uint64) uint64 { + for _, c := range v { + if c.ID == id { + return c.Value + } + } + return 0 +} diff --git a/Godeps/_workspace/src/github.com/syncthing/protocol/vector_test.go b/Godeps/_workspace/src/github.com/syncthing/protocol/vector_test.go index 7815412c2..c01255e7a 100644 --- a/Godeps/_workspace/src/github.com/syncthing/protocol/vector_test.go +++ b/Godeps/_workspace/src/github.com/syncthing/protocol/vector_test.go @@ -118,5 +118,17 @@ func TestMerge(t *testing.T) { t.Errorf("%d: %+v.Merge(%+v) == %+v (expected %+v)", i, tc.a, tc.b, m, tc.m) } } - +} + +func TestCounterValue(t *testing.T) { + v0 := Vector{Counter{42, 1}, Counter{64, 5}} + if v0.Counter(42) != 1 { + t.Error("Counter error, %d != %d", v0.Counter(42), 1) + } + if v0.Counter(64) != 5 { + t.Error("Counter error, %d != %d", v0.Counter(64), 5) + } + if v0.Counter(72) != 0 { + t.Error("Counter error, %d != %d", v0.Counter(72), 0) + } } diff --git a/Godeps/_workspace/src/github.com/thejerf/suture/pre-commit b/Godeps/_workspace/src/github.com/thejerf/suture/pre-commit index c88ec0fe4..6199d610f 100644 --- a/Godeps/_workspace/src/github.com/thejerf/suture/pre-commit +++ b/Godeps/_workspace/src/github.com/thejerf/suture/pre-commit @@ -9,3 +9,4 @@ if [ ! -z "$GOLINTOUT" -o "$?" != 0 ]; then fi go test + diff --git a/internal/model/model.go b/internal/model/model.go index fa122d8af..7965fca32 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -142,7 +142,7 @@ func (m *Model) StartFolderRW(folder string) { if ok { panic("cannot start already running folder " + folder) } - p := newRWFolder(m, cfg) + p := newRWFolder(m, m.shortID, cfg) m.folderRunners[folder] = p m.fmut.Unlock() diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index 971081659..25eafcf57 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -68,13 +68,14 @@ type rwFolder struct { lenientMtimes bool copiers int pullers int + shortID uint64 stop chan struct{} queue *jobQueue dbUpdates chan protocol.FileInfo } -func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder { +func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { return &rwFolder{ stateTracker: stateTracker{folder: cfg.ID}, @@ -88,6 +89,7 @@ func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder { lenientMtimes: cfg.LenientMtimes, copiers: cfg.Copiers, pullers: cfg.Pullers, + shortID: shortID, stop: make(chan struct{}), queue: newJobQueue(), @@ -603,8 +605,11 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) { realName := filepath.Join(p.dir, file.Name) cur, ok := p.model.CurrentFolderFile(p.folder, file.Name) - if ok && cur.Version.Concurrent(file.Version) { - // There is a conflict here. Move the file to a conflict copy instead of deleting. + if ok && p.inConflict(cur.Version, file.Version) { + // There is a conflict here. Move the file to a conflict copy instead + // of deleting. Also merge with the version vector we had, to indicate + // we have resolved the conflict. + file.Version = file.Version.Merge(cur.Version) err = osutil.InWritableDir(moveForConflict, realName) } else if p.versioner != nil { err = osutil.InWritableDir(p.versioner.Archive, realName) @@ -816,6 +821,12 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) { } } + // This may have been a conflict. We should merge the version vectors so + // that our clock doesn't move backwards. + if cur, ok := p.model.CurrentFolderFile(p.folder, file.Name); ok { + file.Version = file.Version.Merge(cur.Version) + } + p.dbUpdates <- file return } @@ -1011,10 +1022,12 @@ func (p *rwFolder) performFinish(state *sharedPullerState) { } } - if state.version.Concurrent(state.file.Version) { + if p.inConflict(state.version, state.file.Version) { // The new file has been changed in conflict with the existing one. We // should file it away as a conflict instead of just removing or - // archiving. + // archiving. Also merge with the version vector we had, to indicate + // we have resolved the conflict. + state.file.Version = state.file.Version.Merge(state.version) err = osutil.InWritableDir(moveForConflict, state.realName) } else if p.versioner != nil { // If we should use versioning, let the versioner archive the old @@ -1144,6 +1157,22 @@ loop: } } +func (p *rwFolder) inConflict(current, replacement protocol.Vector) bool { + if current.Concurrent(replacement) { + // Obvious case + return true + } + if replacement.Counter(p.shortID) > current.Counter(p.shortID) { + // The replacement file contains a higher version for ourselves than + // what we have. This isn't supposed to be possible, since it's only + // we who can increment that counter. We take it as a sign that + // something is wrong (our index may have been corrupted or removed) + // and flag it as a conflict. + return true + } + return false +} + func invalidateFolder(cfg *config.Configuration, folderID string, err error) { for i := range cfg.Folders { folder := &cfg.Folders[i]