gh-ost/go/sql/types.go
Josh Bielick 84e55ff904
copy and update text using convert when charset changes
addresses #290

Note: there is currently no issue backfilling the ghost table when the
character set changes, likely because it's an insert-into-select-from and
it all occurs within MySQL.

However, when applying DML events (UPDATE, DELETE, etc) the values are
sprintf'd into a prepared statement and due to the possibility of
migrating text column data containing invalid characters in the
destination charset, a conversion step is often necessary.

For example, when migrating a table/column from latin1 to utf8mb4, the
latin1 column may contain characters that are invalid single-byte utf8
characters. Characters in the \x80-\xFF range are most common. When
written to a utf8mb4 column without conversion, they fail as they do not
exist in the utf8 codepage.

Converting these texts/characters to the destination charset using
convert(? using {charset}) will convert appropriately and the
update/replace will succeed.

I only point out the "Note:" above because there are two tests added
for this: latin1text-to-utf8mb4 and latin1text-to-ut8mb4-insert

The former is a test that fails prior to this commit. The latter is a
test that succeeds prior to this commit. Both are affected by the code
in this commit.

convert text to original charset, then destination

converting text first to the original charset and then to the
destination charset produces the most consistent results, as inserting
the binary into a utf8-charset column may encounter an error if there is
no prior context of latin1 encoding.

mysql> select hex(convert(char(189) using utf8mb4));
+---------------------------------------+
| hex(convert(char(189) using utf8mb4)) |
+---------------------------------------+
|                                       |
+---------------------------------------+
1 row in set, 1 warning (0.00 sec)

mysql> select hex(convert(convert(char(189) using latin1) using utf8mb4));
+-------------------------------------------------------------+
| hex(convert(convert(char(189) using latin1) using utf8mb4)) |
+-------------------------------------------------------------+
| C2BD                                                        |
+-------------------------------------------------------------+
1 row in set (0.00 sec)

as seen in this failure on 5.5.62

 Error 1300: Invalid utf8mb4 character string: 'BD'; query=
			replace /* gh-ost `test`.`_gh_ost_test_gho` */ into
				`test`.`_gh_ost_test_gho`
					(`id`, `t`)
				values
					(?, convert(? using utf8mb4))
2021-07-14 09:20:24 -04:00

328 lines
8.0 KiB
Go

