Compare commits
3 Commits
master
...
adaptive-e
Author | SHA1 | Date | |
---|---|---|---|
|
90d557be9e | ||
|
cf08babc55 | ||
|
88b70886b3 |
@ -70,6 +70,8 @@ type Migrator struct {
|
|||||||
applyEventsQueue chan tableWriteFunc
|
applyEventsQueue chan tableWriteFunc
|
||||||
|
|
||||||
handledChangelogStates map[string]bool
|
handledChangelogStates map[string]bool
|
||||||
|
|
||||||
|
progressHistory *ProgressHistory
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMigrator() *Migrator {
|
func NewMigrator() *Migrator {
|
||||||
@ -84,6 +86,7 @@ func NewMigrator() *Migrator {
|
|||||||
copyRowsQueue: make(chan tableWriteFunc),
|
copyRowsQueue: make(chan tableWriteFunc),
|
||||||
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
||||||
handledChangelogStates: make(map[string]bool),
|
handledChangelogStates: make(map[string]bool),
|
||||||
|
progressHistory: NewProgressHistory(),
|
||||||
}
|
}
|
||||||
return migrator
|
return migrator
|
||||||
}
|
}
|
||||||
@ -782,19 +785,18 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var etaSeconds float64 = math.MaxFloat64
|
var etaSeconds float64 = math.MaxFloat64
|
||||||
|
this.progressHistory.markState()
|
||||||
eta := "N/A"
|
eta := "N/A"
|
||||||
if progressPct >= 100.0 {
|
if progressPct >= 100.0 {
|
||||||
eta = "due"
|
eta = "due"
|
||||||
} else if progressPct >= 1.0 {
|
} else if etaTime := this.progressHistory.getETA(); progressPct >= 0.1 && !etaTime.IsZero() {
|
||||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
etaDuration := etaTime.Sub(time.Now())
|
||||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
|
||||||
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
|
||||||
if etaSeconds >= 0 {
|
|
||||||
etaDuration := time.Duration(etaSeconds) * time.Second
|
|
||||||
eta = base.PrettifyDurationOutput(etaDuration)
|
eta = base.PrettifyDurationOutput(etaDuration)
|
||||||
} else {
|
etaSeconds = etaDuration.Seconds()
|
||||||
eta = "due"
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if etaSeconds < 0 {
|
||||||
|
eta = "due"
|
||||||
}
|
}
|
||||||
|
|
||||||
state := "migrating"
|
state := "migrating"
|
||||||
|
115
go/logic/progress.go
Normal file
115
go/logic/progress.go
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 GitHub Inc.
|
||||||
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
|
*/
|
||||||
|
|
||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/github/gh-ost/go/base"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxHistoryDuration time.Duration = time.Hour
|
||||||
|
|
||||||
|
type ProgressState struct {
|
||||||
|
mark time.Time
|
||||||
|
rowsCopied int64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProgressState(rowsCopied int64) *ProgressState {
|
||||||
|
result := &ProgressState{
|
||||||
|
mark: time.Now(),
|
||||||
|
rowsCopied: rowsCopied,
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProgressHistory struct {
|
||||||
|
migrationContext *base.MigrationContext
|
||||||
|
history [](*ProgressState)
|
||||||
|
historyMutex *sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProgressHistory() *ProgressHistory {
|
||||||
|
result := &ProgressHistory{
|
||||||
|
history: make([](*ProgressState), 0),
|
||||||
|
historyMutex: &sync.Mutex{},
|
||||||
|
migrationContext: base.GetMigrationContext(),
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) oldestState() *ProgressState {
|
||||||
|
if len(this.history) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return this.history[0]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) newestState() *ProgressState {
|
||||||
|
if len(this.history) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return this.history[len(this.history)-1]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) oldestMark() (mark time.Time) {
|
||||||
|
if oldest := this.oldestState(); oldest != nil {
|
||||||
|
return oldest.mark
|
||||||
|
}
|
||||||
|
return mark
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) markState() {
|
||||||
|
this.historyMutex.Lock()
|
||||||
|
defer this.historyMutex.Unlock()
|
||||||
|
|
||||||
|
state := NewProgressState(this.migrationContext.GetTotalRowsCopied())
|
||||||
|
this.history = append(this.history, state)
|
||||||
|
for time.Since(this.oldestMark()) > maxHistoryDuration {
|
||||||
|
if len(this.history) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
this.history = this.history[1:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// hasEnoughData tells us whether there's at all enough information to predict an ETA
|
||||||
|
// this function is not concurrent-safe
|
||||||
|
func (this *ProgressHistory) hasEnoughData() bool {
|
||||||
|
oldest := this.oldestState()
|
||||||
|
if oldest == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
newest := this.newestState()
|
||||||
|
|
||||||
|
if !oldest.mark.Before(newest.mark) {
|
||||||
|
// single point in time; cannot extrapolate
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if oldest.rowsCopied == newest.rowsCopied {
|
||||||
|
// Nothing really happened; cannot extrapolate
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) getETA() (eta time.Time) {
|
||||||
|
if !this.hasEnoughData() {
|
||||||
|
return eta
|
||||||
|
}
|
||||||
|
|
||||||
|
oldest := this.oldestState()
|
||||||
|
newest := this.newestState()
|
||||||
|
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
|
||||||
|
ratio := float64(rowsEstimate-oldest.rowsCopied) / float64(newest.rowsCopied-oldest.rowsCopied)
|
||||||
|
// ratio is also float64(totaltime-oldest.mark) / float64(newest.mark-oldest.mark)
|
||||||
|
totalTimeNanosecondsFromOldestMark := ratio * float64(newest.mark.Sub(oldest.mark).Nanoseconds())
|
||||||
|
eta = oldest.mark.Add(time.Duration(totalTimeNanosecondsFromOldestMark))
|
||||||
|
|
||||||
|
return eta
|
||||||
|
}
|
59
go/logic/progress_test.go
Normal file
59
go/logic/progress_test.go
Normal file
@ -0,0 +1,59 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 GitHub Inc.
|
||||||
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
|
*/
|
||||||
|
|
||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/outbrain/golib/log"
|
||||||
|
test "github.com/outbrain/golib/tests"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
log.SetLevel(log.ERROR)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewProgressHistory(t *testing.T) {
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
test.S(t).ExpectEquals(len(progressHistory.history), 0)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMarkState(t *testing.T) {
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
test.S(t).ExpectEquals(len(progressHistory.history), 0)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
test.S(t).ExpectEquals(len(progressHistory.history), 5)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.history[0].mark = time.Now().Add(-2 * time.Hour)
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
progressHistory.markState()
|
||||||
|
test.S(t).ExpectEquals(len(progressHistory.history), 4)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestOldestMark(t *testing.T) {
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
oldestState := progressHistory.oldestState()
|
||||||
|
test.S(t).ExpectTrue(oldestState == nil)
|
||||||
|
oldestMark := progressHistory.oldestMark()
|
||||||
|
test.S(t).ExpectTrue(oldestMark.IsZero())
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user