mirror of
https://github.com/octoleo/syncthing.git
synced 2025-01-22 22:58:25 +00:00
lib/model: Refactor folder.scanSubdirs into smaller parts (#7321)
This commit is contained in:
parent
46bbc78e82
commit
11e9d575c8
@ -440,20 +440,128 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
subDirs[i] = sub
|
||||
}
|
||||
|
||||
snap := f.fset.Snapshot()
|
||||
// We release explicitly later in this function, however we might exit early
|
||||
// and it's ok to release twice.
|
||||
defer snap.Release()
|
||||
|
||||
// Clean the list of subitems to ensure that we start at a known
|
||||
// directory, and don't scan subdirectories of things we've already
|
||||
// scanned.
|
||||
snap := f.fset.Snapshot()
|
||||
subDirs = unifySubs(subDirs, func(file string) bool {
|
||||
_, ok := snap.Get(protocol.LocalDeviceID, file)
|
||||
return ok
|
||||
})
|
||||
snap.Release()
|
||||
|
||||
f.setState(FolderScanning)
|
||||
f.clearScanErrors(subDirs)
|
||||
|
||||
batch := newFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
if err := f.getHealthErrorWithoutIgnores(); err != nil {
|
||||
l.Debugf("Stopping scan of folder %s due to: %s", f.Description(), err)
|
||||
return err
|
||||
}
|
||||
f.updateLocalsFromScanning(fs)
|
||||
return nil
|
||||
})
|
||||
|
||||
batchAppend := f.scanSubdirsBatchAppendFunc(batch)
|
||||
|
||||
// Schedule a pull after scanning, but only if we actually detected any
|
||||
// changes.
|
||||
changes := 0
|
||||
defer func() {
|
||||
l.Debugf("%v finished scanning, detected %v changes", f, changes)
|
||||
if changes > 0 {
|
||||
f.SchedulePull()
|
||||
}
|
||||
}()
|
||||
|
||||
changesHere, err := f.scanSubdirsChangedAndNew(subDirs, batch, batchAppend)
|
||||
changes += changesHere
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := batch.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(subDirs) == 0 {
|
||||
// If we have no specific subdirectories to traverse, set it to one
|
||||
// empty prefix so we traverse the entire folder contents once.
|
||||
subDirs = []string{""}
|
||||
}
|
||||
|
||||
// Do a scan of the database for each prefix, to check for deleted and
|
||||
// ignored files.
|
||||
|
||||
changesHere, err = f.scanSubdirsDeletedAndIgnored(subDirs, batch, batchAppend)
|
||||
changes += changesHere
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := batch.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f.ScanCompleted()
|
||||
return nil
|
||||
}
|
||||
|
||||
type batchAppendFunc func(protocol.FileInfo, *db.Snapshot) bool
|
||||
|
||||
func (f *folder) scanSubdirsBatchAppendFunc(batch *fileInfoBatch) batchAppendFunc {
|
||||
// Resolve items which are identical with the global state.
|
||||
switch f.Type {
|
||||
case config.FolderTypeReceiveOnly:
|
||||
return func(fi protocol.FileInfo, snap *db.Snapshot) bool {
|
||||
switch gf, ok := snap.GetGlobal(fi.Name); {
|
||||
case !ok:
|
||||
case gf.IsEquivalentOptional(fi, f.modTimeWindow, false, false, protocol.FlagLocalReceiveOnly):
|
||||
// What we have locally is equivalent to the global file.
|
||||
fi.Version = gf.Version
|
||||
l.Debugf("%v scanning: Merging identical locally changed item with global", f, fi)
|
||||
fallthrough
|
||||
case fi.IsDeleted() && (gf.IsReceiveOnlyChanged() || gf.IsDeleted()):
|
||||
// Our item is deleted and the global item is our own
|
||||
// receive only file or deleted too. In the former
|
||||
// case we can't delete file infos, so we just
|
||||
// pretend it is a normal deleted file (nobody
|
||||
// cares about that).
|
||||
l.Debugf("%v scanning: Marking item as not locally changed", f, fi)
|
||||
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
||||
}
|
||||
batch.append(fi)
|
||||
return true
|
||||
}
|
||||
case config.FolderTypeReceiveEncrypted:
|
||||
return func(fi protocol.FileInfo, _ *db.Snapshot) bool {
|
||||
// This is a "virtual" parent directory of encrypted files.
|
||||
// We don't track it, but check if anything still exists
|
||||
// within and delete it otherwise.
|
||||
if fi.IsDirectory() && protocol.IsEncryptedParent(fi.Name) {
|
||||
if names, err := f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
|
||||
f.mtimefs.Remove(fi.Name)
|
||||
}
|
||||
return false
|
||||
}
|
||||
// Any local change must not be sent as index entry to
|
||||
// remotes and show up as an error in the UI.
|
||||
fi.LocalFlags = protocol.FlagLocalReceiveOnly
|
||||
batch.append(fi)
|
||||
return true
|
||||
}
|
||||
default:
|
||||
return func(fi protocol.FileInfo, _ *db.Snapshot) bool {
|
||||
batch.append(fi)
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *fileInfoBatch, batchAppend batchAppendFunc) (int, error) {
|
||||
changes := 0
|
||||
snap := f.fset.Snapshot()
|
||||
defer snap.Release()
|
||||
|
||||
// If we return early e.g. due to a folder health error, the scan needs
|
||||
// to be cancelled.
|
||||
@ -483,72 +591,6 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
fchan = scanner.Walk(scanCtx, scanConfig)
|
||||
}
|
||||
|
||||
batch := newFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
if err := f.getHealthErrorWithoutIgnores(); err != nil {
|
||||
l.Debugf("Stopping scan of folder %s due to: %s", f.Description(), err)
|
||||
return err
|
||||
}
|
||||
f.updateLocalsFromScanning(fs)
|
||||
return nil
|
||||
})
|
||||
|
||||
// Schedule a pull after scanning, but only if we actually detected any
|
||||
// changes.
|
||||
changes := 0
|
||||
defer func() {
|
||||
l.Debugf("%v finished scanning, detected %v changes", f, changes)
|
||||
if changes > 0 {
|
||||
f.SchedulePull()
|
||||
}
|
||||
}()
|
||||
|
||||
var batchAppend func(protocol.FileInfo, *db.Snapshot)
|
||||
// Resolve items which are identical with the global state.
|
||||
switch f.Type {
|
||||
case config.FolderTypeReceiveOnly:
|
||||
batchAppend = func(fi protocol.FileInfo, snap *db.Snapshot) {
|
||||
switch gf, ok := snap.GetGlobal(fi.Name); {
|
||||
case !ok:
|
||||
case gf.IsEquivalentOptional(fi, f.modTimeWindow, false, false, protocol.FlagLocalReceiveOnly):
|
||||
// What we have locally is equivalent to the global file.
|
||||
fi.Version = gf.Version
|
||||
l.Debugf("%v scanning: Merging identical locally changed item with global", f, fi)
|
||||
fallthrough
|
||||
case fi.IsDeleted() && (gf.IsReceiveOnlyChanged() || gf.IsDeleted()):
|
||||
// Our item is deleted and the global item is our own
|
||||
// receive only file or deleted too. In the former
|
||||
// case we can't delete file infos, so we just
|
||||
// pretend it is a normal deleted file (nobody
|
||||
// cares about that).
|
||||
l.Debugf("%v scanning: Marking item as not locally changed", f, fi)
|
||||
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
||||
}
|
||||
batch.append(fi)
|
||||
}
|
||||
case config.FolderTypeReceiveEncrypted:
|
||||
batchAppend = func(fi protocol.FileInfo, _ *db.Snapshot) {
|
||||
// This is a "virtual" parent directory of encrypted files.
|
||||
// We don't track it, but check if anything still exists
|
||||
// within and delete it otherwise.
|
||||
if fi.IsDirectory() && protocol.IsEncryptedParent(fi.Name) {
|
||||
if names, err := f.mtimefs.DirNames(fi.Name); err == nil && len(names) == 0 {
|
||||
f.mtimefs.Remove(fi.Name)
|
||||
}
|
||||
changes--
|
||||
return
|
||||
}
|
||||
// Any local change must not be sent as index entry to
|
||||
// remotes and show up as an error in the UI.
|
||||
fi.LocalFlags = protocol.FlagLocalReceiveOnly
|
||||
batch.append(fi)
|
||||
}
|
||||
default:
|
||||
batchAppend = func(fi protocol.FileInfo, _ *db.Snapshot) {
|
||||
batch.append(fi)
|
||||
}
|
||||
}
|
||||
|
||||
f.clearScanErrors(subDirs)
|
||||
alreadyUsedOrExisting := make(map[string]struct{})
|
||||
for res := range fchan {
|
||||
if res.Err != nil {
|
||||
@ -562,43 +604,32 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
scanCancel()
|
||||
for range fchan {
|
||||
}
|
||||
return err
|
||||
return changes, err
|
||||
}
|
||||
|
||||
batchAppend(res.File, snap)
|
||||
changes++
|
||||
if batchAppend(res.File, snap) {
|
||||
changes++
|
||||
}
|
||||
|
||||
switch f.Type {
|
||||
case config.FolderTypeReceiveOnly, config.FolderTypeReceiveEncrypted:
|
||||
default:
|
||||
if nf, ok := f.findRename(snap, res.File, alreadyUsedOrExisting); ok {
|
||||
batchAppend(nf, snap)
|
||||
changes++
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if err := batch.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
return changes, nil
|
||||
}
|
||||
|
||||
// Might have grown large, isn't used anymore and this function may keep
|
||||
// running for some time.
|
||||
alreadyUsedOrExisting = nil
|
||||
|
||||
if len(subDirs) == 0 {
|
||||
// If we have no specific subdirectories to traverse, set it to one
|
||||
// empty prefix so we traverse the entire folder contents once.
|
||||
subDirs = []string{""}
|
||||
}
|
||||
|
||||
// Do a scan of the database for each prefix, to check for deleted and
|
||||
// ignored files.
|
||||
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *fileInfoBatch, batchAppend batchAppendFunc) (int, error) {
|
||||
var toIgnore []db.FileInfoTruncated
|
||||
ignoredParent := ""
|
||||
|
||||
snap.Release()
|
||||
snap = f.fset.Snapshot()
|
||||
changes := 0
|
||||
snap := f.fset.Snapshot()
|
||||
defer snap.Release()
|
||||
|
||||
for _, sub := range subDirs {
|
||||
@ -622,8 +653,9 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
for _, file := range toIgnore {
|
||||
l.Debugln("marking file as ignored", file)
|
||||
nf := file.ConvertToIgnoredFileInfo(f.shortID)
|
||||
batchAppend(nf, snap)
|
||||
changes++
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
iterError = err
|
||||
return false
|
||||
@ -651,8 +683,9 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
|
||||
l.Debugln("marking file as ignored", file)
|
||||
nf := file.ConvertToIgnoredFileInfo(f.shortID)
|
||||
batchAppend(nf, snap)
|
||||
changes++
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
|
||||
case file.IsIgnored() && !ignored:
|
||||
// Successfully scanned items are already un-ignored during
|
||||
@ -680,21 +713,24 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
nf.Version = protocol.Vector{}
|
||||
}
|
||||
l.Debugln("marking file as deleted", nf)
|
||||
batchAppend(nf, snap)
|
||||
changes++
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
case file.IsDeleted() && file.IsReceiveOnlyChanged() && f.Type == config.FolderTypeReceiveOnly && len(snap.Availability(file.Name)) == 0:
|
||||
file.Version = protocol.Vector{}
|
||||
file.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
||||
l.Debugln("marking deleted item that doesn't exist anywhere as not receive-only", file)
|
||||
batchAppend(file.ConvertDeletedToFileInfo(), snap)
|
||||
changes++
|
||||
if batchAppend(file.ConvertDeletedToFileInfo(), snap) {
|
||||
changes++
|
||||
}
|
||||
case file.IsDeleted() && file.IsReceiveOnlyChanged() && f.Type != config.FolderTypeReceiveOnly:
|
||||
// No need to bump the version for a file that was and is
|
||||
// deleted and just the folder type/local flags changed.
|
||||
file.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
||||
l.Debugln("removing receive-only flag on deleted item", file)
|
||||
batchAppend(file.ConvertDeletedToFileInfo(), snap)
|
||||
changes++
|
||||
if batchAppend(file.ConvertDeletedToFileInfo(), snap) {
|
||||
changes++
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
@ -702,7 +738,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
|
||||
select {
|
||||
case <-f.ctx.Done():
|
||||
return f.ctx.Err()
|
||||
return changes, f.ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
@ -710,8 +746,9 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
for _, file := range toIgnore {
|
||||
l.Debugln("marking file as ignored", f)
|
||||
nf := file.ConvertToIgnoredFileInfo(f.shortID)
|
||||
batchAppend(nf, snap)
|
||||
changes++
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
if iterError = batch.flushIfFull(); iterError != nil {
|
||||
break
|
||||
}
|
||||
@ -720,16 +757,11 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
}
|
||||
|
||||
if iterError != nil {
|
||||
return iterError
|
||||
return changes, iterError
|
||||
}
|
||||
}
|
||||
|
||||
if err := batch.flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
f.ScanCompleted()
|
||||
return nil
|
||||
return changes, nil
|
||||
}
|
||||
|
||||
func (f *folder) findRename(snap *db.Snapshot, file protocol.FileInfo, alreadyUsedOrExisting map[string]struct{}) (protocol.FileInfo, bool) {
|
||||
|
@ -16,6 +16,7 @@ import (
|
||||
"runtime"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -368,12 +369,15 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
|
||||
|
||||
done = make(chan struct{})
|
||||
expected := map[string]struct{}{ign: {}, ignExisting: {}}
|
||||
var expectedMut sync.Mutex
|
||||
// The indexes will normally arrive in one update, but it is possible
|
||||
// that they arrive in separate ones.
|
||||
fc.mut.Lock()
|
||||
fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
|
||||
expectedMut.Lock()
|
||||
for _, f := range fs {
|
||||
if _, ok := expected[f.Name]; !ok {
|
||||
_, ok := expected[f.Name]
|
||||
if !ok {
|
||||
t.Errorf("Unexpected file %v was updated in index", f.Name)
|
||||
continue
|
||||
}
|
||||
@ -406,6 +410,7 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
|
||||
if len(expected) == 0 {
|
||||
close(done)
|
||||
}
|
||||
expectedMut.Unlock()
|
||||
}
|
||||
// Make sure pulling doesn't interfere, as index updates are racy and
|
||||
// thus we cannot distinguish between scan and pull results.
|
||||
@ -420,7 +425,9 @@ func pullInvalidIgnored(t *testing.T, ft config.FolderType) {
|
||||
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
expectedMut.Lock()
|
||||
t.Fatal("timed out before receiving index updates for all existing files, missing", expected)
|
||||
expectedMut.Unlock()
|
||||
case <-done:
|
||||
}
|
||||
}
|
||||
@ -463,10 +470,14 @@ func TestIssue4841(t *testing.T) {
|
||||
t.Fatal("Failed scanning:", err)
|
||||
}
|
||||
|
||||
f := checkReceived(<-received)
|
||||
|
||||
if !f.Version.Equal(protocol.Vector{}) {
|
||||
t.Errorf("Got Version == %v, expected empty version", f.Version)
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timed out")
|
||||
case r := <-received:
|
||||
f := checkReceived(r)
|
||||
if !f.Version.Equal(protocol.Vector{}) {
|
||||
t.Errorf("Got Version == %v, expected empty version", f.Version)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1179,10 +1190,11 @@ func TestRequestIndexSenderPause(t *testing.T) {
|
||||
|
||||
indexChan := make(chan []protocol.FileInfo)
|
||||
fc.mut.Lock()
|
||||
fc.indexFn = func(_ context.Context, folder string, fs []protocol.FileInfo) {
|
||||
fc.indexFn = func(ctx context.Context, folder string, fs []protocol.FileInfo) {
|
||||
select {
|
||||
case indexChan <- fs:
|
||||
case <-done:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
fc.mut.Unlock()
|
||||
|
Loading…
x
Reference in New Issue
Block a user