exact-rowcount implices updating number of rows as we make progress

This commit is contained in:
Shlomi Noach 2016-05-04 08:23:34 +03:00
parent 567cd32ef0
commit 74d8b06db1
2 changed files with 10 additions and 7 deletions

View File

@ -666,31 +666,31 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
return result, nil
}
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, err error) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return query, uniqueKeyArgs, err
return query, uniqueKeyArgs, -1, err
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, err
return query, sharedArgs, 1, err
}
case binlog.UpdateDML:
{
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return query, args, err
return query, args, 0, err
}
}
return "", args, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
}
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
query, args, err := this.buildDMLEventQuery(dmlEvent)
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
if err != nil {
return err
}
@ -698,5 +698,8 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if err == nil {
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
}
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
}
return err
}

View File

@ -528,7 +528,7 @@ func (this *Migrator) printStatus() {
elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
rowsEstimate := this.migrationContext.RowsEstimate
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate)
progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
shouldPrintStatus := false