Merge pull request #788 from github/context-status
context, status and hooks: progressPct and CurrentLag
This commit is contained in:
commit
af43fbd52f
@ -65,6 +65,8 @@ The following variables are available on all hooks:
|
|||||||
- `GH_OST_ELAPSED_COPY_SECONDS` - row-copy time (excluding startup, row-count and postpone time)
|
- `GH_OST_ELAPSED_COPY_SECONDS` - row-copy time (excluding startup, row-count and postpone time)
|
||||||
- `GH_OST_ESTIMATED_ROWS` - estimated total rows in table
|
- `GH_OST_ESTIMATED_ROWS` - estimated total rows in table
|
||||||
- `GH_OST_COPIED_ROWS` - number of rows copied by `gh-ost`
|
- `GH_OST_COPIED_ROWS` - number of rows copied by `gh-ost`
|
||||||
|
- `GH_OST_INSPECTED_LAG` - lag in seconds (floating point) of inspected server
|
||||||
|
- `GH_OST_PROGRESS` - progress pct ([0..100], floating point) of migration
|
||||||
- `GH_OST_MIGRATED_HOST`
|
- `GH_OST_MIGRATED_HOST`
|
||||||
- `GH_OST_INSPECTED_HOST`
|
- `GH_OST_INSPECTED_HOST`
|
||||||
- `GH_OST_EXECUTING_HOST`
|
- `GH_OST_EXECUTING_HOST`
|
||||||
|
@ -7,6 +7,7 @@ package base
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"strings"
|
"strings"
|
||||||
@ -174,6 +175,7 @@ type MigrationContext struct {
|
|||||||
pointOfInterestTime time.Time
|
pointOfInterestTime time.Time
|
||||||
pointOfInterestTimeMutex *sync.Mutex
|
pointOfInterestTimeMutex *sync.Mutex
|
||||||
CurrentLag int64
|
CurrentLag int64
|
||||||
|
currentProgress uint64
|
||||||
ThrottleHTTPStatusCode int64
|
ThrottleHTTPStatusCode int64
|
||||||
controlReplicasLagResult mysql.ReplicationLagResult
|
controlReplicasLagResult mysql.ReplicationLagResult
|
||||||
TotalRowsCopied int64
|
TotalRowsCopied int64
|
||||||
@ -428,6 +430,20 @@ func (this *MigrationContext) MarkRowCopyEndTime() {
|
|||||||
this.RowCopyEndTime = time.Now()
|
this.RowCopyEndTime = time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) GetCurrentLagDuration() time.Duration {
|
||||||
|
return time.Duration(atomic.LoadInt64(&this.CurrentLag))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) GetProgressPct() float64 {
|
||||||
|
return math.Float64frombits(atomic.LoadUint64(&this.currentProgress))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) SetProgressPct(progressPct float64) {
|
||||||
|
atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct))
|
||||||
|
}
|
||||||
|
|
||||||
|
// math.Float64bits([f=0..100])
|
||||||
|
|
||||||
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
|
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
|
||||||
// This is not exactly the same as the rows being iterated via chunks, but potentially close enough
|
// This is not exactly the same as the rows being iterated via chunks, but potentially close enough
|
||||||
func (this *MigrationContext) GetTotalRowsCopied() int64 {
|
func (this *MigrationContext) GetTotalRowsCopied() int64 {
|
||||||
|
@ -63,6 +63,8 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
|
|||||||
env = append(env, fmt.Sprintf("GH_OST_MIGRATED_HOST=%s", this.migrationContext.GetApplierHostname()))
|
env = append(env, fmt.Sprintf("GH_OST_MIGRATED_HOST=%s", this.migrationContext.GetApplierHostname()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
|
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
|
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
|
||||||
|
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
|
||||||
|
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
|
||||||
|
@ -895,6 +895,8 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
} else {
|
} else {
|
||||||
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
|
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
|
||||||
}
|
}
|
||||||
|
// we take the opportunity to update migration context with progressPct
|
||||||
|
this.migrationContext.SetProgressPct(progressPct)
|
||||||
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
|
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
|
||||||
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
|
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
|
||||||
if rule == ForcePrintStatusAndHintRule {
|
if rule == ForcePrintStatusAndHintRule {
|
||||||
@ -911,7 +913,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
eta := "N/A"
|
eta := "N/A"
|
||||||
if progressPct >= 100.0 {
|
if progressPct >= 100.0 {
|
||||||
eta = "due"
|
eta = "due"
|
||||||
} else if progressPct >= 1.0 {
|
} else if progressPct >= 0.1 {
|
||||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
||||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
||||||
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
||||||
@ -958,12 +960,13 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
|
|
||||||
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
||||||
|
|
||||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s",
|
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, State: %s; ETA: %s",
|
||||||
totalRowsCopied, rowsEstimate, progressPct,
|
totalRowsCopied, rowsEstimate, progressPct,
|
||||||
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
||||||
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
||||||
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
|
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
|
||||||
currentBinlogCoordinates,
|
currentBinlogCoordinates,
|
||||||
|
this.migrationContext.GetCurrentLagDuration().Seconds(),
|
||||||
state,
|
state,
|
||||||
eta,
|
eta,
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user