Merge branch 'master' into cathal/safer_cut_over
This commit is contained in:
commit
d3bf3cde4d
@ -13,6 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
gosql "database/sql"
|
||||
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
)
|
||||
|
||||
@ -62,7 +63,7 @@ func StringContainsAll(s string, substrings ...string) bool {
|
||||
return nonEmptyStringsFound
|
||||
}
|
||||
|
||||
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext) (string, error) {
|
||||
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
|
||||
versionQuery := `select @@global.version`
|
||||
var port, extraPort int
|
||||
var version string
|
||||
@ -86,7 +87,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
|
||||
}
|
||||
|
||||
if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
|
||||
migrationContext.Log.Infof("connection validated on %+v", connectionConfig.Key)
|
||||
migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
|
||||
return version, nil
|
||||
} else if extraPort == 0 {
|
||||
return "", fmt.Errorf("Unexpected database port reported: %+v", port)
|
||||
|
@ -57,6 +57,7 @@ type Applier struct {
|
||||
singletonDB *gosql.DB
|
||||
migrationContext *base.MigrationContext
|
||||
finishedMigrating int64
|
||||
name string
|
||||
}
|
||||
|
||||
func NewApplier(migrationContext *base.MigrationContext) *Applier {
|
||||
@ -64,6 +65,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
|
||||
connectionConfig: migrationContext.ApplierConnectionConfig,
|
||||
migrationContext: migrationContext,
|
||||
finishedMigrating: 0,
|
||||
name: "applier",
|
||||
}
|
||||
}
|
||||
|
||||
@ -78,11 +80,11 @@ func (this *Applier) InitDBConnections() (err error) {
|
||||
return err
|
||||
}
|
||||
this.singletonDB.SetMaxOpenConns(1)
|
||||
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
|
||||
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext); err != nil {
|
||||
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.ApplierMySQLVersion = version
|
||||
|
@ -29,12 +29,14 @@ type Inspector struct {
|
||||
db *gosql.DB
|
||||
informationSchemaDb *gosql.DB
|
||||
migrationContext *base.MigrationContext
|
||||
name string
|
||||
}
|
||||
|
||||
func NewInspector(migrationContext *base.MigrationContext) *Inspector {
|
||||
return &Inspector{
|
||||
connectionConfig: migrationContext.InspectorConnectionConfig,
|
||||
migrationContext: migrationContext,
|
||||
name: "inspector",
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,7 +200,7 @@ func (this *Inspector) validateConnection() error {
|
||||
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
|
||||
}
|
||||
|
||||
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
|
||||
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
|
||||
this.migrationContext.InspectorMySQLVersion = version
|
||||
return err
|
||||
}
|
||||
@ -553,6 +555,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||
columnName := m.GetString("COLUMN_NAME")
|
||||
columnType := m.GetString("COLUMN_TYPE")
|
||||
columnOctetLength := m.GetUint("CHARACTER_OCTET_LENGTH")
|
||||
for _, columnsList := range columnsLists {
|
||||
column := columnsList.GetColumn(columnName)
|
||||
if column == nil {
|
||||
@ -580,6 +583,10 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
|
||||
if strings.HasPrefix(columnType, "enum") {
|
||||
column.Type = sql.EnumColumnType
|
||||
}
|
||||
if strings.HasPrefix(columnType, "binary") {
|
||||
column.Type = sql.BinaryColumnType
|
||||
column.BinaryOctetLength = columnOctetLength
|
||||
}
|
||||
if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" {
|
||||
column.Charset = charset
|
||||
}
|
||||
|
@ -42,6 +42,7 @@ type EventsStreamer struct {
|
||||
listenersMutex *sync.Mutex
|
||||
eventsChannel chan *binlog.BinlogEntry
|
||||
binlogReader *binlog.GoMySQLReader
|
||||
name string
|
||||
}
|
||||
|
||||
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
|
||||
@ -51,6 +52,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer
|
||||
listeners: [](*BinlogEventListener){},
|
||||
listenersMutex: &sync.Mutex{},
|
||||
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
|
||||
name: "streamer",
|
||||
}
|
||||
}
|
||||
|
||||
@ -106,7 +108,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext); err != nil {
|
||||
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
|
@ -396,7 +396,7 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
|
||||
}
|
||||
for _, column := range uniqueKeyColumns.Columns() {
|
||||
tableOrdinal := tableColumns.Ordinals[column.Name]
|
||||
arg := column.convertArg(args[tableOrdinal])
|
||||
arg := column.convertArg(args[tableOrdinal], true)
|
||||
uniqueKeyArgs = append(uniqueKeyArgs, arg)
|
||||
}
|
||||
databaseName = EscapeName(databaseName)
|
||||
@ -433,7 +433,7 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
|
||||
|
||||
for _, column := range sharedColumns.Columns() {
|
||||
tableOrdinal := tableColumns.Ordinals[column.Name]
|
||||
arg := column.convertArg(args[tableOrdinal])
|
||||
arg := column.convertArg(args[tableOrdinal], false)
|
||||
sharedArgs = append(sharedArgs, arg)
|
||||
}
|
||||
|
||||
@ -481,13 +481,13 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
|
||||
|
||||
for _, column := range sharedColumns.Columns() {
|
||||
tableOrdinal := tableColumns.Ordinals[column.Name]
|
||||
arg := column.convertArg(valueArgs[tableOrdinal])
|
||||
arg := column.convertArg(valueArgs[tableOrdinal], false)
|
||||
sharedArgs = append(sharedArgs, arg)
|
||||
}
|
||||
|
||||
for _, column := range uniqueKeyColumns.Columns() {
|
||||
tableOrdinal := tableColumns.Ordinals[column.Name]
|
||||
arg := column.convertArg(whereArgs[tableOrdinal])
|
||||
arg := column.convertArg(whereArgs[tableOrdinal], true)
|
||||
uniqueKeyArgs = append(uniqueKeyArgs, arg)
|
||||
}
|
||||
|
||||
|
@ -6,6 +6,7 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
@ -22,6 +23,7 @@ const (
|
||||
MediumIntColumnType
|
||||
JSONColumnType
|
||||
FloatColumnType
|
||||
BinaryColumnType
|
||||
)
|
||||
|
||||
const maxMediumintUnsigned int32 = 16777215
|
||||
@ -31,19 +33,36 @@ type TimezoneConversion struct {
|
||||
}
|
||||
|
||||
type Column struct {
|
||||
Name string
|
||||
IsUnsigned bool
|
||||
Charset string
|
||||
Type ColumnType
|
||||
Name string
|
||||
IsUnsigned bool
|
||||
Charset string
|
||||
Type ColumnType
|
||||
|
||||
// add Octet length for binary type, fix bytes with suffix "00" get clipped in mysql binlog.
|
||||
// https://github.com/github/gh-ost/issues/909
|
||||
BinaryOctetLength uint
|
||||
timezoneConversion *TimezoneConversion
|
||||
}
|
||||
|
||||
func (this *Column) convertArg(arg interface{}) interface{} {
|
||||
func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} {
|
||||
if s, ok := arg.(string); ok {
|
||||
// string, charset conversion
|
||||
if encoding, ok := charsetEncodingMap[this.Charset]; ok {
|
||||
arg, _ = encoding.NewDecoder().String(s)
|
||||
}
|
||||
|
||||
if this.Type == BinaryColumnType && isUniqueKeyColumn {
|
||||
arg2Bytes := []byte(arg.(string))
|
||||
size := len(arg2Bytes)
|
||||
if uint(size) < this.BinaryOctetLength {
|
||||
buf := bytes.NewBuffer(arg2Bytes)
|
||||
for i := uint(0); i < (this.BinaryOctetLength - uint(size)); i++ {
|
||||
buf.Write([]byte{0})
|
||||
}
|
||||
arg = buf.String()
|
||||
}
|
||||
}
|
||||
|
||||
return arg
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user