diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 8a25fd1..9ebe585 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -158,12 +158,12 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
 		checkThrottleControlReplicas = false
 	}
 	if checkThrottleControlReplicas {
-		replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
-		if err != nil {
-			return true, err.Error()
+		lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), this.migrationContext.GetReplicationLagQuery())
+		if lagResult.Err != nil {
+			return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
 		}
-		if replicationLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
-			return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
+		if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
+			return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
 		}
 	}
 
@@ -217,13 +217,15 @@ func (this *Migrator) initiateThrottler() error {
 // calls callback functions, if any
 func (this *Migrator) throttle(onThrottled func()) {
 	for {
+		// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
+		// Therefore calling IsThrottled() is cheap.
 		if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
 			return
 		}
 		if onThrottled != nil {
 			onThrottled()
 		}
-		time.Sleep(time.Second)
+		time.Sleep(250 * time.Millisecond)
 	}
 }
 
diff --git a/go/mysql/utils.go b/go/mysql/utils.go
index 2e1d602..cf1ce54 100644
--- a/go/mysql/utils.go
+++ b/go/mysql/utils.go
@@ -14,6 +14,14 @@ import (
 	"github.com/outbrain/golib/sqlutils"
 )
 
+// ReplicationLagResult pairs an instance Key with the outcome of a replication lag check:
+// the measured Lag, and the Err (if any) returned by that check.
+type ReplicationLagResult struct {
+	Key InstanceKey
+	Lag time.Duration
+	Err error
+}
+
 // GetReplicationLag returns replication lag for a given connection config; either by explicit query
 // or via SHOW SLAVE STATUS
 func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
@@ -32,7 +40,7 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
 	err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
 		secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
 		if !secondsBehindMaster.Valid {
-			return fmt.Errorf("Replication not running on %+v", connectionConfig.Key)
+			return fmt.Errorf("replication not running")
 		}
 		replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
 		return nil
@@ -42,30 +50,34 @@ func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery s
 
 // GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
 // each via GetReplicationLag
-func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (replicationLag time.Duration, err error) {
+// If any instance check fails, the returned result carries that instance's key and error;
+// otherwise the returned result holds the greatest lag seen.
+func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (result *ReplicationLagResult) {
+	result = &ReplicationLagResult{Lag: 0}
 	if instanceKeyMap.Len() == 0 {
-		return 0, nil
+		return result
 	}
-	lagsChan := make(chan time.Duration, instanceKeyMap.Len())
-	errorsChan := make(chan error, instanceKeyMap.Len())
+	lagResults := make(chan *ReplicationLagResult, instanceKeyMap.Len())
 	for key := range *instanceKeyMap {
 		connectionConfig := baseConnectionConfig.Duplicate()
 		connectionConfig.Key = key
+		instanceResult := &ReplicationLagResult{Key: connectionConfig.Key}
 		go func() {
-			lag, err := GetReplicationLag(connectionConfig, replicationLagQuery)
-			lagsChan <- lag
-			errorsChan <- err
+			instanceResult.Lag, instanceResult.Err = GetReplicationLag(connectionConfig, replicationLagQuery)
+			lagResults <- instanceResult
 		}()
 	}
 	for range *instanceKeyMap {
-		if lagError := <-errorsChan; lagError != nil {
-			err = lagError
-		}
-		if lag := <-lagsChan; lag.Nanoseconds() > replicationLag.Nanoseconds() {
-			replicationLag = lag
+		lagResult := <-lagResults
+		if lagResult.Err != nil {
+			result = lagResult
+		} else if result.Err == nil && lagResult.Lag.Nanoseconds() > result.Lag.Nanoseconds() {
+			// Only track the max lag while no error has been seen: an error must not be
+			// masked by a later, successful reading from another instance.
+			result = lagResult
 		}
 	}
-	return replicationLag, err
+	return result
 }
 
 func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {