Initial support for batching multiple DMLs when writing to ghost table
This commit is contained in:
parent
6bcf5154cd
commit
3e28f462d8
@ -950,3 +950,48 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
|
||||||
|
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
|
||||||
|
|
||||||
|
var totalDelta int64
|
||||||
|
|
||||||
|
err := func() error {
|
||||||
|
tx, err := this.db.Begin()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
sessionQuery := `SET
|
||||||
|
SESSION time_zone = '+00:00',
|
||||||
|
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
||||||
|
`
|
||||||
|
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, dmlEvent := range dmlEvents {
|
||||||
|
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if _, err := tx.Exec(query, args...); err != nil {
|
||||||
|
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
totalDelta += rowDelta
|
||||||
|
}
|
||||||
|
if err := tx.Commit(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return log.Errore(err)
|
||||||
|
}
|
||||||
|
// no error
|
||||||
|
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
|
||||||
|
if this.migrationContext.CountTableRows {
|
||||||
|
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user