diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 1635b5e..41ece40 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -84,18 +84,30 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error } } -// collectHeartbeat reads the latest changelog heartbeat value -func (this *Throttler) collectHeartbeat() { +// collectReplicationLag reads the latest changelog heartbeat value +func (this *Throttler) collectReplicationLag() { ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) for range ticker { go func() error { if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 { return nil } - if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil { - return log.Errore(err) + + if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica { + // when running on replica, the heartbeat injection is also done on the replica. + // This means we will always get a good heartbeat value. + // When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output. + if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil { + return log.Errore(err) + } else { + atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) + } } else { - this.parseChangelogHeartbeat(heartbeatValue) + if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil { + return log.Errore(err) + } else { + this.parseChangelogHeartbeat(heartbeatValue) + } } return nil }() @@ -114,6 +126,7 @@ func (this *Throttler) collectControlReplicasLag() { readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) { dbUri := connectionConfig.GetDBUri("information_schema") + var heartbeatValue string if db, _, err := sqlutils.GetDB(dbUri); err != nil { return lag, err @@ -272,7 +285,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // that may affect throttling. There are several components, all running independently, // that collect such metrics. func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) { - go this.collectHeartbeat() + go this.collectReplicationLag() go this.collectControlReplicasLag() go func() {