diff --git a/go/base/utils.go b/go/base/utils.go index 65e47f0..ed14514 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -13,6 +13,7 @@ import ( "time" gosql "database/sql" + "github.com/github/gh-ost/go/mysql" ) @@ -62,7 +63,7 @@ func StringContainsAll(s string, substrings ...string) bool { return nonEmptyStringsFound } -func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext) (string, error) { +func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) { versionQuery := `select @@global.version` var port, extraPort int var version string @@ -86,7 +87,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, } if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) { - migrationContext.Log.Infof("connection validated on %+v", connectionConfig.Key) + migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key) return version, nil } else if extraPort == 0 { return "", fmt.Errorf("Unexpected database port reported: %+v", port) diff --git a/go/logic/applier.go b/go/logic/applier.go index 821b4d9..52628ec 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -57,6 +57,7 @@ type Applier struct { singletonDB *gosql.DB migrationContext *base.MigrationContext finishedMigrating int64 + name string } func NewApplier(migrationContext *base.MigrationContext) *Applier { @@ -64,6 +65,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { connectionConfig: migrationContext.ApplierConnectionConfig, migrationContext: migrationContext, finishedMigrating: 0, + name: "applier", } } @@ -78,11 +80,11 @@ func (this *Applier) InitDBConnections() (err error) { return err } this.singletonDB.SetMaxOpenConns(1) - version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext) + version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name) if err != nil { return err } - if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext); err != nil { + if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil { return err } this.migrationContext.ApplierMySQLVersion = version diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 95b8ad4..0128010 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -29,12 +29,14 @@ type Inspector struct { db *gosql.DB informationSchemaDb *gosql.DB migrationContext *base.MigrationContext + name string } func NewInspector(migrationContext *base.MigrationContext) *Inspector { return &Inspector{ connectionConfig: migrationContext.InspectorConnectionConfig, migrationContext: migrationContext, + name: "inspector", } } @@ -198,7 +200,7 @@ func (this *Inspector) validateConnection() error { return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html") } - version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext) + version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name) this.migrationContext.InspectorMySQLVersion = version return err } @@ -553,6 +555,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { columnName := m.GetString("COLUMN_NAME") columnType := m.GetString("COLUMN_TYPE") + columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH") for _, columnsList := range columnsLists { column := columnsList.GetColumn(columnName) if column == nil { @@ -580,6 +583,10 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL if strings.HasPrefix(columnType, "enum") { column.Type = sql.EnumColumnType } + if strings.HasPrefix(columnType, "binary") { + column.Type = sql.BinaryColumnType + column.BinaryOctetLength = columnOctetLength + } if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" { column.Charset = charset } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 5f11fd0..a07240c 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -42,6 +42,7 @@ type EventsStreamer struct { listenersMutex *sync.Mutex eventsChannel chan *binlog.BinlogEntry binlogReader *binlog.GoMySQLReader + name string } func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer { @@ -51,6 +52,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer listeners: [](*BinlogEventListener){}, listenersMutex: &sync.Mutex{}, eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), + name: "streamer", } } @@ -106,7 +108,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) { if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil { return err } - if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext); err != nil { + if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil { return err } if err := this.readCurrentBinlogCoordinates(); err != nil { diff --git a/go/sql/builder.go b/go/sql/builder.go index 4b019bc..776a10d 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -396,7 +396,7 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey } for _, column := range uniqueKeyColumns.Columns() { tableOrdinal := tableColumns.Ordinals[column.Name] - arg := column.convertArg(args[tableOrdinal]) + arg := column.convertArg(args[tableOrdinal], true) uniqueKeyArgs = append(uniqueKeyArgs, arg) } databaseName = EscapeName(databaseName) @@ -433,7 +433,7 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol for _, column := range sharedColumns.Columns() { tableOrdinal := tableColumns.Ordinals[column.Name] - arg := column.convertArg(args[tableOrdinal]) + arg := column.convertArg(args[tableOrdinal], false) sharedArgs = append(sharedArgs, arg) } @@ -481,13 +481,13 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol for _, column := range sharedColumns.Columns() { tableOrdinal := tableColumns.Ordinals[column.Name] - arg := column.convertArg(valueArgs[tableOrdinal]) + arg := column.convertArg(valueArgs[tableOrdinal], false) sharedArgs = append(sharedArgs, arg) } for _, column := range uniqueKeyColumns.Columns() { tableOrdinal := tableColumns.Ordinals[column.Name] - arg := column.convertArg(whereArgs[tableOrdinal]) + arg := column.convertArg(whereArgs[tableOrdinal], true) uniqueKeyArgs = append(uniqueKeyArgs, arg) } diff --git a/go/sql/types.go b/go/sql/types.go index ef83819..fa6b74e 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -6,6 +6,7 @@ package sql import ( + "bytes" "fmt" "reflect" "strconv" @@ -22,6 +23,7 @@ const ( MediumIntColumnType JSONColumnType FloatColumnType + BinaryColumnType ) const maxMediumintUnsigned int32 = 16777215 @@ -31,19 +33,36 @@ type TimezoneConversion struct { } type Column struct { - Name string - IsUnsigned bool - Charset string - Type ColumnType + Name string + IsUnsigned bool + Charset string + Type ColumnType + + // add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog. + // https://github.com/github/gh-ost/issues/909 + BinaryOctetLength uint timezoneConversion *TimezoneConversion } -func (this *Column) convertArg(arg interface{}) interface{} { +func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { if s, ok := arg.(string); ok { // string, charset conversion if encoding, ok := charsetEncodingMap[this.Charset]; ok { arg, _ = encoding.NewDecoder().String(s) } + + if this.Type == BinaryColumnType && isUniqueKeyColumn { + arg2Bytes := []byte(arg.(string)) + size := len(arg2Bytes) + if uint(size) < this.BinaryOctetLength { + buf := bytes.NewBuffer(arg2Bytes) + for i := uint(0); i < (this.BinaryOctetLength - uint(size)); i++ { + buf.Write([]byte{0}) + } + arg = buf.String() + } + } + return arg }