diff --git a/internal/model/model.go b/internal/model/model.go index 4fb25e90c..964af8120 100644 --- a/internal/model/model.go +++ b/internal/model/model.go @@ -1027,17 +1027,14 @@ func sendIndexTo(initial bool, minLocalVer int64, conn protocol.Connection, fold return maxLocalVer, err } -func (m *Model) updateLocal(folder string, f protocol.FileInfo) { - f.LocalVersion = 0 +func (m *Model) updateLocals(folder string, fs []protocol.FileInfo) { m.fmut.RLock() - m.folderFiles[folder].Update(protocol.LocalDeviceID, []protocol.FileInfo{f}) + m.folderFiles[folder].Update(protocol.LocalDeviceID, fs) m.fmut.RUnlock() + events.Default.Log(events.LocalIndexUpdated, map[string]interface{}{ "folder": folder, - "name": f.Name, - "modified": time.Unix(f.Modified, 0), - "flags": fmt.Sprintf("0%o", f.Flags), - "size": f.Size(), + "numFiles": len(fs), }) } diff --git a/internal/model/rwfolder.go b/internal/model/rwfolder.go index 1c2c16eb2..f30659e70 100644 --- a/internal/model/rwfolder.go +++ b/internal/model/rwfolder.go @@ -69,8 +69,9 @@ type rwFolder struct { copiers int pullers int - stop chan struct{} - queue *jobQueue + stop chan struct{} + queue *jobQueue + dbUpdates chan protocol.FileInfo } func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder { @@ -276,6 +277,7 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { copyChan := make(chan copyBlocksState) finisherChan := make(chan *sharedPullerState) + var updateWg sync.WaitGroup var copyWg sync.WaitGroup var pullWg sync.WaitGroup var doneWg sync.WaitGroup @@ -284,6 +286,14 @@ func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int { l.Debugln(p, "c", p.copiers, "p", p.pullers) } + p.dbUpdates = make(chan protocol.FileInfo) + updateWg.Add(1) + go func() { + // dbUpdaterRoutine finishes when p.dbUpdates is closed + p.dbUpdaterRoutine() + updateWg.Done() + }() + for i := 0; i < p.copiers; i++ { copyWg.Add(1) go func() { @@ -453,6 +463,10 @@ nextFile: p.deleteDir(dir) } + // Wait for db updates to complete + close(p.dbUpdates) + updateWg.Wait() + return changed } @@ -510,7 +524,7 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { } if err = osutil.InWritableDir(mkdir, realName); err == nil { - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file } else { l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) } @@ -527,9 +541,9 @@ func (p *rwFolder) handleDir(file protocol.FileInfo) { // It's OK to change mode bits on stuff within non-writable directories. if p.ignorePerms { - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file } else if err := os.Chmod(realName, mode); err == nil { - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file } else { l.Infof("Puller (folder %q, dir %q): %v", p.folder, file.Name, err) } @@ -564,7 +578,7 @@ func (p *rwFolder) deleteDir(file protocol.FileInfo) { } err = osutil.InWritableDir(os.Remove, realName) if err == nil || os.IsNotExist(err) { - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file } else { l.Infof("Puller (folder %q, dir %q): delete: %v", p.folder, file.Name, err) } @@ -601,7 +615,7 @@ func (p *rwFolder) deleteFile(file protocol.FileInfo) { if err != nil && !os.IsNotExist(err) { l.Infof("Puller (folder %q, file %q): delete: %v", p.folder, file.Name, err) } else { - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file } } @@ -653,7 +667,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { // of the source and the creation of the target. Fix-up the metadata, // and update the local index of the target file. - p.model.updateLocal(p.folder, source) + p.dbUpdates <- source err = p.shortcutFile(target) if err != nil { @@ -671,7 +685,7 @@ func (p *rwFolder) renameFile(source, target protocol.FileInfo) { return } - p.model.updateLocal(p.folder, source) + p.dbUpdates <- source } } @@ -802,7 +816,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) { } } - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file return } @@ -810,7 +824,7 @@ func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) { func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) { err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags) if err == nil { - p.model.updateLocal(p.folder, file) + p.dbUpdates <- file } else { l.Infof("Puller (folder %q, file %q): symlink shortcut: %v", p.folder, file.Name, err) } @@ -1048,7 +1062,7 @@ func (p *rwFolder) performFinish(state *sharedPullerState) { } // Record the updated file in the index - p.model.updateLocal(p.folder, state.file) + p.dbUpdates <- state.file } func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) { @@ -1089,6 +1103,47 @@ func (p *rwFolder) Jobs() ([]string, []string) { return p.queue.Jobs() } +// dbUpdaterRoutine aggregates db updates and commits them in batches no +// larger than 1000 items, and no more delayed than 2 seconds. +func (p *rwFolder) dbUpdaterRoutine() { + const ( + maxBatchSize = 1000 + maxBatchTime = 2 * time.Second + ) + + batch := make([]protocol.FileInfo, 0, maxBatchSize) + tick := time.NewTicker(maxBatchTime) + defer tick.Stop() + +loop: + for { + select { + case file, ok := <-p.dbUpdates: + if !ok { + break loop + } + + file.LocalVersion = 0 + batch = append(batch, file) + + if len(batch) == maxBatchSize { + p.model.updateLocals(p.folder, batch) + batch = batch[:0] + } + + case <-tick.C: + if len(batch) > 0 { + p.model.updateLocals(p.folder, batch) + batch = batch[:0] + } + } + } + + if len(batch) > 0 { + p.model.updateLocals(p.folder, batch) + } +} + func invalidateFolder(cfg *config.Configuration, folderID string, err error) { for i := range cfg.Folders { folder := &cfg.Folders[i] diff --git a/internal/model/rwfolder_test.go b/internal/model/rwfolder_test.go index 71e21f9cd..61957395b 100644 --- a/internal/model/rwfolder_test.go +++ b/internal/model/rwfolder_test.go @@ -70,7 +70,7 @@ func TestHandleFile(t *testing.T) { m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db) m.AddFolder(defaultFolderConfig) // Update index - m.updateLocal("default", existingFile) + m.updateLocals("default", []protocol.FileInfo{existingFile}) p := rwFolder{ folder: "default", @@ -124,7 +124,7 @@ func TestHandleFileWithTemp(t *testing.T) { m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db) m.AddFolder(defaultFolderConfig) // Update index - m.updateLocal("default", existingFile) + m.updateLocals("default", []protocol.FileInfo{existingFile}) p := rwFolder{ folder: "default", @@ -184,7 +184,7 @@ func TestCopierFinder(t *testing.T) { m := NewModel(defaultConfig, protocol.LocalDeviceID, "device", "syncthing", "dev", db) m.AddFolder(defaultFolderConfig) // Update index - m.updateLocal("default", existingFile) + m.updateLocals("default", []protocol.FileInfo{existingFile}) iterFn := func(folder, file string, index int32) bool { return true @@ -268,7 +268,7 @@ func TestCopierCleanup(t *testing.T) { } // Add file to index - m.updateLocal("default", file) + m.updateLocals("default", []protocol.FileInfo{file}) if !m.finder.Iterate(blocks[0].Hash, iterFn) { t.Error("Expected block not found") @@ -277,7 +277,7 @@ func TestCopierCleanup(t *testing.T) { file.Blocks = []protocol.BlockInfo{blocks[1]} file.Version = file.Version.Update(protocol.LocalDeviceID.Short()) // Update index (removing old blocks) - m.updateLocal("default", file) + m.updateLocals("default", []protocol.FileInfo{file}) if m.finder.Iterate(blocks[0].Hash, iterFn) { t.Error("Unexpected block found") @@ -290,7 +290,7 @@ func TestCopierCleanup(t *testing.T) { file.Blocks = []protocol.BlockInfo{blocks[0]} file.Version = file.Version.Update(protocol.LocalDeviceID.Short()) // Update index (removing old blocks) - m.updateLocal("default", file) + m.updateLocals("default", []protocol.FileInfo{file}) if !m.finder.Iterate(blocks[0].Hash, iterFn) { t.Error("Unexpected block found") @@ -316,7 +316,7 @@ func TestLastResortPulling(t *testing.T) { Modified: 0, Blocks: []protocol.BlockInfo{blocks[0]}, } - m.updateLocal("default", file) + m.updateLocals("default", []protocol.FileInfo{file}) // Pretend that we are handling a new file of the same content but // with a different name (causing to copy that particular block)