2016-08-30 10:25:45 +00:00
/*
   Copyright 2016 GitHub Inc.
   See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"fmt"
2017-03-26 10:10:34 +00:00
"net/http"
2016-08-30 10:25:45 +00:00
"sync/atomic"
"time"
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
2016-12-26 19:31:35 +00:00
"github.com/github/gh-ost/go/sql"
2016-08-30 10:25:45 +00:00
"github.com/outbrain/golib/log"
2016-12-26 19:31:35 +00:00
"github.com/outbrain/golib/sqlutils"
2016-08-30 10:25:45 +00:00
)
// Throttler collects metrics related to throttling and makes informed decisions
// whether throttling should take place.
type Throttler struct {
// migrationContext is the shared migration context: throttle settings are read
// from it and collected metrics / throttle verdicts are stored on it.
migrationContext * base . MigrationContext
// applier is used to read status variables, execute the throttle query and
// write "throttle" changelog entries.
applier * Applier
// inspector is used to read the changelog heartbeat state and supplies the
// connection config for reading replication lag.
inspector * Inspector
}
2016-09-12 10:38:14 +00:00
func NewThrottler ( applier * Applier , inspector * Inspector ) * Throttler {
2016-08-30 10:25:45 +00:00
return & Throttler {
migrationContext : base . GetMigrationContext ( ) ,
applier : applier ,
inspector : inspector ,
}
}
// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue
// its own metric collection.
2016-10-27 12:51:38 +00:00
func ( this * Throttler ) shouldThrottle ( ) ( result bool , reason string , reasonHint base . ThrottleReasonHint ) {
2016-08-30 10:25:45 +00:00
generalCheckResult := this . migrationContext . GetThrottleGeneralCheckResult ( )
if generalCheckResult . ShouldThrottle {
2016-10-27 12:51:38 +00:00
return generalCheckResult . ShouldThrottle , generalCheckResult . Reason , generalCheckResult . ReasonHint
2016-08-30 10:25:45 +00:00
}
2017-03-26 10:10:34 +00:00
// HTTP throttle
statusCode := atomic . LoadInt64 ( & this . migrationContext . ThrottleHTTPStatusCode )
if statusCode != 0 && statusCode != http . StatusOK {
return true , fmt . Sprintf ( "http=%d" , statusCode ) , base . NoThrottleReasonHint
}
2016-08-30 10:25:45 +00:00
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic . LoadInt64 ( & this . migrationContext . MaxLagMillisecondsThrottleThreshold )
lag := atomic . LoadInt64 ( & this . migrationContext . CurrentLag )
if time . Duration ( lag ) > time . Duration ( maxLagMillisecondsThrottleThreshold ) * time . Millisecond {
2016-10-27 12:51:38 +00:00
return true , fmt . Sprintf ( "lag=%fs" , time . Duration ( lag ) . Seconds ( ) ) , base . NoThrottleReasonHint
2016-08-30 10:25:45 +00:00
}
checkThrottleControlReplicas := true
if ( this . migrationContext . TestOnReplica || this . migrationContext . MigrateOnReplica ) && ( atomic . LoadInt64 ( & this . migrationContext . AllEventsUpToLockProcessedInjectedFlag ) > 0 ) {
checkThrottleControlReplicas = false
}
if checkThrottleControlReplicas {
lagResult := this . migrationContext . GetControlReplicasLagResult ( )
if lagResult . Err != nil {
2016-10-27 12:51:38 +00:00
return true , fmt . Sprintf ( "%+v %+v" , lagResult . Key , lagResult . Err ) , base . NoThrottleReasonHint
2016-08-30 10:25:45 +00:00
}
if lagResult . Lag > time . Duration ( maxLagMillisecondsThrottleThreshold ) * time . Millisecond {
2016-10-27 12:51:38 +00:00
return true , fmt . Sprintf ( "%+v replica-lag=%fs" , lagResult . Key , lagResult . Lag . Seconds ( ) ) , base . NoThrottleReasonHint
2016-08-30 10:25:45 +00:00
}
}
// Got here? No metrics indicates we need throttling.
2016-10-27 12:51:38 +00:00
return false , "" , base . NoThrottleReasonHint
2016-08-30 10:25:45 +00:00
}
2016-12-26 19:31:35 +00:00
// parseChangelogHeartbeat interprets heartbeatValue as an RFC3339Nano timestamp
// and returns the time elapsed since that instant as the replication lag.
// When the value cannot be parsed, a zero lag and the parse error are returned.
func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) {
	heartbeatTime, parseErr := time.Parse(time.RFC3339Nano, heartbeatValue)
	if parseErr != nil {
		return 0, parseErr
	}
	return time.Since(heartbeatTime), nil
}
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func ( this * Throttler ) parseChangelogHeartbeat ( heartbeatValue string ) ( err error ) {
if lag , err := parseChangelogHeartbeat ( heartbeatValue ) ; err != nil {
2016-08-30 10:25:45 +00:00
return log . Errore ( err )
2016-12-26 19:31:35 +00:00
} else {
atomic . StoreInt64 ( & this . migrationContext . CurrentLag , int64 ( lag ) )
return nil
2016-08-30 10:25:45 +00:00
}
}
2017-02-02 09:18:07 +00:00
// collectReplicationLag reads the latest changelog heartbeat value
2017-02-07 10:13:19 +00:00
func ( this * Throttler ) collectReplicationLag ( firstThrottlingCollected chan <- bool ) {
collectFunc := func ( ) error {
if atomic . LoadInt64 ( & this . migrationContext . CleanupImminentFlag ) > 0 {
return nil
}
2017-02-02 09:18:07 +00:00
2017-02-07 10:13:19 +00:00
if this . migrationContext . TestOnReplica || this . migrationContext . MigrateOnReplica {
// when running on replica, the heartbeat injection is also done on the replica.
// This means we will always get a good heartbeat value.
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
if lag , err := mysql . GetReplicationLag ( this . inspector . connectionConfig ) ; err != nil {
return log . Errore ( err )
2016-12-26 19:31:35 +00:00
} else {
2017-02-07 10:13:19 +00:00
atomic . StoreInt64 ( & this . migrationContext . CurrentLag , int64 ( lag ) )
2016-08-30 10:25:45 +00:00
}
2017-02-07 10:13:19 +00:00
} else {
2016-12-26 19:31:35 +00:00
if heartbeatValue , err := this . inspector . readChangelogState ( "heartbeat" ) ; err != nil {
2016-08-30 10:25:45 +00:00
return log . Errore ( err )
2016-12-26 19:31:35 +00:00
} else {
2016-08-30 10:25:45 +00:00
this . parseChangelogHeartbeat ( heartbeatValue )
}
2017-02-07 10:13:19 +00:00
}
return nil
}
collectFunc ( )
firstThrottlingCollected <- true
ticker := time . Tick ( time . Duration ( this . migrationContext . HeartbeatIntervalMilliseconds ) * time . Millisecond )
for range ticker {
go collectFunc ( )
2016-08-30 10:25:45 +00:00
}
}
// collectControlReplicasLag polls all the control replicas to get maximum lag value
func ( this * Throttler ) collectControlReplicasLag ( ) {
2016-12-26 19:31:35 +00:00
replicationLagQuery := fmt . Sprintf ( `
select value from % s . % s where hint = ' heartbeat ' and id <= 255
` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetChangelogTableName ( ) ) ,
)
readReplicaLag := func ( connectionConfig * mysql . ConnectionConfig ) ( lag time . Duration , err error ) {
dbUri := connectionConfig . GetDBUri ( "information_schema" )
2017-02-02 09:18:07 +00:00
2016-12-26 19:31:35 +00:00
var heartbeatValue string
if db , _ , err := sqlutils . GetDB ( dbUri ) ; err != nil {
return lag , err
} else if err = db . QueryRow ( replicationLagQuery ) . Scan ( & heartbeatValue ) ; err != nil {
return lag , err
}
lag , err = parseChangelogHeartbeat ( heartbeatValue )
return lag , err
}
readControlReplicasLag := func ( ) ( result * mysql . ReplicationLagResult ) {
instanceKeyMap := this . migrationContext . GetThrottleControlReplicaKeys ( )
if instanceKeyMap . Len ( ) == 0 {
return result
}
lagResults := make ( chan * mysql . ReplicationLagResult , instanceKeyMap . Len ( ) )
for replicaKey := range * instanceKeyMap {
connectionConfig := this . migrationContext . InspectorConnectionConfig . Duplicate ( )
connectionConfig . Key = replicaKey
lagResult := & mysql . ReplicationLagResult { Key : connectionConfig . Key }
go func ( ) {
lagResult . Lag , lagResult . Err = readReplicaLag ( connectionConfig )
lagResults <- lagResult
} ( )
}
for range * instanceKeyMap {
lagResult := <- lagResults
if result == nil {
result = lagResult
} else if lagResult . Err != nil {
result = lagResult
} else if lagResult . Lag . Nanoseconds ( ) > result . Lag . Nanoseconds ( ) {
result = lagResult
}
}
return result
}
checkControlReplicasLag := func ( ) {
2016-08-30 10:25:45 +00:00
if ( this . migrationContext . TestOnReplica || this . migrationContext . MigrateOnReplica ) && ( atomic . LoadInt64 ( & this . migrationContext . AllEventsUpToLockProcessedInjectedFlag ) > 0 ) {
2016-12-26 19:31:35 +00:00
// No need to read lag
return
}
2017-01-29 08:18:39 +00:00
this . migrationContext . SetControlReplicasLagResult ( readControlReplicasLag ( ) )
2016-08-30 10:25:45 +00:00
}
aggressiveTicker := time . Tick ( 100 * time . Millisecond )
relaxedFactor := 10
counter := 0
shouldReadLagAggressively := false
for range aggressiveTicker {
if counter % relaxedFactor == 0 {
// we only check if we wish to be aggressive once per second. The parameters for being aggressive
// do not typically change at all throughout the migration, but nonetheless we check them.
counter = 0
maxLagMillisecondsThrottleThreshold := atomic . LoadInt64 ( & this . migrationContext . MaxLagMillisecondsThrottleThreshold )
2016-12-26 19:31:35 +00:00
shouldReadLagAggressively = ( maxLagMillisecondsThrottleThreshold < 1000 )
2016-08-30 10:25:45 +00:00
}
if counter == 0 || shouldReadLagAggressively {
// We check replication lag every so often, or if we wish to be aggressive
2016-12-26 19:31:35 +00:00
checkControlReplicasLag ( )
2016-08-30 10:25:45 +00:00
}
counter ++
}
}
2016-10-10 11:21:01 +00:00
// criticalLoadIsMet compares every status variable listed in the critical-load
// setting against its threshold, reading current values through the applier.
// It returns met=true together with the variable, its value and threshold as
// soon as one value reaches its threshold, and returns the error as soon as
// reading a status variable fails. When nothing is met, the details of the last
// inspected variable (zero values if there was none) are returned with met=false.
func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value int64, threshold int64, err error) {
	for name, limit := range this.migrationContext.GetCriticalLoad() {
		variableName, threshold = name, limit
		if value, err = this.applier.ShowStatusVariable(variableName); err != nil {
			return false, variableName, value, threshold, err
		}
		if value >= threshold {
			return true, variableName, value, threshold, nil
		}
	}
	return false, variableName, value, threshold, nil
}
2017-03-26 10:10:34 +00:00
// collectReplicationLag reads the latest changelog heartbeat value
func ( this * Throttler ) collectThrottleHTTPStatus ( firstThrottlingCollected chan <- bool ) {
collectFunc := func ( ) ( sleep bool , err error ) {
url := this . migrationContext . GetThrottleHTTP ( )
if url == "" {
return true , nil
}
2017-03-26 10:12:56 +00:00
resp , err := http . Head ( url )
2017-03-26 10:10:34 +00:00
if err != nil {
return false , err
}
atomic . StoreInt64 ( & this . migrationContext . ThrottleHTTPStatusCode , int64 ( resp . StatusCode ) )
return false , nil
}
collectFunc ( )
firstThrottlingCollected <- true
ticker := time . Tick ( 100 * time . Millisecond )
for range ticker {
if sleep , _ := collectFunc ( ) ; sleep {
time . Sleep ( 1 * time . Second )
}
}
}
2016-08-30 10:25:45 +00:00
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func ( this * Throttler ) collectGeneralThrottleMetrics ( ) error {
2016-10-27 12:51:38 +00:00
setThrottle := func ( throttle bool , reason string , reasonHint base . ThrottleReasonHint ) error {
this . migrationContext . SetThrottleGeneralCheckResult ( base . NewThrottleCheckResult ( throttle , reason , reasonHint ) )
2016-08-30 10:25:45 +00:00
return nil
}
// Regardless of throttle, we take opportunity to check for panic-abort
if this . migrationContext . PanicFlagFile != "" {
if base . FileExists ( this . migrationContext . PanicFlagFile ) {
2016-09-12 10:38:14 +00:00
this . migrationContext . PanicAbort <- fmt . Errorf ( "Found panic-file %s. Aborting without cleanup" , this . migrationContext . PanicFlagFile )
2016-08-30 10:25:45 +00:00
}
}
2016-10-10 11:21:01 +00:00
criticalLoadMet , variableName , value , threshold , err := this . criticalLoadIsMet ( )
if err != nil {
2016-10-27 12:51:38 +00:00
return setThrottle ( true , fmt . Sprintf ( "%s %s" , variableName , err ) , base . NoThrottleReasonHint )
2016-10-10 11:21:01 +00:00
}
if criticalLoadMet && this . migrationContext . CriticalLoadIntervalMilliseconds == 0 {
this . migrationContext . PanicAbort <- fmt . Errorf ( "critical-load met: %s=%d, >=%d" , variableName , value , threshold )
}
if criticalLoadMet && this . migrationContext . CriticalLoadIntervalMilliseconds > 0 {
log . Errorf ( "critical-load met once: %s=%d, >=%d. Will check again in %d millis" , variableName , value , threshold , this . migrationContext . CriticalLoadIntervalMilliseconds )
go func ( ) {
timer := time . NewTimer ( time . Millisecond * time . Duration ( this . migrationContext . CriticalLoadIntervalMilliseconds ) )
<- timer . C
if criticalLoadMetAgain , variableName , value , threshold , _ := this . criticalLoadIsMet ( ) ; criticalLoadMetAgain {
this . migrationContext . PanicAbort <- fmt . Errorf ( "critical-load met again after %d millis: %s=%d, >=%d" , this . migrationContext . CriticalLoadIntervalMilliseconds , variableName , value , threshold )
}
} ( )
2016-08-30 10:25:45 +00:00
}
// Back to throttle considerations
// User-based throttle
if atomic . LoadInt64 ( & this . migrationContext . ThrottleCommandedByUser ) > 0 {
2016-10-27 12:51:38 +00:00
return setThrottle ( true , "commanded by user" , base . UserCommandThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
if this . migrationContext . ThrottleFlagFile != "" {
if base . FileExists ( this . migrationContext . ThrottleFlagFile ) {
// Throttle file defined and exists!
2016-10-27 12:51:38 +00:00
return setThrottle ( true , "flag-file" , base . NoThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
}
if this . migrationContext . ThrottleAdditionalFlagFile != "" {
if base . FileExists ( this . migrationContext . ThrottleAdditionalFlagFile ) {
// 2nd Throttle file defined and exists!
2016-10-27 12:51:38 +00:00
return setThrottle ( true , "flag-file" , base . NoThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
}
maxLoad := this . migrationContext . GetMaxLoad ( )
for variableName , threshold := range maxLoad {
value , err := this . applier . ShowStatusVariable ( variableName )
if err != nil {
2016-10-27 12:51:38 +00:00
return setThrottle ( true , fmt . Sprintf ( "%s %s" , variableName , err ) , base . NoThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
if value >= threshold {
2016-10-27 12:51:38 +00:00
return setThrottle ( true , fmt . Sprintf ( "max-load %s=%d >= %d" , variableName , value , threshold ) , base . NoThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
}
if this . migrationContext . GetThrottleQuery ( ) != "" {
if res , _ := this . applier . ExecuteThrottleQuery ( ) ; res > 0 {
2016-10-27 12:51:38 +00:00
return setThrottle ( true , "throttle-query" , base . NoThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
}
2016-10-27 12:51:38 +00:00
return setThrottle ( false , "" , base . NoThrottleReasonHint )
2016-08-30 10:25:45 +00:00
}
// initiateThrottlerMetrics initiates the various processes that collect measurements
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func ( this * Throttler ) initiateThrottlerCollection ( firstThrottlingCollected chan <- bool ) {
2017-02-07 10:13:19 +00:00
go this . collectReplicationLag ( firstThrottlingCollected )
2016-08-30 10:25:45 +00:00
go this . collectControlReplicasLag ( )
2017-03-26 10:10:34 +00:00
go this . collectThrottleHTTPStatus ( firstThrottlingCollected )
2016-08-30 10:25:45 +00:00
go func ( ) {
this . collectGeneralThrottleMetrics ( )
firstThrottlingCollected <- true
2017-02-07 10:13:19 +00:00
throttlerMetricsTick := time . Tick ( 1 * time . Second )
2016-08-30 10:25:45 +00:00
for range throttlerMetricsTick {
this . collectGeneralThrottleMetrics ( )
}
} ( )
}
// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
func ( this * Throttler ) initiateThrottlerChecks ( ) error {
throttlerTick := time . Tick ( 100 * time . Millisecond )
throttlerFunction := func ( ) {
2016-10-27 12:51:38 +00:00
alreadyThrottling , currentReason , _ := this . migrationContext . IsThrottled ( )
shouldThrottle , throttleReason , throttleReasonHint := this . shouldThrottle ( )
2016-08-30 10:25:45 +00:00
if shouldThrottle && ! alreadyThrottling {
// New throttling
this . applier . WriteAndLogChangelog ( "throttle" , throttleReason )
} else if shouldThrottle && alreadyThrottling && ( currentReason != throttleReason ) {
// Change of reason
this . applier . WriteAndLogChangelog ( "throttle" , throttleReason )
} else if alreadyThrottling && ! shouldThrottle {
// End of throttling
this . applier . WriteAndLogChangelog ( "throttle" , "done throttling" )
}
2016-10-27 12:51:38 +00:00
this . migrationContext . SetThrottled ( shouldThrottle , throttleReason , throttleReasonHint )
2016-08-30 10:25:45 +00:00
}
throttlerFunction ( )
for range throttlerTick {
throttlerFunction ( )
}
return nil
}
// throttle sees if throttling needs take place, and if so, continuously sleeps (blocks)
// until throttling reasons are gone
func ( this * Throttler ) throttle ( onThrottled func ( ) ) {
for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
2016-10-27 12:51:38 +00:00
if shouldThrottle , _ , _ := this . migrationContext . IsThrottled ( ) ; ! shouldThrottle {
2016-08-30 10:25:45 +00:00
return
}
if onThrottled != nil {
onThrottled ( )
}
time . Sleep ( 250 * time . Millisecond )
}
}