syncthing/lib/model/folder.go

1359 lines
35 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"context"
"fmt"
"math/rand"
"path/filepath"
"sort"
"sync/atomic"
"time"
"github.com/pkg/errors"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/locations"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/util"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/watchaggregator"
)
// Arbitrary limit that triggers a warning on kqueue systems
const kqueueItemCountThreshold = 10000
type folder struct {
stateTracker
config.FolderConfiguration
*stats.FolderStatisticsReference
ioLimiter *util.Semaphore
localFlags uint32
model *model
shortID protocol.ShortID
fset *db.FileSet
ignores *ignore.Matcher
mtimefs fs.Filesystem
modTimeWindow time.Duration
ctx context.Context // used internally, only accessible on serve lifetime
done chan struct{} // used externally, accessible regardless of serve
scanInterval time.Duration
scanTimer *time.Timer
scanDelay chan time.Duration
initialScanFinished chan struct{}
scanScheduled chan struct{}
versionCleanupInterval time.Duration
versionCleanupTimer *time.Timer
pullScheduled chan struct{}
pullPause time.Duration
pullFailTimer *time.Timer
scanErrors []FileError
pullErrors []FileError
errorsMut sync.Mutex
doInSyncChan chan syncRequest
forcedRescanRequested chan struct{}
forcedRescanPaths map[string]struct{}
forcedRescanPathsMut sync.Mutex
watchCancel context.CancelFunc
watchChan chan []string
restartWatchChan chan struct{}
watchErr error
watchMut sync.Mutex
puller puller
versioner versioner.Versioner
warnedKqueue bool
}
type syncRequest struct {
fn func() error
err chan error
}
type puller interface {
pull() (bool, error) // true when successful and should not be retried
}
func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *util.Semaphore, ver versioner.Versioner) folder {
f := folder{
stateTracker: newStateTracker(cfg.ID, evLogger),
FolderConfiguration: cfg,
FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
ioLimiter: ioLimiter,
model: model,
shortID: model.shortID,
fset: fset,
ignores: ignores,
mtimefs: cfg.Filesystem(fset),
modTimeWindow: cfg.ModTimeWindow(),
done: make(chan struct{}),
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
scanTimer: time.NewTimer(0), // The first scan should be done immediately.
scanDelay: make(chan time.Duration),
initialScanFinished: make(chan struct{}),
scanScheduled: make(chan struct{}, 1),
versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
errorsMut: sync.NewMutex(),
doInSyncChan: make(chan syncRequest),
forcedRescanRequested: make(chan struct{}, 1),
forcedRescanPaths: make(map[string]struct{}),
forcedRescanPathsMut: sync.NewMutex(),
watchCancel: func() {},
restartWatchChan: make(chan struct{}, 1),
watchMut: sync.NewMutex(),
versioner: ver,
}
f.pullPause = f.pullBasePause()
f.pullFailTimer = time.NewTimer(0)
<-f.pullFailTimer.C
return f
}
func (f *folder) Serve(ctx context.Context) error {
atomic.AddInt32(&f.model.foldersRunning, 1)
defer atomic.AddInt32(&f.model.foldersRunning, -1)
f.ctx = ctx
l.Debugln(f, "starting")
defer l.Debugln(f, "exiting")
defer func() {
f.scanTimer.Stop()
f.versionCleanupTimer.Stop()
f.setState(FolderIdle)
}()
if f.FSWatcherEnabled && f.getHealthErrorAndLoadIgnores() == nil {
f.startWatch()
}
// If we're configured to not do version cleanup, or we don't have a
// versioner, cancel and drain that timer now.
if f.versionCleanupInterval == 0 || f.versioner == nil {
if !f.versionCleanupTimer.Stop() {
<-f.versionCleanupTimer.C
}
}
initialCompleted := f.initialScanFinished
for {
var err error
select {
case <-f.ctx.Done():
close(f.done)
return nil
case <-f.pullScheduled:
_, err = f.pull()
case <-f.pullFailTimer.C:
var success bool
success, err = f.pull()
if (err != nil || !success) && f.pullPause < 60*f.pullBasePause() {
// Back off from retrying to pull
f.pullPause *= 2
}
case <-initialCompleted:
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
_, err = f.pull()
case <-f.forcedRescanRequested:
err = f.handleForcedRescans()
case <-f.scanTimer.C:
l.Debugln(f, "Scanning due to timer")
err = f.scanTimerFired()
case req := <-f.doInSyncChan:
l.Debugln(f, "Running something due to request")
err = req.fn()
req.err <- err
case next := <-f.scanDelay:
l.Debugln(f, "Delaying scan")
f.scanTimer.Reset(next)
case <-f.scanScheduled:
l.Debugln(f, "Scan was scheduled")
f.scanTimer.Reset(0)
case fsEvents := <-f.watchChan:
l.Debugln(f, "Scan due to watcher")
err = f.scanSubdirs(fsEvents)
case <-f.restartWatchChan:
l.Debugln(f, "Restart watcher")
err = f.restartWatch()
case <-f.versionCleanupTimer.C:
l.Debugln(f, "Doing version cleanup")
f.versionCleanupTimerFired()
}
if err != nil {
if svcutil.IsFatal(err) {
return err
}
f.setError(err)
}
}
}
func (f *folder) BringToFront(string) {}
func (f *folder) Override() {}
func (f *folder) Revert() {}
func (f *folder) DelayScan(next time.Duration) {
select {
case f.scanDelay <- next:
case <-f.done:
}
}
func (f *folder) ScheduleScan() {
// 1-buffered chan
select {
case f.scanScheduled <- struct{}{}:
default:
}
}
func (f *folder) ignoresUpdated() {
if f.FSWatcherEnabled {
f.scheduleWatchRestart()
}
}
func (f *folder) SchedulePull() {
select {
case f.pullScheduled <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
func (f *folder) Jobs(_, _ int) ([]string, []string, int) {
return nil, nil, 0
}
func (f *folder) Scan(subdirs []string) error {
<-f.initialScanFinished
return f.doInSync(func() error { return f.scanSubdirs(subdirs) })
}
// doInSync allows to run functions synchronously in folder.serve from exported,
// asynchronously called methods.
func (f *folder) doInSync(fn func() error) error {
req := syncRequest{
fn: fn,
err: make(chan error, 1),
}
select {
case f.doInSyncChan <- req:
return <-req.err
case <-f.done:
return context.Canceled
}
}
func (f *folder) Reschedule() {
if f.scanInterval == 0 {
return
}
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
interval := time.Duration(sleepNanos) * time.Nanosecond
l.Debugln(f, "next rescan in", interval)
f.scanTimer.Reset(interval)
}
func (f *folder) getHealthErrorAndLoadIgnores() error {
if err := f.getHealthErrorWithoutIgnores(); err != nil {
return err
}
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
return errors.Wrap(err, "loading ignores")
}
}
return nil
}
func (f *folder) getHealthErrorWithoutIgnores() error {
// Check for folder errors, with the most serious and specific first and
// generic ones like out of space on the home disk later.
if err := f.CheckPath(); err != nil {
return err
}
dbPath := locations.Get(locations.Database)
if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
if err = config.CheckFreeSpace(f.model.cfg.Options().MinHomeDiskFree, usage); err != nil {
return fmt.Errorf("insufficient space on disk for database (%v): %w", dbPath, err)
}
}
return nil
}
func (f *folder) pull() (success bool, err error) {
f.pullFailTimer.Stop()
select {
case <-f.pullFailTimer.C:
default:
}
select {
case <-f.initialScanFinished:
default:
// Once the initial scan finished, a pull will be scheduled
return true, nil
}
defer func() {
if success {
// We're good, reset the pause interval.
f.pullPause = f.pullBasePause()
}
}()
// If there is nothing to do, don't even enter sync-waiting state.
abort := true
snap, err := f.dbSnapshot()
if err != nil {
return false, err
}
snap.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
abort = false
return false
})
snap.Release()
if abort {
// Clears pull failures on items that were needed before, but aren't anymore.
f.errorsMut.Lock()
f.pullErrors = nil
f.errorsMut.Unlock()
return true, nil
}
// Abort early (before acquiring a token) if there's a folder error
err = f.getHealthErrorWithoutIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false, err
}
// Send only folder doesn't do any io, it only checks for out-of-sync
// items that differ in metadata and updates those.
if f.Type != config.FolderTypeSendOnly {
f.setState(FolderSyncWaiting)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return true, err
}
defer f.ioLimiter.Give(1)
}
startTime := time.Now()
// Check if the ignore patterns changed.
oldHash := f.ignores.Hash()
defer func() {
if f.ignores.Hash() != oldHash {
f.ignoresUpdated()
}
}()
err = f.getHealthErrorAndLoadIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false, err
}
f.setError(nil)
success, err = f.puller.pull()
if success && err == nil {
return true, nil
}
// Pulling failed, try again later.
delay := f.pullPause + time.Since(startTime)
l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), util.NiceDurationString(delay))
f.pullFailTimer.Reset(delay)
return false, err
}
func (f *folder) scanSubdirs(subDirs []string) error {
l.Debugf("%v scanning", f)
oldHash := f.ignores.Hash()
err := f.getHealthErrorAndLoadIgnores()
if err != nil {
return err
}
f.setError(nil)
// Check on the way out if the ignore patterns changed as part of scanning
// this folder. If they did we should schedule a pull of the folder so that
// we request things we might have suddenly become unignored and so on.
defer func() {
if f.ignores.Hash() != oldHash {
l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
f.ignoresUpdated()
f.SchedulePull()
}
}()
f.setState(FolderScanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return err
}
defer f.ioLimiter.Give(1)
for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i])
if sub == "" {
// A blank subdirs means to scan the entire folder. We can trim
// the subDirs list and go on our way.
subDirs = nil
break
}
subDirs[i] = sub
}
// Clean the list of subitems to ensure that we start at a known
// directory, and don't scan subdirectories of things we've already
// scanned.
snap, err := f.dbSnapshot()
if err != nil {
return err
}
subDirs = unifySubs(subDirs, func(file string) bool {
_, ok := snap.Get(protocol.LocalDeviceID, file)
return ok
})
snap.Release()
f.setState(FolderScanning)
f.clearScanErrors(subDirs)
batch := f.newScanBatch()
// Schedule a pull after scanning, but only if we actually detected any
// changes.
changes := 0
defer func() {
l.Debugf("%v finished scanning, detected %v changes", f, changes)
if changes > 0 {
f.SchedulePull()
}
}()
changesHere, err := f.scanSubdirsChangedAndNew(subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
if len(subDirs) == 0 {
// If we have no specific subdirectories to traverse, set it to one
// empty prefix so we traverse the entire folder contents once.
subDirs = []string{""}
}
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
changesHere, err = f.scanSubdirsDeletedAndIgnored(subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
f.ScanCompleted()
return nil
}
const maxToRemove = 1000
type scanBatch struct {
f *folder
updateBatch *db.FileInfoBatch
toRemove []string
}
func (f *folder) newScanBatch() *scanBatch {
b := &scanBatch{
f: f,
toRemove: make([]string, 0, maxToRemove),
}
b.updateBatch = db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
if err := b.f.getHealthErrorWithoutIgnores(); err != nil {
l.Debugf("Stopping scan of folder %s due to: %s", b.f.Description(), err)
return err
}
b.f.updateLocalsFromScanning(fs)
return nil
})
return b
}
func (b *scanBatch) Remove(item string) {
b.toRemove = append(b.toRemove, item)
}
func (b *scanBatch) flushToRemove() {
if len(b.toRemove) > 0 {
b.f.fset.RemoveLocalItems(b.toRemove)
b.toRemove = b.toRemove[:0]
}
}
func (b *scanBatch) Flush() error {
b.flushToRemove()
return b.updateBatch.Flush()
}
func (b *scanBatch) FlushIfFull() error {
if len(b.toRemove) >= maxToRemove {
b.flushToRemove()
}
return b.updateBatch.FlushIfFull()
}
// Update adds the fileinfo to the batch for updating, and does a few checks.
// It returns false if the checks result in the file not going to be updated or removed.
func (b *scanBatch) Update(fi protocol.FileInfo, snap *db.Snapshot) bool {
// Check for a "virtual" parent directory of encrypted files. We don't track
// it, but check if anything still exists within and delete it otherwise.
if b.f.Type == config.FolderTypeReceiveEncrypted && fi.IsDirectory() && protocol.IsEncryptedParent(fs.PathComponents(fi.Name)) {
if names, err := b.f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
b.f.mtimefs.Remove(fi.Name)
}
return false
}
// Resolve receive-only items which are identical with the global state or
// the global item is our own receive-only item.
switch gf, ok := snap.GetGlobal(fi.Name); {
case !ok:
case gf.IsReceiveOnlyChanged():
if fi.IsDeleted() {
// Our item is deleted and the global item is our own receive only
// file. No point in keeping track of that.
b.Remove(fi.Name)
return true
}
case gf.IsEquivalentOptional(fi, b.f.modTimeWindow, false, false, protocol.FlagLocalReceiveOnly):
// What we have locally is equivalent to the global file.
l.Debugf("%v scanning: Merging identical locally changed item with global", b.f, fi)
fi = gf
}
b.updateBatch.Append(fi)
return true
}
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (int, error) {
changes := 0
snap, err := f.dbSnapshot()
if err != nil {
return changes, err
}
defer snap.Release()
// If we return early e.g. due to a folder health error, the scan needs
// to be cancelled.
scanCtx, scanCancel := context.WithCancel(f.ctx)
defer scanCancel()
scanConfig := scanner.Config{
Folder: f.ID,
Subs: subDirs,
Matcher: f.ignores,
TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{snap},
Filesystem: f.mtimefs,
IgnorePerms: f.IgnorePerms,
AutoNormalize: f.AutoNormalize,
Hashers: f.model.numHashers(f.ID),
ShortID: f.shortID,
ProgressTickIntervalS: f.ScanProgressIntervalS,
LocalFlags: f.localFlags,
ModTimeWindow: f.modTimeWindow,
EventLogger: f.evLogger,
}
var fchan chan scanner.ScanResult
if f.Type == config.FolderTypeReceiveEncrypted {
fchan = scanner.WalkWithoutHashing(scanCtx, scanConfig)
} else {
fchan = scanner.Walk(scanCtx, scanConfig)
}
alreadyUsedOrExisting := make(map[string]struct{})
for res := range fchan {
if res.Err != nil {
f.newScanError(res.Path, res.Err)
continue
}
if err := batch.FlushIfFull(); err != nil {
// Prevent a race between the scan aborting due to context
// cancellation and releasing the snapshot in defer here.
scanCancel()
for range fchan {
}
return changes, err
}
if batch.Update(res.File, snap) {
changes++
}
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
default:
if nf, ok := f.findRename(snap, res.File, alreadyUsedOrExisting); ok {
if batch.Update(nf, snap) {
changes++
}
}
}
}
return changes, nil
}
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *scanBatch) (int, error) {
var toIgnore []db.FileInfoTruncated
ignoredParent := ""
changes := 0
snap, err := f.dbSnapshot()
if err != nil {
return 0, err
}
defer snap.Release()
for _, sub := range subDirs {
var iterError error
snap.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi protocol.FileIntf) bool {
select {
case <-f.ctx.Done():
return false
default:
}
file := fi.(db.FileInfoTruncated)
if err := batch.FlushIfFull(); err != nil {
iterError = err
return false
}
if ignoredParent != "" && !fs.IsParent(file.Name, ignoredParent) {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
nf := file.ConvertToIgnoredFileInfo()
if batch.Update(nf, snap) {
changes++
}
if err := batch.FlushIfFull(); err != nil {
iterError = err
return false
}
}
toIgnore = toIgnore[:0]
ignoredParent = ""
}
switch ignored := f.ignores.Match(file.Name).IsIgnored(); {
case file.IsIgnored() && ignored:
return true
case !file.IsIgnored() && ignored:
// File was not ignored at last pass but has been ignored.
if file.IsDirectory() {
// Delay ignoring as a child might be unignored.
toIgnore = append(toIgnore, file)
if ignoredParent == "" {
// If the parent wasn't ignored already, set
// this path as the "highest" ignored parent
ignoredParent = file.Name
}
return true
}
l.Debugln("marking file as ignored", file)
nf := file.ConvertToIgnoredFileInfo()
if batch.Update(nf, snap) {
changes++
}
case file.IsIgnored() && !ignored:
// Successfully scanned items are already un-ignored during
// the scan, so check whether it is deleted.
fallthrough
case !file.IsIgnored() && !file.IsDeleted() && !file.IsUnsupported():
// The file is not ignored, deleted or unsupported. Lets check if
// it's still here. Simply stat:ing it wont do as there are
// tons of corner cases (e.g. parent dir->symlink, missing
// permissions)
if !osutil.IsDeleted(f.mtimefs, file.Name) {
if ignoredParent != "" {
// Don't ignore parents of this not ignored item
toIgnore = toIgnore[:0]
ignoredParent = ""
}
return true
}
nf := file.ConvertToDeletedFileInfo(f.shortID)
nf.LocalFlags = f.localFlags
if file.ShouldConflict() {
// We do not want to override the global version with
// the deleted file. Setting to an empty version makes
// sure the file gets in sync on the following pull.
nf.Version = protocol.Vector{}
}
l.Debugln("marking file as deleted", nf)
if batch.Update(nf, snap) {
changes++
}
case file.IsDeleted() && file.IsReceiveOnlyChanged():
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
switch gf, ok := snap.GetGlobal(file.Name); {
case !ok:
case gf.IsReceiveOnlyChanged():
l.Debugln("removing deleted, receive-only item that is globally receive-only from db", file)
batch.Remove(file.Name)
changes++
case gf.IsDeleted():
// Our item is deleted and the global item is deleted too. We just
// pretend it is a normal deleted file (nobody cares about that).
l.Debugf("%v scanning: Marking globally deleted item as not locally changed: %v", f, file.Name)
file.LocalFlags &^= protocol.FlagLocalReceiveOnly
if batch.Update(file.ConvertDeletedToFileInfo(), snap) {
changes++
}
}
default:
// No need to bump the version for a file that was and is
// deleted and just the folder type/local flags changed.
file.LocalFlags &^= protocol.FlagLocalReceiveOnly
l.Debugln("removing receive-only flag on deleted item", file)
if batch.Update(file.ConvertDeletedToFileInfo(), snap) {
changes++
}
}
}
return true
})
select {
case <-f.ctx.Done():
return changes, f.ctx.Err()
default:
}
if iterError == nil && len(toIgnore) > 0 {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", f)
nf := file.ConvertToIgnoredFileInfo()
if batch.Update(nf, snap) {
changes++
}
if iterError = batch.FlushIfFull(); iterError != nil {
break
}
}
toIgnore = toIgnore[:0]
}
if iterError != nil {
return changes, iterError
}
}
return changes, nil
}
func (f *folder) findRename(snap *db.Snapshot, file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
if len(file.Blocks) == 0 || file.Size == 0 {
return protocol.FileInfo{}, false
}
found := false
nf := protocol.FileInfo{}
snap.WithBlocksHash(file.BlocksHash, func(ifi protocol.FileIntf) bool {
fi := ifi.(protocol.FileInfo)
select {
case <-f.ctx.Done():
return false
default:
}
if fi.Name == file.Name {
alreadyUsedOrExisting[fi.Name] = struct{}{}
return true
}
if _, ok := alreadyUsedOrExisting[fi.Name]; ok {
return true
}
if fi.ShouldConflict() {
return true
}
if f.ignores.Match(fi.Name).IsIgnored() {
return true
}
// Only check the size.
// No point checking block equality, as that uses BlocksHash comparison if that is set (which it will be).
// No point checking BlocksHash comparison as WithBlocksHash already does that.
if file.Size != fi.Size {
return true
}
alreadyUsedOrExisting[fi.Name] = struct{}{}
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
return true
}
nf = fi
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
found = true
return false
})
return nf, found
}
func (f *folder) scanTimerFired() error {
err := f.scanSubdirs(nil)
select {
case <-f.initialScanFinished:
default:
status := "Completed"
if err != nil {
status = "Failed"
}
l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
close(f.initialScanFinished)
}
f.Reschedule()
return err
}
func (f *folder) versionCleanupTimerFired() {
f.setState(FolderCleanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return
}
defer f.ioLimiter.Give(1)
f.setState(FolderCleaning)
if err := f.versioner.Clean(f.ctx); err != nil {
l.Infoln("Failed to clean versions in %s: %v", f.Description(), err)
}
f.versionCleanupTimer.Reset(f.versionCleanupInterval)
}
func (f *folder) WatchError() error {
f.watchMut.Lock()
defer f.watchMut.Unlock()
return f.watchErr
}
// stopWatch immediately aborts watching and may be called asynchronously
func (f *folder) stopWatch() {
f.watchMut.Lock()
f.watchCancel()
f.watchMut.Unlock()
f.setWatchError(nil, 0)
}
// scheduleWatchRestart makes sure watching is restarted from the main for loop
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
func (f *folder) scheduleWatchRestart() {
select {
case f.restartWatchChan <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull.
}
}
// restartWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) restartWatch() error {
f.stopWatch()
f.startWatch()
return f.scanSubdirs(nil)
}
// startWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) startWatch() {
ctx, cancel := context.WithCancel(f.ctx)
f.watchMut.Lock()
f.watchChan = make(chan []string)
f.watchCancel = cancel
f.watchMut.Unlock()
go f.monitorWatch(ctx)
}
// monitorWatch starts the filesystem watching and retries every minute on failure.
// It should not be used except in startWatch.
func (f *folder) monitorWatch(ctx context.Context) {
failTimer := time.NewTimer(0)
aggrCtx, aggrCancel := context.WithCancel(ctx)
var err error
var eventChan <-chan fs.Event
var errChan <-chan error
warnedOutside := false
var lastWatch time.Time
pause := time.Minute
// Subscribe to folder summaries only on kqueue systems, to warn about potential high resource usage
var summarySub events.Subscription
var summaryChan <-chan events.Event
if fs.WatchKqueue && !f.warnedKqueue {
summarySub = f.evLogger.Subscribe(events.FolderSummary)
summaryChan = summarySub.C()
}
defer func() {
aggrCancel() // aggrCancel might e re-assigned -> call within closure
if summaryChan != nil {
summarySub.Unsubscribe()
}
}()
for {
select {
case <-failTimer.C:
eventChan, errChan, err = f.mtimefs.Watch(".", f.ignores, ctx, f.IgnorePerms)
// We do this once per minute initially increased to
// max one hour in case of repeat failures.
f.scanOnWatchErr()
f.setWatchError(err, pause)
if err != nil {
failTimer.Reset(pause)
if pause < 60*time.Minute {
pause *= 2
}
continue
}
lastWatch = time.Now()
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
l.Debugln("Started filesystem watcher for folder", f.Description())
case err = <-errChan:
var next time.Duration
if dur := time.Since(lastWatch); dur > pause {
pause = time.Minute
next = 0
} else {
next = pause - dur
if pause < 60*time.Minute {
pause *= 2
}
}
failTimer.Reset(next)
f.setWatchError(err, next)
// This error was previously a panic and should never occur, so generate
// a warning, but don't do it repetitively.
var errOutside *fs.ErrWatchEventOutsideRoot
if errors.As(err, &errOutside) {
if !warnedOutside {
l.Warnln(err)
warnedOutside = true
}
f.evLogger.Log(events.Failure, "watching for changes encountered an event outside of the filesystem root")
}
aggrCancel()
errChan = nil
aggrCtx, aggrCancel = context.WithCancel(ctx)
case ev := <-summaryChan:
if data, ok := ev.Data.(FolderSummaryEventData); !ok {
f.evLogger.Log(events.Failure, "Unexpected type of folder-summary event in folder.monitorWatch")
} else if data.Summary.LocalTotalItems > kqueueItemCountThreshold {
f.warnedKqueue = true
summarySub.Unsubscribe()
summaryChan = nil
l.Warnf("Filesystem watching (kqueue) is enabled on %v with a lot of files/directories, and that requires a lot of resources and might slow down your system significantly", f.Description())
}
case <-ctx.Done():
return
}
}
}
// setWatchError sets the current error state of the watch and should be called
// regardless of whether err is nil or not.
func (f *folder) setWatchError(err error, nextTryIn time.Duration) {
f.watchMut.Lock()
prevErr := f.watchErr
f.watchErr = err
f.watchMut.Unlock()
if err != prevErr {
data := map[string]string{
"folder": f.ID,
}
if prevErr != nil {
data["from"] = prevErr.Error()
}
if err != nil {
data["to"] = err.Error()
}
f.evLogger.Log(events.FolderWatchStateChanged, data)
}
if err == nil {
return
}
msg := fmt.Sprintf("Error while trying to start filesystem watcher for folder %s, trying again in %v: %v", f.Description(), nextTryIn, err)
if prevErr != err {
l.Infof(msg)
return
}
l.Debugf(msg)
}
// scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
func (f *folder) scanOnWatchErr() {
f.watchMut.Lock()
err := f.watchErr
f.watchMut.Unlock()
if err != nil {
f.DelayScan(0)
}
}
func (f *folder) setError(err error) {
select {
case <-f.ctx.Done():
return
default:
}
_, _, oldErr := f.getState()
if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
return
}
if err != nil {
if oldErr == nil {
l.Warnf("Error on folder %s: %v", f.Description(), err)
} else {
l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
}
} else {
l.Infoln("Cleared error on folder", f.Description())
f.SchedulePull()
}
if f.FSWatcherEnabled {
if err != nil {
f.stopWatch()
} else {
f.scheduleWatchRestart()
}
}
f.stateTracker.setError(err)
}
func (f *folder) pullBasePause() time.Duration {
if f.PullerPauseS == 0 {
return defaultPullerPause
}
return time.Duration(f.PullerPauseS) * time.Second
}
func (f *folder) String() string {
return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
}
func (f *folder) newScanError(path string, err error) {
f.errorsMut.Lock()
l.Infof("Scanner (folder %s, item %q): %v", f.Description(), path, err)
f.scanErrors = append(f.scanErrors, FileError{
Err: err.Error(),
Path: path,
})
f.errorsMut.Unlock()
}
func (f *folder) clearScanErrors(subDirs []string) {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
if len(subDirs) == 0 {
f.scanErrors = nil
return
}
filtered := f.scanErrors[:0]
outer:
for _, fe := range f.scanErrors {
for _, sub := range subDirs {
if fe.Path == sub || fs.IsParent(fe.Path, sub) {
continue outer
}
}
filtered = append(filtered, fe)
}
f.scanErrors = filtered
}
func (f *folder) Errors() []FileError {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
scanLen := len(f.scanErrors)
errors := make([]FileError, scanLen+len(f.pullErrors))
copy(errors[:scanLen], f.scanErrors)
copy(errors[scanLen:], f.pullErrors)
sort.Sort(fileErrorList(errors))
return errors
}
// ScheduleForceRescan marks the file such that it gets rehashed on next scan, and schedules a scan.
func (f *folder) ScheduleForceRescan(path string) {
f.forcedRescanPathsMut.Lock()
f.forcedRescanPaths[path] = struct{}{}
f.forcedRescanPathsMut.Unlock()
select {
case f.forcedRescanRequested <- struct{}{}:
default:
}
}
func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) {
f.updateLocals(fs)
f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
}
func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) {
f.updateLocals(fs)
f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
}
func (f *folder) updateLocals(fs []protocol.FileInfo) {
f.fset.Update(protocol.LocalDeviceID, fs)
filenames := make([]string, len(fs))
f.forcedRescanPathsMut.Lock()
for i, file := range fs {
filenames[i] = file.Name
// No need to rescan a file that was changed since anyway.
delete(f.forcedRescanPaths, file.Name)
}
f.forcedRescanPathsMut.Unlock()
seq := f.fset.Sequence(protocol.LocalDeviceID)
f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
"folder": f.ID,
"items": len(fs),
"filenames": filenames,
"sequence": seq,
"version": seq, // legacy for sequence
})
}
func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
for _, file := range fs {
if file.IsInvalid() {
continue
}
objType := "file"
action := "modified"
if file.IsDeleted() {
action = "deleted"
}
if file.IsSymlink() {
objType = "symlink"
} else if file.IsDirectory() {
objType = "dir"
}
// Two different events can be fired here based on what EventType is passed into function
f.evLogger.Log(typeOfEvent, map[string]string{
"folder": f.ID,
"folderID": f.ID, // incorrect, deprecated, kept for historical compliance
"label": f.Label,
"action": action,
"type": objType,
"path": filepath.FromSlash(file.Name),
"modifiedBy": file.ModifiedBy.String(),
})
}
}
func (f *folder) handleForcedRescans() error {
f.forcedRescanPathsMut.Lock()
paths := make([]string, 0, len(f.forcedRescanPaths))
for path := range f.forcedRescanPaths {
paths = append(paths, path)
}
f.forcedRescanPaths = make(map[string]struct{})
f.forcedRescanPathsMut.Unlock()
if len(paths) == 0 {
return nil
}
batch := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
f.fset.Update(protocol.LocalDeviceID, fs)
return nil
})
snap, err := f.dbSnapshot()
if err != nil {
return err
}
defer snap.Release()
for _, path := range paths {
if err := batch.FlushIfFull(); err != nil {
return err
}
fi, ok := snap.Get(protocol.LocalDeviceID, path)
if !ok {
continue
}
fi.SetMustRescan()
batch.Append(fi)
}
if err = batch.Flush(); err != nil {
return err
}
return f.scanSubdirs(paths)
}
// dbSnapshots gets a snapshot from the fileset, and wraps any error
// in a svcutil.FatalErr.
func (f *folder) dbSnapshot() (*db.Snapshot, error) {
snap, err := f.fset.Snapshot()
if err != nil {
return nil, svcutil.AsFatalErr(err, svcutil.ExitError)
}
return snap, nil
}
// The exists function is expected to return true for all known paths
// (excluding "" and ".")
func unifySubs(dirs []string, exists func(dir string) bool) []string {
if len(dirs) == 0 {
return nil
}
sort.Strings(dirs)
if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
return nil
}
prev := "./" // Anything that can't be parent of a clean path
for i := 0; i < len(dirs); {
dir, err := fs.Canonicalize(dirs[i])
if err != nil {
l.Debugf("Skipping %v for scan: %s", dirs[i], err)
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
if dir == prev || fs.IsParent(dir, prev) {
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
parent := filepath.Dir(dir)
for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
dir = parent
parent = filepath.Dir(dir)
}
dirs[i] = dir
prev = dir
i++
}
return dirs
}
type cFiler struct {
*db.Snapshot
}
// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
return cf.Get(protocol.LocalDeviceID, file)
}