From 7207bc146a1cf144b15ce5ca4fdcd0a786d57729 Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Sun, 31 Jan 2021 18:23:09 +0000 Subject: [PATCH 1/9] Make it easier to handle different onChangelogEvents --- go/logic/migrator.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 291a490..908a42f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -207,12 +207,18 @@ func (this *Migrator) canStopStreaming() bool { return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0 } -// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted. -func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { +// onChangelogEvent is called when a binlog event operation on the changelog table is intercepted. +func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { // Hey, I created the changelog table, I know the type of columns it has! - if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" { + switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { + case "state": + return this.onChangelogStateEvent(dmlEvent) + default: return nil } +} + +func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { changelogStateString := dmlEvent.NewColumnValues.StringColumn(3) changelogState := ReadChangelogState(changelogStateString) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState) @@ -995,7 +1001,7 @@ func (this *Migrator) initiateStreaming() error { this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), func(dmlEvent *binlog.BinlogDMLEvent) error { - return this.onChangelogStateEvent(dmlEvent) + return this.onChangelogEvent(dmlEvent) }, ) From 8aee288fd79ead7b9ae6023b8002116f4d62c69b Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Sun, 31 Jan 2021 18:24:26 +0000 Subject: [PATCH 2/9] Handle onChangelogHeartbeatEvent and update CurrentHeartbeatLag --- go/base/context.go | 1 + go/logic/migrator.go | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/go/base/context.go b/go/base/context.go index 3211067..3026dc5 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -178,6 +178,7 @@ type MigrationContext struct { RenameTablesEndTime time.Time pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex + CurrentHeartbeatLag int64 CurrentLag int64 currentProgress uint64 ThrottleHTTPStatusCode int64 diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 908a42f..83e4228 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -213,6 +213,8 @@ func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err err switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { case "state": return this.onChangelogStateEvent(dmlEvent) + case "heartbeat": + return this.onChangelogHeartbeatEvent(dmlEvent) default: return nil } @@ -251,6 +253,16 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return nil } +func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { + changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3) + if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil { + return this.migrationContext.Log.Errore(err) + } else { + atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag)) + return nil + } +} + // listenOnPanicAbort aborts on abort request func (this *Migrator) listenOnPanicAbort() { err := <-this.migrationContext.PanicAbort From a4218cd6f416c9448298c219fdac442bd82961ec Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Sun, 31 Jan 2021 18:27:34 +0000 Subject: [PATCH 3/9] Progress should print HeartbeatLag --- doc/hooks.md | 1 + go/base/context.go | 4 ++++ go/logic/hooks.go | 1 + go/logic/migrator.go | 3 ++- 4 files changed, 8 insertions(+), 1 deletion(-) diff --git a/doc/hooks.md b/doc/hooks.md index 4c49c85..91e1311 100644 --- a/doc/hooks.md +++ b/doc/hooks.md @@ -66,6 +66,7 @@ The following variables are available on all hooks: - `GH_OST_ESTIMATED_ROWS` - estimated total rows in table - `GH_OST_COPIED_ROWS` - number of rows copied by `gh-ost` - `GH_OST_INSPECTED_LAG` - lag in seconds (floating point) of inspected server +- `GH_OST_HEARTBEAT_LAG` - lag in seconds (floating point) of heartbeat - `GH_OST_PROGRESS` - progress pct ([0..100], floating point) of migration - `GH_OST_MIGRATED_HOST` - `GH_OST_INSPECTED_HOST` diff --git a/go/base/context.go b/go/base/context.go index 3026dc5..cc8e9cc 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -455,6 +455,10 @@ func (this *MigrationContext) MarkRowCopyEndTime() { this.RowCopyEndTime = time.Now() } +func (this *MigrationContext) GetCurrentHeartbeatLagDuration() time.Duration { + return time.Duration(atomic.LoadInt64(&this.CurrentHeartbeatLag)) +} + func (this *MigrationContext) GetCurrentLagDuration() time.Duration { return time.Duration(atomic.LoadInt64(&this.CurrentLag)) } diff --git a/go/logic/hooks.go b/go/logic/hooks.go index fa5011e..9f119be 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -64,6 +64,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname())) env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname)) env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds())) + env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds())) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 83e4228..aae3cb3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -980,13 +980,14 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), + this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds(), state, eta, ) From 8a26c9ebf47008d40efb255b55892ff1f9e0b634 Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Sun, 31 Jan 2021 18:28:25 +0000 Subject: [PATCH 4/9] Don't cut-over until it is safe to do so --- go/logic/migrator.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index aae3cb3..633b3d0 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -518,6 +518,22 @@ func (this *Migrator) cutOver() (err error) { this.migrationContext.MarkPointOfInterest() this.migrationContext.Log.Debugf("checking for cut-over postpone: complete") + this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed") + this.sleepWhileTrue( + func() (bool, error) { + currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag) + maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) + if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { + this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + return true, nil + } else { + return false, nil + } + }, + ) + this.migrationContext.MarkPointOfInterest() + this.migrationContext.Log.Infof("Heartbeat lag is low enough, proceeding") + if this.migrationContext.TestOnReplica { // With `--test-on-replica` we stop replication thread, and then proceed to use // the same cut-over phase as the master would use. That means we take locks From 4efd15675948aebcb3868cfdaa361b9699d90dfa Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Tue, 2 Feb 2021 09:17:31 +0000 Subject: [PATCH 5/9] Move 'heartbeat is too high' to Debug logs --- go/logic/migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 633b3d0..2c95e63 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -524,7 +524,7 @@ func (this *Migrator) cutOver() (err error) { currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag) maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) return true, nil } else { return false, nil From 48ce0873de2979d9dba8bb83e75543a004652d3a Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Wed, 3 Feb 2021 09:02:58 +0000 Subject: [PATCH 6/9] Store lastHeartbeatOnChangelogTime instead of CurrentHeartbeatLag --- go/base/context.go | 22 +++++++++++++++++++--- go/logic/hooks.go | 2 +- go/logic/migrator.go | 14 ++++++++------ 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index cc8e9cc..3757653 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -178,7 +178,8 @@ type MigrationContext struct { RenameTablesEndTime time.Time pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex - CurrentHeartbeatLag int64 + lastHeartbeatOnChangelogTime time.Time + lastHeartbeatOnChangelogMutex *sync.Mutex CurrentLag int64 currentProgress uint64 ThrottleHTTPStatusCode int64 @@ -272,6 +273,7 @@ func NewMigrationContext() *MigrationContext { throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, + lastHeartbeatOnChangelogMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), PanicAbort: make(chan error), Log: NewDefaultLogger(), @@ -455,8 +457,8 @@ func (this *MigrationContext) MarkRowCopyEndTime() { this.RowCopyEndTime = time.Now() } -func (this *MigrationContext) GetCurrentHeartbeatLagDuration() time.Duration { - return time.Duration(atomic.LoadInt64(&this.CurrentHeartbeatLag)) +func (this *MigrationContext) TimeSinceLastHeartbeatOnChangelog() time.Duration { + return time.Since(this.GetLastHeartbeatOnChangelogTime()) } func (this *MigrationContext) GetCurrentLagDuration() time.Duration { @@ -498,6 +500,20 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration { return time.Since(this.pointOfInterestTime) } +func (this *MigrationContext) SetLastHeartbeatOnChangelogTime(t time.Time) { + this.lastHeartbeatOnChangelogMutex.Lock() + defer this.lastHeartbeatOnChangelogMutex.Unlock() + + this.lastHeartbeatOnChangelogTime = t +} + +func (this *MigrationContext) GetLastHeartbeatOnChangelogTime() time.Time { + this.lastHeartbeatOnChangelogMutex.Lock() + defer this.lastHeartbeatOnChangelogMutex.Unlock() + + return this.lastHeartbeatOnChangelogTime +} + func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) { if heartbeatIntervalMilliseconds < 100 { heartbeatIntervalMilliseconds = 100 diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 9f119be..71f070c 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -64,7 +64,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname())) env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname)) env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds())) - env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds())) + env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds())) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 2c95e63..5bc6997 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -255,10 +255,12 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3) - if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil { + + heartbeatTime, err := time.Parse(time.RFC3339Nano, changelogHeartbeatString) + if err != nil { return this.migrationContext.Log.Errore(err) } else { - atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag)) + this.migrationContext.SetLastHeartbeatOnChangelogTime(heartbeatTime) return nil } } @@ -521,10 +523,10 @@ func (this *Migrator) cutOver() (err error) { this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed") this.sleepWhileTrue( func() (bool, error) { - currentHeartbeatLag := atomic.LoadInt64(&this.migrationContext.CurrentHeartbeatLag) + heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - if time.Duration(currentHeartbeatLag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", time.Duration(currentHeartbeatLag).Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { + this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) return true, nil } else { return false, nil @@ -1003,7 +1005,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, this.migrationContext.GetCurrentLagDuration().Seconds(), - this.migrationContext.GetCurrentHeartbeatLagDuration().Seconds(), + this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds(), state, eta, ) From d5c2414893fefc19c1b8b6d4666a6d209addf7fb Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Fri, 5 Feb 2021 10:12:54 +0000 Subject: [PATCH 7/9] Move 'heartbeat is too high' back to Debug logs again --- go/logic/migrator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 5bc6997..f866cfc 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -526,7 +526,7 @@ func (this *Migrator) cutOver() (err error) { heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - this.migrationContext.Log.Infof("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) return true, nil } else { return false, nil From 503b7b0d6cff548669a143f579255b9c1e2ca3af Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Sun, 7 Feb 2021 13:52:59 +0000 Subject: [PATCH 8/9] Consolidate the two sleepWhileTrue loops --- go/logic/migrator.go | 22 ++++++---------------- 1 file changed, 6 insertions(+), 16 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index f866cfc..c64b8be 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -496,6 +496,12 @@ func (this *Migrator) cutOver() (err error) { this.migrationContext.Log.Debugf("checking for cut-over postpone") this.sleepWhileTrue( func() (bool, error) { + heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() + maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) + if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { + this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + return true, nil + } if this.migrationContext.PostponeCutOverFlagFile == "" { return false, nil } @@ -520,22 +526,6 @@ func (this *Migrator) cutOver() (err error) { this.migrationContext.MarkPointOfInterest() this.migrationContext.Log.Debugf("checking for cut-over postpone: complete") - this.migrationContext.Log.Infof("Waiting for heartbeat lag to be low enough to proceed") - this.sleepWhileTrue( - func() (bool, error) { - heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() - maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) - return true, nil - } else { - return false, nil - } - }, - ) - this.migrationContext.MarkPointOfInterest() - this.migrationContext.Log.Infof("Heartbeat lag is low enough, proceeding") - if this.migrationContext.TestOnReplica { // With `--test-on-replica` we stop replication thread, and then proceed to use // the same cut-over phase as the master would use. That means we take locks From 3135a25c1f5bf07ebc08be668a19a4640e579c72 Mon Sep 17 00:00:00 2001 From: Cathal Coffey Date: Sun, 7 Feb 2021 14:29:10 +0000 Subject: [PATCH 9/9] HeartbeatLag must be < than --max-lag-millis and --cut-over-lock-timeout-seconds to continue --- go/logic/migrator.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c64b8be..2a038a8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -497,9 +497,10 @@ func (this *Migrator) cutOver() (err error) { this.sleepWhileTrue( func() (bool, error) { heartbeatLag := this.migrationContext.TimeSinceLastHeartbeatOnChangelog() - maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) - if heartbeatLag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than --max-lag-millis (%.2fs) to continue", heartbeatLag.Seconds(), (time.Duration(maxLagMillisecondsThrottleThreshold) * time.Millisecond).Seconds()) + maxLagMillisecondsThrottle := time.Duration(atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)) * time.Millisecond + cutOverLockTimeout := time.Duration(this.migrationContext.CutOverLockTimeoutSeconds) * time.Second + if heartbeatLag > maxLagMillisecondsThrottle || heartbeatLag > cutOverLockTimeout { + this.migrationContext.Log.Debugf("current HeartbeatLag (%.2fs) is too high, it needs to be less than both --max-lag-millis (%.2fs) and --cut-over-lock-timeout-seconds (%.2fs) to continue", heartbeatLag.Seconds(), maxLagMillisecondsThrottle.Seconds(), cutOverLockTimeout.Seconds()) return true, nil } if this.migrationContext.PostponeCutOverFlagFile == "" {