Make parallel hasher configurable, remove finisher setting (fixes #1199)

This commit is contained in:
Audrius Butkevicius 2014-12-24 23:12:12 +00:00
parent 5d173168cc
commit 5827a686b8
5 changed files with 20 additions and 19 deletions

View File

@ -62,7 +62,7 @@ type FolderConfiguration struct {
LenientMtimes bool `xml:"lenientMtimes"`
Copiers int `xml:"copiers" default:"1"` // This defines how many files are handled concurrently.
Pullers int `xml:"pullers" default:"16"` // Defines how many blocks are fetched at the same time, possibly between separate copier routines.
Finishers int `xml:"finishers" default:"1"` // Most of the time, should be equal to the number of copiers. These are CPU bound due to hashing.
Hashers int `xml:"hashers" default:"0"` // Less than one sets the value to the number of cores. These are CPU bound due to hashing.
Invalid string `xml:"-"` // Set at runtime when there is an error, not saved
@ -358,9 +358,6 @@ func (cfg *Configuration) prepare(myID protocol.DeviceID) {
if cfg.Folders[i].Pullers == 0 {
cfg.Folders[i].Pullers = 16
}
if cfg.Folders[i].Finishers == 0 {
cfg.Folders[i].Finishers = 1
}
sort.Sort(FolderDeviceConfigurationList(cfg.Folders[i].Devices))
}

View File

@ -89,7 +89,7 @@ func TestDeviceConfig(t *testing.T) {
RescanIntervalS: 600,
Copiers: 1,
Pullers: 16,
Finishers: 1,
Hashers: 0,
},
}
expectedDevices := []DeviceConfiguration{

View File

@ -190,7 +190,6 @@ func (m *Model) StartFolderRW(folder string) {
progressEmitter: m.progressEmitter,
copiers: cfg.Copiers,
pullers: cfg.Pullers,
finishers: cfg.Finishers,
queue: newJobQueue(),
}
m.folderRunners[folder] = p
@ -1138,6 +1137,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
TempLifetime: time.Duration(m.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{m, folder},
IgnorePerms: folderCfg.IgnorePerms,
Hashers: folderCfg.Hashers,
}
m.setState(folder, FolderScanning)

View File

@ -77,7 +77,6 @@ type Puller struct {
progressEmitter *ProgressEmitter
copiers int
pullers int
finishers int
queue *jobQueue
}
@ -258,7 +257,7 @@ func (p *Puller) pullerIteration(ignores *ignore.Matcher) int {
var doneWg sync.WaitGroup
if debug {
l.Debugln(p, "c", p.copiers, "p", p.pullers, "f", p.finishers)
l.Debugln(p, "c", p.copiers, "p", p.pullers)
}
for i := 0; i < p.copiers; i++ {
@ -279,14 +278,12 @@ func (p *Puller) pullerIteration(ignores *ignore.Matcher) int {
}()
}
for i := 0; i < p.finishers; i++ {
doneWg.Add(1)
// finisherRoutine finishes when finisherChan is closed
go func() {
p.finisherRoutine(finisherChan)
doneWg.Done()
}()
}
p.model.fmut.RLock()
folderFiles := p.model.folderFiles[p.folder]

View File

@ -49,6 +49,8 @@ type Walker struct {
// detected. Scanned files will get zero permission bits and the
// NoPermissionBits flag set.
IgnorePerms bool
// Number of routines to use for hashing
Hashers int
}
type TempNamer interface {
@ -75,9 +77,14 @@ func (w *Walker) Walk() (chan protocol.FileInfo, error) {
return nil, err
}
workers := w.Hashers
if workers < 1 {
workers = runtime.NumCPU()
}
files := make(chan protocol.FileInfo)
hashedFiles := make(chan protocol.FileInfo)
newParallelHasher(w.Dir, w.BlockSize, runtime.NumCPU(), hashedFiles, files)
newParallelHasher(w.Dir, w.BlockSize, workers, hashedFiles, files)
go func() {
hashFiles := w.walkAndHashFiles(files)