Auto-merged master into interactive-command-question on deployment

This commit is contained in:
Shlomi Noach 2017-02-08 13:03:45 +02:00 committed by GitHub
commit 6ecf7cc0ee
7 changed files with 49 additions and 18 deletions

View File

@ -146,8 +146,12 @@ gh-ost --allow-master-master --assume-master-host=a.specific.master.com
Topologies using _tungsten replicator_ are peculiar in that the participating servers are not actually aware they are replicating. The _tungsten replicator_ looks just like another app issuing queries on those hosts. `gh-ost` is unable to identify that a server participates in a _tungsten_ topology. Topologies using _tungsten replicator_ are peculiar in that the participating servers are not actually aware they are replicating. The _tungsten replicator_ looks just like another app issuing queries on those hosts. `gh-ost` is unable to identify that a server participates in a _tungsten_ topology.
If you choose to migrate directly on master (see above), there's nothing special you need to do. If you choose to migrate via replica, then you must supply the identity of the master, and indicate this is a tungsten setup, as follows: If you choose to migrate directly on master (see above), there's nothing special you need to do.
If you choose to migrate via replica, then you need to make sure Tungsten is configured with log-slave-updates parameter (note this is different from MySQL's own log-slave-updates parameter), otherwise changes will not be in the replica's binlog, causing data to be corrupted after table swap. You must also supply the identity of the master, and indicate this is a tungsten setup, as follows:
``` ```
gh-ost --tungsten --assume-master-host=the.topology.master.com gh-ost --tungsten --assume-master-host=the.topology.master.com
``` ```
Also note that `--switch-to-rbr` does not work for a Tungsten setup as the replication process is external, so you need to make sure `binlog_format` is set to ROW before Tungsten Replicator connects to the server and starts applying events from the master.

View File

@ -135,7 +135,9 @@ type MigrationContext struct {
OriginalBinlogFormat string OriginalBinlogFormat string
OriginalBinlogRowImage string OriginalBinlogRowImage string
InspectorConnectionConfig *mysql.ConnectionConfig InspectorConnectionConfig *mysql.ConnectionConfig
InspectorMySQLVersion string
ApplierConnectionConfig *mysql.ConnectionConfig ApplierConnectionConfig *mysql.ConnectionConfig
ApplierMySQLVersion string
StartTime time.Time StartTime time.Time
RowCopyStartTime time.Time RowCopyStartTime time.Time
RowCopyEndTime time.Time RowCopyEndTime time.Time

View File

@ -70,14 +70,15 @@ func (this *Applier) InitDBConnections() (err error) {
if err := this.readTableColumns(); err != nil { if err := this.readTableColumns(); err != nil {
return err return err
} }
log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
return nil return nil
} }
// validateConnection issues a simple can-connect to MySQL // validateConnection issues a simple can-connect to MySQL
func (this *Applier) validateConnection(db *gosql.DB) error { func (this *Applier) validateConnection(db *gosql.DB) error {
query := `select @@global.port` query := `select @@global.port, @@global.version`
var port int var port int
if err := db.QueryRow(query).Scan(&port); err != nil { if err := db.QueryRow(query).Scan(&port, &this.migrationContext.ApplierMySQLVersion); err != nil {
return err return err
} }
if port != this.connectionConfig.Key.Port { if port != this.connectionConfig.Key.Port {

View File

@ -60,6 +60,7 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.applyBinlogFormat(); err != nil { if err := this.applyBinlogFormat(); err != nil {
return err return err
} }
log.Infof("Inspector initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.InspectorMySQLVersion)
return nil return nil
} }
@ -168,9 +169,9 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
// validateConnection issues a simple can-connect to MySQL // validateConnection issues a simple can-connect to MySQL
func (this *Inspector) validateConnection() error { func (this *Inspector) validateConnection() error {
query := `select @@global.port` query := `select @@global.port, @@global.version`
var port int var port int
if err := this.db.QueryRow(query).Scan(&port); err != nil { if err := this.db.QueryRow(query).Scan(&port, &this.migrationContext.InspectorMySQLVersion); err != nil {
return err return err
} }
if port != this.connectionConfig.Key.Port { if port != this.connectionConfig.Key.Port {

View File

@ -946,7 +946,9 @@ func (this *Migrator) initiateThrottler() error {
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected") log.Infof("Waiting for first throttle metrics to be collected")
<-this.firstThrottlingCollected <-this.firstThrottlingCollected // replication lag
<-this.firstThrottlingCollected // other metrics
log.Infof("First throttle metrics collected")
go this.throttler.initiateThrottlerChecks() go this.throttler.initiateThrottlerChecks()
return nil return nil

View File

@ -84,21 +84,38 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
} }
} }
// collectHeartbeat reads the latest changelog heartbeat value // collectReplicationLag reads the latest changelog heartbeat value
func (this *Throttler) collectHeartbeat() { func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- bool) {
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) collectFunc := func() error {
for range ticker { if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
go func() error { return nil
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 { }
return nil
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
// when running on replica, the heartbeat injection is also done on the replica.
// This means we will always get a good heartbeat value.
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil {
return log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
} }
} else {
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil { if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err) return log.Errore(err)
} else { } else {
this.parseChangelogHeartbeat(heartbeatValue) this.parseChangelogHeartbeat(heartbeatValue)
} }
return nil }
}() return nil
}
collectFunc()
firstThrottlingCollected <- true
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range ticker {
go collectFunc()
} }
} }
@ -114,6 +131,7 @@ func (this *Throttler) collectControlReplicasLag() {
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) { readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema") dbUri := connectionConfig.GetDBUri("information_schema")
var heartbeatValue string var heartbeatValue string
if db, _, err := sqlutils.GetDB(dbUri); err != nil { if db, _, err := sqlutils.GetDB(dbUri); err != nil {
return lag, err return lag, err
@ -272,13 +290,14 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// that may affect throttling. There are several components, all running independently, // that may affect throttling. There are several components, all running independently,
// that collect such metrics. // that collect such metrics.
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) { func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
go this.collectHeartbeat() go this.collectReplicationLag(firstThrottlingCollected)
go this.collectControlReplicasLag() go this.collectControlReplicasLag()
go func() { go func() {
throttlerMetricsTick := time.Tick(1 * time.Second)
this.collectGeneralThrottleMetrics() this.collectGeneralThrottleMetrics()
firstThrottlingCollected <- true firstThrottlingCollected <- true
throttlerMetricsTick := time.Tick(1 * time.Second)
for range throttlerMetricsTick { for range throttlerMetricsTick {
this.collectGeneralThrottleMetrics() this.collectGeneralThrottleMetrics()
} }

View File

@ -32,9 +32,11 @@ func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.
} }
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
slaveIORunning := m.GetString("Slave_IO_Running")
slaveSQLRunning := m.GetString("Slave_SQL_Running")
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master") secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
if !secondsBehindMaster.Valid { if !secondsBehindMaster.Valid {
return fmt.Errorf("replication not running") return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning)
} }
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
return nil return nil