From 23ce390d6973e470800c391f0e136914d13283ba Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 26 Mar 2017 13:10:34 +0300 Subject: [PATCH] supporting throttle-http --- go/base/context.go | 27 ++++++++++++++++++++++++--- go/cmd/gh-ost/main.go | 2 ++ go/logic/migrator.go | 5 +++-- go/logic/server.go | 11 +++++++++++ go/logic/throttler.go | 33 +++++++++++++++++++++++++++++++++ 5 files changed, 73 insertions(+), 5 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index d6cd6ce..5ab6d2a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -44,6 +44,10 @@ const ( UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint" ) +const ( + HTTPStatusOK = 200 +) + var ( envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]") ) @@ -99,6 +103,7 @@ type MigrationContext struct { ThrottleFlagFile string ThrottleAdditionalFlagFile string throttleQuery string + throttleHTTP string ThrottleCommandedByUser int64 maxLoad LoadMap criticalLoad LoadMap @@ -148,6 +153,7 @@ type MigrationContext struct { pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex CurrentLag int64 + ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 @@ -157,6 +163,7 @@ type MigrationContext struct { throttleReasonHint ThrottleReasonHint throttleGeneralCheckResult ThrottleCheckResult throttleMutex *sync.Mutex + throttleHTTPMutex *sync.Mutex IsPostponingCutOver int64 CountingRowsFlag int64 AllEventsUpToLockProcessedInjectedFlag int64 @@ -215,6 +222,7 @@ func newMigrationContext() *MigrationContext { maxLoad: NewLoadMap(), criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, + throttleHTTPMutex: &sync.Mutex{}, throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, @@ -472,12 +480,10 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) { } func (this *MigrationContext) GetThrottleQuery() string { - var query string - this.throttleMutex.Lock() defer this.throttleMutex.Unlock() - query = this.throttleQuery + var query = this.throttleQuery return query } @@ -488,6 +494,21 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) { this.throttleQuery = newQuery } +func (this *MigrationContext) GetThrottleHTTP() string { + this.throttleHTTPMutex.Lock() + defer this.throttleHTTPMutex.Unlock() + + var throttleHTTP = this.throttleHTTP + return throttleHTTP +} + +func (this *MigrationContext) SetThrottleHTTP(throttleHTTP string) { + this.throttleHTTPMutex.Lock() + defer this.throttleHTTPMutex.Unlock() + + this.throttleHTTP = throttleHTTP +} + func (this *MigrationContext) GetMaxLoad() LoadMap { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 238dc81..f27e12b 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -93,6 +93,7 @@ func main() { replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight") + throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response") heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") @@ -228,6 +229,7 @@ func main() { migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetThrottleQuery(*throttleQuery) + migrationContext.SetThrottleHTTP(*throttleHTTP) migrationContext.SetDefaultNumRetries(*defaultRetries) migrationContext.ApplyCredentials() if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 549dd1d..59432a3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -96,7 +96,7 @@ func NewMigrator() *Migrator { migrationContext: base.GetMigrationContext(), parser: sql.NewParser(), ghostTableMigrated: make(chan bool), - firstThrottlingCollected: make(chan bool, 1), + firstThrottlingCollected: make(chan bool, 3), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan string), @@ -977,7 +977,8 @@ func (this *Migrator) initiateThrottler() error { go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) log.Infof("Waiting for first throttle metrics to be collected") <-this.firstThrottlingCollected // replication lag - <-this.firstThrottlingCollected // other metrics + <-this.firstThrottlingCollected // HTTP status + <-this.firstThrottlingCollected // other, general metrics log.Infof("First throttle metrics collected") go this.throttler.initiateThrottlerChecks() diff --git a/go/logic/server.go b/go/logic/server.go index b1246c2..3faeb98 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -146,6 +146,7 @@ max-lag-millis= # Set a new replication lag threshold replication-lag-query= # Set a new query that determines replication lag (no quotes) max-load= # Set a new set of max-load thresholds throttle-query= # Set a new throttle-query (no quotes) +throttle-http= # Set a new throttle URL throttle-control-replicas= # Set a new comma delimited list of throttle control replicas throttle # Force throttling no-throttle # End forced throttling (other throttling may still apply) @@ -236,6 +237,16 @@ help # This message fmt.Fprintf(writer, throttleHint) return ForcePrintStatusAndHintRule, nil } + case "throttle-http": + { + if argIsQuestion { + fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleHTTP()) + return NoPrintStatusRule, nil + } + this.migrationContext.SetThrottleHTTP(arg) + fmt.Fprintf(writer, throttleHint) + return ForcePrintStatusAndHintRule, nil + } case "throttle-control-replicas": { if argIsQuestion { diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 33d3f79..49ad83a 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -7,6 +7,7 @@ package logic import ( "fmt" + "net/http" "sync/atomic" "time" @@ -41,6 +42,11 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint if generalCheckResult.ShouldThrottle { return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint } + // HTTP throttle + statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode) + if statusCode != 0 && statusCode != http.StatusOK { + return true, fmt.Sprintf("http=%d", statusCode), base.NoThrottleReasonHint + } // Replication lag throttle maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) @@ -213,6 +219,32 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value return false, variableName, value, threshold, nil } +// collectReplicationLag reads the latest changelog heartbeat value +func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) { + collectFunc := func() (sleep bool, err error) { + url := this.migrationContext.GetThrottleHTTP() + if url == "" { + return true, nil + } + resp, err := http.Get(url) + if err != nil { + return false, err + } + atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode)) + return false, nil + } + + collectFunc() + firstThrottlingCollected <- true + + ticker := time.Tick(100 * time.Millisecond) + for range ticker { + if sleep, _ := collectFunc(); sleep { + time.Sleep(1 * time.Second) + } + } +} + // collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext func (this *Throttler) collectGeneralThrottleMetrics() error { @@ -290,6 +322,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) { go this.collectReplicationLag(firstThrottlingCollected) go this.collectControlReplicasLag() + go this.collectThrottleHTTPStatus(firstThrottlingCollected) go func() { this.collectGeneralThrottleMetrics()