Merge pull request #17 from openark/hooks-eta-seconds

hooks: reporting GH_OST_ETA_SECONDS. ETA as part of migration context
This commit is contained in:
Shlomi Noach 2021-03-07 14:55:31 +02:00 committed by GitHub
commit 33516f4955
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 34 additions and 5 deletions

View File

@ -52,6 +52,7 @@ const (
const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64
)
var (
@ -182,6 +183,7 @@ type MigrationContext struct {
lastHeartbeatOnChangelogMutex *sync.Mutex
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
ThrottleHTTPStatusCode int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext {
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
etaNanoseonds: ETAUnknown,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) {
atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct))
}
func (this *MigrationContext) GetETADuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.etaNanoseonds))
}
func (this *MigrationContext) SetETADuration(etaDuration time.Duration) {
atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds())
}
func (this *MigrationContext) GetETASeconds() int64 {
nano := atomic.LoadInt64(&this.etaNanoseonds)
if nano < 0 {
return ETAUnknown
}
return nano / int64(time.Second)
}
// math.Float64bits([f=0..100])
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)

View File

@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))

View File

@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
}
var etaSeconds float64 = math.MaxFloat64
eta := "N/A"
var etaDuration = time.Duration(base.ETAUnknown)
if progressPct >= 100.0 {
eta = "due"
etaDuration = 0
} else if progressPct >= 0.1 {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time.Duration(etaSeconds) * time.Second
eta = base.PrettifyDurationOutput(etaDuration)
etaDuration = time.Duration(etaSeconds) * time.Second
} else {
eta = "due"
etaDuration = 0
}
}
this.migrationContext.SetETADuration(etaDuration)
var eta string
switch etaDuration {
case 0:
eta = "due"
case time.Duration(base.ETAUnknown):
eta = "N/A"
default:
eta = base.PrettifyDurationOutput(etaDuration)
}
state := "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {