Merge branch 'master' into fix-infinite-cutover-loop
This commit is contained in:
commit
289ce46a2b
@ -146,8 +146,12 @@ gh-ost --allow-master-master --assume-master-host=a.specific.master.com
|
||||
|
||||
Topologies using _tungsten replicator_ are peculiar in that the participating servers are not actually aware they are replicating. The _tungsten replicator_ looks just like another app issuing queries on those hosts. `gh-ost` is unable to identify that a server participates in a _tungsten_ topology.
|
||||
|
||||
If you choose to migrate directly on master (see above), there's nothing special you need to do. If you choose to migrate via replica, then you must supply the identity of the master, and indicate this is a tungsten setup, as follows:
|
||||
If you choose to migrate directly on master (see above), there's nothing special you need to do.
|
||||
|
||||
If you choose to migrate via replica, then you need to make sure Tungsten is configured with log-slave-updates parameter (note this is different from MySQL's own log-slave-updates parameter), otherwise changes will not be in the replica's binlog, causing data to be corrupted after table swap. You must also supply the identity of the master, and indicate this is a tungsten setup, as follows:
|
||||
|
||||
```
|
||||
gh-ost --tungsten --assume-master-host=the.topology.master.com
|
||||
```
|
||||
|
||||
Also note that `--switch-to-rbr` does not work for a Tungsten setup as the replication process is external, so you need to make sure `binlog_format` is set to ROW before Tungsten Replicator connects to the server and starts applying events from the master.
|
||||
|
@ -38,6 +38,10 @@ Both interfaces may serve at the same time. Both respond to simple text command,
|
||||
- `unpostpone`: at a time where `gh-ost` is postponing the [cut-over](cut-over.md) phase, instruct `gh-ost` to stop postponing and proceed immediately to cut-over.
|
||||
- `panic`: immediately panic and abort operation
|
||||
|
||||
### Querying for data
|
||||
|
||||
For commands that accept an argumetn as value, pass `?` (question mark) to _get_ current value rather than _set_ a new one.
|
||||
|
||||
### Examples
|
||||
|
||||
While migration is running:
|
||||
@ -63,6 +67,11 @@ $ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
||||
# Serving on TCP port: 10001
|
||||
```
|
||||
|
||||
```shell
|
||||
$ echo "chunk-size=?" | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
||||
250
|
||||
```
|
||||
|
||||
```shell
|
||||
$ echo throttle | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
||||
|
||||
|
@ -135,7 +135,9 @@ type MigrationContext struct {
|
||||
OriginalBinlogFormat string
|
||||
OriginalBinlogRowImage string
|
||||
InspectorConnectionConfig *mysql.ConnectionConfig
|
||||
InspectorMySQLVersion string
|
||||
ApplierConnectionConfig *mysql.ConnectionConfig
|
||||
ApplierMySQLVersion string
|
||||
StartTime time.Time
|
||||
RowCopyStartTime time.Time
|
||||
RowCopyEndTime time.Time
|
||||
@ -559,7 +561,11 @@ func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLag
|
||||
func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
this.controlReplicasLagResult = *lagResult
|
||||
if lagResult == nil {
|
||||
this.controlReplicasLagResult = *mysql.NewNoReplicationLagResult()
|
||||
} else {
|
||||
this.controlReplicasLagResult = *lagResult
|
||||
}
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {
|
||||
|
@ -242,5 +242,5 @@ func main() {
|
||||
migrator.ExecOnFailureHook()
|
||||
log.Fatale(err)
|
||||
}
|
||||
log.Info("Done")
|
||||
fmt.Fprintf(os.Stdout, "# Done\n")
|
||||
}
|
||||
|
@ -70,14 +70,15 @@ func (this *Applier) InitDBConnections() (err error) {
|
||||
if err := this.readTableColumns(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateConnection issues a simple can-connect to MySQL
|
||||
func (this *Applier) validateConnection(db *gosql.DB) error {
|
||||
query := `select @@global.port`
|
||||
query := `select @@global.port, @@global.version`
|
||||
var port int
|
||||
if err := db.QueryRow(query).Scan(&port); err != nil {
|
||||
if err := db.QueryRow(query).Scan(&port, &this.migrationContext.ApplierMySQLVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
if port != this.connectionConfig.Key.Port {
|
||||
|
@ -60,6 +60,7 @@ func (this *Inspector) InitDBConnections() (err error) {
|
||||
if err := this.applyBinlogFormat(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Inspector initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.InspectorMySQLVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -168,9 +169,9 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
|
||||
|
||||
// validateConnection issues a simple can-connect to MySQL
|
||||
func (this *Inspector) validateConnection() error {
|
||||
query := `select @@global.port`
|
||||
query := `select @@global.port, @@global.version`
|
||||
var port int
|
||||
if err := this.db.QueryRow(query).Scan(&port); err != nil {
|
||||
if err := this.db.QueryRow(query).Scan(&port, &this.migrationContext.InspectorMySQLVersion); err != nil {
|
||||
return err
|
||||
}
|
||||
if port != this.connectionConfig.Key.Port {
|
||||
|
@ -791,6 +791,12 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
||||
throttleQuery,
|
||||
))
|
||||
}
|
||||
if throttleControlReplicaKeys := this.migrationContext.GetThrottleControlReplicaKeys(); throttleControlReplicaKeys.Len() > 0 {
|
||||
fmt.Fprintln(w, fmt.Sprintf("# throttle-control-replicas count: %+v",
|
||||
throttleControlReplicaKeys.Len(),
|
||||
))
|
||||
}
|
||||
|
||||
if this.migrationContext.PostponeCutOverFlagFile != "" {
|
||||
setIndicator := ""
|
||||
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
|
||||
@ -970,7 +976,9 @@ func (this *Migrator) initiateThrottler() error {
|
||||
|
||||
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
||||
log.Infof("Waiting for first throttle metrics to be collected")
|
||||
<-this.firstThrottlingCollected
|
||||
<-this.firstThrottlingCollected // replication lag
|
||||
<-this.firstThrottlingCollected // other metrics
|
||||
log.Infof("First throttle metrics collected")
|
||||
go this.throttler.initiateThrottlerChecks()
|
||||
|
||||
return nil
|
||||
|
@ -126,7 +126,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
|
||||
if len(tokens) > 1 {
|
||||
arg = strings.TrimSpace(tokens[1])
|
||||
}
|
||||
|
||||
argIsQuestion := (arg == "?")
|
||||
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
|
||||
|
||||
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
|
||||
@ -152,6 +152,7 @@ no-throttle # End forced throttling (other throttling m
|
||||
unpostpone # Bail out a cut-over postpone; proceed to cut-over
|
||||
panic # panic and quit without cleanup
|
||||
help # This message
|
||||
- use '?' (question mark) as argument to get info rather than set. e.g. "max-load=?" will just print out current max-load.
|
||||
`)
|
||||
}
|
||||
case "sup":
|
||||
@ -160,6 +161,10 @@ help # This message
|
||||
return ForcePrintStatusAndHintRule, nil
|
||||
case "chunk-size":
|
||||
{
|
||||
if argIsQuestion {
|
||||
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize))
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
if chunkSize, err := strconv.Atoi(arg); err != nil {
|
||||
return NoPrintStatusRule, err
|
||||
} else {
|
||||
@ -169,6 +174,10 @@ help # This message
|
||||
}
|
||||
case "max-lag-millis":
|
||||
{
|
||||
if argIsQuestion {
|
||||
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold))
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
if maxLagMillis, err := strconv.Atoi(arg); err != nil {
|
||||
return NoPrintStatusRule, err
|
||||
} else {
|
||||
@ -182,6 +191,10 @@ help # This message
|
||||
}
|
||||
case "nice-ratio":
|
||||
{
|
||||
if argIsQuestion {
|
||||
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetNiceRatio())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
|
||||
return NoPrintStatusRule, err
|
||||
} else {
|
||||
@ -191,6 +204,11 @@ help # This message
|
||||
}
|
||||
case "max-load":
|
||||
{
|
||||
if argIsQuestion {
|
||||
maxLoad := this.migrationContext.GetMaxLoad()
|
||||
fmt.Fprintf(writer, "%s\n", maxLoad.String())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
|
||||
return NoPrintStatusRule, err
|
||||
}
|
||||
@ -198,6 +216,11 @@ help # This message
|
||||
}
|
||||
case "critical-load":
|
||||
{
|
||||
if argIsQuestion {
|
||||
criticalLoad := this.migrationContext.GetCriticalLoad()
|
||||
fmt.Fprintf(writer, "%s\n", criticalLoad.String())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
|
||||
return NoPrintStatusRule, err
|
||||
}
|
||||
@ -205,12 +228,20 @@ help # This message
|
||||
}
|
||||
case "throttle-query":
|
||||
{
|
||||
if argIsQuestion {
|
||||
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleQuery())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
this.migrationContext.SetThrottleQuery(arg)
|
||||
fmt.Fprintf(writer, throttleHint)
|
||||
return ForcePrintStatusAndHintRule, nil
|
||||
}
|
||||
case "throttle-control-replicas":
|
||||
{
|
||||
if argIsQuestion {
|
||||
fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
|
||||
return NoPrintStatusRule, err
|
||||
}
|
||||
|
@ -84,21 +84,38 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
|
||||
}
|
||||
}
|
||||
|
||||
// collectHeartbeat reads the latest changelog heartbeat value
|
||||
func (this *Throttler) collectHeartbeat() {
|
||||
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
|
||||
for range ticker {
|
||||
go func() error {
|
||||
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
|
||||
return nil
|
||||
// collectReplicationLag reads the latest changelog heartbeat value
|
||||
func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- bool) {
|
||||
collectFunc := func() error {
|
||||
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
|
||||
// when running on replica, the heartbeat injection is also done on the replica.
|
||||
// This means we will always get a good heartbeat value.
|
||||
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
|
||||
if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil {
|
||||
return log.Errore(err)
|
||||
} else {
|
||||
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
|
||||
}
|
||||
} else {
|
||||
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
|
||||
return log.Errore(err)
|
||||
} else {
|
||||
this.parseChangelogHeartbeat(heartbeatValue)
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
collectFunc()
|
||||
firstThrottlingCollected <- true
|
||||
|
||||
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
|
||||
for range ticker {
|
||||
go collectFunc()
|
||||
}
|
||||
}
|
||||
|
||||
@ -114,6 +131,7 @@ func (this *Throttler) collectControlReplicasLag() {
|
||||
|
||||
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
|
||||
dbUri := connectionConfig.GetDBUri("information_schema")
|
||||
|
||||
var heartbeatValue string
|
||||
if db, _, err := sqlutils.GetDB(dbUri); err != nil {
|
||||
return lag, err
|
||||
@ -158,9 +176,7 @@ func (this *Throttler) collectControlReplicasLag() {
|
||||
// No need to read lag
|
||||
return
|
||||
}
|
||||
if result := readControlReplicasLag(); result != nil {
|
||||
this.migrationContext.SetControlReplicasLagResult(result)
|
||||
}
|
||||
this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag())
|
||||
}
|
||||
aggressiveTicker := time.Tick(100 * time.Millisecond)
|
||||
relaxedFactor := 10
|
||||
@ -272,13 +288,14 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
// that may affect throttling. There are several components, all running independently,
|
||||
// that collect such metrics.
|
||||
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
|
||||
go this.collectHeartbeat()
|
||||
go this.collectReplicationLag(firstThrottlingCollected)
|
||||
go this.collectControlReplicasLag()
|
||||
|
||||
go func() {
|
||||
throttlerMetricsTick := time.Tick(1 * time.Second)
|
||||
this.collectGeneralThrottleMetrics()
|
||||
firstThrottlingCollected <- true
|
||||
|
||||
throttlerMetricsTick := time.Tick(1 * time.Second)
|
||||
for range throttlerMetricsTick {
|
||||
this.collectGeneralThrottleMetrics()
|
||||
}
|
||||
|
@ -22,6 +22,14 @@ type ReplicationLagResult struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
func NewNoReplicationLagResult() *ReplicationLagResult {
|
||||
return &ReplicationLagResult{Lag: 0, Err: nil}
|
||||
}
|
||||
|
||||
func (this *ReplicationLagResult) HasLag() bool {
|
||||
return this.Lag > 0
|
||||
}
|
||||
|
||||
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
|
||||
// or via SHOW SLAVE STATUS
|
||||
func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
|
||||
@ -32,9 +40,11 @@ func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.
|
||||
}
|
||||
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
|
||||
slaveIORunning := m.GetString("Slave_IO_Running")
|
||||
slaveSQLRunning := m.GetString("Slave_SQL_Running")
|
||||
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
|
||||
if !secondsBehindMaster.Valid {
|
||||
return fmt.Errorf("replication not running")
|
||||
return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning)
|
||||
}
|
||||
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
|
||||
return nil
|
||||
|
@ -88,7 +88,7 @@ test_single() {
|
||||
--throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \
|
||||
--serve-socket-file=/tmp/gh-ost.test.sock \
|
||||
--initially-drop-socket-file \
|
||||
--postpone-cut-over-flag-file="" \
|
||||
--postpone-cut-over-flag-file=/tmp/gh-ost.test.postpone.flag \
|
||||
--test-on-replica \
|
||||
--default-retries=1 \
|
||||
--verbose \
|
||||
|
Loading…
Reference in New Issue
Block a user