mirror of
https://github.com/octoleo/syncthing.git
synced 2024-12-22 10:58:57 +00:00
lib/model: Properly schedule pull on reconnect (fixes #4504)
We need to reset prevSeq so that we force a full check when someone reconnects - the sequence number may not have changed due to the reconnect. (This is a regression; we did this before f6ea2a7.) Also add an optimization: we schedule a pull after scanning, but there is no need to do so if no changes were detected. This matters now because the scheduled pull actually traverses the database which is expensive. This, however, makes the pull not happen on initial scan if there were no changes during the initial scan. Compensate by always scheduling a pull after initial scan in the rwfolder itself. GitHub-Pull-Request: https://github.com/syncthing/syncthing/pull/4508 LGTM: imsodin, AudriusButkevicius
This commit is contained in:
parent
ee5d0dd43f
commit
5f4ed66aa1
@ -1877,8 +1877,6 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
return err
|
||||
}
|
||||
|
||||
defer runner.SchedulePull()
|
||||
|
||||
// Clean the list of subitems to ensure that we start at a known
|
||||
// directory, and don't scan subdirectories of things we've already
|
||||
// scanned.
|
||||
@ -1918,6 +1916,15 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
|
||||
batch := make([]protocol.FileInfo, 0, maxBatchSizeFiles)
|
||||
batchSizeBytes := 0
|
||||
changes := 0
|
||||
|
||||
// Schedule a pull after scanning, but only if we actually detected any
|
||||
// changes.
|
||||
defer func() {
|
||||
if changes > 0 {
|
||||
runner.SchedulePull()
|
||||
}
|
||||
}()
|
||||
|
||||
for f := range fchan {
|
||||
if len(batch) == maxBatchSizeFiles || batchSizeBytes > maxBatchSizeBytes {
|
||||
@ -1929,8 +1936,10 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
batch = batch[:0]
|
||||
batchSizeBytes = 0
|
||||
}
|
||||
|
||||
batch = append(batch, f)
|
||||
batchSizeBytes += f.ProtoSize()
|
||||
changes++
|
||||
}
|
||||
|
||||
if err := runner.CheckHealth(); err != nil {
|
||||
@ -1972,6 +1981,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
nf := f.ConvertToInvalidFileInfo(m.id.Short())
|
||||
batch = append(batch, nf)
|
||||
batchSizeBytes += nf.ProtoSize()
|
||||
changes++
|
||||
|
||||
case !f.IsInvalid() && !f.IsDeleted():
|
||||
// The file is valid and not deleted. Lets check if it's
|
||||
@ -1998,6 +2008,7 @@ func (m *Model) internalScanFolderSubdirs(ctx context.Context, folder string, su
|
||||
|
||||
batch = append(batch, nf)
|
||||
batchSizeBytes += nf.ProtoSize()
|
||||
changes++
|
||||
}
|
||||
}
|
||||
return true
|
||||
|
@ -147,7 +147,6 @@ func (f *sendReceiveFolder) Serve() {
|
||||
f.setState(FolderIdle)
|
||||
}()
|
||||
|
||||
var prevSeq int64
|
||||
var prevIgnoreHash string
|
||||
var success bool
|
||||
pullFailTimer := time.NewTimer(time.Duration(0))
|
||||
@ -157,6 +156,8 @@ func (f *sendReceiveFolder) Serve() {
|
||||
f.startWatch()
|
||||
}
|
||||
|
||||
initialCompleted := f.initialScanFinished
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-f.ctx.Done():
|
||||
@ -169,13 +170,13 @@ func (f *sendReceiveFolder) Serve() {
|
||||
default:
|
||||
}
|
||||
|
||||
if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
|
||||
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(f.pause)
|
||||
}
|
||||
|
||||
case <-pullFailTimer.C:
|
||||
if prevSeq, prevIgnoreHash, success = f.pull(prevSeq, prevIgnoreHash); !success {
|
||||
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(f.pause)
|
||||
// Back off from retrying to pull with an upper limit.
|
||||
@ -184,6 +185,14 @@ func (f *sendReceiveFolder) Serve() {
|
||||
}
|
||||
}
|
||||
|
||||
case <-initialCompleted:
|
||||
// Initial scan has completed, we should do a pull
|
||||
initialCompleted = nil // never hit this case again
|
||||
if prevIgnoreHash, success = f.pull(prevIgnoreHash); !success {
|
||||
// Pulling failed, try again later.
|
||||
pullFailTimer.Reset(f.pause)
|
||||
}
|
||||
|
||||
// The reason for running the scanner from within the puller is that
|
||||
// this is the easiest way to make sure we are not doing both at the
|
||||
// same time.
|
||||
@ -222,41 +231,27 @@ func (f *sendReceiveFolder) String() string {
|
||||
return fmt.Sprintf("sendReceiveFolder/%s@%p", f.folderID, f)
|
||||
}
|
||||
|
||||
func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq int64, curIgnoreHash string, success bool) {
|
||||
func (f *sendReceiveFolder) pull(prevIgnoreHash string) (curIgnoreHash string, success bool) {
|
||||
select {
|
||||
case <-f.initialScanFinished:
|
||||
default:
|
||||
// Once the initial scan finished, a pull will be scheduled
|
||||
return prevSeq, prevIgnoreHash, true
|
||||
return prevIgnoreHash, true
|
||||
}
|
||||
|
||||
f.model.fmut.RLock()
|
||||
curIgnores := f.model.folderIgnores[f.folderID]
|
||||
f.model.fmut.RUnlock()
|
||||
|
||||
curSeq = prevSeq
|
||||
curIgnoreHash = curIgnores.Hash()
|
||||
ignoresChanged := curIgnoreHash != prevIgnoreHash
|
||||
if ignoresChanged {
|
||||
// The ignore patterns have changed. We need to re-evaluate if
|
||||
// there are files we need now that were ignored before.
|
||||
l.Debugln(f, "ignore patterns have changed, resetting curSeq")
|
||||
curSeq = 0
|
||||
}
|
||||
|
||||
// RemoteSequence() is a fast call, doesn't touch the database.
|
||||
remoteSeq, ok := f.model.RemoteSequence(f.folderID)
|
||||
if !ok || remoteSeq == curSeq {
|
||||
l.Debugln(f, "skip (remoteSeq == curSeq)", curSeq, ok)
|
||||
return curSeq, curIgnoreHash, true
|
||||
}
|
||||
|
||||
if err := f.CheckHealth(); err != nil {
|
||||
l.Debugln("Skipping pull of", f.Description(), "due to folder error:", err)
|
||||
return curSeq, curIgnoreHash, true
|
||||
return curIgnoreHash, true
|
||||
}
|
||||
|
||||
l.Debugln(f, "pulling", curSeq, remoteSeq)
|
||||
l.Debugln(f, "pulling")
|
||||
|
||||
f.setState(FolderSyncing)
|
||||
f.clearErrors()
|
||||
@ -273,20 +268,6 @@ func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq i
|
||||
// No files were changed by the puller, so we are in
|
||||
// sync. Update the local version number.
|
||||
|
||||
if lv, ok := f.model.RemoteSequence(f.folderID); ok && lv < remoteSeq {
|
||||
// There's a corner case where the device we needed
|
||||
// files from disconnected during the puller
|
||||
// iteration. The files will have been removed from
|
||||
// the index, so we've concluded that we don't need
|
||||
// them, but at the same time we have the old remote sequence
|
||||
// that includes those files in remoteSeq. So we
|
||||
// catch the case that this sequence might have
|
||||
// decreased here.
|
||||
l.Debugf("%v adjusting remoteSeq from %d to %d", remoteSeq, lv)
|
||||
remoteSeq = lv
|
||||
}
|
||||
curSeq = remoteSeq
|
||||
|
||||
f.pause = f.basePause()
|
||||
|
||||
break
|
||||
@ -313,7 +294,7 @@ func (f *sendReceiveFolder) pull(prevSeq int64, prevIgnoreHash string) (curSeq i
|
||||
|
||||
f.setState(FolderIdle)
|
||||
|
||||
return curSeq, curIgnoreHash, changed == 0
|
||||
return curIgnoreHash, changed == 0
|
||||
}
|
||||
|
||||
// pullerIteration runs a single puller iteration for the given folder and
|
||||
|
@ -10,27 +10,19 @@ package integration
|
||||
|
||||
import (
|
||||
"log"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestReconnectReceiverDuringTransfer(t *testing.T) {
|
||||
testReconnectDuringTransfer(t, false, true, 0, 0)
|
||||
testReconnectDuringTransfer(t, false, true)
|
||||
}
|
||||
|
||||
func TestReconnectSenderDuringTransfer(t *testing.T) {
|
||||
testReconnectDuringTransfer(t, true, false, 0, 0)
|
||||
testReconnectDuringTransfer(t, true, false)
|
||||
}
|
||||
|
||||
func TestReconnectSenderAndReceiverDuringTransfer(t *testing.T) {
|
||||
// Give the receiver some time to rot with needed files but
|
||||
// without any peer. This triggers
|
||||
// https://github.com/syncthing/syncthing/issues/463
|
||||
testReconnectDuringTransfer(t, true, true, 10*time.Second, 0)
|
||||
}
|
||||
|
||||
func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceiver bool, senderDelay, receiverDelay time.Duration) {
|
||||
func testReconnectDuringTransfer(t *testing.T, restartSender, restartReceiver bool) {
|
||||
log.Println("Cleaning...")
|
||||
err := removeAll("s1", "s2", "h1/index*", "h2/index*")
|
||||
if err != nil {
|
||||
@ -38,7 +30,7 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
|
||||
}
|
||||
|
||||
log.Println("Generating files...")
|
||||
err = generateFiles("s1", 2500, 20, "../LICENSE")
|
||||
err = generateFiles("s1", 250, 20, "../LICENSE")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -63,8 +55,9 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
cfg.Options.MaxRecvKbps = 100
|
||||
cfg.Options.MaxSendKbps = 100
|
||||
cfg.Options.MaxRecvKbps = 750
|
||||
cfg.Options.MaxSendKbps = 750
|
||||
cfg.Options.LimitBandwidthInLan = true
|
||||
if err := receiver.PostConfig(cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -86,42 +79,22 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
|
||||
// Receiver has made progress
|
||||
prevBytes = recv.InSyncBytes
|
||||
|
||||
if ReconnectReceiver {
|
||||
log.Printf("Pausing receiver...")
|
||||
receiver.PauseAll()
|
||||
}
|
||||
|
||||
if ReconnectSender {
|
||||
log.Printf("Pausing sender...")
|
||||
sender.PauseAll()
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
if ReconnectReceiver {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
time.Sleep(receiverDelay)
|
||||
log.Printf("Resuming receiver...")
|
||||
if restartReceiver {
|
||||
log.Printf("Stopping receiver...")
|
||||
receiver.Stop()
|
||||
receiver = startInstance(t, 2)
|
||||
receiver.ResumeAll()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
if ReconnectSender {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
time.Sleep(senderDelay)
|
||||
log.Printf("Resuming sender...")
|
||||
if restartSender {
|
||||
log.Printf("Stopping sender...")
|
||||
sender.Stop()
|
||||
sender = startInstance(t, 1)
|
||||
sender.ResumeAll()
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
|
||||
// Reset rate limits
|
||||
@ -131,6 +104,7 @@ func testReconnectDuringTransfer(t *testing.T, ReconnectSender, ReconnectReceive
|
||||
}
|
||||
cfg.Options.MaxRecvKbps = 0
|
||||
cfg.Options.MaxSendKbps = 0
|
||||
cfg.Options.LimitBandwidthInLan = false
|
||||
if err := receiver.PostConfig(cfg); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -10,7 +10,9 @@ package integration
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
@ -128,25 +130,15 @@ func TestReset(t *testing.T) {
|
||||
}
|
||||
|
||||
func createFiles(t *testing.T) int {
|
||||
// Create eight empty files and directories
|
||||
files := []string{"f1", "f2", "f3", "f4", "f11", "f12", "f13", "f14"}
|
||||
dirs := []string{"d1", "d2", "d3", "d4", "d11", "d12", "d13", "d14"}
|
||||
all := append(files, dirs...)
|
||||
// Create a few files
|
||||
|
||||
for _, file := range files {
|
||||
fd, err := os.Create(filepath.Join("s1", file))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fd.Close()
|
||||
}
|
||||
|
||||
for _, dir := range dirs {
|
||||
err := os.Mkdir(filepath.Join("s1", dir), 0755)
|
||||
if err != nil {
|
||||
const n = 8
|
||||
for i := 0; i < n; i++ {
|
||||
file := fmt.Sprintf("f%d", i)
|
||||
if err := ioutil.WriteFile(filepath.Join("s1", file), []byte("data"), 0644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
return len(all)
|
||||
return n
|
||||
}
|
||||
|
@ -20,8 +20,8 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
longTimeLimit = 5 * time.Minute
|
||||
shortTimeLimit = 45 * time.Second
|
||||
longTimeLimit = 1 * time.Minute
|
||||
shortTimeLimit = 25 * time.Second
|
||||
s12Folder = `¯\_(ツ)_/¯ Räksmörgås 动作 Адрес` // This was renamed to ensure arbitrary folder IDs are fine.
|
||||
)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user