Use a channel instead of locks

This commit is contained in:
Jakob Borg 2015-05-03 14:18:32 +02:00
parent fe34b08ece
commit 1bd85d8baf
2 changed files with 22 additions and 36 deletions

View File

@ -17,12 +17,12 @@ import (
type roFolder struct { type roFolder struct {
stateTracker stateTracker
folder string folder string
intv time.Duration intv time.Duration
timer *time.Timer timer *time.Timer
tmut sync.Mutex // protects timer model *Model
model *Model stop chan struct{}
stop chan struct{} delayScan chan time.Duration
} }
func newROFolder(model *Model, folder string, interval time.Duration) *roFolder { func newROFolder(model *Model, folder string, interval time.Duration) *roFolder {
@ -31,12 +31,12 @@ func newROFolder(model *Model, folder string, interval time.Duration) *roFolder
folder: folder, folder: folder,
mut: sync.NewMutex(), mut: sync.NewMutex(),
}, },
folder: folder, folder: folder,
intv: interval, intv: interval,
timer: time.NewTimer(time.Millisecond), timer: time.NewTimer(time.Millisecond),
tmut: sync.NewMutex(), model: model,
model: model, stop: make(chan struct{}),
stop: make(chan struct{}), delayScan: make(chan time.Duration),
} }
} }
@ -47,17 +47,13 @@ func (s *roFolder) Serve() {
} }
defer func() { defer func() {
s.tmut.Lock()
s.timer.Stop() s.timer.Stop()
s.tmut.Unlock()
}() }()
reschedule := func() { reschedule := func() {
// Sleep a random time between 3/4 and 5/4 of the configured interval. // Sleep a random time between 3/4 and 5/4 of the configured interval.
sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4 sleepNanos := (s.intv.Nanoseconds()*3 + rand.Int63n(2*s.intv.Nanoseconds())) / 4
s.tmut.Lock()
s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond) s.timer.Reset(time.Duration(sleepNanos) * time.Nanosecond)
s.tmut.Unlock()
} }
initialScanCompleted := false initialScanCompleted := false
@ -97,6 +93,9 @@ func (s *roFolder) Serve() {
} }
reschedule() reschedule()
case next := <-s.delayScan:
s.timer.Reset(next)
} }
} }
} }
@ -116,7 +115,5 @@ func (s *roFolder) Jobs() ([]string, []string) {
} }
func (s *roFolder) DelayScan(next time.Duration) { func (s *roFolder) DelayScan(next time.Duration) {
s.tmut.Lock() s.delayScan <- next
s.timer.Reset(next)
s.tmut.Unlock()
} }

View File

@ -76,7 +76,7 @@ type rwFolder struct {
dbUpdates chan protocol.FileInfo dbUpdates chan protocol.FileInfo
scanTimer *time.Timer scanTimer *time.Timer
pullTimer *time.Timer pullTimer *time.Timer
tmut sync.Mutex // protects scanTimer and pullTimer delayScan chan time.Duration
} }
func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder { func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFolder {
@ -103,7 +103,7 @@ func newRWFolder(m *Model, shortID uint64, cfg config.FolderConfiguration) *rwFo
queue: newJobQueue(), queue: newJobQueue(),
pullTimer: time.NewTimer(checkPullIntv), pullTimer: time.NewTimer(checkPullIntv),
scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately. scanTimer: time.NewTimer(time.Millisecond), // The first scan should be done immediately.
tmut: sync.NewMutex(), delayScan: make(chan time.Duration),
} }
} }
@ -116,10 +116,8 @@ func (p *rwFolder) Serve() {
} }
defer func() { defer func() {
p.tmut.Lock()
p.pullTimer.Stop() p.pullTimer.Stop()
p.scanTimer.Stop() p.scanTimer.Stop()
p.tmut.Unlock()
// TODO: Should there be an actual FolderStopped state? // TODO: Should there be an actual FolderStopped state?
p.setState(FolderIdle) p.setState(FolderIdle)
}() }()
@ -140,9 +138,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "next rescan in", intv) l.Debugln(p, "next rescan in", intv)
} }
p.tmut.Lock()
p.scanTimer.Reset(intv) p.scanTimer.Reset(intv)
p.tmut.Unlock()
} }
// We don't start pulling files until a scan has been completed. // We don't start pulling files until a scan has been completed.
@ -163,9 +159,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "skip (initial)") l.Debugln(p, "skip (initial)")
} }
p.tmut.Lock()
p.pullTimer.Reset(nextPullIntv) p.pullTimer.Reset(nextPullIntv)
p.tmut.Unlock()
continue continue
} }
@ -189,9 +183,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "skip (curVer == prevVer)", prevVer) l.Debugln(p, "skip (curVer == prevVer)", prevVer)
} }
p.tmut.Lock()
p.pullTimer.Reset(checkPullIntv) p.pullTimer.Reset(checkPullIntv)
p.tmut.Unlock()
continue continue
} }
@ -229,9 +221,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "next pull in", nextPullIntv) l.Debugln(p, "next pull in", nextPullIntv)
} }
p.tmut.Lock()
p.pullTimer.Reset(nextPullIntv) p.pullTimer.Reset(nextPullIntv)
p.tmut.Unlock()
break break
} }
@ -244,9 +234,7 @@ func (p *rwFolder) Serve() {
if debug { if debug {
l.Debugln(p, "next pull in", pauseIntv) l.Debugln(p, "next pull in", pauseIntv)
} }
p.tmut.Lock()
p.pullTimer.Reset(pauseIntv) p.pullTimer.Reset(pauseIntv)
p.tmut.Unlock()
break break
} }
} }
@ -283,6 +271,9 @@ func (p *rwFolder) Serve() {
l.Infoln("Completed initial scan (rw) of folder", p.folder) l.Infoln("Completed initial scan (rw) of folder", p.folder)
initialScanCompleted = true initialScanCompleted = true
} }
case next := <-p.delayScan:
p.scanTimer.Reset(next)
} }
} }
} }
@ -1181,9 +1172,7 @@ func (p *rwFolder) Jobs() ([]string, []string) {
} }
func (p *rwFolder) DelayScan(next time.Duration) { func (p *rwFolder) DelayScan(next time.Duration) {
p.tmut.Lock() p.delayScan <- next
p.scanTimer.Reset(next)
p.tmut.Unlock()
} }
// dbUpdaterRoutine aggregates db updates and commits them in batches no // dbUpdaterRoutine aggregates db updates and commits them in batches no