From 23ce390d6973e470800c391f0e136914d13283ba Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 26 Mar 2017 13:10:34 +0300 Subject: [PATCH 1/3] supporting throttle-http --- go/base/context.go | 27 ++++++++++++++++++++++++--- go/cmd/gh-ost/main.go | 2 ++ go/logic/migrator.go | 5 +++-- go/logic/server.go | 11 +++++++++++ go/logic/throttler.go | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 5 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index d6cd6ce..5ab6d2a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -44,6 +44,10 @@ const ( UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint" ) +const ( + HTTPStatusOK = 200 +) + var ( envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]") ) @@ -99,6 +103,7 @@ type MigrationContext struct { ThrottleFlagFile string ThrottleAdditionalFlagFile string throttleQuery string + throttleHTTP string ThrottleCommandedByUser int64 maxLoad LoadMap criticalLoad LoadMap @@ -148,6 +153,7 @@ type MigrationContext struct { pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex CurrentLag int64 + ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 @@ -157,6 +163,7 @@ type MigrationContext struct { throttleReasonHint ThrottleReasonHint throttleGeneralCheckResult ThrottleCheckResult throttleMutex *sync.Mutex + throttleHTTPMutex *sync.Mutex IsPostponingCutOver int64 CountingRowsFlag int64 AllEventsUpToLockProcessedInjectedFlag int64 @@ -215,6 +222,7 @@ func newMigrationContext() *MigrationContext { maxLoad: NewLoadMap(), criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, + throttleHTTPMutex: &sync.Mutex{}, throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, @@ -472,12 +480,10 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) { } func (this *MigrationContext) GetThrottleQuery() 
string { - var query string - this.throttleMutex.Lock() defer this.throttleMutex.Unlock() - query = this.throttleQuery + var query = this.throttleQuery return query } @@ -488,6 +494,21 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) { this.throttleQuery = newQuery } +func (this *MigrationContext) GetThrottleHTTP() string { + this.throttleHTTPMutex.Lock() + defer this.throttleHTTPMutex.Unlock() + + var throttleHTTP = this.throttleHTTP + return throttleHTTP +} + +func (this *MigrationContext) SetThrottleHTTP(throttleHTTP string) { + this.throttleHTTPMutex.Lock() + defer this.throttleHTTPMutex.Unlock() + + this.throttleHTTP = throttleHTTP +} + func (this *MigrationContext) GetMaxLoad() LoadMap { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 238dc81..f27e12b 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -93,6 +93,7 @@ func main() { replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. 
Make sure this query is lightweight") + throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response") heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") @@ -228,6 +229,7 @@ func main() { migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetThrottleQuery(*throttleQuery) + migrationContext.SetThrottleHTTP(*throttleHTTP) migrationContext.SetDefaultNumRetries(*defaultRetries) migrationContext.ApplyCredentials() if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 549dd1d..59432a3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -96,7 +96,7 @@ func NewMigrator() *Migrator { migrationContext: base.GetMigrationContext(), parser: sql.NewParser(), ghostTableMigrated: make(chan bool), - firstThrottlingCollected: make(chan bool, 1), + firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan string), @@ -977,7 +977,8 @@ func (this *Migrator) initiateThrottler() error { go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) log.Infof("Waiting for first throttle metrics to be collected") <-this.firstThrottlingCollected // replication lag - <-this.firstThrottlingCollected // other 
metrics + <-this.firstThrottlingCollected // HTTP status + <-this.firstThrottlingCollected // other, general metrics log.Infof("First throttle metrics collected") go this.throttler.initiateThrottlerChecks() diff --git a/go/logic/server.go b/go/logic/server.go index b1246c2..3faeb98 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -146,6 +146,7 @@ max-lag-millis= # Set a new replication lag threshold replication-lag-query= # Set a new query that determines replication lag (no quotes) max-load= # Set a new set of max-load thresholds throttle-query= # Set a new throttle-query (no quotes) +throttle-http= # Set a new throttle URL throttle-control-replicas= # Set a new comma delimited list of throttle control replicas throttle # Force throttling no-throttle # End forced throttling (other throttling may still apply) @@ -236,6 +237,16 @@ help # This message fmt.Fprintf(writer, throttleHint) return ForcePrintStatusAndHintRule, nil } + case "throttle-http": + { + if argIsQuestion { + fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleHTTP()) + return NoPrintStatusRule, nil + } + this.migrationContext.SetThrottleHTTP(arg) + fmt.Fprintf(writer, throttleHint) + return ForcePrintStatusAndHintRule, nil + } case "throttle-control-replicas": { if argIsQuestion { diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 33d3f79..49ad83a 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -7,6 +7,7 @@ package logic import ( "fmt" + "net/http" "sync/atomic" "time" @@ -41,6 +42,11 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint if generalCheckResult.ShouldThrottle { return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint } + // HTTP throttle + statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode) + if statusCode != 0 && statusCode != http.StatusOK { + return true, fmt.Sprintf("http=%d", statusCode), base.NoThrottleReasonHint + } // Replication 
lag throttle maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) @@ -213,6 +219,32 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value return false, variableName, value, threshold, nil } +// collectThrottleHTTPStatus polls the throttle-http URL and stores the latest response status code +func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) { + collectFunc := func() (sleep bool, err error) { + url := this.migrationContext.GetThrottleHTTP() + if url == "" { + return true, nil + } + resp, err := http.Get(url) + if err != nil { + return false, err + } + atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode)) + return false, nil + } + + collectFunc() + firstThrottlingCollected <- true + + ticker := time.Tick(100 * time.Millisecond) + for range ticker { + if sleep, _ := collectFunc(); sleep { + time.Sleep(1 * time.Second) + } + } +} + // collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext func (this *Throttler) collectGeneralThrottleMetrics() error { @@ -290,6 +322,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) { go this.collectReplicationLag(firstThrottlingCollected) go this.collectControlReplicasLag() + go this.collectThrottleHTTPStatus(firstThrottlingCollected) go func() { this.collectGeneralThrottleMetrics() From c413d508cc37e5af519fe35f22e311096ad05333 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 26 Mar 2017 13:12:56 +0300 Subject: [PATCH 2/3] HEAD instead of GET --- go/logic/throttler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 49ad83a..1c2c62a 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -226,7 +226,7 @@ func
(this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- if url == "" { return true, nil } - resp, err := http.Get(url) + resp, err := http.Head(url) if err != nil { return false, err } From 8d4d9cbaeca86f01f0e575d7a0e37e9593c17366 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 26 Mar 2017 15:14:36 +0300 Subject: [PATCH 3/3] throttle-http documentation --- doc/command-line-flags.md | 8 ++++++++ doc/interactive-commands.md | 1 + 2 files changed, 9 insertions(+) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index d266f2c..5f92cc0 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -131,6 +131,14 @@ See `approve-renamed-columns` Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [testing-on-replica](testing-on-replica.md) +### throttle-control-replicas + +Provide a comma delimited list of replicas; `gh-ost` will throttle when any of the given replicas lag beyond `--max-lag-millis`. The list can be queried and updated dynamically via [interactive commands](interactive-commands.md). + +### throttle-http + +Provide an HTTP endpoint; `gh-ost` will issue `HEAD` requests on the given URL and throttle whenever the response status code is not `200`. The URL can be queried and updated dynamically via [interactive commands](interactive-commands.md). An empty URL disables the HTTP check. + ### timestamp-old-table Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`.
diff --git a/doc/interactive-commands.md b/doc/interactive-commands.md index c6398c5..c0389e1 100644 --- a/doc/interactive-commands.md +++ b/doc/interactive-commands.md @@ -31,6 +31,7 @@ Both interfaces may serve at the same time. Both respond to simple text command, - `nice-ratio=0.5` will cause `gh-ost` to sleep for `50ms` immediately following. - `nice-ratio=1` will cause `gh-ost` to sleep for `100ms`, effectively doubling runtime - value of `2` will effectively triple the runtime; etc. +- `throttle-http`: change throttle HTTP endpoint - `throttle-query`: change throttle query - `throttle-control-replicas='replica1,replica2'`: change list of throttle-control replicas, these are replicas `gh-ost` will check. This takes a comma separated list of replica's to check and replaces the previous list. - `throttle`: force migration suspend