mirror of
https://github.com/octoleo/syncthing.git
synced 2024-11-09 14:50:56 +00:00
parent
d5141c6d51
commit
dd39556759
@ -20,7 +20,7 @@ import (
|
||||
// do not put restrictions on downgrades (e.g. for repairs after a bugfix).
|
||||
const (
|
||||
dbVersion = 14
|
||||
dbMigrationVersion = 16
|
||||
dbMigrationVersion = 17
|
||||
dbMinSyncthingVersion = "v1.9.0"
|
||||
)
|
||||
|
||||
@ -101,6 +101,7 @@ func (db *schemaUpdater) updateSchema() error {
|
||||
{13, 13, "v1.7.0", db.updateSchemaTo13},
|
||||
{14, 14, "v1.9.0", db.updateSchemaTo14},
|
||||
{14, 16, "v1.9.0", db.checkRepairMigration},
|
||||
{14, 17, "v1.9.0", db.migration17},
|
||||
}
|
||||
|
||||
for _, m := range migrations {
|
||||
@ -783,6 +784,53 @@ func (db *schemaUpdater) checkRepairMigration(_ int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// migration17 finds all files that were pulled as invalid from an invalid
|
||||
// global and make sure they get scanned/pulled again.
|
||||
func (db *schemaUpdater) migration17(prev int) error {
|
||||
if prev < 16 {
|
||||
// Issue was introduced in migration to 16
|
||||
return nil
|
||||
}
|
||||
t, err := db.newReadOnlyTransaction()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer t.close()
|
||||
|
||||
for _, folderStr := range db.ListFolders() {
|
||||
folder := []byte(folderStr)
|
||||
meta, err := db.loadMetadataTracker(folderStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
batch := NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
return db.updateLocalFiles(folder, fs, meta)
|
||||
})
|
||||
var innerErr error
|
||||
err = t.withHave(folder, protocol.LocalDeviceID[:], nil, false, func(fi protocol.FileIntf) bool {
|
||||
if fi.IsInvalid() && fi.FileLocalFlags() == 0 {
|
||||
f := fi.(protocol.FileInfo)
|
||||
f.SetMustRescan()
|
||||
f.Version = protocol.Vector{}
|
||||
batch.Append(f)
|
||||
innerErr = batch.FlushIfFull()
|
||||
return innerErr == nil
|
||||
}
|
||||
return true
|
||||
})
|
||||
if innerErr != nil {
|
||||
return innerErr
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := batch.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (db *schemaUpdater) rewriteGlobals(t readWriteTransaction) error {
|
||||
it, err := t.NewPrefixIterator([]byte{KeyTypeGlobal})
|
||||
if err != nil {
|
||||
|
@ -472,7 +472,6 @@ func TestNeedWithInvalid(t *testing.T) {
|
||||
remote0Have[0],
|
||||
remote1Have[0],
|
||||
remote0Have[2],
|
||||
remote1Have[2],
|
||||
}
|
||||
|
||||
replace(s, protocol.LocalDeviceID, localHave)
|
||||
|
@ -774,8 +774,10 @@ func (t readWriteTransaction) updateLocalNeed(keyBuf, folder, name []byte, add b
|
||||
}
|
||||
|
||||
func Need(global FileVersion, haveLocal bool, localVersion protocol.Vector) bool {
|
||||
// We never need a file without a valid version.
|
||||
if global.Version.IsEmpty() {
|
||||
// We never need an invalid file or a file without a valid version (just
|
||||
// another way of expressing "invalid", really, until we fix that
|
||||
// part...).
|
||||
if global.IsInvalid() || global.Version.IsEmpty() {
|
||||
return false
|
||||
}
|
||||
// We don't need a deleted file if we don't have it.
|
||||
|
70
lib/db/util.go
Normal file
70
lib/db/util.go
Normal file
@ -0,0 +1,70 @@
|
||||
// Copyright (C) 2021 The Syncthing Authors.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this file,
|
||||
// You can obtain one at https://mozilla.org/MPL/2.0/.
|
||||
|
||||
package db
|
||||
|
||||
import "github.com/syncthing/syncthing/lib/protocol"
|
||||
|
||||
// How many files to send in each Index/IndexUpdate message.
|
||||
const (
|
||||
MaxBatchSizeBytes = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
|
||||
MaxBatchSizeFiles = 1000 // Either way, don't include more files than this
|
||||
)
|
||||
|
||||
// FileInfoBatch is a utility to do file operations on the database in suitably
|
||||
// sized batches.
|
||||
type FileInfoBatch struct {
|
||||
infos []protocol.FileInfo
|
||||
size int
|
||||
flushFn func([]protocol.FileInfo) error
|
||||
}
|
||||
|
||||
func NewFileInfoBatch(fn func([]protocol.FileInfo) error) *FileInfoBatch {
|
||||
return &FileInfoBatch{
|
||||
infos: make([]protocol.FileInfo, 0, MaxBatchSizeFiles),
|
||||
flushFn: fn,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) SetFlushFunc(fn func([]protocol.FileInfo) error) {
|
||||
b.flushFn = fn
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) Append(f protocol.FileInfo) {
|
||||
b.infos = append(b.infos, f)
|
||||
b.size += f.ProtoSize()
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) Full() bool {
|
||||
return len(b.infos) >= MaxBatchSizeFiles || b.size >= MaxBatchSizeBytes
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) FlushIfFull() error {
|
||||
if b.Full() {
|
||||
return b.Flush()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) Flush() error {
|
||||
if len(b.infos) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := b.flushFn(b.infos); err != nil {
|
||||
return err
|
||||
}
|
||||
b.Reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) Reset() {
|
||||
b.infos = b.infos[:0]
|
||||
b.size = 0
|
||||
}
|
||||
|
||||
func (b *FileInfoBatch) Size() int {
|
||||
return b.size
|
||||
}
|
@ -473,7 +473,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
f.setState(FolderScanning)
|
||||
f.clearScanErrors(subDirs)
|
||||
|
||||
batch := newFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
batch := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
if err := f.getHealthErrorWithoutIgnores(); err != nil {
|
||||
l.Debugf("Stopping scan of folder %s due to: %s", f.Description(), err)
|
||||
return err
|
||||
@ -500,7 +500,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := batch.flush(); err != nil {
|
||||
if err := batch.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -519,7 +519,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := batch.flush(); err != nil {
|
||||
if err := batch.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -529,7 +529,7 @@ func (f *folder) scanSubdirs(subDirs []string) error {
|
||||
|
||||
type batchAppendFunc func(protocol.FileInfo, *db.Snapshot) bool
|
||||
|
||||
func (f *folder) scanSubdirsBatchAppendFunc(batch *fileInfoBatch) batchAppendFunc {
|
||||
func (f *folder) scanSubdirsBatchAppendFunc(batch *db.FileInfoBatch) batchAppendFunc {
|
||||
// Resolve items which are identical with the global state.
|
||||
switch f.Type {
|
||||
case config.FolderTypeReceiveOnly:
|
||||
@ -550,7 +550,7 @@ func (f *folder) scanSubdirsBatchAppendFunc(batch *fileInfoBatch) batchAppendFun
|
||||
l.Debugf("%v scanning: Marking item as not locally changed", f, fi)
|
||||
fi.LocalFlags &^= protocol.FlagLocalReceiveOnly
|
||||
}
|
||||
batch.append(fi)
|
||||
batch.Append(fi)
|
||||
return true
|
||||
}
|
||||
case config.FolderTypeReceiveEncrypted:
|
||||
@ -567,18 +567,18 @@ func (f *folder) scanSubdirsBatchAppendFunc(batch *fileInfoBatch) batchAppendFun
|
||||
// Any local change must not be sent as index entry to
|
||||
// remotes and show up as an error in the UI.
|
||||
fi.LocalFlags = protocol.FlagLocalReceiveOnly
|
||||
batch.append(fi)
|
||||
batch.Append(fi)
|
||||
return true
|
||||
}
|
||||
default:
|
||||
return func(fi protocol.FileInfo, _ *db.Snapshot) bool {
|
||||
batch.append(fi)
|
||||
batch.Append(fi)
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *fileInfoBatch, batchAppend batchAppendFunc) (int, error) {
|
||||
func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *db.FileInfoBatch, batchAppend batchAppendFunc) (int, error) {
|
||||
changes := 0
|
||||
snap, err := f.dbSnapshot()
|
||||
if err != nil {
|
||||
@ -621,7 +621,7 @@ func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *fileInfoBatch
|
||||
continue
|
||||
}
|
||||
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
if err := batch.FlushIfFull(); err != nil {
|
||||
// Prevent a race between the scan aborting due to context
|
||||
// cancellation and releasing the snapshot in defer here.
|
||||
scanCancel()
|
||||
@ -648,7 +648,7 @@ func (f *folder) scanSubdirsChangedAndNew(subDirs []string, batch *fileInfoBatch
|
||||
return changes, nil
|
||||
}
|
||||
|
||||
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *fileInfoBatch, batchAppend batchAppendFunc) (int, error) {
|
||||
func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *db.FileInfoBatch, batchAppend batchAppendFunc) (int, error) {
|
||||
var toIgnore []db.FileInfoTruncated
|
||||
ignoredParent := ""
|
||||
changes := 0
|
||||
@ -670,7 +670,7 @@ func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *fileInfoB
|
||||
|
||||
file := fi.(db.FileInfoTruncated)
|
||||
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
if err := batch.FlushIfFull(); err != nil {
|
||||
iterError = err
|
||||
return false
|
||||
}
|
||||
@ -682,7 +682,7 @@ func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *fileInfoB
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
if err := batch.FlushIfFull(); err != nil {
|
||||
iterError = err
|
||||
return false
|
||||
}
|
||||
@ -775,7 +775,7 @@ func (f *folder) scanSubdirsDeletedAndIgnored(subDirs []string, batch *fileInfoB
|
||||
if batchAppend(nf, snap) {
|
||||
changes++
|
||||
}
|
||||
if iterError = batch.flushIfFull(); iterError != nil {
|
||||
if iterError = batch.FlushIfFull(); iterError != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -1206,7 +1206,7 @@ func (f *folder) handleForcedRescans() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
batch := newFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
batch := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
f.fset.Update(protocol.LocalDeviceID, fs)
|
||||
return nil
|
||||
})
|
||||
@ -1218,7 +1218,7 @@ func (f *folder) handleForcedRescans() error {
|
||||
defer snap.Release()
|
||||
|
||||
for _, path := range paths {
|
||||
if err := batch.flushIfFull(); err != nil {
|
||||
if err := batch.FlushIfFull(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1227,10 +1227,10 @@ func (f *folder) handleForcedRescans() error {
|
||||
continue
|
||||
}
|
||||
fi.SetMustRescan()
|
||||
batch.append(fi)
|
||||
batch.Append(fi)
|
||||
}
|
||||
|
||||
if err = batch.flush(); err != nil {
|
||||
if err = batch.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -41,7 +41,7 @@ func (f *receiveEncryptedFolder) revert() error {
|
||||
f.setState(FolderScanning)
|
||||
defer f.setState(FolderIdle)
|
||||
|
||||
batch := newFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
batch := db.NewFileInfoBatch(func(fs []protocol.FileInfo) error {
|
||||
f.updateLocalsFromScanning(fs)
|
||||
return nil
|
||||
})
|
||||
@ -54,7 +54,7 @@ func (f *receiveEncryptedFolder) revert() error {
|
||||
var iterErr error
|
||||
var dirs []string
|
||||
snap.WithHaveTruncated(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
|
||||
if iterErr = batch.flushIfFull(); iterErr != nil {
|
||||
if iterErr = batch.FlushIfFull(); iterErr != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
@ -81,7 +81,7 @@ func (f *receiveEncryptedFolder) revert() error {
|
||||
// item should still not be sent in index updates. However being
|
||||
// deleted, it will not show up as an unexpected file in the UI
|
||||
// anymore.
|
||||
batch.append(fi)
|
||||
batch.Append(fi)
|
||||
|
||||
return true
|
||||
})
|
||||
@ -91,7 +91,7 @@ func (f *receiveEncryptedFolder) revert() error {
|
||||
if iterErr != nil {
|
||||
return iterErr
|
||||
}
|
||||
return batch.flush()
|
||||
return batch.Flush()
|
||||
}
|
||||
|
||||
func (f *receiveEncryptedFolder) revertHandleDirs(dirs []string, snap *db.Snapshot) {
|
||||
|
@ -82,8 +82,10 @@ func (f *receiveOnlyFolder) revert() error {
|
||||
scanChan: scanChan,
|
||||
}
|
||||
|
||||
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes := 0
|
||||
batch := db.NewFileInfoBatch(func(files []protocol.FileInfo) error {
|
||||
f.updateLocalsFromScanning(files)
|
||||
return nil
|
||||
})
|
||||
snap, err := f.dbSnapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -124,21 +126,12 @@ func (f *receiveOnlyFolder) revert() error {
|
||||
fi.Version = protocol.Vector{}
|
||||
}
|
||||
|
||||
batch = append(batch, fi)
|
||||
batchSizeBytes += fi.ProtoSize()
|
||||
batch.Append(fi)
|
||||
_ = batch.FlushIfFull()
|
||||
|
||||
if len(batch) >= maxBatchSizeFiles || batchSizeBytes >= maxBatchSizeBytes {
|
||||
f.updateLocalsFromScanning(batch)
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
}
|
||||
return true
|
||||
})
|
||||
if len(batch) > 0 {
|
||||
f.updateLocalsFromScanning(batch)
|
||||
}
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
_ = batch.Flush()
|
||||
|
||||
// Handle any queued directories
|
||||
deleted, err := delQueue.flush(snap)
|
||||
@ -147,7 +140,7 @@ func (f *receiveOnlyFolder) revert() error {
|
||||
}
|
||||
now := time.Now()
|
||||
for _, dir := range deleted {
|
||||
batch = append(batch, protocol.FileInfo{
|
||||
batch.Append(protocol.FileInfo{
|
||||
Name: dir,
|
||||
Type: protocol.FileInfoTypeDirectory,
|
||||
ModifiedS: now.Unix(),
|
||||
@ -156,9 +149,7 @@ func (f *receiveOnlyFolder) revert() error {
|
||||
Version: protocol.Vector{},
|
||||
})
|
||||
}
|
||||
if len(batch) > 0 {
|
||||
f.updateLocalsFromScanning(batch)
|
||||
}
|
||||
_ = batch.Flush()
|
||||
|
||||
// We will likely have changed our local index, but that won't trigger a
|
||||
// pull by itself. Make sure we schedule one so that we start
|
||||
|
@ -37,7 +37,7 @@ func (f *sendOnlyFolder) PullErrors() []FileError {
|
||||
|
||||
// pull checks need for files that only differ by metadata (no changes on disk)
|
||||
func (f *sendOnlyFolder) pull() (bool, error) {
|
||||
batch := newFileInfoBatch(func(files []protocol.FileInfo) error {
|
||||
batch := db.NewFileInfoBatch(func(files []protocol.FileInfo) error {
|
||||
f.updateLocalsFromPulling(files)
|
||||
return nil
|
||||
})
|
||||
@ -48,13 +48,13 @@ func (f *sendOnlyFolder) pull() (bool, error) {
|
||||
}
|
||||
defer snap.Release()
|
||||
snap.WithNeed(protocol.LocalDeviceID, func(intf protocol.FileIntf) bool {
|
||||
batch.flushIfFull()
|
||||
batch.FlushIfFull()
|
||||
|
||||
file := intf.(protocol.FileInfo)
|
||||
|
||||
if f.ignores.ShouldIgnore(intf.FileName()) {
|
||||
file.SetIgnored()
|
||||
batch.append(file)
|
||||
batch.Append(file)
|
||||
l.Debugln(f, "Handling ignored file", file)
|
||||
return true
|
||||
}
|
||||
@ -63,7 +63,7 @@ func (f *sendOnlyFolder) pull() (bool, error) {
|
||||
if !ok {
|
||||
if intf.IsInvalid() {
|
||||
// Global invalid file just exists for need accounting
|
||||
batch.append(file)
|
||||
batch.Append(file)
|
||||
} else if intf.IsDeleted() {
|
||||
l.Debugln("Should never get a deleted file as needed when we don't have it")
|
||||
f.evLogger.Log(events.Failure, "got deleted file that doesn't exist locally as needed when pulling on send-only")
|
||||
@ -75,13 +75,13 @@ func (f *sendOnlyFolder) pull() (bool, error) {
|
||||
return true
|
||||
}
|
||||
|
||||
batch.append(file)
|
||||
batch.Append(file)
|
||||
l.Debugln(f, "Merging versions of identical file", file)
|
||||
|
||||
return true
|
||||
})
|
||||
|
||||
batch.flush()
|
||||
batch.Flush()
|
||||
|
||||
return true, nil
|
||||
}
|
||||
@ -96,8 +96,10 @@ func (f *sendOnlyFolder) override() error {
|
||||
f.setState(FolderScanning)
|
||||
defer f.setState(FolderIdle)
|
||||
|
||||
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes := 0
|
||||
batch := db.NewFileInfoBatch(func(files []protocol.FileInfo) error {
|
||||
f.updateLocalsFromScanning(files)
|
||||
return nil
|
||||
})
|
||||
snap, err := f.dbSnapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -105,11 +107,7 @@ func (f *sendOnlyFolder) override() error {
|
||||
defer snap.Release()
|
||||
snap.WithNeed(protocol.LocalDeviceID, func(fi protocol.FileIntf) bool {
|
||||
need := fi.(protocol.FileInfo)
|
||||
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
|
||||
f.updateLocalsFromScanning(batch)
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
}
|
||||
_ = batch.FlushIfFull()
|
||||
|
||||
have, ok := snap.Get(protocol.LocalDeviceID, need.Name)
|
||||
// Don't override files that are in a bad state (ignored,
|
||||
@ -126,12 +124,8 @@ func (f *sendOnlyFolder) override() error {
|
||||
need = have
|
||||
}
|
||||
need.Sequence = 0
|
||||
batch = append(batch, need)
|
||||
batchSizeBytes += need.ProtoSize()
|
||||
batch.Append(need)
|
||||
return true
|
||||
})
|
||||
if len(batch) > 0 {
|
||||
f.updateLocalsFromScanning(batch)
|
||||
}
|
||||
return nil
|
||||
return batch.Flush()
|
||||
}
|
||||
|
@ -357,11 +357,6 @@ func (f *sendReceiveFolder) processNeeded(snap *db.Snapshot, dbUpdateChan chan<-
|
||||
changed--
|
||||
}
|
||||
|
||||
case file.IsInvalid():
|
||||
// Global invalid file just exists for need accounting
|
||||
l.Debugln(f, "Handling global invalid item", file)
|
||||
dbUpdateChan <- dbUpdateJob{file, dbUpdateInvalidate}
|
||||
|
||||
case file.IsDeleted():
|
||||
if file.IsDirectory() {
|
||||
// Perform directory deletions at the end, as we may have
|
||||
@ -1668,15 +1663,12 @@ func (f *sendReceiveFolder) Jobs(page, perpage int) ([]string, []string, int) {
|
||||
func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
|
||||
const maxBatchTime = 2 * time.Second
|
||||
|
||||
batch := newFileInfoBatch(nil)
|
||||
tick := time.NewTicker(maxBatchTime)
|
||||
defer tick.Stop()
|
||||
|
||||
changedDirs := make(map[string]struct{})
|
||||
found := false
|
||||
var lastFile protocol.FileInfo
|
||||
|
||||
batch.flushFn = func(files []protocol.FileInfo) error {
|
||||
tick := time.NewTicker(maxBatchTime)
|
||||
defer tick.Stop()
|
||||
batch := db.NewFileInfoBatch(func(files []protocol.FileInfo) error {
|
||||
// sync directories
|
||||
for dir := range changedDirs {
|
||||
delete(changedDirs, dir)
|
||||
@ -1703,7 +1695,7 @@ func (f *sendReceiveFolder) dbUpdaterRoutine(dbUpdateChan <-chan dbUpdateJob) {
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
})
|
||||
|
||||
recvEnc := f.Type == config.FolderTypeReceiveEncrypted
|
||||
loop:
|
||||
@ -1736,16 +1728,16 @@ loop:
|
||||
|
||||
job.file.Sequence = 0
|
||||
|
||||
batch.append(job.file)
|
||||
batch.Append(job.file)
|
||||
|
||||
batch.flushIfFull()
|
||||
batch.FlushIfFull()
|
||||
|
||||
case <-tick.C:
|
||||
batch.flush()
|
||||
batch.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
batch.flush()
|
||||
batch.Flush()
|
||||
}
|
||||
|
||||
// pullScannerRoutine aggregates paths to be scanned after pulling. The scan is
|
||||
|
@ -118,15 +118,15 @@ func (s *indexSender) pause() {
|
||||
// returns the highest sent sequence number.
|
||||
func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
||||
initial := s.prevSequence == 0
|
||||
batch := newFileInfoBatch(nil)
|
||||
batch.flushFn = func(fs []protocol.FileInfo) error {
|
||||
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(batch.infos), batch.size)
|
||||
batch := db.NewFileInfoBatch(nil)
|
||||
batch.SetFlushFunc(func(fs []protocol.FileInfo) error {
|
||||
l.Debugf("%v: Sending %d files (<%d bytes)", s, len(fs), batch.Size())
|
||||
if initial {
|
||||
initial = false
|
||||
return s.conn.Index(ctx, s.folder, fs)
|
||||
}
|
||||
return s.conn.IndexUpdate(ctx, s.folder, fs)
|
||||
}
|
||||
})
|
||||
|
||||
var err error
|
||||
var f protocol.FileInfo
|
||||
@ -140,8 +140,8 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
||||
// This is to make sure that renames (which is an add followed by a delete) land in the same batch.
|
||||
// Even if the batch is full, we allow a last delete to slip in, we do this by making sure that
|
||||
// the batch ends with a non-delete, or that the last item in the batch is already a delete
|
||||
if batch.full() && (!fi.IsDeleted() || previousWasDelete) {
|
||||
if err = batch.flush(); err != nil {
|
||||
if batch.Full() && (!fi.IsDeleted() || previousWasDelete) {
|
||||
if err = batch.Flush(); err != nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
@ -181,14 +181,14 @@ func (s *indexSender) sendIndexTo(ctx context.Context) error {
|
||||
|
||||
previousWasDelete = f.IsDeleted()
|
||||
|
||||
batch.append(f)
|
||||
batch.Append(f)
|
||||
return true
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = batch.flush()
|
||||
err = batch.Flush()
|
||||
|
||||
// True if there was nothing to be sent
|
||||
if f.Sequence == 0 {
|
||||
|
@ -42,12 +42,6 @@ import (
|
||||
"github.com/syncthing/syncthing/lib/versioner"
|
||||
)
|
||||
|
||||
// How many files to send in each Index/IndexUpdate message.
|
||||
const (
|
||||
maxBatchSizeBytes = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
|
||||
maxBatchSizeFiles = 1000 // Either way, don't include more files than this
|
||||
)
|
||||
|
||||
type service interface {
|
||||
suture.Service
|
||||
BringToFront(string)
|
||||
@ -3154,51 +3148,6 @@ func (s folderDeviceSet) hasDevice(dev protocol.DeviceID) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
type fileInfoBatch struct {
|
||||
infos []protocol.FileInfo
|
||||
size int
|
||||
flushFn func([]protocol.FileInfo) error
|
||||
}
|
||||
|
||||
func newFileInfoBatch(fn func([]protocol.FileInfo) error) *fileInfoBatch {
|
||||
return &fileInfoBatch{
|
||||
infos: make([]protocol.FileInfo, 0, maxBatchSizeFiles),
|
||||
flushFn: fn,
|
||||
}
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) append(f protocol.FileInfo) {
|
||||
b.infos = append(b.infos, f)
|
||||
b.size += f.ProtoSize()
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) full() bool {
|
||||
return len(b.infos) >= maxBatchSizeFiles || b.size >= maxBatchSizeBytes
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) flushIfFull() error {
|
||||
if b.full() {
|
||||
return b.flush()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) flush() error {
|
||||
if len(b.infos) == 0 {
|
||||
return nil
|
||||
}
|
||||
if err := b.flushFn(b.infos); err != nil {
|
||||
return err
|
||||
}
|
||||
b.reset()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *fileInfoBatch) reset() {
|
||||
b.infos = b.infos[:0]
|
||||
b.size = 0
|
||||
}
|
||||
|
||||
// syncMutexMap is a type safe wrapper for a sync.Map that holds mutexes
|
||||
type syncMutexMap struct {
|
||||
inner stdsync.Map
|
||||
|
@ -1400,21 +1400,24 @@ func TestRequestReceiveEncryptedLocalNoSend(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRequestIssue7474(t *testing.T) {
|
||||
// Repro for https://github.com/syncthing/syncthing/issues/7474
|
||||
// Devices A, B and C. B connected to A and C, but not A to C.
|
||||
// A has valid file, B ignores it.
|
||||
// In the test C is local, and B is the fake connection.
|
||||
|
||||
func TestRequestGlobalInvalidToValid(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
defer close(done)
|
||||
|
||||
m, fc, fcfg, wcfgCancel := setupModelWithConnection(t)
|
||||
defer wcfgCancel()
|
||||
fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{DeviceID: device2})
|
||||
waiter, err := m.cfg.Modify(func(cfg *config.Configuration) {
|
||||
cfg.SetDevice(newDeviceConfiguration(cfg.Defaults.Device, device2, "device2"))
|
||||
fcfg.Devices = append(fcfg.Devices, config.FolderDeviceConfiguration{DeviceID: device2})
|
||||
cfg.SetFolder(fcfg)
|
||||
})
|
||||
must(t, err)
|
||||
waiter.Wait()
|
||||
tfs := fcfg.Filesystem()
|
||||
defer cleanupModelAndRemoveDir(m, tfs.URI())
|
||||
|
||||
indexChan := make(chan []protocol.FileInfo)
|
||||
indexChan := make(chan []protocol.FileInfo, 1)
|
||||
fc.setIndexFn(func(ctx context.Context, folder string, fs []protocol.FileInfo) error {
|
||||
select {
|
||||
case indexChan <- fs:
|
||||
@ -1426,18 +1429,67 @@ func TestRequestIssue7474(t *testing.T) {
|
||||
|
||||
name := "foo"
|
||||
|
||||
fc.addFileWithLocalFlags(name, protocol.FileInfoTypeFile, protocol.FlagLocalIgnored)
|
||||
// Setup device with valid file, do not send index yet
|
||||
contents := []byte("test file contents\n")
|
||||
fc.addFile(name, 0644, protocol.FileInfoTypeFile, contents)
|
||||
|
||||
// Third device ignoring the same file
|
||||
fc.mut.Lock()
|
||||
file := fc.files[0]
|
||||
fc.mut.Unlock()
|
||||
file.SetIgnored()
|
||||
m.IndexUpdate(device2, fcfg.ID, []protocol.FileInfo{prepareFileInfoForIndex(file)})
|
||||
|
||||
// Wait for the ignored file to be received and possible pulled
|
||||
timeout := time.After(10 * time.Second)
|
||||
globalUpdated := false
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatalf("timed out (globalUpdated == %v)", globalUpdated)
|
||||
default:
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
if !globalUpdated {
|
||||
_, ok, err := m.CurrentGlobalFile(fcfg.ID, name)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
globalUpdated = true
|
||||
}
|
||||
snap, err := m.DBSnapshot(fcfg.ID)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
need := snap.NeedSize(protocol.LocalDeviceID)
|
||||
snap.Release()
|
||||
if need.Files == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Send the valid file
|
||||
fc.sendIndexUpdate()
|
||||
|
||||
select {
|
||||
case <-time.After(10 * time.Second):
|
||||
t.Fatal("timed out before receiving index")
|
||||
case fs := <-indexChan:
|
||||
if len(fs) != 1 {
|
||||
t.Fatalf("Expected one file in index, got %v", len(fs))
|
||||
}
|
||||
if !fs[0].IsInvalid() {
|
||||
t.Error("Expected invalid file")
|
||||
gotInvalid := false
|
||||
for {
|
||||
select {
|
||||
case <-timeout:
|
||||
t.Fatal("timed out before receiving index")
|
||||
case fs := <-indexChan:
|
||||
if len(fs) != 1 {
|
||||
t.Fatalf("Expected one file in index, got %v", len(fs))
|
||||
}
|
||||
if !fs[0].IsInvalid() {
|
||||
return
|
||||
}
|
||||
if gotInvalid {
|
||||
t.Fatal("Received two invalid index updates")
|
||||
}
|
||||
gotInvalid = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user