Reading replication lag via _changelog_ table, also on control replicas

This commit is contained in:
Shlomi Noach 2016-12-26 21:31:35 +02:00
parent d13049b41e
commit fc831b0548
4 changed files with 84 additions and 67 deletions

View File

@ -91,7 +91,7 @@ func main() {
replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")

View File

@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s
}
// readChangelogState reads changelog hints
func (this *Inspector) readChangelogState() (map[string]string, error) {
func (this *Inspector) readChangelogState(hint string) (string, error) {
query := fmt.Sprintf(`
select hint, value from %s.%s where id <= 255
select hint, value from %s.%s where hint = ? and id <= 255
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
result := make(map[string]string)
result := ""
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
result[m.GetString("hint")] = m.GetString("value")
result = m.GetString("value")
return nil
})
}, hint)
return result, err
}
@ -702,10 +702,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
}
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
replicationLagQuery := this.migrationContext.GetReplicationLagQuery()
replicationLag, err = mysql.GetReplicationLag(
this.migrationContext.InspectorConnectionConfig,
replicationLagQuery,
)
return replicationLag, err
}

View File

@ -12,7 +12,9 @@ import (
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
// Throttler collects metrics related to throttling and makes informed decisison
@ -62,15 +64,24 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint
return false, "", base.NoThrottleReasonHint
}
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) {
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
if err != nil {
return log.Errore(err)
return lag, err
}
lag = time.Since(heartbeatTime)
return lag, nil
}
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil {
return log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil
}
lag := time.Since(heartbeatTime)
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil
}
// collectHeartbeat reads the latest changelog heartbeat value
@ -81,11 +92,9 @@ func (this *Throttler) collectHeartbeat() {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
}
changelogState, err := this.inspector.readChangelogState()
if err != nil {
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err)
}
if heartbeatValue, ok := changelogState["heartbeat"]; ok {
} else {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil
@ -95,23 +104,68 @@ func (this *Throttler) collectHeartbeat() {
// collectControlReplicasLag polls all the control replicas to get maximum lag value
func (this *Throttler) collectControlReplicasLag() {
readControlReplicasLag := func(replicationLagQuery string) error {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
return nil
replicationLagQuery := fmt.Sprintf(`
select value from %s.%s where hint = 'heartbeat' and id <= 255
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var heartbeatValue string
if db, _, err := sqlutils.GetDB(dbUri); err != nil {
return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}
lag, err = parseChangelogHeartbeat(heartbeatValue)
return lag, err
}
readControlReplicasLag := func() (result *mysql.ReplicationLagResult) {
instanceKeyMap := this.migrationContext.GetThrottleControlReplicaKeys()
if instanceKeyMap.Len() == 0 {
return result
}
lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len())
for replicaKey := range *instanceKeyMap {
connectionConfig := this.migrationContext.InspectorConnectionConfig.Duplicate()
connectionConfig.Key = replicaKey
lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
go func() {
lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
lagResults <- lagResult
}()
}
for range *instanceKeyMap {
lagResult := <-lagResults
if result == nil {
result = lagResult
} else if lagResult.Err != nil {
result = lagResult
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
result = lagResult
}
}
return result
}
checkControlReplicasLag := func() {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
// No need to read lag
return
}
if result := readControlReplicasLag(); result != nil {
this.migrationContext.SetControlReplicasLagResult(result)
}
lagResult := mysql.GetMaxReplicationLag(
this.migrationContext.InspectorConnectionConfig,
this.migrationContext.GetThrottleControlReplicaKeys(),
replicationLagQuery,
)
this.migrationContext.SetControlReplicasLagResult(lagResult)
return nil
}
aggressiveTicker := time.Tick(100 * time.Millisecond)
relaxedFactor := 10
counter := 0
shouldReadLagAggressively := false
replicationLagQuery := ""
for range aggressiveTicker {
if counter%relaxedFactor == 0 {
@ -119,12 +173,11 @@ func (this *Throttler) collectControlReplicasLag() {
// do not typically change at all throughout the migration, but nonetheless we check them.
counter = 0
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
shouldReadLagAggressively = (maxLagMillisecondsThrottleThreshold < 1000)
}
if counter == 0 || shouldReadLagAggressively {
// We check replication lag every so often, or if we wish to be aggressive
readControlReplicasLag(replicationLagQuery)
checkControlReplicasLag()
}
counter++
}

View File

@ -24,19 +24,13 @@ type ReplicationLagResult struct {
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
// or via SHOW SLAVE STATUS
func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var db *gosql.DB
if db, _, err = sqlutils.GetDB(dbUri); err != nil {
return replicationLag, err
}
if replicationLagQuery != "" {
var floatLag float64
err = db.QueryRow(replicationLagQuery).Scan(&floatLag)
return time.Duration(int64(floatLag*1000)) * time.Millisecond, err
}
// No explicit replication lag query.
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
if !secondsBehindMaster.Valid {
@ -48,34 +42,6 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
return replicationLag, err
}
// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
// each via GetReplicationLag
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) {
result = &ReplicationLagResult{Lag: 0}
if instanceKeyMap.Len() == 0 {
return result
}
lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len())
for key := range *instanceKeyMap {
connectionConfig := baseConnectionConfig.Duplicate()
connectionConfig.Key = key
result := &ReplicationLagResult{Key: connectionConfig.Key}
go func() {
result.Lag, result.Err = GetReplicationLag(connectionConfig, replicationLagQuery)
lagResults <- result
}()
}
for range *instanceKeyMap {
lagResult := <-lagResults
if lagResult.Err != nil {
result = lagResult
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
result = lagResult
}
}
return result
}
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
currentUri := connectionConfig.GetDBUri("information_schema")
db, _, err := sqlutils.GetDB(currentUri)