/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package sql
import (
"bytes"
"fmt"
"reflect"
"strconv"
"strings"
)
// ColumnType enumerates the column kinds named by the constants below.
type ColumnType int
// UnknownColumnType is the zero value of ColumnType, so a Column whose Type
// was never set reports it; the remaining constants follow consecutively via
// iota.
const (
UnknownColumnType ColumnType = iota
TimestampColumnType
DateTimeColumnType
EnumColumnType
MediumIntColumnType
JSONColumnType
FloatColumnType
BinaryColumnType
)
// maxMediumintUnsigned is 2^24 - 1. convertArg uses it to map a negative
// int32 argument of an unsigned MEDIUMINT column (a 3-byte type) back onto
// its unsigned value.
const maxMediumintUnsigned int32 = 16777215
// TimezoneConversion holds the target timezone of a column conversion. It is
// attached to a column by ColumnList.SetConvertDatetimeToTimestamp.
type TimezoneConversion struct {
// ToTimezone is the timezone to convert to.
ToTimezone string
}
// CharsetConversion holds the source and destination character sets of a
// column conversion. It is attached to a column by
// ColumnList.SetCharsetConversion.
type CharsetConversion struct {
// ToCharset is the character set to convert to.
ToCharset string
// FromCharset is the character set to convert from.
FromCharset string
}
// Column describes a single table column together with the conversion
// settings that ColumnList's setters attach to it.
type Column struct {
// Name is the column name; ColumnList indexes its columns by it.
Name string
// IsUnsigned makes convertArg reinterpret signed integer arguments as unsigned.
IsUnsigned bool
// Charset is the key convertArg looks up in charsetEncodingMap to decode string arguments.
Charset string
// Type is the column kind; convertArg special-cases MediumIntColumnType and BinaryColumnType.
Type ColumnType
// EnumValues is stored verbatim by ColumnList.SetEnumValues.
// NOTE(review): presumably the enum's value list — confirm the format against callers.
EnumValues string
// timezoneConversion is non-nil once ColumnList.SetConvertDatetimeToTimestamp was called for this column.
timezoneConversion *TimezoneConversion
// enumToTextConversion is set to true by ColumnList.SetEnumToTextConversion.
enumToTextConversion bool
// charsetConversion is non-nil once ColumnList.SetCharsetConversion was called for this column.
charsetConversion *CharsetConversion
// BinaryOctetLength is the octet length of a binary column. convertArg uses it to
// right-pad unique-key binary values with zero bytes, because bytes with suffix "00"
// get clipped in the MySQL binlog.
// https://github.com/github/gh-ost/issues/909
BinaryOctetLength uint
}
// convertArg adapts a single argument value to this column's definition and
// returns the value to use in its place.
//
// String arguments are first decoded with the charsetEncodingMap entry for
// this.Charset, if there is one. When the column is a BinaryColumnType and
// isUniqueKeyColumn is true, the result is then right-padded with zero bytes
// up to BinaryOctetLength. Strings are always returned from this branch.
//
// For unsigned columns, signed integer arguments are reinterpreted as
// unsigned: int8, int16, int32 and int become the unsigned type of the same
// width, while int64 is returned as its unsigned decimal string. Any other
// argument is returned unchanged.
func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} {
	if text, isString := arg.(string); isString {
		if charsetEncoding, known := charsetEncodingMap[this.Charset]; known {
			// The decoder's error is deliberately discarded; whatever string it
			// returned replaces the original value.
			text, _ = charsetEncoding.NewDecoder().String(text)
		}
		if isUniqueKeyColumn && this.Type == BinaryColumnType {
			if have := uint(len(text)); have < this.BinaryOctetLength {
				padding := bytes.Repeat([]byte{0}, int(this.BinaryOctetLength-have))
				text = string(append([]byte(text), padding...))
			}
		}
		return text
	}

	if !this.IsUnsigned {
		return arg
	}

	switch value := arg.(type) {
	case int8:
		return uint8(value)
	case int16:
		return uint16(value)
	case int32:
		if this.Type != MediumIntColumnType {
			return uint32(value)
		}
		// mediumint is a 3-byte type with no matching Go type, so a negative
		// value has to be wrapped into the unsigned range by hand.
		if value >= 0 {
			return value
		}
		return uint32(maxMediumintUnsigned + value + 1)
	case int64:
		return strconv.FormatUint(uint64(value), 10)
	case int:
		return uint(value)
	}
	return arg
}
// NewColumns builds one Column per entry of names, in the same order, with
// only the Name field populated. The result is always non-nil.
func NewColumns(names []string) []Column {
	columns := make([]Column, 0, len(names))
	for _, name := range names {
		columns = append(columns, Column{Name: name})
	}
	return columns
}
// ParseColumns splits names on commas and builds a Column for every piece.
// Pieces are used verbatim: surrounding whitespace is kept, and an empty
// input yields a single column with an empty name.
func ParseColumns(names string) []Column {
	return NewColumns(strings.Split(names, ","))
}
// ColumnsMap maps a column name onto its ordinal (zero-based) position in an
// ordered list of columns.
type ColumnsMap map[string]int
// NewEmptyColumnsMap returns an empty, non-nil ColumnsMap that is ready to
// be written to.
func NewEmptyColumnsMap() ColumnsMap {
	return ColumnsMap{}
}
// NewColumnsMap indexes orderedColumns by name, mapping each column name to
// its position in the slice. When a name occurs more than once, the last
// occurrence wins.
func NewColumnsMap(orderedColumns []Column) ColumnsMap {
	ordinals := NewEmptyColumnsMap()
	for ordinal := range orderedColumns {
		ordinals[orderedColumns[ordinal].Name] = ordinal
	}
	return ordinals
}
// ColumnList makes for a named list of columns: the ordered columns
// themselves plus a name-to-position index over them.
type ColumnList struct {
// columns holds the columns in order.
columns []Column
// Ordinals maps each column name to its index in columns.
Ordinals ColumnsMap
}
// NewColumnList creates a ColumnList from an ordered list of column names,
// including the name-to-ordinal index over those columns.
func NewColumnList(names []string) *ColumnList {
	columns := NewColumns(names)
	return &ColumnList{
		columns:  columns,
		Ordinals: NewColumnsMap(columns),
	}
}
// ParseColumnList parses a comma delimited list of column names into a
// ColumnList, including the name-to-ordinal index over the parsed columns.
func ParseColumnList(names string) *ColumnList {
	columns := ParseColumns(names)
	return &ColumnList{
		columns:  columns,
		Ordinals: NewColumnsMap(columns),
	}
}
// Columns returns the list's ordered slice of columns. The slice is not
// copied, so changes made through it are visible to this list.
func (this *ColumnList) Columns() []Column {
return this.columns
}
// Names returns the column names in list order, as a freshly allocated,
// non-nil slice.
func (this *ColumnList) Names() []string {
	names := make([]string, 0, len(this.columns))
	for i := range this.columns {
		names = append(names, this.columns[i].Name)
	}
	return names
}
// GetColumn returns a pointer to the column called columnName, or nil when
// Ordinals has no entry for that name. The pointer refers to the element
// stored in the list, so writes through it modify the list.
func (this *ColumnList) GetColumn(columnName string) *Column {
	ordinal, found := this.Ordinals[columnName]
	if !found {
		return nil
	}
	return &this.columns[ordinal]
}
// SetUnsigned flags the named column as unsigned.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetUnsigned(columnName string) {
	column := this.GetColumn(columnName)
	column.IsUnsigned = true
}
// IsUnsigned reports whether the named column is flagged as unsigned.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) IsUnsigned(columnName string) bool {
	column := this.GetColumn(columnName)
	return column.IsUnsigned
}
// SetCharset records charset as the character set of the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetCharset(columnName string, charset string) {
	column := this.GetColumn(columnName)
	column.Charset = charset
}
// GetCharset returns the character set recorded for the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) GetCharset(columnName string) string {
	column := this.GetColumn(columnName)
	return column.Charset
}
// SetColumnType records columnType as the type of the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetColumnType(columnName string, columnType ColumnType) {
	column := this.GetColumn(columnName)
	column.Type = columnType
}
// GetColumnType returns the type recorded for the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) GetColumnType(columnName string) ColumnType {
	column := this.GetColumn(columnName)
	return column.Type
}
// SetConvertDatetimeToTimestamp attaches a timezone conversion targeting
// toTimezone to the named column, replacing any previous one.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetConvertDatetimeToTimestamp(columnName string, toTimezone string) {
	conversion := &TimezoneConversion{ToTimezone: toTimezone}
	this.GetColumn(columnName).timezoneConversion = conversion
}
// HasTimezoneConversion reports whether a timezone conversion has been
// attached to the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) HasTimezoneConversion(columnName string) bool {
	column := this.GetColumn(columnName)
	return column.timezoneConversion != nil
}
// SetEnumToTextConversion flags the named column for enum-to-text conversion.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetEnumToTextConversion(columnName string) {
	column := this.GetColumn(columnName)
	column.enumToTextConversion = true
}
// IsEnumToTextConversion reports whether the named column is flagged for
// enum-to-text conversion.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) IsEnumToTextConversion(columnName string) bool {
	column := this.GetColumn(columnName)
	return column.enumToTextConversion
}
// SetEnumValues stores enumValues verbatim on the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetEnumValues(columnName string, enumValues string) {
	column := this.GetColumn(columnName)
	column.EnumValues = enumValues
}
// SetCharsetConversion attaches a charset conversion from fromCharset to
// toCharset to the named column, replacing any previous one.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) SetCharsetConversion(columnName string, fromCharset string, toCharset string) {
	conversion := &CharsetConversion{
		FromCharset: fromCharset,
		ToCharset:   toCharset,
	}
	this.GetColumn(columnName).charsetConversion = conversion
}
// IsCharsetConversion reports whether a charset conversion has been attached
// to the named column.
// It panics on a nil dereference when the list has no column by that name,
// because GetColumn returns nil in that case.
func (this *ColumnList) IsCharsetConversion(columnName string) bool {
	column := this.GetColumn(columnName)
	return column.charsetConversion != nil
}
// String renders the list as its column names joined by commas, in list
// order.
func (this *ColumnList) String() string {
	names := this.Names()
	return strings.Join(names, ",")
}
// Equals reports whether this list and other hold deeply equal columns in
// the same order. Every Column field takes part in the comparison, including
// the unexported conversion settings.
//
// A nil list only equals another nil list.
func (this *ColumnList) Equals(other *ColumnList) bool {
	if this == nil || other == nil {
		return this == other
	}
	// Compare the column slices themselves. Passing the method values
	// (this.Columns, other.Columns) to reflect.DeepEqual would compare two
	// non-nil funcs, which DeepEqual never considers equal.
	return reflect.DeepEqual(this.Columns(), other.Columns())
}
// EqualsByNames reports whether this list and other contain the same column
// names in the same order. Column attributes other than the name are ignored.
func (this *ColumnList) EqualsByNames(other *ColumnList) bool {
	ours, theirs := this.Names(), other.Names()
	return reflect.DeepEqual(ours, theirs)
}
// IsSubsetOf returns 'true' when every column name of this list also appears
// in other's Ordinals index, regardless of order. An empty list is a subset
// of any list.
func (this *ColumnList) IsSubsetOf(other *ColumnList) bool {
	for i := range this.columns {
		if _, found := other.Ordinals[this.columns[i].Name]; found {
			continue
		}
		return false
	}
	return true
}
// Len returns the number of columns in the list.
func (this *ColumnList) Len() int {
return len(this.columns)
}
// UniqueKey is the combination of a key's name and columns
type UniqueKey struct {
// Name is the key's name; IsPrimary treats the name "PRIMARY" as the primary key.
Name string
// Columns lists the columns that make up the key.
Columns ColumnList
// HasNullable is reported verbatim by String.
// NOTE(review): presumably true when any key column is nullable — confirm against the code that fills it.
HasNullable bool
// IsAutoIncrement makes String append " (auto_increment)" to the key name.
IsAutoIncrement bool
}
// IsPrimary checks if this unique key is primary, which is the case exactly
// when its name is "PRIMARY" (case-sensitive).
func (this *UniqueKey) IsPrimary() bool {
	const primaryKeyName = "PRIMARY"
	return this.Name == primaryKeyName
}
// Len returns the number of columns in this key, as reported by Columns.Len.
func (this *UniqueKey) Len() int {
return this.Columns.Len()
}
// String describes the key as "<name>: <column names>; has nullable: <bool>".
// The name is followed by " (auto_increment)" when IsAutoIncrement is set.
func (this *UniqueKey) String() string {
	label := this.Name
	if this.IsAutoIncrement {
		label = fmt.Sprintf("%s (auto_increment)", label)
	}
	columnNames := this.Columns.Names()
	return fmt.Sprintf("%s: %s; has nullable: %+v", label, columnNames, this.HasNullable)
}
// ColumnValues holds a row's worth of untyped values together with a
// parallel slice of pointers to them.
type ColumnValues struct {
// abstractValues holds the values themselves.
abstractValues []interface{}
// ValuesPointers[i] holds a pointer to abstractValues[i], as set up by
// NewColumnValues and ToColumnValues.
// NOTE(review): presumably meant for Scan-style APIs that fill values through pointers — confirm against callers.
ValuesPointers []interface{}
}
// NewColumnValues allocates a ColumnValues with room for length values, all
// initially nil, and with ValuesPointers[i] pointing at the i-th value.
func NewColumnValues(length int) *ColumnValues {
	values := make([]interface{}, length)
	pointers := make([]interface{}, 0, length)
	for i := range values {
		pointers = append(pointers, &values[i])
	}
	return &ColumnValues{
		abstractValues: values,
		ValuesPointers: pointers,
	}
}
// ToColumnValues wraps abstractValues in a ColumnValues without copying it:
// the given slice becomes the value storage, and ValuesPointers[i] points at
// its i-th element.
func ToColumnValues(abstractValues []interface{}) *ColumnValues {
	pointers := make([]interface{}, 0, len(abstractValues))
	for i := range abstractValues {
		pointers = append(pointers, &abstractValues[i])
	}
	return &ColumnValues{
		abstractValues: abstractValues,
		ValuesPointers: pointers,
	}
}
// AbstractValues returns the stored values. The slice is not copied, so
// changes made through it are visible to this ColumnValues.
func (this *ColumnValues) AbstractValues() []interface{} {
return this.abstractValues
}
// StringColumn renders the value at index as a string. A []uint8 value is
// converted byte-for-byte; anything else is formatted with "%+v".
// It panics when index is out of range.
func (this *ColumnValues) StringColumn(index int) string {
	switch value := this.AbstractValues()[index].(type) {
	case []uint8:
		return string(value)
	default:
		return fmt.Sprintf("%+v", value)
	}
}
// String renders every value via StringColumn and joins the results with
// commas, in order. An empty ColumnValues renders as the empty string.
func (this *ColumnValues) String() string {
	rendered := make([]string, len(this.AbstractValues()))
	for i := range rendered {
		rendered[i] = this.StringColumn(i)
	}
	return strings.Join(rendered, ",")
}