From 3e28f462d88c9eb9cc84a26b6228fd278170fdb3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 3 Jan 2017 13:44:52 +0200 Subject: [PATCH] Initial support for batching multiple DMLs when writing to ghost table --- go/logic/applier.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index afff578..8fb940a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -950,3 +950,48 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { } return nil } + +// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table +func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { + + var totalDelta int64 + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + sessionQuery := `SET + SESSION time_zone = '+00:00', + sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') + ` + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + for _, dmlEvent := range dmlEvents { + query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) + if err != nil { + return err + } + if _, err := tx.Exec(query, args...); err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) + return err + } + totalDelta += rowDelta + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() + + if err != nil { + return log.Errore(err) + } + // no error + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents))) + if this.migrationContext.CountTableRows { + atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) + } + return nil +}