ETA counting rows, fixed copy time on count
This commit is contained in:
parent
5b0593c29b
commit
46bbea2a32
@ -111,6 +111,7 @@ type MigrationContext struct {
|
|||||||
throttleReason string
|
throttleReason string
|
||||||
throttleMutex *sync.Mutex
|
throttleMutex *sync.Mutex
|
||||||
IsPostponingCutOver int64
|
IsPostponingCutOver int64
|
||||||
|
CountingRowsFlag int64
|
||||||
|
|
||||||
OriginalTableColumns *sql.ColumnList
|
OriginalTableColumns *sql.ColumnList
|
||||||
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
||||||
@ -261,11 +262,23 @@ func (this *MigrationContext) ElapsedTime() time.Duration {
|
|||||||
return time.Now().Sub(this.StartTime)
|
return time.Now().Sub(this.StartTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// MarkRowCopyStartTime
|
||||||
|
func (this *MigrationContext) MarkRowCopyStartTime() {
|
||||||
|
this.throttleMutex.Lock()
|
||||||
|
defer this.throttleMutex.Unlock()
|
||||||
|
this.RowCopyStartTime = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
// ElapsedRowCopyTime returns time since starting to copy chunks of rows
|
// ElapsedRowCopyTime returns time since starting to copy chunks of rows
|
||||||
func (this *MigrationContext) ElapsedRowCopyTime() time.Duration {
|
func (this *MigrationContext) ElapsedRowCopyTime() time.Duration {
|
||||||
this.throttleMutex.Lock()
|
this.throttleMutex.Lock()
|
||||||
defer this.throttleMutex.Unlock()
|
defer this.throttleMutex.Unlock()
|
||||||
|
|
||||||
|
if this.RowCopyStartTime.IsZero() {
|
||||||
|
// Row copy hasn't started yet
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
if this.RowCopyEndTime.IsZero() {
|
if this.RowCopyEndTime.IsZero() {
|
||||||
return time.Now().Sub(this.RowCopyStartTime)
|
return time.Now().Sub(this.RowCopyStartTime)
|
||||||
}
|
}
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
gosql "database/sql"
|
gosql "database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"github.com/github/gh-ost/go/base"
|
"github.com/github/gh-ost/go/base"
|
||||||
"github.com/github/gh-ost/go/mysql"
|
"github.com/github/gh-ost/go/mysql"
|
||||||
@ -364,13 +365,19 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
|
|||||||
|
|
||||||
// CountTableRows counts exact number of rows on the original table
|
// CountTableRows counts exact number of rows on the original table
|
||||||
func (this *Inspector) CountTableRows() error {
|
func (this *Inspector) CountTableRows() error {
|
||||||
|
atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1)
|
||||||
|
defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0)
|
||||||
|
|
||||||
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
||||||
|
|
||||||
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||||
if err := this.db.QueryRow(query).Scan(&this.migrationContext.RowsEstimate); err != nil {
|
if err := this.db.QueryRow(query).Scan(&this.migrationContext.RowsEstimate); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate
|
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate
|
||||||
|
|
||||||
log.Infof("Exact number of rows via COUNT: %d", this.migrationContext.RowsEstimate)
|
log.Infof("Exact number of rows via COUNT: %d", this.migrationContext.RowsEstimate)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -418,7 +418,7 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
go this.initiateThrottler()
|
go this.initiateThrottler()
|
||||||
go this.executeWriteFuncs()
|
go this.executeWriteFuncs()
|
||||||
go this.iterateChunks()
|
go this.iterateChunks()
|
||||||
this.migrationContext.RowCopyStartTime = time.Now()
|
this.migrationContext.MarkRowCopyStartTime()
|
||||||
go this.initiateStatus()
|
go this.initiateStatus()
|
||||||
|
|
||||||
log.Debugf("Operating until row copy is complete")
|
log.Debugf("Operating until row copy is complete")
|
||||||
@ -951,7 +951,9 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
|
|
||||||
var etaSeconds float64 = math.MaxFloat64
|
var etaSeconds float64 = math.MaxFloat64
|
||||||
eta := "N/A"
|
eta := "N/A"
|
||||||
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 {
|
||||||
|
eta = "counting rows"
|
||||||
|
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
||||||
eta = "postponing cut-over"
|
eta = "postponing cut-over"
|
||||||
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
||||||
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user