ETA calculation based on past hour's progress (moving window)

This commit is contained in:
Shlomi Noach 2016-10-21 21:43:24 +02:00
parent cf08babc55
commit 90d557be9e
2 changed files with 22 additions and 10 deletions

View File

@ -70,6 +70,8 @@ type Migrator struct {
applyEventsQueue chan tableWriteFunc applyEventsQueue chan tableWriteFunc
handledChangelogStates map[string]bool handledChangelogStates map[string]bool
progressHistory *ProgressHistory
} }
func NewMigrator() *Migrator { func NewMigrator() *Migrator {
@ -84,6 +86,7 @@ func NewMigrator() *Migrator {
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool), handledChangelogStates: make(map[string]bool),
progressHistory: NewProgressHistory(),
} }
return migrator return migrator
} }
@ -782,19 +785,18 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} }
var etaSeconds float64 = math.MaxFloat64 var etaSeconds float64 = math.MaxFloat64
this.progressHistory.markState()
eta := "N/A" eta := "N/A"
if progressPct >= 100.0 { if progressPct >= 100.0 {
eta = "due" eta = "due"
} else if progressPct >= 1.0 { } else if etaTime := this.progressHistory.getETA(); progressPct >= 0.1 && !etaTime.IsZero() {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() etaDuration := etaTime.Sub(time.Now())
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time.Duration(etaSeconds) * time.Second
eta = base.PrettifyDurationOutput(etaDuration) eta = base.PrettifyDurationOutput(etaDuration)
} else { etaSeconds = etaDuration.Seconds()
eta = "due"
} }
if etaSeconds < 0 {
eta = "due"
} }
state := "migrating" state := "migrating"

View File

@ -7,6 +7,7 @@ package logic
import ( import (
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/github/gh-ost/go/base" "github.com/github/gh-ost/go/base"
@ -101,5 +102,14 @@ func (this *ProgressHistory) getETA() (eta time.Time) {
if !this.hasEnoughData() { if !this.hasEnoughData() {
return eta return eta
} }
oldest := this.oldestState()
newest := this.newestState()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
ratio := float64(rowsEstimate-oldest.rowsCopied) / float64(newest.rowsCopied-oldest.rowsCopied)
// ratio is also float64(totaltime-oldest.mark) / float64(newest.mark-oldest.mark)
totalTimeNanosecondsFromOldestMark := ratio * float64(newest.mark.Sub(oldest.mark).Nanoseconds())
eta = oldest.mark.Add(time.Duration(totalTimeNanosecondsFromOldestMark))
return eta return eta
} }