use current copy rate to project ETA

This commit is contained in:
Morgan Tocker 2022-12-13 17:54:06 -07:00
parent 7320fda848
commit 2ddde19b2d
3 changed files with 32 additions and 3 deletions

View File

@ -188,6 +188,7 @@ type MigrationContext struct {
CurrentLag int64
currentProgress uint64
etaNanoseonds int64
EtaRowsPerSecond int64
ThrottleHTTPIntervalMillis int64
ThrottleHTTPStatusCode int64
ThrottleHTTPTimeoutMillis int64

View File

@ -813,11 +813,18 @@ func (this *Migrator) initiateStatus() {
this.printStatus(ForcePrintStatusAndHintRule)
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
var previousCount int64
for range ticker.C {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return
}
go this.printStatus(HeuristicPrintStatusRule)
totalCopied := atomic.LoadInt64(&this.migrationContext.TotalRowsCopied)
if previousCount > 0 {
copiedThisLoop := totalCopied - previousCount
atomic.StoreInt64(&this.migrationContext.EtaRowsPerSecond, copiedThisLoop)
}
previousCount = totalCopied
}
}
@ -919,9 +926,20 @@ func (this *Migrator) getMigrationETA(rowsEstimate int64) (eta string, duration
duration = 0
} else if progressPct >= 0.1 {
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
etaRowsPerSecond := atomic.LoadInt64(&this.migrationContext.EtaRowsPerSecond)
var etaSeconds float64
// If there is data available on our current row-copies-per-second rate, use it.
// Otherwise we can fallback to the total elapsed time and extrapolate.
// This is going to be less accurate on a longer copy as the insert rate
// will tend to slow down.
if etaRowsPerSecond > 0 {
remainingRows := float64(rowsEstimate) - float64(totalRowsCopied)
etaSeconds = remainingRows / float64(etaRowsPerSecond)
} else {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
}
if etaSeconds >= 0 {
duration = time.Duration(etaSeconds) * time.Second
} else {

View File

@ -209,6 +209,15 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
tests.S(t).ExpectEquals(eta, "4h29m44s")
tests.S(t).ExpectEquals(etaDuration.String(), "4h29m44s")
}
{
// Test using rows-per-second added data.
migrationContext.TotalRowsCopied = 456
migrationContext.EtaRowsPerSecond = 100
state, eta, etaDuration := migrator.getMigrationStateAndETA(123456)
tests.S(t).ExpectEquals(state, "migrating")
tests.S(t).ExpectEquals(eta, "20m30s")
tests.S(t).ExpectEquals(etaDuration.String(), "20m30s")
}
{
migrationContext.TotalRowsCopied = 456
state, eta, etaDuration := migrator.getMigrationStateAndETA(456)
@ -238,6 +247,7 @@ func TestMigratorGetMigrationStateAndETA(t *testing.T) {
tests.S(t).ExpectEquals(eta, "due")
tests.S(t).ExpectEquals(etaDuration.String(), "0s")
}
}
func TestMigratorShouldPrintStatus(t *testing.T) {