diff --git a/go/logic/applier.go b/go/logic/applier.go index ff7212e..e1bbcc1 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -24,6 +24,36 @@ const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" ) +type BuiltDMLQuery struct { + query string + args []interface{} + dataViaBinlog bool + rowsDelta int64 + err error +} + +func NewBuiltDMLQuery(query string, args []interface{}, dataViaBinlog bool, rowsDelta int64, err error) *BuiltDMLQuery { + return &BuiltDMLQuery{ + query: query, + args: args, + dataViaBinlog: dataViaBinlog, + rowsDelta: rowsDelta, + err: err, + } +} + +type BuiltDMLQueryPair struct { + readQuery *BuiltDMLQuery + writeQuery *BuiltDMLQuery +} + +func NewBuiltDMLQueryPair(readQuery *BuiltDMLQuery, writeQuery *BuiltDMLQuery) *BuiltDMLQueryPair { + return &BuiltDMLQueryPair{ + readQuery: readQuery, + writeQuery: writeQuery, + } +} + // Applier connects and writes the the applier-server, which is the server where migration // happens. This is typically the master, but could be a replica when `--test-on-replica` or // `--execute-on-replica` are given. @@ -904,35 +934,38 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err // buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog // event entry on the original table. -func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, dataViaBinlog bool, rowsDelta int64, err error) { +func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) *BuiltDMLQueryPair { + var args []interface{} switch dmlEvent.DML { case binlog.DeleteDML: { query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) - return query, uniqueKeyArgs, true, -1, err + return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, uniqueKeyArgs, true, -1, err)) } case binlog.InsertDML: { - if this.migrationContext.UniqueKey.IsPrimary() { - query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable()) - return query, sharedArgs, false, 1, err - } + // if this.migrationContext.UniqueKey.IsPrimary() { + // query, uniqueKeyArgs, err := sql.BuildPKSelectPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.UniqueKey, args, false) + // return NewBuiltDMLQuery(query, uniqueKeyArgs, false, 1, err) + // query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable()) + // return NewBuiltDMLQuery(query, sharedArgs, false, 1, err) + // } query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues()) - return query, sharedArgs, true, 1, err + return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, sharedArgs, true, 1, err)) } case binlog.UpdateDML: { - if this.migrationContext.UniqueKey.IsPrimary() { - query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable()) - return query, sharedArgs, false, 1, err - } + // if this.migrationContext.UniqueKey.IsPrimary() { + // query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable()) + // return NewBuiltDMLQuery(query, sharedArgs, false, 1, err) + // } query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return query, args, true, 0, err + return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, args, true, 0, err)) } } - return "", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) + return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery("", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))) } // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table @@ -959,15 +992,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return rollback(err) } for _, dmlEvent := range dmlEvents { - query, args, _, rowDelta, err := this.buildDMLEventQuery(dmlEvent) - if err != nil { + builtDmlQueries := this.buildDMLEventQuery(dmlEvent) + writeQuery := builtDmlQueries.writeQuery + if writeQuery.err != nil { return rollback(err) } - if _, err := tx.Exec(query, args...); err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) + if _, err := tx.Exec(writeQuery.query, writeQuery.args...); err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), writeQuery.query, writeQuery.args) return rollback(err) } - totalDelta += rowDelta + totalDelta += writeQuery.rowsDelta } if err := tx.Commit(); err != nil { return err