decoupling read query and write query

This commit is contained in:
Shlomi Noach 2017-03-26 19:19:36 +03:00
parent adac52e482
commit a19f0b48ae

View File

@ -24,6 +24,36 @@ const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)
type BuiltDMLQuery struct {
query string
args []interface{}
dataViaBinlog bool
rowsDelta int64
err error
}
func NewBuiltDMLQuery(query string, args []interface{}, dataViaBinlog bool, rowsDelta int64, err error) *BuiltDMLQuery {
return &BuiltDMLQuery{
query: query,
args: args,
dataViaBinlog: dataViaBinlog,
rowsDelta: rowsDelta,
err: err,
}
}
type BuiltDMLQueryPair struct {
readQuery *BuiltDMLQuery
writeQuery *BuiltDMLQuery
}
func NewBuiltDMLQueryPair(readQuery *BuiltDMLQuery, writeQuery *BuiltDMLQuery) *BuiltDMLQueryPair {
return &BuiltDMLQueryPair{
readQuery: readQuery,
writeQuery: writeQuery,
}
}
// Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
@ -904,35 +934,38 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, dataViaBinlog bool, rowsDelta int64, err error) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) *BuiltDMLQueryPair {
var args []interface{}
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return query, uniqueKeyArgs, true, -1, err
return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, uniqueKeyArgs, true, -1, err))
}
case binlog.InsertDML:
{
if this.migrationContext.UniqueKey.IsPrimary() {
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
return query, sharedArgs, false, 1, err
}
// if this.migrationContext.UniqueKey.IsPrimary() {
// query, uniqueKeyArgs, err := sql.BuildPKSelectPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.UniqueKey, args, false)
// return NewBuiltDMLQuery(query, uniqueKeyArgs, false, 1, err)
// query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
// return NewBuiltDMLQuery(query, sharedArgs, false, 1, err)
// }
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, true, 1, err
return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, sharedArgs, true, 1, err))
}
case binlog.UpdateDML:
{
if this.migrationContext.UniqueKey.IsPrimary() {
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
return query, sharedArgs, false, 1, err
}
// if this.migrationContext.UniqueKey.IsPrimary() {
// query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
// return NewBuiltDMLQuery(query, sharedArgs, false, 1, err)
// }
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return query, args, true, 0, err
return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, args, true, 0, err))
}
}
return "", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery("", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
}
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
@ -959,15 +992,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return rollback(err)
}
for _, dmlEvent := range dmlEvents {
query, args, _, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
if err != nil {
builtDmlQueries := this.buildDMLEventQuery(dmlEvent)
writeQuery := builtDmlQueries.writeQuery
if writeQuery.err != nil {
return rollback(err)
}
if _, err := tx.Exec(query, args...); err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
if _, err := tx.Exec(writeQuery.query, writeQuery.args...); err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), writeQuery.query, writeQuery.args)
return rollback(err)
}
totalDelta += rowDelta
totalDelta += writeQuery.rowsDelta
}
if err := tx.Commit(); err != nil {
return err