dynamic replication-lag-query
This commit is contained in:
parent
5d23b72955
commit
b53ee24a1f
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
RELEASE_VERSION="1.0.5"
|
||||
RELEASE_VERSION="1.0.6"
|
||||
|
||||
buildpath=/tmp/gh-ost
|
||||
target=gh-ost
|
||||
|
@ -65,11 +65,11 @@ type MigrationContext struct {
|
||||
ChunkSize int64
|
||||
NiceRatio int64
|
||||
MaxLagMillisecondsThrottleThreshold int64
|
||||
ReplictionLagQuery string
|
||||
ThrottleControlReplicaKeys *mysql.InstanceKeyMap
|
||||
replicationLagQuery string
|
||||
throttleControlReplicaKeys *mysql.InstanceKeyMap
|
||||
ThrottleFlagFile string
|
||||
ThrottleAdditionalFlagFile string
|
||||
ThrottleQuery string
|
||||
throttleQuery string
|
||||
ThrottleCommandedByUser int64
|
||||
maxLoad LoadMap
|
||||
criticalLoad LoadMap
|
||||
@ -159,7 +159,7 @@ func newMigrationContext() *MigrationContext {
|
||||
maxLoad: NewLoadMap(),
|
||||
criticalLoad: NewLoadMap(),
|
||||
throttleMutex: &sync.Mutex{},
|
||||
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
||||
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
||||
configMutex: &sync.Mutex{},
|
||||
pointOfInterestTimeMutex: &sync.Mutex{},
|
||||
ColumnRenameMap: make(map[string]string),
|
||||
@ -334,13 +334,30 @@ func (this *MigrationContext) IsThrottled() (bool, string) {
|
||||
return this.isThrottled, this.throttleReason
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetReplicationLagQuery() string {
|
||||
var query string
|
||||
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
query = this.replicationLagQuery
|
||||
return query
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetReplicationLagQuery(newQuery string) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
this.replicationLagQuery = newQuery
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetThrottleQuery() string {
|
||||
var query string
|
||||
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
query = this.ThrottleQuery
|
||||
query = this.throttleQuery
|
||||
return query
|
||||
}
|
||||
|
||||
@ -348,7 +365,7 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
this.ThrottleQuery = newQuery
|
||||
this.throttleQuery = newQuery
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetMaxLoad() LoadMap {
|
||||
@ -400,7 +417,7 @@ func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKey
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
keys := mysql.NewInstanceKeyMap()
|
||||
keys.AddKeys(this.ThrottleControlReplicaKeys.GetInstanceKeys())
|
||||
keys.AddKeys(this.throttleControlReplicaKeys.GetInstanceKeys())
|
||||
return keys
|
||||
}
|
||||
|
||||
@ -413,7 +430,15 @@ func (this *MigrationContext) ReadThrottleControlReplicaKeys(throttleControlRepl
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
this.ThrottleControlReplicaKeys = keys
|
||||
this.throttleControlReplicaKeys = keys
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey) error {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
this.throttleControlReplicaKeys.AddKey(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -75,9 +75,9 @@ func main() {
|
||||
flag.Int64Var(&migrationContext.NiceRatio, "nice-ratio", 0, "force being 'nice', imply sleep time per chunk time. Example values: 0 is aggressive. 3: for every ms spend in a rowcopy chunk, spend 3ms sleeping immediately after")
|
||||
|
||||
maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
|
||||
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
|
||||
replicationLagQuery := flag.String("replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
|
||||
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
||||
flag.StringVar(&migrationContext.ThrottleQuery, "throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
||||
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
||||
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
||||
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
|
||||
flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
|
||||
@ -171,6 +171,8 @@ func main() {
|
||||
}
|
||||
migrationContext.SetChunkSize(*chunkSize)
|
||||
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
|
||||
migrationContext.SetReplicationLagQuery(*replicationLagQuery)
|
||||
migrationContext.SetThrottleQuery(*throttleQuery)
|
||||
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
||||
migrationContext.ApplyCredentials()
|
||||
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
|
||||
|
@ -158,7 +158,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
checkThrottleControlReplicas = false
|
||||
}
|
||||
if checkThrottleControlReplicas {
|
||||
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
|
||||
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
|
||||
if err != nil {
|
||||
return true, err.Error()
|
||||
}
|
||||
@ -661,9 +661,10 @@ chunk-size=<newsize> # Set a new chunk-size
|
||||
nice-ratio=<ratio> # Set a new nice-ratio, integer (0 is agrressive)
|
||||
critical-load=<load> # Set a new set of max-load thresholds
|
||||
max-lag-millis=<max-lag> # Set a new replication lag threshold
|
||||
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
|
||||
max-load=<load> # Set a new set of max-load thresholds
|
||||
throttle-query=<query> # Set a new throttle-query
|
||||
throttle-control-replicas=<replicas> #
|
||||
throttle-query=<query> # Set a new throttle-query (no quotes)
|
||||
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
|
||||
throttle # Force throttling
|
||||
no-throttle # End forced throttling (other throttling may still apply)
|
||||
unpostpone # Bail out a cut-over postpone; proceed to cut-over
|
||||
@ -693,6 +694,11 @@ help # This message
|
||||
this.printStatus(ForcePrintStatusAndHint, writer)
|
||||
}
|
||||
}
|
||||
case "replication-lag-query":
|
||||
{
|
||||
this.migrationContext.SetReplicationLagQuery(arg)
|
||||
this.printStatus(ForcePrintStatusAndHint, writer)
|
||||
}
|
||||
case "nice-ratio":
|
||||
{
|
||||
if niceRatio, err := strconv.Atoi(arg); err != nil {
|
||||
@ -809,8 +815,8 @@ func (this *Migrator) initiateInspector() (err error) {
|
||||
*this.migrationContext.ApplierConnectionConfig.ImpliedKey, *this.migrationContext.InspectorConnectionConfig.ImpliedKey,
|
||||
)
|
||||
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
|
||||
if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
|
||||
this.migrationContext.ThrottleControlReplicaKeys.AddKey(this.migrationContext.InspectorConnectionConfig.Key)
|
||||
if this.migrationContext.GetThrottleControlReplicaKeys().Len() == 0 {
|
||||
this.migrationContext.AddThrottleControlReplicaKey(this.migrationContext.InspectorConnectionConfig.Key)
|
||||
}
|
||||
} else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster {
|
||||
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
|
||||
@ -860,6 +866,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
||||
criticalLoad.String(),
|
||||
atomic.LoadInt64(&this.migrationContext.NiceRatio),
|
||||
))
|
||||
if replicationLagQuery := this.migrationContext.GetReplicationLagQuery(); replicationLagQuery != "" {
|
||||
fmt.Fprintln(w, fmt.Sprintf("# Replication lag query: %+v",
|
||||
replicationLagQuery,
|
||||
))
|
||||
}
|
||||
if this.migrationContext.ThrottleFlagFile != "" {
|
||||
fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v",
|
||||
this.migrationContext.ThrottleFlagFile,
|
||||
|
Loading…
Reference in New Issue
Block a user