Compare commits
6 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
3f3a69c708 | ||
|
87cebc2ccf | ||
|
fba9741821 | ||
|
90d557be9e | ||
|
cf08babc55 | ||
|
88b70886b3 |
@ -89,6 +89,8 @@ type Migrator struct {
|
|||||||
applyEventsQueue chan *applyEventStruct
|
applyEventsQueue chan *applyEventStruct
|
||||||
|
|
||||||
handledChangelogStates map[string]bool
|
handledChangelogStates map[string]bool
|
||||||
|
|
||||||
|
progressHistory *ProgressHistory
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMigrator() *Migrator {
|
func NewMigrator() *Migrator {
|
||||||
@ -103,6 +105,7 @@ func NewMigrator() *Migrator {
|
|||||||
copyRowsQueue: make(chan tableWriteFunc),
|
copyRowsQueue: make(chan tableWriteFunc),
|
||||||
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
|
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
|
||||||
handledChangelogStates: make(map[string]bool),
|
handledChangelogStates: make(map[string]bool),
|
||||||
|
progressHistory: NewProgressHistory(),
|
||||||
}
|
}
|
||||||
return migrator
|
return migrator
|
||||||
}
|
}
|
||||||
@ -756,8 +759,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
|||||||
*this.inspector.connectionConfig.ImpliedKey,
|
*this.inspector.connectionConfig.ImpliedKey,
|
||||||
this.migrationContext.Hostname,
|
this.migrationContext.Hostname,
|
||||||
))
|
))
|
||||||
fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v",
|
fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v; time now is %+v",
|
||||||
this.migrationContext.StartTime.Format(time.RubyDate),
|
this.migrationContext.StartTime.Format(time.RubyDate),
|
||||||
|
time.Now().Format(time.RubyDate),
|
||||||
))
|
))
|
||||||
maxLoad := this.migrationContext.GetMaxLoad()
|
maxLoad := this.migrationContext.GetMaxLoad()
|
||||||
criticalLoad := this.migrationContext.GetCriticalLoad()
|
criticalLoad := this.migrationContext.GetCriticalLoad()
|
||||||
@ -839,11 +843,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
// and there is no further need to keep updating the value.
|
// and there is no further need to keep updating the value.
|
||||||
rowsEstimate = totalRowsCopied
|
rowsEstimate = totalRowsCopied
|
||||||
}
|
}
|
||||||
var progressPct float64
|
var progressRatio float64
|
||||||
if rowsEstimate == 0 {
|
if rowsEstimate == 0 {
|
||||||
progressPct = 100.0
|
progressRatio = 1.0
|
||||||
} else {
|
} else {
|
||||||
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
|
progressRatio = float64(totalRowsCopied) / float64(rowsEstimate)
|
||||||
}
|
}
|
||||||
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
|
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
|
||||||
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
|
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
|
||||||
@ -858,26 +862,25 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var etaSeconds float64 = math.MaxFloat64
|
var etaSeconds float64 = math.MaxFloat64
|
||||||
eta := "N/A"
|
|
||||||
if progressPct >= 100.0 {
|
visualETA := "N/A"
|
||||||
eta = "due"
|
if progressRatio >= 1.0 {
|
||||||
} else if progressPct >= 1.0 {
|
visualETA = "due"
|
||||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
} else if _, err := this.progressHistory.markState(this.migrationContext.ElapsedRowCopyTime(), progressRatio); err == nil {
|
||||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
eta := this.progressHistory.GetETA()
|
||||||
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
etaDuration := eta.Sub(time.Now())
|
||||||
if etaSeconds >= 0 {
|
etaSeconds = etaDuration.Seconds()
|
||||||
etaDuration := time.Duration(etaSeconds) * time.Second
|
visualETA = base.PrettifyDurationOutput(etaDuration)
|
||||||
eta = base.PrettifyDurationOutput(etaDuration)
|
}
|
||||||
} else {
|
if etaSeconds <= 0 {
|
||||||
eta = "due"
|
visualETA = "due"
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
state := "migrating"
|
state := "migrating"
|
||||||
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
|
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
|
||||||
state = "counting rows"
|
state = "counting rows"
|
||||||
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
||||||
eta = "due"
|
visualETA = "due"
|
||||||
state = "postponing cut-over"
|
state = "postponing cut-over"
|
||||||
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
|
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
|
||||||
state = fmt.Sprintf("throttled, %s", throttleReason)
|
state = fmt.Sprintf("throttled, %s", throttleReason)
|
||||||
@ -908,6 +911,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
|
|
||||||
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
||||||
|
|
||||||
|
progressPct := progressRatio * 100.0
|
||||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s",
|
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; State: %s; ETA: %s",
|
||||||
totalRowsCopied, rowsEstimate, progressPct,
|
totalRowsCopied, rowsEstimate, progressPct,
|
||||||
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
||||||
@ -915,7 +919,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
|
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
|
||||||
currentBinlogCoordinates,
|
currentBinlogCoordinates,
|
||||||
state,
|
state,
|
||||||
eta,
|
visualETA,
|
||||||
)
|
)
|
||||||
this.applier.WriteChangelog(
|
this.applier.WriteChangelog(
|
||||||
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
|
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
|
||||||
|
69
go/logic/progress.go
Normal file
69
go/logic/progress.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 GitHub Inc.
|
||||||
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
|
*/
|
||||||
|
|
||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const minProgressRatio float64 = 0.001
|
||||||
|
const alpha float64 = 0.25
|
||||||
|
|
||||||
|
type ProgressState struct {
|
||||||
|
elapsedTime time.Duration
|
||||||
|
progressRatio float64
|
||||||
|
expectedTotalDuration time.Duration
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProgressState(elapsedTime time.Duration, progressRatio float64, expectedTotalDuration time.Duration) *ProgressState {
|
||||||
|
result := &ProgressState{
|
||||||
|
elapsedTime: elapsedTime,
|
||||||
|
progressRatio: progressRatio,
|
||||||
|
expectedTotalDuration: expectedTotalDuration,
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
type ProgressHistory struct {
|
||||||
|
lastProgressState *ProgressState
|
||||||
|
historyMutex *sync.Mutex
|
||||||
|
eta time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewProgressHistory() *ProgressHistory {
|
||||||
|
result := &ProgressHistory{
|
||||||
|
lastProgressState: nil,
|
||||||
|
historyMutex: &sync.Mutex{},
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) markState(elapsedTime time.Duration, progressRatio float64) (expectedTotalDuration time.Duration, err error) {
|
||||||
|
if progressRatio < minProgressRatio || elapsedTime == 0 {
|
||||||
|
return expectedTotalDuration, fmt.Errorf("eta n/a")
|
||||||
|
}
|
||||||
|
|
||||||
|
this.historyMutex.Lock()
|
||||||
|
defer this.historyMutex.Unlock()
|
||||||
|
|
||||||
|
newExpectedTotalDuration := float64(elapsedTime.Nanoseconds()) / progressRatio
|
||||||
|
if this.lastProgressState != nil {
|
||||||
|
newExpectedTotalDuration = alpha*float64(this.lastProgressState.expectedTotalDuration.Nanoseconds()) + (1.0-alpha)*newExpectedTotalDuration
|
||||||
|
}
|
||||||
|
expectedTotalDuration = time.Duration(int64(newExpectedTotalDuration))
|
||||||
|
this.lastProgressState = NewProgressState(elapsedTime, progressRatio, expectedTotalDuration)
|
||||||
|
this.eta = time.Now().Add(expectedTotalDuration - elapsedTime)
|
||||||
|
return expectedTotalDuration, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ProgressHistory) GetETA() time.Time {
|
||||||
|
this.historyMutex.Lock()
|
||||||
|
defer this.historyMutex.Unlock()
|
||||||
|
|
||||||
|
return this.eta
|
||||||
|
}
|
46
go/logic/progress_test.go
Normal file
46
go/logic/progress_test.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 GitHub Inc.
|
||||||
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
|
*/
|
||||||
|
|
||||||
|
package logic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/outbrain/golib/log"
|
||||||
|
test "github.com/outbrain/golib/tests"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
log.SetLevel(log.ERROR)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNewProgressHistory(t *testing.T) {
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
test.S(t).ExpectTrue(progressHistory.lastProgressState == nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestMarkState(t *testing.T) {
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
_, err := progressHistory.markState(0, 0)
|
||||||
|
test.S(t).ExpectNotNil(err)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
_, err := progressHistory.markState(0, 0.01)
|
||||||
|
test.S(t).ExpectNotNil(err)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
_, err := progressHistory.markState(0, 50)
|
||||||
|
test.S(t).ExpectNotNil(err)
|
||||||
|
}
|
||||||
|
{
|
||||||
|
progressHistory := NewProgressHistory()
|
||||||
|
_, err := progressHistory.markState(time.Hour, 50)
|
||||||
|
test.S(t).ExpectNil(err)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user