diff --git a/go/logic/applier.go b/go/logic/applier.go index a1175b1..a355796 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -24,6 +24,32 @@ const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" ) +type dmlBuildResult struct { + query string + args []interface{} + rowsDelta int64 + err error +} + +func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult { + return &dmlBuildResult{ + query: query, + args: args, + rowsDelta: rowsDelta, + err: err, + } +} + +func newDmlBuildResultError(err error) *dmlBuildResult { + return &dmlBuildResult{ + err: err, + } +} + +func buildResults(args ...(*dmlBuildResult)) [](*dmlBuildResult) { + return args +} + // Applier connects and writes the the applier-server, which is the server where migration // happens. This is typically the master, but could be a replica when `--test-on-replica` or // `--execute-on-replica` are given. @@ -899,7 +925,7 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err return result, nil } -func (this *Applier) validateUpdateDoesNotModifyMigrationUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) error { +func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) (modifiedColumn string, isModified bool) { // log.Debugf("............ UPDATE") // log.Debugf("............ UPDATE: %+v", this.migrationContext.UniqueKey.Columns.String()) // log.Debugf("............ UPDATE: %+v", dmlEvent.WhereColumnValues.String()) @@ -912,88 +938,97 @@ func (this *Applier) validateUpdateDoesNotModifyMigrationUniqueKeyColumns(dmlEve // log.Debugf("............ UPDATE: new value= %+v", newColumnValue) // log.Debugf("............ UPDATE: equals? %+v", newColumnValue == whereColumnValue) if newColumnValue != whereColumnValue { - return log.Errorf("gh-ost detected an UPDATE to a unique key column this migration is iterating on. Such update is not supported. Column is %s", column.Name) + return column.Name, true } } - return nil + return "", false } // buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog // event entry on the original table. -func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) { +func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) { switch dmlEvent.DML { case binlog.DeleteDML: { query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) - return query, uniqueKeyArgs, -1, err + return buildResults(newDmlBuildResult(query, uniqueKeyArgs, -1, err)) } case binlog.InsertDML: { query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues()) - return query, sharedArgs, 1, err + return buildResults(newDmlBuildResult(query, sharedArgs, 1, err)) } case binlog.UpdateDML: { - if err := this.validateUpdateDoesNotModifyMigrationUniqueKeyColumns(dmlEvent); err != nil { - return query, args, rowsDelta, err + if modifiedColumn, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified { + log.Debugf("---------------- Detected modifiedColumn: %+v. Will turn into DELETE+INSERT", modifiedColumn) + dmlEvent.DML = binlog.DeleteDML + results = append(results, this.buildDMLEventQuery(dmlEvent)...) + dmlEvent.DML = binlog.InsertDML + results = append(results, this.buildDMLEventQuery(dmlEvent)...) + return results + // return buildResults(newDmlBuildResultError(log.Errorf("gh-ost detected an UPDATE to a unique key column this migration is iterating on. Such update is not supported. Column is `%s`", modifiedColumn))) + // return query, args, rowsDelta, log.Errorf("gh-ost detected an UPDATE to a unique key column this migration is iterating on. Such update is not supported. Column is `%s`", modifiedColumn) } query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) + args := sqlutils.Args() args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return query, args, 0, err + return buildResults(newDmlBuildResult(query, args, 0, err)) } } - return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) + return buildResults(newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))) } // ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted // original-table binlog event func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { - query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) - if err != nil { - return err - } - // TODO The below is in preparation for transactional writes on the ghost tables. - // Such writes would be, for example: - // - prepended with sql_mode setup - // - prepended with time zone setup - // - prepended with SET SQL_LOG_BIN=0 - // - prepended with SET FK_CHECKS=0 - // etc. - // - // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql - // is solved by silently converting unsigned bigints to string values. - // - - err = func() error { - tx, err := this.db.Begin() - if err != nil { - return err + for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { + if buildResult.err != nil { + return buildResult.err } - sessionQuery := `SET + // TODO The below is in preparation for transactional writes on the ghost tables. + // Such writes would be, for example: + // - prepended with sql_mode setup + // - prepended with time zone setup + // - prepended with SET SQL_LOG_BIN=0 + // - prepended with SET FK_CHECKS=0 + // etc. + // + // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql + // is solved by silently converting unsigned bigints to string values. + // + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + sessionQuery := `SET SESSION time_zone = '+00:00', sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') ` - if _, err := tx.Exec(sessionQuery); err != nil { - return err - } - if _, err := tx.Exec(query, args...); err != nil { - return err - } - if err := tx.Commit(); err != nil { - return err - } - return nil - }() + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() - if err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) - return log.Errore(err) - } - // no error - atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) - if this.migrationContext.CountTableRows { - atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, rowDelta) + if err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) + return log.Errore(err) + } + // no error + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) + if this.migrationContext.CountTableRows { + atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, buildResult.rowsDelta) + } } return nil } @@ -1022,15 +1057,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return rollback(err) } for _, dmlEvent := range dmlEvents { - query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) - if err != nil { - return rollback(err) + for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { + if buildResult.err != nil { + return rollback(buildResult.err) + } + if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) + return rollback(err) + } + totalDelta += buildResult.rowsDelta } - if _, err := tx.Exec(query, args...); err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) - return rollback(err) - } - totalDelta += rowDelta } if err := tx.Commit(); err != nil { return err diff --git a/localtests/fail-update-pk-column/expect_failure b/localtests/fail-update-pk-column/expect_failure deleted file mode 100644 index 1b73369..0000000 --- a/localtests/fail-update-pk-column/expect_failure +++ /dev/null @@ -1 +0,0 @@ -gh-ost detected an UPDATE to a unique key column this migration is iterating on