WIP: support for MINIMAL row image
This commit is contained in:
@ -876,33 +876,48 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, dataViaBinlog bool, rowsDelta int64, err error) {
switch dmlEvent.DML {
case binlog.DeleteDML:
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
return query, uniqueKeyArgs, -1, err
return query, uniqueKeyArgs, true, -1, err
case binlog.InsertDML:
if this.migrationContext.UniqueKey.IsPrimary() {
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
log.Errorf("-------------- insert")
log.Errorf("query: %+v", query)
log.Errorf("argss: %+v", sharedArgs)
return query, sharedArgs, false, 1, err
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, 1, err
return query, sharedArgs, true, 1, err
case binlog.UpdateDML:
// if this.migrationContext.UniqueKey.IsPrimary() {
// query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
// log.Errorf("-------------- update")
// log.Errorf("query: %+v", query)
// log.Errorf("argss: %+v", sharedArgs)
// return query, sharedArgs, false, 1, err
// }
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return query, args, 0, err
return query, args, true, 0, err
return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
return "", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
// original-table binlog event
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
query, args, dataViaBinlog, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
if err != nil {
return err
@ -923,10 +938,16 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if err != nil {
return err
sessionQuery := `SET
SESSION time_zone = '+00:00',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
extraSessionChanges := ""
if dataViaBinlog {
extraSessionChanges = `
SESSION time_zone = '+00:00',
sessionQuery := fmt.Sprintf(`SET
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
`, extraSessionChanges)
if _, err := tx.Exec(sessionQuery); err != nil {
return err
@ -450,3 +450,60 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
return result, sharedArgs, uniqueKeyArgs, nil
func BuildPKInsertQuery(databaseName, originalTableName, ghostTableName string, tableColumns *ColumnList, sharedColumns []string, mappedSharedColumns []string, uniqueKey *UniqueKey, pkValues []string, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
if !uniqueKey.IsPrimary() {
return "", uniqueKeyArgs, fmt.Errorf("BuildPKInsertQuery only works for PRIMARY KEY")
uniqueKeyColumns := &uniqueKey.Columns
if len(sharedColumns) == 0 {
return "", uniqueKeyArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
databaseName = EscapeName(databaseName)
originalTableName = EscapeName(originalTableName)
ghostTableName = EscapeName(ghostTableName)
mappedSharedColumns = duplicateNames(mappedSharedColumns)
for i := range mappedSharedColumns {
mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i])
mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ")
sharedColumns = duplicateNames(sharedColumns)
for i := range sharedColumns {
sharedColumns[i] = EscapeName(sharedColumns[i])
sharedColumnsListing := strings.Join(sharedColumns, ", ")
uniqueKeyName := EscapeName(uniqueKey.Name)
equalsComparison, err := BuildEqualsComparison(uniqueKeyColumns.Names(), pkValues)
if err != nil {
return "", uniqueKeyArgs, err
for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal])
uniqueKeyArgs = append(uniqueKeyArgs, arg)
transactionalClause := ""
if transactionalTable {
transactionalClause = "lock in share mode"
result = fmt.Sprintf(`
replace /* gh-ost %s.%s */ into %s.%s (%s)
(select %s from %s.%s force index (%s)
where (%s) %s
`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListing, databaseName, originalTableName, uniqueKeyName,
equalsComparison, transactionalClause)
return result, uniqueKeyArgs, nil
func BuildPKInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, tableColumns *ColumnList, sharedColumns []string, mappedSharedColumns []string, uniqueKey *UniqueKey, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
pkValues := buildColumnsPreparedValues(&uniqueKey.Columns)
return BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, mappedSharedColumns, uniqueKey, pkValues, args, transactionalTable)
@ -677,3 +677,72 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) {
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(253)}))
func TestBuildPKInsertQuery(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
ghostTableName := "ghost"
sharedColumns := []string{"id", "name", "position"}
tableColumns := NewColumnList(sharedColumns)
args := []interface{}{3, "gromit", "dog"}
uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"id"})}
pkValues := []string{"@v1"}
query, explodedArgs, err := BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, pkValues, args, false)
expected := `
replace /* gh-ost mydb.tbl */ into mydb.ghost (id, name, position)
(select id, name, position from mydb.tbl force index (PRIMARY)
where (((id = @v1)))
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3}))
uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name", "position"})}
pkValues := []string{"@v1", "@v2"}
query, explodedArgs, err := BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, pkValues, args, false)
expected := `
replace /* gh-ost mydb.tbl */ into mydb.ghost (id, name, position)
(select id, name, position from mydb.tbl force index (PRIMARY)
where (((name = @v1) and (position = @v2)))
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{"gromit", "dog"}))
uniqueKey := UniqueKey{Name: "name_position_uidx", Columns: *NewColumnList([]string{"name", "position"})}
pkValues := []string{"@v1", "@v2"}
_, _, err := BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, pkValues, args, false)
func TestBuildPKInsertPreparedQuery(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
ghostTableName := "ghost"
sharedColumns := []string{"id", "name", "position"}
tableColumns := NewColumnList(sharedColumns)
args := []interface{}{3, "gromit", "dog"}
uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name", "position"})}
query, explodedArgs, err := BuildPKInsertPreparedQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, args, false)
expected := `
replace /* gh-ost mydb.tbl */ into mydb.ghost (id, name, position)
(select id, name, position from mydb.tbl force index (PRIMARY)
where (((name = ?) and (position = ?)))
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{"gromit", "dog"}))
Normal file
Normal file
@ -0,0 +1,34 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
v varchar(128),
updated tinyint unsigned default 0,
primary key(id, v)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
insert into gh_ost_test values (null, 11, 'eleven', 0);
update gh_ost_test set updated = 1 where i = 11 order by id desc limit 1;
insert into gh_ost_test values (null, 13, 'thirteen', 0);
update gh_ost_test set updated = 1 where i = 13 order by id desc limit 1;
insert into gh_ost_test values (null, 17, 'seventeen', 0);
update gh_ost_test set updated = 1 where i = 17 order by id desc limit 1;
insert into gh_ost_test values (null, 19, 'nineteen', 0);
update gh_ost_test set updated = 1 where i = 19 order by id desc limit 1;
insert into gh_ost_test values (null, 23, 'twenty three', 0);
update gh_ost_test set updated = 1 where i = 23 order by id desc limit 1;
end ;;
Reference in New Issue
Block a user