Merge 5f3122dff2
into 0a033c76c1
This commit is contained in:
commit
edc9acea1c
|
@ -188,6 +188,7 @@ type MigrationContext struct {
|
|||
CurrentLag int64
|
||||
currentProgress uint64
|
||||
etaNanoseonds int64
|
||||
EtaRowsPerSecond int64
|
||||
ThrottleHTTPIntervalMillis int64
|
||||
ThrottleHTTPStatusCode int64
|
||||
ThrottleHTTPTimeoutMillis int64
|
||||
|
|
|
@ -808,11 +808,18 @@ func (this *Migrator) initiateStatus() {
|
|||
this.printStatus(ForcePrintStatusAndHintRule)
|
||||
ticker := time.NewTicker(time.Second)
|
||||
defer ticker.Stop()
|
||||
var previousCount int64
|
||||
for range ticker.C {
|
||||
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
|
||||
return
|
||||
}
|
||||
go this.printStatus(HeuristicPrintStatusRule)
|
||||
totalCopied := atomic.LoadInt64(&this.migrationContext.TotalRowsCopied)
|
||||
if previousCount > 0 {
|
||||
copiedThisLoop := totalCopied - previousCount
|
||||
atomic.StoreInt64(&this.migrationContext.EtaRowsPerSecond, copiedThisLoop)
|
||||
}
|
||||
previousCount = totalCopied
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -914,9 +921,20 @@ func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration
|
|||
duration = 0
|
||||
} else if progressPct >= 0.1 {
|
||||
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
|
||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
||||
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
|
||||
etaRowsPerSecond := atomic.LoadInt64(&this.migrationContext.EtaRowsPerSecond)
|
||||
var etaSeconds float64
|
||||
// If there is data available on our current row-copies-per-second rate, use it.
|
||||
// Otherwise we can fallback to the total elapsed time and extrapolate.
|
||||
// This is going to be less accurate on a longer copy as the insert rate
|
||||
// will tend to slow down.
|
||||
if etaRowsPerSecond > 0 {
|
||||
remainingRows := float64(rowsEstimate) - float64(totalRowsCopied)
|
||||
etaSeconds = remainingRows / float64(etaRowsPerSecond)
|
||||
} else {
|
||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
||||
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
||||
}
|
||||
if etaSeconds >= 0 {
|
||||
duration = time.Duration(etaSeconds) * time.Second
|
||||
} else {
|
||||
|
|
|
@ -209,6 +209,15 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
|
|||
tests.S(t).ExpectEquals(eta, "4h29m44s")
|
||||
tests.S(t).ExpectEquals(etaDuration.String(), "4h29m44s")
|
||||
}
|
||||
{
|
||||
// Test using rows-per-second added data.
|
||||
migrationContext.TotalRowsCopied = 456
|
||||
migrationContext.EtaRowsPerSecond = 100
|
||||
state, eta, etaDuration := migrator.getMigrationStateAndETA(123456)
|
||||
tests.S(t).ExpectEquals(state, "migrating")
|
||||
tests.S(t).ExpectEquals(eta, "20m30s")
|
||||
tests.S(t).ExpectEquals(etaDuration.String(), "20m30s")
|
||||
}
|
||||
{
|
||||
migrationContext.TotalRowsCopied = 456
|
||||
state, eta, etaDuration := migrator.getMigrationStateAndETA(456)
|
||||
|
|
Loading…
Reference in New Issue
Block a user