supporting throttle-http

This commit is contained in:
Shlomi Noach 2017-03-26 13:10:34 +03:00
parent f3fb0ea24a
commit 23ce390d69
5 changed files with 73 additions and 5 deletions

View File

@ -44,6 +44,10 @@ const (
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
)
const (
HTTPStatusOK = 200
)
var (
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
)
@ -99,6 +103,7 @@ type MigrationContext struct {
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
throttleQuery string
throttleHTTP string
ThrottleCommandedByUser int64
maxLoad LoadMap
criticalLoad LoadMap
@ -148,6 +153,7 @@ type MigrationContext struct {
pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex
CurrentLag int64
ThrottleHTTPStatusCode int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
TotalDMLEventsApplied int64
@ -157,6 +163,7 @@ type MigrationContext struct {
throttleReasonHint ThrottleReasonHint
throttleGeneralCheckResult ThrottleCheckResult
throttleMutex *sync.Mutex
throttleHTTPMutex *sync.Mutex
IsPostponingCutOver int64
CountingRowsFlag int64
AllEventsUpToLockProcessedInjectedFlag int64
@ -215,6 +222,7 @@ func newMigrationContext() *MigrationContext {
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
throttleHTTPMutex: &sync.Mutex{},
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
@ -472,12 +480,10 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
}
func (this *MigrationContext) GetThrottleQuery() string {
var query string
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
query = this.throttleQuery
var query = this.throttleQuery
return query
}
@ -488,6 +494,21 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) {
this.throttleQuery = newQuery
}
func (this *MigrationContext) GetThrottleHTTP() string {
this.throttleHTTPMutex.Lock()
defer this.throttleHTTPMutex.Unlock()
var throttleHTTP = this.throttleHTTP
return throttleHTTP
}
func (this *MigrationContext) SetThrottleHTTP(throttleHTTP string) {
this.throttleHTTPMutex.Lock()
defer this.throttleHTTPMutex.Unlock()
this.throttleHTTP = throttleHTTP
}
func (this *MigrationContext) GetMaxLoad() LoadMap {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()

View File

@ -93,6 +93,7 @@ func main() {
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
@ -228,6 +229,7 @@ func main() {
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetThrottleQuery(*throttleQuery)
migrationContext.SetThrottleHTTP(*throttleHTTP)
migrationContext.SetDefaultNumRetries(*defaultRetries)
migrationContext.ApplyCredentials()
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {

View File

@ -96,7 +96,7 @@ func NewMigrator() *Migrator {
migrationContext: base.GetMigrationContext(),
parser: sql.NewParser(),
ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 1),
firstThrottlingCollected: make(chan bool, 3),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan string),
@ -977,7 +977,8 @@ func (this *Migrator) initiateThrottler() error {
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected")
<-this.firstThrottlingCollected // replication lag
<-this.firstThrottlingCollected // other metrics
<-this.firstThrottlingCollected // HTTP status
<-this.firstThrottlingCollected // other, general metrics
log.Infof("First throttle metrics collected")
go this.throttler.initiateThrottlerChecks()

View File

@ -146,6 +146,7 @@ max-lag-millis=<max-lag> # Set a new replication lag threshold
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query (no quotes)
throttle-http=<URL> # Set a new throttle URL
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
@ -236,6 +237,16 @@ help # This message
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "throttle-http":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleHTTP())
return NoPrintStatusRule, nil
}
this.migrationContext.SetThrottleHTTP(arg)
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "throttle-control-replicas":
{
if argIsQuestion {

View File

@ -7,6 +7,7 @@ package logic
import (
"fmt"
"net/http"
"sync/atomic"
"time"
@ -41,6 +42,11 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint
if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
}
// HTTP throttle
statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode)
if statusCode != 0 && statusCode != http.StatusOK {
return true, fmt.Sprintf("http=%d", statusCode), base.NoThrottleReasonHint
}
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
@ -213,6 +219,32 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
return false, variableName, value, threshold, nil
}
// collectReplicationLag reads the latest changelog heartbeat value
func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) {
collectFunc := func() (sleep bool, err error) {
url := this.migrationContext.GetThrottleHTTP()
if url == "" {
return true, nil
}
resp, err := http.Get(url)
if err != nil {
return false, err
}
atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
return false, nil
}
collectFunc()
firstThrottlingCollected <- true
ticker := time.Tick(100 * time.Millisecond)
for range ticker {
if sleep, _ := collectFunc(); sleep {
time.Sleep(1 * time.Second)
}
}
}
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error {
@ -290,6 +322,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
go this.collectReplicationLag(firstThrottlingCollected)
go this.collectControlReplicasLag()
go this.collectThrottleHTTPStatus(firstThrottlingCollected)
go func() {
this.collectGeneralThrottleMetrics()