From 0918bab29b511aa776157ae89777252c09d1d59a Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Wed, 6 Jul 2022 23:56:07 +0200 Subject: [PATCH] Add context/timeout to HTTP throttle check (#1131) * Add context/timeout to HTTP throttle check * Dont run `.GetThrottleHTTPInterval()` on every loop * Update help message * Var rename * 2022 * Add timeout flag * Add unix/tcp server commands, use ParseInt() for string->int64 * Var rename * Re-check http timeout on every loop iteration * Remove stale comment * Make throttle interval idempotent * var rename * Usage grammar * Make http timeout idempotent too * Parse time.Duration once * Move timeout to NewThrottler * Help update * Set User-Agent header * Re-add newline Co-authored-by: dm-2 <45519614+dm-2@users.noreply.github.com> --- .golangci.yml | 2 ++ go/base/context.go | 2 ++ go/cmd/gh-ost/main.go | 6 ++++-- go/logic/migrator.go | 6 ++++-- go/logic/throttler.go | 24 +++++++++++++++++++++--- 5 files changed, 33 insertions(+), 7 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index af04b55..a71dcdd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -9,6 +9,8 @@ linters: enable: - gosimple - govet + - noctx - rowserrcheck - sqlclosecheck - unused + diff --git a/go/base/context.go b/go/base/context.go index 93f84ce..1c8b17a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -184,7 +184,9 @@ type MigrationContext struct { CurrentLag int64 currentProgress uint64 etaNanoseonds int64 + ThrottleHTTPIntervalMillis int64 ThrottleHTTPStatusCode int64 + ThrottleHTTPTimeoutMillis int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 93d8fb9..d0ae4c3 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -1,5 +1,5 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2022 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -110,6 +110,8 @@ func main() { throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight") throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response") + flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check") + flag.Int64Var(&migrationContext.ThrottleHTTPTimeoutMillis, "throttle-http-timeout-millis", 1000, "Number of milliseconds to use as an HTTP throttle check timeout") ignoreHTTPErrors := flag.Bool("ignore-http-errors", false, "ignore HTTP connection errors during throttle check") heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") @@ -297,7 +299,7 @@ func main() { log.Infof("starting gh-ost %+v", AppVersion) acceptSignals(migrationContext) - migrator := logic.NewMigrator(migrationContext) + migrator := logic.NewMigrator(migrationContext, AppVersion) err := migrator.Migrate() if err != nil { migrator.ExecOnFailureHook() diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 747750c..e1fe7d1 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -62,6 +62,7 @@ const ( // Migrator is the main schema migration flow manager. type Migrator struct { + appVersion string parser *sql.AlterTableParser inspector *Inspector applier *Applier @@ -87,8 +88,9 @@ type Migrator struct { finishedMigrating int64 } -func NewMigrator(context *base.MigrationContext) *Migrator { +func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { migrator := &Migrator{ + appVersion: appVersion, migrationContext: context, parser: sql.NewAlterTableParser(), ghostTableMigrated: make(chan bool), @@ -1068,7 +1070,7 @@ func (this *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. func (this *Migrator) initiateThrottler() error { - this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector) + this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) this.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected") diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 9c120b3..8f3d0db 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -6,6 +6,7 @@ package logic import ( + "context" "fmt" "net/http" "strings" @@ -42,16 +43,22 @@ const frenoMagicHint = "freno" // Throttler collects metrics related to throttling and makes informed decision // whether throttling should take place. type Throttler struct { + appVersion string migrationContext *base.MigrationContext applier *Applier + httpClient *http.Client + httpClientTimeout time.Duration inspector *Inspector finishedMigrating int64 } -func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler { +func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler { return &Throttler{ + appVersion: appVersion, migrationContext: migrationContext, applier: applier, + httpClient: &http.Client{}, + httpClientTimeout: time.Duration(migrationContext.ThrottleHTTPTimeoutMillis) * time.Millisecond, inspector: inspector, finishedMigrating: 0, } @@ -285,7 +292,17 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- if url == "" { return true, nil } - resp, err := http.Head(url) + + ctx, cancel := context.WithTimeout(context.Background(), this.httpClientTimeout) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil) + if err != nil { + return false, err + } + req.Header.Set("User-Agent", fmt.Sprintf("gh-ost/%s", this.appVersion)) + + resp, err := this.httpClient.Do(req) if err != nil { return false, err } @@ -303,7 +320,8 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- firstThrottlingCollected <- true - ticker := time.Tick(100 * time.Millisecond) + collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond + ticker := time.Tick(collectInterval) for range ticker { if atomic.LoadInt64(&this.finishedMigrating) > 0 { return