From 10b222bc7bc533d362671ad22ada5a6898be7c5f Mon Sep 17 00:00:00 2001 From: Jonah Berquist Date: Fri, 26 Aug 2016 16:44:40 -0700 Subject: [PATCH 1/4] Reduce minimum maxLagMillisecondsThrottleThreshold to 100ms --- go/base/context.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 2f08fec..1f7326d 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -324,8 +324,8 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration { } func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) { - if maxLagMillisecondsThrottleThreshold < 1000 { - maxLagMillisecondsThrottleThreshold = 1000 + if maxLagMillisecondsThrottleThreshold < 100 { + maxLagMillisecondsThrottleThreshold = 100 } atomic.StoreInt64(&this.MaxLagMillisecondsThrottleThreshold, maxLagMillisecondsThrottleThreshold) } From 2afb86b9e4eaef7a34305f61aa4f85965908fdfe Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 30 Aug 2016 09:41:59 +0200 Subject: [PATCH 2/4] support for millisecond throttling - `--max-lag-millis` is at least `100ms` - `--heartbeat-interval-millis` introduced; defaults `500ms`, can range `100ms` - `1s` - Control replicas lag calculated asynchronously to throttle test - aggressive when `max-lag-millis < 1000` and when `replication-lag-query` is given --- doc/interactive-commands.md | 2 +- go/base/context.go | 26 ++++++++++++++++ go/cmd/gh-ost/main.go | 2 ++ go/logic/applier.go | 6 ++-- go/logic/migrator.go | 62 +++++++++++++++++++++++++++---------- 5 files changed, 77 insertions(+), 21 deletions(-) diff --git a/doc/interactive-commands.md b/doc/interactive-commands.md index fa971b0..afa4532 100644 --- a/doc/interactive-commands.md +++ b/doc/interactive-commands.md @@ -18,7 +18,7 @@ Both interfaces may serve at the same time. Both respond to simple text command, - `status`: returns a detailed status summary of migration progress and configuration - `sup`: returns a brief status summary of migration progress - `chunk-size=`: modify the `chunk-size`; applies on next running copy-iteration -- `max-lag-millis=`: modify the maximum replication lag threshold (milliseconds, minimum value is `1000`, i.e. 1 second) +- `max-lag-millis=`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second) - `max-load=`: modify the `max-load` config; applies on next running copy-iteration The `max-load` format must be: `some_status=[,some_status=...]`. For example: `Threads_running=50,threads_connected=1000`, and you would then write/echo `max-load=Threads_running=50,threads_connected=1000` to the socket. 
- `critical-load=`: change critical load setting (exceeding given thresholds causes panic and abort) diff --git a/go/base/context.go b/go/base/context.go index 1f7326d..fb6ff4e 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -64,6 +64,7 @@ type MigrationContext struct { CliUser string CliPassword string + HeartbeatIntervalMilliseconds int64 defaultNumRetries int64 ChunkSize int64 niceRatio float64 @@ -111,6 +112,7 @@ type MigrationContext struct { pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex CurrentLag int64 + controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 isThrottled bool @@ -323,6 +325,16 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration { return time.Since(this.pointOfInterestTime) } +func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) { + if heartbeatIntervalMilliseconds < 100 { + heartbeatIntervalMilliseconds = 100 + } + if heartbeatIntervalMilliseconds > 1000 { + heartbeatIntervalMilliseconds = 1000 + } + this.HeartbeatIntervalMilliseconds = heartbeatIntervalMilliseconds +} + func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) { if maxLagMillisecondsThrottleThreshold < 100 { maxLagMillisecondsThrottleThreshold = 100 @@ -451,6 +463,20 @@ func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error { return nil } +func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLagResult { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + lagResult := this.controlReplicasLagResult + return lagResult +} + +func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + this.controlReplicasLagResult = *lagResult +} + func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 78a4d5f..df1148a 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -81,6 +81,7 @@ func main() { replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. 
Make sure this query is lightweight") + heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.") @@ -181,6 +182,7 @@ func main() { if migrationContext.ServeSocketFile == "" { migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName) } + migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) diff --git a/go/logic/applier.go b/go/logic/applier.go index 0e06582..97be6a2 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -258,7 +258,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) { // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // This is done asynchronously -func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) { +func (this *Applier) InitiateHeartbeat() { var numSuccessiveFailures int64 injectHeartbeat := func() error { if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil { @@ -273,10 +273,10 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) { } injectHeartbeat() - heartbeatTick := time.Tick(time.Duration(heartbeatIntervalMilliseconds) * time.Millisecond) + heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) for range heartbeatTick { // Generally speaking, we would issue a goroutine, but I'd actually rather - // have this blocked rather than spam the master in the event something + // have this block the loop rather than spam the master in the event something // goes wrong if err := injectHeartbeat(); err != nil { return diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 3382ca8..cfd4bef 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -36,8 +36,7 @@ const ( type tableWriteFunc func() error const ( - applyEventsQueueBuffer = 100 - heartbeatIntervalMilliseconds = 1000 + applyEventsQueueBuffer = 100 ) type PrintStatusRule int @@ -159,7 +158,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) { checkThrottleControlReplicas = false } if checkThrottleControlReplicas { - lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery()) + lagResult := this.migrationContext.GetControlReplicasLagResult() if lagResult.Err != nil { return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err) } @@ -328,8 +327,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) 
(er return nil } -// onChangelogHeartbeat is called when a heartbeat event is intercepted -func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) { +// parseChangelogHeartbeat is called when a heartbeat event is intercepted +func (this *Migrator) parseChangelogHeartbeat(heartbeatValue string) (err error) { heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue) if err != nil { return log.Errore(err) @@ -427,7 +426,8 @@ func (this *Migrator) Migrate() (err error) { if err := this.addDMLEventsListener(); err != nil { return err } - go this.initiateHeartbeatListener() + go this.initiateHeartbeatReader() + go this.initiateControlReplicasReader() if err := this.applier.ReadMigrationRangeValues(); err != nil { return err @@ -1045,11 +1045,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { fmt.Fprintln(w, status) } -// initiateHeartbeatListener listens for heartbeat events. gh-ost implements its own +// initiateHeartbeatReader listens for heartbeat events. gh-ost implements its own // heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution. // Heartbeat is supplied via the changelog table -func (this *Migrator) initiateHeartbeatListener() { - ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2) +func (this *Migrator) initiateHeartbeatReader() { + ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) for range ticker { go func() error { if atomic.LoadInt64(&this.cleanupImminentFlag) > 0 { @@ -1059,19 +1059,47 @@ func (this *Migrator) initiateHeartbeatListener() { if err != nil { return log.Errore(err) } - for hint, value := range changelogState { - switch hint { - case "heartbeat": - { - this.onChangelogHeartbeat(value) - } - } + if heartbeatValue, ok := changelogState["heartbeat"]; ok { + this.parseChangelogHeartbeat(heartbeatValue) } return nil }() } } +// initiateControlReplicasReader +func (this *Migrator) initiateControlReplicasReader() { + readControlReplicasLag := func(replicationLagQuery string) error { + if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) { + return nil + } + lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), replicationLagQuery) + this.migrationContext.SetControlReplicasLagResult(lagResult) + return nil + } + aggressiveTicker := time.Tick(100 * time.Millisecond) + relaxedFactor := 10 + counter := 0 + shouldReadLagAggressively := false + replicationLagQuery := "" + + for range aggressiveTicker { + if counter%relaxedFactor == 0 { + // we only check if we wish to be aggressive once per second. The parameters for being aggressive + // do not typically change at all throughout the migration, but nonetheless we check them. 
+ counter = 0 + maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) + replicationLagQuery = this.migrationContext.GetReplicationLagQuery() + shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000) + } + if counter == 0 || shouldReadLagAggressively { + // We check replication lag every so often, or if we wish to be aggressive + readControlReplicasLag(replicationLagQuery) + } + counter++ + } +} + // initiateStreaming begins treaming of binary log events and registers listeners for such events func (this *Migrator) initiateStreaming() error { this.eventsStreamer = NewEventsStreamer() @@ -1139,7 +1167,7 @@ func (this *Migrator) initiateApplier() error { } this.applier.WriteChangelogState(string(TablesInPlace)) - go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds) + go this.applier.InitiateHeartbeat() return nil } From 23357d0643872da2f78236f658fbda56182c9d50 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 30 Aug 2016 11:32:17 +0200 Subject: [PATCH 3/4] WIP: decoupling general throttling from throttle logic --- go/base/context.go | 81 ++++++++++++++++++--------- go/logic/migrator.go | 129 ++++++++++++++++++++++++++++--------------- 2 files changed, 139 insertions(+), 71 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 091d3ec..2444c5e 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -41,6 +41,18 @@ var ( envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]") ) +type ThrottleCheckResult struct { + ShouldThrottle bool + Reason string +} + +func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult { + return &ThrottleCheckResult{ + ShouldThrottle: throttle, + Reason: reason, + } +} + // MigrationContext has the general, global state of migration. It is used by // all components throughout the migration process. 
type MigrationContext struct { @@ -96,33 +108,34 @@ type MigrationContext struct { InitiallyDropGhostTable bool CutOverType CutOver - Hostname string - TableEngine string - RowsEstimate int64 - RowsDeltaEstimate int64 - UsedRowsEstimateMethod RowsEstimateMethod - HasSuperPrivilege bool - OriginalBinlogFormat string - OriginalBinlogRowImage string - InspectorConnectionConfig *mysql.ConnectionConfig - ApplierConnectionConfig *mysql.ConnectionConfig - StartTime time.Time - RowCopyStartTime time.Time - RowCopyEndTime time.Time - LockTablesStartTime time.Time - RenameTablesStartTime time.Time - RenameTablesEndTime time.Time - pointOfInterestTime time.Time - pointOfInterestTimeMutex *sync.Mutex - CurrentLag int64 - controlReplicasLagResult mysql.ReplicationLagResult - TotalRowsCopied int64 - TotalDMLEventsApplied int64 - isThrottled bool - throttleReason string - throttleMutex *sync.Mutex - IsPostponingCutOver int64 - CountingRowsFlag int64 + Hostname string + TableEngine string + RowsEstimate int64 + RowsDeltaEstimate int64 + UsedRowsEstimateMethod RowsEstimateMethod + HasSuperPrivilege bool + OriginalBinlogFormat string + OriginalBinlogRowImage string + InspectorConnectionConfig *mysql.ConnectionConfig + ApplierConnectionConfig *mysql.ConnectionConfig + StartTime time.Time + RowCopyStartTime time.Time + RowCopyEndTime time.Time + LockTablesStartTime time.Time + RenameTablesStartTime time.Time + RenameTablesEndTime time.Time + pointOfInterestTime time.Time + pointOfInterestTimeMutex *sync.Mutex + CurrentLag int64 + controlReplicasLagResult mysql.ReplicationLagResult + TotalRowsCopied int64 + TotalDMLEventsApplied int64 + isThrottled bool + throttleReason string + throttleGeneralCheckResult ThrottleCheckResult + throttleMutex *sync.Mutex + IsPostponingCutOver int64 + CountingRowsFlag int64 OriginalTableColumns *sql.ColumnList OriginalTableUniqueKeys [](*sql.UniqueKey) @@ -355,6 +368,20 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) { atomic.StoreInt64(&this.ChunkSize, chunkSize) } +func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + this.throttleGeneralCheckResult = *checkResult + return checkResult +} + +func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResult { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + result := this.throttleGeneralCheckResult + return &result +} + func (this *MigrationContext) SetThrottled(throttle bool, reason string) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 393564e..7d95ea3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -55,9 +55,11 @@ type Migrator struct { applier *Applier eventsStreamer *EventsStreamer server *Server + throttler *Throttler hooksExecutor *HooksExecutor migrationContext *base.MigrationContext + firstThrottlingCollected chan bool tablesInPlace chan bool rowCopyComplete chan bool allEventsUpToLockProcessed chan bool @@ -81,6 +83,7 @@ func NewMigrator() *Migrator { migrationContext: base.GetMigrationContext(), parser: sql.NewParser(), tablesInPlace: make(chan bool), + firstThrottlingCollected: make(chan bool, 1), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), panicAbort: make(chan error), @@ -119,42 +122,12 @@ func (this *Migrator) initiateHooksExecutor() (err error) { } // shouldThrottle performs checks to see whether we should 
currently be throttling. -// It also checks for critical-load and panic aborts. +// It merely observes the metrics collected by other components, it does not issue +// its own metric collection. func (this *Migrator) shouldThrottle() (result bool, reason string) { - // Regardless of throttle, we take opportunity to check for panic-abort - if this.migrationContext.PanicFlagFile != "" { - if base.FileExists(this.migrationContext.PanicFlagFile) { - this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) - } - } - criticalLoad := this.migrationContext.GetCriticalLoad() - for variableName, threshold := range criticalLoad { - value, err := this.applier.ShowStatusVariable(variableName) - if err != nil { - return true, fmt.Sprintf("%s %s", variableName, err) - } - if value >= threshold { - this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) - } - } - - // Back to throttle considerations - - // User-based throttle - if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { - return true, "commanded by user" - } - if this.migrationContext.ThrottleFlagFile != "" { - if base.FileExists(this.migrationContext.ThrottleFlagFile) { - // Throttle file defined and exists! - return true, "flag-file" - } - } - if this.migrationContext.ThrottleAdditionalFlagFile != "" { - if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) { - // 2nd Throttle file defined and exists! - return true, "flag-file" - } + generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult() + if generalCheckResult.ShouldThrottle { + return generalCheckResult.ShouldThrottle, generalCheckResult.Reason } // Replication lag throttle maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) @@ -175,29 +148,93 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) { return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()) } } + // Got here? No metrics indicates we need throttling. + return false, "" +} + +// readGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext +func (this *Migrator) readGeneralThrottleMetrics() error { + + setThrottle := func(throttle bool, reason string) error { + this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason)) + return nil + } + + // Regardless of throttle, we take opportunity to check for panic-abort + if this.migrationContext.PanicFlagFile != "" { + if base.FileExists(this.migrationContext.PanicFlagFile) { + this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) + } + } + criticalLoad := this.migrationContext.GetCriticalLoad() + for variableName, threshold := range criticalLoad { + value, err := this.applier.ShowStatusVariable(variableName) + if err != nil { + return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) + } + if value >= threshold { + this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + } + } + + // Back to throttle considerations + + // User-based throttle + if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { + return setThrottle(true, "commanded by user") + } + if this.migrationContext.ThrottleFlagFile != "" { + if base.FileExists(this.migrationContext.ThrottleFlagFile) { + // Throttle file defined and exists! 
+ return setThrottle(true, "flag-file") + } + } + if this.migrationContext.ThrottleAdditionalFlagFile != "" { + if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) { + // 2nd Throttle file defined and exists! + return setThrottle(true, "flag-file") + } + } maxLoad := this.migrationContext.GetMaxLoad() for variableName, threshold := range maxLoad { value, err := this.applier.ShowStatusVariable(variableName) if err != nil { - return true, fmt.Sprintf("%s %s", variableName, err) + return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) } if value >= threshold { - return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold) + return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)) } } if this.migrationContext.GetThrottleQuery() != "" { if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 { - return true, "throttle-query" + return setThrottle(true, "throttle-query") } } - return false, "" + return setThrottle(false, "") +} + +// initiateThrottlerMetrics initiates the various processes that collect measurements +// that may affect throttling. There are several components, all running independently, +// that collect such metrics. +func (this *Migrator) initiateThrottlerMetrics() { + go this.initiateHeartbeatReader() + go this.initiateControlReplicasReader() + + go func() { + throttlerMetricsTick := time.Tick(1 * time.Second) + this.readGeneralThrottleMetrics() + this.firstThrottlingCollected <- true + for range throttlerMetricsTick { + this.readGeneralThrottleMetrics() + } + }() } // initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling. func (this *Migrator) initiateThrottler() error { - throttlerTick := time.Tick(1 * time.Second) + throttlerTick := time.Tick(100 * time.Millisecond) throttlerFunction := func() { alreadyThrottling, currentReason := this.migrationContext.IsThrottled() @@ -453,16 +490,15 @@ func (this *Migrator) Migrate() (err error) { if err := this.countTableRows(); err != nil { return err } - if err := this.addDMLEventsListener(); err != nil { return err } - go this.initiateHeartbeatReader() - go this.initiateControlReplicasReader() - if err := this.applier.ReadMigrationRangeValues(); err != nil { return err } + go this.initiateThrottlerMetrics() + log.Infof("Waiting for first throttle metrics to be collected") + <-this.firstThrottlingCollected go this.initiateThrottler() if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { return err @@ -1206,6 +1242,11 @@ func (this *Migrator) addDMLEventsListener() error { return err } +func (this *Migrator) initiateThrottler() error { + this.throttler = NewThrottler(this.panicAbort) + return nil +} + func (this *Migrator) initiateApplier() error { this.applier = NewApplier() if err := this.applier.InitDBConnections(); err != nil { From b2c71931c689c441b7ff484e6ca558e546cf8082 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 30 Aug 2016 12:25:45 +0200 Subject: [PATCH 4/4] refactored all throttling code into throttler.so --- go/base/context.go | 58 +++++----- go/logic/migrator.go | 263 ++++-------------------------------------- go/logic/throttler.go | 256 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 307 insertions(+), 270 deletions(-) create mode 100644 go/logic/throttler.go diff --git a/go/base/context.go b/go/base/context.go index 2444c5e..8d988ea 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -108,34 +108,36 @@ type MigrationContext struct { InitiallyDropGhostTable bool 
CutOverType CutOver - Hostname string - TableEngine string - RowsEstimate int64 - RowsDeltaEstimate int64 - UsedRowsEstimateMethod RowsEstimateMethod - HasSuperPrivilege bool - OriginalBinlogFormat string - OriginalBinlogRowImage string - InspectorConnectionConfig *mysql.ConnectionConfig - ApplierConnectionConfig *mysql.ConnectionConfig - StartTime time.Time - RowCopyStartTime time.Time - RowCopyEndTime time.Time - LockTablesStartTime time.Time - RenameTablesStartTime time.Time - RenameTablesEndTime time.Time - pointOfInterestTime time.Time - pointOfInterestTimeMutex *sync.Mutex - CurrentLag int64 - controlReplicasLagResult mysql.ReplicationLagResult - TotalRowsCopied int64 - TotalDMLEventsApplied int64 - isThrottled bool - throttleReason string - throttleGeneralCheckResult ThrottleCheckResult - throttleMutex *sync.Mutex - IsPostponingCutOver int64 - CountingRowsFlag int64 + Hostname string + TableEngine string + RowsEstimate int64 + RowsDeltaEstimate int64 + UsedRowsEstimateMethod RowsEstimateMethod + HasSuperPrivilege bool + OriginalBinlogFormat string + OriginalBinlogRowImage string + InspectorConnectionConfig *mysql.ConnectionConfig + ApplierConnectionConfig *mysql.ConnectionConfig + StartTime time.Time + RowCopyStartTime time.Time + RowCopyEndTime time.Time + LockTablesStartTime time.Time + RenameTablesStartTime time.Time + RenameTablesEndTime time.Time + pointOfInterestTime time.Time + pointOfInterestTimeMutex *sync.Mutex + CurrentLag int64 + controlReplicasLagResult mysql.ReplicationLagResult + TotalRowsCopied int64 + TotalDMLEventsApplied int64 + isThrottled bool + throttleReason string + throttleGeneralCheckResult ThrottleCheckResult + throttleMutex *sync.Mutex + IsPostponingCutOver int64 + CountingRowsFlag int64 + AllEventsUpToLockProcessedInjectedFlag int64 + CleanupImminentFlag int64 OriginalTableColumns *sql.ColumnList OriginalTableUniqueKeys [](*sql.UniqueKey) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7d95ea3..e713280 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -20,7 +20,6 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/binlog" - "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" "github.com/outbrain/golib/log" @@ -65,11 +64,9 @@ type Migrator struct { allEventsUpToLockProcessed chan bool panicAbort chan error - rowCopyCompleteFlag int64 - allEventsUpToLockProcessedInjectedFlag int64 - inCutOverCriticalActionFlag int64 - cleanupImminentFlag int64 - userCommandedUnpostponeFlag int64 + rowCopyCompleteFlag int64 + inCutOverCriticalActionFlag int64 + userCommandedUnpostponeFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan tableWriteFunc @@ -88,8 +85,6 @@ func NewMigrator() *Migrator { allEventsUpToLockProcessed: make(chan bool), panicAbort: make(chan error), - allEventsUpToLockProcessedInjectedFlag: 0, - copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), handledChangelogStates: make(map[string]bool), @@ -121,160 +116,6 @@ func (this *Migrator) initiateHooksExecutor() (err error) { return nil } -// shouldThrottle performs checks to see whether we should currently be throttling. -// It merely observes the metrics collected by other components, it does not issue -// its own metric collection. 
-func (this *Migrator) shouldThrottle() (result bool, reason string) { - generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult() - if generalCheckResult.ShouldThrottle { - return generalCheckResult.ShouldThrottle, generalCheckResult.Reason - } - // Replication lag throttle - maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) - if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()) - } - checkThrottleControlReplicas := true - if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) { - checkThrottleControlReplicas = false - } - if checkThrottleControlReplicas { - lagResult := this.migrationContext.GetControlReplicasLagResult() - if lagResult.Err != nil { - return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err) - } - if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()) - } - } - // Got here? No metrics indicates we need throttling. - return false, "" -} - -// readGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext -func (this *Migrator) readGeneralThrottleMetrics() error { - - setThrottle := func(throttle bool, reason string) error { - this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason)) - return nil - } - - // Regardless of throttle, we take opportunity to check for panic-abort - if this.migrationContext.PanicFlagFile != "" { - if base.FileExists(this.migrationContext.PanicFlagFile) { - this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) - } - } - criticalLoad := this.migrationContext.GetCriticalLoad() - for variableName, threshold := range criticalLoad { - value, err := this.applier.ShowStatusVariable(variableName) - if err != nil { - return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) - } - if value >= threshold { - this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) - } - } - - // Back to throttle considerations - - // User-based throttle - if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { - return setThrottle(true, "commanded by user") - } - if this.migrationContext.ThrottleFlagFile != "" { - if base.FileExists(this.migrationContext.ThrottleFlagFile) { - // Throttle file defined and exists! - return setThrottle(true, "flag-file") - } - } - if this.migrationContext.ThrottleAdditionalFlagFile != "" { - if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) { - // 2nd Throttle file defined and exists! 
- return setThrottle(true, "flag-file") - } - } - - maxLoad := this.migrationContext.GetMaxLoad() - for variableName, threshold := range maxLoad { - value, err := this.applier.ShowStatusVariable(variableName) - if err != nil { - return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) - } - if value >= threshold { - return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)) - } - } - if this.migrationContext.GetThrottleQuery() != "" { - if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 { - return setThrottle(true, "throttle-query") - } - } - - return setThrottle(false, "") -} - -// initiateThrottlerMetrics initiates the various processes that collect measurements -// that may affect throttling. There are several components, all running independently, -// that collect such metrics. -func (this *Migrator) initiateThrottlerMetrics() { - go this.initiateHeartbeatReader() - go this.initiateControlReplicasReader() - - go func() { - throttlerMetricsTick := time.Tick(1 * time.Second) - this.readGeneralThrottleMetrics() - this.firstThrottlingCollected <- true - for range throttlerMetricsTick { - this.readGeneralThrottleMetrics() - } - }() -} - -// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling. -func (this *Migrator) initiateThrottler() error { - throttlerTick := time.Tick(100 * time.Millisecond) - - throttlerFunction := func() { - alreadyThrottling, currentReason := this.migrationContext.IsThrottled() - shouldThrottle, throttleReason := this.shouldThrottle() - if shouldThrottle && !alreadyThrottling { - // New throttling - this.applier.WriteAndLogChangelog("throttle", throttleReason) - } else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) { - // Change of reason - this.applier.WriteAndLogChangelog("throttle", throttleReason) - } else if alreadyThrottling && !shouldThrottle { - // End of throttling - this.applier.WriteAndLogChangelog("throttle", "done throttling") - } - this.migrationContext.SetThrottled(shouldThrottle, throttleReason) - } - throttlerFunction() - for range throttlerTick { - throttlerFunction() - } - - return nil -} - -// throttle initiates a throttling event, if need be, updates the Context and -// calls callback functions, if any -func (this *Migrator) throttle(onThrottled func()) { - for { - // IsThrottled() is non-blocking; the throttling decision making takes place asynchronously. - // Therefore calling IsThrottled() is cheap - if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle { - return - } - if onThrottled != nil { - onThrottled() - } - time.Sleep(250 * time.Millisecond) - } -} - // sleepWhileTrue sleeps indefinitely until the given function returns 'false' // (or fails with error) func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { @@ -315,7 +156,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // throttles. 
func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) { if err := operation(); err != nil { - this.throttle(nil) + this.throttler.throttle(nil) return err } return nil @@ -373,19 +214,6 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return nil } -// parseChangelogHeartbeat is called when a heartbeat event is intercepted -func (this *Migrator) parseChangelogHeartbeat(heartbeatValue string) (err error) { - heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue) - if err != nil { - return log.Errore(err) - } - lag := time.Since(heartbeatTime) - - atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) - - return nil -} - // listenOnPanicAbort aborts on abort request func (this *Migrator) listenOnPanicAbort() { err := <-this.panicAbort @@ -496,10 +324,9 @@ func (this *Migrator) Migrate() (err error) { if err := this.applier.ReadMigrationRangeValues(); err != nil { return err } - go this.initiateThrottlerMetrics() - log.Infof("Waiting for first throttle metrics to be collected") - <-this.firstThrottlingCollected - go this.initiateThrottler() + if err := this.initiateThrottler(); err != nil { + return err + } if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { return err } @@ -547,7 +374,7 @@ func (this *Migrator) cutOver() (err error) { return nil } this.migrationContext.MarkPointOfInterest() - this.throttle(func() { + this.throttler.throttle(func() { log.Debugf("throttling before swapping tables") }) @@ -626,7 +453,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { return err } log.Infof("Waiting for events up to lock") - atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) <-this.allEventsUpToLockProcessed waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) @@ -643,7 +470,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { func (this *Migrator) cutOverTwoStep() (err error) { atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) - atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { return err @@ -674,7 +501,7 @@ func (this *Migrator) atomicCutOver() (err error) { this.applier.DropAtomicCutOverSentryTableIfExists() }() - atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0) + atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) lockOriginalSessionIdChan := make(chan int64, 2) tableLocked := make(chan error, 2) @@ -1142,61 +969,6 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } } -// initiateHeartbeatReader listens for heartbeat events. gh-ost implements its own -// heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution. 
-// Heartbeat is supplied via the changelog table -func (this *Migrator) initiateHeartbeatReader() { - ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) - for range ticker { - go func() error { - if atomic.LoadInt64(&this.cleanupImminentFlag) > 0 { - return nil - } - changelogState, err := this.inspector.readChangelogState() - if err != nil { - return log.Errore(err) - } - if heartbeatValue, ok := changelogState["heartbeat"]; ok { - this.parseChangelogHeartbeat(heartbeatValue) - } - return nil - }() - } -} - -// initiateControlReplicasReader -func (this *Migrator) initiateControlReplicasReader() { - readControlReplicasLag := func(replicationLagQuery string) error { - if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) { - return nil - } - lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), replicationLagQuery) - this.migrationContext.SetControlReplicasLagResult(lagResult) - return nil - } - aggressiveTicker := time.Tick(100 * time.Millisecond) - relaxedFactor := 10 - counter := 0 - shouldReadLagAggressively := false - replicationLagQuery := "" - - for range aggressiveTicker { - if counter%relaxedFactor == 0 { - // we only check if we wish to be aggressive once per second. The parameters for being aggressive - // do not typically change at all throughout the migration, but nonetheless we check them. - counter = 0 - maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - replicationLagQuery = this.migrationContext.GetReplicationLagQuery() - shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000) - } - if counter == 0 || shouldReadLagAggressively { - // We check replication lag every so often, or if we wish to be aggressive - readControlReplicasLag(replicationLagQuery) - } - counter++ - } -} - // initiateStreaming begins treaming of binary log events and registers listeners for such events func (this *Migrator) initiateStreaming() error { this.eventsStreamer = NewEventsStreamer() @@ -1242,8 +1014,15 @@ func (this *Migrator) addDMLEventsListener() error { return err } +// initiateThrottler kicks in the throttling collection and the throttling checks. func (this *Migrator) initiateThrottler() error { - this.throttler = NewThrottler(this.panicAbort) + this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort) + + go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) + log.Infof("Waiting for first throttle metrics to be collected") + <-this.firstThrottlingCollected + go this.throttler.initiateThrottlerChecks() + return nil } @@ -1338,7 +1117,7 @@ func (this *Migrator) executeWriteFuncs() error { // - during copy phase // - just before cut-over // - in between cut-over retries - this.throttle(nil) + this.throttler.throttle(nil) // When cutting over, we need to be aggressive. Cut-over holds table locks. // We need to release those asap. } @@ -1384,7 +1163,7 @@ func (this *Migrator) executeWriteFuncs() error { // finalCleanup takes actions at very end of migration, dropping tables etc. 
func (this *Migrator) finalCleanup() error { - atomic.StoreInt64(&this.cleanupImminentFlag, 1) + atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1) if this.migrationContext.Noop { if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil { diff --git a/go/logic/throttler.go b/go/logic/throttler.go new file mode 100644 index 0000000..65e0157 --- /dev/null +++ b/go/logic/throttler.go @@ -0,0 +1,256 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "fmt" + "sync/atomic" + "time" + + "github.com/github/gh-ost/go/base" + "github.com/github/gh-ost/go/mysql" + "github.com/outbrain/golib/log" +) + +// Throttler collects metrics related to throttling and makes informed decisison +// whether throttling should take place. +type Throttler struct { + migrationContext *base.MigrationContext + applier *Applier + inspector *Inspector + panicAbort chan error +} + +func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler { + return &Throttler{ + migrationContext: base.GetMigrationContext(), + applier: applier, + inspector: inspector, + panicAbort: panicAbort, + } +} + +// shouldThrottle performs checks to see whether we should currently be throttling. +// It merely observes the metrics collected by other components, it does not issue +// its own metric collection. +func (this *Throttler) shouldThrottle() (result bool, reason string) { + generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult() + if generalCheckResult.ShouldThrottle { + return generalCheckResult.ShouldThrottle, generalCheckResult.Reason + } + // Replication lag throttle + maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) + lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) + if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { + return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()) + } + checkThrottleControlReplicas := true + if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) { + checkThrottleControlReplicas = false + } + if checkThrottleControlReplicas { + lagResult := this.migrationContext.GetControlReplicasLagResult() + if lagResult.Err != nil { + return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err) + } + if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { + return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()) + } + } + // Got here? No metrics indicates we need throttling. 
+ return false, "" +} + +// parseChangelogHeartbeat is called when a heartbeat event is intercepted +func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) { + heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue) + if err != nil { + return log.Errore(err) + } + lag := time.Since(heartbeatTime) + atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) + return nil +} + +// collectHeartbeat reads the latest changelog heartbeat value +func (this *Throttler) collectHeartbeat() { + ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) + for range ticker { + go func() error { + if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 { + return nil + } + changelogState, err := this.inspector.readChangelogState() + if err != nil { + return log.Errore(err) + } + if heartbeatValue, ok := changelogState["heartbeat"]; ok { + this.parseChangelogHeartbeat(heartbeatValue) + } + return nil + }() + } +} + +// collectControlReplicasLag polls all the control replicas to get maximum lag value +func (this *Throttler) collectControlReplicasLag() { + readControlReplicasLag := func(replicationLagQuery string) error { + if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) { + return nil + } + lagResult := mysql.GetMaxReplicationLag( + this.migrationContext.InspectorConnectionConfig, + this.migrationContext.GetThrottleControlReplicaKeys(), + replicationLagQuery, + ) + this.migrationContext.SetControlReplicasLagResult(lagResult) + return nil + } + aggressiveTicker := time.Tick(100 * time.Millisecond) + relaxedFactor := 10 + counter := 0 + shouldReadLagAggressively := false + replicationLagQuery := "" + + for range aggressiveTicker { + if counter%relaxedFactor == 0 { + // we only check if we wish to be aggressive once per second. The parameters for being aggressive + // do not typically change at all throughout the migration, but nonetheless we check them. + counter = 0 + maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) + replicationLagQuery = this.migrationContext.GetReplicationLagQuery() + shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000) + } + if counter == 0 || shouldReadLagAggressively { + // We check replication lag every so often, or if we wish to be aggressive + readControlReplicasLag(replicationLagQuery) + } + counter++ + } +} + +// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext +func (this *Throttler) collectGeneralThrottleMetrics() error { + + setThrottle := func(throttle bool, reason string) error { + this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason)) + return nil + } + + // Regardless of throttle, we take opportunity to check for panic-abort + if this.migrationContext.PanicFlagFile != "" { + if base.FileExists(this.migrationContext.PanicFlagFile) { + this.panicAbort <- fmt.Errorf("Found panic-file %s. 
Aborting without cleanup", this.migrationContext.PanicFlagFile) + } + } + criticalLoad := this.migrationContext.GetCriticalLoad() + for variableName, threshold := range criticalLoad { + value, err := this.applier.ShowStatusVariable(variableName) + if err != nil { + return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) + } + if value >= threshold { + this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + } + } + + // Back to throttle considerations + + // User-based throttle + if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { + return setThrottle(true, "commanded by user") + } + if this.migrationContext.ThrottleFlagFile != "" { + if base.FileExists(this.migrationContext.ThrottleFlagFile) { + // Throttle file defined and exists! + return setThrottle(true, "flag-file") + } + } + if this.migrationContext.ThrottleAdditionalFlagFile != "" { + if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) { + // 2nd Throttle file defined and exists! + return setThrottle(true, "flag-file") + } + } + + maxLoad := this.migrationContext.GetMaxLoad() + for variableName, threshold := range maxLoad { + value, err := this.applier.ShowStatusVariable(variableName) + if err != nil { + return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) + } + if value >= threshold { + return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)) + } + } + if this.migrationContext.GetThrottleQuery() != "" { + if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 { + return setThrottle(true, "throttle-query") + } + } + + return setThrottle(false, "") +} + +// initiateThrottlerMetrics initiates the various processes that collect measurements +// that may affect throttling. There are several components, all running independently, +// that collect such metrics. +func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) { + go this.collectHeartbeat() + go this.collectControlReplicasLag() + + go func() { + throttlerMetricsTick := time.Tick(1 * time.Second) + this.collectGeneralThrottleMetrics() + firstThrottlingCollected <- true + for range throttlerMetricsTick { + this.collectGeneralThrottleMetrics() + } + }() +} + +// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling. 
+func (this *Throttler) initiateThrottlerChecks() error {
+	throttlerTick := time.Tick(100 * time.Millisecond)
+
+	throttlerFunction := func() {
+		alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
+		shouldThrottle, throttleReason := this.shouldThrottle()
+		if shouldThrottle && !alreadyThrottling {
+			// New throttling
+			this.applier.WriteAndLogChangelog("throttle", throttleReason)
+		} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
+			// Change of reason
+			this.applier.WriteAndLogChangelog("throttle", throttleReason)
+		} else if alreadyThrottling && !shouldThrottle {
+			// End of throttling
+			this.applier.WriteAndLogChangelog("throttle", "done throttling")
+		}
+		this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
+	}
+	throttlerFunction()
+	for range throttlerTick {
+		throttlerFunction()
+	}
+
+	return nil
+}
+
+// throttle sees if throttling needs to take place, and if so, continuously sleeps (blocks)
+// until throttling reasons are gone
+func (this *Throttler) throttle(onThrottled func()) {
+	for {
+		// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
+		// Therefore calling IsThrottled() is cheap
+		if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
+			return
+		}
+		if onThrottled != nil {
+			onThrottled()
+		}
+		time.Sleep(250 * time.Millisecond)
+	}
+}
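
The sketches below restate, as small standalone Go programs, the throttling mechanics these patches introduce. Every type, variable, and `main` function in them is a simplified stand-in for the real gh-ost code, not part of the patches themselves.

Patch 1 lowers the floor of `--max-lag-millis` from 1000ms to 100ms, and patch 2 adds `--heartbeat-interval-millis` (default 500ms, clamped to the 100ms-1s range). A minimal sketch of that clamping behavior, assuming a cut-down context struct:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// migrationContext is an illustrative stand-in for the relevant fields of the
// real MigrationContext in go/base/context.go.
type migrationContext struct {
	MaxLagMillisecondsThrottleThreshold int64
	HeartbeatIntervalMilliseconds       int64
}

// SetMaxLagMillisecondsThrottleThreshold mirrors patch 1: the threshold is
// floored at 100ms (previously 1000ms), enabling sub-second lag thresholds.
func (c *migrationContext) SetMaxLagMillisecondsThrottleThreshold(millis int64) {
	if millis < 100 {
		millis = 100
	}
	atomic.StoreInt64(&c.MaxLagMillisecondsThrottleThreshold, millis)
}

// SetHeartbeatIntervalMilliseconds mirrors patch 2: the heartbeat interval is
// clamped to the 100ms-1000ms range (the CLI default is 500ms).
func (c *migrationContext) SetHeartbeatIntervalMilliseconds(millis int64) {
	if millis < 100 {
		millis = 100
	}
	if millis > 1000 {
		millis = 1000
	}
	c.HeartbeatIntervalMilliseconds = millis
}

func main() {
	c := &migrationContext{}
	c.SetMaxLagMillisecondsThrottleThreshold(50) // too low: clamped up to 100
	c.SetHeartbeatIntervalMilliseconds(5000)     // too high: clamped down to 1000
	fmt.Println(c.MaxLagMillisecondsThrottleThreshold, c.HeartbeatIntervalMilliseconds)
	// Output: 100 1000
}
```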
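
Patch 2's sub-second lag measurement rests on gh-ost's own heartbeat: the applier writes `time.Now()` in RFC3339Nano format into the changelog table every `HeartbeatIntervalMilliseconds`, and the reader parses the value back and stores `time.Since(heartbeatTime)` as `CurrentLag`. A minimal sketch of that computation; the simulated heartbeat value in `main` is illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// parseChangelogHeartbeat mirrors the lag computation on the reader side:
// replication lag is simply the age of the latest heartbeat value observed.
func parseChangelogHeartbeat(heartbeatValue string) (time.Duration, error) {
	heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
	if err != nil {
		return 0, err
	}
	return time.Since(heartbeatTime), nil
}

func main() {
	// Simulate a heartbeat injected ~300ms ago on the master and read back here.
	heartbeatValue := time.Now().Add(-300 * time.Millisecond).Format(time.RFC3339Nano)
	lag, err := parseChangelogHeartbeat(heartbeatValue)
	if err != nil {
		panic(err)
	}
	fmt.Printf("current lag: %v (throttle if above --max-lag-millis)\n", lag)
}
```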
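
Patch 2 also moves control-replica lag polling onto its own loop: a fixed 100ms ticker that issues the lag query on every tick only when running "aggressively" (a `--replication-lag-query` is given and `--max-lag-millis` is below 1000), and otherwise once per `relaxedFactor` ticks, i.e. once per second. A standalone sketch of that cadence, with `pollReplicas` standing in for `mysql.GetMaxReplicationLag` and the hard-coded parameters purely illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// pollReplicas stands in for mysql.GetMaxReplicationLag plus storing the
// result on the migration context; here it only reports that a poll happened.
func pollReplicas(query string) {
	fmt.Printf("%s polling control replicas (query=%q)\n",
		time.Now().Format("15:04:05.000"), query)
}

func main() {
	aggressiveTicker := time.Tick(100 * time.Millisecond)
	relaxedFactor := 10
	counter := 0
	shouldReadLagAggressively := false
	replicationLagQuery := ""

	for tick := 0; tick < 30; tick++ { // bounded here only so the sketch terminates
		<-aggressiveTicker
		if counter%relaxedFactor == 0 {
			// Re-evaluate the "aggressive" decision once per second; these
			// parameters rarely change mid-migration, but may be updated live.
			counter = 0
			maxLagMillis := int64(500)                      // stand-in for MaxLagMillisecondsThrottleThreshold
			replicationLagQuery = "select lag from my_lag_table" // stand-in for --replication-lag-query
			shouldReadLagAggressively = replicationLagQuery != "" && maxLagMillis < 1000
		}
		if counter == 0 || shouldReadLagAggressively {
			// Poll once per second in relaxed mode, on every tick when aggressive.
			pollReplicas(replicationLagQuery)
		}
		counter++
	}
}
```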
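
Patches 3 and 4 decouple metric collection from the throttle decision: collectors store their latest `ThrottleCheckResult` on the migration context under `throttleMutex`, and `shouldThrottle()` merely reads what has already been collected. A reduced sketch of that writer/reader split, assuming a minimal `migrationState` stand-in and arbitrary cadences:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// ThrottleCheckResult mirrors the struct added in go/base/context.go: the
// outcome of the once-per-second "general" checks (flag files, max-load,
// throttle-query, user commands).
type ThrottleCheckResult struct {
	ShouldThrottle bool
	Reason         string
}

// migrationState is a cut-down stand-in for MigrationContext: the collector
// writes the latest result under a mutex, the checker only ever reads it.
type migrationState struct {
	mu                         sync.Mutex
	throttleGeneralCheckResult ThrottleCheckResult
}

func (s *migrationState) SetThrottleGeneralCheckResult(r ThrottleCheckResult) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.throttleGeneralCheckResult = r
}

func (s *migrationState) GetThrottleGeneralCheckResult() ThrottleCheckResult {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.throttleGeneralCheckResult
}

func main() {
	state := &migrationState{}

	// Collector goroutine: the expensive checks run on their own slow cadence.
	go func() {
		for i := 0; ; i++ {
			state.SetThrottleGeneralCheckResult(ThrottleCheckResult{
				ShouldThrottle: i%2 == 0,
				Reason:         "flag-file",
			})
			time.Sleep(1 * time.Second)
		}
	}()

	// Checker: cheap reads on a fast cadence, never blocked on metric collection.
	for i := 0; i < 5; i++ {
		r := state.GetThrottleGeneralCheckResult()
		fmt.Printf("shouldThrottle=%v reason=%q\n", r.ShouldThrottle, r.Reason)
		time.Sleep(300 * time.Millisecond)
	}
}
```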
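
Finally, the refactored `Throttler.throttle()` is a blocking gate: callers such as the row-copy loop and cut-over spin in 250ms sleeps until the asynchronously maintained throttle flag clears. A sketch of that consumer-side behavior, with a package-level atomic flag standing in for `migrationContext.IsThrottled()`:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// throttled stands in for the context's throttle flag; in gh-ost it is
// maintained asynchronously by the throttler's 100ms check ticker.
var throttled int64

// throttle mirrors the blocking helper: it returns immediately when not
// throttled, otherwise it sleeps in 250ms intervals until the flag clears,
// invoking the optional callback on each iteration.
func throttle(onThrottled func()) {
	for {
		if atomic.LoadInt64(&throttled) == 0 {
			return
		}
		if onThrottled != nil {
			onThrottled()
		}
		time.Sleep(250 * time.Millisecond)
	}
}

func main() {
	atomic.StoreInt64(&throttled, 1)
	go func() {
		time.Sleep(600 * time.Millisecond)
		atomic.StoreInt64(&throttled, 0) // the checker decides conditions have cleared
	}()

	// A row-copy worker gates each chunk on the throttler before proceeding.
	throttle(func() { fmt.Println("throttling before next copy chunk") })
	fmt.Println("copying next chunk")
}
```

Because the throttle decision itself is computed asynchronously, the gate is essentially free when no throttling reason is active, which is what allows both the write loop and the cut-over path to call it before every unit of work.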