From f19f101610cbaaf67113aa4a1c223b702152dafb Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 31 May 2021 15:15:51 +0300 Subject: [PATCH] hooks: reporting GH_OST_ETA_SECONDS. ETA as part of migration context (#936) * v1.1.0 * WIP: copying AUTO_INCREMENT value to ghost table Initial commit: towards setting up a test suite Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * greping for 'expect_table_structure' content * Adding simple test for 'expect_table_structure' scenario * adding tests for AUTO_INCREMENT value after row deletes. Should initially fail * clear event beforehand * parsing AUTO_INCREMENT from alter query, reading AUTO_INCREMENT from original table, applying AUTO_INCREMENT value onto ghost table if applicable and user has not specified AUTO_INCREMENT in alter statement * support GetUint64 Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * minor update to test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * adding test for user defined AUTO_INCREMENT statement * Generated column as part of UNIQUE (or PRIMARY) KEY Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * skip analysis of generated column data type in unique key Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * All MySQL DBs limited to max 3 concurrent/idle connections Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * hooks: reporting GH_OST_ETA_SECONDS. ETA stored as part of migration context Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * GH_OST_ETA_NANOSECONDS Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * N/A denoted by negative value Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * ETAUnknown constant Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/base/context.go | 19 +++++++++++++++++++ go/logic/hooks.go | 1 + go/logic/migrator.go | 19 ++++++++++++++----- 3 files changed, 34 insertions(+), 5 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 51b43df..b9badc4 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -52,6 +52,7 @@ const ( const ( HTTPStatusOK = 200 MaxEventsBatchSize = 1000 + ETAUnknown = math.MinInt64 ) var ( @@ -182,6 +183,7 @@ type MigrationContext struct { lastHeartbeatOnChangelogMutex *sync.Mutex CurrentLag int64 currentProgress uint64 + etaNanoseonds int64 ThrottleHTTPStatusCode int64 controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 @@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext { MaxLagMillisecondsThrottleThreshold: 1500, CutOverLockTimeoutSeconds: 3, DMLBatchSize: 10, + etaNanoseonds: ETAUnknown, maxLoad: NewLoadMap(), criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, @@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) { atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct)) } +func (this *MigrationContext) GetETADuration() time.Duration { + return time.Duration(atomic.LoadInt64(&this.etaNanoseonds)) +} + +func (this *MigrationContext) SetETADuration(etaDuration time.Duration) { + atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds()) +} + +func (this *MigrationContext) GetETASeconds() int64 { + nano := atomic.LoadInt64(&this.etaNanoseonds) + if nano < 0 { + return ETAUnknown + } + return nano / int64(time.Second) +} + // math.Float64bits([f=0..100]) // GetTotalRowsCopied returns the accurate number of rows being copied (affected) diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 71f070c..2275ede 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [ env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds())) env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds())) env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct())) + env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds())) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner)) env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index dfddccf..c12c21f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } var etaSeconds float64 = math.MaxFloat64 - eta := "N/A" + var etaDuration = time.Duration(base.ETAUnknown) if progressPct >= 100.0 { - eta = "due" + etaDuration = 0 } else if progressPct >= 0.1 { elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds if etaSeconds >= 0 { - etaDuration := time.Duration(etaSeconds) * time.Second - eta = base.PrettifyDurationOutput(etaDuration) + etaDuration = time.Duration(etaSeconds) * time.Second } else { - eta = "due" + etaDuration = 0 } } + this.migrationContext.SetETADuration(etaDuration) + var eta string + switch etaDuration { + case 0: + eta = "due" + case time.Duration(base.ETAUnknown): + eta = "N/A" + default: + eta = base.PrettifyDurationOutput(etaDuration) + } state := "migrating" if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {