hooks: reporting GH_OST_ETA_SECONDS. ETA as part of migration context (#936)
* v1.1.0 * WIP: copying AUTO_INCREMENT value to ghost table Initial commit: towards setting up a test suite Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * greping for 'expect_table_structure' content * Adding simple test for 'expect_table_structure' scenario * adding tests for AUTO_INCREMENT value after row deletes. Should initially fail * clear event beforehand * parsing AUTO_INCREMENT from alter query, reading AUTO_INCREMENT from original table, applying AUTO_INCREMENT value onto ghost table if applicable and user has not specified AUTO_INCREMENT in alter statement * support GetUint64 Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * minor update to test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * adding test for user defined AUTO_INCREMENT statement * Generated column as part of UNIQUE (or PRIMARY) KEY Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * skip analysis of generated column data type in unique key Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * All MySQL DBs limited to max 3 concurrent/idle connections Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * hooks: reporting GH_OST_ETA_SECONDS. ETA stored as part of migration context Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * GH_OST_ETA_NANOSECONDS Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * N/A denoted by negative value Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * ETAUnknown constant Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
This commit is contained in:
parent
c41823ecc9
commit
f19f101610
@ -52,6 +52,7 @@ const (
|
|||||||
const (
|
const (
|
||||||
HTTPStatusOK = 200
|
HTTPStatusOK = 200
|
||||||
MaxEventsBatchSize = 1000
|
MaxEventsBatchSize = 1000
|
||||||
|
ETAUnknown = math.MinInt64
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -182,6 +183,7 @@ type MigrationContext struct {
|
|||||||
lastHeartbeatOnChangelogMutex *sync.Mutex
|
lastHeartbeatOnChangelogMutex *sync.Mutex
|
||||||
CurrentLag int64
|
CurrentLag int64
|
||||||
currentProgress uint64
|
currentProgress uint64
|
||||||
|
etaNanoseonds int64
|
||||||
ThrottleHTTPStatusCode int64
|
ThrottleHTTPStatusCode int64
|
||||||
controlReplicasLagResult mysql.ReplicationLagResult
|
controlReplicasLagResult mysql.ReplicationLagResult
|
||||||
TotalRowsCopied int64
|
TotalRowsCopied int64
|
||||||
@ -267,6 +269,7 @@ func NewMigrationContext() *MigrationContext {
|
|||||||
MaxLagMillisecondsThrottleThreshold: 1500,
|
MaxLagMillisecondsThrottleThreshold: 1500,
|
||||||
CutOverLockTimeoutSeconds: 3,
|
CutOverLockTimeoutSeconds: 3,
|
||||||
DMLBatchSize: 10,
|
DMLBatchSize: 10,
|
||||||
|
etaNanoseonds: ETAUnknown,
|
||||||
maxLoad: NewLoadMap(),
|
maxLoad: NewLoadMap(),
|
||||||
criticalLoad: NewLoadMap(),
|
criticalLoad: NewLoadMap(),
|
||||||
throttleMutex: &sync.Mutex{},
|
throttleMutex: &sync.Mutex{},
|
||||||
@ -474,6 +477,22 @@ func (this *MigrationContext) SetProgressPct(progressPct float64) {
|
|||||||
atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct))
|
atomic.StoreUint64(&this.currentProgress, math.Float64bits(progressPct))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) GetETADuration() time.Duration {
|
||||||
|
return time.Duration(atomic.LoadInt64(&this.etaNanoseonds))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) SetETADuration(etaDuration time.Duration) {
|
||||||
|
atomic.StoreInt64(&this.etaNanoseonds, etaDuration.Nanoseconds())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) GetETASeconds() int64 {
|
||||||
|
nano := atomic.LoadInt64(&this.etaNanoseonds)
|
||||||
|
if nano < 0 {
|
||||||
|
return ETAUnknown
|
||||||
|
}
|
||||||
|
return nano / int64(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
// math.Float64bits([f=0..100])
|
// math.Float64bits([f=0..100])
|
||||||
|
|
||||||
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
|
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
|
||||||
|
@ -66,6 +66,7 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
|
|||||||
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
|
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
|
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
|
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
|
||||||
|
env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
|
||||||
|
@ -939,20 +939,29 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var etaSeconds float64 = math.MaxFloat64
|
var etaSeconds float64 = math.MaxFloat64
|
||||||
eta := "N/A"
|
var etaDuration = time.Duration(base.ETAUnknown)
|
||||||
if progressPct >= 100.0 {
|
if progressPct >= 100.0 {
|
||||||
eta = "due"
|
etaDuration = 0
|
||||||
} else if progressPct >= 0.1 {
|
} else if progressPct >= 0.1 {
|
||||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
||||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
||||||
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
||||||
if etaSeconds >= 0 {
|
if etaSeconds >= 0 {
|
||||||
etaDuration := time.Duration(etaSeconds) * time.Second
|
etaDuration = time.Duration(etaSeconds) * time.Second
|
||||||
eta = base.PrettifyDurationOutput(etaDuration)
|
|
||||||
} else {
|
} else {
|
||||||
eta = "due"
|
etaDuration = 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
this.migrationContext.SetETADuration(etaDuration)
|
||||||
|
var eta string
|
||||||
|
switch etaDuration {
|
||||||
|
case 0:
|
||||||
|
eta = "due"
|
||||||
|
case time.Duration(base.ETAUnknown):
|
||||||
|
eta = "N/A"
|
||||||
|
default:
|
||||||
|
eta = base.PrettifyDurationOutput(etaDuration)
|
||||||
|
}
|
||||||
|
|
||||||
state := "migrating"
|
state := "migrating"
|
||||||
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
|
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
|
||||||
|
Loading…
Reference in New Issue
Block a user