Compare commits

...

3 Commits

Author SHA1 Message Date
Shlomi Noach
90d557be9e ETA calculation based on past hour's progress (moving window) 2016-10-21 21:43:24 +02:00
Shlomi Noach
cf08babc55 Merge branch 'master' into adaptive-eta 2016-10-20 18:41:57 +02:00
Shlomi Noach
88b70886b3 ETA based on recent activity 2016-09-12 19:05:57 +02:00
3 changed files with 186 additions and 10 deletions

View File

@ -70,6 +70,8 @@ type Migrator struct {
applyEventsQueue chan tableWriteFunc
handledChangelogStates map[string]bool
progressHistory *ProgressHistory
}
func NewMigrator() *Migrator {
@ -84,6 +86,7 @@ func NewMigrator() *Migrator {
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool),
progressHistory: NewProgressHistory(),
}
return migrator
}
@ -782,19 +785,18 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
}
var etaSeconds float64 = math.MaxFloat64
this.progressHistory.markState()
eta := "N/A"
if progressPct >= 100.0 {
eta = "due"
} else if progressPct >= 1.0 {
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time.Duration(etaSeconds) * time.Second
eta = base.PrettifyDurationOutput(etaDuration)
} else {
eta = "due"
}
} else if etaTime := this.progressHistory.getETA(); progressPct >= 0.1 && !etaTime.IsZero() {
etaDuration := etaTime.Sub(time.Now())
eta = base.PrettifyDurationOutput(etaDuration)
etaSeconds = etaDuration.Seconds()
}
if etaSeconds < 0 {
eta = "due"
}
state := "migrating"

115
go/logic/progress.go Normal file
View File

@ -0,0 +1,115 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"sync"
"sync/atomic"
"time"
"github.com/github/gh-ost/go/base"
)
const maxHistoryDuration time.Duration = time.Hour
type ProgressState struct {
mark time.Time
rowsCopied int64
}
func NewProgressState(rowsCopied int64) *ProgressState {
result := &ProgressState{
mark: time.Now(),
rowsCopied: rowsCopied,
}
return result
}
type ProgressHistory struct {
migrationContext *base.MigrationContext
history [](*ProgressState)
historyMutex *sync.Mutex
}
func NewProgressHistory() *ProgressHistory {
result := &ProgressHistory{
history: make([](*ProgressState), 0),
historyMutex: &sync.Mutex{},
migrationContext: base.GetMigrationContext(),
}
return result
}
func (this *ProgressHistory) oldestState() *ProgressState {
if len(this.history) == 0 {
return nil
}
return this.history[0]
}
func (this *ProgressHistory) newestState() *ProgressState {
if len(this.history) == 0 {
return nil
}
return this.history[len(this.history)-1]
}
func (this *ProgressHistory) oldestMark() (mark time.Time) {
if oldest := this.oldestState(); oldest != nil {
return oldest.mark
}
return mark
}
func (this *ProgressHistory) markState() {
this.historyMutex.Lock()
defer this.historyMutex.Unlock()
state := NewProgressState(this.migrationContext.GetTotalRowsCopied())
this.history = append(this.history, state)
for time.Since(this.oldestMark()) > maxHistoryDuration {
if len(this.history) == 0 {
return
}
this.history = this.history[1:]
}
}
// hasEnoughData tells us whether there's at all enough information to predict an ETA
// this function is not concurrent-safe
func (this *ProgressHistory) hasEnoughData() bool {
oldest := this.oldestState()
if oldest == nil {
return false
}
newest := this.newestState()
if !oldest.mark.Before(newest.mark) {
// single point in time; cannot extrapolate
return false
}
if oldest.rowsCopied == newest.rowsCopied {
// Nothing really happened; cannot extrapolate
return false
}
return true
}
func (this *ProgressHistory) getETA() (eta time.Time) {
if !this.hasEnoughData() {
return eta
}
oldest := this.oldestState()
newest := this.newestState()
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
ratio := float64(rowsEstimate-oldest.rowsCopied) / float64(newest.rowsCopied-oldest.rowsCopied)
// ratio is also float64(totaltime-oldest.mark) / float64(newest.mark-oldest.mark)
totalTimeNanosecondsFromOldestMark := ratio * float64(newest.mark.Sub(oldest.mark).Nanoseconds())
eta = oldest.mark.Add(time.Duration(totalTimeNanosecondsFromOldestMark))
return eta
}

59
go/logic/progress_test.go Normal file
View File

@ -0,0 +1,59 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"testing"
"time"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestNewProgressHistory(t *testing.T) {
progressHistory := NewProgressHistory()
test.S(t).ExpectEquals(len(progressHistory.history), 0)
}
func TestMarkState(t *testing.T) {
{
progressHistory := NewProgressHistory()
test.S(t).ExpectEquals(len(progressHistory.history), 0)
}
{
progressHistory := NewProgressHistory()
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
test.S(t).ExpectEquals(len(progressHistory.history), 5)
}
{
progressHistory := NewProgressHistory()
progressHistory.markState()
progressHistory.markState()
progressHistory.history[0].mark = time.Now().Add(-2 * time.Hour)
progressHistory.markState()
progressHistory.markState()
progressHistory.markState()
test.S(t).ExpectEquals(len(progressHistory.history), 4)
}
}
func TestOldestMark(t *testing.T) {
{
progressHistory := NewProgressHistory()
oldestState := progressHistory.oldestState()
test.S(t).ExpectTrue(oldestState == nil)
oldestMark := progressHistory.oldestMark()
test.S(t).ExpectTrue(oldestMark.IsZero())
}
}