diff --git a/go/logic/applier.go b/go/logic/applier.go index 2607f30..971f068 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -24,6 +24,28 @@ const ( atomicCutOverMagicHint = "ghost-cut-over-sentry" ) +type dmlBuildResult struct { + query string + args []interface{} + rowsDelta int64 + err error +} + +func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult { + return &dmlBuildResult{ + query: query, + args: args, + rowsDelta: rowsDelta, + err: err, + } +} + +func newDmlBuildResultError(err error) *dmlBuildResult { + return &dmlBuildResult{ + err: err, + } +} + // Applier connects and writes the the applier-server, which is the server where migration // happens. This is typically the master, but could be a replica when `--test-on-replica` or // `--execute-on-replica` are given. @@ -899,79 +921,103 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err return result, nil } +// updateModifiesUniqueKeyColumns checks whether a UPDATE DML event actually +// modifies values of the migration's unique key (the iterated key). This will call +// for special handling. +func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) (modifiedColumn string, isModified bool) { + for _, column := range this.migrationContext.UniqueKey.Columns.Columns() { + tableOrdinal := this.migrationContext.OriginalTableColumns.Ordinals[column.Name] + whereColumnValue := dmlEvent.WhereColumnValues.AbstractValues()[tableOrdinal] + newColumnValue := dmlEvent.NewColumnValues.AbstractValues()[tableOrdinal] + if newColumnValue != whereColumnValue { + return column.Name, true + } + } + return "", false +} + // buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog // event entry on the original table. -func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) { +func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) { switch dmlEvent.DML { case binlog.DeleteDML: { query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) - return query, uniqueKeyArgs, -1, err + return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err)) } case binlog.InsertDML: { query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues()) - return query, sharedArgs, 1, err + return append(results, newDmlBuildResult(query, sharedArgs, 1, err)) } case binlog.UpdateDML: { + if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified { + dmlEvent.DML = binlog.DeleteDML + results = append(results, this.buildDMLEventQuery(dmlEvent)...) + dmlEvent.DML = binlog.InsertDML + results = append(results, this.buildDMLEventQuery(dmlEvent)...) + return results + } query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) + args := sqlutils.Args() args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) - return query, args, 0, err + return append(results, newDmlBuildResult(query, args, 0, err)) } } - return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML) + return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))) } // ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted // original-table binlog event func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { - query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) - if err != nil { - return err - } - // TODO The below is in preparation for transactional writes on the ghost tables. - // Such writes would be, for example: - // - prepended with sql_mode setup - // - prepended with time zone setup - // - prepended with SET SQL_LOG_BIN=0 - // - prepended with SET FK_CHECKS=0 - // etc. - // - // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql - // is solved by silently converting unsigned bigints to string values. - // - - err = func() error { - tx, err := this.db.Begin() - if err != nil { - return err + for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { + if buildResult.err != nil { + return buildResult.err } - sessionQuery := `SET + // TODO The below is in preparation for transactional writes on the ghost tables. + // Such writes would be, for example: + // - prepended with sql_mode setup + // - prepended with time zone setup + // - prepended with SET SQL_LOG_BIN=0 + // - prepended with SET FK_CHECKS=0 + // etc. + // + // a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql + // is solved by silently converting unsigned bigints to string values. + // + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + sessionQuery := `SET SESSION time_zone = '+00:00', sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') ` - if _, err := tx.Exec(sessionQuery); err != nil { - return err - } - if _, err := tx.Exec(query, args...); err != nil { - return err - } - if err := tx.Commit(); err != nil { - return err - } - return nil - }() + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { + return err + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() - if err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) - return log.Errore(err) - } - // no error - atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) - if this.migrationContext.CountTableRows { - atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, rowDelta) + if err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) + return log.Errore(err) + } + // no error + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) + if this.migrationContext.CountTableRows { + atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, buildResult.rowsDelta) + } } return nil } @@ -1000,15 +1046,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return rollback(err) } for _, dmlEvent := range dmlEvents { - query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) - if err != nil { - return rollback(err) + for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { + if buildResult.err != nil { + return rollback(buildResult.err) + } + if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) + return rollback(err) + } + totalDelta += buildResult.rowsDelta } - if _, err := tx.Exec(query, args...); err != nil { - err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) - return rollback(err) - } - totalDelta += rowDelta } if err := tx.Commit(); err != nil { return err diff --git a/localtests/fail-update-pk-column/create.sql b/localtests/fail-update-pk-column/create.sql new file mode 100644 index 0000000..5cc1d37 --- /dev/null +++ b/localtests/fail-update-pk-column/create.sql @@ -0,0 +1,52 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + primary key(id) +) auto_increment=1; + +insert into gh_ost_test values (null, 101); +insert into gh_ost_test values (null, 102); +insert into gh_ost_test values (null, 103); +insert into gh_ost_test values (null, 104); +insert into gh_ost_test values (null, 105); +insert into gh_ost_test values (null, 106); +insert into gh_ost_test values (null, 107); +insert into gh_ost_test values (null, 108); +insert into gh_ost_test values (null, 109); +insert into gh_ost_test values (null, 110); +insert into gh_ost_test values (null, 111); +insert into gh_ost_test values (null, 112); +insert into gh_ost_test values (null, 113); +insert into gh_ost_test values (null, 114); +insert into gh_ost_test values (null, 115); +insert into gh_ost_test values (null, 116); +insert into gh_ost_test values (null, 117); +insert into gh_ost_test values (null, 118); +insert into gh_ost_test values (null, 119); +insert into gh_ost_test values (null, 120); +insert into gh_ost_test values (null, 121); +insert into gh_ost_test values (null, 122); +insert into gh_ost_test values (null, 123); +insert into gh_ost_test values (null, 124); +insert into gh_ost_test values (null, 125); +insert into gh_ost_test values (null, 126); +insert into gh_ost_test values (null, 127); +insert into gh_ost_test values (null, 128); +insert into gh_ost_test values (null, 129); + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + interval 3 second + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + update gh_ost_test set id=-2 where id=21; + update gh_ost_test set id=55 where id=22; + update gh_ost_test set id=23 where id=23; + update gh_ost_test set i=5024 where id=24; +end ;; diff --git a/localtests/test.sh b/localtests/test.sh index b901ce3..477ecbb 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -170,7 +170,12 @@ test_single() { build_binary() { echo "Building" + rm -f $ghost_binary go build -o $ghost_binary go/cmd/gh-ost/main.go + if [ $? -ne 0 ] ; then + echo "Build failure" + exit 1 + fi } test_all() {