Update suture for data race bug

Jakob Borg 2015-09-21 15:48:37 +02:00
parent e3dd072022
commit b7e5948b09
5 changed files with 57 additions and 10 deletions

Godeps/Godeps.json (generated)
View File

@@ -47,7 +47,8 @@
 	},
 	{
 		"ImportPath": "github.com/thejerf/suture",
-		"Rev": "fc7aaeabdc43fe41c5328efa1479ffea0b820978"
+		"Comment": "v1.0.1",
+		"Rev": "99c1f2d613756768fc4299acd9dc621e11ed3fd7"
 	},
 	{
 		"ImportPath": "github.com/vitrun/qart/coding",

View File

@@ -43,3 +43,14 @@ easily fit into the OTP paradigm. It ought to someday be considered a good
 idea to distribute libraries that provide some sort of supervisor tree
 functionality out of the box. It is possible to provide this functionality
 without explicitly depending on the Suture library.
+
+Changelog
+---------
+
+suture uses semantic versioning.
+
+1. 1.0.0
+   * Initial release.
+2. 1.0.1
+   * Fixed data race on the .state variable.

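The 1.0.1 entry above is the change this commit pulls in. As a minimal illustration of the class of bug it fixes (hypothetical code, not suture's), a field written by one goroutine and read by another without any synchronization is a data race, and Go's race detector (go test -race or go run -race) will typically report it:

    // Hypothetical illustration of a data race on a state field; not suture
    // code. Run with the race detector (go run -race) to see the report.
    package main

    import "time"

    type server struct {
        state uint8 // unguarded; the concurrent accesses below race
    }

    func main() {
        s := &server{}
        go func() { s.state = 1 }() // writer goroutine
        _ = s.state                 // unsynchronized read in main goroutine
        time.Sleep(10 * time.Millisecond)
    }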
View File

@@ -59,6 +59,7 @@ import (
 	"log"
 	"math"
 	"runtime"
+	"sync"
 	"sync/atomic"
 	"time"
 )
@@ -123,7 +124,6 @@ type Supervisor struct {
 	lastFail       time.Time
 	failures       float64
 	restartQueue   []serviceID
-	state          uint8
 	serviceCounter serviceID
 	control        chan supervisorMessage
 	resumeTimer    <-chan time.Time
@@ -143,6 +143,9 @@ type Supervisor struct {
 	// a minimal chunk.
 	getNow    func() time.Time
 	getResume func(time.Duration) <-chan time.Time
+
+	sync.Mutex
+	state uint8
 }
 
 // Spec is used to pass arguments to the New function to create a
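The state field moves next to an embedded sync.Mutex that now guards it. A minimal sketch of this embedded-mutex pattern, using hypothetical names rather than suture's: embedding the mutex promotes its Lock and Unlock methods onto the outer type, which is why the calls in the hunks below read s.Lock() rather than s.mu.Lock().

    // Sketch of the pattern, not suture code: the zero value of sync.Mutex is
    // ready to use, and embedding it gives the outer type Lock/Unlock methods.
    type guarded struct {
        sync.Mutex
        state uint8 // protected by the embedded mutex
    }

    func (g *guarded) setState(v uint8) {
        g.Lock()
        g.state = v
        g.Unlock()
    }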
@@ -373,6 +376,7 @@ func (s *Supervisor) Add(service Service) ServiceToken {
 		supervisor.logBackoff = s.logBackoff
 	}
 
+	s.Lock()
 	if s.state == notRunning {
 		id := s.serviceCounter
 		s.serviceCounter++
@@ -380,8 +384,10 @@
 		s.services[id] = service
 		s.restartQueue = append(s.restartQueue, id)
 
+		s.Unlock()
 		return ServiceToken{uint64(s.id)<<32 | uint64(id)}
 	}
+	s.Unlock()
 
 	response := make(chan serviceID)
 	s.control <- addService{service, response}
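Unrelated to the locking, the return statement above shows how ServiceToken values are built: the supervisor's 32-bit id goes in the high half of a uint64 and the per-supervisor service id in the low half. A hypothetical sketch of that packing and the matching unpacking (helper names invented for illustration, not part of suture's API):

    // Illustrative helpers only: pack two 32-bit ids into a single uint64
    // token and split them back out.
    func pack(supervisorID, serviceID uint32) uint64 {
        return uint64(supervisorID)<<32 | uint64(serviceID)
    }

    func unpack(token uint64) (supervisorID, serviceID uint32) {
        return uint32(token >> 32), uint32(token)
    }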
@@ -408,16 +414,19 @@ func (s *Supervisor) Serve() {
 	}
 
 	defer func() {
+		s.Lock()
 		s.state = notRunning
+		s.Unlock()
 	}()
 
+	s.Lock()
 	if s.state != notRunning {
-		// FIXME: Don't explain why I don't need a semaphore, just use one
-		// This doesn't use a semaphore because it's just a sanity check.
+		s.Unlock()
 		panic("Running a supervisor while it is already running?")
 	}
 	s.state = normal
+	s.Unlock()
 
 	// for all the services I currently know about, start them
 	for _, id := range s.restartQueue {
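Serve unlocks explicitly on both the panic path and the fall-through path rather than deferring the unlock, presumably because the function continues into the supervisor's main loop and must not hold the mutex for its whole lifetime. If the check-and-set were factored into its own function, a single deferred unlock would cover both paths; a hypothetical sketch of that alternative (not what the commit does):

    // Hypothetical refactoring for illustration only: a helper whose deferred
    // unlock releases the mutex even when the sanity-check panic fires.
    func (s *Supervisor) markRunning() {
        s.Lock()
        defer s.Unlock()
        if s.state != notRunning {
            panic("Running a supervisor while it is already running?")
        }
        s.state = normal
    }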
@ -472,7 +481,9 @@ func (s *Supervisor) Serve() {
// excessive thrashing // excessive thrashing
// FIXME: Ought to permit some spacing of these functions, rather // FIXME: Ought to permit some spacing of these functions, rather
// than simply hammering through them // than simply hammering through them
s.Lock()
s.state = normal s.state = normal
s.Unlock()
s.failures = 0 s.failures = 0
s.logBackoff(s, false) s.logBackoff(s, false)
for _, id := range s.restartQueue { for _, id := range s.restartQueue {
@@ -499,7 +510,9 @@ func (s *Supervisor) handleFailedService(id serviceID, err interface{}, stacktra
 	}
 
 	if s.failures > s.failureThreshold {
+		s.Lock()
 		s.state = paused
+		s.Unlock()
 		s.logBackoff(s, true)
 		s.resumeTimer = s.getResume(s.failureBackoff)
 	}
@@ -511,7 +524,13 @@
 		// It is possible for a service to be no longer monitored
 		// by the time we get here. In that case, just ignore it.
 		if monitored {
-			if s.state == normal {
+			// this may look dangerous because the state could change, but this
+			// code is only ever run in the one goroutine that is permitted to
+			// change the state, so nothing else will.
+			s.Lock()
+			curState := s.state
+			s.Unlock()
+			if curState == normal {
 				s.runService(failedService, id)
 				s.logFailure(s, failedService, s.failures, s.failureThreshold, true, err, stacktrace)
 			} else {

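The comment added above explains why the copied read is safe: only the Serve goroutine ever writes state, so the lock here only publishes the latest write to this reader. Since suture.go already imports sync/atomic, another way to guard a small flag like this would be atomic loads and stores; a hypothetical sketch of that alternative, which is not what the commit does and uses illustrative constant values:

    // Hypothetical alternative, not suture code: keep the state in a uint32
    // and access it only through sync/atomic, so readers need no mutex.
    package main

    import (
        "fmt"
        "sync/atomic"
    )

    const (
        notRunning uint32 = iota // illustrative values, not suture's constants
        normal
        paused
    )

    type supervisor struct {
        state uint32
    }

    func main() {
        s := &supervisor{}
        atomic.StoreUint32(&s.state, normal) // writer side
        if atomic.LoadUint32(&s.state) == normal {
            fmt.Println("running normally") // reader side, no lock needed
        }
    }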
View File

@@ -17,7 +17,7 @@ func (i *Incrementor) Serve() {
 	for {
 		select {
 		case i.next <- i.current:
-			i.current += 1
+			i.current++
 		case <-i.stop:
 			// We sync here just to guarantee the output of "Stopping the service",
 			// so this passes the test reliably.

View File

@@ -478,6 +478,22 @@ func TestNilSupervisorAdd(t *testing.T) {
 	s.Add(s)
 }
 
+// https://github.com/thejerf/suture/issues/11
+//
+// The purpose of this test is to verify that it does not cause data races,
+// so there are no obvious assertions.
+func TestIssue11(t *testing.T) {
+	t.Parallel()
+
+	s := NewSimple("main")
+	s.ServeBackground()
+
+	subsuper := NewSimple("sub")
+	s.Add(subsuper)
+
+	subsuper.Add(NewService("may cause data race"))
+}
+
 // http://golangtutorials.blogspot.com/2011/10/gotest-unit-testing-and-benchmarking-go.html
 // claims test function are run in the same order as the source file...
 // I'm not sure if this is part of the contract, though. Especially in the
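TestIssue11 above deliberately has no assertions; it only earns its keep when the suite runs under the race detector (go test -race), which reports any unsynchronized access the test provokes. A hypothetical companion test in the same spirit, reusing the NewSimple, ServeBackground, Add and NewService helpers that already appear in this file:

    // Hypothetical test for illustration, not part of this commit: add many
    // services while the supervisor loop runs in the background, giving the
    // race detector concurrent readers and writers to observe.
    func TestConcurrentAdd(t *testing.T) {
        t.Parallel()

        s := NewSimple("root")
        s.ServeBackground()

        for i := 0; i < 10; i++ {
            s.Add(NewService("worker"))
        }
    }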
@@ -509,7 +525,7 @@ func (s *FailableService) Serve() {
 		everMultistarted = true
 		panic("Multi-started the same service! " + s.name)
 	}
-	s.existing += 1
+	s.existing++
 	s.started <- true
@@ -522,13 +538,13 @@
 		case Happy:
 			// Do nothing on purpose. Life is good!
 		case Fail:
-			s.existing -= 1
+			s.existing--
 			if useStopChan {
 				s.stop <- true
 			}
 			return
 		case Panic:
-			s.existing -= 1
+			s.existing--
 			panic("Panic!")
 		case Hang:
 			// or more specifically, "hang until I release you"
@@ -537,7 +553,7 @@
 			useStopChan = true
 		}
 	case <-s.shutdown:
-		s.existing -= 1
+		s.existing--
 		if useStopChan {
 			s.stop <- true
 		}