syncthing/lib/model/folder.go
Jakob Borg 77970d5113
refactor: use modern Protobuf encoder (#9817)
At a high level, this is what I've done and why:

- I'm moving the protobuf generation for the `protocol`, `discovery` and
`db` packages to the modern alternatives, and using `buf` to generate
because it's nice and simple.
- After trying various approaches on how to integrate the new types with
the existing code, I opted for splitting off our own data model types
from the on-the-wire generated types. This means we can have a
`FileInfo` type with nicer ergonomics and lots of methods, while the
protobuf generated type stays clean and close to the wire protocol. It
does mean copying between the two when required, which certainly adds a
small amount of inefficiency. If we want to walk this back in the future
and use the raw generated type throughout, that's possible, this however
makes the refactor smaller (!) as it doesn't change everything about the
type for everyone at the same time.
- I have simply removed in cold blood a significant number of old
database migrations. These depended on previous generations of generated
messages of various kinds and were annoying to support in the new
fashion. The oldest supported database version now is the one from
Syncthing 1.9.0 from Sep 7, 2020.
- I changed config structs to be regular manually defined structs.

For the sake of discussion, some things I tried that turned out not to
work...

### Embedding / wrapping

Embedding the protobuf generated structs in our existing types as a data
container and keeping our methods and stuff:

```
package protocol

type FileInfo struct {
  *generated.FileInfo
}
```

This generates a lot of problems because the internal shape of the
generated struct is quite different (different names, different types,
more pointers), because initializing it doesn't work like you'd expect
(i.e., you end up with an embedded nil pointer and a panic), and because
the types of child types don't get wrapped. That is, even if we also
have a similar wrapper around a `Vector`, that's not the type you get
when accessing `someFileInfo.Version`, you get the `*generated.Vector`
that doesn't have methods, etc.

### Aliasing

```
package protocol

type FileInfo = generated.FileInfo
```

Doesn't help because you can't attach methods to it, plus all the above.

### Generating the types into the target package like we do now and
attaching methods

This fails because of the different shape of the generated type (as in
the embedding case above) plus the generated struct already has a bunch
of methods that we can't necessarily override properly (like `String()`
and a bunch of getters).

### Methods to functions

I considered just moving all the methods we attach to functions in a
specific package, so that for example

```
package protocol

func (f FileInfo) Equal(other FileInfo) bool
```

would become

```
package fileinfos

func Equal(a, b *generated.FileInfo) bool
```

and this would mostly work, but becomes quite verbose and cumbersome,
and somewhat limits discoverability (you can't see what methods are
available on the type in auto completions, etc). In the end I did this
in some cases, like in the database layer where a lot of things like
`func (fv *FileVersion) IsEmpty() bool` becomes `func fvIsEmpty(fv
*generated.FileVersion)` because they were anyway just internal methods.

Fixes #8247
2024-12-01 16:50:17 +01:00

1380 lines
35 KiB
Go

