copy and update text using convert when charset changes

addresses #290

Note: there is currently no issue backfilling the ghost table when the
characterset changes, likely because it's a insert-into-select-from and
it all occurs within mysql.

However, when applying DML events (UPDATE, DELETE, etc) the values are
sprintf'd into a prepared statement and due to the possibility of
migrating text column data containing invalid characters in the
destination charset, a conversion step is often necessary.

For example, when migrating a table/column from latin1 to utf8mb4, the
latin1 column may contain characters that are invalid single-byte utf8
characters. Characters in the \x80-\xFF range are most common. When
written to utf8mb4 column without conversion, they fail as they do not
exist in the utf8 codepage.

Converting these texts/characters to the destination charset using
convert(? using {charset}) will convert appropriately and the
update/replace will succeed.

I only point out the "Note:" above because there are two tests added
for this: latin1text-to-utf8mb4 and latin1text-to-ut8mb4-insert

The former is a test that fails prior to this commit. The latter is a
test that succeeds prior to this comment. Both are affected by the code
in this commit.

convert text to original charset, then destination

converting text first to the original charset and then to the
destination charset produces the most consistent results, as inserting
the binary into a utf8-charset column may encounter an error if there is
no prior context of latin1 encoding.

mysql> select hex(convert(char(189) using utf8mb4));
+---------------------------------------+
| hex(convert(char(189) using utf8mb4)) |
+---------------------------------------+
|                                       |
+---------------------------------------+
1 row in set, 1 warning (0.00 sec)

mysql> select hex(convert(convert(char(189) using latin1) using utf8mb4));
+-------------------------------------------------------------+
| hex(convert(convert(char(189) using latin1) using utf8mb4)) |
+-------------------------------------------------------------+
| C2BD                                                        |
+-------------------------------------------------------------+
1 row in set (0.00 sec)

as seen in this failure on 5.5.62

 Error 1300: Invalid utf8mb4 character string: 'BD'; query=
			replace /* gh-ost `test`.`_gh_ost_test_gho` */ into
				`test`.`_gh_ost_test_gho`
					(`id`, `t`)
				values
					(?, convert(? using utf8mb4))
This commit is contained in:
Josh Bielick 2021-07-09 16:17:01 -04:00
parent b84af7bdc7
commit 84e55ff904
No known key found for this signature in database
GPG Key ID: 3DA8A86A1E1EF5D5
5 changed files with 26 additions and 5 deletions

View File

@ -191,6 +191,9 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
this.migrationContext.MappedSharedColumns.SetEnumToTextConversion(column.Name) this.migrationContext.MappedSharedColumns.SetEnumToTextConversion(column.Name)
this.migrationContext.MappedSharedColumns.SetEnumValues(column.Name, column.EnumValues) this.migrationContext.MappedSharedColumns.SetEnumValues(column.Name, column.EnumValues)
} }
if column.Name == mappedColumn.Name && column.Charset != mappedColumn.Charset {
this.migrationContext.MappedSharedColumns.SetCharsetConversion(column.Name, column.Charset, mappedColumn.Charset)
}
} }
for _, column := range this.migrationContext.UniqueKey.Columns.Columns() { for _, column := range this.migrationContext.UniqueKey.Columns.Columns() {

View File

@ -42,6 +42,8 @@ func buildColumnsPreparedValues(columns *ColumnList) []string {
token = fmt.Sprintf("ELT(?, %s)", column.EnumValues) token = fmt.Sprintf("ELT(?, %s)", column.EnumValues)
} else if column.Type == JSONColumnType { } else if column.Type == JSONColumnType {
token = "convert(? using utf8mb4)" token = "convert(? using utf8mb4)"
} else if column.charsetConversion != nil {
token = fmt.Sprintf("convert(convert(? using %s) using %s)", column.charsetConversion.FromCharset, column.charsetConversion.ToCharset)
} else { } else {
token = "?" token = "?"
} }
@ -114,6 +116,8 @@ func BuildSetPreparedClause(columns *ColumnList) (result string, err error) {
setToken = fmt.Sprintf("%s=ELT(?, %s)", EscapeName(column.Name), column.EnumValues) setToken = fmt.Sprintf("%s=ELT(?, %s)", EscapeName(column.Name), column.EnumValues)
} else if column.Type == JSONColumnType { } else if column.Type == JSONColumnType {
setToken = fmt.Sprintf("%s=convert(? using utf8mb4)", EscapeName(column.Name)) setToken = fmt.Sprintf("%s=convert(? using utf8mb4)", EscapeName(column.Name))
} else if column.charsetConversion != nil {
setToken = fmt.Sprintf("%s=convert(convert(? using %s) using %s)", EscapeName(column.Name), column.charsetConversion.FromCharset, column.charsetConversion.ToCharset)
} else { } else {
setToken = fmt.Sprintf("%s=?", EscapeName(column.Name)) setToken = fmt.Sprintf("%s=?", EscapeName(column.Name))
} }

