Merge pull request #204 from github/reduce-minimum-max-lag
Reduce minimum maxLagMillisecondsThrottleThreshold to 100ms

Commit: 904215e286
doc/interactive-commands.md

@@ -18,7 +18,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
 - `status`: returns a detailed status summary of migration progress and configuration
 - `sup`: returns a brief status summary of migration progress
 - `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
-- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `1000`, i.e. 1 second)
+- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
 - `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
   The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`. For example: `Threads_running=50,threads_connected=1000`, and you would then write/echo `max-load=Threads_running=50,threads_connected=1000` to the socket.
 - `critical-load=<load>`: change critical load setting (exceeding given thresholds causes panic and abort)
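With the minimum lowered to 100ms, the threshold can now be pushed below one second at runtime through the very socket this document describes. A minimal sketch of driving the interactive socket from Go; the socket path here is hypothetical, use whatever `--serve-socket-file` resolves to (gh-ost defaults to `/tmp/gh-ost.<schema>.<table>.sock`, per the diff below):

```go
package main

import (
	"fmt"
	"io"
	"net"
	"os"
)

func main() {
	// Hypothetical socket path for a migration on mydb.mytable
	conn, err := net.Dial("unix", "/tmp/gh-ost.mydb.mytable.sock")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer conn.Close()
	// After this change, any value >= 100 is accepted (previously >= 1000)
	fmt.Fprintln(conn, "max-lag-millis=500")
	io.Copy(os.Stdout, conn) // print gh-ost's reply
}
```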
go/base/context.go

@@ -41,6 +41,18 @@ var (
 	envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
 )

+type ThrottleCheckResult struct {
+	ShouldThrottle bool
+	Reason         string
+}
+
+func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
+	return &ThrottleCheckResult{
+		ShouldThrottle: throttle,
+		Reason:         reason,
+	}
+}
+
 // MigrationContext has the general, global state of migration. It is used by
 // all components throughout the migration process.
 type MigrationContext struct {
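`ThrottleCheckResult` is a plain value carrier for a throttle decision plus its reason. A hedged sketch of its intended use, assuming the getter/setter this diff adds further down in the same file:

```go
package main

import (
	"fmt"

	"github.com/github/gh-ost/go/base"
)

func main() {
	ctx := base.GetMigrationContext()
	// A collector records its (hypothetical) decision on the shared context...
	ctx.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(true, "max-load"))
	// ...and a checker later reads it back; the getter returns a copy.
	result := ctx.GetThrottleGeneralCheckResult()
	fmt.Println(result.ShouldThrottle, result.Reason)
}
```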
@@ -64,6 +76,7 @@ type MigrationContext struct {
 	CliUser     string
 	CliPassword string

+	HeartbeatIntervalMilliseconds int64
 	defaultNumRetries int64
 	ChunkSize         int64
 	niceRatio         float64
@@ -114,13 +127,17 @@ type MigrationContext struct {
 	pointOfInterestTime      time.Time
 	pointOfInterestTimeMutex *sync.Mutex
 	CurrentLag               int64
+	controlReplicasLagResult mysql.ReplicationLagResult
 	TotalRowsCopied          int64
 	TotalDMLEventsApplied    int64
 	isThrottled              bool
 	throttleReason           string
+	throttleGeneralCheckResult ThrottleCheckResult
 	throttleMutex            *sync.Mutex
 	IsPostponingCutOver      int64
 	CountingRowsFlag         int64
+	AllEventsUpToLockProcessedInjectedFlag int64
+	CleanupImminentFlag                    int64

 	OriginalTableColumns    *sql.ColumnList
 	OriginalTableUniqueKeys [](*sql.UniqueKey)
@@ -326,9 +343,19 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
 	return time.Since(this.pointOfInterestTime)
 }

+func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
+	if heartbeatIntervalMilliseconds < 100 {
+		heartbeatIntervalMilliseconds = 100
+	}
+	if heartbeatIntervalMilliseconds > 1000 {
+		heartbeatIntervalMilliseconds = 1000
+	}
+	this.HeartbeatIntervalMilliseconds = heartbeatIntervalMilliseconds
+}
+
 func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) {
-	if maxLagMillisecondsThrottleThreshold < 1000 {
-		maxLagMillisecondsThrottleThreshold = 1000
+	if maxLagMillisecondsThrottleThreshold < 100 {
+		maxLagMillisecondsThrottleThreshold = 100
 	}
 	atomic.StoreInt64(&this.MaxLagMillisecondsThrottleThreshold, maxLagMillisecondsThrottleThreshold)
 }
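The net effect of these two setters, in one hypothetical snippet (assuming the code above is compiled in): the heartbeat interval is clamped to the [100, 1000] ms range, and the lag threshold now bottoms out at 100 ms where it previously bottomed out at 1000 ms:

```go
package main

import (
	"fmt"
	"sync/atomic"

	"github.com/github/gh-ost/go/base"
)

func main() {
	ctx := base.GetMigrationContext()

	ctx.SetHeartbeatIntervalMilliseconds(50) // below minimum
	fmt.Println(ctx.HeartbeatIntervalMilliseconds) // 100

	ctx.SetHeartbeatIntervalMilliseconds(5000) // above maximum
	fmt.Println(ctx.HeartbeatIntervalMilliseconds) // 1000

	ctx.SetMaxLagMillisecondsThrottleThreshold(100) // now accepted as-is
	fmt.Println(atomic.LoadInt64(&ctx.MaxLagMillisecondsThrottleThreshold)) // 100
}
```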
@@ -343,6 +370,20 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
 	atomic.StoreInt64(&this.ChunkSize, chunkSize)
 }

+func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
+	this.throttleMutex.Lock()
+	defer this.throttleMutex.Unlock()
+	this.throttleGeneralCheckResult = *checkResult
+	return checkResult
+}
+
+func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResult {
+	this.throttleMutex.Lock()
+	defer this.throttleMutex.Unlock()
+	result := this.throttleGeneralCheckResult
+	return &result
+}
+
 func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
 	this.throttleMutex.Lock()
 	defer this.throttleMutex.Unlock()
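Note that `GetThrottleGeneralCheckResult` copies the guarded field under the mutex and returns a pointer to the copy, so no caller ever aliases shared state. The same pattern in isolation (a generic sketch, not code from this PR):

```go
package main

import (
	"fmt"
	"sync"
)

type checkResult struct {
	shouldThrottle bool
	reason         string
}

type holder struct {
	mu     sync.Mutex
	result checkResult
}

// Set stores a snapshot under the lock.
func (h *holder) Set(r checkResult) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.result = r
}

// Get copies under the lock and returns a pointer to the copy,
// so the caller never holds a reference into the guarded field.
func (h *holder) Get() *checkResult {
	h.mu.Lock()
	defer h.mu.Unlock()
	r := h.result
	return &r
}

func main() {
	h := &holder{}
	h.Set(checkResult{true, "lag"})
	fmt.Println(*h.Get())
}
```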
@@ -454,6 +495,20 @@ func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error {
 	return nil
 }

+func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLagResult {
+	this.throttleMutex.Lock()
+	defer this.throttleMutex.Unlock()
+
+	lagResult := this.controlReplicasLagResult
+	return lagResult
+}
+
+func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
+	this.throttleMutex.Lock()
+	defer this.throttleMutex.Unlock()
+	this.controlReplicasLagResult = *lagResult
+}
+
 func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {
 	this.throttleMutex.Lock()
 	defer this.throttleMutex.Unlock()
go/cmd/gh-ost/main.go

@@ -81,6 +81,7 @@ func main() {
 	replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
 	throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
 	throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
+	heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value")
 	flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
 	flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
 	flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@@ -184,6 +185,7 @@ func main() {
 	if migrationContext.ServeSocketFile == "" {
 		migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
 	}
+	migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
 	migrationContext.SetNiceRatio(*niceRatio)
 	migrationContext.SetChunkSize(*chunkSize)
 	migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
go/logic/applier.go

@@ -258,7 +258,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {

 // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
 // This is done asynchronously
-func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
+func (this *Applier) InitiateHeartbeat() {
 	var numSuccessiveFailures int64
 	injectHeartbeat := func() error {
 		if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
@@ -273,10 +273,10 @@ func (this *Applier) InitiateHeartbeat() {
 	}
 	injectHeartbeat()

-	heartbeatTick := time.Tick(time.Duration(heartbeatIntervalMilliseconds) * time.Millisecond)
+	heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
 	for range heartbeatTick {
 		// Generally speaking, we would issue a goroutine, but I'd actually rather
-		// have this blocked rather than spam the master in the event something
+		// have this block the loop rather than spam the master in the event something
 		// goes wrong
 		if err := injectHeartbeat(); err != nil {
 			return
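The comment change above spells out a deliberate design choice: injections run inline in the ticker loop, so a stalled write delays subsequent ticks instead of piling up goroutines that would hammer the master once it recovers. The same idea in a self-contained sketch (illustrative, not PR code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	inject := func() error {
		fmt.Println("heartbeat", time.Now().Format(time.RFC3339Nano))
		return nil // imagine a changelog INSERT here
	}

	// time.Tick's channel has a buffer of one; when the receiver is slow,
	// missed ticks are simply dropped, which is exactly the backpressure
	// behavior the applier's comment asks for.
	tick := time.Tick(500 * time.Millisecond)
	for range tick {
		if err := inject(); err != nil {
			return
		}
	}
}
```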
go/logic/migrator.go

@@ -20,7 +20,6 @@ import (

 	"github.com/github/gh-ost/go/base"
 	"github.com/github/gh-ost/go/binlog"
-	"github.com/github/gh-ost/go/mysql"
 	"github.com/github/gh-ost/go/sql"

 	"github.com/outbrain/golib/log"
@@ -37,7 +36,6 @@ type tableWriteFunc func() error

 const (
 	applyEventsQueueBuffer = 100
-	heartbeatIntervalMilliseconds = 1000
 )

 type PrintStatusRule int
@@ -56,18 +54,18 @@ type Migrator struct {
 	applier          *Applier
 	eventsStreamer   *EventsStreamer
 	server           *Server
+	throttler        *Throttler
 	hooksExecutor    *HooksExecutor
 	migrationContext *base.MigrationContext

+	firstThrottlingCollected   chan bool
 	tablesInPlace              chan bool
 	rowCopyComplete            chan bool
 	allEventsUpToLockProcessed chan bool
 	panicAbort                 chan error

 	rowCopyCompleteFlag int64
-	allEventsUpToLockProcessedInjectedFlag int64
 	inCutOverCriticalActionFlag int64
-	cleanupImminentFlag int64
 	userCommandedUnpostponeFlag int64
 	// copyRowsQueue should not be buffered; if buffered some non-damaging but
 	// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
@@ -82,12 +80,11 @@ func NewMigrator() *Migrator {
 		migrationContext: base.GetMigrationContext(),
 		parser:           sql.NewParser(),
 		tablesInPlace:    make(chan bool),
+		firstThrottlingCollected:   make(chan bool, 1),
 		rowCopyComplete:            make(chan bool),
 		allEventsUpToLockProcessed: make(chan bool),
 		panicAbort:                 make(chan error),

-		allEventsUpToLockProcessedInjectedFlag: 0,
-
 		copyRowsQueue:    make(chan tableWriteFunc),
 		applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
 		handledChangelogStates: make(map[string]bool),
@@ -119,126 +116,6 @@ func (this *Migrator) initiateHooksExecutor() (err error) {
 	return nil
 }

-// shouldThrottle performs checks to see whether we should currently be throttling.
-// It also checks for critical-load and panic aborts.
-func (this *Migrator) shouldThrottle() (result bool, reason string) {
-	// Regardless of throttle, we take opportunity to check for panic-abort
-	if this.migrationContext.PanicFlagFile != "" {
-		if base.FileExists(this.migrationContext.PanicFlagFile) {
-			this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
-		}
-	}
-	criticalLoad := this.migrationContext.GetCriticalLoad()
-	for variableName, threshold := range criticalLoad {
-		value, err := this.applier.ShowStatusVariable(variableName)
-		if err != nil {
-			return true, fmt.Sprintf("%s %s", variableName, err)
-		}
-		if value >= threshold {
-			this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
-		}
-	}
-
-	// Back to throttle considerations
-
-	// User-based throttle
-	if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
-		return true, "commanded by user"
-	}
-	if this.migrationContext.ThrottleFlagFile != "" {
-		if base.FileExists(this.migrationContext.ThrottleFlagFile) {
-			// Throttle file defined and exists!
-			return true, "flag-file"
-		}
-	}
-	if this.migrationContext.ThrottleAdditionalFlagFile != "" {
-		if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
-			// 2nd Throttle file defined and exists!
-			return true, "flag-file"
-		}
-	}
-	// Replication lag throttle
-	maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
-	lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
-	if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
-		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
-	}
-	checkThrottleControlReplicas := true
-	if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) {
-		checkThrottleControlReplicas = false
-	}
-	if checkThrottleControlReplicas {
-		lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
-		if lagResult.Err != nil {
-			return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
-		}
-		if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
-			return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
-		}
-	}
-
-	maxLoad := this.migrationContext.GetMaxLoad()
-	for variableName, threshold := range maxLoad {
-		value, err := this.applier.ShowStatusVariable(variableName)
-		if err != nil {
-			return true, fmt.Sprintf("%s %s", variableName, err)
-		}
-		if value >= threshold {
-			return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)
-		}
-	}
-	if this.migrationContext.GetThrottleQuery() != "" {
-		if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
-			return true, "throttle-query"
-		}
-	}
-
-	return false, ""
-}
-
-// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
-func (this *Migrator) initiateThrottler() error {
-	throttlerTick := time.Tick(1 * time.Second)
-
-	throttlerFunction := func() {
-		alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
-		shouldThrottle, throttleReason := this.shouldThrottle()
-		if shouldThrottle && !alreadyThrottling {
-			// New throttling
-			this.applier.WriteAndLogChangelog("throttle", throttleReason)
-		} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
-			// Change of reason
-			this.applier.WriteAndLogChangelog("throttle", throttleReason)
-		} else if alreadyThrottling && !shouldThrottle {
-			// End of throttling
-			this.applier.WriteAndLogChangelog("throttle", "done throttling")
-		}
-		this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
-	}
-	throttlerFunction()
-	for range throttlerTick {
-		throttlerFunction()
-	}
-
-	return nil
-}
-
-// throttle initiates a throttling event, if need be, updates the Context and
-// calls callback functions, if any
-func (this *Migrator) throttle(onThrottled func()) {
-	for {
-		// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
-		// Therefore calling IsThrottled() is cheap
-		if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
-			return
-		}
-		if onThrottled != nil {
-			onThrottled()
-		}
-		time.Sleep(250 * time.Millisecond)
-	}
-}
-
 // sleepWhileTrue sleeps indefinitely until the given function returns 'false'
 // (or fails with error)
 func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
@@ -279,7 +156,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool)
 // throttles.
 func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
 	if err := operation(); err != nil {
-		this.throttle(nil)
+		this.throttler.throttle(nil)
 		return err
 	}
 	return nil
@@ -337,19 +214,6 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
 	return nil
 }

-// onChangelogHeartbeat is called when a heartbeat event is intercepted
-func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
-	heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
-	if err != nil {
-		return log.Errore(err)
-	}
-	lag := time.Since(heartbeatTime)
-
-	atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
-
-	return nil
-}
-
 // listenOnPanicAbort aborts on abort request
 func (this *Migrator) listenOnPanicAbort() {
 	err := <-this.panicAbort
@@ -454,16 +318,15 @@ func (this *Migrator) Migrate() (err error) {
 	if err := this.countTableRows(); err != nil {
 		return err
 	}

 	if err := this.addDMLEventsListener(); err != nil {
 		return err
 	}
-	go this.initiateHeartbeatListener()
-
 	if err := this.applier.ReadMigrationRangeValues(); err != nil {
 		return err
 	}
-	go this.initiateThrottler()
+	if err := this.initiateThrottler(); err != nil {
+		return err
+	}
 	if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
 		return err
 	}
@@ -511,7 +374,7 @@ func (this *Migrator) cutOver() (err error) {
 		return nil
 	}
 	this.migrationContext.MarkPointOfInterest()
-	this.throttle(func() {
+	this.throttler.throttle(func() {
 		log.Debugf("throttling before swapping tables")
 	})

@@ -590,7 +453,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 		return err
 	}
 	log.Infof("Waiting for events up to lock")
-	atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
+	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
 	<-this.allEventsUpToLockProcessed
 	waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)

@@ -607,7 +470,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 func (this *Migrator) cutOverTwoStep() (err error) {
 	atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
 	defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
-	atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0)
+	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

 	if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
 		return err
@@ -638,7 +501,7 @@ func (this *Migrator) atomicCutOver() (err error) {
 		this.applier.DropAtomicCutOverSentryTableIfExists()
 	}()

-	atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0)
+	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)

 	lockOriginalSessionIdChan := make(chan int64, 2)
 	tableLocked := make(chan error, 2)
@@ -1108,33 +971,6 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
 	}
 }

-// initiateHeartbeatListener listens for heartbeat events. gh-ost implements its own
-// heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution.
-// Heartbeat is supplied via the changelog table
-func (this *Migrator) initiateHeartbeatListener() {
-	ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
-	for range ticker {
-		go func() error {
-			if atomic.LoadInt64(&this.cleanupImminentFlag) > 0 {
-				return nil
-			}
-			changelogState, err := this.inspector.readChangelogState()
-			if err != nil {
-				return log.Errore(err)
-			}
-			for hint, value := range changelogState {
-				switch hint {
-				case "heartbeat":
-					{
-						this.onChangelogHeartbeat(value)
-					}
-				}
-			}
-			return nil
-		}()
-	}
-}
-
 // initiateStreaming begins streaming of binary log events and registers listeners for such events
 func (this *Migrator) initiateStreaming() error {
 	this.eventsStreamer = NewEventsStreamer()
@@ -1180,6 +1016,18 @@ func (this *Migrator) addDMLEventsListener() error {
 	return err
 }

+// initiateThrottler kicks in the throttling collection and the throttling checks.
+func (this *Migrator) initiateThrottler() error {
+	this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
+
+	go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
+	log.Infof("Waiting for first throttle metrics to be collected")
+	<-this.firstThrottlingCollected
+	go this.throttler.initiateThrottlerChecks()
+
+	return nil
+}
+
 func (this *Migrator) initiateApplier() error {
 	this.applier = NewApplier()
 	if err := this.applier.InitDBConnections(); err != nil {
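The blocking receive above is a small channel handshake: the migrator refuses to proceed until the throttler has collected metrics at least once, so the very first throttle check never runs against empty data. The same handshake in miniature (illustrative sketch, not PR code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Buffered with capacity 1, matching make(chan bool, 1) in NewMigrator,
	// so the collector never blocks on its one-time signal.
	firstCollected := make(chan bool, 1)

	// Collector: signal once after the first pass, then keep collecting.
	go func() {
		collect := func() { /* gather metrics */ }
		collect()
		firstCollected <- true
		for range time.Tick(1 * time.Second) {
			collect()
		}
	}()

	fmt.Println("waiting for first metrics collection")
	<-firstCollected
	fmt.Println("metrics primed; checks may begin")
}
```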
@@ -1202,7 +1050,7 @@ func (this *Migrator) initiateApplier() error {
 	}

 	this.applier.WriteChangelogState(string(TablesInPlace))
-	go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds)
+	go this.applier.InitiateHeartbeat()
 	return nil
 }

@@ -1271,7 +1119,7 @@ func (this *Migrator) executeWriteFuncs() error {
 			// - during copy phase
 			// - just before cut-over
 			// - in between cut-over retries
-			this.throttle(nil)
+			this.throttler.throttle(nil)
 			// When cutting over, we need to be aggressive. Cut-over holds table locks.
 			// We need to release those asap.
 		}
@@ -1317,7 +1165,7 @@ func (this *Migrator) executeWriteFuncs() error {

 // finalCleanup takes actions at very end of migration, dropping tables etc.
 func (this *Migrator) finalCleanup() error {
	atomic.StoreInt64(&this.cleanupImminentFlag, 1)
+	atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1)

 	if this.migrationContext.Noop {
 		if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil {
go/logic/throttler.go (new file, 256 lines)

@@ -0,0 +1,256 @@
+/*
+   Copyright 2016 GitHub Inc.
+   See https://github.com/github/gh-ost/blob/master/LICENSE
+*/
+
+package logic
+
+import (
+	"fmt"
+	"sync/atomic"
+	"time"
+
+	"github.com/github/gh-ost/go/base"
+	"github.com/github/gh-ost/go/mysql"
+	"github.com/outbrain/golib/log"
+)
+
+// Throttler collects metrics related to throttling and makes informed decisions
+// whether throttling should take place.
+type Throttler struct {
+	migrationContext *base.MigrationContext
+	applier          *Applier
+	inspector        *Inspector
+	panicAbort       chan error
+}
+
+func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
+	return &Throttler{
+		migrationContext: base.GetMigrationContext(),
+		applier:          applier,
+		inspector:        inspector,
+		panicAbort:       panicAbort,
+	}
+}
+
+// shouldThrottle performs checks to see whether we should currently be throttling.
+// It merely observes the metrics collected by other components, it does not issue
+// its own metric collection.
+func (this *Throttler) shouldThrottle() (result bool, reason string) {
+	generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
+	if generalCheckResult.ShouldThrottle {
+		return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
+	}
+	// Replication lag throttle
+	maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
+	lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
+	if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
+		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
+	}
+	checkThrottleControlReplicas := true
+	if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
+		checkThrottleControlReplicas = false
+	}
+	if checkThrottleControlReplicas {
+		lagResult := this.migrationContext.GetControlReplicasLagResult()
+		if lagResult.Err != nil {
+			return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
+		}
+		if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
+			return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
+		}
+	}
+	// Got here? No metric indicates we need throttling.
+	return false, ""
+}
+
+// parseChangelogHeartbeat is called when a heartbeat event is intercepted
+func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
+	heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
+	if err != nil {
+		return log.Errore(err)
+	}
+	lag := time.Since(heartbeatTime)
+	atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
+	return nil
+}
+
+// collectHeartbeat reads the latest changelog heartbeat value
+func (this *Throttler) collectHeartbeat() {
+	ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
+	for range ticker {
+		go func() error {
+			if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
+				return nil
+			}
+			changelogState, err := this.inspector.readChangelogState()
+			if err != nil {
+				return log.Errore(err)
+			}
+			if heartbeatValue, ok := changelogState["heartbeat"]; ok {
+				this.parseChangelogHeartbeat(heartbeatValue)
+			}
+			return nil
+		}()
+	}
+}
+
+// collectControlReplicasLag polls all the control replicas to get maximum lag value
+func (this *Throttler) collectControlReplicasLag() {
+	readControlReplicasLag := func(replicationLagQuery string) error {
+		if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
+			return nil
+		}
+		lagResult := mysql.GetMaxReplicationLag(
+			this.migrationContext.InspectorConnectionConfig,
+			this.migrationContext.GetThrottleControlReplicaKeys(),
+			replicationLagQuery,
+		)
+		this.migrationContext.SetControlReplicasLagResult(lagResult)
+		return nil
+	}
+	aggressiveTicker := time.Tick(100 * time.Millisecond)
+	relaxedFactor := 10
+	counter := 0
+	shouldReadLagAggressively := false
+	replicationLagQuery := ""
+
+	for range aggressiveTicker {
+		if counter%relaxedFactor == 0 {
+			// we only check if we wish to be aggressive once per second. The parameters for being aggressive
+			// do not typically change at all throughout the migration, but nonetheless we check them.
+			counter = 0
+			maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
+			replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
+			shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
+		}
+		if counter == 0 || shouldReadLagAggressively {
+			// We check replication lag every so often, or if we wish to be aggressive
+			readControlReplicasLag(replicationLagQuery)
+		}
+		counter++
+	}
+}
+
+// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
+func (this *Throttler) collectGeneralThrottleMetrics() error {
+
+	setThrottle := func(throttle bool, reason string) error {
+		this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
+		return nil
+	}
+
+	// Regardless of throttle, we take opportunity to check for panic-abort
+	if this.migrationContext.PanicFlagFile != "" {
+		if base.FileExists(this.migrationContext.PanicFlagFile) {
+			this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
+		}
+	}
+	criticalLoad := this.migrationContext.GetCriticalLoad()
+	for variableName, threshold := range criticalLoad {
+		value, err := this.applier.ShowStatusVariable(variableName)
+		if err != nil {
+			return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
+		}
+		if value >= threshold {
+			this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
+		}
+	}
+
+	// Back to throttle considerations
+
+	// User-based throttle
+	if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
+		return setThrottle(true, "commanded by user")
+	}
+	if this.migrationContext.ThrottleFlagFile != "" {
+		if base.FileExists(this.migrationContext.ThrottleFlagFile) {
+			// Throttle file defined and exists!
+			return setThrottle(true, "flag-file")
+		}
+	}
+	if this.migrationContext.ThrottleAdditionalFlagFile != "" {
+		if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
+			// 2nd Throttle file defined and exists!
+			return setThrottle(true, "flag-file")
+		}
+	}
+
+	maxLoad := this.migrationContext.GetMaxLoad()
+	for variableName, threshold := range maxLoad {
+		value, err := this.applier.ShowStatusVariable(variableName)
+		if err != nil {
+			return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
+		}
+		if value >= threshold {
+			return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
+		}
+	}
+	if this.migrationContext.GetThrottleQuery() != "" {
+		if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
+			return setThrottle(true, "throttle-query")
+		}
+	}
+
+	return setThrottle(false, "")
+}
+
+// initiateThrottlerCollection initiates the various processes that collect measurements
+// that may affect throttling. There are several components, all running independently,
+// that collect such metrics.
+func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
+	go this.collectHeartbeat()
+	go this.collectControlReplicasLag()
+
+	go func() {
+		throttlerMetricsTick := time.Tick(1 * time.Second)
+		this.collectGeneralThrottleMetrics()
+		firstThrottlingCollected <- true
+		for range throttlerMetricsTick {
+			this.collectGeneralThrottleMetrics()
+		}
+	}()
+}
+
+// initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling.
+func (this *Throttler) initiateThrottlerChecks() error {
+	throttlerTick := time.Tick(100 * time.Millisecond)
+
+	throttlerFunction := func() {
+		alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
+		shouldThrottle, throttleReason := this.shouldThrottle()
+		if shouldThrottle && !alreadyThrottling {
+			// New throttling
+			this.applier.WriteAndLogChangelog("throttle", throttleReason)
+		} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
+			// Change of reason
+			this.applier.WriteAndLogChangelog("throttle", throttleReason)
+		} else if alreadyThrottling && !shouldThrottle {
+			// End of throttling
+			this.applier.WriteAndLogChangelog("throttle", "done throttling")
+		}
+		this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
+	}
+	throttlerFunction()
+	for range throttlerTick {
+		throttlerFunction()
+	}
+
+	return nil
+}
+
+// throttle sees if throttling needs to take place, and if so, continuously sleeps (blocks)
+// until throttling reasons are gone
+func (this *Throttler) throttle(onThrottled func()) {
+	for {
+		// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
+		// Therefore calling IsThrottled() is cheap
+		if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
+			return
+		}
+		if onThrottled != nil {
+			onThrottled()
+		}
+		time.Sleep(250 * time.Millisecond)
+	}
+}
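`collectControlReplicasLag` is what makes the new 100ms minimum meaningful: it ticks every 100 ms but only polls the control replicas once per second, unless a custom `--replication-lag-query` is set and the threshold sits under one second, in which case it polls on every tick. The cadence logic in isolation (an illustrative sketch, not PR code):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const relaxedFactor = 10 // 10 ticks x 100ms = one relaxed poll per second
	counter := 0
	aggressive := false

	for range time.Tick(100 * time.Millisecond) {
		if counter%relaxedFactor == 0 {
			counter = 0
			// Re-read the knobs once per second; pretend the operator set
			// max-lag-millis below 1000 together with a replication-lag-query:
			aggressive = true
		}
		if counter == 0 || aggressive {
			fmt.Println("polling control replicas for lag")
		}
		counter++
	}
}
```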