Merge in upstream
This commit is contained in:
commit
79b9e7be7c
@ -131,6 +131,14 @@ See `approve-renamed-columns`
|
|||||||
|
|
||||||
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [testing-on-replica](testing-on-replica.md)
|
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [testing-on-replica](testing-on-replica.md)
|
||||||
|
|
||||||
|
### throttle-control-replicas
|
||||||
|
|
||||||
|
Provide a command delimited list of replicas; `gh-ost` will throttle when any of the given replicas lag beyond `--max-lag-millis`. The list can be queried and updated dynamically via [interactive commands](interactive-commands.md)
|
||||||
|
|
||||||
|
### throttle-http
|
||||||
|
|
||||||
|
Provide a HTTP endpoint; `gh-ost` will issue `HEAD` requests on given URL and throttle whenever response status code is not `200`. The URL can be queried and updated dynamically via [interactive commands](interactive-commands.md). Empty URL disables the HTTP check.
|
||||||
|
|
||||||
### timestamp-old-table
|
### timestamp-old-table
|
||||||
|
|
||||||
Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`.
|
Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`.
|
||||||
|
@ -31,6 +31,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
|
|||||||
- `nice-ratio=0.5` will cause `gh-ost` to sleep for `50ms` immediately following.
|
- `nice-ratio=0.5` will cause `gh-ost` to sleep for `50ms` immediately following.
|
||||||
- `nice-ratio=1` will cause `gh-ost` to sleep for `100ms`, effectively doubling runtime
|
- `nice-ratio=1` will cause `gh-ost` to sleep for `100ms`, effectively doubling runtime
|
||||||
- value of `2` will effectively triple the runtime; etc.
|
- value of `2` will effectively triple the runtime; etc.
|
||||||
|
- `throttle-http`: change throttle HTTP endpoint
|
||||||
- `throttle-query`: change throttle query
|
- `throttle-query`: change throttle query
|
||||||
- `throttle-control-replicas='replica1,replica2'`: change list of throttle-control replicas, these are replicas `gh-ost` will check. This takes a comma separated list of replica's to check and replaces the previous list.
|
- `throttle-control-replicas='replica1,replica2'`: change list of throttle-control replicas, these are replicas `gh-ost` will check. This takes a comma separated list of replica's to check and replaces the previous list.
|
||||||
- `throttle`: force migration suspend
|
- `throttle`: force migration suspend
|
||||||
|
@ -44,6 +44,10 @@ const (
|
|||||||
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
|
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
HTTPStatusOK = 200
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
|
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
|
||||||
)
|
)
|
||||||
@ -99,6 +103,7 @@ type MigrationContext struct {
|
|||||||
ThrottleFlagFile string
|
ThrottleFlagFile string
|
||||||
ThrottleAdditionalFlagFile string
|
ThrottleAdditionalFlagFile string
|
||||||
throttleQuery string
|
throttleQuery string
|
||||||
|
throttleHTTP string
|
||||||
ThrottleCommandedByUser int64
|
ThrottleCommandedByUser int64
|
||||||
maxLoad LoadMap
|
maxLoad LoadMap
|
||||||
criticalLoad LoadMap
|
criticalLoad LoadMap
|
||||||
@ -148,6 +153,7 @@ type MigrationContext struct {
|
|||||||
pointOfInterestTime time.Time
|
pointOfInterestTime time.Time
|
||||||
pointOfInterestTimeMutex *sync.Mutex
|
pointOfInterestTimeMutex *sync.Mutex
|
||||||
CurrentLag int64
|
CurrentLag int64
|
||||||
|
ThrottleHTTPStatusCode int64
|
||||||
controlReplicasLagResult mysql.ReplicationLagResult
|
controlReplicasLagResult mysql.ReplicationLagResult
|
||||||
TotalRowsCopied int64
|
TotalRowsCopied int64
|
||||||
TotalDMLEventsApplied int64
|
TotalDMLEventsApplied int64
|
||||||
@ -157,6 +163,7 @@ type MigrationContext struct {
|
|||||||
throttleReasonHint ThrottleReasonHint
|
throttleReasonHint ThrottleReasonHint
|
||||||
throttleGeneralCheckResult ThrottleCheckResult
|
throttleGeneralCheckResult ThrottleCheckResult
|
||||||
throttleMutex *sync.Mutex
|
throttleMutex *sync.Mutex
|
||||||
|
throttleHTTPMutex *sync.Mutex
|
||||||
IsPostponingCutOver int64
|
IsPostponingCutOver int64
|
||||||
CountingRowsFlag int64
|
CountingRowsFlag int64
|
||||||
AllEventsUpToLockProcessedInjectedFlag int64
|
AllEventsUpToLockProcessedInjectedFlag int64
|
||||||
@ -215,6 +222,7 @@ func newMigrationContext() *MigrationContext {
|
|||||||
maxLoad: NewLoadMap(),
|
maxLoad: NewLoadMap(),
|
||||||
criticalLoad: NewLoadMap(),
|
criticalLoad: NewLoadMap(),
|
||||||
throttleMutex: &sync.Mutex{},
|
throttleMutex: &sync.Mutex{},
|
||||||
|
throttleHTTPMutex: &sync.Mutex{},
|
||||||
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
||||||
configMutex: &sync.Mutex{},
|
configMutex: &sync.Mutex{},
|
||||||
pointOfInterestTimeMutex: &sync.Mutex{},
|
pointOfInterestTimeMutex: &sync.Mutex{},
|
||||||
@ -472,12 +480,10 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *MigrationContext) GetThrottleQuery() string {
|
func (this *MigrationContext) GetThrottleQuery() string {
|
||||||
var query string
|
|
||||||
|
|
||||||
this.throttleMutex.Lock()
|
this.throttleMutex.Lock()
|
||||||
defer this.throttleMutex.Unlock()
|
defer this.throttleMutex.Unlock()
|
||||||
|
|
||||||
query = this.throttleQuery
|
var query = this.throttleQuery
|
||||||
return query
|
return query
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -488,6 +494,21 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) {
|
|||||||
this.throttleQuery = newQuery
|
this.throttleQuery = newQuery
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) GetThrottleHTTP() string {
|
||||||
|
this.throttleHTTPMutex.Lock()
|
||||||
|
defer this.throttleHTTPMutex.Unlock()
|
||||||
|
|
||||||
|
var throttleHTTP = this.throttleHTTP
|
||||||
|
return throttleHTTP
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) SetThrottleHTTP(throttleHTTP string) {
|
||||||
|
this.throttleHTTPMutex.Lock()
|
||||||
|
defer this.throttleHTTPMutex.Unlock()
|
||||||
|
|
||||||
|
this.throttleHTTP = throttleHTTP
|
||||||
|
}
|
||||||
|
|
||||||
func (this *MigrationContext) GetMaxLoad() LoadMap {
|
func (this *MigrationContext) GetMaxLoad() LoadMap {
|
||||||
this.throttleMutex.Lock()
|
this.throttleMutex.Lock()
|
||||||
defer this.throttleMutex.Unlock()
|
defer this.throttleMutex.Unlock()
|
||||||
|
@ -93,6 +93,7 @@ func main() {
|
|||||||
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
|
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
|
||||||
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
||||||
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
||||||
|
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
|
||||||
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
|
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
|
||||||
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
||||||
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
|
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
|
||||||
@ -228,6 +229,7 @@ func main() {
|
|||||||
migrationContext.SetDMLBatchSize(*dmlBatchSize)
|
migrationContext.SetDMLBatchSize(*dmlBatchSize)
|
||||||
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
|
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
|
||||||
migrationContext.SetThrottleQuery(*throttleQuery)
|
migrationContext.SetThrottleQuery(*throttleQuery)
|
||||||
|
migrationContext.SetThrottleHTTP(*throttleHTTP)
|
||||||
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
||||||
migrationContext.ApplyCredentials()
|
migrationContext.ApplyCredentials()
|
||||||
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
|
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
|
||||||
|
@ -96,7 +96,7 @@ func NewMigrator() *Migrator {
|
|||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: base.GetMigrationContext(),
|
||||||
parser: sql.NewParser(),
|
parser: sql.NewParser(),
|
||||||
ghostTableMigrated: make(chan bool),
|
ghostTableMigrated: make(chan bool),
|
||||||
firstThrottlingCollected: make(chan bool, 1),
|
firstThrottlingCollected: make(chan bool, 3),
|
||||||
rowCopyComplete: make(chan bool),
|
rowCopyComplete: make(chan bool),
|
||||||
allEventsUpToLockProcessed: make(chan string),
|
allEventsUpToLockProcessed: make(chan string),
|
||||||
|
|
||||||
@ -977,7 +977,8 @@ func (this *Migrator) initiateThrottler() error {
|
|||||||
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
||||||
log.Infof("Waiting for first throttle metrics to be collected")
|
log.Infof("Waiting for first throttle metrics to be collected")
|
||||||
<-this.firstThrottlingCollected // replication lag
|
<-this.firstThrottlingCollected // replication lag
|
||||||
<-this.firstThrottlingCollected // other metrics
|
<-this.firstThrottlingCollected // HTTP status
|
||||||
|
<-this.firstThrottlingCollected // other, general metrics
|
||||||
log.Infof("First throttle metrics collected")
|
log.Infof("First throttle metrics collected")
|
||||||
go this.throttler.initiateThrottlerChecks()
|
go this.throttler.initiateThrottlerChecks()
|
||||||
|
|
||||||
|
@ -146,6 +146,7 @@ max-lag-millis=<max-lag> # Set a new replication lag threshold
|
|||||||
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
|
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
|
||||||
max-load=<load> # Set a new set of max-load thresholds
|
max-load=<load> # Set a new set of max-load thresholds
|
||||||
throttle-query=<query> # Set a new throttle-query (no quotes)
|
throttle-query=<query> # Set a new throttle-query (no quotes)
|
||||||
|
throttle-http=<URL> # Set a new throttle URL
|
||||||
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
|
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
|
||||||
throttle # Force throttling
|
throttle # Force throttling
|
||||||
no-throttle # End forced throttling (other throttling may still apply)
|
no-throttle # End forced throttling (other throttling may still apply)
|
||||||
@ -236,6 +237,16 @@ help # This message
|
|||||||
fmt.Fprintf(writer, throttleHint)
|
fmt.Fprintf(writer, throttleHint)
|
||||||
return ForcePrintStatusAndHintRule, nil
|
return ForcePrintStatusAndHintRule, nil
|
||||||
}
|
}
|
||||||
|
case "throttle-http":
|
||||||
|
{
|
||||||
|
if argIsQuestion {
|
||||||
|
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleHTTP())
|
||||||
|
return NoPrintStatusRule, nil
|
||||||
|
}
|
||||||
|
this.migrationContext.SetThrottleHTTP(arg)
|
||||||
|
fmt.Fprintf(writer, throttleHint)
|
||||||
|
return ForcePrintStatusAndHintRule, nil
|
||||||
|
}
|
||||||
case "throttle-control-replicas":
|
case "throttle-control-replicas":
|
||||||
{
|
{
|
||||||
if argIsQuestion {
|
if argIsQuestion {
|
||||||
|
@ -7,6 +7,7 @@ package logic
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -41,6 +42,11 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint
|
|||||||
if generalCheckResult.ShouldThrottle {
|
if generalCheckResult.ShouldThrottle {
|
||||||
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
|
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
|
||||||
}
|
}
|
||||||
|
// HTTP throttle
|
||||||
|
statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode)
|
||||||
|
if statusCode != 0 && statusCode != http.StatusOK {
|
||||||
|
return true, fmt.Sprintf("http=%d", statusCode), base.NoThrottleReasonHint
|
||||||
|
}
|
||||||
// Replication lag throttle
|
// Replication lag throttle
|
||||||
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
|
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
|
||||||
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
||||||
@ -213,6 +219,32 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
|
|||||||
return false, variableName, value, threshold, nil
|
return false, variableName, value, threshold, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// collectReplicationLag reads the latest changelog heartbeat value
|
||||||
|
func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) {
|
||||||
|
collectFunc := func() (sleep bool, err error) {
|
||||||
|
url := this.migrationContext.GetThrottleHTTP()
|
||||||
|
if url == "" {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
resp, err := http.Head(url)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
collectFunc()
|
||||||
|
firstThrottlingCollected <- true
|
||||||
|
|
||||||
|
ticker := time.Tick(100 * time.Millisecond)
|
||||||
|
for range ticker {
|
||||||
|
if sleep, _ := collectFunc(); sleep {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
|
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
|
||||||
func (this *Throttler) collectGeneralThrottleMetrics() error {
|
func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||||
|
|
||||||
@ -290,6 +322,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
|
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
|
||||||
go this.collectReplicationLag(firstThrottlingCollected)
|
go this.collectReplicationLag(firstThrottlingCollected)
|
||||||
go this.collectControlReplicasLag()
|
go this.collectControlReplicasLag()
|
||||||
|
go this.collectThrottleHTTPStatus(firstThrottlingCollected)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
this.collectGeneralThrottleMetrics()
|
this.collectGeneralThrottleMetrics()
|
||||||
|
Loading…
Reference in New Issue
Block a user