Merge branch 'master' into batch-apply-dml-events

This commit is contained in:
Shlomi Noach 2017-01-04 08:37:08 +02:00 committed by GitHub
commit 220bf83a5b
10 changed files with 95 additions and 125 deletions

View File

@ -100,10 +100,7 @@ On a replication topology, this is perhaps the most important migration throttli
When using [Connect to replica, migrate on master](cheatsheet.md), this lag is primarily tested on the very replica `gh-ost` operates on. Lag is measured by checking the heartbeat events injected by `gh-ost` itself on the utility changelog table. That is, to measure this replica's lag, `gh-ost` doesn't need to issue `show slave status` nor have any external heartbeat mechanism.
When `--throttle-control-replicas` is provided, throttling also considers lag on specified hosts. Measuring lag on these hosts works as follows:
- If `--replication-lag-query` is provided, use the query, trust its result to indicate lag seconds (fraction, i.e. float, allowed)
- Otherwise, issue `show slave status` and read `Seconds_behind_master` (`1sec` granularity)
When `--throttle-control-replicas` is provided, throttling also considers lag on specified hosts. Lag measurements on listed hosts is done by querying `gh-ost`'s _changelog_ table, where `gh-ost` injects a heartbeat.
See also: [Sub-second replication lag throttling](subsecond-lag.md)

View File