View File

@ -32,6 +32,11 @@ type TimezoneConversion struct {
ToTimezone string ToTimezone string
} }
type CharsetConversion struct {
ToCharset string
FromCharset string
}
type Column struct { type Column struct {
Name string Name string
IsUnsigned bool IsUnsigned bool
@ -40,6 +45,7 @@ type Column struct {
EnumValues string EnumValues string
timezoneConversion *TimezoneConversion timezoneConversion *TimezoneConversion
enumToTextConversion bool enumToTextConversion bool
charsetConversion *CharsetConversion
// add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog. // add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog.
// https://github.com/github/gh-ost/issues/909 // https://github.com/github/gh-ost/issues/909
BinaryOctetLength uint BinaryOctetLength uint
@ -211,6 +217,14 @@ func (this *ColumnList) SetEnumValues(columnName string, enumValues string) {
this.GetColumn(columnName).EnumValues = enumValues this.GetColumn(columnName).EnumValues = enumValues
} }
func (this *ColumnList) SetCharsetConversion(columnName string, fromCharset string, toCharset string) {
this.GetColumn(columnName).charsetConversion = &CharsetConversion{FromCharset: fromCharset, ToCharset: toCharset}
}
func (this *ColumnList) IsCharsetConversion(columnName string) bool {
return this.GetColumn(columnName).charsetConversion != nil
}
func (this *ColumnList) String() string { func (this *ColumnList) String() string {
return strings.Join(this.Names(), ",") return strings.Join(this.Names(), ",")
} }

View File

@ -7,7 +7,7 @@ create table gh_ost_test (
primary key(id) primary key(id)
) auto_increment=1; ) auto_increment=1;
insert into gh_ost_test values (null, 'átesting'); insert into gh_ost_test values (null, 'átesting', '', '');
insert into gh_ost_test values (null, 'Hello world, Καλημέρα κόσμε, コンニチハ', 'átesting0', 'initial'); insert into gh_ost_test values (null, 'Hello world, Καλημέρα κόσμε, コンニチハ', 'átesting0', 'initial');

View File

@ -5,7 +5,7 @@ create table gh_ost_test (
primary key(id) primary key(id)
) auto_increment=1 charset latin1 collate latin1_swedish_ci; ) auto_increment=1 charset latin1 collate latin1_swedish_ci;
insert into gh_ost_test values (null, char(128)); insert into gh_ost_test values (null, char(189));
drop event if exists gh_ost_test; drop event if exists gh_ost_test;
delimiter ;; delimiter ;;
@ -18,7 +18,7 @@ create event gh_ost_test
do do
begin begin
insert into gh_ost_test values (null, md5(rand())); insert into gh_ost_test values (null, md5(rand()));
insert into gh_ost_test values (null, char(128)); insert into gh_ost_test values (null, char(189));
update gh_ost_test set t=char(230) order by id desc limit 1; update gh_ost_test set t=char(190) order by id desc limit 1;
delete from gh_ost_test where t=char(230); delete from gh_ost_test where t=char(190);
end ;; end ;;