diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index bb8b8d2..58e231e 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -91,7 +91,7 @@ func main() { replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight") - heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value") + heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. 
Cut-over/swapping would be ready to perform the moment the file is deleted.") diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 19db8a3..6c81e99 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s } // readChangelogState reads changelog hints -func (this *Inspector) readChangelogState() (map[string]string, error) { +func (this *Inspector) readChangelogState(hint string) (string, error) { query := fmt.Sprintf(` - select hint, value from %s.%s where id <= 255 + select hint, value from %s.%s where hint = ? and id <= 255 `, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetChangelogTableName()), ) - result := make(map[string]string) + result := "" err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { - result[m.GetString("hint")] = m.GetString("value") + result = m.GetString("value") return nil - }) + }, hint) return result, err } @@ -702,10 +702,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect } func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) { - replicationLagQuery := this.migrationContext.GetReplicationLagQuery() replicationLag, err = mysql.GetReplicationLag( this.migrationContext.InspectorConnectionConfig, - replicationLagQuery, ) return replicationLag, err } diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 482087c..1635b5e 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -12,7 +12,9 @@ import ( "github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/mysql" + "github.com/github/gh-ost/go/sql" "github.com/outbrain/golib/log" + "github.com/outbrain/golib/sqlutils" ) // Throttler collects metrics related to throttling and makes informed decisison @@ -62,15 +64,24 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint return false, "", 
base.NoThrottleReasonHint } -// parseChangelogHeartbeat is called when a heartbeat event is intercepted -func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) { +// parseChangelogHeartbeat parses a string timestamp and deduces replication lag +func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) { heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue) if err != nil { - return log.Errore(err) + return lag, err + } + lag = time.Since(heartbeatTime) + return lag, nil +} + +// parseChangelogHeartbeat parses a heartbeat timestamp and stores the deduced lag in migrationContext.CurrentLag +func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) { + if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil { + return log.Errore(err) + } else { + atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) + return nil } - lag := time.Since(heartbeatTime) - atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) - return nil } // collectHeartbeat reads the latest changelog heartbeat value @@ -81,11 +92,9 @@ func (this *Throttler) collectHeartbeat() { if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 { return nil } - changelogState, err := this.inspector.readChangelogState() - if err != nil { + if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil { return log.Errore(err) - } - if heartbeatValue, ok := changelogState["heartbeat"]; ok { + } else { this.parseChangelogHeartbeat(heartbeatValue) } return nil @@ -95,23 +104,68 @@ func (this *Throttler) collectHeartbeat() { // collectControlReplicasLag polls all the control replicas to get maximum lag value func (this *Throttler) collectControlReplicasLag() { - readControlReplicasLag := func(replicationLagQuery string) error { - if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
- return nil + + replicationLagQuery := fmt.Sprintf(` + select value from %s.%s where hint = 'heartbeat' and id <= 255 + `, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetChangelogTableName()), + ) + + readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) { + dbUri := connectionConfig.GetDBUri("information_schema") + var heartbeatValue string + if db, _, err := sqlutils.GetDB(dbUri); err != nil { + return lag, err + } else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil { + return lag, err + } + lag, err = parseChangelogHeartbeat(heartbeatValue) + return lag, err + } + + readControlReplicasLag := func() (result *mysql.ReplicationLagResult) { + instanceKeyMap := this.migrationContext.GetThrottleControlReplicaKeys() + if instanceKeyMap.Len() == 0 { + return result + } + lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len()) + for replicaKey := range *instanceKeyMap { + connectionConfig := this.migrationContext.InspectorConnectionConfig.Duplicate() + connectionConfig.Key = replicaKey + + lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key} + go func() { + lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig) + lagResults <- lagResult + }() + } + for range *instanceKeyMap { + lagResult := <-lagResults + if result == nil { + result = lagResult + } else if lagResult.Err != nil { + result = lagResult + } else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() { + result = lagResult + } + } + return result + } + + checkControlReplicasLag := func() { + if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) { + // No need to read lag + return + } + if result := readControlReplicasLag(); result != nil { + this.migrationContext.SetControlReplicasLagResult(result) } - lagResult := 
mysql.GetMaxReplicationLag( - this.migrationContext.InspectorConnectionConfig, - this.migrationContext.GetThrottleControlReplicaKeys(), - replicationLagQuery, - ) - this.migrationContext.SetControlReplicasLagResult(lagResult) - return nil } aggressiveTicker := time.Tick(100 * time.Millisecond) relaxedFactor := 10 counter := 0 shouldReadLagAggressively := false - replicationLagQuery := "" for range aggressiveTicker { if counter%relaxedFactor == 0 { @@ -119,12 +173,11 @@ func (this *Throttler) collectControlReplicasLag() { // do not typically change at all throughout the migration, but nonetheless we check them. counter = 0 maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - replicationLagQuery = this.migrationContext.GetReplicationLagQuery() - shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000) + shouldReadLagAggressively = (maxLagMillisecondsThrottleThreshold < 1000) } if counter == 0 || shouldReadLagAggressively { // We check replication lag every so often, or if we wish to be aggressive - readControlReplicasLag(replicationLagQuery) + checkControlReplicasLag() } counter++ } diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 6b70c7c..bfd3c24 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -24,19 +24,13 @@ type ReplicationLagResult struct { // GetReplicationLag returns replication lag for a given connection config; either by explicit query // or via SHOW SLAVE STATUS -func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) { +func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) { dbUri := connectionConfig.GetDBUri("information_schema") var db *gosql.DB if db, _, err = sqlutils.GetDB(dbUri); err != nil { return replicationLag, err } - if replicationLagQuery != "" { - var floatLag float64 - err = 
db.QueryRow(replicationLagQuery).Scan(&floatLag) - return time.Duration(int64(floatLag*1000)) * time.Millisecond, err - } - // No explicit replication lag query. err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master") if !secondsBehindMaster.Valid { @@ -48,34 +42,6 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s return replicationLag, err } -// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys, -// each via GetReplicationLag -func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) { - result = &ReplicationLagResult{Lag: 0} - if instanceKeyMap.Len() == 0 { - return result - } - lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len()) - for key := range *instanceKeyMap { - connectionConfig := baseConnectionConfig.Duplicate() - connectionConfig.Key = key - result := &ReplicationLagResult{Key: connectionConfig.Key} - go func() { - result.Lag, result.Err = GetReplicationLag(connectionConfig, replicationLagQuery) - lagResults <- result - }() - } - for range *instanceKeyMap { - lagResult := <-lagResults - if lagResult.Err != nil { - result = lagResult - } else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() { - result = lagResult - } - } - return result -} - func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) { currentUri := connectionConfig.GetDBUri("information_schema") db, _, err := sqlutils.GetDB(currentUri)