Merge pull request #295 from github/throttle-no-changelog-write

avoid writing heartbeat when throttle commanded by user
This commit is contained in:
Shlomi Noach 2016-11-01 12:21:47 +01:00 committed by GitHub
commit 1db2ea5ada
4 changed files with 39 additions and 25 deletions

View File

@ -37,6 +37,13 @@ const (
CutOverTwoStep = iota CutOverTwoStep = iota
) )
type ThrottleReasonHint string
const (
NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint"
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
)
var ( var (
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]") envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
) )
@ -44,12 +51,14 @@ var (
type ThrottleCheckResult struct { type ThrottleCheckResult struct {
ShouldThrottle bool ShouldThrottle bool
Reason string Reason string
ReasonHint ThrottleReasonHint
} }
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult { func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleReasonHint) *ThrottleCheckResult {
return &ThrottleCheckResult{ return &ThrottleCheckResult{
ShouldThrottle: throttle, ShouldThrottle: throttle,
Reason: reason, Reason: reason,
ReasonHint: reasonHint,
} }
} }
@ -138,6 +147,7 @@ type MigrationContext struct {
TotalDMLEventsApplied int64 TotalDMLEventsApplied int64
isThrottled bool isThrottled bool
throttleReason string throttleReason string
throttleReasonHint ThrottleReasonHint
throttleGeneralCheckResult ThrottleCheckResult throttleGeneralCheckResult ThrottleCheckResult
throttleMutex *sync.Mutex throttleMutex *sync.Mutex
IsPostponingCutOver int64 IsPostponingCutOver int64
@ -416,17 +426,18 @@ func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResu
return &result return &result
} }
func (this *MigrationContext) SetThrottled(throttle bool, reason string) { func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonHint ThrottleReasonHint) {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()
this.isThrottled = throttle this.isThrottled = throttle
this.throttleReason = reason this.throttleReason = reason
this.throttleReasonHint = reasonHint
} }
func (this *MigrationContext) IsThrottled() (bool, string) { func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()
return this.isThrottled, this.throttleReason return this.isThrottled, this.throttleReason, this.throttleReasonHint
} }
func (this *MigrationContext) GetReplicationLagQuery() string { func (this *MigrationContext) GetReplicationLagQuery() string {

View File

@ -305,6 +305,9 @@ func (this *Applier) InitiateHeartbeat() {
// Generally speaking, we would issue a goroutine, but I'd actually rather // Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop rather than spam the master in the event something // have this block the loop rather than spam the master in the event something
// goes wrong // goes wrong
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
continue
}
if err := injectHeartbeat(); err != nil { if err := injectHeartbeat(); err != nil {
return return
} }

View File

@ -803,7 +803,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { } else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
eta = "due" eta = "due"
state = "postponing cut-over" state = "postponing cut-over"
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { } else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason) state = fmt.Sprintf("throttled, %s", throttleReason)
} }

View File

@ -34,16 +34,16 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
// shouldThrottle performs checks to see whether we should currently be throttling. // shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue // It merely observes the metrics collected by other components, it does not issue
// its own metric collection. // its own metric collection.
func (this *Throttler) shouldThrottle() (result bool, reason string) { func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult() generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
if generalCheckResult.ShouldThrottle { if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
} }
// Replication lag throttle // Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()) return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
} }
checkThrottleControlReplicas := true checkThrottleControlReplicas := true
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) { if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
@ -52,14 +52,14 @@ func (this *Throttler) shouldThrottle() (result bool, reason string) {
if checkThrottleControlReplicas { if checkThrottleControlReplicas {
lagResult := this.migrationContext.GetControlReplicasLagResult() lagResult := this.migrationContext.GetControlReplicasLagResult()
if lagResult.Err != nil { if lagResult.Err != nil {
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err) return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
} }
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()) return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
} }
} }
// Got here? No metrics indicates we need throttling. // Got here? No metrics indicates we need throttling.
return false, "" return false, "", base.NoThrottleReasonHint
} }
// parseChangelogHeartbeat is called when a heartbeat event is intercepted // parseChangelogHeartbeat is called when a heartbeat event is intercepted
@ -147,8 +147,8 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext // collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error { func (this *Throttler) collectGeneralThrottleMetrics() error {
setThrottle := func(throttle bool, reason string) error { setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason)) this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
return nil return nil
} }
@ -161,7 +161,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet() criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
if err != nil { if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
} }
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
@ -181,18 +181,18 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// User-based throttle // User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return setThrottle(true, "commanded by user") return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
} }
if this.migrationContext.ThrottleFlagFile != "" { if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) { if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists! // Throttle file defined and exists!
return setThrottle(true, "flag-file") return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
} }
} }
if this.migrationContext.ThrottleAdditionalFlagFile != "" { if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) { if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists! // 2nd Throttle file defined and exists!
return setThrottle(true, "flag-file") return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
} }
} }
@ -200,19 +200,19 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
for variableName, threshold := range maxLoad { for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName) value, err := this.applier.ShowStatusVariable(variableName)
if err != nil { if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
} }
if value >= threshold { if value >= threshold {
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)) return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
} }
} }
if this.migrationContext.GetThrottleQuery() != "" { if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 { if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return setThrottle(true, "throttle-query") return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
} }
} }
return setThrottle(false, "") return setThrottle(false, "", base.NoThrottleReasonHint)
} }
// initiateThrottlerMetrics initiates the various processes that collect measurements // initiateThrottlerMetrics initiates the various processes that collect measurements
@ -237,8 +237,8 @@ func (this *Throttler) initiateThrottlerChecks() error {
throttlerTick := time.Tick(100 * time.Millisecond) throttlerTick := time.Tick(100 * time.Millisecond)
throttlerFunction := func() { throttlerFunction := func() {
alreadyThrottling, currentReason := this.migrationContext.IsThrottled() alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason := this.shouldThrottle() shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
if shouldThrottle && !alreadyThrottling { if shouldThrottle && !alreadyThrottling {
// New throttling // New throttling
this.applier.WriteAndLogChangelog("throttle", throttleReason) this.applier.WriteAndLogChangelog("throttle", throttleReason)
@ -249,7 +249,7 @@ func (this *Throttler) initiateThrottlerChecks() error {
// End of throttling // End of throttling
this.applier.WriteAndLogChangelog("throttle", "done throttling") this.applier.WriteAndLogChangelog("throttle", "done throttling")
} }
this.migrationContext.SetThrottled(shouldThrottle, throttleReason) this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
} }
throttlerFunction() throttlerFunction()
for range throttlerTick { for range throttlerTick {
@ -265,7 +265,7 @@ func (this *Throttler) throttle(onThrottled func()) {
for { for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously. // IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap // Therefore calling IsThrottled() is cheap
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle { if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
return return
} }
if onThrottled != nil { if onThrottled != nil {