mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-14 01:04:14 +00:00
55937b61ca
This adds a new config with the simple and concise name maxConcurrentIncomingRequestKiB. This limits how many bytes we have "in the air" in the form of response data being read and processed. After some testing I think that not having this limiter is seldom a great idea and thus I propose a default value of 256 MiB for this new setting. I also refactored the folder IO limiter to be a model/folder attribute instead of a package global.
939 lines
24 KiB
Go
939 lines
24 KiB
Go
// Copyright (C) 2014 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package model
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"path/filepath"
|
|
"sort"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
|
|
"github.com/syncthing/syncthing/lib/config"
|
|
"github.com/syncthing/syncthing/lib/db"
|
|
"github.com/syncthing/syncthing/lib/events"
|
|
"github.com/syncthing/syncthing/lib/fs"
|
|
"github.com/syncthing/syncthing/lib/ignore"
|
|
"github.com/syncthing/syncthing/lib/locations"
|
|
"github.com/syncthing/syncthing/lib/osutil"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
"github.com/syncthing/syncthing/lib/scanner"
|
|
"github.com/syncthing/syncthing/lib/stats"
|
|
"github.com/syncthing/syncthing/lib/sync"
|
|
"github.com/syncthing/syncthing/lib/watchaggregator"
|
|
|
|
"github.com/thejerf/suture"
|
|
)
|
|
|
|
type folder struct {
|
|
suture.Service
|
|
stateTracker
|
|
config.FolderConfiguration
|
|
*stats.FolderStatisticsReference
|
|
ioLimiter *byteSemaphore
|
|
|
|
localFlags uint32
|
|
|
|
model *model
|
|
shortID protocol.ShortID
|
|
fset *db.FileSet
|
|
ignores *ignore.Matcher
|
|
ctx context.Context
|
|
|
|
scanInterval time.Duration
|
|
scanTimer *time.Timer
|
|
scanNow chan rescanRequest
|
|
scanDelay chan time.Duration
|
|
initialScanFinished chan struct{}
|
|
scanErrors []FileError
|
|
scanErrorsMut sync.Mutex
|
|
|
|
pullScheduled chan struct{}
|
|
|
|
watchCancel context.CancelFunc
|
|
watchChan chan []string
|
|
restartWatchChan chan struct{}
|
|
watchErr error
|
|
watchMut sync.Mutex
|
|
|
|
puller puller
|
|
}
|
|
|
|
// rescanRequest is sent on folder.scanNow to ask the serve loop to scan the
// given subdirs. The result of that scan is delivered on err.
type rescanRequest struct {
	subdirs []string
	err     chan error
}
|
|
|
|
// puller performs the actual pull operation on behalf of a folder.
type puller interface {
	// pull returns true when the pull was successful and should not be
	// retried.
	pull() bool
}
|
|
|
|
func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *byteSemaphore) folder {
|
|
return folder{
|
|
stateTracker: newStateTracker(cfg.ID, evLogger),
|
|
FolderConfiguration: cfg,
|
|
FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
|
|
ioLimiter: ioLimiter,
|
|
|
|
model: model,
|
|
shortID: model.shortID,
|
|
fset: fset,
|
|
ignores: ignores,
|
|
|
|
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
|
|
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
|
|
scanNow: make(chan rescanRequest),
|
|
scanDelay: make(chan time.Duration),
|
|
initialScanFinished: make(chan struct{}),
|
|
scanErrorsMut: sync.NewMutex(),
|
|
|
|
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
|
|
|
|
watchCancel: func() {},
|
|
restartWatchChan: make(chan struct{}, 1),
|
|
watchMut: sync.NewMutex(),
|
|
}
|
|
}
|
|
|
|
func (f *folder) serve(ctx context.Context) {
|
|
atomic.AddInt32(&f.model.foldersRunning, 1)
|
|
defer atomic.AddInt32(&f.model.foldersRunning, -1)
|
|
|
|
f.ctx = ctx
|
|
|
|
l.Debugln(f, "starting")
|
|
defer l.Debugln(f, "exiting")
|
|
|
|
defer func() {
|
|
f.scanTimer.Stop()
|
|
f.setState(FolderIdle)
|
|
}()
|
|
|
|
pause := f.basePause()
|
|
pullFailTimer := time.NewTimer(0)
|
|
<-pullFailTimer.C
|
|
|
|
if f.FSWatcherEnabled && f.CheckHealth() == nil {
|
|
f.startWatch()
|
|
}
|
|
|
|
initialCompleted := f.initialScanFinished
|
|
|
|
pull := func() {
|
|
startTime := time.Now()
|
|
if f.pull() {
|
|
// We're good. Don't schedule another pull and reset
|
|
// the pause interval.
|
|
pause = f.basePause()
|
|
return
|
|
}
|
|
// Pulling failed, try again later.
|
|
delay := pause + time.Since(startTime)
|
|
l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), delay)
|
|
pullFailTimer.Reset(delay)
|
|
if pause < 60*f.basePause() {
|
|
pause *= 2
|
|
}
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return
|
|
|
|
case <-f.pullScheduled:
|
|
pullFailTimer.Stop()
|
|
select {
|
|
case <-pullFailTimer.C:
|
|
default:
|
|
}
|
|
pull()
|
|
|
|
case <-pullFailTimer.C:
|
|
pull()
|
|
|
|
case <-initialCompleted:
|
|
// Initial scan has completed, we should do a pull
|
|
initialCompleted = nil // never hit this case again
|
|
if !f.pull() {
|
|
// Pulling failed, try again later.
|
|
pullFailTimer.Reset(pause)
|
|
}
|
|
|
|
case <-f.scanTimer.C:
|
|
l.Debugln(f, "Scanning due to timer")
|
|
f.scanTimerFired()
|
|
|
|
case req := <-f.scanNow:
|
|
l.Debugln(f, "Scanning due to request")
|
|
req.err <- f.scanSubdirs(req.subdirs)
|
|
|
|
case next := <-f.scanDelay:
|
|
l.Debugln(f, "Delaying scan")
|
|
f.scanTimer.Reset(next)
|
|
|
|
case fsEvents := <-f.watchChan:
|
|
l.Debugln(f, "Scan due to watcher")
|
|
f.scanSubdirs(fsEvents)
|
|
|
|
case <-f.restartWatchChan:
|
|
l.Debugln(f, "Restart watcher")
|
|
f.restartWatch()
|
|
}
|
|
}
|
|
}
|
|
|
|
// BringToFront does nothing in this implementation; its string argument is
// ignored.
// NOTE(review): presumably folder types with a pull queue provide their own
// version — confirm against the embedding types.
func (f *folder) BringToFront(string) {}
|
|
|
|
// Override does nothing in this implementation.
// NOTE(review): presumably only meaningful for specific folder types that
// define their own Override — confirm.
func (f *folder) Override() {}
|
|
|
|
// Revert does nothing in this implementation.
// NOTE(review): presumably only meaningful for specific folder types that
// define their own Revert — confirm.
func (f *folder) Revert() {}
|
|
|
|
func (f *folder) DelayScan(next time.Duration) {
|
|
f.Delay(next)
|
|
}
|
|
|
|
func (f *folder) ignoresUpdated() {
|
|
if f.FSWatcherEnabled {
|
|
f.scheduleWatchRestart()
|
|
}
|
|
}
|
|
|
|
func (f *folder) SchedulePull() {
|
|
select {
|
|
case f.pullScheduled <- struct{}{}:
|
|
default:
|
|
// We might be busy doing a pull and thus not reading from this
|
|
// channel. The channel is 1-buffered, so one notification will be
|
|
// queued to ensure we recheck after the pull, but beyond that we must
|
|
// make sure to not block index receiving.
|
|
}
|
|
}
|
|
|
|
func (f *folder) Jobs(_, _ int) ([]string, []string, int) {
|
|
return nil, nil, 0
|
|
}
|
|
|
|
func (f *folder) Scan(subdirs []string) error {
|
|
<-f.initialScanFinished
|
|
req := rescanRequest{
|
|
subdirs: subdirs,
|
|
err: make(chan error),
|
|
}
|
|
|
|
select {
|
|
case f.scanNow <- req:
|
|
return <-req.err
|
|
case <-f.ctx.Done():
|
|
return f.ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (f *folder) Reschedule() {
|
|
if f.scanInterval == 0 {
|
|
return
|
|
}
|
|
// Sleep a random time between 3/4 and 5/4 of the configured interval.
|
|
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
|
|
interval := time.Duration(sleepNanos) * time.Nanosecond
|
|
l.Debugln(f, "next rescan in", interval)
|
|
f.scanTimer.Reset(interval)
|
|
}
|
|
|
|
func (f *folder) Delay(next time.Duration) {
|
|
f.scanDelay <- next
|
|
}
|
|
|
|
// CheckHealth checks the folder for common errors, updates the folder state
|
|
// and returns the current folder error, or nil if the folder is healthy.
|
|
func (f *folder) CheckHealth() error {
|
|
err := f.getHealthError()
|
|
f.setError(err)
|
|
return err
|
|
}
|
|
|
|
func (f *folder) getHealthError() error {
|
|
// Check for folder errors, with the most serious and specific first and
|
|
// generic ones like out of space on the home disk later.
|
|
|
|
if err := f.CheckPath(); err != nil {
|
|
return err
|
|
}
|
|
|
|
dbPath := locations.Get(locations.Database)
|
|
if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
|
|
if err = config.CheckFreeSpace(f.model.cfg.Options().MinHomeDiskFree, usage); err != nil {
|
|
return errors.Wrapf(err, "insufficient space on disk for database (%v)", dbPath)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *folder) pull() bool {
|
|
select {
|
|
case <-f.initialScanFinished:
|
|
default:
|
|
// Once the initial scan finished, a pull will be scheduled
|
|
return true
|
|
}
|
|
|
|
// If there is nothing to do, don't even enter sync-waiting state.
|
|
abort := true
|
|
snap := f.fset.Snapshot()
|
|
snap.WithNeed(protocol.LocalDeviceID, func(intf db.FileIntf) bool {
|
|
abort = false
|
|
return false
|
|
})
|
|
snap.Release()
|
|
if abort {
|
|
return true
|
|
}
|
|
|
|
f.setState(FolderSyncWaiting)
|
|
defer f.setState(FolderIdle)
|
|
|
|
f.ioLimiter.take(1)
|
|
defer f.ioLimiter.give(1)
|
|
|
|
return f.puller.pull()
|
|
}
|
|
|
|
func (f *folder) scanSubdirs(subDirs []string) error {
|
|
if err := f.getHealthError(); err != nil {
|
|
// If there is a health error we set it as the folder error. We do not
|
|
// clear the folder error if there is no health error, as there might be
|
|
// an *other* folder error (failed to load ignores, for example). Hence
|
|
// we do not use the CheckHealth() convenience function here.
|
|
f.setError(err)
|
|
return err
|
|
}
|
|
|
|
oldHash := f.ignores.Hash()
|
|
if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
|
|
err = errors.Wrap(err, "loading ignores")
|
|
f.setError(err)
|
|
return err
|
|
}
|
|
|
|
// Check on the way out if the ignore patterns changed as part of scanning
|
|
// this folder. If they did we should schedule a pull of the folder so that
|
|
// we request things we might have suddenly become unignored and so on.
|
|
defer func() {
|
|
if f.ignores.Hash() != oldHash {
|
|
l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
|
|
f.ignoresUpdated()
|
|
f.SchedulePull()
|
|
}
|
|
}()
|
|
|
|
// We've passed all health checks so now mark ourselves healthy and queued
|
|
// for scanning.
|
|
f.setError(nil)
|
|
f.setState(FolderScanWaiting)
|
|
|
|
f.ioLimiter.take(1)
|
|
defer f.ioLimiter.give(1)
|
|
|
|
for i := range subDirs {
|
|
sub := osutil.NativeFilename(subDirs[i])
|
|
|
|
if sub == "" {
|
|
// A blank subdirs means to scan the entire folder. We can trim
|
|
// the subDirs list and go on our way.
|
|
subDirs = nil
|
|
break
|
|
}
|
|
|
|
subDirs[i] = sub
|
|
}
|
|
|
|
snap := f.fset.Snapshot()
|
|
// We release explicitly later in this function, however we might exit early
|
|
// and it's ok to release twice.
|
|
defer snap.Release()
|
|
|
|
// Clean the list of subitems to ensure that we start at a known
|
|
// directory, and don't scan subdirectories of things we've already
|
|
// scanned.
|
|
subDirs = unifySubs(subDirs, func(file string) bool {
|
|
_, ok := snap.Get(protocol.LocalDeviceID, file)
|
|
return ok
|
|
})
|
|
|
|
f.setState(FolderScanning)
|
|
|
|
mtimefs := f.fset.MtimeFS()
|
|
fchan := scanner.Walk(f.ctx, scanner.Config{
|
|
Folder: f.ID,
|
|
Subs: subDirs,
|
|
Matcher: f.ignores,
|
|
TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
|
|
CurrentFiler: cFiler{snap},
|
|
Filesystem: mtimefs,
|
|
IgnorePerms: f.IgnorePerms,
|
|
AutoNormalize: f.AutoNormalize,
|
|
Hashers: f.model.numHashers(f.ID),
|
|
ShortID: f.shortID,
|
|
ProgressTickIntervalS: f.ScanProgressIntervalS,
|
|
LocalFlags: f.localFlags,
|
|
ModTimeWindow: f.ModTimeWindow(),
|
|
EventLogger: f.evLogger,
|
|
})
|
|
|
|
batchFn := func(fs []protocol.FileInfo) error {
|
|
if err := f.CheckHealth(); err != nil {
|
|
l.Debugf("Stopping scan of folder %s due to: %s", f.Description(), err)
|
|
return err
|
|
}
|
|
f.updateLocalsFromScanning(fs)
|
|
return nil
|
|
}
|
|
// Resolve items which are identical with the global state.
|
|
if f.localFlags&protocol.FlagLocalReceiveOnly != 0 {
|
|
oldBatchFn := batchFn // can't reference batchFn directly (recursion)
|
|
batchFn = func(fs []protocol.FileInfo) error {
|
|
for i := range fs {
|
|
switch gf, ok := snap.GetGlobal(fs[i].Name); {
|
|
case !ok:
|
|
continue
|
|
case gf.IsEquivalentOptional(fs[i], f.ModTimeWindow(), false, false, protocol.FlagLocalReceiveOnly):
|
|
// What we have locally is equivalent to the global file.
|
|
fs[i].Version = fs[i].Version.Merge(gf.Version)
|
|
fallthrough
|
|
case fs[i].IsDeleted() && gf.IsReceiveOnlyChanged():
|
|
// Our item is deleted and the global item is our own
|
|
// receive only file. We can't delete file infos, so
|
|
// we just pretend it is a normal deleted file (nobody
|
|
// cares about that).
|
|
fs[i].LocalFlags &^= protocol.FlagLocalReceiveOnly
|
|
}
|
|
}
|
|
return oldBatchFn(fs)
|
|
}
|
|
}
|
|
batch := newFileInfoBatch(batchFn)
|
|
|
|
// Schedule a pull after scanning, but only if we actually detected any
|
|
// changes.
|
|
changes := 0
|
|
defer func() {
|
|
if changes > 0 {
|
|
f.SchedulePull()
|
|
}
|
|
}()
|
|
|
|
f.clearScanErrors(subDirs)
|
|
for res := range fchan {
|
|
if res.Err != nil {
|
|
f.newScanError(res.Path, res.Err)
|
|
continue
|
|
}
|
|
if err := batch.flushIfFull(); err != nil {
|
|
return err
|
|
}
|
|
|
|
batch.append(res.File)
|
|
changes++
|
|
}
|
|
|
|
if err := batch.flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(subDirs) == 0 {
|
|
// If we have no specific subdirectories to traverse, set it to one
|
|
// empty prefix so we traverse the entire folder contents once.
|
|
subDirs = []string{""}
|
|
}
|
|
|
|
// Do a scan of the database for each prefix, to check for deleted and
|
|
// ignored files.
|
|
var toIgnore []db.FileInfoTruncated
|
|
ignoredParent := ""
|
|
|
|
snap.Release()
|
|
snap = f.fset.Snapshot()
|
|
defer snap.Release()
|
|
|
|
for _, sub := range subDirs {
|
|
var iterError error
|
|
|
|
snap.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi db.FileIntf) bool {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return false
|
|
default:
|
|
}
|
|
|
|
file := fi.(db.FileInfoTruncated)
|
|
|
|
if err := batch.flushIfFull(); err != nil {
|
|
iterError = err
|
|
return false
|
|
}
|
|
|
|
if ignoredParent != "" && !fs.IsParent(file.Name, ignoredParent) {
|
|
for _, file := range toIgnore {
|
|
l.Debugln("marking file as ignored", file)
|
|
nf := file.ConvertToIgnoredFileInfo(f.shortID)
|
|
batch.append(nf)
|
|
changes++
|
|
if err := batch.flushIfFull(); err != nil {
|
|
iterError = err
|
|
return false
|
|
}
|
|
}
|
|
toIgnore = toIgnore[:0]
|
|
ignoredParent = ""
|
|
}
|
|
|
|
switch ignored := f.ignores.Match(file.Name).IsIgnored(); {
|
|
case !file.IsIgnored() && ignored:
|
|
// File was not ignored at last pass but has been ignored.
|
|
if file.IsDirectory() {
|
|
// Delay ignoring as a child might be unignored.
|
|
toIgnore = append(toIgnore, file)
|
|
if ignoredParent == "" {
|
|
// If the parent wasn't ignored already, set
|
|
// this path as the "highest" ignored parent
|
|
ignoredParent = file.Name
|
|
}
|
|
return true
|
|
}
|
|
|
|
l.Debugln("marking file as ignored", file)
|
|
nf := file.ConvertToIgnoredFileInfo(f.shortID)
|
|
batch.append(nf)
|
|
changes++
|
|
|
|
case file.IsIgnored() && !ignored:
|
|
// Successfully scanned items are already un-ignored during
|
|
// the scan, so check whether it is deleted.
|
|
fallthrough
|
|
case !file.IsIgnored() && !file.IsDeleted() && !file.IsUnsupported():
|
|
// The file is not ignored, deleted or unsupported. Lets check if
|
|
// it's still here. Simply stat:ing it wont do as there are
|
|
// tons of corner cases (e.g. parent dir->symlink, missing
|
|
// permissions)
|
|
if !osutil.IsDeleted(mtimefs, file.Name) {
|
|
if ignoredParent != "" {
|
|
// Don't ignore parents of this not ignored item
|
|
toIgnore = toIgnore[:0]
|
|
ignoredParent = ""
|
|
}
|
|
return true
|
|
}
|
|
nf := protocol.FileInfo{
|
|
Name: file.Name,
|
|
Type: file.Type,
|
|
Size: 0,
|
|
ModifiedS: file.ModifiedS,
|
|
ModifiedNs: file.ModifiedNs,
|
|
ModifiedBy: f.shortID,
|
|
Deleted: true,
|
|
Version: file.Version.Update(f.shortID),
|
|
LocalFlags: f.localFlags,
|
|
}
|
|
// We do not want to override the global version
|
|
// with the deleted file. Setting to an empty
|
|
// version makes sure the file gets in sync on
|
|
// the following pull.
|
|
if file.ShouldConflict() {
|
|
nf.Version = protocol.Vector{}
|
|
}
|
|
|
|
batch.append(nf)
|
|
changes++
|
|
}
|
|
return true
|
|
})
|
|
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return f.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if iterError == nil && len(toIgnore) > 0 {
|
|
for _, file := range toIgnore {
|
|
l.Debugln("marking file as ignored", f)
|
|
nf := file.ConvertToIgnoredFileInfo(f.shortID)
|
|
batch.append(nf)
|
|
changes++
|
|
if iterError = batch.flushIfFull(); iterError != nil {
|
|
break
|
|
}
|
|
}
|
|
toIgnore = toIgnore[:0]
|
|
}
|
|
|
|
if iterError != nil {
|
|
return iterError
|
|
}
|
|
}
|
|
|
|
if err := batch.flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
f.ScanCompleted()
|
|
f.setState(FolderIdle)
|
|
return nil
|
|
}
|
|
|
|
func (f *folder) scanTimerFired() {
|
|
err := f.scanSubdirs(nil)
|
|
|
|
select {
|
|
case <-f.initialScanFinished:
|
|
default:
|
|
status := "Completed"
|
|
if err != nil {
|
|
status = "Failed"
|
|
}
|
|
l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
|
|
close(f.initialScanFinished)
|
|
}
|
|
|
|
f.Reschedule()
|
|
}
|
|
|
|
func (f *folder) WatchError() error {
|
|
f.watchMut.Lock()
|
|
defer f.watchMut.Unlock()
|
|
return f.watchErr
|
|
}
|
|
|
|
// stopWatch immediately aborts watching and may be called asynchronously
|
|
func (f *folder) stopWatch() {
|
|
f.watchMut.Lock()
|
|
f.watchCancel()
|
|
f.watchMut.Unlock()
|
|
f.setWatchError(nil)
|
|
}
|
|
|
|
// scheduleWatchRestart makes sure watching is restarted from the main for loop
|
|
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
|
|
func (f *folder) scheduleWatchRestart() {
|
|
select {
|
|
case f.restartWatchChan <- struct{}{}:
|
|
default:
|
|
// We might be busy doing a pull and thus not reading from this
|
|
// channel. The channel is 1-buffered, so one notification will be
|
|
// queued to ensure we recheck after the pull.
|
|
}
|
|
}
|
|
|
|
// restartWatch should only ever be called synchronously. If you want to use
|
|
// this asynchronously, you should probably use scheduleWatchRestart instead.
|
|
func (f *folder) restartWatch() {
|
|
f.stopWatch()
|
|
f.startWatch()
|
|
f.scanSubdirs(nil)
|
|
}
|
|
|
|
// startWatch should only ever be called synchronously. If you want to use
|
|
// this asynchronously, you should probably use scheduleWatchRestart instead.
|
|
func (f *folder) startWatch() {
|
|
ctx, cancel := context.WithCancel(f.ctx)
|
|
f.watchMut.Lock()
|
|
f.watchChan = make(chan []string)
|
|
f.watchCancel = cancel
|
|
f.watchMut.Unlock()
|
|
go f.monitorWatch(ctx)
|
|
}
|
|
|
|
// monitorWatch starts the filesystem watching and retries every minute on failure.
|
|
// It should not be used except in startWatch.
|
|
func (f *folder) monitorWatch(ctx context.Context) {
|
|
failTimer := time.NewTimer(0)
|
|
aggrCtx, aggrCancel := context.WithCancel(ctx)
|
|
var err error
|
|
var eventChan <-chan fs.Event
|
|
var errChan <-chan error
|
|
warnedOutside := false
|
|
for {
|
|
select {
|
|
case <-failTimer.C:
|
|
eventChan, errChan, err = f.Filesystem().Watch(".", f.ignores, ctx, f.IgnorePerms)
|
|
// We do this at most once per minute which is the
|
|
// default rescan time without watcher.
|
|
f.scanOnWatchErr()
|
|
f.setWatchError(err)
|
|
if err != nil {
|
|
failTimer.Reset(time.Minute)
|
|
continue
|
|
}
|
|
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
|
|
l.Debugln("Started filesystem watcher for folder", f.Description())
|
|
case err = <-errChan:
|
|
f.setWatchError(err)
|
|
// This error was previously a panic and should never occur, so generate
|
|
// a warning, but don't do it repetitively.
|
|
if !warnedOutside {
|
|
if _, ok := err.(*fs.ErrWatchEventOutsideRoot); ok {
|
|
l.Warnln(err)
|
|
warnedOutside = true
|
|
}
|
|
}
|
|
aggrCancel()
|
|
errChan = nil
|
|
aggrCtx, aggrCancel = context.WithCancel(ctx)
|
|
failTimer.Reset(time.Minute)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// setWatchError sets the current error state of the watch and should be called
|
|
// regardless of whether err is nil or not.
|
|
func (f *folder) setWatchError(err error) {
|
|
f.watchMut.Lock()
|
|
prevErr := f.watchErr
|
|
f.watchErr = err
|
|
f.watchMut.Unlock()
|
|
if err != prevErr {
|
|
data := map[string]interface{}{
|
|
"folder": f.ID,
|
|
}
|
|
if prevErr != nil {
|
|
data["from"] = prevErr.Error()
|
|
}
|
|
if err != nil {
|
|
data["to"] = err.Error()
|
|
}
|
|
f.evLogger.Log(events.FolderWatchStateChanged, data)
|
|
}
|
|
if err == nil {
|
|
return
|
|
}
|
|
msg := fmt.Sprintf("Error while trying to start filesystem watcher for folder %s, trying again in 1min: %v", f.Description(), err)
|
|
if prevErr != err {
|
|
l.Infof(msg)
|
|
return
|
|
}
|
|
l.Debugf(msg)
|
|
}
|
|
|
|
// scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
|
|
func (f *folder) scanOnWatchErr() {
|
|
f.watchMut.Lock()
|
|
err := f.watchErr
|
|
f.watchMut.Unlock()
|
|
if err != nil {
|
|
f.Delay(0)
|
|
}
|
|
}
|
|
|
|
func (f *folder) setError(err error) {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
_, _, oldErr := f.getState()
|
|
if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
|
|
return
|
|
}
|
|
|
|
if err != nil {
|
|
if oldErr == nil {
|
|
l.Warnf("Error on folder %s: %v", f.Description(), err)
|
|
} else {
|
|
l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
|
|
}
|
|
} else {
|
|
l.Infoln("Cleared error on folder", f.Description())
|
|
}
|
|
|
|
if f.FSWatcherEnabled {
|
|
if err != nil {
|
|
f.stopWatch()
|
|
} else {
|
|
f.scheduleWatchRestart()
|
|
}
|
|
}
|
|
|
|
f.stateTracker.setError(err)
|
|
}
|
|
|
|
func (f *folder) basePause() time.Duration {
|
|
if f.PullerPauseS == 0 {
|
|
return defaultPullerPause
|
|
}
|
|
return time.Duration(f.PullerPauseS) * time.Second
|
|
}
|
|
|
|
func (f *folder) String() string {
|
|
return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
|
|
}
|
|
|
|
func (f *folder) newScanError(path string, err error) {
|
|
f.scanErrorsMut.Lock()
|
|
f.scanErrors = append(f.scanErrors, FileError{
|
|
Err: err.Error(),
|
|
Path: path,
|
|
})
|
|
f.scanErrorsMut.Unlock()
|
|
}
|
|
|
|
func (f *folder) clearScanErrors(subDirs []string) {
|
|
f.scanErrorsMut.Lock()
|
|
defer f.scanErrorsMut.Unlock()
|
|
if len(subDirs) == 0 {
|
|
f.scanErrors = nil
|
|
return
|
|
}
|
|
filtered := f.scanErrors[:0]
|
|
outer:
|
|
for _, fe := range f.scanErrors {
|
|
for _, sub := range subDirs {
|
|
if fe.Path == sub || fs.IsParent(fe.Path, sub) {
|
|
continue outer
|
|
}
|
|
}
|
|
filtered = append(filtered, fe)
|
|
}
|
|
f.scanErrors = filtered
|
|
}
|
|
|
|
func (f *folder) Errors() []FileError {
|
|
f.scanErrorsMut.Lock()
|
|
defer f.scanErrorsMut.Unlock()
|
|
return append([]FileError{}, f.scanErrors...)
|
|
}
|
|
|
|
// ForceRescan marks the file such that it gets rehashed on next scan and then
|
|
// immediately executes that scan.
|
|
func (f *folder) ForceRescan(file protocol.FileInfo) error {
|
|
file.SetMustRescan(f.shortID)
|
|
f.fset.Update(protocol.LocalDeviceID, []protocol.FileInfo{file})
|
|
|
|
return f.Scan([]string{file.Name})
|
|
}
|
|
|
|
func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) {
|
|
f.updateLocals(fs)
|
|
|
|
f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
|
|
}
|
|
|
|
func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) {
|
|
f.updateLocals(fs)
|
|
|
|
f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
|
|
}
|
|
|
|
func (f *folder) updateLocals(fs []protocol.FileInfo) {
|
|
f.fset.Update(protocol.LocalDeviceID, fs)
|
|
|
|
filenames := make([]string, len(fs))
|
|
for i, file := range fs {
|
|
filenames[i] = file.Name
|
|
}
|
|
|
|
f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
|
|
"folder": f.ID,
|
|
"items": len(fs),
|
|
"filenames": filenames,
|
|
"version": f.fset.Sequence(protocol.LocalDeviceID),
|
|
})
|
|
}
|
|
|
|
func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
|
|
for _, file := range fs {
|
|
if file.IsInvalid() {
|
|
continue
|
|
}
|
|
|
|
objType := "file"
|
|
action := "modified"
|
|
|
|
switch {
|
|
case file.IsDeleted():
|
|
action = "deleted"
|
|
|
|
// If our local vector is version 1 AND it is the only version
|
|
// vector so far seen for this file then it is a new file. Else if
|
|
// it is > 1 it's not new, and if it is 1 but another shortId
|
|
// version vector exists then it is new for us but created elsewhere
|
|
// so the file is still not new but modified by us. Only if it is
|
|
// truly new do we change this to 'added', else we leave it as
|
|
// 'modified'.
|
|
case len(file.Version.Counters) == 1 && file.Version.Counters[0].Value == 1:
|
|
action = "added"
|
|
}
|
|
|
|
if file.IsSymlink() {
|
|
objType = "symlink"
|
|
} else if file.IsDirectory() {
|
|
objType = "dir"
|
|
}
|
|
|
|
// Two different events can be fired here based on what EventType is passed into function
|
|
f.evLogger.Log(typeOfEvent, map[string]string{
|
|
"folder": f.ID,
|
|
"folderID": f.ID, // incorrect, deprecated, kept for historical compliance
|
|
"label": f.Label,
|
|
"action": action,
|
|
"type": objType,
|
|
"path": filepath.FromSlash(file.Name),
|
|
"modifiedBy": file.ModifiedBy.String(),
|
|
})
|
|
}
|
|
}
|
|
|
|
// The exists function is expected to return true for all known paths
|
|
// (excluding "" and ".")
|
|
func unifySubs(dirs []string, exists func(dir string) bool) []string {
|
|
if len(dirs) == 0 {
|
|
return nil
|
|
}
|
|
sort.Strings(dirs)
|
|
if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
|
|
return nil
|
|
}
|
|
prev := "./" // Anything that can't be parent of a clean path
|
|
for i := 0; i < len(dirs); {
|
|
dir, err := fs.Canonicalize(dirs[i])
|
|
if err != nil {
|
|
l.Debugf("Skipping %v for scan: %s", dirs[i], err)
|
|
dirs = append(dirs[:i], dirs[i+1:]...)
|
|
continue
|
|
}
|
|
if dir == prev || fs.IsParent(dir, prev) {
|
|
dirs = append(dirs[:i], dirs[i+1:]...)
|
|
continue
|
|
}
|
|
parent := filepath.Dir(dir)
|
|
for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
|
|
dir = parent
|
|
parent = filepath.Dir(dir)
|
|
}
|
|
dirs[i] = dir
|
|
prev = dir
|
|
i++
|
|
}
|
|
return dirs
|
|
}
|
|
|
|
type cFiler struct {
|
|
*db.Snapshot
|
|
}
|
|
|
|
// Implements scanner.CurrentFiler
|
|
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
|
|
return cf.Get(protocol.LocalDeviceID, file)
|
|
}
|