Merge pull request #160 from github/fix-unsigned-columns

Fix unsigned columns
This commit is contained in:
Shlomi Noach 2016-08-17 13:22:05 +02:00 committed by GitHub
commit 825c64fd62
7 changed files with 245 additions and 21 deletions

View File

@ -2,7 +2,7 @@
#
#
RELEASE_VERSION="1.0.9"
RELEASE_VERSION="1.0.10"
function build {
osname=$1

View File

@ -128,33 +128,23 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if err != nil {
return err
}
// if rand.Intn(1000) == 0 {
// this.binlogSyncer.Close()
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
// return log.Errorf(".............haha got random error")
// }
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
}()
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
// ev.Dump(os.Stdout)
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
}()
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
return err
}
}
// log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
}
log.Debugf("done streaming events")

View File

@ -853,6 +853,31 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if err != nil {
return err
}
// TODO The below is commented, and is in preparation for transactional writes on the ghost tables.
// Such writes would be, for example:
// - prepended with sql_mode setup
// - prepended with SET SQL_LOG_BIN=0
// - prepended with SET FK_CHECKS=0
// etc.
//
// Current known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
//
// err = func() error {
// tx, err := this.db.Begin()
// if err != nil {
// return err
// }
// if _, err := tx.Exec(query, args...); err != nil {
// return err
// }
// if err := tx.Commit(); err != nil {
// return err
// }
// return nil
// }()
_, err = sqlutils.Exec(this.db, query, args...)
if err == nil {
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
@ -860,5 +885,9 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
}
if err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
log.Errore(err)
}
return err
}

View File

@ -134,6 +134,13 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.ColumnRenameMap)
log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty
// This additional step looks at which columns are unsigned. We could have merged this within
// the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel
// comfortable in doing this as a separate step.
this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns)
this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns)
return nil
}
@ -450,6 +457,26 @@ func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.Col
return sql.NewColumnList(columnNames), nil
}
// applyUnsignedColumns
func (this *Inspector) applyUnsignedColumns(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
query := fmt.Sprintf(`
show columns from %s.%s
`,
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
columnName := rowMap.GetString("Field")
if strings.Contains(rowMap.GetString("Type"), "unsigned") {
for _, columnsList := range columnsLists {
columnsList.SetUnsigned(columnName)
}
}
return nil
})
return err
}
// getCandidateUniqueKeys investigates a table and returns the list of unique keys
// candidate for chunking
func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*sql.UniqueKey), err error) {

View File

@ -32,6 +32,29 @@ func EscapeName(name string) string {
return fmt.Sprintf("`%s`", name)
}
func fixArgType(arg interface{}, isUnsigned bool) interface{} {
if !isUnsigned {
return arg
}
// unsigned
if i, ok := arg.(int8); ok {
return uint8(i)
}
if i, ok := arg.(int16); ok {
return uint16(i)
}
if i, ok := arg.(int32); ok {
return uint32(i)
}
if i, ok := arg.(int64); ok {
return uint64(i)
}
if i, ok := arg.(int); ok {
return uint(i)
}
return arg
}
func buildPreparedValues(length int) []string {
values := make([]string, length, length)
for i := 0; i < length; i++ {
@ -309,7 +332,8 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
}
for _, column := range uniqueKeyColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
uniqueKeyArgs = append(uniqueKeyArgs, args[tableOrdinal])
arg := fixArgType(args[tableOrdinal], uniqueKeyColumns.IsUnsigned(column))
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
@ -345,7 +369,8 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
for _, column := range sharedColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
sharedArgs = append(sharedArgs, args[tableOrdinal])
arg := fixArgType(args[tableOrdinal], sharedColumns.IsUnsigned(column))
sharedArgs = append(sharedArgs, arg)
}
sharedColumnNames := duplicateNames(sharedColumns.Names)
@ -392,12 +417,14 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
for _, column := range sharedColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
sharedArgs = append(sharedArgs, valueArgs[tableOrdinal])
arg := fixArgType(valueArgs[tableOrdinal], sharedColumns.IsUnsigned(column))
sharedArgs = append(sharedArgs, arg)
}
for _, column := range uniqueKeyColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
uniqueKeyArgs = append(uniqueKeyArgs, whereArgs[tableOrdinal])
arg := fixArgType(whereArgs[tableOrdinal], uniqueKeyColumns.IsUnsigned(column))
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}
sharedColumnNames := duplicateNames(sharedColumns.Names)

View File

