Merge branch 'master' into master
Commit 109b82bb3a
@@ -38,6 +38,10 @@ Both interfaces may serve at the same time. Both respond to simple text command,
 - `unpostpone`: at a time when `gh-ost` is postponing the [cut-over](cut-over.md) phase, instruct `gh-ost` to stop postponing and proceed immediately to cut-over.
 - `panic`: immediately panic and abort operation
 
+### Querying for data
+
+For commands that accept an argument as value, pass `?` (question mark) to _get_ the current value rather than _set_ a new one.
+
 ### Examples
 
 While migration is running:
@@ -63,6 +67,11 @@ $ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
 # Serving on TCP port: 10001
 ```
 
+```shell
+$ echo "chunk-size=?" | nc -U /tmp/gh-ost.test.sample_data_0.sock
+250
+```
+
 ```shell
 $ echo throttle | nc -U /tmp/gh-ost.test.sample_data_0.sock
@@ -561,7 +561,11 @@ func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLag
 func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
     this.throttleMutex.Lock()
     defer this.throttleMutex.Unlock()
-    this.controlReplicasLagResult = *lagResult
+    if lagResult == nil {
+        this.controlReplicasLagResult = *mysql.NewNoReplicationLagResult()
+    } else {
+        this.controlReplicasLagResult = *lagResult
+    }
 }
 
 func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {
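The nil branch above exists so callers can pass the result of a lag probe straight through, even when the probe has nothing to report (as the throttler change further down does). A minimal standalone sketch of that pattern, using simplified stand-in types rather than the actual gh-ost structs:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// Simplified stand-ins for mysql.ReplicationLagResult and MigrationContext.
type ReplicationLagResult struct {
	Lag time.Duration
	Err error
}

type migrationContext struct {
	throttleMutex            sync.Mutex
	controlReplicasLagResult ReplicationLagResult
}

// The setter guards against nil so callers can pass a probe result directly.
func (c *migrationContext) SetControlReplicasLagResult(lagResult *ReplicationLagResult) {
	c.throttleMutex.Lock()
	defer c.throttleMutex.Unlock()
	if lagResult == nil {
		c.controlReplicasLagResult = ReplicationLagResult{} // the "no lag" default
	} else {
		c.controlReplicasLagResult = *lagResult
	}
}

// readLag stands in for a probe like readControlReplicasLag(): it may
// return nil when there is nothing to measure.
func readLag(haveReplicas bool) *ReplicationLagResult {
	if !haveReplicas {
		return nil
	}
	return &ReplicationLagResult{Lag: 2 * time.Second}
}

func main() {
	c := &migrationContext{}
	c.SetControlReplicasLagResult(readLag(false)) // nil result: safe, stores the default
	c.SetControlReplicasLagResult(readLag(true))
	fmt.Println(c.controlReplicasLagResult.Lag) // 2s
}
```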
@@ -761,6 +761,12 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
             throttleQuery,
         ))
     }
+    if throttleControlReplicaKeys := this.migrationContext.GetThrottleControlReplicaKeys(); throttleControlReplicaKeys.Len() > 0 {
+        fmt.Fprintln(w, fmt.Sprintf("# throttle-control-replicas count: %+v",
+            throttleControlReplicaKeys.Len(),
+        ))
+    }
+
     if this.migrationContext.PostponeCutOverFlagFile != "" {
         setIndicator := ""
         if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
@@ -940,7 +946,9 @@ func (this *Migrator) initiateThrottler() error {
 
     go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
     log.Infof("Waiting for first throttle metrics to be collected")
-    <-this.firstThrottlingCollected
+    <-this.firstThrottlingCollected // replication lag
+    <-this.firstThrottlingCollected // other metrics
+    log.Infof("First throttle metrics collected")
     go this.throttler.initiateThrottlerChecks()
 
     return nil
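For orientation, the two receives above correspond to two independent collectors, each of which sends exactly one "first collection done" signal on the same channel. A minimal sketch of that startup handshake, with hypothetical collector names rather than the real gh-ost ones:

```go
package main

import (
	"fmt"
	"time"
)

// Each collector does one initial collection, signals once, and would then
// keep collecting in the background in the real flow.
func collectReplicationLag(firstCollected chan<- bool) {
	time.Sleep(10 * time.Millisecond) // pretend to probe lag
	firstCollected <- true
}

func collectGeneralMetrics(firstCollected chan<- bool) {
	time.Sleep(20 * time.Millisecond) // pretend to gather metrics
	firstCollected <- true
}

func main() {
	firstCollected := make(chan bool)
	go collectReplicationLag(firstCollected)
	go collectGeneralMetrics(firstCollected)

	fmt.Println("waiting for first throttle metrics")
	<-firstCollected // one signal per collector; order does not matter
	<-firstCollected
	fmt.Println("first throttle metrics collected; safe to start throttler checks")
}
```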
@@ -126,7 +126,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
     if len(tokens) > 1 {
         arg = strings.TrimSpace(tokens[1])
     }
 
+    argIsQuestion := (arg == "?")
     throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
 
     if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
@@ -152,6 +152,7 @@ no-throttle # End forced throttling (other throttling m
 unpostpone # Bail out a cut-over postpone; proceed to cut-over
 panic # panic and quit without cleanup
 help # This message
+- use '?' (question mark) as argument to get info rather than set. e.g. "max-load=?" will just print out current max-load.
 `)
         }
     case "sup":
@@ -160,6 +161,10 @@ help # This message
         return ForcePrintStatusAndHintRule, nil
     case "chunk-size":
         {
+            if argIsQuestion {
+                fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize))
+                return NoPrintStatusRule, nil
+            }
             if chunkSize, err := strconv.Atoi(arg); err != nil {
                 return NoPrintStatusRule, err
             } else {
@@ -169,6 +174,10 @@ help # This message
         }
     case "max-lag-millis":
         {
+            if argIsQuestion {
+                fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold))
+                return NoPrintStatusRule, nil
+            }
             if maxLagMillis, err := strconv.Atoi(arg); err != nil {
                 return NoPrintStatusRule, err
             } else {
@@ -182,6 +191,10 @@ help # This message
         }
     case "nice-ratio":
         {
+            if argIsQuestion {
+                fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetNiceRatio())
+                return NoPrintStatusRule, nil
+            }
             if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
                 return NoPrintStatusRule, err
             } else {
@@ -191,6 +204,11 @@ help # This message
         }
     case "max-load":
         {
+            if argIsQuestion {
+                maxLoad := this.migrationContext.GetMaxLoad()
+                fmt.Fprintf(writer, "%s\n", maxLoad.String())
+                return NoPrintStatusRule, nil
+            }
             if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
                 return NoPrintStatusRule, err
             }
@@ -198,6 +216,11 @@ help # This message
         }
     case "critical-load":
         {
+            if argIsQuestion {
+                criticalLoad := this.migrationContext.GetCriticalLoad()
+                fmt.Fprintf(writer, "%s\n", criticalLoad.String())
+                return NoPrintStatusRule, nil
+            }
             if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
                 return NoPrintStatusRule, err
             }
@@ -205,12 +228,20 @@ help # This message
         }
     case "throttle-query":
         {
+            if argIsQuestion {
+                fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleQuery())
+                return NoPrintStatusRule, nil
+            }
             this.migrationContext.SetThrottleQuery(arg)
             fmt.Fprintf(writer, throttleHint)
             return ForcePrintStatusAndHintRule, nil
         }
     case "throttle-control-replicas":
         {
+            if argIsQuestion {
+                fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
+                return NoPrintStatusRule, nil
+            }
             if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
                 return NoPrintStatusRule, err
             }
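All of the cases above follow one shape: when the argument is `?`, print the current value and return without touching state; otherwise parse the argument and apply it. A minimal sketch of that get-or-set dispatch, with a simplified stand-in for the migration context rather than the actual `Server` plumbing:

```go
package main

import (
	"fmt"
	"strconv"
	"sync/atomic"
)

var chunkSize int64 = 1000 // stand-in for migrationContext.ChunkSize

// applyCommand implements the "?" convention: get when asked, set otherwise.
func applyCommand(command, arg string) error {
	argIsQuestion := (arg == "?")
	switch command {
	case "chunk-size":
		if argIsQuestion {
			fmt.Printf("%d\n", atomic.LoadInt64(&chunkSize))
			return nil
		}
		value, err := strconv.Atoi(arg)
		if err != nil {
			return err
		}
		atomic.StoreInt64(&chunkSize, int64(value))
		return nil
	}
	return fmt.Errorf("unknown command: %s", command)
}

func main() {
	applyCommand("chunk-size", "?")   // prints 1000
	applyCommand("chunk-size", "250") // sets a new value
	applyCommand("chunk-size", "?")   // prints 250
}
```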
@@ -85,32 +85,37 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
 }
 
 // collectReplicationLag reads the latest changelog heartbeat value
-func (this *Throttler) collectReplicationLag() {
+func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- bool) {
+    collectFunc := func() error {
+        if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
+            return nil
+        }
+
+        if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
+            // when running on replica, the heartbeat injection is also done on the replica.
+            // This means we will always get a good heartbeat value.
+            // When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
+            if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil {
+                return log.Errore(err)
+            } else {
+                atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
+            }
+        } else {
+            if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
+                return log.Errore(err)
+            } else {
+                this.parseChangelogHeartbeat(heartbeatValue)
+            }
+        }
+        return nil
+    }
+
+    collectFunc()
+    firstThrottlingCollected <- true
+
     ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
     for range ticker {
-        go func() error {
-            if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
-                return nil
-            }
-
-            if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
-                // when running on replica, the heartbeat injection is also done on the replica.
-                // This means we will always get a good heartbeat value.
-                // When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
-                if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil {
-                    return log.Errore(err)
-                } else {
-                    atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
-                }
-            } else {
-                if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
-                    return log.Errore(err)
-                } else {
-                    this.parseChangelogHeartbeat(heartbeatValue)
-                }
-            }
-            return nil
-        }()
+        go collectFunc()
     }
 }
 
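The shape of this refactor (define the collection logic once as a closure, run it synchronously so the startup signal can be sent, then reuse the same closure from the ticker loop) can be sketched in isolation; the names here are illustrative and there is no MySQL access:

```go
package main

import (
	"fmt"
	"time"
)

// collectReplicationLag mimics the refactor's shape: one closure holds the
// collection logic, it runs once synchronously so the startup signal can be
// sent, and the same closure is reused from the ticker loop.
func collectReplicationLag(firstCollected chan<- bool, done <-chan struct{}) {
	collectFunc := func() error {
		fmt.Println("collected replication lag")
		return nil
	}

	collectFunc()          // synchronous first collection
	firstCollected <- true // unblock whoever waits for startup

	ticker := time.Tick(50 * time.Millisecond)
	for {
		select {
		case <-ticker:
			go collectFunc() // periodic collection, fire and forget
		case <-done:
			return
		}
	}
}

func main() {
	firstCollected := make(chan bool)
	done := make(chan struct{})
	go collectReplicationLag(firstCollected, done)

	<-firstCollected // corresponds to "Waiting for first throttle metrics"
	fmt.Println("first collection done; periodic collection continues")
	time.Sleep(120 * time.Millisecond)
	close(done)
}
```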
@@ -171,9 +176,7 @@ func (this *Throttler) collectControlReplicasLag() {
             // No need to read lag
             return
         }
-        if result := readControlReplicasLag(); result != nil {
-            this.migrationContext.SetControlReplicasLagResult(result)
-        }
+        this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag())
     }
     aggressiveTicker := time.Tick(100 * time.Millisecond)
     relaxedFactor := 10
@@ -285,13 +288,14 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
 // that may affect throttling. There are several components, all running independently,
 // that collect such metrics.
 func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
-    go this.collectReplicationLag()
+    go this.collectReplicationLag(firstThrottlingCollected)
     go this.collectControlReplicasLag()
 
     go func() {
-        throttlerMetricsTick := time.Tick(1 * time.Second)
         this.collectGeneralThrottleMetrics()
         firstThrottlingCollected <- true
+
+        throttlerMetricsTick := time.Tick(1 * time.Second)
         for range throttlerMetricsTick {
             this.collectGeneralThrottleMetrics()
         }
@@ -22,6 +22,14 @@ type ReplicationLagResult struct {
     Err error
 }
 
+func NewNoReplicationLagResult() *ReplicationLagResult {
+    return &ReplicationLagResult{Lag: 0, Err: nil}
+}
+
+func (this *ReplicationLagResult) HasLag() bool {
+    return this.Lag > 0
+}
+
 // GetReplicationLag returns replication lag for a given connection config; either by explicit query
 // or via SHOW SLAVE STATUS
 func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
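A small usage sketch of the two new helpers, with the struct reproduced locally for illustration; throttling code can ask `HasLag()` uniformly instead of checking for nil or zero by hand:

```go
package main

import (
	"fmt"
	"time"
)

type ReplicationLagResult struct {
	Lag time.Duration
	Err error
}

func NewNoReplicationLagResult() *ReplicationLagResult {
	return &ReplicationLagResult{Lag: 0, Err: nil}
}

func (r *ReplicationLagResult) HasLag() bool {
	return r.Lag > 0
}

func main() {
	results := []*ReplicationLagResult{
		NewNoReplicationLagResult(), // the "nothing measured" default
		{Lag: 7 * time.Second},      // a real measurement
	}
	for _, r := range results {
		if r.HasLag() {
			fmt.Printf("throttle: replica is %v behind\n", r.Lag)
		} else {
			fmt.Println("no lag; proceed")
		}
	}
}
```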
@@ -32,9 +40,11 @@ func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.
     }
 
     err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
+        slaveIORunning := m.GetString("Slave_IO_Running")
+        slaveSQLRunning := m.GetString("Slave_SQL_Running")
         secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
         if !secondsBehindMaster.Valid {
-            return fmt.Errorf("replication not running")
+            return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning)
         }
         replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
         return nil
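The changed error message reports the IO and SQL thread state whenever `Seconds_Behind_Master` is NULL. For readers without the gh-ost `sqlutils` helpers, here is a rough standalone sketch of the same probe using only `database/sql`; the driver import and DSN are assumptions, not part of the change:

```go
package main

import (
	"database/sql"
	"fmt"
	"strconv"
	"time"

	_ "github.com/go-sql-driver/mysql" // assumed driver; any MySQL driver works
)

// getReplicationLag mirrors the logic above: a NULL Seconds_Behind_Master
// means replication is not running, and the error says which threads are off.
func getReplicationLag(db *sql.DB) (time.Duration, error) {
	rows, err := db.Query("show slave status")
	if err != nil {
		return 0, err
	}
	defer rows.Close()

	columns, err := rows.Columns()
	if err != nil {
		return 0, err
	}
	if !rows.Next() {
		return 0, fmt.Errorf("not a replica: SHOW SLAVE STATUS returned no rows")
	}

	// SHOW SLAVE STATUS has many columns; scan them generically into a map.
	values := make([]sql.RawBytes, len(columns))
	scanArgs := make([]interface{}, len(columns))
	for i := range values {
		scanArgs[i] = &values[i]
	}
	if err := rows.Scan(scanArgs...); err != nil {
		return 0, err
	}
	row := map[string]string{}
	for i, col := range columns {
		row[col] = string(values[i]) // a NULL column comes back as an empty string
	}

	if row["Seconds_Behind_Master"] == "" {
		return 0, fmt.Errorf("replication not running; Slave_IO_Running=%s, Slave_SQL_Running=%s",
			row["Slave_IO_Running"], row["Slave_SQL_Running"])
	}
	seconds, err := strconv.ParseInt(row["Seconds_Behind_Master"], 10, 64)
	if err != nil {
		return 0, err
	}
	return time.Duration(seconds) * time.Second, nil
}

func main() {
	// The DSN is illustrative only.
	db, err := sql.Open("mysql", "user:pass@tcp(replica-host:3306)/")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	if lag, err := getReplicationLag(db); err != nil {
		fmt.Println("error:", err)
	} else {
		fmt.Println("replication lag:", lag)
	}
}
```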
@@ -88,7 +88,7 @@ test_single() {
     --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \
     --serve-socket-file=/tmp/gh-ost.test.sock \
     --initially-drop-socket-file \
-    --postpone-cut-over-flag-file="" \
+    --postpone-cut-over-flag-file=/tmp/gh-ost.test.postpone.flag \
     --test-on-replica \
     --default-retries=1 \
     --verbose \