mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-09 23:00:58 +00:00
358cf25cff
Allow the watcher delay to take fractional values, effectively allowing for much shorter delays. The minimum value is limited at 0.01, which effectively translates to 10ms. This is required in order to guarantee that there is still enough time to aggregate multiple single change events. Signed-off-by: Tomasz Wilczyński <twilczynski@naver.com>
474 lines
15 KiB
Go
474 lines
15 KiB
Go
// Copyright (C) 2016 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
package watchaggregator
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"path/filepath"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/syncthing/syncthing/lib/config"
|
|
"github.com/syncthing/syncthing/lib/events"
|
|
"github.com/syncthing/syncthing/lib/fs"
|
|
)
|
|
|
|
// Not meant to be changed, but must be changeable for tests
|
|
var (
|
|
maxFiles = 512
|
|
maxFilesPerDir = 128
|
|
)
|
|
|
|
// aggregatedEvent represents potentially multiple events at and/or recursively
|
|
// below one path until it times out and a scan is scheduled.
|
|
// If it represents multiple events and there are events of both Remove and
|
|
// NonRemove types, the evType attribute is Mixed (as returned by fs.Event.Merge).
|
|
type aggregatedEvent struct {
|
|
firstModTime time.Time
|
|
lastModTime time.Time
|
|
evType fs.EventType
|
|
}
|
|
|
|
// Stores pointers to both aggregated events directly within this directory and
|
|
// child directories recursively containing aggregated events themselves.
|
|
type eventDir struct {
|
|
events map[string]*aggregatedEvent
|
|
dirs map[string]*eventDir
|
|
}
|
|
|
|
func newEventDir() *eventDir {
|
|
return &eventDir{
|
|
events: make(map[string]*aggregatedEvent),
|
|
dirs: make(map[string]*eventDir),
|
|
}
|
|
}
|
|
|
|
func (dir *eventDir) childCount() int {
|
|
return len(dir.events) + len(dir.dirs)
|
|
}
|
|
|
|
func (dir *eventDir) firstModTime() time.Time {
|
|
if dir.childCount() == 0 {
|
|
panic("bug: firstModTime must not be used on empty eventDir")
|
|
}
|
|
firstModTime := time.Now()
|
|
for _, childDir := range dir.dirs {
|
|
dirTime := childDir.firstModTime()
|
|
if dirTime.Before(firstModTime) {
|
|
firstModTime = dirTime
|
|
}
|
|
}
|
|
for _, event := range dir.events {
|
|
if event.firstModTime.Before(firstModTime) {
|
|
firstModTime = event.firstModTime
|
|
}
|
|
}
|
|
return firstModTime
|
|
}
|
|
|
|
func (dir *eventDir) eventType() fs.EventType {
|
|
if dir.childCount() == 0 {
|
|
panic("bug: eventType must not be used on empty eventDir")
|
|
}
|
|
var evType fs.EventType
|
|
for _, childDir := range dir.dirs {
|
|
evType |= childDir.eventType()
|
|
if evType == fs.Mixed {
|
|
return fs.Mixed
|
|
}
|
|
}
|
|
for _, event := range dir.events {
|
|
evType |= event.evType
|
|
if evType == fs.Mixed {
|
|
return fs.Mixed
|
|
}
|
|
}
|
|
return evType
|
|
}
|
|
|
|
type aggregator struct {
|
|
// folderID never changes and is accessed in CommitConfiguration, which
|
|
// asynchronously updates folderCfg -> can't use folderCfg.ID (racy)
|
|
folderID string
|
|
folderCfg config.FolderConfiguration
|
|
folderCfgUpdate chan config.FolderConfiguration
|
|
// Time after which an event is scheduled for scanning when no modifications occur.
|
|
notifyDelay time.Duration
|
|
// Time after which an event is scheduled for scanning even though modifications occur.
|
|
notifyTimeout time.Duration
|
|
notifyTimer *time.Timer
|
|
notifyTimerNeedsReset bool
|
|
notifyTimerResetChan chan time.Duration
|
|
counts eventCounter
|
|
root *eventDir
|
|
ctx context.Context
|
|
}
|
|
|
|
type eventCounter struct {
|
|
removes int // Includes mixed events.
|
|
nonRemoves int
|
|
}
|
|
|
|
func (c *eventCounter) add(typ fs.EventType, n int) {
|
|
if typ&fs.Remove != 0 {
|
|
c.removes += n
|
|
} else {
|
|
c.nonRemoves += n
|
|
}
|
|
}
|
|
|
|
func (c *eventCounter) total() int { return c.removes + c.nonRemoves }
|
|
|
|
func newAggregator(ctx context.Context, folderCfg config.FolderConfiguration) *aggregator {
|
|
a := &aggregator{
|
|
folderID: folderCfg.ID,
|
|
folderCfgUpdate: make(chan config.FolderConfiguration),
|
|
notifyTimerNeedsReset: false,
|
|
notifyTimerResetChan: make(chan time.Duration),
|
|
root: newEventDir(),
|
|
ctx: ctx,
|
|
}
|
|
|
|
a.updateConfig(folderCfg)
|
|
|
|
return a
|
|
}
|
|
|
|
func Aggregate(ctx context.Context, in <-chan fs.Event, out chan<- []string, folderCfg config.FolderConfiguration, cfg config.Wrapper, evLogger events.Logger) {
|
|
a := newAggregator(ctx, folderCfg)
|
|
|
|
// Necessary for unit tests where the backend is mocked
|
|
go a.mainLoop(in, out, cfg, evLogger)
|
|
}
|
|
|
|
func (a *aggregator) mainLoop(in <-chan fs.Event, out chan<- []string, cfg config.Wrapper, evLogger events.Logger) {
|
|
a.notifyTimer = time.NewTimer(a.notifyDelay)
|
|
defer a.notifyTimer.Stop()
|
|
|
|
inProgressItemSubscription := evLogger.Subscribe(events.ItemStarted | events.ItemFinished)
|
|
defer inProgressItemSubscription.Unsubscribe()
|
|
|
|
cfg.Subscribe(a)
|
|
defer cfg.Unsubscribe(a)
|
|
|
|
inProgress := make(map[string]struct{})
|
|
|
|
for {
|
|
select {
|
|
case event := <-in:
|
|
a.newEvent(event, inProgress)
|
|
case event, ok := <-inProgressItemSubscription.C():
|
|
if ok {
|
|
updateInProgressSet(event, inProgress)
|
|
}
|
|
case <-a.notifyTimer.C:
|
|
a.actOnTimer(out)
|
|
case interval := <-a.notifyTimerResetChan:
|
|
a.resetNotifyTimer(interval)
|
|
case folderCfg := <-a.folderCfgUpdate:
|
|
a.updateConfig(folderCfg)
|
|
case <-a.ctx.Done():
|
|
l.Debugln(a, "Stopped")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *aggregator) newEvent(event fs.Event, inProgress map[string]struct{}) {
|
|
if _, ok := a.root.events["."]; ok {
|
|
l.Debugln(a, "Will scan entire folder anyway; dropping:", event.Name)
|
|
return
|
|
}
|
|
if _, ok := inProgress[event.Name]; ok {
|
|
l.Debugln(a, "Skipping path we modified:", event.Name)
|
|
return
|
|
}
|
|
a.aggregateEvent(event, time.Now())
|
|
}
|
|
|
|
func (a *aggregator) aggregateEvent(event fs.Event, evTime time.Time) {
|
|
if event.Name == "." || a.counts.total() == maxFiles {
|
|
l.Debugln(a, "Scan entire folder")
|
|
firstModTime := evTime
|
|
if a.root.childCount() != 0 {
|
|
event.Type = event.Type.Merge(a.root.eventType())
|
|
firstModTime = a.root.firstModTime()
|
|
}
|
|
a.root.dirs = make(map[string]*eventDir)
|
|
a.root.events = make(map[string]*aggregatedEvent)
|
|
a.root.events["."] = &aggregatedEvent{
|
|
firstModTime: firstModTime,
|
|
lastModTime: evTime,
|
|
evType: event.Type,
|
|
}
|
|
a.counts = eventCounter{}
|
|
a.counts.add(event.Type, 1)
|
|
a.resetNotifyTimerIfNeeded()
|
|
return
|
|
}
|
|
|
|
parentDir := a.root
|
|
|
|
// Check if any parent directory is already tracked or will exceed
|
|
// events per directory limit bottom up
|
|
pathSegments := strings.Split(filepath.ToSlash(event.Name), "/")
|
|
|
|
// As root dir cannot be further aggregated, allow up to maxFiles
|
|
// children.
|
|
localMaxFilesPerDir := maxFiles
|
|
var currPath string
|
|
for i, name := range pathSegments[:len(pathSegments)-1] {
|
|
currPath = filepath.Join(currPath, name)
|
|
|
|
if ev, ok := parentDir.events[name]; ok {
|
|
ev.lastModTime = evTime
|
|
if merged := event.Type.Merge(ev.evType); ev.evType != merged {
|
|
a.counts.add(ev.evType, -1)
|
|
a.counts.add(merged, 1)
|
|
ev.evType = merged
|
|
}
|
|
l.Debugf("%v Parent %s (type %s) already tracked: %s", a, currPath, ev.evType, event.Name)
|
|
return
|
|
}
|
|
|
|
if parentDir.childCount() == localMaxFilesPerDir {
|
|
l.Debugf("%v Parent dir %s already has %d children, tracking it instead: %s", a, currPath, localMaxFilesPerDir, event.Name)
|
|
event.Name = filepath.Dir(currPath)
|
|
a.aggregateEvent(event, evTime)
|
|
return
|
|
}
|
|
|
|
// If there are no events below path, but we need to recurse
|
|
// into that path, create eventDir at path.
|
|
if newParent, ok := parentDir.dirs[name]; ok {
|
|
parentDir = newParent
|
|
} else {
|
|
l.Debugln(a, "Creating eventDir at:", currPath)
|
|
newParent = newEventDir()
|
|
parentDir.dirs[name] = newParent
|
|
parentDir = newParent
|
|
}
|
|
|
|
// Reset allowed children count to maxFilesPerDir for non-root
|
|
if i == 0 {
|
|
localMaxFilesPerDir = maxFilesPerDir
|
|
}
|
|
}
|
|
|
|
name := pathSegments[len(pathSegments)-1]
|
|
|
|
if ev, ok := parentDir.events[name]; ok {
|
|
ev.lastModTime = evTime
|
|
if merged := event.Type.Merge(ev.evType); ev.evType != merged {
|
|
a.counts.add(ev.evType, -1)
|
|
a.counts.add(merged, 1)
|
|
ev.evType = merged
|
|
}
|
|
l.Debugf("%v Already tracked (type %v): %s", a, ev.evType, event.Name)
|
|
return
|
|
}
|
|
|
|
childDir, ok := parentDir.dirs[name]
|
|
|
|
// If a dir existed at path, it would be removed from dirs, thus
|
|
// childCount would not increase.
|
|
if !ok && parentDir.childCount() == localMaxFilesPerDir {
|
|
l.Debugf("%v Parent dir already has %d children, tracking it instead: %s", a, localMaxFilesPerDir, event.Name)
|
|
event.Name = filepath.Dir(event.Name)
|
|
a.aggregateEvent(event, evTime)
|
|
return
|
|
}
|
|
|
|
firstModTime := evTime
|
|
if ok {
|
|
firstModTime = childDir.firstModTime()
|
|
if merged := event.Type.Merge(childDir.eventType()); event.Type != merged {
|
|
a.counts.add(event.Type, -1)
|
|
event.Type = merged
|
|
}
|
|
delete(parentDir.dirs, name)
|
|
}
|
|
l.Debugf("%v Tracking (type %v): %s", a, event.Type, event.Name)
|
|
parentDir.events[name] = &aggregatedEvent{
|
|
firstModTime: firstModTime,
|
|
lastModTime: evTime,
|
|
evType: event.Type,
|
|
}
|
|
a.counts.add(event.Type, 1)
|
|
a.resetNotifyTimerIfNeeded()
|
|
}
|
|
|
|
func (a *aggregator) resetNotifyTimerIfNeeded() {
|
|
if a.notifyTimerNeedsReset {
|
|
a.resetNotifyTimer(a.notifyDelay)
|
|
}
|
|
}
|
|
|
|
// resetNotifyTimer should only ever be called when notifyTimer has stopped
|
|
// and notifyTimer.C been read from. Otherwise, call resetNotifyTimerIfNeeded.
|
|
func (a *aggregator) resetNotifyTimer(duration time.Duration) {
|
|
l.Debugln(a, "Resetting notifyTimer to", duration.String())
|
|
a.notifyTimerNeedsReset = false
|
|
a.notifyTimer.Reset(duration)
|
|
}
|
|
|
|
func (a *aggregator) actOnTimer(out chan<- []string) {
|
|
c := a.counts.total()
|
|
if c == 0 {
|
|
l.Debugln(a, "No tracked events, waiting for new event.")
|
|
a.notifyTimerNeedsReset = true
|
|
return
|
|
}
|
|
oldEvents := make(map[string]*aggregatedEvent, c)
|
|
a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), true)
|
|
if a.notifyDelay != a.notifyTimeout && a.counts.nonRemoves == 0 && a.counts.removes != 0 {
|
|
// Only delayed events remaining, no need to delay them additionally
|
|
a.popOldEventsTo(oldEvents, a.root, ".", time.Now(), false)
|
|
}
|
|
if len(oldEvents) == 0 {
|
|
l.Debugln(a, "No old fs events")
|
|
a.resetNotifyTimer(a.notifyDelay)
|
|
return
|
|
}
|
|
// Sending to channel might block for a long time, but we need to keep
|
|
// reading from notify backend channel to avoid overflow
|
|
go a.notify(oldEvents, out)
|
|
}
|
|
|
|
// Schedule scan for given events dispatching deletes last and reset notification
|
|
// afterwards to set up for the next scan scheduling.
|
|
func (a *aggregator) notify(oldEvents map[string]*aggregatedEvent, out chan<- []string) {
|
|
timeBeforeSending := time.Now()
|
|
l.Debugf("%v Notifying about %d fs events", a, len(oldEvents))
|
|
separatedBatches := make(map[fs.EventType][]string)
|
|
for path, event := range oldEvents {
|
|
separatedBatches[event.evType] = append(separatedBatches[event.evType], path)
|
|
}
|
|
for _, evType := range [3]fs.EventType{fs.NonRemove, fs.Mixed, fs.Remove} {
|
|
currBatch := separatedBatches[evType]
|
|
if len(currBatch) != 0 {
|
|
select {
|
|
case out <- currBatch:
|
|
case <-a.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
// If sending to channel blocked for a long time,
|
|
// shorten next notifyDelay accordingly.
|
|
duration := time.Since(timeBeforeSending)
|
|
buffer := time.Millisecond
|
|
var nextDelay time.Duration
|
|
switch {
|
|
case duration < a.notifyDelay/10:
|
|
nextDelay = a.notifyDelay
|
|
case duration+buffer > a.notifyDelay:
|
|
nextDelay = buffer
|
|
default:
|
|
nextDelay = a.notifyDelay - duration
|
|
}
|
|
select {
|
|
case a.notifyTimerResetChan <- nextDelay:
|
|
case <-a.ctx.Done():
|
|
}
|
|
}
|
|
|
|
// popOldEvents finds events that should be scheduled for scanning recursively in dirs,
|
|
// removes those events and empty eventDirs and returns a map with all the removed
|
|
// events referenced by their filesystem path
|
|
func (a *aggregator) popOldEventsTo(to map[string]*aggregatedEvent, dir *eventDir, dirPath string, currTime time.Time, delayRem bool) {
|
|
for childName, childDir := range dir.dirs {
|
|
a.popOldEventsTo(to, childDir, filepath.Join(dirPath, childName), currTime, delayRem)
|
|
if childDir.childCount() == 0 {
|
|
delete(dir.dirs, childName)
|
|
}
|
|
}
|
|
for name, event := range dir.events {
|
|
if a.isOld(event, currTime, delayRem) {
|
|
to[filepath.Join(dirPath, name)] = event
|
|
delete(dir.events, name)
|
|
a.counts.add(event.evType, -1)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *aggregator) isOld(ev *aggregatedEvent, currTime time.Time, delayRem bool) bool {
|
|
// Deletes should in general be scanned last, therefore they are delayed by
|
|
// letting them time out. This behaviour is overridden by delayRem == false.
|
|
// Refer to following comments as to why.
|
|
// An event that has not registered any new modifications recently is scanned.
|
|
// a.notifyDelay is the user facing value signifying the normal delay between
|
|
// picking up a modification and scanning it. As scheduling scans happens at
|
|
// regular intervals of a.notifyDelay the delay of a single event is not exactly
|
|
// a.notifyDelay, but lies in the range of 0.5 to 1.5 times a.notifyDelay.
|
|
if (!delayRem || ev.evType == fs.NonRemove) && 2*currTime.Sub(ev.lastModTime) > a.notifyDelay {
|
|
return true
|
|
}
|
|
// When an event registers repeat modifications or involves removals it
|
|
// is delayed to reduce resource usage, but after a certain time (notifyTimeout)
|
|
// passed it is scanned anyway.
|
|
// If only removals are remaining to be scanned, there is no point to delay
|
|
// removals further, so this behaviour is overridden by delayRem == false.
|
|
return currTime.Sub(ev.firstModTime) > a.notifyTimeout
|
|
}
|
|
|
|
func (a *aggregator) String() string {
|
|
return fmt.Sprintf("aggregator/%s:", a.folderCfg.Description())
|
|
}
|
|
|
|
func (a *aggregator) CommitConfiguration(_, to config.Configuration) bool {
|
|
for _, folderCfg := range to.Folders {
|
|
if folderCfg.ID == a.folderID {
|
|
select {
|
|
case a.folderCfgUpdate <- folderCfg:
|
|
case <-a.ctx.Done():
|
|
}
|
|
return true
|
|
}
|
|
}
|
|
// Nothing to do, model will soon stop this
|
|
return true
|
|
}
|
|
|
|
func (a *aggregator) updateConfig(folderCfg config.FolderConfiguration) {
|
|
a.notifyDelay = time.Duration(folderCfg.FSWatcherDelayS) * time.Second
|
|
a.notifyTimeout = notifyTimeout(folderCfg.FSWatcherDelayS)
|
|
a.folderCfg = folderCfg
|
|
}
|
|
|
|
func updateInProgressSet(event events.Event, inProgress map[string]struct{}) {
|
|
if event.Type == events.ItemStarted {
|
|
path := event.Data.(map[string]string)["item"]
|
|
inProgress[path] = struct{}{}
|
|
} else if event.Type == events.ItemFinished {
|
|
path := event.Data.(map[string]interface{})["item"].(string)
|
|
delete(inProgress, path)
|
|
}
|
|
}
|
|
|
|
// Events that involve removals or continuously receive new modifications are
|
|
// delayed but must time out at some point. The following numbers come out of thin
|
|
// air, they were just considered as a sensible compromise between fast updates and
|
|
// saving resources. For short delays the timeout is 6 times the delay, capped at 1
|
|
// minute. For delays longer than 1 minute, the delay and timeout are equal.
|
|
func notifyTimeout(eventDelayS float64) time.Duration {
|
|
const (
|
|
shortDelayS = 10
|
|
shortDelayMultiplicator = 6
|
|
longDelayS = 60
|
|
)
|
|
longDelayTimeout := time.Duration(1) * time.Minute
|
|
if eventDelayS < shortDelayS {
|
|
return time.Duration(eventDelayS*shortDelayMultiplicator) * time.Second
|
|
}
|
|
if eventDelayS < longDelayS {
|
|
return longDelayTimeout
|
|
}
|
|
return time.Duration(eventDelayS) * time.Second
|
|
}
|