Merge pull request #27 from github/exact-rowcount

exact-rowcount implices updating number of rows as we make progress
This commit is contained in:
Shlomi Noach 2016-05-04 08:26:17 +03:00
commit dfdac964f1
2 changed files with 10 additions and 7 deletions

View File

@ -666,31 +666,31 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
return result, nil return result, nil
} }
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, err error) { func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
switch dmlEvent.DML { switch dmlEvent.DML {
case binlog.DeleteDML: case binlog.DeleteDML:
{ {
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return query, uniqueKeyArgs, err return query, uniqueKeyArgs, -1, err
} }
case binlog.InsertDML: case binlog.InsertDML:
{ {
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues()) query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, err return query, sharedArgs, 1, err
} }
case binlog.UpdateDML: case binlog.UpdateDML:
{ {
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args = append(args, sharedArgs...) args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...) args = append(args, uniqueKeyArgs...)
return query, args, err return query, args, 0, err
} }
} }
return "", args, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
} }
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
query, args, err := this.buildDMLEventQuery(dmlEvent) query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
if err != nil { if err != nil {
return err return err
} }
@ -698,5 +698,8 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if err == nil { if err == nil {
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
} }
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
}
return err return err
} }

View File

@ -528,7 +528,7 @@ func (this *Migrator) printStatus() {
elapsedTime := this.migrationContext.ElapsedTime() elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds()) elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied() totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
rowsEstimate := this.migrationContext.RowsEstimate rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate)
progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
shouldPrintStatus := false shouldPrintStatus := false