eqma-based ETA

This commit is contained in:
Shlomi Noach 2016-10-24 13:37:19 +02:00
parent fba9741821
commit 87cebc2ccf
3 changed files with 58 additions and 146 deletions

View File

@ -767,11 +767,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
// and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied
}
var progressPct float64
var progressRatio float64
if rowsEstimate == 0 {
progressPct = 100.0
progressRatio = 1.0
} else {
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
progressRatio = float64(totalRowsCopied) / float64(rowsEstimate)
}
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
@ -786,27 +786,25 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
}
var etaSeconds float64 = math.MaxFloat64
this.progressHistory.markState()
eta := "N/A"
if progressPct >= 100.0 {
eta = "due"
} else if etaTime := this.progressHistory.getETA(); progressPct >= 0.1 && !etaTime.IsZero() {
etaDuration := etaTime.Sub(time.Now())
eta = base.PrettifyDurationOutput(etaDuration)
etaSeconds = etaDuration.Seconds()
log.Errorf("==== etaTime: %+v, eta=%+v, etaSeonds=%+v", etaTime, eta, etaSeconds)
}
log.Errorf("===1 etaTime: %+v, eta=%+v, etaSeonds=%+v", "na", eta, etaSeconds)
if etaSeconds < 0 {
eta = "due"
visualETA := "N/A"
if progressRatio >= 1.0 {
visualETA = "due"
} else if _, err := this.progressHistory.markState(this.migrationContext.ElapsedRowCopyTime(), progressRatio); err == nil {
eta := this.progressHistory.GetETA()
etaDuration := eta.Sub(time.Now())
etaSeconds = etaDuration.Seconds()
visualETA = base.PrettifyDurationOutput(etaDuration)
}
if etaSeconds <= 0 {
visualETA = "due"
}
state := "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
state = "counting rows"
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
eta = "due"
visualETA = "due"
state = "postponing cut-over"
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason)
@ -837,6 +835,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
progressPct := progressRatio * 100.0
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
@ -844,7 +843,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates,
state,
eta,
visualETA,
)
this.applier.WriteChangelog(
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),

View File

