Merge pull request #2701 from AudriusButkevicius/raceee

Handle race while in the job queue (fixes #1263)
This commit is contained in:
Jakob Borg 2016-01-16 18:59:50 +01:00
commit 61d7e11001

View File

@ -453,6 +453,35 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
dirDeletions := []protocol.FileInfo{}
buckets := map[string][]protocol.FileInfo{}
handleFile := func(f protocol.FileInfo) bool {
switch {
case f.IsDeleted():
// A deleted file, directory or symlink
if f.IsDirectory() {
dirDeletions = append(dirDeletions, f)
} else {
fileDeletions[f.Name] = f
df, ok := p.model.CurrentFolderFile(p.folder, f.Name)
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
// WithNeed, furthermore, the file can simply be of the wrong
// type if we haven't yet managed to pull it.
if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
// Put files into buckets per first hash
key := string(df.Blocks[0].Hash)
buckets[key] = append(buckets[key], df)
}
}
case f.IsDirectory() && !f.IsSymlink():
// A new or changed directory
l.Debugln("Creating directory", f.Name)
p.handleDir(f)
default:
return false
}
return true
}
folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
// Needed items are delivered sorted lexicographically. We'll handle
// directories as they come along, so parents before children. Files
@ -467,29 +496,7 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
l.Debugln(p, "handling", file.Name)
switch {
case file.IsDeleted():
// A deleted file, directory or symlink
if file.IsDirectory() {
dirDeletions = append(dirDeletions, file)
} else {
fileDeletions[file.Name] = file
df, ok := p.model.CurrentFolderFile(p.folder, file.Name)
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
// WithNeed, furthermore, the file can simply be of the wrong
// type if we haven't yet managed to pull it.
if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
// Put files into buckets per first hash
key := string(df.Blocks[0].Hash)
buckets[key] = append(buckets[key], df)
}
}
case file.IsDirectory() && !file.IsSymlink():
// A new or changed directory
l.Debugln("Creating directory", file.Name)
p.handleDir(file)
default:
if !handleFile(file) {
// A new or changed file or symlink. This is the only case where we
// do stuff concurrently in the background
p.queue.Push(file.Name, file.Size(), file.Modified)
@ -532,11 +539,14 @@ nextFile:
continue
}
// Local file can be already deleted, but with a lower version
// number, hence the deletion coming in again as part of
// WithNeed, furthermore, the file can simply be of the wrong type if
// the global index changed while we were processing this iteration.
if !f.IsDeleted() && !f.IsSymlink() && !f.IsDirectory() {
// Handles races where an index update arrives changing what the file
// is between queueing and retrieving it from the queue, effectively
// changing how the file should be handled.
if handleFile(f) {
continue
}
if !f.IsSymlink() {
key := string(f.Blocks[0].Hash)
for i, candidate := range buckets[key] {
if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {