/*
   Copyright 2022 GitHub Inc.
   See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package logic

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync/atomic"
	"time"

	"github.com/github/gh-ost/go/base"
	"github.com/github/gh-ost/go/mysql"
	"github.com/github/gh-ost/go/sql"
)

var (
	httpStatusMessages = map[int]string{
		200: "OK",
		404: "Not found",
		417: "Expectation failed",
		429: "Too many requests",
		500: "Internal server error",
		-1:  "Connection error",
	}
	// See https://github.com/github/freno/blob/master/doc/http.md
	httpStatusFrenoMessages = map[int]string{
		200: "OK",
		404: "freno: unknown metric",
		417: "freno: access forbidden",
		429: "freno: threshold exceeded",
		500: "freno: internal error",
		-1:  "freno: connection error",
	}
)
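
// frenoMagicHint, when present in the throttle HTTP URL, indicates the endpoint
// is a freno service, in which case the freno-specific status messages above
// are reported instead of the generic HTTP ones.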
const frenoMagicHint = "freno"

// Throttler collects metrics related to throttling and makes informed decisions
// on whether throttling should take place.
type Throttler struct {
	appVersion        string
	migrationContext  *base.MigrationContext
	applier           *Applier
	httpClient        *http.Client
	httpClientTimeout time.Duration
	inspector         *Inspector
	finishedMigrating int64
}

// NewThrottler creates a Throttler bound to the given migration context, applier and inspector.
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler {
	return &Throttler{
		appVersion:        appVersion,
		migrationContext:  migrationContext,
		applier:           applier,
		httpClient:        &http.Client{},
		httpClientTimeout: time.Duration(migrationContext.ThrottleHTTPTimeoutMillis) * time.Millisecond,
		inspector:         inspector,
		finishedMigrating: 0,
	}
}
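
// A minimal lifecycle sketch, as a caller such as the Migrator might drive it
// (channel capacity and exact call order here are illustrative assumptions):
//
//	throttler := NewThrottler(migrationContext, applier, inspector, appVersion)
//	firstThrottlingCollected := make(chan bool, 3)
//	throttler.initiateThrottlerCollection(firstThrottlingCollected)
//	go throttler.initiateThrottlerChecks()
//	// ... copy/apply loops call throttler.throttle(...) as needed ...
//	throttler.Teardown()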

// throttleHttpMessage translates an HTTP status code into a human readable
// throttle reason, preferring freno-specific wording when the throttle URL
// points at a freno service.
func (this *Throttler) throttleHttpMessage(statusCode int) string {
	statusCodesMap := httpStatusMessages
	if throttleHttp := this.migrationContext.GetThrottleHTTP(); strings.Contains(throttleHttp, frenoMagicHint) {
		statusCodesMap = httpStatusFrenoMessages
	}
	if message, ok := statusCodesMap[statusCode]; ok {
		return fmt.Sprintf("%s (http=%d)", message, statusCode)
	}
	return fmt.Sprintf("http=%d", statusCode)
}

// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components; it does not issue
// its own metric collection.
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
	if hibernateUntil := atomic.LoadInt64(&this.migrationContext.HibernateUntil); hibernateUntil > 0 {
		hibernateUntilTime := time.Unix(0, hibernateUntil)
		return true, fmt.Sprintf("critical-load-hibernate until %+v", hibernateUntilTime), base.NoThrottleReasonHint
	}
	generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
	if generalCheckResult.ShouldThrottle {
		return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
	}
	// HTTP throttle
	statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode)
	if statusCode != 0 && statusCode != http.StatusOK {
		return true, this.throttleHttpMessage(int(statusCode)), base.NoThrottleReasonHint
	}

	// Replication lag throttle
	maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
	lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
	if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
	}
	checkThrottleControlReplicas := true
	if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
		checkThrottleControlReplicas = false
	}
	if checkThrottleControlReplicas {
		lagResult := this.migrationContext.GetControlReplicasLagResult()
		if lagResult.Err != nil {
			return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
		}
		if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
			return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
		}
	}
	// Got here? No metric indicates we need throttling.
	return false, "", base.NoThrottleReasonHint
}

// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) {
	heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
	if err != nil {
		return lag, err
	}
	lag = time.Since(heartbeatTime)
	return lag, nil
}
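
// For example, if the changelog heartbeat reads "2016-08-30T10:25:45.000Z" and
// the local clock shows 10:25:46.500Z at parse time, the deduced lag is roughly
// 1.5s (timestamps here are illustrative).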

// parseChangelogHeartbeat parses a string timestamp, deduces replication lag
// and stores it on the migration context.
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
	if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil {
		return this.migrationContext.Log.Errore(err)
	} else {
		atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
		return nil
	}
}

// collectReplicationLag reads the latest changelog heartbeat value and updates
// the current lag on the migration context.
func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- bool) {
	collectFunc := func() error {
		if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
			return nil
		}
		if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
			return nil
		}

		if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
			// when running on replica, the heartbeat injection is also done on the replica.
			// This means we will always get a good heartbeat value.
			// When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
			if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil {
				return this.migrationContext.Log.Errore(err)
			} else {
				atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
			}
		} else {
			if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
				return this.migrationContext.Log.Errore(err)
			} else {
				this.parseChangelogHeartbeat(heartbeatValue)
			}
		}
		return nil
	}

	collectFunc()
	firstThrottlingCollected <- true

	ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if atomic.LoadInt64(&this.finishedMigrating) > 0 {
			return
		}
		go collectFunc()
	}
}
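
// Note: lag collection above is paced by HeartbeatIntervalMilliseconds, the
// same interval at which heartbeats are injected, so each tick should observe
// a reasonably fresh heartbeat value.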

// collectControlReplicasLag polls all the control replicas to get maximum lag value
func (this *Throttler) collectControlReplicasLag() {
	if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
		return
	}

	replicationLagQuery := fmt.Sprintf(`
		select value from %s.%s where hint = 'heartbeat' and id <= 255
		`,
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.GetChangelogTableName()),
	)

	readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
		dbUri := connectionConfig.GetDBUri("information_schema")

		var heartbeatValue string
		db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri)
		if err != nil {
			return lag, err
		}
		if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
			return lag, err
		}

		lag, err = parseChangelogHeartbeat(heartbeatValue)
		return lag, err
	}

	readControlReplicasLag := func() (result *mysql.ReplicationLagResult) {
		instanceKeyMap := this.migrationContext.GetThrottleControlReplicaKeys()
		if instanceKeyMap.Len() == 0 {
			return result
		}
		lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len())
		for replicaKey := range *instanceKeyMap {
			connectionConfig := this.migrationContext.InspectorConnectionConfig.Duplicate()
			connectionConfig.Key = replicaKey

			lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
			go func() {
				lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
				lagResults <- lagResult
			}()
		}
		// Collect all results; the replica with the worst (highest) lag, or any
		// that errored, determines the overall result.
		for range *instanceKeyMap {
			lagResult := <-lagResults
			if result == nil {
				result = lagResult
			} else if lagResult.Err != nil {
				result = lagResult
			} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
				result = lagResult
			}
		}
		return result
	}

	checkControlReplicasLag := func() {
		if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
			// No need to read lag
			return
		}
		this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag())
	}

	relaxedFactor := 10
	counter := 0
	shouldReadLagAggressively := false

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if atomic.LoadInt64(&this.finishedMigrating) > 0 {
			return
		}
		if counter%relaxedFactor == 0 {
			// we only check if we wish to be aggressive once per second. The parameters for being aggressive
			// do not typically change at all throughout the migration, but nonetheless we check them.
			counter = 0
			maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
			shouldReadLagAggressively = (maxLagMillisecondsThrottleThreshold < 1000)
		}
		if counter == 0 || shouldReadLagAggressively {
			// We check replication lag every so often, or if we wish to be aggressive
			checkControlReplicasLag()
		}
		counter++
	}
}
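
// With a 100ms tick and relaxedFactor=10, the aggressiveness flag is
// re-evaluated once per second; when --max-lag-millis is under 1000, control
// replica lag is probed on every 100ms tick, otherwise once per second.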

// criticalLoadIsMet checks whether any critical-load status variable has met
// or exceeded its configured threshold.
func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value int64, threshold int64, err error) {
	criticalLoad := this.migrationContext.GetCriticalLoad()
	for variableName, threshold = range criticalLoad {
		value, err = this.applier.ShowStatusVariable(variableName)
		if err != nil {
			return false, variableName, value, threshold, err
		}
		if value >= threshold {
			return true, variableName, value, threshold, nil
		}
	}
	return false, variableName, value, threshold, nil
}
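
// For example, given --critical-load="Threads_running=1000,Threads_connected=5000"
// (illustrative values), criticalLoadIsMet returns true as soon as either
// status variable reaches its threshold.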

// collectThrottleHTTPStatus issues a HEAD request against the throttle HTTP
// endpoint, if one is configured, and stores the resulting status code.
func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) {
	collectFunc := func() (sleep bool, err error) {
		if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
			return true, nil
		}
		url := this.migrationContext.GetThrottleHTTP()
		if url == "" {
			return true, nil
		}

		ctx, cancel := context.WithTimeout(context.Background(), this.httpClientTimeout)
		defer cancel()

		req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
		if err != nil {
			return false, err
		}
		req.Header.Set("User-Agent", fmt.Sprintf("gh-ost/%s", this.appVersion))

		resp, err := this.httpClient.Do(req)
		if err != nil {
			return false, err
		}
		defer resp.Body.Close()

		atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
		return false, nil
	}

	_, err := collectFunc()
	if err != nil {
		// If not told to ignore errors, we'll throttle on HTTP connection issues
		if !this.migrationContext.IgnoreHTTPErrors {
			atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(-1))
		}
	}

	firstThrottlingCollected <- true

	collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond
	ticker := time.NewTicker(collectInterval)
	defer ticker.Stop()
	for range ticker.C {
		if atomic.LoadInt64(&this.finishedMigrating) > 0 {
			return
		}

		sleep, err := collectFunc()
		if err != nil {
			// If not told to ignore errors, we'll throttle on HTTP connection issues
			if !this.migrationContext.IgnoreHTTPErrors {
				atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(-1))
			}
		}
		if sleep {
			time.Sleep(1 * time.Second)
		}
	}
}
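
// When the endpoint is a freno service, the URL typically looks like
// http://freno.example.com:9777/check/gh-ost/mysql/main (hostname and store
// name here are hypothetical); freno replies with the status codes mapped in
// httpStatusFrenoMessages above.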

// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error {
	if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
		return nil
	}

	setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
		this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
		return nil
	}

	// Regardless of throttle, we take opportunity to check for panic-abort
	if this.migrationContext.PanicFlagFile != "" {
		if base.FileExists(this.migrationContext.PanicFlagFile) {
			this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
		}
	}

	criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
	if err != nil {
		return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
	}

	if criticalLoadMet && this.migrationContext.CriticalLoadHibernateSeconds > 0 {
		hibernateDuration := time.Duration(this.migrationContext.CriticalLoadHibernateSeconds) * time.Second
		hibernateUntilTime := time.Now().Add(hibernateDuration)
		atomic.StoreInt64(&this.migrationContext.HibernateUntil, hibernateUntilTime.UnixNano())
		this.migrationContext.Log.Errorf("critical-load met: %s=%d, >=%d. Will hibernate for the duration of %+v, until %+v", variableName, value, threshold, hibernateDuration, hibernateUntilTime)
		go func() {
			time.Sleep(hibernateDuration)
			this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(true, "leaving hibernation", base.LeavingHibernationThrottleReasonHint))
			atomic.StoreInt64(&this.migrationContext.HibernateUntil, 0)
		}()
		return nil
	}

	if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
		this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
	}
	if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
		this.migrationContext.Log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
		go func() {
			timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
			<-timer.C
			if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
				this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
			}
		}()
	}

	// Back to throttle considerations

	// User-based throttle
	if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
		return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
	}
	if this.migrationContext.ThrottleFlagFile != "" {
		if base.FileExists(this.migrationContext.ThrottleFlagFile) {
			// Throttle file defined and exists!
			return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
		}
	}
	if this.migrationContext.ThrottleAdditionalFlagFile != "" {
		if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
			// 2nd throttle file defined and exists!
			return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
		}
	}

	maxLoad := this.migrationContext.GetMaxLoad()
	for variableName, threshold := range maxLoad {
		value, err := this.applier.ShowStatusVariable(variableName)
		if err != nil {
			return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
		}
		if value >= threshold {
			return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
		}
	}
	if this.migrationContext.GetThrottleQuery() != "" {
		if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
			return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
		}
	}

	return setThrottle(false, "", base.NoThrottleReasonHint)
}
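
// Note the precedence encoded above: the panic-file and critical-load checks
// always run, while the first matching throttle condition (user command, flag
// files, max-load, throttle-query) short-circuits the rest.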

// initiateThrottlerCollection initiates the various processes that collect measurements
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
	go this.collectReplicationLag(firstThrottlingCollected)
	go this.collectControlReplicasLag()
	go this.collectThrottleHTTPStatus(firstThrottlingCollected)

	go func() {
		this.collectGeneralThrottleMetrics()
		firstThrottlingCollected <- true

		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for range ticker.C {
			if atomic.LoadInt64(&this.finishedMigrating) > 0 {
				return
			}
			this.collectGeneralThrottleMetrics()
		}
	}()
}
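
// firstThrottlingCollected receives exactly three events: one each from the
// replication lag collector, the HTTP status collector, and the general
// metrics collector; a caller can thus wait for three reads to know every
// collector has produced an initial measurement.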

// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
func (this *Throttler) initiateThrottlerChecks() {
	throttlerFunction := func() {
		alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
		shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
		if shouldThrottle && !alreadyThrottling {
			// New throttling
			this.applier.WriteAndLogChangelog("throttle", throttleReason)
		} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
			// Change of reason
			this.applier.WriteAndLogChangelog("throttle", throttleReason)
		} else if alreadyThrottling && !shouldThrottle {
			// End of throttling
			this.applier.WriteAndLogChangelog("throttle", "done throttling")
		}
		this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
	}
	throttlerFunction()

	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for range ticker.C {
		if atomic.LoadInt64(&this.finishedMigrating) > 0 {
			return
		}
		throttlerFunction()
	}
}

// throttle sees if throttling needs to take place, and if so, continuously sleeps (blocks)
// until throttling reasons are gone
func (this *Throttler) throttle(onThrottled func()) {
	for {
		// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
		// Therefore calling IsThrottled() is cheap
		if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
			return
		}
		if onThrottled != nil {
			onThrottled()
		}
		time.Sleep(250 * time.Millisecond)
	}
}
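
// A sketch of how a busy loop might call it (the callback body is an
// illustrative assumption):
//
//	this.throttler.throttle(func() {
//		// e.g. surface the throttled state to the user while blocked
//	})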

// Teardown signals the throttler's collection and check loops to stop.
func (this *Throttler) Teardown() {
	this.migrationContext.Log.Debugf("Tearing down...")
	atomic.StoreInt64(&this.finishedMigrating, 1)
}