@ -6,138 +6,64 @@
package logic
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/github/gh-ost/go/base"
)
const maxHistoryDuration time.Duration = time.Minute * 61
const minProgressRatio float64 = 0.001
const alpha float64 = 0.25
type ProgressState struct {
mark time.Time
rowsCopied int64
elapsedTime time.Duration
progressRatio float64
expectedTotalDuration time.Duration
}
func NewProgressState(rowsCopied int64) *ProgressState {
func NewProgressState(elapsedTime time.Duration, progressRatio float64, expectedTotalDuration time.Duration) *ProgressState {
result := &ProgressState{
mark: time.Now(),
rowsCopied: rowsCopied,
elapsedTime: elapsedTime,
progressRatio: progressRatio,
expectedTotalDuration: expectedTotalDuration,
}
return result
}
type ProgressHistory struct {
migrationContext *base.MigrationContext
history [](*ProgressState)
historyMutex *sync.Mutex
lastProgressState *ProgressState
historyMutex *sync.Mutex
eta time.Time
}
func NewProgressHistory() *ProgressHistory {
result := &ProgressHistory{
history: make([](*ProgressState), 0),
historyMutex: &sync.Mutex{},
migrationContext: base.GetMigrationContext(),
lastProgressState: nil,
historyMutex: &sync.Mutex{},
}
return result
}
func (this *ProgressHistory) oldestState() *ProgressState {
if len(this.history) == 0 {
return nil
func (this *ProgressHistory) markState(elapsedTime time.Duration, progressRatio float64) (expectedTotalDuration time.Duration, err error) {
if progressRatio < minProgressRatio || elapsedTime == 0 {
return expectedTotalDuration, fmt.Errorf("eta n/a")
}
return this.history[0]
}
func (this *ProgressHistory) firstStateSince(since time.Time) *ProgressState {
for _, state := range this.history {
if !state.mark.Before(since) {
return state
}
}
return nil
}
func (this *ProgressHistory) newestState() *ProgressState {
if len(this.history) == 0 {
return nil
}
return this.history[len(this.history)-1]
}
func (this *ProgressHistory) oldestMark() (mark time.Time) {
if oldest := this.oldestState(); oldest != nil {
return oldest.mark
}
return mark
}
func (this *ProgressHistory) markState() {
this.historyMutex.Lock()
defer this.historyMutex.Unlock()
state := NewProgressState(this.migrationContext.GetTotalRowsCopied())
this.history = append(this.history, state)
for time.Since(this.oldestMark()) > maxHistoryDuration {
if len(this.history) == 0 {
return
}
this.history = this.history[1:]
newExpectedTotalDuration := float64(elapsedTime.Nanoseconds()) / progressRatio
if this.lastProgressState != nil {
newExpectedTotalDuration = alpha*float64(this.lastProgressState.expectedTotalDuration.Nanoseconds()) + (1.0-alpha)*newExpectedTotalDuration
}
expectedTotalDuration = time.Duration(int64(newExpectedTotalDuration))
this.lastProgressState = NewProgressState(elapsedTime, progressRatio, expectedTotalDuration)
this.eta = time.Now().Add(expectedTotalDuration - elapsedTime)
return expectedTotalDuration, nil
}
// hasEnoughData tells us whether there's at all enough information to predict an ETA
// this function is not concurrent-safe
func (this *ProgressHistory) hasEnoughData() bool {
oldest := this.oldestState()
if oldest == nil {
return false
}
newest := this.newestState()
func (this *ProgressHistory) GetETA() time.Time {
this.historyMutex.Lock()
defer this.historyMutex.Unlock()
if !oldest.mark.Before(newest.mark) {
// single point in time; cannot extrapolate
return false
}
if oldest.rowsCopied == newest.rowsCopied {
// Nothing really happened; cannot extrapolate
return false
}
return true
}
func (this *ProgressHistory) getETABasedOnRange(basedOnRange time.Duration) (eta time.Time) {
if !this.hasEnoughData() {
return eta
}
oldest := this.firstStateSince(time.Now().Add(-basedOnRange))
newest := this.newestState()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
ratio := float64(rowsEstimate-oldest.rowsCopied) / float64(newest.rowsCopied-oldest.rowsCopied)
// ratio is also float64(totaltime-oldest.mark) / float64(newest.mark-oldest.mark)
totalTimeNanosecondsFromOldestMark := ratio * float64(newest.mark.Sub(oldest.mark).Nanoseconds())
eta = oldest.mark.Add(time.Duration(totalTimeNanosecondsFromOldestMark))
return eta
}
func (this *ProgressHistory) getETA() (eta time.Time) {
if eta = this.getETABasedOnRange(time.Minute * 1); !eta.IsZero() {
return eta
}
if eta = this.getETABasedOnRange(time.Minute * 5); !eta.IsZero() {
return eta
}
if eta = this.getETABasedOnRange(time.Minute * 10); !eta.IsZero() {
return eta
}
if eta = this.getETABasedOnRange(time.Minute * 30); !eta.IsZero() {
return eta
}
if eta = this.getETABasedOnRange(time.Minute * 60); !eta.IsZero() {
return eta
}
return eta
return this.eta
}

View File

@ -19,41 +19,28 @@ func init() {
func TestNewProgressHistory(t *testing.T) {
progressHistory := NewProgressHistory()
test.S(t).ExpectEquals(len(progressHistory.history), 0)
test.S(t).ExpectTrue(progressHistory.lastProgressState == nil)
}
func TestMarkState(t *testing.T) {
{
progressHistory := NewProgressHistory()
test.S(t).ExpectEquals(len(progressHistory.history), 0)
_, err := progressHistory.markState(0, 0)
test.S(t).ExpectNotNil(err)
}
{
progressHistory := NewProgressHistory()
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
test.S(t).ExpectEquals(len(progressHistory.history), 5)
_, err := progressHistory.markState(0, 0.01)
test.S(t).ExpectNotNil(err)
}
{
progressHistory := NewProgressHistory()
progressHistory.markState()
progressHistory.markState()
progressHistory.history[0].mark = time.Now().Add(-2 * time.Hour)
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
test.S(t).ExpectEquals(len(progressHistory.history), 4)
}
}
func TestOldestMark(t *testing.T) {
{
progressHistory := NewProgressHistory()
oldestState := progressHistory.oldestState()
test.S(t).ExpectTrue(oldestState == nil)
oldestMark := progressHistory.oldestMark()
test.S(t).ExpectTrue(oldestMark.IsZero())
_, err := progressHistory.markState(0, 50)
test.S(t).ExpectNotNil(err)
}
{
progressHistory := NewProgressHistory()
_, err := progressHistory.markState(time.Hour, 50)
test.S(t).ExpectNil(err)
}
}