Store lastHeartbeatOnChangelogTime instead of CurrentHeartbeatLag

This commit is contained in:
Cathal Coffey 2021-02-03 09:02:58 +00:00
parent 4efd156759
commit 48ce0873de
3 changed files with 28 additions and 10 deletions

View File

@ -178,7 +178,8 @@ type MigrationContext struct {
RenameTablesEndTime time.Time
pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex
CurrentHeartbeatLag int64
lastHeartbeatOnChangelogTime time.Time
lastHeartbeatOnChangelogMutex *sync.Mutex
CurrentLag int64
currentProgress uint64
ThrottleHTTPStatusCode int64
@ -272,6 +273,7 @@ func NewMigrationContext() *MigrationContext {
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
lastHeartbeatOnChangelogMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
@ -455,8 +457,8 @@ func (this *MigrationContext) MarkRowCopyEndTime() {
this.RowCopyEndTime = time.Now()
}
func (this *MigrationContext) GetCurrentHeartbeatLagDuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.CurrentHeartbeatLag))
func (this *MigrationContext) TimeSinceLastHeartbeatOnChangelog() time.Duration {
return time.Since(this.GetLastHeartbeatOnChangelogTime())
}
func (this *MigrationContext) GetCurrentLagDuration() time.Duration {
@ -498,6 +500,20 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
return time.Since(this.pointOfInterestTime)
}
func (this *MigrationContext) SetLastHeartbeatOnChangelogTime(t time.Time) {
this.lastHeartbeatOnChangelogMutex.Lock()
defer this.lastHeartbeatOnChangelogMutex.Unlock()
this.lastHeartbeatOnChangelogTime = t
}
func (this *MigrationContext) GetLastHeartbeatOnChangelogTime() time.Time {
this.lastHeartbeatOnChangelogMutex.Lock()
defer this.lastHeartbeatOnChangelogMutex.Unlock()
return this.lastHeartbeatOnChangelogTime
}
func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
if heartbeatIntervalMilliseconds < 100 {
heartbeatIntervalMilliseconds = 100

View File

@ -64,7 +64,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))

View File

@ -255,10 +255,12 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3)
if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil {
heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString)
if err != nil {
return this.migrationContext.Log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag))
this.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime)
return nil
}
}
@ -521,10 +523,10 @@ func (this *Migrator) cutOver() (err error) {
this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed")
this.sleepWhileTrue(
func() (bool, error) {
currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag)
heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog()
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds())
if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds())
return true, nil
} else {
return false, nil
@ -1003,7 +1005,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates,
this.migrationContext.GetCurrentLagDuration().Seconds(),
this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds(),
this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(),
state,
eta,
)