Add --test-on-replica-manual-replication-control flag

This will wait indefinitely for the replication status to change.
This allows us to run test schema changes in RDS without needing
custom RDS commands in gh-ost.
This commit is contained in:
Paulo Bittencourt 2016-08-15 12:56:17 -04:00
parent 88eb2d6ee1
commit a62f9e0754
3 changed files with 48 additions and 12 deletions

View File

@ -85,6 +85,7 @@ type MigrationContext struct {
Noop bool Noop bool
TestOnReplica bool TestOnReplica bool
MigrateOnReplica bool MigrateOnReplica bool
ManualReplicationControl bool
OkToDropTable bool OkToDropTable bool
InitiallyDropOldTable bool InitiallyDropOldTable bool
InitiallyDropGhostTable bool InitiallyDropGhostTable bool

View File

@ -60,6 +60,7 @@ func main() {
flag.BoolVar(&migrationContext.SkipRenamedColumns, "skip-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag tells gh-ost to skip the renamed columns, i.e. to treat what gh-ost thinks are renamed columns as unrelated columns. NOTE: you may lose column data") flag.BoolVar(&migrationContext.SkipRenamedColumns, "skip-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag tells gh-ost to skip the renamed columns, i.e. to treat what gh-ost thinks are renamed columns as unrelated columns. NOTE: you may lose column data")
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
testOnReplicaWithManualReplicationControl := flag.Bool("test-on-replica-manual-replication-control", false, "Same as --test-on-replica, but waits for replication to be stopped, instead of stopping it automatically. (Useful in RDS.)")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")
flag.BoolVar(&migrationContext.MigrateOnReplica, "migrate-on-replica", false, "Have the migration run on a replica, not on the master. This will do the full migration on the replica including cut-over (as opposed to --test-on-replica)") flag.BoolVar(&migrationContext.MigrateOnReplica, "migrate-on-replica", false, "Have the migration run on a replica, not on the master. This will do the full migration on the replica including cut-over (as opposed to --test-on-replica)")
@ -149,6 +150,13 @@ func main() {
if migrationContext.SwitchToRowBinlogFormat && migrationContext.AssumeRBR { if migrationContext.SwitchToRowBinlogFormat && migrationContext.AssumeRBR {
log.Fatalf("--switch-to-rbr and --assume-rbr are mutually exclusive") log.Fatalf("--switch-to-rbr and --assume-rbr are mutually exclusive")
} }
if *testOnReplicaWithManualReplicationControl {
if migrationContext.TestOnReplica {
log.Fatalf("--test-on-replica-manual-replication-control and --test-on-replica are mutually exclusive")
}
migrationContext.TestOnReplica = true
migrationContext.ManualReplicationControl = true
}
switch *cutOver { switch *cutOver {
case "atomic", "default", "": case "atomic", "default", "":
migrationContext.CutOverType = base.CutOverAtomic migrationContext.CutOverType = base.CutOverAtomic

View File

@ -566,14 +566,41 @@ func (this *Applier) StartSlaveSQLThread() error {
return nil return nil
} }
func (this *Applier) isReplicationStopped() bool {
query := `show slave status`
replicationStopped := false
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
replicationStopped = rowMap["Slave_IO_Running"].String == "No" && rowMap["Slave_SQL_Running"].String == "No"
return nil
})
if err != nil {
return false
}
return replicationStopped
}
// StopReplication is used by `--test-on-replica` and stops replication. // StopReplication is used by `--test-on-replica` and stops replication.
func (this *Applier) StopReplication() error { func (this *Applier) StopReplication() error {
if this.migrationContext.ManualReplicationControl {
for {
log.Info("Waiting for replication to stop...")
if this.isReplicationStopped() {
log.Info("Replication stopped.")
break
}
time.Sleep(5 * time.Second)
}
} else {
if err := this.StopSlaveIOThread(); err != nil { if err := this.StopSlaveIOThread(); err != nil {
return err return err
} }
if err := this.StopSlaveSQLThread(); err != nil { if err := this.StopSlaveSQLThread(); err != nil {
return err return err
} }
}
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db) readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
if err != nil { if err != nil {
return err return err