More informative information upon control-replicas lagging
This commit is contained in:
parent
2f4d9b80bb
commit
e900dae2e9
@ -158,12 +158,12 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
checkThrottleControlReplicas = false
|
||||
}
|
||||
if checkThrottleControlReplicas {
|
||||
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
|
||||
if err != nil {
|
||||
return true, err.Error()
|
||||
lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
|
||||
if lagResult.Err != nil {
|
||||
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
|
||||
}
|
||||
if replicationLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||
return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
|
||||
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
@ -217,13 +217,15 @@ func (this *Migrator) initiateThrottler() error {
|
||||
// calls callback functions, if any
|
||||
func (this *Migrator) throttle(onThrottled func()) {
|
||||
for {
|
||||
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
|
||||
// Therefore calling IsThrottled() is cheap
|
||||
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
|
||||
return
|
||||
}
|
||||
if onThrottled != nil {
|
||||
onThrottled()
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -14,6 +14,12 @@ import (
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
|
||||
type ReplicationLagResult struct {
|
||||
Key InstanceKey
|
||||
Lag time.Duration
|
||||
Err error
|
||||
}
|
||||
|
||||
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
|
||||
// or via SHOW SLAVE STATUS
|
||||
func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
|
||||
@ -32,7 +38,7 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
|
||||
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
|
||||
if !secondsBehindMaster.Valid {
|
||||
return fmt.Errorf("Replication not running on %+v", connectionConfig.Key)
|
||||
return fmt.Errorf("replication not running")
|
||||
}
|
||||
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
|
||||
return nil
|
||||
@ -42,30 +48,30 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
|
||||
|
||||
// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
|
||||
// each via GetReplicationLag
|
||||
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (replicationLag time.Duration, err error) {
|
||||
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) {
|
||||
result = &ReplicationLagResult{Lag: 0}
|
||||
if instanceKeyMap.Len() == 0 {
|
||||
return 0, nil
|
||||
return result
|
||||
}
|
||||
lagsChan := make(chan time.Duration, instanceKeyMap.Len())
|
||||
errorsChan := make(chan error, instanceKeyMap.Len())
|
||||
lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len())
|
||||
for key := range *instanceKeyMap {
|
||||
connectionConfig := baseConnectionConfig.Duplicate()
|
||||
connectionConfig.Key = key
|
||||
result := &ReplicationLagResult{Key: connectionConfig.Key}
|
||||
go func() {
|
||||
lag, err := GetReplicationLag(connectionConfig, replicationLagQuery)
|
||||
lagsChan <- lag
|
||||
errorsChan <- err
|
||||
result.Lag, result.Err = GetReplicationLag(connectionConfig, replicationLagQuery)
|
||||
lagResults <- result
|
||||
}()
|
||||
}
|
||||
for range *instanceKeyMap {
|
||||
if lagError := <-errorsChan; lagError != nil {
|
||||
err = lagError
|
||||
}
|
||||
if lag := <-lagsChan; lag.Nanoseconds() > replicationLag.Nanoseconds() {
|
||||
replicationLag = lag
|
||||
lagResult := <-lagResults
|
||||
if lagResult.Err != nil {
|
||||
result = lagResult
|
||||
} else if lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
|
||||
result = lagResult
|
||||
}
|
||||
}
|
||||
return replicationLag, err
|
||||
return result
|
||||
}
|
||||
|
||||
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
|
||||
|
Loading…
Reference in New Issue
Block a user