diff --git a/go/base/context.go b/go/base/context.go index 51b43df..b9badc4 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -52,6 +52,7 @@ const ( const ( HTTPStatusOK = 200 MaxEventsBatchSize = 1000 + ETAUnknown = math.MinInt64 ) var ( @@ -182,6 +183,7 @@ type MigrationContext struct { lastHeartbeatOnChangelogMutex *sync.Mutex CurrentLag int64 currentProgress uint64 + etaNanoseonds int64 ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 @@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext { MaxLagMillisecondsThrottleThreshold: 1500, CutOverLockTimeoutSeconds: 3, DMLBatchSize: 10, + etaNanoseonds: ETAUnknown, maxLoad: NewLoadMap(), criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, @@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) { atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct)) } +func (this *MigrationContext) GetETADuration() time.Duration { + return time.Duration(atomic.LoadInt64(&this.etaNanoseonds)) +} + +func (this *MigrationContext) SetETADuration(etaDuration time.Duration) { + atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds()) +} + +func (this *MigrationContext) GetETASeconds() int64 { + nano := atomic.LoadInt64(&this.etaNanoseonds) + if nano < 0 { + return ETAUnknown + } + return nano / int64(time.Second) +} + // math.Float64bits([f=0..100]) // GetTotalRowsCopied returns the accurate number of rows being copied (affected) diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 71f070c..2275ede 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds())) env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds())) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct())) + env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index dfddccf..c12c21f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } var etaSeconds float64 = math.MaxFloat64 - eta := "N/A" + var etaDuration = time.Duration(base.ETAUnknown) if progressPct >= 100.0 { - eta = "due" + etaDuration = 0 } else if progressPct >= 0.1 { elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds if etaSeconds >= 0 { - etaDuration := time.Duration(etaSeconds) * time.Second - eta = base.PrettifyDurationOutput(etaDuration) + etaDuration = time.Duration(etaSeconds) * time.Second } else { - eta = "due" + etaDuration = 0 } } + this.migrationContext.SetETADuration(etaDuration) + var eta string + switch etaDuration { + case 0: + eta = "due" + case time.Duration(base.ETAUnknown): + eta = "N/A" + default: + eta = base.PrettifyDurationOutput(etaDuration) + } state := "migrating" if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {