mirror of
https://github.com/octoleo/syncthing.git
synced 2024-12-22 02:48:59 +00:00
77970d5113
At a high level, this is what I've done and why: - I'm moving the protobuf generation for the `protocol`, `discovery` and `db` packages to the modern alternatives, and using `buf` to generate because it's nice and simple. - After trying various approaches on how to integrate the new types with the existing code, I opted for splitting off our own data model types from the on-the-wire generated types. This means we can have a `FileInfo` type with nicer ergonomics and lots of methods, while the protobuf generated type stays clean and close to the wire protocol. It does mean copying between the two when required, which certainly adds a small amount of inefficiency. If we want to walk this back in the future and use the raw generated type throughout, that's possible, this however makes the refactor smaller (!) as it doesn't change everything about the type for everyone at the same time. - I have simply removed in cold blood a significant number of old database migrations. These depended on previous generations of generated messages of various kinds and were annoying to support in the new fashion. The oldest supported database version now is the one from Syncthing 1.9.0 from Sep 7, 2020. - I changed config structs to be regular manually defined structs. For the sake of discussion, some things I tried that turned out not to work... ### Embedding / wrapping Embedding the protobuf generated structs in our existing types as a data container and keeping our methods and stuff: ``` package protocol type FileInfo struct { *generated.FileInfo } ``` This generates a lot of problems because the internal shape of the generated struct is quite different (different names, different types, more pointers), because initializing it doesn't work like you'd expect (i.e., you end up with an embedded nil pointer and a panic), and because the types of child types don't get wrapped. That is, even if we also have a similar wrapper around a `Vector`, that's not the type you get when accessing `someFileInfo.Version`, you get the `*generated.Vector` that doesn't have methods, etc. ### Aliasing ``` package protocol type FileInfo = generated.FileInfo ``` Doesn't help because you can't attach methods to it, plus all the above. ### Generating the types into the target package like we do now and attaching methods This fails because of the different shape of the generated type (as in the embedding case above) plus the generated struct already has a bunch of methods that we can't necessarily override properly (like `String()` and a bunch of getters). ### Methods to functions I considered just moving all the methods we attach to functions in a specific package, so that for example ``` package protocol func (f FileInfo) Equal(other FileInfo) bool ``` would become ``` package fileinfos func Equal(a, b *generated.FileInfo) bool ``` and this would mostly work, but becomes quite verbose and cumbersome, and somewhat limits discoverability (you can't see what methods are available on the type in auto completions, etc). In the end I did this in some cases, like in the database layer where a lot of things like `func (fv *FileVersion) IsEmpty() bool` becomes `func fvIsEmpty(fv *generated.FileVersion)` because they were anyway just internal methods. Fixes #8247
1380 lines
35 KiB
Go
1380 lines
35 KiB
Go
// Copyright (C) 2014 The Syncthing Authors.
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public
|
|
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
|
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
|
|
|
package model
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"path/filepath"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/syncthing/syncthing/lib/config"
|
|
"github.com/syncthing/syncthing/lib/db"
|
|
"github.com/syncthing/syncthing/lib/events"
|
|
"github.com/syncthing/syncthing/lib/fs"
|
|
"github.com/syncthing/syncthing/lib/ignore"
|
|
"github.com/syncthing/syncthing/lib/locations"
|
|
"github.com/syncthing/syncthing/lib/osutil"
|
|
"github.com/syncthing/syncthing/lib/protocol"
|
|
"github.com/syncthing/syncthing/lib/scanner"
|
|
"github.com/syncthing/syncthing/lib/semaphore"
|
|
"github.com/syncthing/syncthing/lib/stats"
|
|
"github.com/syncthing/syncthing/lib/stringutil"
|
|
"github.com/syncthing/syncthing/lib/svcutil"
|
|
"github.com/syncthing/syncthing/lib/sync"
|
|
"github.com/syncthing/syncthing/lib/versioner"
|
|
"github.com/syncthing/syncthing/lib/watchaggregator"
|
|
)
|
|
|
|
// Arbitrary limit that triggers a warning on kqueue systems
|
|
const kqueueItemCountThreshold = 10000
|
|
|
|
type folder struct {
|
|
stateTracker
|
|
config.FolderConfiguration
|
|
*stats.FolderStatisticsReference
|
|
ioLimiter *semaphore.Semaphore
|
|
|
|
localFlags uint32
|
|
|
|
model *model
|
|
shortID protocol.ShortID
|
|
fset *db.FileSet
|
|
ignores *ignore.Matcher
|
|
mtimefs fs.Filesystem
|
|
modTimeWindow time.Duration
|
|
ctx context.Context // used internally, only accessible on serve lifetime
|
|
done chan struct{} // used externally, accessible regardless of serve
|
|
|
|
scanInterval time.Duration
|
|
scanTimer *time.Timer
|
|
scanDelay chan time.Duration
|
|
initialScanFinished chan struct{}
|
|
scanScheduled chan struct{}
|
|
versionCleanupInterval time.Duration
|
|
versionCleanupTimer *time.Timer
|
|
|
|
pullScheduled chan struct{}
|
|
pullPause time.Duration
|
|
pullFailTimer *time.Timer
|
|
|
|
scanErrors []FileError
|
|
pullErrors []FileError
|
|
errorsMut sync.Mutex
|
|
|
|
doInSyncChan chan syncRequest
|
|
|
|
forcedRescanRequested chan struct{}
|
|
forcedRescanPaths map[string]struct{}
|
|
forcedRescanPathsMut sync.Mutex
|
|
|
|
watchCancel context.CancelFunc
|
|
watchChan chan []string
|
|
restartWatchChan chan struct{}
|
|
watchErr error
|
|
watchMut sync.Mutex
|
|
|
|
puller puller
|
|
versioner versioner.Versioner
|
|
|
|
warnedKqueue bool
|
|
}
|
|
|
|
type syncRequest struct {
|
|
fn func() error
|
|
err chan error
|
|
}
|
|
|
|
type puller interface {
|
|
pull() (bool, error) // true when successful and should not be retried
|
|
}
|
|
|
|
func newFolder(model *model, fset *db.FileSet, ignores *ignore.Matcher, cfg config.FolderConfiguration, evLogger events.Logger, ioLimiter *semaphore.Semaphore, ver versioner.Versioner) folder {
|
|
f := folder{
|
|
stateTracker: newStateTracker(cfg.ID, evLogger),
|
|
FolderConfiguration: cfg,
|
|
FolderStatisticsReference: stats.NewFolderStatisticsReference(model.db, cfg.ID),
|
|
ioLimiter: ioLimiter,
|
|
|
|
model: model,
|
|
shortID: model.shortID,
|
|
fset: fset,
|
|
ignores: ignores,
|
|
mtimefs: cfg.Filesystem(fset),
|
|
modTimeWindow: cfg.ModTimeWindow(),
|
|
done: make(chan struct{}),
|
|
|
|
scanInterval: time.Duration(cfg.RescanIntervalS) * time.Second,
|
|
scanTimer: time.NewTimer(0), // The first scan should be done immediately.
|
|
scanDelay: make(chan time.Duration),
|
|
initialScanFinished: make(chan struct{}),
|
|
scanScheduled: make(chan struct{}, 1),
|
|
versionCleanupInterval: time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second,
|
|
versionCleanupTimer: time.NewTimer(time.Duration(cfg.Versioning.CleanupIntervalS) * time.Second),
|
|
|
|
pullScheduled: make(chan struct{}, 1), // This needs to be 1-buffered so that we queue a pull if we're busy when it comes.
|
|
|
|
errorsMut: sync.NewMutex(),
|
|
|
|
doInSyncChan: make(chan syncRequest),
|
|
|
|
forcedRescanRequested: make(chan struct{}, 1),
|
|
forcedRescanPaths: make(map[string]struct{}),
|
|
forcedRescanPathsMut: sync.NewMutex(),
|
|
|
|
watchCancel: func() {},
|
|
restartWatchChan: make(chan struct{}, 1),
|
|
watchMut: sync.NewMutex(),
|
|
|
|
versioner: ver,
|
|
}
|
|
f.pullPause = f.pullBasePause()
|
|
f.pullFailTimer = time.NewTimer(0)
|
|
<-f.pullFailTimer.C
|
|
|
|
registerFolderMetrics(f.ID)
|
|
|
|
return f
|
|
}
|
|
|
|
func (f *folder) Serve(ctx context.Context) error {
|
|
f.model.foldersRunning.Add(1)
|
|
defer f.model.foldersRunning.Add(-1)
|
|
|
|
f.ctx = ctx
|
|
|
|
l.Debugln(f, "starting")
|
|
defer l.Debugln(f, "exiting")
|
|
|
|
defer func() {
|
|
f.scanTimer.Stop()
|
|
f.versionCleanupTimer.Stop()
|
|
f.setState(FolderIdle)
|
|
}()
|
|
|
|
if f.FSWatcherEnabled && f.getHealthErrorAndLoadIgnores() == nil {
|
|
f.startWatch()
|
|
}
|
|
|
|
// If we're configured to not do version cleanup, or we don't have a
|
|
// versioner, cancel and drain that timer now.
|
|
if f.versionCleanupInterval == 0 || f.versioner == nil {
|
|
if !f.versionCleanupTimer.Stop() {
|
|
<-f.versionCleanupTimer.C
|
|
}
|
|
}
|
|
|
|
initialCompleted := f.initialScanFinished
|
|
|
|
for {
|
|
var err error
|
|
|
|
select {
|
|
case <-f.ctx.Done():
|
|
close(f.done)
|
|
return nil
|
|
|
|
case <-f.pullScheduled:
|
|
_, err = f.pull()
|
|
|
|
case <-f.pullFailTimer.C:
|
|
var success bool
|
|
success, err = f.pull()
|
|
if (err != nil || !success) && f.pullPause < 60*f.pullBasePause() {
|
|
// Back off from retrying to pull
|
|
f.pullPause *= 2
|
|
}
|
|
|
|
case <-initialCompleted:
|
|
// Initial scan has completed, we should do a pull
|
|
initialCompleted = nil // never hit this case again
|
|
_, err = f.pull()
|
|
|
|
case <-f.forcedRescanRequested:
|
|
err = f.handleForcedRescans()
|
|
|
|
case <-f.scanTimer.C:
|
|
l.Debugln(f, "Scanning due to timer")
|
|
err = f.scanTimerFired()
|
|
|
|
case req := <-f.doInSyncChan:
|
|
l.Debugln(f, "Running something due to request")
|
|
err = req.fn()
|
|
req.err <- err
|
|
|
|
case next := <-f.scanDelay:
|
|
l.Debugln(f, "Delaying scan")
|
|
f.scanTimer.Reset(next)
|
|
|
|
case <-f.scanScheduled:
|
|
l.Debugln(f, "Scan was scheduled")
|
|
f.scanTimer.Reset(0)
|
|
|
|
case fsEvents := <-f.watchChan:
|
|
l.Debugln(f, "Scan due to watcher")
|
|
err = f.scanSubdirs(fsEvents)
|
|
|
|
case <-f.restartWatchChan:
|
|
l.Debugln(f, "Restart watcher")
|
|
err = f.restartWatch()
|
|
|
|
case <-f.versionCleanupTimer.C:
|
|
l.Debugln(f, "Doing version cleanup")
|
|
f.versionCleanupTimerFired()
|
|
}
|
|
|
|
if err != nil {
|
|
if svcutil.IsFatal(err) {
|
|
return err
|
|
}
|
|
f.setError(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (*folder) BringToFront(string) {}
|
|
|
|
func (*folder) Override() {}
|
|
|
|
func (*folder) Revert() {}
|
|
|
|
func (f *folder) DelayScan(next time.Duration) {
|
|
select {
|
|
case f.scanDelay <- next:
|
|
case <-f.done:
|
|
}
|
|
}
|
|
|
|
func (f *folder) ScheduleScan() {
|
|
// 1-buffered chan
|
|
select {
|
|
case f.scanScheduled <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (f *folder) ignoresUpdated() {
|
|
if f.FSWatcherEnabled {
|
|
f.scheduleWatchRestart()
|
|
}
|
|
}
|
|
|
|
func (f *folder) SchedulePull() {
|
|
select {
|
|
case f.pullScheduled <- struct{}{}:
|
|
default:
|
|
// We might be busy doing a pull and thus not reading from this
|
|
// channel. The channel is 1-buffered, so one notification will be
|
|
// queued to ensure we recheck after the pull, but beyond that we must
|
|
// make sure to not block index receiving.
|
|
}
|
|
}
|
|
|
|
func (*folder) Jobs(_, _ int) ([]string, []string, int) {
|
|
return nil, nil, 0
|
|
}
|
|
|
|
func (f *folder) Scan(subdirs []string) error {
|
|
<-f.initialScanFinished
|
|
return f.doInSync(func() error { return f.scanSubdirs(subdirs) })
|
|
}
|
|
|
|
// doInSync allows to run functions synchronously in folder.serve from exported,
|
|
// asynchronously called methods.
|
|
func (f *folder) doInSync(fn func() error) error {
|
|
req := syncRequest{
|
|
fn: fn,
|
|
err: make(chan error, 1),
|
|
}
|
|
|
|
select {
|
|
case f.doInSyncChan <- req:
|
|
return <-req.err
|
|
case <-f.done:
|
|
return context.Canceled
|
|
}
|
|
}
|
|
|
|
func (f *folder) Reschedule() {
|
|
if f.scanInterval == 0 {
|
|
return
|
|
}
|
|
// Sleep a random time between 3/4 and 5/4 of the configured interval.
|
|
sleepNanos := (f.scanInterval.Nanoseconds()*3 + rand.Int63n(2*f.scanInterval.Nanoseconds())) / 4
|
|
interval := time.Duration(sleepNanos) * time.Nanosecond
|
|
l.Debugln(f, "next rescan in", interval)
|
|
f.scanTimer.Reset(interval)
|
|
}
|
|
|
|
func (f *folder) getHealthErrorAndLoadIgnores() error {
|
|
if err := f.getHealthErrorWithoutIgnores(); err != nil {
|
|
return err
|
|
}
|
|
if f.Type != config.FolderTypeReceiveEncrypted {
|
|
if err := f.ignores.Load(".stignore"); err != nil && !fs.IsNotExist(err) {
|
|
return fmt.Errorf("loading ignores: %w", err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (f *folder) getHealthErrorWithoutIgnores() error {
|
|
// Check for folder errors, with the most serious and specific first and
|
|
// generic ones like out of space on the home disk later.
|
|
|
|
if err := f.CheckPath(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if minFree := f.model.cfg.Options().MinHomeDiskFree; minFree.Value > 0 {
|
|
dbPath := locations.Get(locations.Database)
|
|
if usage, err := fs.NewFilesystem(fs.FilesystemTypeBasic, dbPath).Usage("."); err == nil {
|
|
if err = config.CheckFreeSpace(minFree, usage); err != nil {
|
|
return fmt.Errorf("insufficient space on disk for database (%v): %w", dbPath, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (f *folder) pull() (success bool, err error) {
|
|
f.pullFailTimer.Stop()
|
|
select {
|
|
case <-f.pullFailTimer.C:
|
|
default:
|
|
}
|
|
|
|
select {
|
|
case <-f.initialScanFinished:
|
|
default:
|
|
// Once the initial scan finished, a pull will be scheduled
|
|
return true, nil
|
|
}
|
|
|
|
defer func() {
|
|
if success {
|
|
// We're good, reset the pause interval.
|
|
f.pullPause = f.pullBasePause()
|
|
}
|
|
}()
|
|
|
|
// If there is nothing to do, don't even enter sync-waiting state.
|
|
abort := true
|
|
snap, err := f.dbSnapshot()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
snap.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileInfo) bool {
|
|
abort = false
|
|
return false
|
|
})
|
|
snap.Release()
|
|
if abort {
|
|
// Clears pull failures on items that were needed before, but aren't anymore.
|
|
f.errorsMut.Lock()
|
|
f.pullErrors = nil
|
|
f.errorsMut.Unlock()
|
|
return true, nil
|
|
}
|
|
|
|
// Abort early (before acquiring a token) if there's a folder error
|
|
err = f.getHealthErrorWithoutIgnores()
|
|
if err != nil {
|
|
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
|
|
return false, err
|
|
}
|
|
|
|
// Send only folder doesn't do any io, it only checks for out-of-sync
|
|
// items that differ in metadata and updates those.
|
|
if f.Type != config.FolderTypeSendOnly {
|
|
f.setState(FolderSyncWaiting)
|
|
|
|
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
|
|
return true, err
|
|
}
|
|
defer f.ioLimiter.Give(1)
|
|
}
|
|
|
|
startTime := time.Now()
|
|
|
|
// Check if the ignore patterns changed.
|
|
oldHash := f.ignores.Hash()
|
|
defer func() {
|
|
if f.ignores.Hash() != oldHash {
|
|
f.ignoresUpdated()
|
|
}
|
|
}()
|
|
err = f.getHealthErrorAndLoadIgnores()
|
|
if err != nil {
|
|
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
|
|
return false, err
|
|
}
|
|
f.setError(nil)
|
|
|
|
success, err = f.puller.pull()
|
|
|
|
if success && err == nil {
|
|
return true, nil
|
|
}
|
|
|
|
// Pulling failed, try again later.
|
|
delay := f.pullPause + time.Since(startTime)
|
|
l.Infof("Folder %v isn't making sync progress - retrying in %v.", f.Description(), stringutil.NiceDurationString(delay))
|
|
f.pullFailTimer.Reset(delay)
|
|
|
|
return false, err
|
|
}
|
|
|
|
func (f *folder) scanSubdirs(subDirs []string) error {
|
|
l.Debugf("%v scanning", f)
|
|
|
|
oldHash := f.ignores.Hash()
|
|
|
|
err := f.getHealthErrorAndLoadIgnores()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
f.setError(nil)
|
|
|
|
// Check on the way out if the ignore patterns changed as part of scanning
|
|
// this folder. If they did we should schedule a pull of the folder so that
|
|
// we request things we might have suddenly become unignored and so on.
|
|
defer func() {
|
|
if f.ignores.Hash() != oldHash {
|
|
l.Debugln("Folder", f.Description(), "ignore patterns change detected while scanning; triggering puller")
|
|
f.ignoresUpdated()
|
|
f.SchedulePull()
|
|
}
|
|
}()
|
|
|
|
f.setState(FolderScanWaiting)
|
|
defer f.setState(FolderIdle)
|
|
|
|
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
|
|
return err
|
|
}
|
|
defer f.ioLimiter.Give(1)
|
|
|
|
metricFolderScans.WithLabelValues(f.ID).Inc()
|
|
ctx, cancel := context.WithCancel(f.ctx)
|
|
defer cancel()
|
|
go addTimeUntilCancelled(ctx, metricFolderScanSeconds.WithLabelValues(f.ID))
|
|
|
|
for i := range subDirs {
|
|
sub := osutil.NativeFilename(subDirs[i])
|
|
|
|
if sub == "" {
|
|
// A blank subdirs means to scan the entire folder. We can trim
|
|
// the subDirs list and go on our way.
|
|
subDirs = nil
|
|
break
|
|
}
|
|
|
|
subDirs[i] = sub
|
|
}
|
|
|
|
// Clean the list of subitems to ensure that we start at a known
|
|
// directory, and don't scan subdirectories of things we've already
|
|
// scanned.
|
|
snap, err := f.dbSnapshot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
subDirs = unifySubs(subDirs, func(file string) bool {
|
|
_, ok := snap.Get(protocol.LocalDeviceID, file)
|
|
return ok
|
|
})
|
|
snap.Release()
|
|
|
|
f.setState(FolderScanning)
|
|
f.clearScanErrors(subDirs)
|
|
|
|
batch := f.newScanBatch()
|
|
|
|
// Schedule a pull after scanning, but only if we actually detected any
|
|
// changes.
|
|
changes := 0
|
|
defer func() {
|
|
l.Debugf("%v finished scanning, detected %v changes", f, changes)
|
|
if changes > 0 {
|
|
f.SchedulePull()
|
|
}
|
|
}()
|
|
|
|
changesHere, err := f.scanSubdirsChangedAndNew(subDirs, batch)
|
|
changes += changesHere
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := batch.Flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(subDirs) == 0 {
|
|
// If we have no specific subdirectories to traverse, set it to one
|
|
// empty prefix so we traverse the entire folder contents once.
|
|
subDirs = []string{""}
|
|
}
|
|
|
|
// Do a scan of the database for each prefix, to check for deleted and
|
|
// ignored files.
|
|
|
|
changesHere, err = f.scanSubdirsDeletedAndIgnored(subDirs, batch)
|
|
changes += changesHere
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := batch.Flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
f.ScanCompleted()
|
|
return nil
|
|
}
|
|
|
|
const maxToRemove = 1000
|
|
|
|
type scanBatch struct {
|
|
f *folder
|
|
updateBatch *db.FileInfoBatch
|
|
toRemove []string
|
|
}
|
|
|
|
func (f *folder) newScanBatch() *scanBatch {
|
|
b := &scanBatch{
|
|
f: f,
|
|
toRemove: make([]string, 0, maxToRemove),
|
|
}
|
|
b.updateBatch = db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
|
if err := b.f.getHealthErrorWithoutIgnores(); err != nil {
|
|
l.Debugf("Stopping scan of folder %s due to: %s", b.f.Description(), err)
|
|
return err
|
|
}
|
|
b.f.updateLocalsFromScanning(fs)
|
|
return nil
|
|
})
|
|
return b
|
|
}
|
|
|
|
func (b *scanBatch) Remove(item string) {
|
|
b.toRemove = append(b.toRemove, item)
|
|
}
|
|
|
|
func (b *scanBatch) flushToRemove() {
|
|
if len(b.toRemove) > 0 {
|
|
b.f.fset.RemoveLocalItems(b.toRemove)
|
|
b.toRemove = b.toRemove[:0]
|
|
}
|
|
}
|
|
|
|
func (b *scanBatch) Flush() error {
|
|
b.flushToRemove()
|
|
return b.updateBatch.Flush()
|
|
}
|
|
|
|
func (b *scanBatch) FlushIfFull() error {
|
|
if len(b.toRemove) >= maxToRemove {
|
|
b.flushToRemove()
|
|
}
|
|
return b.updateBatch.FlushIfFull()
|
|
}
|
|
|
|
// Update adds the fileinfo to the batch for updating, and does a few checks.
|
|
// It returns false if the checks result in the file not going to be updated or removed.
|
|
func (b *scanBatch) Update(fi protocol.FileInfo, snap *db.Snapshot) bool {
|
|
// Check for a "virtual" parent directory of encrypted files. We don't track
|
|
// it, but check if anything still exists within and delete it otherwise.
|
|
if b.f.Type == config.FolderTypeReceiveEncrypted && fi.IsDirectory() && protocol.IsEncryptedParent(fs.PathComponents(fi.Name)) {
|
|
if names, err := b.f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
|
|
b.f.mtimefs.Remove(fi.Name)
|
|
}
|
|
return false
|
|
}
|
|
// Resolve receive-only items which are identical with the global state or
|
|
// the global item is our own receive-only item.
|
|
switch gf, ok := snap.GetGlobal(fi.Name); {
|
|
case !ok:
|
|
case gf.IsReceiveOnlyChanged():
|
|
if fi.IsDeleted() {
|
|
// Our item is deleted and the global item is our own receive only
|
|
// file. No point in keeping track of that.
|
|
b.Remove(fi.Name)
|
|
return true
|
|
}
|
|
case (b.f.Type == config.FolderTypeReceiveOnly || b.f.Type == config.FolderTypeReceiveEncrypted) &&
|
|
gf.IsEquivalentOptional(fi, protocol.FileInfoComparison{
|
|
ModTimeWindow: b.f.modTimeWindow,
|
|
IgnorePerms: b.f.IgnorePerms,
|
|
IgnoreBlocks: true,
|
|
IgnoreFlags: protocol.FlagLocalReceiveOnly,
|
|
IgnoreOwnership: !b.f.SyncOwnership && !b.f.SendOwnership,
|
|
IgnoreXattrs: !b.f.SyncXattrs && !b.f.SendXattrs,
|
|
}):
|
|
// What we have locally is equivalent to the global file.
|
|
l.Debugf("%v scanning: Merging identical locally changed item with global", b.f, fi)
|
|
fi = gf
|
|
}
|
|
b.updateBatch.Append(fi)
|
|
return true
|
|
}
|
|
|
|
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *scanBatch) (int, error) {
|
|
changes := 0
|
|
snap, err := f.dbSnapshot()
|
|
if err != nil {
|
|
return changes, err
|
|
}
|
|
defer snap.Release()
|
|
|
|
// If we return early e.g. due to a folder health error, the scan needs
|
|
// to be cancelled.
|
|
scanCtx, scanCancel := context.WithCancel(f.ctx)
|
|
defer scanCancel()
|
|
|
|
scanConfig := scanner.Config{
|
|
Folder: f.ID,
|
|
Subs: subDirs,
|
|
Matcher: f.ignores,
|
|
TempLifetime: time.Duration(f.model.cfg.Options().KeepTemporariesH) * time.Hour,
|
|
CurrentFiler: cFiler{snap},
|
|
Filesystem: f.mtimefs,
|
|
IgnorePerms: f.IgnorePerms,
|
|
AutoNormalize: f.AutoNormalize,
|
|
Hashers: f.model.numHashers(f.ID),
|
|
ShortID: f.shortID,
|
|
ProgressTickIntervalS: f.ScanProgressIntervalS,
|
|
LocalFlags: f.localFlags,
|
|
ModTimeWindow: f.modTimeWindow,
|
|
EventLogger: f.evLogger,
|
|
ScanOwnership: f.SendOwnership || f.SyncOwnership,
|
|
ScanXattrs: f.SendXattrs || f.SyncXattrs,
|
|
XattrFilter: f.XattrFilter,
|
|
}
|
|
var fchan chan scanner.ScanResult
|
|
if f.Type == config.FolderTypeReceiveEncrypted {
|
|
fchan = scanner.WalkWithoutHashing(scanCtx, scanConfig)
|
|
} else {
|
|
fchan = scanner.Walk(scanCtx, scanConfig)
|
|
}
|
|
|
|
alreadyUsedOrExisting := make(map[string]struct{})
|
|
for res := range fchan {
|
|
if res.Err != nil {
|
|
f.newScanError(res.Path, res.Err)
|
|
continue
|
|
}
|
|
|
|
if err := batch.FlushIfFull(); err != nil {
|
|
// Prevent a race between the scan aborting due to context
|
|
// cancellation and releasing the snapshot in defer here.
|
|
scanCancel()
|
|
for range fchan {
|
|
}
|
|
return changes, err
|
|
}
|
|
|
|
if batch.Update(res.File, snap) {
|
|
changes++
|
|
}
|
|
|
|
switch f.Type {
|
|
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
|
|
default:
|
|
if nf, ok := f.findRename(snap, res.File, alreadyUsedOrExisting); ok {
|
|
if batch.Update(nf, snap) {
|
|
changes++
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return changes, nil
|
|
}
|
|
|
|
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *scanBatch) (int, error) {
|
|
var toIgnore []protocol.FileInfo
|
|
ignoredParent := ""
|
|
changes := 0
|
|
snap, err := f.dbSnapshot()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
defer snap.Release()
|
|
|
|
for _, sub := range subDirs {
|
|
var iterError error
|
|
|
|
snap.WithPrefixedHaveTruncated(protocol.LocalDeviceID, sub, func(fi protocol.FileInfo) bool {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return false
|
|
default:
|
|
}
|
|
|
|
if err := batch.FlushIfFull(); err != nil {
|
|
iterError = err
|
|
return false
|
|
}
|
|
|
|
if ignoredParent != "" && !fs.IsParent(fi.Name, ignoredParent) {
|
|
for _, file := range toIgnore {
|
|
l.Debugln("marking file as ignored", file)
|
|
nf := file
|
|
nf.SetIgnored()
|
|
if batch.Update(nf, snap) {
|
|
changes++
|
|
}
|
|
if err := batch.FlushIfFull(); err != nil {
|
|
iterError = err
|
|
return false
|
|
}
|
|
}
|
|
toIgnore = toIgnore[:0]
|
|
ignoredParent = ""
|
|
}
|
|
|
|
switch ignored := f.ignores.Match(fi.Name).IsIgnored(); {
|
|
case fi.IsIgnored() && ignored:
|
|
return true
|
|
case !fi.IsIgnored() && ignored:
|
|
// File was not ignored at last pass but has been ignored.
|
|
if fi.IsDirectory() {
|
|
// Delay ignoring as a child might be unignored.
|
|
toIgnore = append(toIgnore, fi)
|
|
if ignoredParent == "" {
|
|
// If the parent wasn't ignored already, set
|
|
// this path as the "highest" ignored parent
|
|
ignoredParent = fi.Name
|
|
}
|
|
return true
|
|
}
|
|
|
|
l.Debugln("marking file as ignored", fi)
|
|
nf := fi
|
|
nf.SetIgnored()
|
|
if batch.Update(nf, snap) {
|
|
changes++
|
|
}
|
|
|
|
case fi.IsIgnored() && !ignored:
|
|
// Successfully scanned items are already un-ignored during
|
|
// the scan, so check whether it is deleted.
|
|
fallthrough
|
|
case !fi.IsIgnored() && !fi.IsDeleted() && !fi.IsUnsupported():
|
|
// The file is not ignored, deleted or unsupported. Lets check if
|
|
// it's still here. Simply stat:ing it won't do as there are
|
|
// tons of corner cases (e.g. parent dir->symlink, missing
|
|
// permissions)
|
|
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
|
|
if ignoredParent != "" {
|
|
// Don't ignore parents of this not ignored item
|
|
toIgnore = toIgnore[:0]
|
|
ignoredParent = ""
|
|
}
|
|
return true
|
|
}
|
|
nf := fi
|
|
nf.SetDeleted(f.shortID)
|
|
nf.LocalFlags = f.localFlags
|
|
if fi.ShouldConflict() {
|
|
// We do not want to override the global version with
|
|
// the deleted file. Setting to an empty version makes
|
|
// sure the file gets in sync on the following pull.
|
|
nf.Version = protocol.Vector{}
|
|
}
|
|
l.Debugln("marking file as deleted", nf)
|
|
if batch.Update(nf, snap) {
|
|
changes++
|
|
}
|
|
case fi.IsDeleted() && fi.IsReceiveOnlyChanged():
|
|
switch f.Type {
|
|
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
|
|
switch gf, ok := snap.GetGlobal(fi.Name); {
|
|
case !ok:
|
|
case gf.IsReceiveOnlyChanged():
|
|
l.Debugln("removing deleted, receive-only item that is globally receive-only from db", fi)
|
|
batch.Remove(fi.Name)
|
|
changes++
|
|
case gf.IsDeleted():
|
|
// Our item is deleted and the global item is deleted too. We just
|
|
// pretend it is a normal deleted file (nobody cares about that).
|
|
l.Debugf("%v scanning: Marking globally deleted item as not locally changed: %v", f, fi.Name)
|
|
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
|
if batch.Update(fi, snap) {
|
|
changes++
|
|
}
|
|
}
|
|
default:
|
|
// No need to bump the version for a file that was and is
|
|
// deleted and just the folder type/local flags changed.
|
|
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
|
l.Debugln("removing receive-only flag on deleted item", fi)
|
|
if batch.Update(fi, snap) {
|
|
changes++
|
|
}
|
|
}
|
|
}
|
|
|
|
return true
|
|
})
|
|
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return changes, f.ctx.Err()
|
|
default:
|
|
}
|
|
|
|
if iterError == nil && len(toIgnore) > 0 {
|
|
for _, file := range toIgnore {
|
|
l.Debugln("marking file as ignored", file)
|
|
nf := file
|
|
nf.SetIgnored()
|
|
if batch.Update(nf, snap) {
|
|
changes++
|
|
}
|
|
if iterError = batch.FlushIfFull(); iterError != nil {
|
|
break
|
|
}
|
|
}
|
|
toIgnore = toIgnore[:0]
|
|
}
|
|
|
|
if iterError != nil {
|
|
return changes, iterError
|
|
}
|
|
}
|
|
|
|
return changes, nil
|
|
}
|
|
|
|
func (f *folder) findRename(snap *db.Snapshot, file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
|
|
if len(file.Blocks) == 0 || file.Size == 0 {
|
|
return protocol.FileInfo{}, false
|
|
}
|
|
|
|
found := false
|
|
nf := protocol.FileInfo{}
|
|
|
|
snap.WithBlocksHash(file.BlocksHash, func(fi protocol.FileInfo) bool {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return false
|
|
default:
|
|
}
|
|
|
|
if fi.Name == file.Name {
|
|
alreadyUsedOrExisting[fi.Name] = struct{}{}
|
|
return true
|
|
}
|
|
|
|
if _, ok := alreadyUsedOrExisting[fi.Name]; ok {
|
|
return true
|
|
}
|
|
|
|
if fi.ShouldConflict() {
|
|
return true
|
|
}
|
|
|
|
if f.ignores.Match(fi.Name).IsIgnored() {
|
|
return true
|
|
}
|
|
|
|
// Only check the size.
|
|
// No point checking block equality, as that uses BlocksHash comparison if that is set (which it will be).
|
|
// No point checking BlocksHash comparison as WithBlocksHash already does that.
|
|
if file.Size != fi.Size {
|
|
return true
|
|
}
|
|
|
|
alreadyUsedOrExisting[fi.Name] = struct{}{}
|
|
|
|
if !osutil.IsDeleted(f.mtimefs, fi.Name) {
|
|
return true
|
|
}
|
|
|
|
nf = fi
|
|
nf.SetDeleted(f.shortID)
|
|
nf.LocalFlags = f.localFlags
|
|
found = true
|
|
return false
|
|
})
|
|
|
|
return nf, found
|
|
}
|
|
|
|
func (f *folder) scanTimerFired() error {
|
|
err := f.scanSubdirs(nil)
|
|
|
|
select {
|
|
case <-f.initialScanFinished:
|
|
default:
|
|
status := "Completed"
|
|
if err != nil {
|
|
status = "Failed"
|
|
}
|
|
l.Infoln(status, "initial scan of", f.Type.String(), "folder", f.Description())
|
|
close(f.initialScanFinished)
|
|
}
|
|
|
|
f.Reschedule()
|
|
|
|
return err
|
|
}
|
|
|
|
func (f *folder) versionCleanupTimerFired() {
|
|
f.setState(FolderCleanWaiting)
|
|
defer f.setState(FolderIdle)
|
|
|
|
if err := f.ioLimiter.TakeWithContext(f.ctx, 1); err != nil {
|
|
return
|
|
}
|
|
defer f.ioLimiter.Give(1)
|
|
|
|
f.setState(FolderCleaning)
|
|
|
|
if err := f.versioner.Clean(f.ctx); err != nil {
|
|
l.Infoln("Failed to clean versions in %s: %v", f.Description(), err)
|
|
}
|
|
|
|
f.versionCleanupTimer.Reset(f.versionCleanupInterval)
|
|
}
|
|
|
|
func (f *folder) WatchError() error {
|
|
f.watchMut.Lock()
|
|
defer f.watchMut.Unlock()
|
|
return f.watchErr
|
|
}
|
|
|
|
// stopWatch immediately aborts watching and may be called asynchronously
|
|
func (f *folder) stopWatch() {
|
|
f.watchMut.Lock()
|
|
f.watchCancel()
|
|
f.watchMut.Unlock()
|
|
f.setWatchError(nil, 0)
|
|
}
|
|
|
|
// scheduleWatchRestart makes sure watching is restarted from the main for loop
|
|
// in a folder's Serve and thus may be called asynchronously (e.g. when ignores change).
|
|
func (f *folder) scheduleWatchRestart() {
|
|
select {
|
|
case f.restartWatchChan <- struct{}{}:
|
|
default:
|
|
// We might be busy doing a pull and thus not reading from this
|
|
// channel. The channel is 1-buffered, so one notification will be
|
|
// queued to ensure we recheck after the pull.
|
|
}
|
|
}
|
|
|
|
// restartWatch should only ever be called synchronously. If you want to use
|
|
// this asynchronously, you should probably use scheduleWatchRestart instead.
|
|
func (f *folder) restartWatch() error {
|
|
f.stopWatch()
|
|
f.startWatch()
|
|
return f.scanSubdirs(nil)
|
|
}
|
|
|
|
// startWatch should only ever be called synchronously. If you want to use
|
|
// this asynchronously, you should probably use scheduleWatchRestart instead.
|
|
func (f *folder) startWatch() {
|
|
ctx, cancel := context.WithCancel(f.ctx)
|
|
f.watchMut.Lock()
|
|
f.watchChan = make(chan []string)
|
|
f.watchCancel = cancel
|
|
f.watchMut.Unlock()
|
|
go f.monitorWatch(ctx)
|
|
}
|
|
|
|
// monitorWatch starts the filesystem watching and retries every minute on failure.
|
|
// It should not be used except in startWatch.
|
|
func (f *folder) monitorWatch(ctx context.Context) {
|
|
failTimer := time.NewTimer(0)
|
|
aggrCtx, aggrCancel := context.WithCancel(ctx)
|
|
var err error
|
|
var eventChan <-chan fs.Event
|
|
var errChan <-chan error
|
|
warnedOutside := false
|
|
var lastWatch time.Time
|
|
pause := time.Minute
|
|
// Subscribe to folder summaries only on kqueue systems, to warn about potential high resource usage
|
|
var summarySub events.Subscription
|
|
var summaryChan <-chan events.Event
|
|
if fs.WatchKqueue && !f.warnedKqueue {
|
|
summarySub = f.evLogger.Subscribe(events.FolderSummary)
|
|
summaryChan = summarySub.C()
|
|
}
|
|
defer func() {
|
|
aggrCancel() // aggrCancel might e re-assigned -> call within closure
|
|
if summaryChan != nil {
|
|
summarySub.Unsubscribe()
|
|
}
|
|
}()
|
|
for {
|
|
select {
|
|
case <-failTimer.C:
|
|
eventChan, errChan, err = f.mtimefs.Watch(".", f.ignores, ctx, f.IgnorePerms)
|
|
// We do this once per minute initially increased to
|
|
// max one hour in case of repeat failures.
|
|
f.scanOnWatchErr()
|
|
f.setWatchError(err, pause)
|
|
if err != nil {
|
|
failTimer.Reset(pause)
|
|
if pause < 60*time.Minute {
|
|
pause *= 2
|
|
}
|
|
continue
|
|
}
|
|
lastWatch = time.Now()
|
|
watchaggregator.Aggregate(aggrCtx, eventChan, f.watchChan, f.FolderConfiguration, f.model.cfg, f.evLogger)
|
|
l.Debugln("Started filesystem watcher for folder", f.Description())
|
|
case err = <-errChan:
|
|
var next time.Duration
|
|
if dur := time.Since(lastWatch); dur > pause {
|
|
pause = time.Minute
|
|
next = 0
|
|
} else {
|
|
next = pause - dur
|
|
if pause < 60*time.Minute {
|
|
pause *= 2
|
|
}
|
|
}
|
|
failTimer.Reset(next)
|
|
f.setWatchError(err, next)
|
|
// This error was previously a panic and should never occur, so generate
|
|
// a warning, but don't do it repetitively.
|
|
var errOutside *fs.ErrWatchEventOutsideRoot
|
|
if errors.As(err, &errOutside) {
|
|
if !warnedOutside {
|
|
l.Warnln(err)
|
|
warnedOutside = true
|
|
}
|
|
f.evLogger.Log(events.Failure, "watching for changes encountered an event outside of the filesystem root")
|
|
}
|
|
aggrCancel()
|
|
errChan = nil
|
|
aggrCtx, aggrCancel = context.WithCancel(ctx)
|
|
case ev := <-summaryChan:
|
|
if data, ok := ev.Data.(FolderSummaryEventData); !ok {
|
|
f.evLogger.Log(events.Failure, "Unexpected type of folder-summary event in folder.monitorWatch")
|
|
} else if data.Folder == f.folderID && data.Summary.LocalTotalItems-data.Summary.LocalDeleted > kqueueItemCountThreshold {
|
|
f.warnedKqueue = true
|
|
summarySub.Unsubscribe()
|
|
summaryChan = nil
|
|
l.Warnf("Filesystem watching (kqueue) is enabled on %v with a lot of files/directories, and that requires a lot of resources and might slow down your system significantly", f.Description())
|
|
}
|
|
case <-ctx.Done():
|
|
aggrCancel() // for good measure and keeping the linters happy
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// setWatchError sets the current error state of the watch and should be called
|
|
// regardless of whether err is nil or not.
|
|
func (f *folder) setWatchError(err error, nextTryIn time.Duration) {
|
|
f.watchMut.Lock()
|
|
prevErr := f.watchErr
|
|
f.watchErr = err
|
|
f.watchMut.Unlock()
|
|
if err != prevErr {
|
|
data := map[string]interface{}{
|
|
"folder": f.ID,
|
|
}
|
|
if prevErr != nil {
|
|
data["from"] = prevErr.Error()
|
|
}
|
|
if err != nil {
|
|
data["to"] = err.Error()
|
|
}
|
|
f.evLogger.Log(events.FolderWatchStateChanged, data)
|
|
}
|
|
if err == nil {
|
|
return
|
|
}
|
|
msg := fmt.Sprintf("Error while trying to start filesystem watcher for folder %s, trying again in %v: %v", f.Description(), nextTryIn, err)
|
|
if prevErr != err {
|
|
l.Infof(msg)
|
|
return
|
|
}
|
|
l.Debugf(msg)
|
|
}
|
|
|
|
// scanOnWatchErr schedules a full scan immediately if an error occurred while watching.
|
|
func (f *folder) scanOnWatchErr() {
|
|
f.watchMut.Lock()
|
|
err := f.watchErr
|
|
f.watchMut.Unlock()
|
|
if err != nil {
|
|
f.DelayScan(0)
|
|
}
|
|
}
|
|
|
|
func (f *folder) setError(err error) {
|
|
select {
|
|
case <-f.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
_, _, oldErr := f.getState()
|
|
if (err != nil && oldErr != nil && oldErr.Error() == err.Error()) || (err == nil && oldErr == nil) {
|
|
return
|
|
}
|
|
|
|
if err != nil {
|
|
if oldErr == nil {
|
|
l.Warnf("Error on folder %s: %v", f.Description(), err)
|
|
} else {
|
|
l.Infof("Error on folder %s changed: %q -> %q", f.Description(), oldErr, err)
|
|
}
|
|
} else {
|
|
l.Infoln("Cleared error on folder", f.Description())
|
|
f.SchedulePull()
|
|
}
|
|
|
|
if f.FSWatcherEnabled {
|
|
if err != nil {
|
|
f.stopWatch()
|
|
} else {
|
|
f.scheduleWatchRestart()
|
|
}
|
|
}
|
|
|
|
f.stateTracker.setError(err)
|
|
}
|
|
|
|
func (f *folder) pullBasePause() time.Duration {
|
|
if f.PullerPauseS == 0 {
|
|
return defaultPullerPause
|
|
}
|
|
return time.Duration(f.PullerPauseS) * time.Second
|
|
}
|
|
|
|
func (f *folder) String() string {
|
|
return fmt.Sprintf("%s/%s@%p", f.Type, f.folderID, f)
|
|
}
|
|
|
|
func (f *folder) newScanError(path string, err error) {
|
|
f.errorsMut.Lock()
|
|
l.Infof("Scanner (folder %s, item %q): %v", f.Description(), path, err)
|
|
f.scanErrors = append(f.scanErrors, FileError{
|
|
Err: err.Error(),
|
|
Path: path,
|
|
})
|
|
f.errorsMut.Unlock()
|
|
}
|
|
|
|
func (f *folder) clearScanErrors(subDirs []string) {
|
|
f.errorsMut.Lock()
|
|
defer f.errorsMut.Unlock()
|
|
if len(subDirs) == 0 {
|
|
f.scanErrors = nil
|
|
return
|
|
}
|
|
filtered := f.scanErrors[:0]
|
|
outer:
|
|
for _, fe := range f.scanErrors {
|
|
for _, sub := range subDirs {
|
|
if fe.Path == sub || fs.IsParent(fe.Path, sub) {
|
|
continue outer
|
|
}
|
|
}
|
|
filtered = append(filtered, fe)
|
|
}
|
|
f.scanErrors = filtered
|
|
}
|
|
|
|
func (f *folder) Errors() []FileError {
|
|
f.errorsMut.Lock()
|
|
defer f.errorsMut.Unlock()
|
|
scanLen := len(f.scanErrors)
|
|
errors := make([]FileError, scanLen+len(f.pullErrors))
|
|
copy(errors[:scanLen], f.scanErrors)
|
|
copy(errors[scanLen:], f.pullErrors)
|
|
sort.Sort(fileErrorList(errors))
|
|
return errors
|
|
}
|
|
|
|
// ScheduleForceRescan marks the file such that it gets rehashed on next scan, and schedules a scan.
|
|
func (f *folder) ScheduleForceRescan(path string) {
|
|
f.forcedRescanPathsMut.Lock()
|
|
f.forcedRescanPaths[path] = struct{}{}
|
|
f.forcedRescanPathsMut.Unlock()
|
|
|
|
select {
|
|
case f.forcedRescanRequested <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (f *folder) updateLocalsFromScanning(fs []protocol.FileInfo) {
|
|
f.updateLocals(fs)
|
|
|
|
f.emitDiskChangeEvents(fs, events.LocalChangeDetected)
|
|
}
|
|
|
|
func (f *folder) updateLocalsFromPulling(fs []protocol.FileInfo) {
|
|
f.updateLocals(fs)
|
|
|
|
f.emitDiskChangeEvents(fs, events.RemoteChangeDetected)
|
|
}
|
|
|
|
func (f *folder) updateLocals(fs []protocol.FileInfo) {
|
|
f.fset.Update(protocol.LocalDeviceID, fs)
|
|
|
|
filenames := make([]string, len(fs))
|
|
f.forcedRescanPathsMut.Lock()
|
|
for i, file := range fs {
|
|
filenames[i] = file.Name
|
|
// No need to rescan a file that was changed since anyway.
|
|
delete(f.forcedRescanPaths, file.Name)
|
|
}
|
|
f.forcedRescanPathsMut.Unlock()
|
|
|
|
seq := f.fset.Sequence(protocol.LocalDeviceID)
|
|
f.evLogger.Log(events.LocalIndexUpdated, map[string]interface{}{
|
|
"folder": f.ID,
|
|
"items": len(fs),
|
|
"filenames": filenames,
|
|
"sequence": seq,
|
|
"version": seq, // legacy for sequence
|
|
})
|
|
}
|
|
|
|
func (f *folder) emitDiskChangeEvents(fs []protocol.FileInfo, typeOfEvent events.EventType) {
|
|
for _, file := range fs {
|
|
if file.IsInvalid() {
|
|
continue
|
|
}
|
|
|
|
objType := "file"
|
|
action := "modified"
|
|
|
|
if file.IsDeleted() {
|
|
action = "deleted"
|
|
}
|
|
|
|
if file.IsSymlink() {
|
|
objType = "symlink"
|
|
} else if file.IsDirectory() {
|
|
objType = "dir"
|
|
}
|
|
|
|
// Two different events can be fired here based on what EventType is passed into function
|
|
f.evLogger.Log(typeOfEvent, map[string]string{
|
|
"folder": f.ID,
|
|
"folderID": f.ID, // incorrect, deprecated, kept for historical compliance
|
|
"label": f.Label,
|
|
"action": action,
|
|
"type": objType,
|
|
"path": filepath.FromSlash(file.Name),
|
|
"modifiedBy": file.ModifiedBy.String(),
|
|
})
|
|
}
|
|
}
|
|
|
|
func (f *folder) handleForcedRescans() error {
|
|
f.forcedRescanPathsMut.Lock()
|
|
paths := make([]string, 0, len(f.forcedRescanPaths))
|
|
for path := range f.forcedRescanPaths {
|
|
paths = append(paths, path)
|
|
}
|
|
f.forcedRescanPaths = make(map[string]struct{})
|
|
f.forcedRescanPathsMut.Unlock()
|
|
if len(paths) == 0 {
|
|
return nil
|
|
}
|
|
|
|
batch := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
|
f.fset.Update(protocol.LocalDeviceID, fs)
|
|
return nil
|
|
})
|
|
|
|
snap, err := f.dbSnapshot()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer snap.Release()
|
|
|
|
for _, path := range paths {
|
|
if err := batch.FlushIfFull(); err != nil {
|
|
return err
|
|
}
|
|
|
|
fi, ok := snap.Get(protocol.LocalDeviceID, path)
|
|
if !ok {
|
|
continue
|
|
}
|
|
fi.SetMustRescan()
|
|
batch.Append(fi)
|
|
}
|
|
|
|
if err = batch.Flush(); err != nil {
|
|
return err
|
|
}
|
|
|
|
return f.scanSubdirs(paths)
|
|
}
|
|
|
|
// dbSnapshots gets a snapshot from the fileset, and wraps any error
|
|
// in a svcutil.FatalErr.
|
|
func (f *folder) dbSnapshot() (*db.Snapshot, error) {
|
|
snap, err := f.fset.Snapshot()
|
|
if err != nil {
|
|
return nil, svcutil.AsFatalErr(err, svcutil.ExitError)
|
|
}
|
|
return snap, nil
|
|
}
|
|
|
|
// The exists function is expected to return true for all known paths
|
|
// (excluding "" and ".")
|
|
func unifySubs(dirs []string, exists func(dir string) bool) []string {
|
|
if len(dirs) == 0 {
|
|
return nil
|
|
}
|
|
sort.Strings(dirs)
|
|
if dirs[0] == "" || dirs[0] == "." || dirs[0] == string(fs.PathSeparator) {
|
|
return nil
|
|
}
|
|
prev := "./" // Anything that can't be parent of a clean path
|
|
for i := 0; i < len(dirs); {
|
|
dir, err := fs.Canonicalize(dirs[i])
|
|
if err != nil {
|
|
l.Debugf("Skipping %v for scan: %s", dirs[i], err)
|
|
dirs = append(dirs[:i], dirs[i+1:]...)
|
|
continue
|
|
}
|
|
if dir == prev || fs.IsParent(dir, prev) {
|
|
dirs = append(dirs[:i], dirs[i+1:]...)
|
|
continue
|
|
}
|
|
parent := filepath.Dir(dir)
|
|
for parent != "." && parent != string(fs.PathSeparator) && !exists(parent) {
|
|
dir = parent
|
|
parent = filepath.Dir(dir)
|
|
}
|
|
dirs[i] = dir
|
|
prev = dir
|
|
i++
|
|
}
|
|
return dirs
|
|
}
|
|
|
|
type cFiler struct {
|
|
*db.Snapshot
|
|
}
|
|
|
|
// Implements scanner.CurrentFiler
|
|
func (cf cFiler) CurrentFile(file string) (protocol.FileInfo, bool) {
|
|
return cf.Get(protocol.LocalDeviceID, file)
|
|
}
|