diff --git a/go/logic/migrator.go b/go/logic/migrator.go index f36c28a..f44b6d6 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -767,11 +767,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { // and there is no further need to keep updating the value. rowsEstimate = totalRowsCopied } - var progressPct float64 + var progressRatio float64 if rowsEstimate == 0 { - progressPct = 100.0 + progressRatio = 1.0 } else { - progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) + progressRatio = float64(totalRowsCopied) / float64(rowsEstimate) } // Before status, let's see if we should print a nice reminder for what exactly we're doing here. shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0) @@ -786,27 +786,25 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } var etaSeconds float64 = math.MaxFloat64 - this.progressHistory.markState() - eta := "N/A" - if progressPct >= 100.0 { - eta = "due" - } else if etaTime := this.progressHistory.getETA(); progressPct >= 0.1 && !etaTime.IsZero() { - etaDuration := etaTime.Sub(time.Now()) - eta = base.PrettifyDurationOutput(etaDuration) - etaSeconds = etaDuration.Seconds() - log.Errorf("==== etaTime: %+v, eta=%+v, etaSeonds=%+v", etaTime, eta, etaSeconds) - } - log.Errorf("===1 etaTime: %+v, eta=%+v, etaSeonds=%+v", "na", eta, etaSeconds) - if etaSeconds < 0 { - eta = "due" + visualETA := "N/A" + if progressRatio >= 1.0 { + visualETA = "due" + } else if _, err := this.progressHistory.markState(this.migrationContext.ElapsedRowCopyTime(), progressRatio); err == nil { + eta := this.progressHistory.GetETA() + etaDuration := eta.Sub(time.Now()) + etaSeconds = etaDuration.Seconds() + visualETA = base.PrettifyDurationOutput(etaDuration) + } + if etaSeconds <= 0 { + visualETA = "due" } state := "migrating" if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows { state = "counting rows" } else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { - eta = "due" + visualETA = "due" state = "postponing cut-over" } else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { state = fmt.Sprintf("throttled, %s", throttleReason) @@ -837,6 +835,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() + progressPct := progressRatio * 100.0 status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), @@ -844,7 +843,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, state, - eta, + visualETA, ) this.applier.WriteChangelog( fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), diff --git a/go/logic/progress.go b/go/logic/progress.go index 0b95321..978f3d6 100644 --- a/go/logic/progress.go +++ b/go/logic/progress.go @@ -6,138 +6,64 @@ package logic import ( + "fmt" "sync" - "sync/atomic" "time" - - "github.com/github/gh-ost/go/base" ) -const maxHistoryDuration time.Duration = time.Minute * 61 +const minProgressRatio float64 = 0.001 +const alpha float64 = 0.25 type ProgressState struct { - mark time.Time - rowsCopied int64 + elapsedTime time.Duration + progressRatio float64 + expectedTotalDuration time.Duration } -func NewProgressState(rowsCopied int64) *ProgressState { +func NewProgressState(elapsedTime time.Duration, progressRatio float64, expectedTotalDuration time.Duration) *ProgressState { result := &ProgressState{ - mark: time.Now(), - rowsCopied: rowsCopied, + elapsedTime: elapsedTime, + progressRatio: progressRatio, + expectedTotalDuration: expectedTotalDuration, } return result } type ProgressHistory struct { - migrationContext *base.MigrationContext - history [](*ProgressState) - historyMutex *sync.Mutex + lastProgressState *ProgressState + historyMutex *sync.Mutex + eta time.Time } func NewProgressHistory() *ProgressHistory { result := &ProgressHistory{ - history: make([](*ProgressState), 0), - historyMutex: &sync.Mutex{}, - migrationContext: base.GetMigrationContext(), + lastProgressState: nil, + historyMutex: &sync.Mutex{}, } return result } -func (this *ProgressHistory) oldestState() *ProgressState { - if len(this.history) == 0 { - return nil +func (this *ProgressHistory) markState(elapsedTime time.Duration, progressRatio float64) (expectedTotalDuration time.Duration, err error) { + if progressRatio < minProgressRatio || elapsedTime == 0 { + return expectedTotalDuration, fmt.Errorf("eta n/a") } - return this.history[0] -} -func (this *ProgressHistory) firstStateSince(since time.Time) *ProgressState { - for _, state := range this.history { - if !state.mark.Before(since) { - return state - } - } - return nil -} - -func (this *ProgressHistory) newestState() *ProgressState { - if len(this.history) == 0 { - return nil - } - return this.history[len(this.history)-1] -} - -func (this *ProgressHistory) oldestMark() (mark time.Time) { - if oldest := this.oldestState(); oldest != nil { - return oldest.mark - } - return mark -} - -func (this *ProgressHistory) markState() { this.historyMutex.Lock() defer this.historyMutex.Unlock() - state := NewProgressState(this.migrationContext.GetTotalRowsCopied()) - this.history = append(this.history, state) - for time.Since(this.oldestMark()) > maxHistoryDuration { - if len(this.history) == 0 { - return - } - this.history = this.history[1:] + newExpectedTotalDuration := float64(elapsedTime.Nanoseconds()) / progressRatio + if this.lastProgressState != nil { + newExpectedTotalDuration = alpha*float64(this.lastProgressState.expectedTotalDuration.Nanoseconds()) + (1.0-alpha)*newExpectedTotalDuration } + expectedTotalDuration = time.Duration(int64(newExpectedTotalDuration)) + this.lastProgressState = NewProgressState(elapsedTime, progressRatio, expectedTotalDuration) + this.eta = time.Now().Add(expectedTotalDuration - elapsedTime) + return expectedTotalDuration, nil } -// hasEnoughData tells us whether there's at all enough information to predict an ETA -// this function is not concurrent-safe -func (this *ProgressHistory) hasEnoughData() bool { - oldest := this.oldestState() - if oldest == nil { - return false - } - newest := this.newestState() +func (this *ProgressHistory) GetETA() time.Time { + this.historyMutex.Lock() + defer this.historyMutex.Unlock() - if !oldest.mark.Before(newest.mark) { - // single point in time; cannot extrapolate - return false - } - if oldest.rowsCopied == newest.rowsCopied { - // Nothing really happened; cannot extrapolate - return false - } - return true -} - -func (this *ProgressHistory) getETABasedOnRange(basedOnRange time.Duration) (eta time.Time) { - if !this.hasEnoughData() { - return eta - } - - oldest := this.firstStateSince(time.Now().Add(-basedOnRange)) - newest := this.newestState() - rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate) - ratio := float64(rowsEstimate-oldest.rowsCopied) / float64(newest.rowsCopied-oldest.rowsCopied) - // ratio is also float64(totaltime-oldest.mark) / float64(newest.mark-oldest.mark) - totalTimeNanosecondsFromOldestMark := ratio * float64(newest.mark.Sub(oldest.mark).Nanoseconds()) - eta = oldest.mark.Add(time.Duration(totalTimeNanosecondsFromOldestMark)) - - return eta -} - -func (this *ProgressHistory) getETA() (eta time.Time) { - if eta = this.getETABasedOnRange(time.Minute * 1); !eta.IsZero() { - return eta - } - if eta = this.getETABasedOnRange(time.Minute * 5); !eta.IsZero() { - return eta - } - if eta = this.getETABasedOnRange(time.Minute * 10); !eta.IsZero() { - return eta - } - if eta = this.getETABasedOnRange(time.Minute * 30); !eta.IsZero() { - return eta - } - if eta = this.getETABasedOnRange(time.Minute * 60); !eta.IsZero() { - return eta - } - return eta + return this.eta } diff --git a/go/logic/progress_test.go b/go/logic/progress_test.go index 63881d8..d13b115 100644 --- a/go/logic/progress_test.go +++ b/go/logic/progress_test.go @@ -19,41 +19,28 @@ func init() { func TestNewProgressHistory(t *testing.T) { progressHistory := NewProgressHistory() - test.S(t).ExpectEquals(len(progressHistory.history), 0) + test.S(t).ExpectTrue(progressHistory.lastProgressState == nil) } func TestMarkState(t *testing.T) { { progressHistory := NewProgressHistory() - test.S(t).ExpectEquals(len(progressHistory.history), 0) + _, err := progressHistory.markState(0, 0) + test.S(t).ExpectNotNil(err) } { progressHistory := NewProgressHistory() - progressHistory.markState() - progressHistory.markState() - progressHistory.markState() - progressHistory.markState() - progressHistory.markState() - test.S(t).ExpectEquals(len(progressHistory.history), 5) + _, err := progressHistory.markState(0, 0.01) + test.S(t).ExpectNotNil(err) } { progressHistory := NewProgressHistory() - progressHistory.markState() - progressHistory.markState() - progressHistory.history[0].mark = time.Now().Add(-2 * time.Hour) - progressHistory.markState() - progressHistory.markState() - progressHistory.markState() - test.S(t).ExpectEquals(len(progressHistory.history), 4) - } -} - -func TestOldestMark(t *testing.T) { - { - progressHistory := NewProgressHistory() - oldestState := progressHistory.oldestState() - test.S(t).ExpectTrue(oldestState == nil) - oldestMark := progressHistory.oldestMark() - test.S(t).ExpectTrue(oldestMark.IsZero()) + _, err := progressHistory.markState(0, 50) + test.S(t).ExpectNotNil(err) + } + { + progressHistory := NewProgressHistory() + _, err := progressHistory.markState(time.Hour, 50) + test.S(t).ExpectNil(err) } }