gh-ost/go/logic/throttler.go

330 lines
12 KiB
Go

/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"fmt"
"sync/atomic"
"time"
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
// Throttler collects metrics related to throttling and makes informed decisison
// whether throttling should take place.
type Throttler struct {
migrationContext *base.MigrationContext
applier *Applier
inspector *Inspector
}
func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
return &Throttler{
migrationContext: base.GetMigrationContext(),
applier: applier,
inspector: inspector,
}
}
// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue
// its own metric collection.
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
}
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
}
checkThrottleControlReplicas := true
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
checkThrottleControlReplicas = false
}
if checkThrottleControlReplicas {
lagResult := this.migrationContext.GetControlReplicasLagResult()
if lagResult.Err != nil {
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
}
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
}
}
// Got here? No metrics indicates we need throttling.
return false, "", base.NoThrottleReasonHint
}
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) {
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
if err != nil {
return lag, err
}
lag = time.Since(heartbeatTime)
return lag, nil
}
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil {
return log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil
}
}
// collectHeartbeat reads the latest changelog heartbeat value
func (this *Throttler) collectHeartbeat() {
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range ticker {
go func() error {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
}
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err)
} else {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil
}()
}
}
// collectControlReplicasLag polls all the control replicas to get maximum lag value
func (this *Throttler) collectControlReplicasLag() {
replicationLagQuery := fmt.Sprintf(`
select value from %s.%s where hint = 'heartbeat' and id <= 255
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var heartbeatValue string
if db, _, err := sqlutils.GetDB(dbUri); err != nil {
return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}
lag, err = parseChangelogHeartbeat(heartbeatValue)
return lag, err
}
readControlReplicasLag := func() (result *mysql.ReplicationLagResult) {
instanceKeyMap := this.migrationContext.GetThrottleControlReplicaKeys()
if instanceKeyMap.Len() == 0 {
return result
}
lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len())
for replicaKey := range *instanceKeyMap {
connectionConfig := this.migrationContext.InspectorConnectionConfig.Duplicate()
connectionConfig.Key = replicaKey
lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
go func() {
lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
lagResults <- lagResult
}()
}
for range *instanceKeyMap {
lagResult := <-lagResults
if result == nil {
result = lagResult
} else if lagResult.Err != nil {
result = lagResult
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
result = lagResult
}
}
return result
}
checkControlReplicasLag := func() {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
// No need to read lag
return
}
if result := readControlReplicasLag(); result != nil {
this.migrationContext.SetControlReplicasLagResult(result)
}
}
aggressiveTicker := time.Tick(100 * time.Millisecond)
relaxedFactor := 10
counter := 0
shouldReadLagAggressively := false
for range aggressiveTicker {
if counter%relaxedFactor == 0 {
// we only check if we wish to be aggressive once per second. The parameters for being aggressive
// do not typically change at all throughout the migration, but nonetheless we check them.
counter = 0
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
shouldReadLagAggressively = (maxLagMillisecondsThrottleThreshold < 1000)
}
if counter == 0 || shouldReadLagAggressively {
// We check replication lag every so often, or if we wish to be aggressive
checkControlReplicasLag()
}
counter++
}
}
func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value int64, threshold int64, err error) {
criticalLoad := this.migrationContext.GetCriticalLoad()
for variableName, threshold = range criticalLoad {
value, err = this.applier.ShowStatusVariable(variableName)
if err != nil {
return false, variableName, value, threshold, err
}
if value >= threshold {
return true, variableName, value, threshold, nil
}
}
return false, variableName, value, threshold, nil
}
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error {
setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
return nil
}
// Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) {
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
go func() {
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
<-timer.C
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
}
}()
}
// Back to throttle considerations
// User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
}
if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists!
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
}
}
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists!
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
}
}
maxLoad := this.migrationContext.GetMaxLoad()
for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if value >= threshold {
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
}
}
if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
}
}
return setThrottle(false, "", base.NoThrottleReasonHint)
}
// initiateThrottlerMetrics initiates the various processes that collect measurements
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
go this.collectHeartbeat()
go this.collectControlReplicasLag()
go func() {
throttlerMetricsTick := time.Tick(1 * time.Second)
this.collectGeneralThrottleMetrics()
firstThrottlingCollected <- true
for range throttlerMetricsTick {
this.collectGeneralThrottleMetrics()
}
}()
}
// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
func (this *Throttler) initiateThrottlerChecks() error {
throttlerTick := time.Tick(100 * time.Millisecond)
throttlerFunction := func() {
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
if shouldThrottle && !alreadyThrottling {
// New throttling
this.applier.WriteAndLogChangelog("throttle", throttleReason)
} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
// Change of reason
this.applier.WriteAndLogChangelog("throttle", throttleReason)
} else if alreadyThrottling && !shouldThrottle {
// End of throttling
this.applier.WriteAndLogChangelog("throttle", "done throttling")
}
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
}
throttlerFunction()
for range throttlerTick {
throttlerFunction()
}
return nil
}
// throttle sees if throttling needs take place, and if so, continuously sleeps (blocks)
// until throttling reasons are gone
func (this *Throttler) throttle(onThrottled func()) {
for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
return
}
if onThrottled != nil {
onThrottled()
}
time.Sleep(250 * time.Millisecond)
}
}