supporting update to columns of migration key

This commit is contained in:
Shlomi Noach 2017-11-20 08:17:20 +02:00
parent 8a59d7e823
commit 3898d49f6c
2 changed files with 94 additions and 59 deletions

View File

@ -24,6 +24,32 @@ const (
atomicCutOverMagicHint = "ghost-cut-over-sentry" atomicCutOverMagicHint = "ghost-cut-over-sentry"
) )
type dmlBuildResult struct {
query string
args []interface{}
rowsDelta int64
err error
}
func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult {
return &dmlBuildResult{
query: query,
args: args,
rowsDelta: rowsDelta,
err: err,
}
}
func newDmlBuildResultError(err error) *dmlBuildResult {
return &dmlBuildResult{
err: err,
}
}
func buildResults(args ...(*dmlBuildResult)) [](*dmlBuildResult) {
return args
}
// Applier connects and writes the the applier-server, which is the server where migration // Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or // happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given. // `--execute-on-replica` are given.
@ -899,7 +925,7 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
return result, nil return result, nil
} }
func (this *Applier) validateUpdateDoesNotModifyMigrationUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) error { func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) (modifiedColumn string, isModified bool) {
// log.Debugf("............ UPDATE") // log.Debugf("............ UPDATE")
// log.Debugf("............ UPDATE: %+v", this.migrationContext.UniqueKey.Columns.String()) // log.Debugf("............ UPDATE: %+v", this.migrationContext.UniqueKey.Columns.String())
// log.Debugf("............ UPDATE: %+v", dmlEvent.WhereColumnValues.String()) // log.Debugf("............ UPDATE: %+v", dmlEvent.WhereColumnValues.String())
@ -912,46 +938,54 @@ func (this *Applier) validateUpdateDoesNotModifyMigrationUniqueKeyColumns(dmlEve
// log.Debugf("............ UPDATE: new value= %+v", newColumnValue) // log.Debugf("............ UPDATE: new value= %+v", newColumnValue)
// log.Debugf("............ UPDATE: equals? %+v", newColumnValue == whereColumnValue) // log.Debugf("............ UPDATE: equals? %+v", newColumnValue == whereColumnValue)
if newColumnValue != whereColumnValue { if newColumnValue != whereColumnValue {
return log.Errorf("gh-ost detected an UPDATE to a unique key column this migration is iterating on. Such update is not supported. Column is %s", column.Name) return column.Name, true
} }
} }
return nil return "", false
} }
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog // buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table. // event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) { func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
switch dmlEvent.DML { switch dmlEvent.DML {
case binlog.DeleteDML: case binlog.DeleteDML:
{ {
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return query, uniqueKeyArgs, -1, err return buildResults(newDmlBuildResult(query, uniqueKeyArgs, -1, err))
} }
case binlog.InsertDML: case binlog.InsertDML:
{ {
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues()) query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, 1, err return buildResults(newDmlBuildResult(query, sharedArgs, 1, err))
} }
case binlog.UpdateDML: case binlog.UpdateDML:
{ {
if err := this.validateUpdateDoesNotModifyMigrationUniqueKeyColumns(dmlEvent); err != nil { if modifiedColumn, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
return query, args, rowsDelta, err log.Debugf("---------------- Detected modifiedColumn: %+v. Will turn into DELETE+INSERT", modifiedColumn)
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
// return buildResults(newDmlBuildResultError(log.Errorf("gh-ost detected an UPDATE to a unique key column this migration is iterating on. Such update is not supported. Column is `%s`", modifiedColumn)))
// return query, args, rowsDelta, log.Errorf("gh-ost detected an UPDATE to a unique key column this migration is iterating on. Such update is not supported. Column is `%s`", modifiedColumn)
} }
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
args = append(args, sharedArgs...) args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...) args = append(args, uniqueKeyArgs...)
return query, args, 0, err return buildResults(newDmlBuildResult(query, args, 0, err))
} }
} }
return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) return buildResults(newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
} }
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted // ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
// original-table binlog event // original-table binlog event
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if err != nil { if buildResult.err != nil {
return err return buildResult.err
} }
// TODO The below is in preparation for transactional writes on the ghost tables. // TODO The below is in preparation for transactional writes on the ghost tables.
// Such writes would be, for example: // Such writes would be, for example:
@ -965,7 +999,7 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
// is solved by silently converting unsigned bigints to string values. // is solved by silently converting unsigned bigints to string values.
// //
err = func() error { err := func() error {
tx, err := this.db.Begin() tx, err := this.db.Begin()
if err != nil { if err != nil {
return err return err
@ -977,7 +1011,7 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if _, err := tx.Exec(sessionQuery); err != nil { if _, err := tx.Exec(sessionQuery); err != nil {
return err return err
} }
if _, err := tx.Exec(query, args...); err != nil { if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
return err return err
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
@ -987,13 +1021,14 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
}() }()
if err != nil { if err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args)
return log.Errore(err) return log.Errore(err)
} }
// no error // no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
if this.migrationContext.CountTableRows { if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, rowDelta) atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, buildResult.rowsDelta)
}
} }
return nil return nil
} }
@ -1022,15 +1057,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
return rollback(err) return rollback(err)
} }
for _, dmlEvent := range dmlEvents { for _, dmlEvent := range dmlEvents {
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if err != nil { if buildResult.err != nil {
return rollback(buildResult.err)
}
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args)
return rollback(err) return rollback(err)
} }
if _, err := tx.Exec(query, args...); err != nil { totalDelta += buildResult.rowsDelta
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
return rollback(err)
} }
totalDelta += rowDelta
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return err return err

View File

@ -1 +0,0 @@
gh-ost detected an UPDATE to a unique key column this migration is iterating on