@ -397,6 +397,44 @@ func TestBuildDMLDeleteQuery(t *testing.T) {
}
}
func TestBuildDMLDeleteQuerySignedUnsigned(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"})
uniqueKeyColumns := NewColumnList([]string{"position"})
{
// test signed (expect no change)
args := []interface{}{3, "testname", "first", -1, 23}
query, uniqueKeyArgs, err := BuildDMLDeleteQuery(databaseName, tableName, tableColumns, uniqueKeyColumns, args)
test.S(t).ExpectNil(err)
expected := `
delete /* gh-ost mydb.tbl */
from
mydb.tbl
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{-1}))
}
{
// test unsigned
args := []interface{}{3, "testname", "first", int8(-1), 23}
uniqueKeyColumns.SetUnsigned("position")
query, uniqueKeyArgs, err := BuildDMLDeleteQuery(databaseName, tableName, tableColumns, uniqueKeyColumns, args)
test.S(t).ExpectNil(err)
expected := `
delete /* gh-ost mydb.tbl */
from
mydb.tbl
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(255)}))
}
}
func TestBuildDMLInsertQuery(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
@ -442,6 +480,61 @@ func TestBuildDMLInsertQuery(t *testing.T) {
}
}
func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"})
sharedColumns := NewColumnList([]string{"id", "name", "position", "age"})
{
// testing signed
args := []interface{}{3, "testname", "first", int8(-1), 23}
sharedColumns := NewColumnList([]string{"id", "name", "position", "age"})
query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args)
test.S(t).ExpectNil(err)
expected := `
replace /* gh-ost mydb.tbl */
into mydb.tbl
(id, name, position, age)
values
(?, ?, ?, ?)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-1), 23}))
}
{
// testing unsigned
args := []interface{}{3, "testname", "first", int8(-1), 23}
sharedColumns.SetUnsigned("position")
query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args)
test.S(t).ExpectNil(err)
expected := `
replace /* gh-ost mydb.tbl */
into mydb.tbl
(id, name, position, age)
values
(?, ?, ?, ?)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint8(255), 23}))
}
{
// testing unsigned
args := []interface{}{3, "testname", "first", int32(-1), 23}
sharedColumns.SetUnsigned("position")
query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args)
test.S(t).ExpectNil(err)
expected := `
replace /* gh-ost mydb.tbl */
into mydb.tbl
(id, name, position, age)
values
(?, ?, ?, ?)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint32(4294967295), 23}))
}
}
func TestBuildDMLUpdateQuery(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
@ -525,3 +618,45 @@ func TestBuildDMLUpdateQuery(t *testing.T) {
test.S(t).ExpectNotNil(err)
}
}
func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"})
valueArgs := []interface{}{3, "testname", "newval", int8(-17), int8(-2)}
whereArgs := []interface{}{3, "testname", "findme", int8(-3), 56}
sharedColumns := NewColumnList([]string{"id", "name", "position", "age"})
uniqueKeyColumns := NewColumnList([]string{"position"})
{
// test signed
query, sharedArgs, uniqueKeyArgs, err := BuildDMLUpdateQuery(databaseName, tableName, tableColumns, sharedColumns, uniqueKeyColumns, valueArgs, whereArgs)
test.S(t).ExpectNil(err)
expected := `
update /* gh-ost mydb.tbl */
mydb.tbl
set id=?, name=?, position=?, age=?
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-17), int8(-2)}))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{int8(-3)}))
}
{
// test unsigned
sharedColumns.SetUnsigned("age")
uniqueKeyColumns.SetUnsigned("position")
query, sharedArgs, uniqueKeyArgs, err := BuildDMLUpdateQuery(databaseName, tableName, tableColumns, sharedColumns, uniqueKeyColumns, valueArgs, whereArgs)
test.S(t).ExpectNil(err)
expected := `
update /* gh-ost mydb.tbl */
mydb.tbl
set id=?, name=?, position=?, age=?
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-17), uint8(254)}))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(253)}))
}
}

View File

@ -14,24 +14,31 @@ import (
// ColumnsMap maps a column onto its ordinal position
type ColumnsMap map[string]int
func NewColumnsMap(orderedNames []string) ColumnsMap {
func NewEmptyColumnsMap() ColumnsMap {
columnsMap := make(map[string]int)
return ColumnsMap(columnsMap)
}
func NewColumnsMap(orderedNames []string) ColumnsMap {
columnsMap := NewEmptyColumnsMap()
for i, column := range orderedNames {
columnsMap[column] = i
}
return ColumnsMap(columnsMap)
return columnsMap
}
// ColumnList makes for a named list of columns
type ColumnList struct {
Names []string
Ordinals ColumnsMap
Names []string
Ordinals ColumnsMap
UnsignedFlags ColumnsMap
}
// NewColumnList creates an object given ordered list of column names
func NewColumnList(names []string) *ColumnList {
result := &ColumnList{
Names: names,
Names: names,
UnsignedFlags: NewEmptyColumnsMap(),
}
result.Ordinals = NewColumnsMap(result.Names)
return result
@ -40,12 +47,21 @@ func NewColumnList(names []string) *ColumnList {
// ParseColumnList parses a comma delimited list of column names
func ParseColumnList(columns string) *ColumnList {
result := &ColumnList{
Names: strings.Split(columns, ","),
Names: strings.Split(columns, ","),
UnsignedFlags: NewEmptyColumnsMap(),
}
result.Ordinals = NewColumnsMap(result.Names)
return result
}
func (this *ColumnList) SetUnsigned(columnName string) {
this.UnsignedFlags[columnName] = 1
}
func (this *ColumnList) IsUnsigned(columnName string) bool {
return this.UnsignedFlags[columnName] == 1
}
func (this *ColumnList) String() string {
return strings.Join(this.Names, ",")
}