Merge pull request #196 from github/concurrent-rowcount
concurrent row-count
This commit is contained in:
commit
2b595b15f2
@ -49,6 +49,7 @@ type MigrationContext struct {
|
||||
AlterStatement string
|
||||
|
||||
CountTableRows bool
|
||||
ConcurrentCountTableRows bool
|
||||
AllowedRunningOnMaster bool
|
||||
AllowedMasterMaster bool
|
||||
SwitchToRowBinlogFormat bool
|
||||
@ -94,6 +95,7 @@ type MigrationContext struct {
|
||||
|
||||
TableEngine string
|
||||
RowsEstimate int64
|
||||
RowsDeltaEstimate int64
|
||||
UsedRowsEstimateMethod RowsEstimateMethod
|
||||
HasSuperPrivilege bool
|
||||
OriginalBinlogFormat string
|
||||
|
@ -53,6 +53,7 @@ func main() {
|
||||
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
|
||||
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
||||
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
|
||||
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", false, "(with --exact-rowcount), when true: count rows after row-copy begins, concurrently, and adjust row estimate later on; defaults false: first count rows, then start row copy")
|
||||
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
|
||||
flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup")
|
||||
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
|
||||
|
@ -893,7 +893,7 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
// no error
|
||||
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
||||
if this.migrationContext.CountTableRows {
|
||||
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
|
||||
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, rowDelta)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -440,12 +440,14 @@ func (this *Inspector) CountTableRows() error {
|
||||
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
||||
|
||||
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
if err := this.db.QueryRow(query).Scan(&this.migrationContext.RowsEstimate); err != nil {
|
||||
var rowsEstimate int64
|
||||
if err := this.db.QueryRow(query).Scan(&rowsEstimate); err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate)
|
||||
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate
|
||||
|
||||
log.Infof("Exact number of rows via COUNT: %d", this.migrationContext.RowsEstimate)
|
||||
log.Infof("Exact number of rows via COUNT: %d", rowsEstimate)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -361,6 +361,24 @@ func (this *Migrator) validateStatement() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Migrator) countTableRows() (err error) {
|
||||
if !this.migrationContext.CountTableRows {
|
||||
// Not counting; we stay with an estimate
|
||||
return nil
|
||||
}
|
||||
if this.migrationContext.Noop {
|
||||
log.Debugf("Noop operation; not really counting table rows")
|
||||
return nil
|
||||
}
|
||||
if this.migrationContext.ConcurrentCountTableRows {
|
||||
go this.inspector.CountTableRows()
|
||||
log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on")
|
||||
// and we ignore errors, because this turns to be a background job
|
||||
return nil
|
||||
}
|
||||
return this.inspector.CountTableRows()
|
||||
}
|
||||
|
||||
// Migrate executes the complete migration logic. This is *the* major gh-ost function.
|
||||
func (this *Migrator) Migrate() (err error) {
|
||||
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
@ -402,12 +420,8 @@ func (this *Migrator) Migrate() (err error) {
|
||||
}
|
||||
defer this.server.RemoveSocketFile()
|
||||
|
||||
if this.migrationContext.CountTableRows {
|
||||
if this.migrationContext.Noop {
|
||||
log.Debugf("Noop operation; not really counting table rows")
|
||||
} else if err := this.inspector.CountTableRows(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.countTableRows(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.addDMLEventsListener(); err != nil {
|
||||
@ -948,7 +962,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
elapsedTime := this.migrationContext.ElapsedTime()
|
||||
elapsedSeconds := int64(elapsedTime.Seconds())
|
||||
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
|
||||
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate)
|
||||
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
|
||||
var progressPct float64
|
||||
if rowsEstimate == 0 {
|
||||
progressPct = 100.0
|
||||
@ -970,7 +984,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
|
||||
var etaSeconds float64 = math.MaxFloat64
|
||||
eta := "N/A"
|
||||
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 {
|
||||
if atomic.LoadInt64(&this.migrationContext.CountingRowsFlag) > 0 && !this.migrationContext.ConcurrentCountTableRows {
|
||||
eta = "counting rows"
|
||||
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
||||
eta = "postponing cut-over"
|
||||
|
Loading…
Reference in New Issue
Block a user