From 8e46b4ceea9219fdd1e3e51173102f67a20d4381 Mon Sep 17 00:00:00 2001
From: Shlomi Noach
Date: Wed, 13 Jul 2016 09:44:00 +0200
Subject: [PATCH] max-lag-millis is dynamically controllable

---
 build.sh                    |  2 +-
 doc/interactive-commands.md |  1 +
 go/base/context.go          |  9 ++++++++-
 go/cmd/gh-ost/main.go       |  3 ++-
 go/logic/migrator.go        | 18 +++++++++++++++---
 5 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/build.sh b/build.sh
index 79ea741..b834412 100644
--- a/build.sh
+++ b/build.sh
@@ -1,7 +1,7 @@
 #!/bin/bash
 #
 #
 
-RELEASE_VERSION="1.0.1"
+RELEASE_VERSION="1.0.2"
 buildpath=/tmp/gh-ost
 target=gh-ost
diff --git a/doc/interactive-commands.md b/doc/interactive-commands.md
index 563804c..1579e01 100644
--- a/doc/interactive-commands.md
+++ b/doc/interactive-commands.md
@@ -18,6 +18,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
 - `status`: returns a status summary of migration progress and configuration
 - `throttle-control-replicas=`: change the list of replicas on which `gh-ost` checks replication lag, to determine throttling
 - `chunk-size=`: modify the `chunk-size`; applies on next running copy-iteration
+- `max-lag-millis=`: modify the maximum replication lag threshold (milliseconds, minimum value is `1000`, i.e. 1 second)
 - `max-load=`: modify the `max-load` config; applies on next running copy-iteration
     The `max-load` format must be: `some_status=[,some_status=...]`. For example: `Threads_running=50,threads_connected=1000`, and you would then write/echo `max-load=Threads_running=50,threads_connected=1000` to the socket.
 - `critical-load=`: change critical load setting (exceeding given thresholds causes panic and abort)
diff --git a/go/base/context.go b/go/base/context.go
index 971d1e9..00a5ad7 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -148,7 +148,7 @@ func newMigrationContext() *MigrationContext {
 		ChunkSize: 1000,
 		InspectorConnectionConfig: mysql.NewConnectionConfig(),
 		ApplierConnectionConfig: mysql.NewConnectionConfig(),
-		MaxLagMillisecondsThrottleThreshold: 1000,
+		MaxLagMillisecondsThrottleThreshold: 1500,
 		CutOverLockTimeoutSeconds: 3,
 		maxLoad: NewLoadMap(),
 		criticalLoad: NewLoadMap(),
@@ -298,6 +298,13 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
 	return time.Now().Sub(this.pointOfInterestTime)
 }
 
+func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) {
+	if maxLagMillisecondsThrottleThreshold < 1000 {
+		maxLagMillisecondsThrottleThreshold = 1000
+	}
+	atomic.StoreInt64(&this.MaxLagMillisecondsThrottleThreshold, maxLagMillisecondsThrottleThreshold)
+}
+
 func (this *MigrationContext) SetChunkSize(chunkSize int64) {
 	if chunkSize < 100 {
 		chunkSize = 100
diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go
index e56f4ea..ac4796b 100644
--- a/go/cmd/gh-ost/main.go
+++ b/go/cmd/gh-ost/main.go
@@ -74,7 +74,7 @@ func main() {
 	cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
 	flag.Int64Var(&migrationContext.NiceRatio, "nice-ratio", 0, "force being 'nice', imply sleep time per chunk time. Example values: 0 is aggressive. 3: for every ms spend in a rowcopy chunk, spend 3ms sleeping immediately after")
 
-	flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
+	maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
 	flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
 	throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
 	flag.StringVar(&migrationContext.ThrottleQuery, "throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
@@ -171,6 +171,7 @@ func main() {
 		migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
 	}
 	migrationContext.SetChunkSize(*chunkSize)
+	migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
 	migrationContext.SetDefaultNumRetries(*defaultRetries)
 	migrationContext.ApplyCredentials()
 	if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 17ff1c8..e1bf7c6 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -148,8 +148,9 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
 		}
 	}
 	// Replication lag throttle
+	maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
 	lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
-	if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
+	if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
 		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
 	}
 	if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) == 0) {
@@ -157,7 +158,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
 		if err != nil {
 			return true, err.Error()
 		}
-		if replicationLag > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
+		if replicationLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
 			return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
 		}
 	}
@@ -792,6 +793,7 @@ status # Print a status message
 chunk-size= # Set a new chunk-size
 nice-ratio= # Set a new nice-ratio, integer (0 is agrressive)
 critical-load= # Set a new set of max-load thresholds
+max-lag-millis= # Set a new replication lag threshold
 max-load= # Set a new set of max-load thresholds
 throttle-query= # Set a new throttle-query
 throttle-control-replicas= #
@@ -814,6 +816,16 @@ help # This message
 				this.printStatus(ForcePrintStatusAndHint, writer)
 			}
 		}
+	case "max-lag-millis":
+		{
+			if maxLagMillis, err := strconv.Atoi(arg); err != nil {
+				fmt.Fprintf(writer, "%s\n", err.Error())
+				return log.Errore(err)
+			} else {
+				this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis))
+				this.printStatus(ForcePrintStatusAndHint, writer)
+			}
+		}
 	case "nice-ratio":
 		{
 			if niceRatio, err := strconv.Atoi(arg); err != nil {
@@ -974,7 +986,7 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
 	))
 	maxLoad := this.migrationContext.GetMaxLoad()
 	criticalLoad := this.migrationContext.GetCriticalLoad()
-	fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %s; critical-load: %s; nice-ratio: %d",
+	fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; max-load: %s; critical-load: %s; nice-ratio: %d",
 		atomic.LoadInt64(&this.migrationContext.ChunkSize),
 		atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
 		maxLoad.String(),