diff --git a/go/base/context.go b/go/base/context.go index cc8e9cc..3757653 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -178,7 +178,8 @@ type MigrationContext struct { RenameTablesEndTime time.Time pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex - CurrentHeartbeatLag int64 + lastHeartbeatOnChangelogTime time.Time + lastHeartbeatOnChangelogMutex *sync.Mutex CurrentLag int64 currentProgress uint64 ThrottleHTTPStatusCode int64 @@ -272,6 +273,7 @@ func NewMigrationContext() *MigrationContext { throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, + lastHeartbeatOnChangelogMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), Log: NewDefaultLogger(), @@ -455,8 +457,8 @@ func (this *MigrationContext) MarkRowCopyEndTime() { this.RowCopyEndTime = time.Now() } -func (this *MigrationContext) GetCurrentHeartbeatLagDuration() time.Duration { - return time.Duration(atomic.LoadInt64(&this.CurrentHeartbeatLag)) +func (this *MigrationContext) TimeSinceLastHeartbeatOnChangelog() time.Duration { + return time.Since(this.GetLastHeartbeatOnChangelogTime()) } func (this *MigrationContext) GetCurrentLagDuration() time.Duration { @@ -498,6 +500,20 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration { return time.Since(this.pointOfInterestTime) } +func (this *MigrationContext) SetLastHeartbeatOnChangelogTime(t time.Time) { + this.lastHeartbeatOnChangelogMutex.Lock() + defer this.lastHeartbeatOnChangelogMutex.Unlock() + + this.lastHeartbeatOnChangelogTime = t +} + +func (this *MigrationContext) GetLastHeartbeatOnChangelogTime() time.Time { + this.lastHeartbeatOnChangelogMutex.Lock() + defer this.lastHeartbeatOnChangelogMutex.Unlock() + + return this.lastHeartbeatOnChangelogTime +} + func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) { if heartbeatIntervalMilliseconds < 100 { heartbeatIntervalMilliseconds = 100 diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 9f119be..71f070c 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -64,7 +64,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname())) env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname)) env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds())) - env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds())) + env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds())) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 2c95e63..5bc6997 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -255,10 +255,12 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3) - if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil { + + heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) + if err != nil { return this.migrationContext.Log.Errore(err) } else { - atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag)) + this.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) return nil } } @@ -521,10 +523,10 @@ func (this *Migrator) cutOver() (err error) { this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed") this.sleepWhileTrue( func() (bool, error) { - currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag) + heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { + this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) return true, nil } else { return false, nil @@ -1003,7 +1005,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), - this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds(), + this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), state, eta, )