Compare commits

...

6 Commits

Author SHA1 Message Date
Shlomi Noach
3f3a69c708 Merge branch 'master' into eta-ewma 2017-03-19 08:25:55 +02:00
Shlomi Noach
87cebc2ccf eqma-based ETA 2016-10-24 13:37:19 +02:00
Shlomi Noach
fba9741821 incremental ETAs 2016-10-24 09:30:03 +02:00
Shlomi Noach
90d557be9e ETA calculation based on past hour's progress (moving window) 2016-10-21 21:43:24 +02:00
Shlomi Noach
cf08babc55 Merge branch 'master' into adaptive-eta 2016-10-20 18:41:57 +02:00
Shlomi Noach
88b70886b3 ETA based on recent activity 2016-09-12 19:05:57 +02:00
3 changed files with 138 additions and 19 deletions

View File

@ -89,6 +89,8 @@ type Migrator struct {
applyEventsQueue chan *applyEventStruct applyEventsQueue chan *applyEventStruct
handledChangelogStates map[string]bool handledChangelogStates map[string]bool
progressHistory *ProgressHistory
} }
func NewMigrator() *Migrator { func NewMigrator() *Migrator {
@ -103,6 +105,7 @@ func NewMigrator() *Migrator {
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer), applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool), handledChangelogStates: make(map[string]bool),
progressHistory: NewProgressHistory(),
} }
return migrator return migrator
} }
@ -756,8 +759,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
*this.inspector.connectionConfig.ImpliedKey, *this.inspector.connectionConfig.ImpliedKey,
this.migrationContext.Hostname, this.migrationContext.Hostname,
)) ))
fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v", fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v; time now is %+v",
this.migrationContext.StartTime.Format(time.RubyDate), this.migrationContext.StartTime.Format(time.RubyDate),
time.Now().Format(time.RubyDate),
)) ))
maxLoad := this.migrationContext.GetMaxLoad() maxLoad := this.migrationContext.GetMaxLoad()
criticalLoad := this.migrationContext.GetCriticalLoad() criticalLoad := this.migrationContext.GetCriticalLoad()
@ -839,11 +843,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
// and there is no further need to keep updating the value. // and there is no further need to keep updating the value.
rowsEstimate = totalRowsCopied rowsEstimate = totalRowsCopied
} }
var progressPct float64 var progressRatio float64
if rowsEstimate == 0 { if rowsEstimate == 0 {
progressPct = 100.0 progressRatio = 1.0
} else { } else {
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) progressRatio = float64(totalRowsCopied) / float64(rowsEstimate)
} }
// Before status, let's see if we should print a nice reminder for what exactly we're doing here. // Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0) shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
@ -858,26 +862,25 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} }
var etaSeconds float64 = math.MaxFloat64 var etaSeconds float64 = math.MaxFloat64
eta := "N/A"
if progressPct >= 100.0 { visualETA := "N/A"
eta = "due" if progressRatio >= 1.0 {
} else if progressPct >= 1.0 { visualETA = "due"
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() } else if _, err := this.progressHistory.markState(this.migrationContext.ElapsedRowCopyTime(), progressRatio); err == nil {
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) eta := this.progressHistory.GetETA()
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds etaDuration := eta.Sub(time.Now())
if etaSeconds >= 0 { etaSeconds = etaDuration.Seconds()
etaDuration := time.Duration(etaSeconds) * time.Second visualETA = base.PrettifyDurationOutput(etaDuration)
eta = base.PrettifyDurationOutput(etaDuration) }
} else { if etaSeconds <= 0 {
eta = "due" visualETA = "due"
}
} }
state := "migrating" state := "migrating"
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows { if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
state = "counting rows" state = "counting rows"
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { } else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
eta = "due" visualETA = "due"
state = "postponing cut-over" state = "postponing cut-over"
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled { } else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason) state = fmt.Sprintf("throttled, %s", throttleReason)
@ -908,6 +911,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
progressPct := progressRatio * 100.0
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s", status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s",
totalRowsCopied, rowsEstimate, progressPct, totalRowsCopied, rowsEstimate, progressPct,
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
@ -915,7 +919,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
currentBinlogCoordinates, currentBinlogCoordinates,
state, state,
eta, visualETA,
) )
this.applier.WriteChangelog( this.applier.WriteChangelog(
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),

69
go/logic/progress.go Normal file
View File

@ -0,0 +1,69 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"fmt"
"sync"
"time"
)
const minProgressRatio float64 = 0.001
const alpha float64 = 0.25
type ProgressState struct {
elapsedTime time.Duration
progressRatio float64
expectedTotalDuration time.Duration
}
func NewProgressState(elapsedTime time.Duration, progressRatio float64, expectedTotalDuration time.Duration) *ProgressState {
result := &ProgressState{
elapsedTime: elapsedTime,
progressRatio: progressRatio,
expectedTotalDuration: expectedTotalDuration,
}
return result
}
type ProgressHistory struct {
lastProgressState *ProgressState
historyMutex *sync.Mutex
eta time.Time
}
func NewProgressHistory() *ProgressHistory {
result := &ProgressHistory{
lastProgressState: nil,
historyMutex: &sync.Mutex{},
}
return result
}
func (this *ProgressHistory) markState(elapsedTime time.Duration, progressRatio float64) (expectedTotalDuration time.Duration, err error) {
if progressRatio < minProgressRatio || elapsedTime == 0 {
return expectedTotalDuration, fmt.Errorf("eta n/a")
}
this.historyMutex.Lock()
defer this.historyMutex.Unlock()
newExpectedTotalDuration := float64(elapsedTime.Nanoseconds()) / progressRatio
if this.lastProgressState != nil {
newExpectedTotalDuration = alpha*float64(this.lastProgressState.expectedTotalDuration.Nanoseconds()) + (1.0-alpha)*newExpectedTotalDuration
}
expectedTotalDuration = time.Duration(int64(newExpectedTotalDuration))
this.lastProgressState = NewProgressState(elapsedTime, progressRatio, expectedTotalDuration)
this.eta = time.Now().Add(expectedTotalDuration - elapsedTime)
return expectedTotalDuration, nil
}
func (this *ProgressHistory) GetETA() time.Time {
this.historyMutex.Lock()
defer this.historyMutex.Unlock()
return this.eta
}

46
go/logic/progress_test.go Normal file
View File

@ -0,0 +1,46 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"testing"
"time"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestNewProgressHistory(t *testing.T) {
progressHistory := NewProgressHistory()
test.S(t).ExpectTrue(progressHistory.lastProgressState == nil)
}
func TestMarkState(t *testing.T) {
{
progressHistory := NewProgressHistory()
_, err := progressHistory.markState(0, 0)
test.S(t).ExpectNotNil(err)
}
{
progressHistory := NewProgressHistory()
_, err := progressHistory.markState(0, 0.01)
test.S(t).ExpectNotNil(err)
}
{
progressHistory := NewProgressHistory()
_, err := progressHistory.markState(0, 50)
test.S(t).ExpectNotNil(err)
}
{
progressHistory := NewProgressHistory()
_, err := progressHistory.markState(time.Hour, 50)
test.S(t).ExpectNil(err)
}
}