Mirror of https://github.com/octoleo/syncthing.git (synced 2024-11-08 22:31:04 +00:00)
lib/model: Fix accounting error in rescan with multiple subs (fixes #3028)
When doing prefix scans in the database, "foo" should not be considered a prefix of "foo2". Instead, it should match "foo" exactly and also strings with the prefix "foo/". This is more restrictive than what the standard leveldb prefix scan does, so we add some code to enforce it.

Also exposes initialScanCompleted on the rwfolder for testing, and changes it to be a channel (so we can wait for it from another goroutine). Otherwise we can't be sure when the initial scan has completed, and we need to wait for that or it might pick up changes we're doing at an unexpected time.

GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/3067
Parent: 8b7b0a03eb
Commit: 1a703efa78
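As a rough illustration of the matching rule described in the commit message (a self-contained sketch, not code from this patch): a name matches a prefix only if it equals the prefix exactly or begins with the prefix followed by a path separator, so "foo" and "foo/bar" match the prefix "foo" while "foo2" does not.

package main

import (
	"bytes"
	"fmt"
)

// matchesPrefix is a hypothetical standalone version of the check the patch
// adds inside the database iterators: exact match, or prefix plus "/".
func matchesPrefix(name, prefix []byte) bool {
	if len(prefix) == 0 {
		return true // an empty prefix matches everything
	}
	slashedPrefix := prefix
	if !bytes.HasSuffix(prefix, []byte{'/'}) {
		slashedPrefix = append(slashedPrefix, '/')
	}
	return bytes.Equal(name, prefix) || bytes.HasPrefix(name, slashedPrefix)
}

func main() {
	fmt.Println(matchesPrefix([]byte("foo"), []byte("foo")))     // true
	fmt.Println(matchesPrefix([]byte("foo/bar"), []byte("foo"))) // true
	fmt.Println(matchesPrefix([]byte("foo2"), []byte("foo")))    // false
}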
@@ -262,7 +262,17 @@ func (db *Instance) withHave(folder, device, prefix []byte, truncate bool, fn It
	dbi := t.NewIterator(util.BytesPrefix(db.deviceKey(folder, device, prefix)[:keyPrefixLen+keyFolderLen+keyDeviceLen+len(prefix)]), nil)
	defer dbi.Release()

	slashedPrefix := prefix
	if !bytes.HasSuffix(prefix, []byte{'/'}) {
		slashedPrefix = append(slashedPrefix, '/')
	}

	for dbi.Next() {
		name := db.deviceKeyName(dbi.Key())
		if len(prefix) > 0 && !bytes.Equal(name, prefix) && !bytes.HasPrefix(name, slashedPrefix) {
			return
		}

		// The iterator function may keep a reference to the unmarshalled
		// struct, which in turn references the buffer it was unmarshalled
		// from. dbi.Value() just returns an internal slice that it reuses, so
@@ -359,6 +369,11 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
	dbi := t.NewIterator(util.BytesPrefix(db.globalKey(folder, prefix)), nil)
	defer dbi.Release()

	slashedPrefix := prefix
	if !bytes.HasSuffix(prefix, []byte{'/'}) {
		slashedPrefix = append(slashedPrefix, '/')
	}

	var fk []byte
	for dbi.Next() {
		var vl versionList
@@ -370,7 +385,12 @@ func (db *Instance) withGlobal(folder, prefix []byte, truncate bool, fn Iterator
			l.Debugln(dbi.Key())
			panic("no versions?")
		}

		name := db.globalKeyName(dbi.Key())
		if len(prefix) > 0 && !bytes.Equal(name, prefix) && !bytes.HasPrefix(name, slashedPrefix) {
			return
		}

		fk = db.deviceKeyInto(fk[:cap(fk)], folder, vl.versions[0].device, name)
		bs, err := t.Get(fk, nil)
		if err != nil {
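For context on why the extra check above is needed: goleveldb's util.BytesPrefix range is purely byte-wise, so an iterator opened over the prefix "foo" also visits keys such as "foo2", which is exactly the over-matching the commit message describes. A minimal, self-contained illustration using an in-memory store (simplified string keys, not syncthing's own key encoding) might look like this:

package main

import (
	"fmt"

	"github.com/syndtr/goleveldb/leveldb"
	"github.com/syndtr/goleveldb/leveldb/storage"
	"github.com/syndtr/goleveldb/leveldb/util"
)

func main() {
	db, err := leveldb.Open(storage.NewMemStorage(), nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	for _, k := range []string{"foo", "foo/bar", "foo2", "bar"} {
		if err := db.Put([]byte(k), nil, nil); err != nil {
			panic(err)
		}
	}

	// A plain byte-wise prefix scan over "foo" also returns "foo2",
	// so the iterator loop has to filter names itself.
	iter := db.NewIterator(util.BytesPrefix([]byte("foo")), nil)
	for iter.Next() {
		fmt.Printf("byte-wise match: %s\n", iter.Key()) // foo, foo/bar, foo2
	}
	iter.Release()
}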
@@ -1360,6 +1360,66 @@ func TestUnifySubs(t *testing.T) {
	}
}

func TestIssue3028(t *testing.T) {
	// Create two files that we'll delete, one with a name that is a prefix of the other.

	if err := ioutil.WriteFile("testdata/testrm", []byte("Hello"), 0644); err != nil {
		t.Fatal(err)
	}
	defer os.Remove("testdata/testrm")
	if err := ioutil.WriteFile("testdata/testrm2", []byte("Hello"), 0644); err != nil {
		t.Fatal(err)
	}
	defer os.Remove("testdata/testrm2")

	// Create a model and default folder

	db := db.OpenMemory()
	m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db, nil)
	defCfg := defaultFolderConfig.Copy()
	defCfg.RescanIntervalS = 86400
	m.AddFolder(defCfg)
	m.StartFolder("default")
	m.ServeBackground()

	// Ugly hack for testing: reach into the model for the rwfolder and wait
	// for it to complete the initial scan. The risk is that it otherwise
	// runs during our modifications and screws up the test.
	m.fmut.RLock()
	folder := m.folderRunners["default"].(*rwFolder)
	m.fmut.RUnlock()
	<-folder.initialScanCompleted

	// Get a count of how many files are there now

	locorigfiles, _, _ := m.LocalSize("default")
	globorigfiles, _, _ := m.GlobalSize("default")

	// Delete and rescan specifically these two

	os.Remove("testdata/testrm")
	os.Remove("testdata/testrm2")
	m.ScanFolderSubs("default", []string{"testrm", "testrm2"})

	// Verify that the number of files decreased by two and the number of
	// deleted files increases by two

	locnowfiles, locdelfiles, _ := m.LocalSize("default")
	globnowfiles, globdelfiles, _ := m.GlobalSize("default")
	if locnowfiles != locorigfiles-2 {
		t.Errorf("Incorrect local accounting; got %d current files, expected %d", locnowfiles, locorigfiles-2)
	}
	if globnowfiles != globorigfiles-2 {
		t.Errorf("Incorrect global accounting; got %d current files, expected %d", globnowfiles, globorigfiles-2)
	}
	if locdelfiles != 2 {
		t.Errorf("Incorrect local accounting; got %d deleted files, expected 2", locdelfiles)
	}
	if globdelfiles != 2 {
		t.Errorf("Incorrect global accounting; got %d deleted files, expected 2", globdelfiles)
	}
}

type fakeAddr struct{}

func (fakeAddr) Network() string {
@@ -100,6 +100,8 @@ type rwFolder struct {

	errors map[string]string // path -> error string
	errorsMut sync.Mutex

	initialScanCompleted chan (struct{}) // exposed for testing
}

func newRWFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Versioner) service {
@@ -135,6 +137,8 @@ func newRWFolder(model *Model, cfg config.FolderConfiguration, ver versioner.Ver
		remoteIndex: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a notification if we're busy doing a pull when it comes.

		errorsMut: sync.NewMutex(),

		initialScanCompleted: make(chan struct{}),
	}

	f.configureCopiersAndPullers(cfg)
@@ -186,9 +190,6 @@ func (f *rwFolder) Serve() {
	var prevVer int64
	var prevIgnoreHash string

	// We don't start pulling files until a scan has been completed.
	initialScanCompleted := false

	for {
		select {
		case <-f.stop:
@@ -200,7 +201,10 @@ func (f *rwFolder) Serve() {
			l.Debugln(f, "remote index updated, rescheduling pull")

		case <-f.pullTimer.C:
			if !initialScanCompleted {
			select {
			case <-f.initialScanCompleted:
			default:
				// We don't start pulling files until a scan has been completed.
				l.Debugln(f, "skip (initial)")
				f.pullTimer.Reset(f.sleep)
				continue
@@ -297,9 +301,11 @@ func (f *rwFolder) Serve() {
			if err != nil {
				continue
			}
			if !initialScanCompleted {
			select {
			case <-f.initialScanCompleted:
			default:
				l.Infoln("Completed initial scan (rw) of folder", f.folderID)
				initialScanCompleted = true
				close(f.initialScanCompleted)
			}

		case req := <-f.scan.now:
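Stripped of the rwfolder details, the signalling pattern introduced above is the standard Go idiom of closing a channel to broadcast a one-time event: the scan path closes initialScanCompleted exactly once (guarded by a non-blocking select), the pull timer checks it non-blockingly, and a test or any other goroutine can simply block on a receive. A minimal self-contained sketch (hypothetical folder type, not syncthing's actual rwFolder):

package main

import (
	"fmt"
	"time"
)

// folder carries only the completion channel from the pattern above.
type folder struct {
	initialScanCompleted chan struct{}
}

func (f *folder) scanLoop() {
	time.Sleep(10 * time.Millisecond) // stand-in for the first scan

	select {
	case <-f.initialScanCompleted:
		// already signalled on an earlier pass; nothing to do
	default:
		fmt.Println("completed initial scan")
		close(f.initialScanCompleted) // signal exactly once
	}
}

func main() {
	f := &folder{initialScanCompleted: make(chan struct{})}
	go f.scanLoop()

	// A test (or the puller) can simply block until the scan has finished.
	<-f.initialScanCompleted
	fmt.Println("safe to modify files / start pulling")
}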