Add context/timeout to HTTP throttle check (#1131)
* Add context/timeout to HTTP throttle check * Dont run `.GetThrottleHTTPInterval()` on every loop * Update help message * Var rename * 2022 * Add timeout flag * Add unix/tcp server commands, use ParseInt() for string->int64 * Var rename * Re-check http timeout on every loop iteration * Remove stale comment * Make throttle interval idempotent * var rename * Usage grammar * Make http timeout idempotent too * Parse time.Duration once * Move timeout to NewThrottler * Help update * Set User-Agent header * Re-add newline Co-authored-by: dm-2 <45519614+dm-2@users.noreply.github.com>
This commit is contained in:
parent
0b066c16a5
commit
0918bab29b
@ -9,6 +9,8 @@ linters:
|
|||||||
enable:
|
enable:
|
||||||
- gosimple
|
- gosimple
|
||||||
- govet
|
- govet
|
||||||
|
- noctx
|
||||||
- rowserrcheck
|
- rowserrcheck
|
||||||
- sqlclosecheck
|
- sqlclosecheck
|
||||||
- unused
|
- unused
|
||||||
|
|
||||||
|
@ -184,7 +184,9 @@ type MigrationContext struct {
|
|||||||
CurrentLag int64
|
CurrentLag int64
|
||||||
currentProgress uint64
|
currentProgress uint64
|
||||||
etaNanoseonds int64
|
etaNanoseonds int64
|
||||||
|
ThrottleHTTPIntervalMillis int64
|
||||||
ThrottleHTTPStatusCode int64
|
ThrottleHTTPStatusCode int64
|
||||||
|
ThrottleHTTPTimeoutMillis int64
|
||||||
controlReplicasLagResult mysql.ReplicationLagResult
|
controlReplicasLagResult mysql.ReplicationLagResult
|
||||||
TotalRowsCopied int64
|
TotalRowsCopied int64
|
||||||
TotalDMLEventsApplied int64
|
TotalDMLEventsApplied int64
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
/*
|
/*
|
||||||
Copyright 2016 GitHub Inc.
|
Copyright 2022 GitHub Inc.
|
||||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
*/
|
*/
|
||||||
|
|
||||||
@ -110,6 +110,8 @@ func main() {
|
|||||||
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
||||||
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
||||||
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
|
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
|
||||||
|
flag.Int64Var(&migrationContext.ThrottleHTTPIntervalMillis, "throttle-http-interval-millis", 100, "Number of milliseconds to wait before triggering another HTTP throttle check")
|
||||||
|
flag.Int64Var(&migrationContext.ThrottleHTTPTimeoutMillis, "throttle-http-timeout-millis", 1000, "Number of milliseconds to use as an HTTP throttle check timeout")
|
||||||
ignoreHTTPErrors := flag.Bool("ignore-http-errors", false, "ignore HTTP connection errors during throttle check")
|
ignoreHTTPErrors := flag.Bool("ignore-http-errors", false, "ignore HTTP connection errors during throttle check")
|
||||||
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
|
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
|
||||||
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
||||||
@ -297,7 +299,7 @@ func main() {
|
|||||||
log.Infof("starting gh-ost %+v", AppVersion)
|
log.Infof("starting gh-ost %+v", AppVersion)
|
||||||
acceptSignals(migrationContext)
|
acceptSignals(migrationContext)
|
||||||
|
|
||||||
migrator := logic.NewMigrator(migrationContext)
|
migrator := logic.NewMigrator(migrationContext, AppVersion)
|
||||||
err := migrator.Migrate()
|
err := migrator.Migrate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
migrator.ExecOnFailureHook()
|
migrator.ExecOnFailureHook()
|
||||||
|
@ -62,6 +62,7 @@ const (
|
|||||||
|
|
||||||
// Migrator is the main schema migration flow manager.
|
// Migrator is the main schema migration flow manager.
|
||||||
type Migrator struct {
|
type Migrator struct {
|
||||||
|
appVersion string
|
||||||
parser *sql.AlterTableParser
|
parser *sql.AlterTableParser
|
||||||
inspector *Inspector
|
inspector *Inspector
|
||||||
applier *Applier
|
applier *Applier
|
||||||
@ -87,8 +88,9 @@ type Migrator struct {
|
|||||||
finishedMigrating int64
|
finishedMigrating int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMigrator(context *base.MigrationContext) *Migrator {
|
func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
|
||||||
migrator := &Migrator{
|
migrator := &Migrator{
|
||||||
|
appVersion: appVersion,
|
||||||
migrationContext: context,
|
migrationContext: context,
|
||||||
parser: sql.NewAlterTableParser(),
|
parser: sql.NewAlterTableParser(),
|
||||||
ghostTableMigrated: make(chan bool),
|
ghostTableMigrated: make(chan bool),
|
||||||
@ -1068,7 +1070,7 @@ func (this *Migrator) addDMLEventsListener() error {
|
|||||||
|
|
||||||
// initiateThrottler kicks in the throttling collection and the throttling checks.
|
// initiateThrottler kicks in the throttling collection and the throttling checks.
|
||||||
func (this *Migrator) initiateThrottler() error {
|
func (this *Migrator) initiateThrottler() error {
|
||||||
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector)
|
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion)
|
||||||
|
|
||||||
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
||||||
this.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected")
|
this.migrationContext.Log.Infof("Waiting for first throttle metrics to be collected")
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
package logic
|
package logic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
@ -42,16 +43,22 @@ const frenoMagicHint = "freno"
|
|||||||
// Throttler collects metrics related to throttling and makes informed decision
|
// Throttler collects metrics related to throttling and makes informed decision
|
||||||
// whether throttling should take place.
|
// whether throttling should take place.
|
||||||
type Throttler struct {
|
type Throttler struct {
|
||||||
|
appVersion string
|
||||||
migrationContext *base.MigrationContext
|
migrationContext *base.MigrationContext
|
||||||
applier *Applier
|
applier *Applier
|
||||||
|
httpClient *http.Client
|
||||||
|
httpClientTimeout time.Duration
|
||||||
inspector *Inspector
|
inspector *Inspector
|
||||||
finishedMigrating int64
|
finishedMigrating int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler {
|
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector, appVersion string) *Throttler {
|
||||||
return &Throttler{
|
return &Throttler{
|
||||||
|
appVersion: appVersion,
|
||||||
migrationContext: migrationContext,
|
migrationContext: migrationContext,
|
||||||
applier: applier,
|
applier: applier,
|
||||||
|
httpClient: &http.Client{},
|
||||||
|
httpClientTimeout: time.Duration(migrationContext.ThrottleHTTPTimeoutMillis) * time.Millisecond,
|
||||||
inspector: inspector,
|
inspector: inspector,
|
||||||
finishedMigrating: 0,
|
finishedMigrating: 0,
|
||||||
}
|
}
|
||||||
@ -285,7 +292,17 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
|
|||||||
if url == "" {
|
if url == "" {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
resp, err := http.Head(url)
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), this.httpClientTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
req.Header.Set("User-Agent", fmt.Sprintf("gh-ost/%s", this.appVersion))
|
||||||
|
|
||||||
|
resp, err := this.httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
@ -303,7 +320,8 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
|
|||||||
|
|
||||||
firstThrottlingCollected <- true
|
firstThrottlingCollected <- true
|
||||||
|
|
||||||
ticker := time.Tick(100 * time.Millisecond)
|
collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond
|
||||||
|
ticker := time.Tick(collectInterval)
|
||||||
for range ticker {
|
for range ticker {
|
||||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||||
return
|
return
|
||||||
|
Loading…
Reference in New Issue
Block a user