mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 22:58:25 +00:00
Handle race within the job queue (fixes #1263)
This commit is contained in:
parent
b04b7bf357
commit
c4c6df179b
@ -453,6 +453,35 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
||||
dirDeletions := []protocol.FileInfo{}
|
||||
buckets := map[string][]protocol.FileInfo{}
|
||||
|
||||
handleFile := func(f protocol.FileInfo) bool {
|
||||
switch {
|
||||
case f.IsDeleted():
|
||||
// A deleted file, directory or symlink
|
||||
if f.IsDirectory() {
|
||||
dirDeletions = append(dirDeletions, f)
|
||||
} else {
|
||||
fileDeletions[f.Name] = f
|
||||
df, ok := p.model.CurrentFolderFile(p.folder, f.Name)
|
||||
// Local file can be already deleted, but with a lower version
|
||||
// number, hence the deletion coming in again as part of
|
||||
// WithNeed, furthermore, the file can simply be of the wrong
|
||||
// type if we haven't yet managed to pull it.
|
||||
if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
|
||||
// Put files into buckets per first hash
|
||||
key := string(df.Blocks[0].Hash)
|
||||
buckets[key] = append(buckets[key], df)
|
||||
}
|
||||
}
|
||||
case f.IsDirectory() && !f.IsSymlink():
|
||||
// A new or changed directory
|
||||
l.Debugln("Creating directory", f.Name)
|
||||
p.handleDir(f)
|
||||
default:
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
folderFiles.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
||||
// Needed items are delivered sorted lexicographically. We'll handle
|
||||
// directories as they come along, so parents before children. Files
|
||||
@ -467,29 +496,7 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
|
||||
|
||||
l.Debugln(p, "handling", file.Name)
|
||||
|
||||
switch {
|
||||
case file.IsDeleted():
|
||||
// A deleted file, directory or symlink
|
||||
if file.IsDirectory() {
|
||||
dirDeletions = append(dirDeletions, file)
|
||||
} else {
|
||||
fileDeletions[file.Name] = file
|
||||
df, ok := p.model.CurrentFolderFile(p.folder, file.Name)
|
||||
// Local file can be already deleted, but with a lower version
|
||||
// number, hence the deletion coming in again as part of
|
||||
// WithNeed, furthermore, the file can simply be of the wrong
|
||||
// type if we haven't yet managed to pull it.
|
||||
if ok && !df.IsDeleted() && !df.IsSymlink() && !df.IsDirectory() {
|
||||
// Put files into buckets per first hash
|
||||
key := string(df.Blocks[0].Hash)
|
||||
buckets[key] = append(buckets[key], df)
|
||||
}
|
||||
}
|
||||
case file.IsDirectory() && !file.IsSymlink():
|
||||
// A new or changed directory
|
||||
l.Debugln("Creating directory", file.Name)
|
||||
p.handleDir(file)
|
||||
default:
|
||||
if !handleFile(file) {
|
||||
// A new or changed file or symlink. This is the only case where we
|
||||
// do stuff concurrently in the background
|
||||
p.queue.Push(file.Name, file.Size(), file.Modified)
|
||||
@ -532,11 +539,14 @@ nextFile:
|
||||
continue
|
||||
}
|
||||
|
||||
// Local file can be already deleted, but with a lower version
|
||||
// number, hence the deletion coming in again as part of
|
||||
// WithNeed, furthermore, the file can simply be of the wrong type if
|
||||
// the global index changed while we were processing this iteration.
|
||||
if !f.IsDeleted() && !f.IsSymlink() && !f.IsDirectory() {
|
||||
// Handles races where an index update arrives changing what the file
|
||||
// is between queueing and retrieving it from the queue, effectively
|
||||
// changing how the file should be handled.
|
||||
if handleFile(f) {
|
||||
continue
|
||||
}
|
||||
|
||||
if !f.IsSymlink() {
|
||||
key := string(f.Blocks[0].Hash)
|
||||
for i, candidate := range buckets[key] {
|
||||
if scanner.BlocksEqual(candidate.Blocks, f.Blocks) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user