refactored all throttling code into throttler.go

Shlomi Noach 2016-08-30 12:25:45 +02:00
parent 23357d0643
commit b2c71931c6
3 changed files with 307 additions and 270 deletions

go/base/context.go

@@ -108,34 +108,36 @@ type MigrationContext struct {
  InitiallyDropGhostTable bool
  CutOverType CutOver
  Hostname string
  TableEngine string
  RowsEstimate int64
  RowsDeltaEstimate int64
  UsedRowsEstimateMethod RowsEstimateMethod
  HasSuperPrivilege bool
  OriginalBinlogFormat string
  OriginalBinlogRowImage string
  InspectorConnectionConfig *mysql.ConnectionConfig
  ApplierConnectionConfig *mysql.ConnectionConfig
  StartTime time.Time
  RowCopyStartTime time.Time
  RowCopyEndTime time.Time
  LockTablesStartTime time.Time
  RenameTablesStartTime time.Time
  RenameTablesEndTime time.Time
  pointOfInterestTime time.Time
  pointOfInterestTimeMutex *sync.Mutex
  CurrentLag int64
  controlReplicasLagResult mysql.ReplicationLagResult
  TotalRowsCopied int64
  TotalDMLEventsApplied int64
  isThrottled bool
  throttleReason string
  throttleGeneralCheckResult ThrottleCheckResult
  throttleMutex *sync.Mutex
  IsPostponingCutOver int64
  CountingRowsFlag int64
+ AllEventsUpToLockProcessedInjectedFlag int64
+ CleanupImminentFlag int64
  OriginalTableColumns *sql.ColumnList
  OriginalTableUniqueKeys [](*sql.UniqueKey)
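
The two fields added above replace flags that previously lived on the Migrator (see the migrator.go hunks below). Keeping them on the shared MigrationContext lets the Migrator and the new Throttler read and write them from separate goroutines through sync/atomic, exactly as the diff does with atomic.StoreInt64/LoadInt64. A minimal standalone sketch of that access pattern (the struct here is a stand-in, not gh-ost's actual type):

package main

import (
    "fmt"
    "sync/atomic"
)

// sharedContext stands in for the shared migration context: int64 fields act as
// cross-goroutine flags, read and written only through sync/atomic.
type sharedContext struct {
    CleanupImminentFlag int64
}

func main() {
    ctx := &sharedContext{}
    // One goroutine (finalCleanup in the diff) raises the flag...
    atomic.StoreInt64(&ctx.CleanupImminentFlag, 1)
    // ...and another (the heartbeat collector) observes it without holding a mutex.
    if atomic.LoadInt64(&ctx.CleanupImminentFlag) > 0 {
        fmt.Println("cleanup imminent: skip reading the changelog heartbeat")
    }
}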

go/logic/migrator.go

@@ -20,7 +20,6 @@ import (
  "github.com/github/gh-ost/go/base"
  "github.com/github/gh-ost/go/binlog"
- "github.com/github/gh-ost/go/mysql"
  "github.com/github/gh-ost/go/sql"
  "github.com/outbrain/golib/log"
@@ -65,11 +64,9 @@ type Migrator struct {
  allEventsUpToLockProcessed chan bool
  panicAbort chan error
  rowCopyCompleteFlag int64
- allEventsUpToLockProcessedInjectedFlag int64
  inCutOverCriticalActionFlag int64
- cleanupImminentFlag int64
  userCommandedUnpostponeFlag int64
  // copyRowsQueue should not be buffered; if buffered some non-damaging but
  // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
  copyRowsQueue chan tableWriteFunc
@@ -88,8 +85,6 @@ func NewMigrator() *Migrator {
  allEventsUpToLockProcessed: make(chan bool),
  panicAbort: make(chan error),
- allEventsUpToLockProcessedInjectedFlag: 0,
  copyRowsQueue: make(chan tableWriteFunc),
  applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
  handledChangelogStates: make(map[string]bool),
@@ -121,160 +116,6 @@ func (this *Migrator) initiateHooksExecutor() (err error) {
return nil
}
// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue
// its own metric collection.
func (this *Migrator) shouldThrottle() (result bool, reason string) {
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
}
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
}
checkThrottleControlReplicas := true
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) {
checkThrottleControlReplicas = false
}
if checkThrottleControlReplicas {
lagResult := this.migrationContext.GetControlReplicasLagResult()
if lagResult.Err != nil {
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
}
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
}
}
// Got here? No metrics indicates we need throttling.
return false, ""
}
// readGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Migrator) readGeneralThrottleMetrics() error {
setThrottle := func(throttle bool, reason string) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
return nil
}
// Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) {
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
criticalLoad := this.migrationContext.GetCriticalLoad()
for variableName, threshold := range criticalLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
}
if value >= threshold {
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
}
// Back to throttle considerations
// User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return setThrottle(true, "commanded by user")
}
if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists!
return setThrottle(true, "flag-file")
}
}
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists!
return setThrottle(true, "flag-file")
}
}
maxLoad := this.migrationContext.GetMaxLoad()
for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
}
if value >= threshold {
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
}
}
if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return setThrottle(true, "throttle-query")
}
}
return setThrottle(false, "")
}
// initiateThrottlerMetrics initiates the various processes that collect measurements
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (this *Migrator) initiateThrottlerMetrics() {
go this.initiateHeartbeatReader()
go this.initiateControlReplicasReader()
go func() {
throttlerMetricsTick := time.Tick(1 * time.Second)
this.readGeneralThrottleMetrics()
this.firstThrottlingCollected <- true
for range throttlerMetricsTick {
this.readGeneralThrottleMetrics()
}
}()
}
// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
func (this *Migrator) initiateThrottler() error {
throttlerTick := time.Tick(100 * time.Millisecond)
throttlerFunction := func() {
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason := this.shouldThrottle()
if shouldThrottle && !alreadyThrottling {
// New throttling
this.applier.WriteAndLogChangelog("throttle", throttleReason)
} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
// Change of reason
this.applier.WriteAndLogChangelog("throttle", throttleReason)
} else if alreadyThrottling && !shouldThrottle {
// End of throttling
this.applier.WriteAndLogChangelog("throttle", "done throttling")
}
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
}
throttlerFunction()
for range throttlerTick {
throttlerFunction()
}
return nil
}
// throttle initiates a throttling event, if need be, updates the Context and
// calls callback functions, if any
func (this *Migrator) throttle(onThrottled func()) {
for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
return
}
if onThrottled != nil {
onThrottled()
}
time.Sleep(250 * time.Millisecond)
}
}
// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error)
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
@@ -315,7 +156,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
  // throttles.
  func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
  if err := operation(); err != nil {
- this.throttle(nil)
+ this.throttler.throttle(nil)
  return err
  }
  return nil
@@ -373,19 +214,6 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
  return nil
  }
- // parseChangelogHeartbeat is called when a heartbeat event is intercepted
- func (this *Migrator) parseChangelogHeartbeat(heartbeatValue string) (err error) {
- heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
- if err != nil {
- return log.Errore(err)
- }
- lag := time.Since(heartbeatTime)
- atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
- return nil
- }
  // listenOnPanicAbort aborts on abort request
  func (this *Migrator) listenOnPanicAbort() {
  err := <-this.panicAbort
@@ -496,10 +324,9 @@ func (this *Migrator) Migrate() (err error) {
  if err := this.applier.ReadMigrationRangeValues(); err != nil {
  return err
  }
- go this.initiateThrottlerMetrics()
- log.Infof("Waiting for first throttle metrics to be collected")
- <-this.firstThrottlingCollected
- go this.initiateThrottler()
+ if err := this.initiateThrottler(); err != nil {
+ return err
+ }
  if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
  return err
  }
@@ -547,7 +374,7 @@ func (this *Migrator) cutOver() (err error) {
  return nil
  }
  this.migrationContext.MarkPointOfInterest()
- this.throttle(func() {
+ this.throttler.throttle(func() {
  log.Debugf("throttling before swapping tables")
  })
@@ -626,7 +453,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
  return err
  }
  log.Infof("Waiting for events up to lock")
- atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
+ atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
  <-this.allEventsUpToLockProcessed
  waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
@@ -643,7 +470,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
  func (this *Migrator) cutOverTwoStep() (err error) {
  atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
  defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
- atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0)
+ atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
  if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
  return err
@@ -674,7 +501,7 @@ func (this *Migrator) atomicCutOver() (err error) {
  this.applier.DropAtomicCutOverSentryTableIfExists()
  }()
- atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0)
+ atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
  lockOriginalSessionIdChan := make(chan int64, 2)
  tableLocked := make(chan error, 2)
@@ -1142,61 +969,6 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
}
}
// initiateHeartbeatReader listens for heartbeat events. gh-ost implements its own
// heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution.
// Heartbeat is supplied via the changelog table
func (this *Migrator) initiateHeartbeatReader() {
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range ticker {
go func() error {
if atomic.LoadInt64(&this.cleanupImminentFlag) > 0 {
return nil
}
changelogState, err := this.inspector.readChangelogState()
if err != nil {
return log.Errore(err)
}
if heartbeatValue, ok := changelogState["heartbeat"]; ok {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil
}()
}
}
// initiateControlReplicasReader
func (this *Migrator) initiateControlReplicasReader() {
readControlReplicasLag := func(replicationLagQuery string) error {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) {
return nil
}
lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), replicationLagQuery)
this.migrationContext.SetControlReplicasLagResult(lagResult)
return nil
}
aggressiveTicker := time.Tick(100 * time.Millisecond)
relaxedFactor := 10
counter := 0
shouldReadLagAggressively := false
replicationLagQuery := ""
for range aggressiveTicker {
if counter%relaxedFactor == 0 {
// we only check if we wish to be aggressive once per second. The parameters for being aggressive
// do not typically change at all throughout the migration, but nonetheless we check them.
counter = 0
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
}
if counter == 0 || shouldReadLagAggressively {
// We check replication lag every so often, or if we wish to be aggressive
readControlReplicasLag(replicationLagQuery)
}
counter++
}
}
// initiateStreaming begins treaming of binary log events and registers listeners for such events
func (this *Migrator) initiateStreaming() error {
this.eventsStreamer = NewEventsStreamer()
@@ -1242,8 +1014,15 @@ func (this *Migrator) addDMLEventsListener() error {
  return err
  }
+ // initiateThrottler kicks in the throttling collection and the throttling checks.
  func (this *Migrator) initiateThrottler() error {
- this.throttler = NewThrottler(this.panicAbort)
+ this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
+ go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
+ log.Infof("Waiting for first throttle metrics to be collected")
+ <-this.firstThrottlingCollected
+ go this.throttler.initiateThrottlerChecks()
  return nil
  }
@@ -1338,7 +1117,7 @@ func (this *Migrator) executeWriteFuncs() error {
  // - during copy phase
  // - just before cut-over
  // - in between cut-over retries
- this.throttle(nil)
+ this.throttler.throttle(nil)
  // When cutting over, we need to be aggressive. Cut-over holds table locks.
  // We need to release those asap.
  }
@@ -1384,7 +1163,7 @@ func (this *Migrator) executeWriteFuncs() error {
  // finalCleanup takes actions at very end of migration, dropping tables etc.
  func (this *Migrator) finalCleanup() error {
- atomic.StoreInt64(&this.cleanupImminentFlag, 1)
+ atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1)
  if this.migrationContext.Noop {
  if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil {

go/logic/throttler.go (new file, 256 lines)

@@ -0,0 +1,256 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"fmt"
"sync/atomic"
"time"
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/outbrain/golib/log"
)
// Throttler collects metrics related to throttling and makes informed decisison
// whether throttling should take place.
type Throttler struct {
migrationContext *base.MigrationContext
applier *Applier
inspector *Inspector
panicAbort chan error
}
func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
return &Throttler{
migrationContext: base.GetMigrationContext(),
applier: applier,
inspector: inspector,
panicAbort: panicAbort,
}
}
// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue
// its own metric collection.
func (this *Throttler) shouldThrottle() (result bool, reason string) {
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
}
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
}
checkThrottleControlReplicas := true
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
checkThrottleControlReplicas = false
}
if checkThrottleControlReplicas {
lagResult := this.migrationContext.GetControlReplicasLagResult()
if lagResult.Err != nil {
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
}
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
}
}
// Got here? No metrics indicates we need throttling.
return false, ""
}
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
if err != nil {
return log.Errore(err)
}
lag := time.Since(heartbeatTime)
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil
}
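
Note the units in the two functions above: parseChangelogHeartbeat stores the lag as the raw int64 value of a time.Duration (nanoseconds), while MaxLagMillisecondsThrottleThreshold is configured in milliseconds; shouldThrottle converts both sides to time.Duration before comparing. A small self-contained check of that comparison, with made-up values:

package main

import (
    "fmt"
    "time"
)

func main() {
    // CurrentLag is stored as int64(time.Since(heartbeatTime)), i.e. nanoseconds.
    lag := int64(1500 * time.Millisecond) // pretend the last heartbeat is 1.5s old
    maxLagMillisecondsThrottleThreshold := int64(1000)

    // Same comparison as shouldThrottle(): convert both sides to time.Duration.
    shouldThrottle := time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond
    fmt.Printf("lag=%fs throttle=%v\n", time.Duration(lag).Seconds(), shouldThrottle) // lag=1.500000s throttle=true
}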
// collectHeartbeat reads the latest changelog heartbeat value
func (this *Throttler) collectHeartbeat() {
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range ticker {
go func() error {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
}
changelogState, err := this.inspector.readChangelogState()
if err != nil {
return log.Errore(err)
}
if heartbeatValue, ok := changelogState["heartbeat"]; ok {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil
}()
}
}
// collectControlReplicasLag polls all the control replicas to get maximum lag value
func (this *Throttler) collectControlReplicasLag() {
readControlReplicasLag := func(replicationLagQuery string) error {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
return nil
}
lagResult := mysql.GetMaxReplicationLag(
this.migrationContext.InspectorConnectionConfig,
this.migrationContext.GetThrottleControlReplicaKeys(),
replicationLagQuery,
)
this.migrationContext.SetControlReplicasLagResult(lagResult)
return nil
}
aggressiveTicker := time.Tick(100 * time.Millisecond)
relaxedFactor := 10
counter := 0
shouldReadLagAggressively := false
replicationLagQuery := ""
for range aggressiveTicker {
if counter%relaxedFactor == 0 {
// we only check if we wish to be aggressive once per second. The parameters for being aggressive
// do not typically change at all throughout the migration, but nonetheless we check them.
counter = 0
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
}
if counter == 0 || shouldReadLagAggressively {
// We check replication lag every so often, or if we wish to be aggressive
readControlReplicasLag(replicationLagQuery)
}
counter++
}
}
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error {
setThrottle := func(throttle bool, reason string) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
return nil
}
// Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) {
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
criticalLoad := this.migrationContext.GetCriticalLoad()
for variableName, threshold := range criticalLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
}
if value >= threshold {
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
}
// Back to throttle considerations
// User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return setThrottle(true, "commanded by user")
}
if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists!
return setThrottle(true, "flag-file")
}
}
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists!
return setThrottle(true, "flag-file")
}
}
maxLoad := this.migrationContext.GetMaxLoad()
for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
}
if value >= threshold {
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
}
}
if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return setThrottle(true, "throttle-query")
}
}
return setThrottle(false, "")
}
// initiateThrottlerMetrics initiates the various processes that collect measurements
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
go this.collectHeartbeat()
go this.collectControlReplicasLag()
go func() {
throttlerMetricsTick := time.Tick(1 * time.Second)
this.collectGeneralThrottleMetrics()
firstThrottlingCollected <- true
for range throttlerMetricsTick {
this.collectGeneralThrottleMetrics()
}
}()
}
// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
func (this *Throttler) initiateThrottlerChecks() error {
throttlerTick := time.Tick(100 * time.Millisecond)
throttlerFunction := func() {
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason := this.shouldThrottle()
if shouldThrottle && !alreadyThrottling {
// New throttling
this.applier.WriteAndLogChangelog("throttle", throttleReason)
} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
// Change of reason
this.applier.WriteAndLogChangelog("throttle", throttleReason)
} else if alreadyThrottling && !shouldThrottle {
// End of throttling
this.applier.WriteAndLogChangelog("throttle", "done throttling")
}
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
}
throttlerFunction()
for range throttlerTick {
throttlerFunction()
}
return nil
}
// throttle sees if throttling needs take place, and if so, continuously sleeps (blocks)
// until throttling reasons are gone
func (this *Throttler) throttle(onThrottled func()) {
for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
return
}
if onThrottled != nil {
onThrottled()
}
time.Sleep(250 * time.Millisecond)
}
}
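
For completeness, the throttle() gate above is a plain poll-and-sleep loop around the context's throttled state; here is a standalone model of the same pattern (the package-level flag stands in for migrationContext.IsThrottled(), which in gh-ost is mutex-protected rather than a bare atomic):

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

var throttled int64 // stand-in for the throttled state kept on the migration context

// throttleGate mirrors Throttler.throttle(): return immediately when not throttled,
// otherwise invoke the callback and re-check every 250ms until the state clears.
func throttleGate(onThrottled func()) {
    for {
        if atomic.LoadInt64(&throttled) == 0 {
            return
        }
        if onThrottled != nil {
            onThrottled()
        }
        time.Sleep(250 * time.Millisecond)
    }
}

func main() {
    atomic.StoreInt64(&throttled, 1)
    go func() {
        time.Sleep(600 * time.Millisecond) // pretend the checker loop clears the state
        atomic.StoreInt64(&throttled, 0)
    }()
    throttleGate(func() { fmt.Println("throttled, waiting...") })
    fmt.Println("resumed")
}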