// Copyright (C) 2014 The Syncthing Authors.
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
// You can obtain one at https://mozilla.org/MPL/2.0/.
package model
import (
"context"
"errors"
"fmt"
"math/rand"
"path/filepath"
"sort"
"time"
"github.com/syncthing/syncthing/lib/config"
"github.com/syncthing/syncthing/lib/db"
"github.com/syncthing/syncthing/lib/events"
"github.com/syncthing/syncthing/lib/fs"
"github.com/syncthing/syncthing/lib/ignore"
"github.com/syncthing/syncthing/lib/locations"
"github.com/syncthing/syncthing/lib/osutil"
"github.com/syncthing/syncthing/lib/protocol"
"github.com/syncthing/syncthing/lib/scanner"
"github.com/syncthing/syncthing/lib/semaphore"
"github.com/syncthing/syncthing/lib/stats"
"github.com/syncthing/syncthing/lib/stringutil"
"github.com/syncthing/syncthing/lib/svcutil"
"github.com/syncthing/syncthing/lib/sync"
"github.com/syncthing/syncthing/lib/versioner"
"github.com/syncthing/syncthing/lib/watchaggregator"
)
// Arbitrary limit that triggers a warning on kqueue systems
const kqueueItemCountThreshold = 10000
type folder struct {
stateTracker
config.FolderConfiguration
*stats.FolderStatisticsReference
ioLimiter *semaphore.Semaphore
localFlags uint32
model *model
shortID protocol.ShortID
fset *db.FileSet
ignores *ignore.Matcher
mtimefs fs.Filesystem
modTimeWindow time.Duration
ctx context.Context // used internally, only accessible on serve lifetime
done chan struct{} // used externally, accessible regardless of serve
scanInterval time.Duration
scanTimer *time.Timer
scanDelay chan time.Duration
initialScanFinished chan struct{}
scanScheduled chan struct{}
versionCleanupInterval time.Duration
versionCleanupTimer *time.Timer
pullScheduled chan struct{}
pullPause time.Duration
pullFailTimer *time.Timer
scanErrors []FileError
pullErrors []FileError
errorsMut sync.Mutex
doInSyncChan chan syncRequest
forcedRescanRequested chan struct{}
forcedRescanPaths map[string]struct{}
forcedRescanPathsMut sync.Mutex
watchCancel context.CancelFunc
watchChan chan []string
restartWatchChan chan struct{}
watchErr error
watchMut sync.Mutex
puller puller
versioner versioner.Versioner
warnedKqueue bool
}
type syncRequest struct {
fn func() error
err chan error
}
type puller interface {
pull() (bool, error) // true when successful and should not be retried
}
func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *semaphore.Semaphore, ver versioner.Versioner) folder {
f := folder{
stateTracker: newStateTracker(cfg.ID, evLogger),
FolderConfiguration: cfg,
FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
ioLimiter: ioLimiter,
model: model,
shortID: model.shortID,
fset: fset,
ignores: ignores,
mtimefs: cfg.Filesystem(fset),
modTimeWindow: cfg.ModTimeWindow(),
done: make(chan struct{}),
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
scanTimer: time.NewTimer(0), // The first scan should be done immediately.
scanDelay: make(chan time.Duration),
initialScanFinished: make(chan struct{}),
scanScheduled: make(chan struct{}, 1),
versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
errorsMut: sync.NewMutex(),
doInSyncChan: make(chan syncRequest),
forcedRescanRequested: make(chan struct{}, 1),
forcedRescanPaths: make(map[string]struct{}),
forcedRescanPathsMut: sync.NewMutex(),
watchCancel: func() {},
restartWatchChan: make(chan struct{}, 1),
watchMut: sync.NewMutex(),
versioner: ver,
}
f.pullPause = f.pullBasePause()
f.pullFailTimer = time.NewTimer(0)
<-f.pullFailTimer.C
registerFolderMetrics(f.ID)
return f
}
func (f *folder) Serve(ctx context.Context) error {
f.model.foldersRunning.Add(1)
defer f.model.foldersRunning.Add(-1)
f.ctx = ctx
l.Debugln(f, "starting")
defer l.Debugln(f, "exiting")
defer func() {
f.scanTimer.Stop()
f.versionCleanupTimer.Stop()
f.setState(FolderIdle)
}()
if f.FSWatcherEnabled && f.getHealthErrorAndLoadIgnores() == nil {
f.startWatch()
}
// If we're configured to not do version cleanup, or we don't have a
// versioner, cancel and drain that timer now.
if f.versionCleanupInterval == 0 || f.versioner == nil {
if !f.versionCleanupTimer.Stop() {
<-f.versionCleanupTimer.C
}
}
initialCompleted := f.initialScanFinished
for {
var err error
select {
case <-f.ctx.Done():
close(f.done)
return nil
case <-f.pullScheduled:
_, err = f.pull()
case <-f.pullFailTimer.C:
var success bool
success, err = f.pull()
if (err != nil || !success) && f.pullPause < 60*f.pullBasePause() {
// Back off from retrying to pull
f.pullPause *= 2
}
case <-initialCompleted:
// Initial scan has completed, we should do a pull
initialCompleted = nil // never hit this case again
_, err = f.pull()
case <-f.forcedRescanRequested:
err = f.handleForcedRescans()
case <-f.scanTimer.C:
l.Debugln(f, "Scanning due to timer")
err = f.scanTimerFired()
case req := <-f.doInSyncChan:
l.Debugln(f, "Running something due to request")
err = req.fn()
req.err <- err
case next := <-f.scanDelay:
l.Debugln(f, "Delaying scan")
f.scanTimer.Reset(next)
case <-f.scanScheduled:
l.Debugln(f, "Scan was scheduled")
f.scanTimer.Reset(0)
case fsEvents := <-f.watchChan:
l.Debugln(f, "Scan due to watcher")
err = f.scanSubdirs(fsEvents)
case <-f.restartWatchChan:
l.Debugln(f, "Restart watcher")
err = f.restartWatch()
case <-f.versionCleanupTimer.C:
l.Debugln(f, "Doing version cleanup")
f.versionCleanupTimerFired()
}
if err != nil {
if svcutil.IsFatal(err) {
return err
}
f.setError(err)
}
}
}
func (*folder) BringToFront(string) {}
func (*folder) Override() {}
func (*folder) Revert() {}
func (f *folder) DelayScan(next time.Duration) {
select {
case f.scanDelay <- next:
case <-f.done:
}
}
func (f *folder) ScheduleScan() {
// 1-buffered chan
select {
case f.scanScheduled <- struct{}{}:
default:
}
}
func (f *folder) ignoresUpdated() {
if f.FSWatcherEnabled {
f.scheduleWatchRestart()
}
}
func (f *folder) SchedulePull() {
select {
case f.pullScheduled <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull, but beyond that we must
// make sure to not block index receiving.
}
}
func (*folder) Jobs(_, _ int) ([]string, []string, int) {
return nil, nil, 0
}
func (f *folder) Scan(subdirs []string) error {
<-f.initialScanFinished
return f.doInSync(func() error { return f.scanSubdirs(subdirs) })
}
// doInSync allows to run functions synchronously in folder.serve from exported,
// asynchronously called methods.
func (f *folder) doInSync(fn func() error) error {
req := syncRequest{
fn: fn,
err: make(chan error, 1),
}
select {
case f.doInSyncChan <- req:
return <-req.err
case <-f.done:
return context.Canceled
}
}
func (f *folder) Reschedule() {
if f.scanInterval == 0 {
return
}
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
interval := time.Duration(sleepNanos) * time.Nanosecond
l.Debugln(f, "next rescan in", interval)
f.scanTimer.Reset(interval)
}
func (f *folder) getHealthErrorAndLoadIgnores() error {
if err := f.getHealthErrorWithoutIgnores(); err != nil {
return err
}
if f.Type != config.FolderTypeReceiveEncrypted {
if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
return fmt.Errorf("loading ignores: %w", err)
}
}
return nil
}
func (f *folder) getHealthErrorWithoutIgnores() error {
// Check for folder errors, with the most serious and specific first and
// generic ones like out of space on the home disk later.
if err := f.CheckPath(); err != nil {
return err
}
if minFree := f.model.cfg.Options().MinHomeDiskFree; minFree.Value > 0 {
dbPath := locations.Get(locations.Database)
if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
if err = config.CheckFreeSpace(minFree, usage); err != nil {
return fmt.Errorf("insufficient space on disk for database (%v): %w", dbPath, err)
}
}
}
return nil
}
func (f *folder) pull() (success bool, err error) {
f.pullFailTimer.Stop()
select {
case <-f.pullFailTimer.C:
default:
}
select {
case <-f.initialScanFinished:
default:
// Once the initial scan finished, a pull will be scheduled
return true, nil
}
defer func() {
if success {
// We're good, reset the pause interval.
f.pullPause = f.pullBasePause()
}
}()
// If there is nothing to do, don't even enter sync-waiting state.
abort := true
snap, err := f.dbSnapshot()
if err != nil {
return false, err
}
snap.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileInfo) bool {
abort = false
return false
})
snap.Release()
if abort {
// Clears pull failures on items that were needed before, but aren't anymore.
f.errorsMut.Lock()
f.pullErrors = nil
f.errorsMut.Unlock()
return true, nil
}
// Abort early (before acquiring a token) if there's a folder error
err = f.getHealthErrorWithoutIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false, err
}
// Send only folder doesn't do any io, it only checks for out-of-sync
// items that differ in metadata and updates those.
if f.Type != config.FolderTypeSendOnly {
f.setState(FolderSyncWaiting)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return true, err
}
defer f.ioLimiter.Give(1)
}
startTime := time.Now()
// Check if the ignore patterns changed.
oldHash := f.ignores.Hash()
defer func() {
if f.ignores.Hash() != oldHash {
f.ignoresUpdated()
}
}()
err = f.getHealthErrorAndLoadIgnores()
if err != nil {
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
return false, err
}
f.setError(nil)
success, err = f.puller.pull()
if success && err == nil {
return true, nil
}
// Pulling failed, try again later.
delay := f.pullPause + time.Since(startTime)
l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), stringutil.NiceDurationString(delay))
f.pullFailTimer.Reset(delay)
return false, err
}
func (f *folder) scanSubdirs(subDirs []string) error {
l.Debugf("%v scanning", f)
oldHash := f.ignores.Hash()
err := f.getHealthErrorAndLoadIgnores()
if err != nil {
return err
}
f.setError(nil)
// Check on the way out if the ignore patterns changed as part of scanning
// this folder. If they did we should schedule a pull of the folder so that
// we request things we might have suddenly become unignored and so on.
defer func() {
if f.ignores.Hash() != oldHash {
l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
f.ignoresUpdated()
f.SchedulePull()
}
}()
f.setState(FolderScanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return err
}
defer f.ioLimiter.Give(1)
metricFolderScans.WithLabelValues(f.ID).Inc()
ctx, cancel := context.WithCancel(f.ctx)
defer cancel()
go addTimeUntilCancelled(ctx, metricFolderScanSeconds.WithLabelValues(f.ID))
for i := range subDirs {
sub := osutil.NativeFilename(subDirs[i])
if sub == "" {
// A blank subdirs means to scan the entire folder. We can trim
// the subDirs list and go on our way.
subDirs = nil
break
}
subDirs[i] = sub
}
// Clean the list of subitems to ensure that we start at a known
// directory, and don't scan subdirectories of things we've already
// scanned.
snap, err := f.dbSnapshot()
if err != nil {
return err
}
subDirs = unifySubs(subDirs, func(file string) bool {
_, ok := snap.Get(protocol.LocalDeviceID, file)
return ok
})
snap.Release()
f.setState(FolderScanning)
f.clearScanErrors(subDirs)
batch := f.newScanBatch()
// Schedule a pull after scanning, but only if we actually detected any
// changes.
changes := 0
defer func() {
l.Debugf("%v finished scanning, detected %v changes", f, changes)
if changes > 0 {
f.SchedulePull()
}
}()
changesHere, err := f.scanSubdirsChangedAndNew(subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
if len(subDirs) == 0 {
// If we have no specific subdirectories to traverse, set it to one
// empty prefix so we traverse the entire folder contents once.
subDirs = []string{""}
}
// Do a scan of the database for each prefix, to check for deleted and
// ignored files.
changesHere, err = f.scanSubdirsDeletedAndIgnored(subDirs, batch)
changes += changesHere
if err != nil {
return err
}
if err := batch.Flush(); err != nil {
return err
}
f.ScanCompleted()
return nil
}
const maxToRemove = 1000
type scanBatch struct {
f *folder
updateBatch *db.FileInfoBatch
toRemove []string
}
func (f *folder) newScanBatch() *scanBatch {
b := &scanBatch{
f: f,
toRemove: make([]string, 0, maxToRemove),
}
b.updateBatch = db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
if err := b.f.getHealthErrorWithoutIgnores(); err != nil {
l.Debugf("Stopping scan of folder %s due to: %s", b.f.Description(), err)
return err
}
b.f.updateLocalsFromScanning(fs)
return nil
})
return b
}
func (b *scanBatch) Remove(item string) {
b.toRemove = append(b.toRemove, item)
}
func (b *scanBatch) flushToRemove() {
if len(b.toRemove) > 0 {
b.f.fset.RemoveLocalItems(b.toRemove)
b.toRemove = b.toRemove[:0]
}
}
func (b *scanBatch) Flush() error {
b.flushToRemove()
return b.updateBatch.Flush()
}
func (b *scanBatch) FlushIfFull() error {
if len(b.toRemove) >= maxToRemove {
b.flushToRemove()
}
return b.updateBatch.FlushIfFull()
}
// Update adds the fileinfo to the batch for updating, and does a few checks.
// It returns false if the checks result in the file not going to be updated or removed.
func (b *scanBatch) Update(fi protocol.FileInfo, snap *db.Snapshot) bool {
// Check for a "virtual" parent directory of encrypted files. We don't track
// it, but check if anything still exists within and delete it otherwise.
if b.f.Type == config.FolderTypeReceiveEncrypted && fi.IsDirectory() && protocol.IsEncryptedParent(fs.PathComponents(fi.Name)) {
if names, err := b.f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
b.f.mtimefs.Remove(fi.Name)
}
return false
}
// Resolve receive-only items which are identical with the global state or
// the global item is our own receive-only item.
switch gf, ok := snap.GetGlobal(fi.Name); {
case !ok:
case gf.IsReceiveOnlyChanged():
if fi.IsDeleted() {
// Our item is deleted and the global item is our own receive only
// file. No point in keeping track of that.
b.Remove(fi.Name)
return true
}
case (b.f.Type == config.FolderTypeReceiveOnly || b.f.Type == config.FolderTypeReceiveEncrypted) &&
gf.IsEquivalentOptional(fi, protocol.FileInfoComparison{
ModTimeWindow: b.f.modTimeWindow,
IgnorePerms: b.f.IgnorePerms,
IgnoreBlocks: true,
IgnoreFlags: protocol.FlagLocalReceiveOnly,
IgnoreOwnership: !b.f.SyncOwnership && !b.f.SendOwnership,
IgnoreXattrs: !b.f.SyncXattrs && !b.f.SendXattrs,
}):
// What we have locally is equivalent to the global file.
l.Debugf("%v scanning: Merging identical locally changed item with global", b.f, fi)
fi = gf
}
b.updateBatch.Append(fi)
return true
}
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (int, error) {
changes := 0
snap, err := f.dbSnapshot()
if err != nil {
return changes, err
}
defer snap.Release()
// If we return early e.g. due to a folder health error, the scan needs
// to be cancelled.
scanCtx, scanCancel := context.WithCancel(f.ctx)
defer scanCancel()
scanConfig := scanner.Config{
Folder: f.ID,
Subs: subDirs,
Matcher: f.ignores,
TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
CurrentFiler: cFiler{snap},
Filesystem: f.mtimefs,
IgnorePerms: f.IgnorePerms,
AutoNormalize: f.AutoNormalize,
Hashers: f.model.numHashers(f.ID),
ShortID: f.shortID,
ProgressTickIntervalS: f.ScanProgressIntervalS,
LocalFlags: f.localFlags,
ModTimeWindow: f.modTimeWindow,
EventLogger: f.evLogger,
ScanOwnership: f.SendOwnership || f.SyncOwnership,
ScanXattrs: f.SendXattrs || f.SyncXattrs,
XattrFilter: f.XattrFilter,
}
var fchan chan scanner.ScanResult
if f.Type == config.FolderTypeReceiveEncrypted {
fchan = scanner.WalkWithoutHashing(scanCtx, scanConfig)
} else {
fchan = scanner.Walk(scanCtx, scanConfig)
}
alreadyUsedOrExisting := make(map[string]struct{})
for res := range fchan {
if res.Err != nil {
f.newScanError(res.Path, res.Err)
continue
}
if err := batch.FlushIfFull(); err != nil {
// Prevent a race between the scan aborting due to context
// cancellation and releasing the snapshot in defer here.
scanCancel()
for range fchan {
}
return changes, err
}
if batch.Update(res.File, snap) {
changes++
}
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
default:
if nf, ok := f.findRename(snap, res.File, alreadyUsedOrExisting); ok {
if batch.Update(nf, snap) {
changes++
}
}
}
}
return changes, nil
}
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *scanBatch) (int, error) {
var toIgnore []protocol.FileInfo
ignoredParent := ""
changes := 0
snap, err := f.dbSnapshot()
if err != nil {
return 0, err
}
defer snap.Release()
for _, sub := range subDirs {
var iterError error
snap.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi protocol.FileInfo) bool {
select {
case <-f.ctx.Done():
return false
default:
}
if err := batch.FlushIfFull(); err != nil {
iterError = err
return false
}
if ignoredParent != "" && !fs.IsParent(fi.Name, ignoredParent) {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
nf := file
nf.SetIgnored()
if batch.Update(nf, snap) {
changes++
}
if err := batch.FlushIfFull(); err != nil {
iterError = err
return false
}
}
toIgnore = toIgnore[:0]
ignoredParent = ""
}
switch ignored := f.ignores.Match(fi.Name).IsIgnored(); {
case fi.IsIgnored() && ignored:
return true
case !fi.IsIgnored() && ignored:
// File was not ignored at last pass but has been ignored.
if fi.IsDirectory() {
// Delay ignoring as a child might be unignored.
toIgnore = append(toIgnore, fi)
if ignoredParent == "" {
// If the parent wasn't ignored already, set
// this path as the "highest" ignored parent
ignoredParent = fi.Name
}
return true
}
l.Debugln("marking file as ignored", fi)
nf := fi
nf.SetIgnored()
if batch.Update(nf, snap) {
changes++
}
case fi.IsIgnored() && !ignored:
// Successfully scanned items are already un-ignored during
// the scan, so check whether it is deleted.
fallthrough
case !fi.IsIgnored() && !fi.IsDeleted() && !fi.IsUnsupported():
// The file is not ignored, deleted or unsupported. Lets check if
// it's still here. Simply stat:ing it won't do as there are
// tons of corner cases (e.g. parent dir->symlink, missing
// permissions)
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
if ignoredParent != "" {
// Don't ignore parents of this not ignored item
toIgnore = toIgnore[:0]
ignoredParent = ""
}
return true
}
nf := fi
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
if fi.ShouldConflict() {
// We do not want to override the global version with
// the deleted file. Setting to an empty version makes
// sure the file gets in sync on the following pull.
nf.Version = protocol.Vector{}
}
l.Debugln("marking file as deleted", nf)
if batch.Update(nf, snap) {
changes++
}
case fi.IsDeleted() && fi.IsReceiveOnlyChanged():
switch f.Type {
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
switch gf, ok := snap.GetGlobal(fi.Name); {
case !ok:
case gf.IsReceiveOnlyChanged():
l.Debugln("removing deleted, receive-only item that is globally receive-only from db", fi)
batch.Remove(fi.Name)
changes++
case gf.IsDeleted():
// Our item is deleted and the global item is deleted too. We just
// pretend it is a normal deleted file (nobody cares about that).
l.Debugf("%v scanning: Marking globally deleted item as not locally changed: %v", f, fi.Name)
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
if batch.Update(fi, snap) {
changes++
}
}
default:
// No need to bump the version for a file that was and is
// deleted and just the folder type/local flags changed.
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
l.Debugln("removing receive-only flag on deleted item", fi)
if batch.Update(fi, snap) {
changes++
}
}
}
return true
})
select {
case <-f.ctx.Done():
return changes, f.ctx.Err()
default:
}
if iterError == nil && len(toIgnore) > 0 {
for _, file := range toIgnore {
l.Debugln("marking file as ignored", file)
nf := file
nf.SetIgnored()
if batch.Update(nf, snap) {
changes++
}
if iterError = batch.FlushIfFull(); iterError != nil {
break
}
}
toIgnore = toIgnore[:0]
}
if iterError != nil {
return changes, iterError
}
}
return changes, nil
}
func (f *folder) findRename(snap *db.Snapshot, file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
if len(file.Blocks) == 0 || file.Size == 0 {
return protocol.FileInfo{}, false
}
found := false
nf := protocol.FileInfo{}
snap.WithBlocksHash(file.BlocksHash, func(fi protocol.FileInfo) bool {
select {
case <-f.ctx.Done():
return false
default:
}
if fi.Name == file.Name {
alreadyUsedOrExisting[fi.Name] = struct{}{}
return true
}
if _, ok := alreadyUsedOrExisting[fi.Name]; ok {
return true
}
if fi.ShouldConflict() {
return true
}
if f.ignores.Match(fi.Name).IsIgnored() {
return true
}
// Only check the size.
// No point checking block equality, as that uses BlocksHash comparison if that is set (which it will be).
// No point checking BlocksHash comparison as WithBlocksHash already does that.
if file.Size != fi.Size {
return true
}
alreadyUsedOrExisting[fi.Name] = struct{}{}
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
return true
}
nf = fi
nf.SetDeleted(f.shortID)
nf.LocalFlags = f.localFlags
found = true
return false
})
return nf, found
}
func (f *folder) scanTimerFired() error {
err := f.scanSubdirs(nil)
select {
case <-f.initialScanFinished:
default:
status := "Completed"
if err != nil {
status = "Failed"
}
l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
close(f.initialScanFinished)
}
f.Reschedule()
return err
}
func (f *folder) versionCleanupTimerFired() {
f.setState(FolderCleanWaiting)
defer f.setState(FolderIdle)
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
return
}
defer f.ioLimiter.Give(1)
f.setState(FolderCleaning)
if err := f.versioner.Clean(f.ctx); err != nil {
l.Infoln("Failed to clean versions in %s: %v", f.Description(), err)
}
f.versionCleanupTimer.Reset(f.versionCleanupInterval)
}
func (f *folder) WatchError() error {
f.watchMut.Lock()
defer f.watchMut.Unlock()
return f.watchErr
}
// stopWatch immediately aborts watching and may be called asynchronously
func (f *folder) stopWatch() {
f.watchMut.Lock()
f.watchCancel()
f.watchMut.Unlock()
f.setWatchError(nil, 0)
}
// scheduleWatchRestart makes sure watching is restarted from the main for loop
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
func (f *folder) scheduleWatchRestart() {
select {
case f.restartWatchChan <- struct{}{}:
default:
// We might be busy doing a pull and thus not reading from this
// channel. The channel is 1-buffered, so one notification will be
// queued to ensure we recheck after the pull.
}
}
// restartWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) restartWatch() error {
f.stopWatch()
f.startWatch()
return f.scanSubdirs(nil)
}
// startWatch should only ever be called synchronously. If you want to use
// this asynchronously, you should probably use scheduleWatchRestart instead.
func (f *folder) startWatch() {
ctx, cancel := context.WithCancel(f.ctx)
f.watchMut.Lock()
f.watchChan = make(chan []string)
f.watchCancel = cancel
f.watchMut.Unlock()
go f.monitorWatch(ctx)
}
// monitorWatch starts the filesystem watching and retries every minute on failure.
// It should not be used except in startWatch.
func (f *folder) monitorWatch(ctx context.Context) {
failTimer := time.NewTimer(0)
aggrCtx, aggrCancel := context.WithCancel(ctx)
var err error
var eventChan <-chan fs.Event
var errChan <-chan error
warnedOutside := false
var lastWatch time.Time
pause := time.Minute
// Subscribe to folder summaries only on kqueue systems, to warn about potential high resource usage
var summarySub events.Subscription
var summaryChan <-chan events.Event
if fs.WatchKqueue && !f.warnedKqueue {
summarySub = f.evLogger.Subscribe(events.FolderSummary)
summaryChan = summarySub.C()
}
defer func() {
aggrCancel() // aggrCancel might e re-assigned -> call within closure
if summaryChan != nil {
summarySub.Unsubscribe()
}
}()
for {
select {
case <-failTimer.C:
eventChan, errChan, err = f.mtimefs.Watch(".", f.ignores, ctx, f.IgnorePerms)
// We do this once per minute initially increased to
// max one hour in case of repeat failures.
f.scanOnWatchErr()
f.setWatchError(err, pause)
if err != nil {
failTimer.Reset(pause)
if pause < 60*time.Minute {
pause *= 2
}
continue
}
lastWatch = time.Now()
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
l.Debugln("Started filesystem watcher for folder", f.Description())
case err = <-errChan:
var next time.Duration
if dur := time.Since(lastWatch); dur > pause {
pause = time.Minute
next = 0
} else {
next = pause - dur
if pause < 60*time.Minute {
pause *= 2
}
}
failTimer.Reset(next)
f.setWatchError(err, next)
// This error was previously a panic and should never occur, so generate
// a warning, but don't do it repetitively.
var errOutside *fs.ErrWatchEventOutsideRoot
if errors.As(err, &errOutside) {
if !warnedOutside {
l.Warnln(err)
warnedOutside = true
}
f.evLogger.Log(events.Failure, "watching for changes encountered an event outside of the filesystem root")
}
aggrCancel()
errChan = nil
aggrCtx, aggrCancel = context.WithCancel(ctx)
case ev := <-summaryChan:
if data, ok := ev.Data.(FolderSummaryEventData); !ok {
f.evLogger.Log(events.Failure, "Unexpected type of folder-summary event in folder.monitorWatch")
} else if data.Folder == f.folderID && data.Summary.LocalTotalItems-data.Summary.LocalDeleted > kqueueItemCountThreshold {
f.warnedKqueue = true
summarySub.Unsubscribe()
summaryChan = nil
l.Warnf("Filesystem watching (kqueue) is enabled on %v with a lot of files/directories, and that requires a lot of resources and might slow down your system significantly", f.Description())
}
case <-ctx.Done():
aggrCancel() // for good measure and keeping the linters happy
return
}
}
}
// setWatchError sets the current error state of the watch and should be called
// regardless of whether err is nil or not.
func (f *folder) setWatchError(err error, nextTryIn time.Duration) {
f.watchMut.Lock()
prevErr := f.watchErr
f.watchErr = err
f.watchMut.Unlock()
if err != prevErr {
data := map[string]interface{}{
"folder": f.ID,
}
if prevErr != nil {
data["from"] = prevErr.Error()
}
if err != nil {
data["to"] = err.Error()
}
f.evLogger.Log(events.FolderWatchStateChanged, data)
}
if err == nil {
return
}
msg := fmt.Sprintf("Error while trying to start filesystem watcher for folder %s, trying again in %v: %v", f.Description(), nextTryIn, err)
if prevErr != err {
l.Infof(msg)
return
}
l.Debugf(msg)
}
// scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
func (f *folder) scanOnWatchErr() {
f.watchMut.Lock()
err := f.watchErr
f.watchMut.Unlock()
if err != nil {
f.DelayScan(0)
}
}
func (f *folder) setError(err error) {
select {
case <-f.ctx.Done():
return
default:
}
_, _, oldErr := f.getState()
if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
return
}
if err != nil {
if oldErr == nil {
l.Warnf("Error on folder %s: %v", f.Description(), err)
} else {
l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
}
} else {
l.Infoln("Cleared error on folder", f.Description())
f.SchedulePull()
}
if f.FSWatcherEnabled {
if err != nil {
f.stopWatch()
} else {
f.scheduleWatchRestart()
}
}
f.stateTracker.setError(err)
}
func (f *folder) pullBasePause() time.Duration {
if f.PullerPauseS == 0 {
return defaultPullerPause
}
return time.Duration(f.PullerPauseS) * time.Second
}
func (f *folder) String() string {
return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
}
func (f *folder) newScanError(path string, err error) {
f.errorsMut.Lock()
l.Infof("Scanner (folder %s, item %q): %v", f.Description(), path, err)
f.scanErrors = append(f.scanErrors, FileError{
Err: err.Error(),
Path: path,
})
f.errorsMut.Unlock()
}
func (f *folder) clearScanErrors(subDirs []string) {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
if len(subDirs) == 0 {
f.scanErrors = nil
return
}
filtered := f.scanErrors[:0]
outer:
for _, fe := range f.scanErrors {
for _, sub := range subDirs {
if fe.Path == sub || fs.IsParent(fe.Path, sub) {
continue outer
}
}
filtered = append(filtered, fe)
}
f.scanErrors = filtered
}
func (f *folder) Errors() []FileError {
f.errorsMut.Lock()
defer f.errorsMut.Unlock()
scanLen := len(f.scanErrors)
errors := make([]FileError, scanLen+len(f.pullErrors))
copy(errors[:scanLen], f.scanErrors)
copy(errors[scanLen:], f.pullErrors)
sort.Sort(fileErrorList(errors))
return errors
}
// ScheduleForceRescan marks the file such that it gets rehashed on next scan, and schedules a scan.
func (f *folder) ScheduleForceRescan(path string) {
f.forcedRescanPathsMut.Lock()
f.forcedRescanPaths[path] = struct{}{}
f.forcedRescanPathsMut.Unlock()
select {
case f.forcedRescanRequested <- struct{}{}:
default:
}
}
func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) {
f.updateLocals(fs)
f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
}
func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) {
f.updateLocals(fs)
f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
}
func (f *folder) updateLocals(fs []protocol.FileInfo) {
f.fset.Update(protocol.LocalDeviceID, fs)
filenames := make([]string, len(fs))
f.forcedRescanPathsMut.Lock()
for i, file := range fs {
filenames[i] = file.Name
// No need to rescan a file that was changed since anyway.
delete(f.forcedRescanPaths, file.Name)
}
f.forcedRescanPathsMut.Unlock()
seq := f.fset.Sequence(protocol.LocalDeviceID)
f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
"folder": f.ID,
"items": len(fs),
"filenames": filenames,
"sequence": seq,
"version": seq, // legacy for sequence
})
}
func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
for _, file := range fs {
if file.IsInvalid() {
continue
}
objType := "file"
action := "modified"
if file.IsDeleted() {
action = "deleted"
}
if file.IsSymlink() {
objType = "symlink"
} else if file.IsDirectory() {
objType = "dir"
}
// Two different events can be fired here based on what EventType is passed into function
f.evLogger.Log(typeOfEvent, map[string]string{
"folder": f.ID,
"folderID": f.ID, // incorrect, deprecated, kept for historical compliance
"label": f.Label,
"action": action,
"type": objType,
"path": filepath.FromSlash(file.Name),
"modifiedBy": file.ModifiedBy.String(),
})
}
}
func (f *folder) handleForcedRescans() error {
f.forcedRescanPathsMut.Lock()
paths := make([]string, 0, len(f.forcedRescanPaths))
for path := range f.forcedRescanPaths {
paths = append(paths, path)
}
f.forcedRescanPaths = make(map[string]struct{})
f.forcedRescanPathsMut.Unlock()
if len(paths) == 0 {
return nil
}
batch := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
f.fset.Update(protocol.LocalDeviceID, fs)
return nil
})
snap, err := f.dbSnapshot()
if err != nil {
return err
}
defer snap.Release()
for _, path := range paths {
if err := batch.FlushIfFull(); err != nil {
return err
}
fi, ok := snap.Get(protocol.LocalDeviceID, path)
if !ok {
continue
}
fi.SetMustRescan()
batch.Append(fi)
}
if err = batch.Flush(); err != nil {
return err
}
return f.scanSubdirs(paths)
}
// dbSnapshots gets a snapshot from the fileset, and wraps any error
// in a svcutil.FatalErr.
func (f *folder) dbSnapshot() (*db.Snapshot, error) {
snap, err := f.fset.Snapshot()
if err != nil {
return nil, svcutil.AsFatalErr(err, svcutil.ExitError)
}
return snap, nil
}
// The exists function is expected to return true for all known paths
// (excluding "" and ".")
func unifySubs(dirs []string, exists func(dir string) bool) []string {
if len(dirs) == 0 {
return nil
}
sort.Strings(dirs)
if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
return nil
}
prev := "./" // Anything that can't be parent of a clean path
for i := 0; i < len(dirs); {
dir, err := fs.Canonicalize(dirs[i])
if err != nil {
l.Debugf("Skipping %v for scan: %s", dirs[i], err)
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
if dir == prev || fs.IsParent(dir, prev) {
dirs = append(dirs[:i], dirs[i+1:]...)
continue
}
parent := filepath.Dir(dir)
for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
dir = parent
parent = filepath.Dir(dir)
}
dirs[i] = dir
prev = dir
i++
}
return dirs
}
type cFiler struct {
*db.Snapshot
}
// Implements scanner.CurrentFiler
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
return cf.Get(protocol.LocalDeviceID, file)
}