context, status and hooks: progressPct and CurrentLag

This commit is contained in:
Shlomi Noach 2019-10-06 17:08:35 +03:00
parent dcc3e90f29
commit 7869889988
3 changed files with 13 additions and 2 deletions

View File

@ -174,6 +174,7 @@ type MigrationContext struct {
pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex
CurrentLag int64
CurrentProgress uint64 // math.Float64bits([f=0..100])
ThrottleHTTPStatusCode int64
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
@ -428,6 +429,10 @@ func (this *MigrationContext) MarkRowCopyEndTime() {
this.RowCopyEndTime = time.Now()
}
func (this *MigrationContext) GetCurrentLagDuration() time.Duration {
return time.Duration(atomic.LoadInt64(&this.CurrentLag))
}
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
// This is not exactly the same as the rows being iterated via chunks, but potentially close enough
func (this *MigrationContext) GetTotalRowsCopied() int64 {

View File

@ -8,6 +8,7 @@ package logic
import (
"fmt"
"math"
"os"
"os/exec"
"path/filepath"
@ -63,6 +64,8 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
env = append(env, fmt.Sprintf("GH_OST_MIGRATED_HOST=%s", this.migrationContext.GetApplierHostname()))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", math.Float64frombits(atomic.LoadUint64(&this.migrationContext.CurrentProgress))))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))

View File

@ -895,6 +895,8 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} else {
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
}
// we take the opportunity to update migration context with progressPct
atomic.StoreUint64(&this.migrationContext.CurrentProgress, math.Float64bits(progressPct))
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
if rule == ForcePrintStatusAndHintRule {
@ -911,7 +913,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
eta := "N/A"
if progressPct >= 100.0 {
eta = "due"
} else if progressPct >= 1.0 {
} else if progressPct >= 0.1 {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
@ -958,12 +960,13 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s",
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %+v, State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
len(this.applyEventsQueue), cap(this.applyEventsQueue),
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates,
this.migrationContext.GetCurrentLagDuration().Seconds(),
state,
eta,
)