diff --git a/go/logic/applier.go b/go/logic/applier.go index 2235959..461db05 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -666,31 +666,31 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err return result, nil } -func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, err error) { +func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) { switch dmlEvent.DML { case binlog.DeleteDML: { query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) - return query, uniqueKeyArgs, err + return query, uniqueKeyArgs, -1, err } case binlog.InsertDML: { query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues()) - return query, sharedArgs, err + return query, sharedArgs, 1, err } case binlog.UpdateDML: { query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return query, args, err + return query, args, 0, err } } - return "", args, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) + return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) } func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { - query, args, err := this.buildDMLEventQuery(dmlEvent) + query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) if err != nil { return err } @@ -698,5 +698,8 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { if err == nil { atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) } + if this.migrationContext.CountTableRows { + atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta) + } return err } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 01f9330..9343227 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -528,7 +528,7 @@ func (this *Migrator) printStatus() { elapsedTime := this.migrationContext.ElapsedTime() elapsedSeconds := int64(elapsedTime.Seconds()) totalRowsCopied := this.migrationContext.GetTotalRowsCopied() - rowsEstimate := this.migrationContext.RowsEstimate + rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) shouldPrintStatus := false