Merge pull request #1474 from calmh/refactor-states

Refactor state handling
Audrius Butkevicius 2015-03-17 19:09:48 +00:00
commit c82b5d4982
9 changed files with 276 additions and 158 deletions

View File

@ -0,0 +1,89 @@
// Copyright (C) 2015 The Syncthing Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along
// with this program. If not, see <http://www.gnu.org/licenses/>.
package model
import (
"sync"
"time"
"github.com/syncthing/syncthing/internal/events"
)
type folderState int
const (
FolderIdle folderState = iota
FolderScanning
FolderSyncing
FolderCleaning
)
func (s folderState) String() string {
switch s {
case FolderIdle:
return "idle"
case FolderScanning:
return "scanning"
case FolderCleaning:
return "cleaning"
case FolderSyncing:
return "syncing"
default:
return "unknown"
}
}
type stateTracker struct {
folder string
mut sync.Mutex
current folderState
changed time.Time
}
func (s *stateTracker) setState(newState folderState) {
s.mut.Lock()
if newState != s.current {
/* This should hold later...
if s.current != FolderIdle && (newState == FolderScanning || newState == FolderSyncing) {
panic("illegal state transition " + s.current.String() + " -> " + newState.String())
}
*/
eventData := map[string]interface{}{
"folder": s.folder,
"to": newState.String(),
"from": s.current.String(),
}
if !s.changed.IsZero() {
eventData["duration"] = time.Since(s.changed).Seconds()
}
s.current = newState
s.changed = time.Now()
events.Default.Log(events.StateChanged, eventData)
}
s.mut.Unlock()
}
func (s *stateTracker) getState() (current folderState, changed time.Time) {
s.mut.Lock()
current, changed = s.current, s.changed
s.mut.Unlock()
return
}
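Reviewer note: the stateTracker above is meant to be embedded in each folder runner, so the runner itself owns its state and emits the StateChanged event, including a "duration" field for the state being left (omitted on the very first transition, while changed is still zero). A minimal illustrative sketch, not part of this diff; the embedding type and calls below are hypothetical:

// Illustrative only: a runner embedding stateTracker and cycling through a scan.
type exampleFolder struct {
	stateTracker
}

func exampleUsage() {
	f := &exampleFolder{stateTracker{folder: "default"}}

	f.setState(FolderScanning) // logs StateChanged: from "idle" to "scanning" (no duration yet)
	// ... perform the scan ...
	f.setState(FolderIdle) // logs StateChanged with "duration" = seconds spent scanning

	state, changed := f.getState()
	_, _ = state, changed // FolderIdle and the time of the last transition
}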

View File

@ -36,30 +36,6 @@ import (
"github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb"
) )
type folderState int
const (
FolderIdle folderState = iota
FolderScanning
FolderSyncing
FolderCleaning
)
func (s folderState) String() string {
switch s {
case FolderIdle:
return "idle"
case FolderScanning:
return "scanning"
case FolderCleaning:
return "cleaning"
case FolderSyncing:
return "syncing"
default:
return "unknown"
}
}
// How many files to send in each Index/IndexUpdate message. // How many files to send in each Index/IndexUpdate message.
const ( const (
indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed) indexTargetSize = 250 * 1024 // Aim for making index messages no larger than 250 KiB (uncompressed)
@ -73,6 +49,9 @@ type service interface {
Stop()
Jobs() ([]string, []string) // In progress, Queued
BringToFront(string)
+setState(folderState)
+getState() (folderState, time.Time)
}
type Model struct {
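Since both folder runners embed stateTracker and already implement Serve, Stop, Jobs and BringToFront, they satisfy the extended service interface. A compile-time check one could add (illustrative, not in this diff):

// Illustrative only: assert that both runner types satisfy the service interface.
var _ service = (*roFolder)(nil)
var _ service = (*rwFolder)(nil)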
@ -95,10 +74,6 @@ type Model struct {
folderStatRefs map[string]*stats.FolderStatisticsReference // folder -> statsRef
fmut sync.RWMutex // protects the above
-folderState map[string]folderState // folder -> state
-folderStateChanged map[string]time.Time // folder -> time when state changed
-smut sync.RWMutex
protoConn map[protocol.DeviceID]protocol.Connection
rawConn map[protocol.DeviceID]io.Closer
deviceVer map[protocol.DeviceID]string
@ -120,26 +95,24 @@ var (
// for file data without altering the local folder in any way.
func NewModel(cfg *config.Wrapper, deviceName, clientName, clientVersion string, ldb *leveldb.DB) *Model {
m := &Model{
cfg: cfg,
db: ldb,
deviceName: deviceName,
clientName: clientName,
clientVersion: clientVersion,
folderCfgs: make(map[string]config.FolderConfiguration),
folderFiles: make(map[string]*db.FileSet),
folderDevices: make(map[string][]protocol.DeviceID),
deviceFolders: make(map[protocol.DeviceID][]string),
deviceStatRefs: make(map[protocol.DeviceID]*stats.DeviceStatisticsReference),
folderIgnores: make(map[string]*ignore.Matcher),
folderRunners: make(map[string]service),
folderStatRefs: make(map[string]*stats.FolderStatisticsReference),
-folderState: make(map[string]folderState),
-folderStateChanged: make(map[string]time.Time),
protoConn: make(map[protocol.DeviceID]protocol.Connection),
rawConn: make(map[protocol.DeviceID]io.Closer),
deviceVer: make(map[protocol.DeviceID]string),
finder: db.NewBlockFinder(ldb, cfg),
progressEmitter: NewProgressEmitter(cfg),
}
if cfg.Options().ProgressUpdateIntervalS > -1 {
go m.progressEmitter.Serve()
@ -153,7 +126,6 @@ func NewModel(cfg *config.Wrapper, deviceName, clientName, clientVersion string,
}
}
deadlockDetect(&m.fmut, time.Duration(timeout)*time.Second)
-deadlockDetect(&m.smut, time.Duration(timeout)*time.Second)
deadlockDetect(&m.pmut, time.Duration(timeout)*time.Second)
return m
}
@ -172,18 +144,7 @@ func (m *Model) StartFolderRW(folder string) {
if ok {
panic("cannot start already running folder " + folder)
}
-p := &Puller{
-folder: folder,
-dir: cfg.Path,
-scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
-model: m,
-ignorePerms: cfg.IgnorePerms,
-lenientMtimes: cfg.LenientMtimes,
-progressEmitter: m.progressEmitter,
-copiers: cfg.Copiers,
-pullers: cfg.Pullers,
-queue: newJobQueue(),
-}
+p := newRWFolder(m, cfg)
m.folderRunners[folder] = p
m.fmut.Unlock()
@ -216,11 +177,7 @@ func (m *Model) StartFolderRO(folder string) {
if ok {
panic("cannot start already running folder " + folder)
}
-s := &Scanner{
-folder: folder,
-intv: time.Duration(cfg.RescanIntervalS) * time.Second,
-model: m,
-}
+s := newROFolder(m, folder, time.Duration(cfg.RescanIntervalS)*time.Second)
m.folderRunners[folder] = s
m.fmut.Unlock()
@ -1154,11 +1111,15 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
}
m.fmut.Lock()
-fs, ok := m.folderFiles[folder]
+fs := m.folderFiles[folder]
folderCfg := m.folderCfgs[folder]
ignores := m.folderIgnores[folder]
+runner, ok := m.folderRunners[folder]
m.fmut.Unlock()
+// Folders are added to folderRunners only when they are started. We can't
+// scan them before they have started, so that's what we need to check for
+// here.
if !ok {
return errors.New("no such folder")
}
@ -1189,7 +1150,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
Hashers: folderCfg.Hashers,
}
-m.setState(folder, FolderScanning)
+runner.setState(FolderScanning)
fchan, err := w.Walk()
if err != nil {
@ -1289,7 +1250,7 @@ func (m *Model) ScanFolderSub(folder, sub string) error {
fs.Update(protocol.LocalDeviceID, batch)
}
-m.setState(folder, FolderIdle)
+runner.setState(FolderIdle)
return nil
}
@ -1332,40 +1293,24 @@ func (m *Model) clusterConfig(device protocol.DeviceID) protocol.ClusterConfigMe
return cm
}
-func (m *Model) setState(folder string, state folderState) {
-m.smut.Lock()
-oldState := m.folderState[folder]
-changed, ok := m.folderStateChanged[folder]
-if state != oldState {
-m.folderState[folder] = state
-m.folderStateChanged[folder] = time.Now()
-eventData := map[string]interface{}{
-"folder": folder,
-"to": state.String(),
-}
-if ok {
-eventData["duration"] = time.Since(changed).Seconds()
-eventData["from"] = oldState.String()
-}
-events.Default.Log(events.StateChanged, eventData)
-}
-m.smut.Unlock()
-}
func (m *Model) State(folder string) (string, time.Time) {
-m.smut.RLock()
-state := m.folderState[folder]
-changed := m.folderStateChanged[folder]
-m.smut.RUnlock()
+m.fmut.RLock()
+runner, ok := m.folderRunners[folder]
+m.fmut.RUnlock()
+if !ok {
+return "", time.Time{}
+}
+state, changed := runner.getState()
return state.String(), changed
}
func (m *Model) Override(folder string) {
m.fmut.RLock()
fs := m.folderFiles[folder]
+runner := m.folderRunners[folder]
m.fmut.RUnlock()
-m.setState(folder, FolderScanning)
+runner.setState(FolderScanning)
batch := make([]protocol.FileInfo, 0, indexBatchSize)
fs.WithNeed(protocol.LocalDeviceID, func(fi db.FileIntf) bool {
need := fi.(protocol.FileInfo)
@ -1391,7 +1336,7 @@ func (m *Model) Override(folder string) {
if len(batch) > 0 {
fs.Update(protocol.LocalDeviceID, batch)
}
-m.setState(folder, FolderIdle)
+runner.setState(FolderIdle)
}
// CurrentLocalVersion returns the change version for the given folder.

View File

@ -90,6 +90,7 @@ func TestRequest(t *testing.T) {
// device1 shares default, but device2 doesn't
m.AddFolder(defaultFolderConfig)
+m.StartFolderRO("default")
m.ScanFolder("default")
// Existing, shared file
@ -470,6 +471,7 @@ func TestIgnores(t *testing.T) {
db, _ := leveldb.Open(storage.NewMemStorage(), nil)
m := NewModel(defaultConfig, "device", "syncthing", "dev", db)
m.AddFolder(defaultFolderConfig)
+m.StartFolderRO("default")
expected := []string{
".*",

View File

@ -12,14 +12,26 @@ import (
"time" "time"
) )
type Scanner struct { type roFolder struct {
stateTracker
folder string folder string
intv time.Duration intv time.Duration
model *Model model *Model
stop chan struct{} stop chan struct{}
} }
func (s *Scanner) Serve() { func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
return &roFolder{
stateTracker: stateTracker{folder: folder},
folder: folder,
intv: interval,
model: model,
stop: make(chan struct{}),
}
}
func (s *roFolder) Serve() {
if debug { if debug {
l.Debugln(s, "starting") l.Debugln(s, "starting")
defer l.Debugln(s, "exiting") defer l.Debugln(s, "exiting")
@ -39,12 +51,12 @@ func (s *Scanner) Serve() {
l.Debugln(s, "rescan") l.Debugln(s, "rescan")
} }
s.model.setState(s.folder, FolderScanning) s.setState(FolderScanning)
if err := s.model.ScanFolder(s.folder); err != nil { if err := s.model.ScanFolder(s.folder); err != nil {
s.model.cfg.InvalidateFolder(s.folder, err.Error()) s.model.cfg.InvalidateFolder(s.folder, err.Error())
return return
} }
s.model.setState(s.folder, FolderIdle) s.setState(FolderIdle)
if !initialScanCompleted { if !initialScanCompleted {
l.Infoln("Completed initial scan (ro) of folder", s.folder) l.Infoln("Completed initial scan (ro) of folder", s.folder)
@ -62,16 +74,16 @@ func (s *Scanner) Serve() {
}
}
-func (s *Scanner) Stop() {
+func (s *roFolder) Stop() {
close(s.stop)
}
-func (s *Scanner) String() string {
-return fmt.Sprintf("scanner/%s@%p", s.folder, s)
+func (s *roFolder) String() string {
+return fmt.Sprintf("roFolder/%s@%p", s.folder, s)
}
-func (s *Scanner) BringToFront(string) {}
+func (s *roFolder) BringToFront(string) {}
-func (s *Scanner) Jobs() ([]string, []string) {
+func (s *roFolder) Jobs() ([]string, []string) {
return nil, nil
}

View File

@ -54,31 +54,53 @@ var (
errNoDevice = errors.New("no available source device")
)
-type Puller struct {
-folder string
-dir string
-scanIntv time.Duration
-model *Model
-stop chan struct{}
-versioner versioner.Versioner
-ignorePerms bool
-lenientMtimes bool
-progressEmitter *ProgressEmitter
-copiers int
-pullers int
-queue *jobQueue
-}
+type rwFolder struct {
+stateTracker
+model *Model
+progressEmitter *ProgressEmitter
+folder string
+dir string
+scanIntv time.Duration
+versioner versioner.Versioner
+ignorePerms bool
+lenientMtimes bool
+copiers int
+pullers int
+stop chan struct{}
+queue *jobQueue
+}
+func newRWFolder(m *Model, cfg config.FolderConfiguration) *rwFolder {
+return &rwFolder{
+stateTracker: stateTracker{folder: cfg.ID},
+model: m,
+progressEmitter: m.progressEmitter,
+folder: cfg.ID,
+dir: cfg.Path,
+scanIntv: time.Duration(cfg.RescanIntervalS) * time.Second,
+ignorePerms: cfg.IgnorePerms,
+lenientMtimes: cfg.LenientMtimes,
+copiers: cfg.Copiers,
+pullers: cfg.Pullers,
+stop: make(chan struct{}),
+queue: newJobQueue(),
+}
+}
// Serve will run scans and pulls. It will return when Stop()ed or on a
// critical error.
-func (p *Puller) Serve() {
+func (p *rwFolder) Serve() {
if debug {
l.Debugln(p, "starting")
defer l.Debugln(p, "exiting")
}
-p.stop = make(chan struct{})
pullTimer := time.NewTimer(checkPullIntv)
scanTimer := time.NewTimer(time.Millisecond) // The first scan should be done immediately.
@ -86,7 +108,7 @@ func (p *Puller) Serve() {
pullTimer.Stop()
scanTimer.Stop()
// TODO: Should there be an actual FolderStopped state?
-p.model.setState(p.folder, FolderIdle)
+p.setState(FolderIdle)
}()
var prevVer int64
@ -143,7 +165,7 @@ loop:
if debug {
l.Debugln(p, "pulling", prevVer, curVer)
}
-p.model.setState(p.folder, FolderSyncing)
+p.setState(FolderSyncing)
tries := 0
for {
tries++
@ -191,7 +213,7 @@ loop:
break
}
}
-p.model.setState(p.folder, FolderIdle)
+p.setState(FolderIdle)
// The reason for running the scanner from within the puller is that
// this is the easiest way to make sure we are not doing both at the
@ -200,12 +222,12 @@ loop:
if debug {
l.Debugln(p, "rescan")
}
-p.model.setState(p.folder, FolderScanning)
+p.setState(FolderScanning)
if err := p.model.ScanFolder(p.folder); err != nil {
p.model.cfg.InvalidateFolder(p.folder, err.Error())
break loop
}
-p.model.setState(p.folder, FolderIdle)
+p.setState(FolderIdle)
if p.scanIntv > 0 {
// Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (p.scanIntv.Nanoseconds()*3 + rand.Int63n(2*p.scanIntv.Nanoseconds())) / 4
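For reference, rand.Int63n(2n) is uniform on [0, 2n), so (3n + rand.Int63n(2n)) / 4 is uniform on [0.75n, 1.25n). A standalone sketch of the same arithmetic (illustrative, not part of the change):

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// jitteredInterval applies the same spread as the rescan sleep above:
// uniform between 3/4 and 5/4 of the configured interval.
func jitteredInterval(intv time.Duration) time.Duration {
	return time.Duration((intv.Nanoseconds()*3 + rand.Int63n(2*intv.Nanoseconds())) / 4)
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(jitteredInterval(60 * time.Second)) // somewhere between 45s and 75s
	}
}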
@ -224,19 +246,19 @@ loop:
}
}
-func (p *Puller) Stop() {
+func (p *rwFolder) Stop() {
close(p.stop)
}
-func (p *Puller) String() string {
-return fmt.Sprintf("puller/%s@%p", p.folder, p)
+func (p *rwFolder) String() string {
+return fmt.Sprintf("rwFolder/%s@%p", p.folder, p)
}
// pullerIteration runs a single puller iteration for the given folder and
// returns the number items that should have been synced (even those that
// might have failed). One puller iteration handles all files currently
// flagged as needed in the folder.
-func (p *Puller) pullerIteration(ignores *ignore.Matcher) int {
+func (p *rwFolder) pullerIteration(ignores *ignore.Matcher) int {
pullChan := make(chan pullBlockState)
copyChan := make(chan copyBlocksState)
finisherChan := make(chan *sharedPullerState)
@ -422,7 +444,7 @@ nextFile:
}
// handleDir creates or updates the given directory
-func (p *Puller) handleDir(file protocol.FileInfo) {
+func (p *rwFolder) handleDir(file protocol.FileInfo) {
var err error
events.Default.Log(events.ItemStarted, map[string]interface{}{
"folder": p.folder,
@ -497,7 +519,7 @@ func (p *Puller) handleDir(file protocol.FileInfo) {
}
// deleteDir attempts to delete the given directory
-func (p *Puller) deleteDir(file protocol.FileInfo) {
+func (p *rwFolder) deleteDir(file protocol.FileInfo) {
var err error
events.Default.Log(events.ItemStarted, map[string]interface{}{
"folder": p.folder,
@ -532,7 +554,7 @@ func (p *Puller) deleteDir(file protocol.FileInfo) {
}
// deleteFile attempts to delete the given file
-func (p *Puller) deleteFile(file protocol.FileInfo) {
+func (p *rwFolder) deleteFile(file protocol.FileInfo) {
var err error
events.Default.Log(events.ItemStarted, map[string]interface{}{
"folder": p.folder,
@ -564,7 +586,7 @@ func (p *Puller) deleteFile(file protocol.FileInfo) {
// renameFile attempts to rename an existing file to a destination
// and set the right attributes on it.
-func (p *Puller) renameFile(source, target protocol.FileInfo) {
+func (p *rwFolder) renameFile(source, target protocol.FileInfo) {
var err error
events.Default.Log(events.ItemStarted, map[string]interface{}{
"folder": p.folder,
@ -634,7 +656,7 @@ func (p *Puller) renameFile(source, target protocol.FileInfo) {
// handleFile queues the copies and pulls as necessary for a single new or
// changed file.
-func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
+func (p *rwFolder) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksState, finisherChan chan<- *sharedPullerState) {
events.Default.Log(events.ItemStarted, map[string]interface{}{
"folder": p.folder,
"item": file.Name,
@ -732,7 +754,7 @@ func (p *Puller) handleFile(file protocol.FileInfo, copyChan chan<- copyBlocksSt
// shortcutFile sets file mode and modification time, when that's the only
// thing that has changed.
-func (p *Puller) shortcutFile(file protocol.FileInfo) (err error) {
+func (p *rwFolder) shortcutFile(file protocol.FileInfo) (err error) {
realName := filepath.Join(p.dir, file.Name)
if !p.ignorePerms {
err = os.Chmod(realName, os.FileMode(file.Flags&0777))
@ -763,7 +785,7 @@ func (p *Puller) shortcutFile(file protocol.FileInfo) (err error) {
}
// shortcutSymlink changes the symlinks type if necessery.
-func (p *Puller) shortcutSymlink(file protocol.FileInfo) (err error) {
+func (p *rwFolder) shortcutSymlink(file protocol.FileInfo) (err error) {
err = symlinks.ChangeType(filepath.Join(p.dir, file.Name), file.Flags)
if err == nil {
p.model.updateLocal(p.folder, file)
@ -775,7 +797,7 @@ func (p *Puller) shortcutSymlink(file protocol.FileInfo) (err error) {
// copierRoutine reads copierStates until the in channel closes and performs
// the relevant copies when possible, or passes it to the puller routine.
-func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
+func (p *rwFolder) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBlockState, out chan<- *sharedPullerState) {
buf := make([]byte, protocol.BlockSize)
for state := range in {
@ -857,7 +879,7 @@ func (p *Puller) copierRoutine(in <-chan copyBlocksState, pullChan chan<- pullBl
}
}
-func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
+func (p *rwFolder) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPullerState) {
for state := range in {
if state.failed() != nil {
continue
@ -918,7 +940,7 @@ func (p *Puller) pullerRoutine(in <-chan pullBlockState, out chan<- *sharedPulle
}
}
-func (p *Puller) performFinish(state *sharedPullerState) {
+func (p *rwFolder) performFinish(state *sharedPullerState) {
var err error
defer func() {
events.Default.Log(events.ItemFinished, map[string]interface{}{
@ -931,7 +953,7 @@ func (p *Puller) performFinish(state *sharedPullerState) {
if !p.ignorePerms {
err = os.Chmod(state.tempName, os.FileMode(state.file.Flags&0777))
if err != nil {
-l.Warnln("puller: final:", err)
+l.Warnln("Puller: final:", err)
return
}
}
@ -947,7 +969,7 @@ func (p *Puller) performFinish(state *sharedPullerState) {
// sync.
l.Infof("Puller (folder %q, file %q): final: %v (continuing anyway as requested)", p.folder, state.file.Name, err)
} else {
-l.Warnln("puller: final:", err)
+l.Warnln("Puller: final:", err)
return
}
}
@ -958,7 +980,7 @@ func (p *Puller) performFinish(state *sharedPullerState) {
if p.versioner != nil {
err = p.versioner.Archive(state.realName)
if err != nil {
-l.Warnln("puller: final:", err)
+l.Warnln("Puller: final:", err)
return
}
}
@ -972,7 +994,7 @@ func (p *Puller) performFinish(state *sharedPullerState) {
// Replace the original content with the new one
err = osutil.Rename(state.tempName, state.realName)
if err != nil {
-l.Warnln("puller: final:", err)
+l.Warnln("Puller: final:", err)
return
}
@ -980,7 +1002,7 @@ func (p *Puller) performFinish(state *sharedPullerState) {
if state.file.IsSymlink() {
content, err := ioutil.ReadFile(state.realName)
if err != nil {
-l.Warnln("puller: final: reading symlink:", err)
+l.Warnln("Puller: final: reading symlink:", err)
return
}
@ -990,7 +1012,7 @@ func (p *Puller) performFinish(state *sharedPullerState) {
return symlinks.Create(path, string(content), state.file.Flags)
}, state.realName)
if err != nil {
-l.Warnln("puller: final: creating symlink:", err)
+l.Warnln("Puller: final: creating symlink:", err)
return
}
}
@ -999,14 +1021,14 @@ func (p *Puller) performFinish(state *sharedPullerState) {
p.model.updateLocal(p.folder, state.file)
}
-func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
+func (p *rwFolder) finisherRoutine(in <-chan *sharedPullerState) {
for state := range in {
if closed, err := state.finalClose(); closed {
if debug {
l.Debugln(p, "closing", state.file.Name)
}
if err != nil {
-l.Warnln("puller: final:", err)
+l.Warnln("Puller: final:", err)
continue
} }
@ -1029,11 +1051,11 @@ func (p *Puller) finisherRoutine(in <-chan *sharedPullerState) {
}
// Moves the given filename to the front of the job queue
-func (p *Puller) BringToFront(filename string) {
+func (p *rwFolder) BringToFront(filename string) {
p.queue.BringToFront(filename)
}
-func (p *Puller) Jobs() ([]string, []string) {
+func (p *rwFolder) Jobs() ([]string, []string) {
return p.queue.Jobs()
}

View File

@ -72,7 +72,7 @@ func TestHandleFile(t *testing.T) {
// Update index
m.updateLocal("default", existingFile)
-p := Puller{
+p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
@ -126,7 +126,7 @@ func TestHandleFileWithTemp(t *testing.T) {
// Update index
m.updateLocal("default", existingFile)
-p := Puller{
+p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
@ -197,7 +197,7 @@ func TestCopierFinder(t *testing.T) {
}
}
-p := Puller{
+p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
@ -331,7 +331,7 @@ func TestLastResortPulling(t *testing.T) {
t.Error("Expected block not found") t.Error("Expected block not found")
} }
p := Puller{ p := rwFolder{
folder: "default", folder: "default",
dir: "testdata", dir: "testdata",
model: m, model: m,
@ -384,7 +384,7 @@ func TestDeregisterOnFailInCopy(t *testing.T) {
emitter := NewProgressEmitter(defaultConfig)
go emitter.Serve()
-p := Puller{
+p := rwFolder{
folder: "default",
dir: "testdata",
model: m,
@ -471,7 +471,7 @@ func TestDeregisterOnFailInPull(t *testing.T) {
emitter := NewProgressEmitter(defaultConfig)
go emitter.Serve()
-p := Puller{
+p := rwFolder{
folder: "default",
dir: "testdata",
model: m,

View File

@ -13,6 +13,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"testing" "testing"
"time"
"github.com/syncthing/syncthing/internal/symlinks" "github.com/syncthing/syncthing/internal/symlinks"
) )
@ -71,7 +72,22 @@ func TestIgnores(t *testing.T) {
// Rescan and verify that we see them all
-p.post("/rest/scan?folder=default", nil)
+// Wait for one scan to succeed, or up to 20 seconds...
+// This is to let startup, UPnP etc complete.
+for i := 0; i < 20; i++ {
+resp, err := p.post("/rest/scan?folder=default", nil)
+if err != nil {
+time.Sleep(time.Second)
+continue
+}
+if resp.StatusCode != 200 {
+resp.Body.Close()
+time.Sleep(time.Second)
+continue
+}
+break
+}
m, err := p.model("default")
if err != nil {
t.Fatal(err)

View File

@ -29,7 +29,7 @@ func TestParallellScan(t *testing.T) {
t.Fatal(err)
}
-log.Println("Generaing .stignore...")
+log.Println("Generating .stignore...")
err = ioutil.WriteFile("s1/.stignore", []byte("some ignore data\n"), 0644)
if err != nil {
t.Fatal(err)
@ -46,7 +46,25 @@ func TestParallellScan(t *testing.T) {
if err != nil {
t.Fatal(err)
}
-time.Sleep(5 * time.Second)
+// Wait for one scan to succeed, or up to 20 seconds...
+// This is to let startup, UPnP etc complete.
+for i := 0; i < 20; i++ {
+resp, err := st.post("/rest/scan?folder=default", nil)
+if err != nil {
+time.Sleep(time.Second)
+continue
+}
+if resp.StatusCode != 200 {
+resp.Body.Close()
+time.Sleep(time.Second)
+continue
+}
+break
+}
+// Wait for UPnP and stuff
+time.Sleep(10 * time.Second)
var wg sync.WaitGroup
log.Println("Starting scans...")

View File

@ -43,8 +43,22 @@ func TestBenchmarkTransfer(t *testing.T) {
t.Fatal(err)
}
-// Make sure the sender has the full index before they connect
-sender.post("/rest/scan?folder=default", nil)
+// Wait for one scan to succeed, or up to 20 seconds... This is to let
+// startup, UPnP etc complete and make sure the sender has the full index
+// before they connect.
+for i := 0; i < 20; i++ {
+resp, err := sender.post("/rest/scan?folder=default", nil)
+if err != nil {
+time.Sleep(time.Second)
+continue
+}
+if resp.StatusCode != 200 {
+resp.Body.Close()
+time.Sleep(time.Second)
+continue
+}
+break
+}
log.Println("Starting receiver...")
receiver := syncthingProcess{ // id2