@ -2,7 +2,7 @@
`gh-ost` is able to utilize sub-second replication lag measurements.
At GitHub, small replication lag is crucial, and we like to keep it below `1s` at all times. If you have similar concern, we strongly urge you to proceed to implement sub-second lag throttling.
At GitHub, small replication lag is crucial, and we like to keep it below `1s` at all times.
`gh-ost` will do sub-second throttling when `--max-lag-millis` is smaller than `1000`, i.e. smaller than `1sec`.
Replication lag is measured on:
@ -10,24 +10,10 @@ Replication lag is measured on:
- The "inspected" server (the server `gh-ost` connects to; replica is desired but not mandatory)
- The `throttle-control-replicas` list
For the inspected server, `gh-ost` uses an internal heartbeat mechanism. It injects heartbeat events onto the utility changelog table, then reads those events in the binary log, and compares times. This measurement is by default and by definition sub-second enabled.
In both cases, `gh-ost` uses an internal heartbeat mechanism. It injects heartbeat events onto the utility changelog table, then reads those entries on replicas, and compares times. This measurement is on by default and by definition supports sub-second resolution.
You can explicitly define how frequently will `gh-ost` inject heartbeat events, via `heartbeat-interval-millis`. You should set `heartbeat-interval-millis <= max-lag-millis`. It still works if not, but loses granularity and effect.
On the `throttle-control-replicas`, `gh-ost` only issues SQL queries, and does not attempt to read the binary log stream. Perhaps those other replicas don't have binary logs in the first place.
In earlier versions, the `--throttle-control-replicas` list was subjected to `1` second resolution or to 3rd party heartbeat injections such as `pt-heartbeat`. This is no longer the case. The argument `--replication-lag-query` has been deprecated and is no longer needed.
The standard way of getting replication lag on a replica is to issue `SHOW SLAVE STATUS`, then reading `Seconds_behind_master` value. But that value has a `1sec` granularity.
To be able to throttle on your production replicas fleet when replication lag exceeds a sub-second threshold, you must provide with a `replication-lag-query` that returns a sub-second resolution lag.
As a common example, many use [pt-heartbeat](https://www.percona.com/doc/percona-toolkit/2.2/pt-heartbeat.html) to inject heartbeat events on the master. You would issue something like:
/usr/bin/pt-heartbeat -- -D your_schema --create-table --update --replace --interval=0.1 --daemonize --pid ...
Note `--interval=0.1` to indicate `10` heartbeats per second.
You would then provide
gh-ost ... --replication-lag-query="select unix_timestamp(now(6)) - unix_timestamp(ts) as ghost_lag_check from your_schema.heartbeat order by ts desc limit 1"
Our production migrations use sub-second lag throttling and are able to keep our entire fleet of replicas well below `1sec` lag.
Our production migrations use sub-second lag throttling and are able to keep our entire fleet of replicas well below `1sec` lag. We use `--heartbeat-interval-millis=100` on our production migrations with a `--max-lag-millis` value of between `300` and `500`.

View File

@ -28,15 +28,7 @@ Otherwise you may specify your own list of replica servers you wish it to observ
- `--max-lag-millis`: maximum allowed lag; any controlled replica lagging more than this value will cause throttling to kick in. When all control replicas have smaller lag than indicated, operation resumes.
- `--replication-lag-query`: `gh-ost` will, by default, issue a `show slave status` query to find replication lag. However, this is a notoriously flaky value. If you're using your own `heartbeat` mechanism, e.g. via [`pt-heartbeat`](https://www.percona.com/doc/percona-toolkit/2.2/pt-heartbeat.html), you may provide your own custom query to return a single decimal (floating point) value indicating replication lag.
Example: `--replication-lag-query="SELECT UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts)) AS lag FROM mydb.heartbeat"`
We encourage you to use [sub-second replication lag throttling](subsecond-lag.md). Your query may then look like:
`--replication-lag-query="SELECT UNIX_TIMESTAMP(6) - MAX(UNIX_TIMESTAMP(ts)) AS lag FROM mydb.heartbeat"`
Note that you may dynamically change both `replication-lag-query` and the `throttle-control-replicas` list via [interactive commands](interactive-commands.md)
Note that you may dynamically change both `--max-lag-millis` and the `throttle-control-replicas` list via [interactive commands](interactive-commands.md)
#### Status thresholds

View File

@ -95,7 +95,6 @@ type MigrationContext struct {
ChunkSize int64
niceRatio float64
MaxLagMillisecondsThrottleThreshold int64
replicationLagQuery string
throttleControlReplicaKeys *mysql.InstanceKeyMap
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
@ -468,23 +467,6 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
return this.isThrottled, this.throttleReason, this.throttleReasonHint
}
func (this *MigrationContext) GetReplicationLagQuery() string {
var query string
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
query = this.replicationLagQuery
return query
}
func (this *MigrationContext) SetReplicationLagQuery(newQuery string) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.replicationLagQuery = newQuery
}
func (this *MigrationContext) GetThrottleQuery() string {
var query string

View File

@ -89,10 +89,10 @@ func main() {
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 500, "how frequently would gh-ost inject a heartbeat value")
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@ -186,6 +186,9 @@ func main() {
if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
log.Fatalf("--master-password requires --assume-master-host")
}
if *replicationLagQuery != "" {
log.Warningf("--replication-lag-query is deprecated")
}
switch *cutOver {
case "atomic", "default", "":
@ -223,7 +226,6 @@ func main() {
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetReplicationLagQuery(*replicationLagQuery)
migrationContext.SetThrottleQuery(*throttleQuery)
migrationContext.SetDefaultNumRetries(*defaultRetries)
migrationContext.ApplyCredentials()

View File

@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s
}
// readChangelogState reads changelog hints
func (this *Inspector) readChangelogState() (map[string]string, error) {
func (this *Inspector) readChangelogState(hint string) (string, error) {
query := fmt.Sprintf(`
select hint, value from %s.%s where id <= 255
select hint, value from %s.%s where hint = ? and id <= 255
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
result := make(map[string]string)
result := ""
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
result[m.GetString("hint")] = m.GetString("value")
result = m.GetString("value")
return nil
})
}, hint)
return result, err
}
@ -702,10 +702,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
}
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
replicationLagQuery := this.migrationContext.GetReplicationLagQuery()
replicationLag, err = mysql.GetReplicationLag(
this.migrationContext.InspectorConnectionConfig,
replicationLagQuery,
)
return replicationLag, err
}

View File

@ -738,11 +738,6 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
criticalLoad.String(),
this.migrationContext.GetNiceRatio(),
))
if replicationLagQuery := this.migrationContext.GetReplicationLagQuery(); replicationLagQuery != "" {
fmt.Fprintln(w, fmt.Sprintf("# replication-lag-query: %+v",
replicationLagQuery,
))
}
if this.migrationContext.ThrottleFlagFile != "" {
setIndicator := ""
if base.FileExists(this.migrationContext.ThrottleFlagFile) {

View File

@ -178,8 +178,7 @@ help # This message
}
case "replication-lag-query":
{
this.migrationContext.SetReplicationLagQuery(arg)
return ForcePrintStatusAndHintRule, nil
return NoPrintStatusRule, fmt.Errorf("replication-lag-query is deprecated. gh-ost uses an internal, subsecond resolution query")
}
case "nice-ratio":
{

View File

@ -12,7 +12,9 @@ import (
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
// Throttler collects metrics related to throttling and makes informed decisison
@ -62,15 +64,24 @@ func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint
return false, "", base.NoThrottleReasonHint
}
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func parseChangelogHeartbeat(heartbeatValue string) (lag time.Duration, err error) {
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
if err != nil {
return log.Errore(err)
return lag, err
}
lag = time.Since(heartbeatTime)
return lag, nil
}
// parseChangelogHeartbeat parses a string timestamp and deduces replication lag
func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error) {
if lag, err := parseChangelogHeartbeat(heartbeatValue); err != nil {
return log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil
}
lag := time.Since(heartbeatTime)
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
return nil
}
// collectHeartbeat reads the latest changelog heartbeat value
@ -81,11 +92,9 @@ func (this *Throttler) collectHeartbeat() {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
}
changelogState, err := this.inspector.readChangelogState()
if err != nil {
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err)
}
if heartbeatValue, ok := changelogState["heartbeat"]; ok {
} else {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil
@ -95,23 +104,68 @@ func (this *Throttler) collectHeartbeat() {
// collectControlReplicasLag polls all the control replicas to get maximum lag value
func (this *Throttler) collectControlReplicasLag() {
readControlReplicasLag := func(replicationLagQuery string) error {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
return nil
replicationLagQuery := fmt.Sprintf(`
select value from %s.%s where hint = 'heartbeat' and id <= 255
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var heartbeatValue string
if db, _, err := sqlutils.GetDB(dbUri); err != nil {
return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}
lag, err = parseChangelogHeartbeat(heartbeatValue)
return lag, err
}
readControlReplicasLag := func() (result *mysql.ReplicationLagResult) {
instanceKeyMap := this.migrationContext.GetThrottleControlReplicaKeys()
if instanceKeyMap.Len() == 0 {
return result
}
lagResults := make(chan *mysql.ReplicationLagResult, instanceKeyMap.Len())
for replicaKey := range *instanceKeyMap {
connectionConfig := this.migrationContext.InspectorConnectionConfig.Duplicate()
connectionConfig.Key = replicaKey
lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
go func() {
lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
lagResults <- lagResult
}()
}
for range *instanceKeyMap {
lagResult := <-lagResults
if result == nil {
result = lagResult
} else if lagResult.Err != nil {
result = lagResult
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
result = lagResult
}
}
return result
}
checkControlReplicasLag := func() {
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
// No need to read lag
return
}
if result := readControlReplicasLag(); result != nil {
this.migrationContext.SetControlReplicasLagResult(result)
}
lagResult := mysql.GetMaxReplicationLag(
this.migrationContext.InspectorConnectionConfig,
this.migrationContext.GetThrottleControlReplicaKeys(),
replicationLagQuery,
)
this.migrationContext.SetControlReplicasLagResult(lagResult)
return nil
}
aggressiveTicker := time.Tick(100 * time.Millisecond)
relaxedFactor := 10
counter := 0
shouldReadLagAggressively := false
replicationLagQuery := ""
for range aggressiveTicker {
if counter%relaxedFactor == 0 {
@ -119,12 +173,11 @@ func (this *Throttler) collectControlReplicasLag() {
// do not typically change at all throughout the migration, but nonetheless we check them.
counter = 0
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
shouldReadLagAggressively = (maxLagMillisecondsThrottleThreshold < 1000)
}
if counter == 0 || shouldReadLagAggressively {
// We check replication lag every so often, or if we wish to be aggressive
readControlReplicasLag(replicationLagQuery)
checkControlReplicasLag()
}
counter++
}

View File

@ -24,19 +24,13 @@ type ReplicationLagResult struct {
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
// or via SHOW SLAVE STATUS
func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var db *gosql.DB
if db, _, err = sqlutils.GetDB(dbUri); err != nil {
return replicationLag, err
}
if replicationLagQuery != "" {
var floatLag float64
err = db.QueryRow(replicationLagQuery).Scan(&floatLag)
return time.Duration(int64(floatLag*1000)) * time.Millisecond, err
}
// No explicit replication lag query.
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
if !secondsBehindMaster.Valid {
@ -48,34 +42,6 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
return replicationLag, err
}
// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
// each via GetReplicationLag
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) {
result = &ReplicationLagResult{Lag: 0}
if instanceKeyMap.Len() == 0 {
return result
}
lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len())
for key := range *instanceKeyMap {
connectionConfig := baseConnectionConfig.Duplicate()
connectionConfig.Key = key
result := &ReplicationLagResult{Key: connectionConfig.Key}
go func() {
result.Lag, result.Err = GetReplicationLag(connectionConfig, replicationLagQuery)
lagResults <- result
}()
}
for range *instanceKeyMap {
lagResult := <-lagResults
if lagResult.Err != nil {
result = lagResult
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
result = lagResult
}
}
return result
}
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
currentUri := connectionConfig.GetDBUri("information_schema")
db, _, err := sqlutils.GetDB(currentUri)