Merge pull request #915 from cenkore/iss909

fix: issue 909
This commit is contained in:
Rashiq 2021-03-08 20:25:09 +01:00 committed by GitHub
commit d4c91e6a2c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 33 additions and 9 deletions

View File

@ -555,6 +555,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
columnName := m.GetString("COLUMN_NAME") columnName := m.GetString("COLUMN_NAME")
columnType := m.GetString("COLUMN_TYPE") columnType := m.GetString("COLUMN_TYPE")
columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH")
for _, columnsList := range columnsLists { for _, columnsList := range columnsLists {
column := columnsList.GetColumn(columnName) column := columnsList.GetColumn(columnName)
if column == nil { if column == nil {
@ -582,6 +583,10 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
if strings.HasPrefix(columnType, "enum") { if strings.HasPrefix(columnType, "enum") {
column.Type = sql.EnumColumnType column.Type = sql.EnumColumnType
} }
if strings.HasPrefix(columnType, "binary") {
column.Type = sql.BinaryColumnType
column.BinaryOctetLength = columnOctetLength
}
if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" { if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" {
column.Charset = charset column.Charset = charset
} }

View File

@ -396,7 +396,7 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
} }
for _, column := range uniqueKeyColumns.Columns() { for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name] tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal]) arg := column.convertArg(args[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg) uniqueKeyArgs = append(uniqueKeyArgs, arg)
} }
databaseName = EscapeName(databaseName) databaseName = EscapeName(databaseName)
@ -433,7 +433,7 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
for _, column := range sharedColumns.Columns() { for _, column := range sharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name] tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal]) arg := column.convertArg(args[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg) sharedArgs = append(sharedArgs, arg)
} }
@ -481,13 +481,13 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
for _, column := range sharedColumns.Columns() { for _, column := range sharedColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name] tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(valueArgs[tableOrdinal]) arg := column.convertArg(valueArgs[tableOrdinal], false)
sharedArgs = append(sharedArgs, arg) sharedArgs = append(sharedArgs, arg)
} }
for _, column := range uniqueKeyColumns.Columns() { for _, column := range uniqueKeyColumns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name] tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(whereArgs[tableOrdinal]) arg := column.convertArg(whereArgs[tableOrdinal], true)
uniqueKeyArgs = append(uniqueKeyArgs, arg) uniqueKeyArgs = append(uniqueKeyArgs, arg)
} }

View File

@ -6,6 +6,7 @@
package sql package sql
import ( import (
"bytes"
"fmt" "fmt"
"reflect" "reflect"
"strconv" "strconv"
@ -22,6 +23,7 @@ const (
MediumIntColumnType MediumIntColumnType
JSONColumnType JSONColumnType
FloatColumnType FloatColumnType
BinaryColumnType
) )
const maxMediumintUnsigned int32 = 16777215 const maxMediumintUnsigned int32 = 16777215
@ -31,19 +33,36 @@ type TimezoneConversion struct {
} }
type Column struct { type Column struct {
Name string Name string
IsUnsigned bool IsUnsigned bool
Charset string Charset string
Type ColumnType Type ColumnType
// add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog.
// https://github.com/github/gh-ost/issues/909
BinaryOctetLength uint
timezoneConversion *TimezoneConversion timezoneConversion *TimezoneConversion
} }
func (this *Column) convertArg(arg interface{}) interface{} { func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} {
if s, ok := arg.(string); ok { if s, ok := arg.(string); ok {
// string, charset conversion // string, charset conversion
if encoding, ok := charsetEncodingMap[this.Charset]; ok { if encoding, ok := charsetEncodingMap[this.Charset]; ok {
arg, _ = encoding.NewDecoder().String(s) arg, _ = encoding.NewDecoder().String(s)
} }
if this.Type == BinaryColumnType && isUniqueKeyColumn {
arg2Bytes := []byte(arg.(string))
size := len(arg2Bytes)
if uint(size) < this.BinaryOctetLength {
buf := bytes.NewBuffer(arg2Bytes)
for i := uint(0); i < (this.BinaryOctetLength - uint(size)); i++ {
buf.Write([]byte{0})
}
arg = buf.String()
}
}
return arg return arg
} }