From 791d963ea06dc15a00fd1f7a4d3541061a494cc0 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 7 Sep 2016 14:24:11 +0200 Subject: [PATCH] Character set recognition and manipulation - Identifying textual characters sets; converting into specific type when applying dml events - Refactored `ColumnsList`: introducing `Column` type - Refactored `unsigned` handling, as part of `Column` - `Column` type supports `convertArg()`: converting value of argument according to column data type - DB URI attempts `utf8mb4,utf8,latin1` charsets in that order (first one to be recognized wins) - Local tests filter by pattern - Local tests append table schema on failure - Local tests do not have postpone flag file - Added character set local tests: `utf8`, `utf8mb4`, `latin1` --- build.sh | 2 +- go/logic/applier.go | 12 +- go/logic/inspect.go | 42 ++++--- go/mysql/connection.go | 8 +- go/sql/builder.go | 57 +++------ go/sql/types.go | 116 ++++++++++++++---- go/sql/types_test.go | 5 +- localtests/latin1/create.sql | 4 +- localtests/test.sh | 7 +- .../go-mysql/replication/row_event.go | 2 +- 10 files changed, 158 insertions(+), 97 deletions(-) diff --git a/build.sh b/build.sh index 77146c3..ff48aaa 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ # # -RELEASE_VERSION="1.0.17" +RELEASE_VERSION="1.0.18" function build { osname=$1 diff --git a/go/logic/applier.go b/go/logic/applier.go index 97be6a2..e798596 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -301,7 +301,7 @@ func (this *Applier) ExecuteThrottleQuery() (int64, error) { // ReadMigrationMinValues returns the minimum values to be iterated on rowcopy func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) - query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names) + query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names()) if err != nil { return err } @@ -322,7 +322,7 @@ func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { // ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error { log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) - query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names) + query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names()) if err != nil { return err } @@ -363,7 +363,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, - this.migrationContext.UniqueKey.Columns.Names, + this.migrationContext.UniqueKey.Columns.Names(), this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationRangeMaxValues.AbstractValues(), atomic.LoadInt64(&this.migrationContext.ChunkSize), @@ -402,10 +402,10 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), - this.migrationContext.SharedColumns.Names, - this.migrationContext.MappedSharedColumns.Names, + this.migrationContext.SharedColumns.Names(), + this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey.Name, - this.migrationContext.UniqueKey.Columns.Names, + this.migrationContext.UniqueKey.Columns.Names(), this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.GetIteration() == 0, diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 6e332f2..28d1884 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -135,8 +135,8 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) { // This additional step looks at which columns are unsigned. We could have merged this within // the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel // comfortable in doing this as a separate step. - this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns) - this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns) + this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns) + this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns) return nil } @@ -477,23 +477,31 @@ func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.Col return sql.NewColumnList(columnNames), nil } -// applyUnsignedColumns -func (this *Inspector) applyUnsignedColumns(databaseName, tableName string, columnsLists ...*sql.ColumnList) error { - query := fmt.Sprintf(` - show columns from %s.%s - `, - sql.EscapeName(databaseName), - sql.EscapeName(tableName), - ) - err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { - columnName := rowMap.GetString("Field") - if strings.Contains(rowMap.GetString("Type"), "unsigned") { +// applyColumnTypes +func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error { + query := ` + select + * + from + information_schema.columns + where + table_schema=? + and table_name=? + ` + err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { + columnName := m.GetString("COLUMN_NAME") + if strings.Contains(m.GetString("COLUMN_TYPE"), "unsigned") { for _, columnsList := range columnsLists { columnsList.SetUnsigned(columnName) } } + if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" { + for _, columnsList := range columnsLists { + columnsList.SetCharset(columnName, charset) + } + } return nil - }) + }, databaseName, tableName) return err } @@ -583,7 +591,7 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [ // the ALTER is on the name itself... for _, originalUniqueKey := range originalUniqueKeys { for _, ghostUniqueKey := range ghostUniqueKeys { - if originalUniqueKey.Columns.Equals(&ghostUniqueKey.Columns) { + if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) { uniqueKeys = append(uniqueKeys, originalUniqueKey) } } @@ -594,11 +602,11 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [ // getSharedColumns returns the intersection of two lists of columns in same order as the first list func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) { columnsInGhost := make(map[string]bool) - for _, ghostColumn := range ghostColumns.Names { + for _, ghostColumn := range ghostColumns.Names() { columnsInGhost[ghostColumn] = true } sharedColumnNames := []string{} - for _, originalColumn := range originalColumns.Names { + for _, originalColumn := range originalColumns.Names() { if columnsInGhost[originalColumn] || columnsInGhost[columnRenameMap[originalColumn]] { sharedColumnNames = append(sharedColumnNames, originalColumn) } diff --git a/go/mysql/connection.go b/go/mysql/connection.go index 01c1c91..649da93 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -48,11 +48,11 @@ func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool { } func (this *ConnectionConfig) GetDBUri(databaseName string) string { - var ip = net.ParseIP(this.Key.Hostname) + hostname := this.Key.Hostname + var ip = net.ParseIP(hostname) if (ip != nil) && (ip.To4() == nil) { // Wrap IPv6 literals in square brackets - return fmt.Sprintf("%s:%s@tcp([%s]:%d)/%s", this.User, this.Password, this.Key.Hostname, this.Key.Port, databaseName) - } else { - return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", this.User, this.Password, this.Key.Hostname, this.Key.Port, databaseName) + hostname = fmt.Sprintf("[%s]", hostname) } + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4,utf8,latin1", this.User, this.Password, hostname, this.Key.Port, databaseName) } diff --git a/go/sql/builder.go b/go/sql/builder.go index 79cea47..260f276 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -32,29 +32,6 @@ func EscapeName(name string) string { return fmt.Sprintf("`%s`", name) } -func fixArgType(arg interface{}, isUnsigned bool) interface{} { - if !isUnsigned { - return arg - } - // unsigned - if i, ok := arg.(int8); ok { - return uint8(i) - } - if i, ok := arg.(int16); ok { - return uint16(i) - } - if i, ok := arg.(int32); ok { - return uint32(i) - } - if i, ok := arg.(int64); ok { - return strconv.FormatUint(uint64(i), 10) - } - if i, ok := arg.(int); ok { - return uint(i) - } - return arg -} - func buildPreparedValues(length int) []string { values := make([]string, length, length) for i := 0; i < length; i++ { @@ -330,14 +307,14 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey if uniqueKeyColumns.Len() == 0 { return result, uniqueKeyArgs, fmt.Errorf("No unique key columns found in BuildDMLDeleteQuery") } - for _, column := range uniqueKeyColumns.Names { - tableOrdinal := tableColumns.Ordinals[column] - arg := fixArgType(args[tableOrdinal], uniqueKeyColumns.IsUnsigned(column)) + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal]) uniqueKeyArgs = append(uniqueKeyArgs, arg) } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) - equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names) + equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names()) if err != nil { return result, uniqueKeyArgs, err } @@ -367,13 +344,13 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) - for _, column := range mappedSharedColumns.Names { - tableOrdinal := tableColumns.Ordinals[column] - arg := fixArgType(args[tableOrdinal], mappedSharedColumns.IsUnsigned(column)) + for _, column := range mappedSharedColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal]) sharedArgs = append(sharedArgs, arg) } - mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names) + mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names()) for i := range mappedSharedColumnNames { mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i]) } @@ -415,26 +392,26 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) - for i, column := range sharedColumns.Names { - mappedColumn := mappedSharedColumns.Names[i] - tableOrdinal := tableColumns.Ordinals[column] - arg := fixArgType(valueArgs[tableOrdinal], mappedSharedColumns.IsUnsigned(mappedColumn)) + for i, column := range sharedColumns.Columns() { + mappedColumn := mappedSharedColumns.Columns()[i] + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := mappedColumn.convertArg(valueArgs[tableOrdinal]) sharedArgs = append(sharedArgs, arg) } - for _, column := range uniqueKeyColumns.Names { - tableOrdinal := tableColumns.Ordinals[column] - arg := fixArgType(whereArgs[tableOrdinal], uniqueKeyColumns.IsUnsigned(column)) + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(whereArgs[tableOrdinal]) uniqueKeyArgs = append(uniqueKeyArgs, arg) } - mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names) + mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names()) for i := range mappedSharedColumnNames { mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i]) } setClause, err := BuildSetPreparedClause(mappedSharedColumnNames) - equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names) + equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names()) result = fmt.Sprintf(` update /* gh-ost %s.%s */ %s.%s diff --git a/go/sql/types.go b/go/sql/types.go index a8ffa9a..b33848c 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -8,10 +8,61 @@ package sql import ( "fmt" "reflect" + "strconv" "strings" + + "golang.org/x/text/encoding/charmap" ) -// ColumnsMap maps a column onto its ordinal position +type Column struct { + Name string + IsUnsigned bool + Charset string +} + +func (this *Column) convertArg(arg interface{}) interface{} { + if s, ok := arg.(string); ok { + switch this.Charset { + case "latin1": + arg, _ = charmap.Windows1252.NewDecoder().String(s) + } + return arg + } + + if this.IsUnsigned { + if i, ok := arg.(int8); ok { + return uint8(i) + } + if i, ok := arg.(int16); ok { + return uint16(i) + } + if i, ok := arg.(int32); ok { + return uint32(i) + } + if i, ok := arg.(int64); ok { + return strconv.FormatUint(uint64(i), 10) + } + if i, ok := arg.(int); ok { + return uint(i) + } + } + return arg +} + +func NewColumns(names []string) []Column { + result := make([]Column, len(names)) + for i := range names { + result[i].Name = names[i] + } + return result +} + +func ParseColumns(names string) []Column { + namesArray := strings.Split(names, ",") + return NewColumns(namesArray) +} + +// ColumnsMap maps a column name onto its ordinal position type ColumnsMap map[string]int func NewEmptyColumnsMap() ColumnsMap { @@ -19,62 +70,83 @@ func NewEmptyColumnsMap() ColumnsMap { return ColumnsMap(columnsMap) } -func NewColumnsMap(orderedNames []string) ColumnsMap { +func NewColumnsMap(orderedColumns []Column) ColumnsMap { columnsMap := NewEmptyColumnsMap() - for i, column := range orderedNames { - columnsMap[column] = i + for i, column := range orderedColumns { + columnsMap[column.Name] = i } return columnsMap } // ColumnList makes for a named list of columns type ColumnList struct { - Names []string - Ordinals ColumnsMap - UnsignedFlags ColumnsMap + columns []Column + Ordinals ColumnsMap } // NewColumnList creates an object given ordered list of column names func NewColumnList(names []string) *ColumnList { result := &ColumnList{ - Names: names, - UnsignedFlags: NewEmptyColumnsMap(), + columns: NewColumns(names), } - result.Ordinals = NewColumnsMap(result.Names) + result.Ordinals = NewColumnsMap(result.columns) return result } // ParseColumnList parses a comma delimited list of column names -func ParseColumnList(columns string) *ColumnList { +func ParseColumnList(names string) *ColumnList { result := &ColumnList{ - Names: strings.Split(columns, ","), - UnsignedFlags: NewEmptyColumnsMap(), + columns: ParseColumns(names), } - result.Ordinals = NewColumnsMap(result.Names) + result.Ordinals = NewColumnsMap(result.columns) return result } +func (this *ColumnList) Columns() []Column { + return this.columns +} + +func (this *ColumnList) Names() []string { + names := make([]string, len(this.columns)) + for i := range this.columns { + names[i] = this.columns[i].Name + } + return names +} + func (this *ColumnList) SetUnsigned(columnName string) { - this.UnsignedFlags[columnName] = 1 + this.columns[this.Ordinals[columnName]].IsUnsigned = true } func (this *ColumnList) IsUnsigned(columnName string) bool { - return this.UnsignedFlags[columnName] == 1 + return this.columns[this.Ordinals[columnName]].IsUnsigned +} + +func (this *ColumnList) SetCharset(columnName string, charset string) { + this.columns[this.Ordinals[columnName]].Charset = charset +} + +func (this *ColumnList) GetCharset(columnName string) string { + return this.columns[this.Ordinals[columnName]].Charset } func (this *ColumnList) String() string { - return strings.Join(this.Names, ",") + return strings.Join(this.Names(), ",") } func (this *ColumnList) Equals(other *ColumnList) bool { - return reflect.DeepEqual(this.Names, other.Names) + return reflect.DeepEqual(this.Columns, other.Columns) +} + +func (this *ColumnList) EqualsByNames(other *ColumnList) bool { + return reflect.DeepEqual(this.Names(), other.Names()) } // IsSubsetOf returns 'true' when column names of this list are a subset of // another list, in arbitrary order (order agnostic) func (this *ColumnList) IsSubsetOf(other *ColumnList) bool { - for _, column := range this.Names { - if _, exists := other.Ordinals[column]; !exists { + for _, column := range this.columns { + if _, exists := other.Ordinals[column.Name]; !exists { return false } } @@ -82,7 +154,7 @@ func (this *ColumnList) IsSubsetOf(other *ColumnList) bool { } func (this *ColumnList) Len() int { - return len(this.Names) + return len(this.columns) } // UniqueKey is the combination of a key's name and columns @@ -107,7 +179,7 @@ func (this *UniqueKey) String() string { if this.IsAutoIncrement { description = fmt.Sprintf("%s (auto_increment)", description) } - return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names, this.HasNullable) + return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names(), this.HasNullable) } type ColumnValues struct { diff --git a/go/sql/types_test.go b/go/sql/types_test.go index cc0d792..be2cc73 100644 --- a/go/sql/types_test.go +++ b/go/sql/types_test.go @@ -8,9 +8,10 @@ package sql import ( "testing" + "reflect" + "github.com/outbrain/golib/log" test "github.com/outbrain/golib/tests" - "reflect" ) func init() { @@ -22,7 +23,7 @@ func TestParseColumnList(t *testing.T) { columnList := ParseColumnList(names) test.S(t).ExpectEquals(columnList.Len(), 3) - test.S(t).ExpectTrue(reflect.DeepEqual(columnList.Names, []string{"id", "category", "max_len"})) + test.S(t).ExpectTrue(reflect.DeepEqual(columnList.Names(), []string{"id", "category", "max_len"})) test.S(t).ExpectEquals(columnList.Ordinals["id"], 0) test.S(t).ExpectEquals(columnList.Ordinals["category"], 1) test.S(t).ExpectEquals(columnList.Ordinals["max_len"], 2) diff --git a/localtests/latin1/create.sql b/localtests/latin1/create.sql index 7f63ce2..3ca9b47 100644 --- a/localtests/latin1/create.sql +++ b/localtests/latin1/create.sql @@ -16,6 +16,6 @@ create event gh_ost_test do begin insert into gh_ost_test values (null, md5(rand())); - insert into gh_ost_test values (null, 'novo proprietário'); - insert into gh_ost_test values (null, 'usuário'); + insert into gh_ost_test values (null, 'átesting'); + insert into gh_ost_test values (null, 'testátest'); end ;; diff --git a/localtests/test.sh b/localtests/test.sh index a8718a0..3362d3f 100755 --- a/localtests/test.sh +++ b/localtests/test.sh @@ -50,7 +50,7 @@ test_single() { echo_dot gh-ost-test-mysql-replica -e "start slave" echo_dot - gh-ost-test-mysql-master test < $tests_path/$test_name/create.sql + gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql extra_args="" if [ -f $tests_path/$test_name/extra_args ] ; then @@ -79,7 +79,7 @@ test_single() { --throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \ --serve-socket-file=/tmp/gh-ost.test.sock \ --initially-drop-socket-file \ - --postpone-cut-over-flag-file=/tmp/gh-ost.postpone.flag \ + --postpone-cut-over-flag-file="" \ --test-on-replica \ --default-retries=1 \ --verbose \ @@ -115,9 +115,12 @@ test_all() { find $tests_path ! -path . -type d -mindepth 1 -maxdepth 1 | cut -d "/" -f 3 | egrep "$test_pattern" | while read test_name ; do test_single "$test_name" if [ $? -ne 0 ] ; then + create_statement=$(gh-ost-test-mysql-replica test -t -e "show create table _gh_ost_test_gho \G") + echo "$create_statement" >> $test_logfile echo "+ FAIL" return 1 else + echo echo "+ pass" fi gh-ost-test-mysql-replica -e "start slave" diff --git a/vendor/github.com/siddontang/go-mysql/replication/row_event.go b/vendor/github.com/siddontang/go-mysql/replication/row_event.go index 0e4eb17..9986d43 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/row_event.go +++ b/vendor/github.com/siddontang/go-mysql/replication/row_event.go @@ -609,7 +609,7 @@ func decodeTimestamp2(data []byte, dec uint16) (string, int, error) { return "0000-00-00 00:00:00", n, nil } - t := time.Unix(sec, usec*1000).UTC() + t := time.Unix(sec, usec*1000).UTC() // .UTC() converted by shlomi-noach return t.Format(TimeFormat), n, nil }