From 596dce5993f15ee257380bd5a78ca171456730f8 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 15 Aug 2016 15:23:30 +0200 Subject: [PATCH 1/7] elaborate output on error in apply dml --- go/logic/applier.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index bb92fff..447c7d7 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -860,5 +860,8 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { if this.migrationContext.CountTableRows { atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta) } + if err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) + } return err } From 7a0e2cfe3f2e477b75308e2496f309a7216469b3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 17 Aug 2016 06:49:20 +0200 Subject: [PATCH 2/7] adding UnsignedFlags to ColumnsList --- go/sql/types.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/go/sql/types.go b/go/sql/types.go index cc36142..a8ffa9a 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -14,24 +14,31 @@ import ( // ColumnsMap maps a column onto its ordinal position type ColumnsMap map[string]int -func NewColumnsMap(orderedNames []string) ColumnsMap { +func NewEmptyColumnsMap() ColumnsMap { columnsMap := make(map[string]int) + return ColumnsMap(columnsMap) +} + +func NewColumnsMap(orderedNames []string) ColumnsMap { + columnsMap := NewEmptyColumnsMap() for i, column := range orderedNames { columnsMap[column] = i } - return ColumnsMap(columnsMap) + return columnsMap } // ColumnList makes for a named list of columns type ColumnList struct { - Names []string - Ordinals ColumnsMap + Names []string + Ordinals ColumnsMap + UnsignedFlags ColumnsMap } // NewColumnList creates an object given ordered list of column names func NewColumnList(names []string) *ColumnList { result := &ColumnList{ - Names: names, + Names: names, + UnsignedFlags: NewEmptyColumnsMap(), } result.Ordinals = NewColumnsMap(result.Names) return result @@ -40,12 +47,21 @@ func NewColumnList(names []string) *ColumnList { // ParseColumnList parses a comma delimited list of column names func ParseColumnList(columns string) *ColumnList { result := &ColumnList{ - Names: strings.Split(columns, ","), + Names: strings.Split(columns, ","), + UnsignedFlags: NewEmptyColumnsMap(), } result.Ordinals = NewColumnsMap(result.Names) return result } +func (this *ColumnList) SetUnsigned(columnName string) { + this.UnsignedFlags[columnName] = 1 +} + +func (this *ColumnList) IsUnsigned(columnName string) bool { + return this.UnsignedFlags[columnName] == 1 +} + func (this *ColumnList) String() string { return strings.Join(this.Names, ",") } From 4c8edf637207bc45510d007ed767818d4537664e Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 17 Aug 2016 06:50:40 +0200 Subject: [PATCH 3/7] elaborate error message on applying event data: printing out the error, query and args --- go/logic/applier.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index 447c7d7..63fec3e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -862,6 +862,7 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { } if err != nil { err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) + log.Errore(err) } return err } From f00a9814e65adc45050cfd39361ef181af564ccd Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 17 Aug 2016 06:51:06 +0200 Subject: [PATCH 4/7] legacy comments cleanup --- go/binlog/gomysql_reader.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 8fe9b41..ac6d890 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -128,33 +128,23 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - // if rand.Intn(1000) == 0 { - // this.binlogSyncer.Close() - // log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint) - // return log.Errorf(".............haha got random error") - // } - // log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO func() { this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() this.currentCoordinates.LogPos = int64(ev.Header.LogPos) }() if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { - // log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO - // ev.Dump(os.Stdout) func() { this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) }() - // log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { return err } } - // log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO } log.Debugf("done streaming events") From 29d20316ba62dd595ee04eb932c92c92296ea50c Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 17 Aug 2016 06:51:58 +0200 Subject: [PATCH 5/7] inspector applies unsigned flags to all migration related columns --- go/logic/inspect.go | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index ae56719..54fd298 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -134,6 +134,13 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) { this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.ColumnRenameMap) log.Infof("Shared columns are %s", this.migrationContext.SharedColumns) // By fact that a non-empty unique key exists we also know the shared columns are non-empty + + // This additional step looks at which columns are unsigned. We could have merged this within + // the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel + // comfortable in doing this as a separate step. + this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns) + this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns) + return nil } @@ -450,6 +457,26 @@ func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.Col return sql.NewColumnList(columnNames), nil } +// applyUnsignedColumns +func (this *Inspector) applyUnsignedColumns(databaseName, tableName string, columnsLists ...*sql.ColumnList) error { + query := fmt.Sprintf(` + show columns from %s.%s + `, + sql.EscapeName(databaseName), + sql.EscapeName(tableName), + ) + err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { + columnName := rowMap.GetString("Field") + if strings.Contains(rowMap.GetString("Type"), "unsigned") { + for _, columnsList := range columnsLists { + columnsList.SetUnsigned(columnName) + } + } + return nil + }) + return err +} + // getCandidateUniqueKeys investigates a table and returns the list of unique keys // candidate for chunking func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*sql.UniqueKey), err error) { From 16d76aa299ba02b0f2bdc8ef0b90ce134b624fec Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 17 Aug 2016 06:52:23 +0200 Subject: [PATCH 6/7] builder fixes arg type from signed to unsigned based on UnsingedFlags --- go/sql/builder.go | 35 +++++++++-- go/sql/builder_test.go | 135 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 166 insertions(+), 4 deletions(-) diff --git a/go/sql/builder.go b/go/sql/builder.go index 6ccdb62..6c2e4b2 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -32,6 +32,29 @@ func EscapeName(name string) string { return fmt.Sprintf("`%s`", name) } +func fixArgType(arg interface{}, isUnsigned bool) interface{} { + if !isUnsigned { + return arg + } + // unsigned + if i, ok := arg.(int8); ok { + return uint8(i) + } + if i, ok := arg.(int16); ok { + return uint16(i) + } + if i, ok := arg.(int32); ok { + return uint32(i) + } + if i, ok := arg.(int64); ok { + return uint64(i) + } + if i, ok := arg.(int); ok { + return uint(i) + } + return arg +} + func buildPreparedValues(length int) []string { values := make([]string, length, length) for i := 0; i < length; i++ { @@ -309,7 +332,8 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey } for _, column := range uniqueKeyColumns.Names { tableOrdinal := tableColumns.Ordinals[column] - uniqueKeyArgs = append(uniqueKeyArgs, args[tableOrdinal]) + arg := fixArgType(args[tableOrdinal], uniqueKeyColumns.IsUnsigned(column)) + uniqueKeyArgs = append(uniqueKeyArgs, arg) } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) @@ -345,7 +369,8 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol for _, column := range sharedColumns.Names { tableOrdinal := tableColumns.Ordinals[column] - sharedArgs = append(sharedArgs, args[tableOrdinal]) + arg := fixArgType(args[tableOrdinal], sharedColumns.IsUnsigned(column)) + sharedArgs = append(sharedArgs, arg) } sharedColumnNames := duplicateNames(sharedColumns.Names) @@ -392,12 +417,14 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol for _, column := range sharedColumns.Names { tableOrdinal := tableColumns.Ordinals[column] - sharedArgs = append(sharedArgs, valueArgs[tableOrdinal]) + arg := fixArgType(valueArgs[tableOrdinal], sharedColumns.IsUnsigned(column)) + sharedArgs = append(sharedArgs, arg) } for _, column := range uniqueKeyColumns.Names { tableOrdinal := tableColumns.Ordinals[column] - uniqueKeyArgs = append(uniqueKeyArgs, whereArgs[tableOrdinal]) + arg := fixArgType(whereArgs[tableOrdinal], uniqueKeyColumns.IsUnsigned(column)) + uniqueKeyArgs = append(uniqueKeyArgs, arg) } sharedColumnNames := duplicateNames(sharedColumns.Names) diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 0ca4d38..db2617e 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -397,6 +397,44 @@ func TestBuildDMLDeleteQuery(t *testing.T) { } } +func TestBuildDMLDeleteQuerySignedUnsigned(t *testing.T) { + databaseName := "mydb" + tableName := "tbl" + tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"}) + uniqueKeyColumns := NewColumnList([]string{"position"}) + { + // test signed (expect no change) + args := []interface{}{3, "testname", "first", -1, 23} + query, uniqueKeyArgs, err := BuildDMLDeleteQuery(databaseName, tableName, tableColumns, uniqueKeyColumns, args) + test.S(t).ExpectNil(err) + expected := ` + delete /* gh-ost mydb.tbl */ + from + mydb.tbl + where + ((position = ?)) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{-1})) + } + { + // test unsigned + args := []interface{}{3, "testname", "first", int8(-1), 23} + uniqueKeyColumns.SetUnsigned("position") + query, uniqueKeyArgs, err := BuildDMLDeleteQuery(databaseName, tableName, tableColumns, uniqueKeyColumns, args) + test.S(t).ExpectNil(err) + expected := ` + delete /* gh-ost mydb.tbl */ + from + mydb.tbl + where + ((position = ?)) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(255)})) + } +} + func TestBuildDMLInsertQuery(t *testing.T) { databaseName := "mydb" tableName := "tbl" @@ -442,6 +480,61 @@ func TestBuildDMLInsertQuery(t *testing.T) { } } +func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { + databaseName := "mydb" + tableName := "tbl" + tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"}) + sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) + { + // testing signed + args := []interface{}{3, "testname", "first", int8(-1), 23} + sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) + query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args) + test.S(t).ExpectNil(err) + expected := ` + replace /* gh-ost mydb.tbl */ + into mydb.tbl + (id, name, position, age) + values + (?, ?, ?, ?) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-1), 23})) + } + { + // testing unsigned + args := []interface{}{3, "testname", "first", int8(-1), 23} + sharedColumns.SetUnsigned("position") + query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args) + test.S(t).ExpectNil(err) + expected := ` + replace /* gh-ost mydb.tbl */ + into mydb.tbl + (id, name, position, age) + values + (?, ?, ?, ?) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint8(255), 23})) + } + { + // testing unsigned + args := []interface{}{3, "testname", "first", int32(-1), 23} + sharedColumns.SetUnsigned("position") + query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args) + test.S(t).ExpectNil(err) + expected := ` + replace /* gh-ost mydb.tbl */ + into mydb.tbl + (id, name, position, age) + values + (?, ?, ?, ?) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint32(4294967295), 23})) + } +} + func TestBuildDMLUpdateQuery(t *testing.T) { databaseName := "mydb" tableName := "tbl" @@ -525,3 +618,45 @@ func TestBuildDMLUpdateQuery(t *testing.T) { test.S(t).ExpectNotNil(err) } } + +func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) { + databaseName := "mydb" + tableName := "tbl" + tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"}) + valueArgs := []interface{}{3, "testname", "newval", int8(-17), int8(-2)} + whereArgs := []interface{}{3, "testname", "findme", int8(-3), 56} + sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) + uniqueKeyColumns := NewColumnList([]string{"position"}) + { + // test signed + query, sharedArgs, uniqueKeyArgs, err := BuildDMLUpdateQuery(databaseName, tableName, tableColumns, sharedColumns, uniqueKeyColumns, valueArgs, whereArgs) + test.S(t).ExpectNil(err) + expected := ` + update /* gh-ost mydb.tbl */ + mydb.tbl + set id=?, name=?, position=?, age=? + where + ((position = ?)) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-17), int8(-2)})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{int8(-3)})) + } + { + // test unsigned + sharedColumns.SetUnsigned("age") + uniqueKeyColumns.SetUnsigned("position") + query, sharedArgs, uniqueKeyArgs, err := BuildDMLUpdateQuery(databaseName, tableName, tableColumns, sharedColumns, uniqueKeyColumns, valueArgs, whereArgs) + test.S(t).ExpectNil(err) + expected := ` + update /* gh-ost mydb.tbl */ + mydb.tbl + set id=?, name=?, position=?, age=? + where + ((position = ?)) + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-17), uint8(254)})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(253)})) + } +} From 3a0ee9b4a5637d2345a7726d1e5539c236601a85 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 17 Aug 2016 10:50:41 +0200 Subject: [PATCH 7/7] clarified commented transactional apply --- build.sh | 2 +- go/logic/applier.go | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/build.sh b/build.sh index 2aa297b..150250b 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ # # -RELEASE_VERSION="1.0.9" +RELEASE_VERSION="1.0.10" function build { osname=$1 diff --git a/go/logic/applier.go b/go/logic/applier.go index 63fec3e..951addb 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -853,6 +853,31 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { if err != nil { return err } + + // TODO The below is commented, and is in preparation for transactional writes on the ghost tables. + // Such writes would be, for example: + // - prepended with sql_mode setup + // - prepended with SET SQL_LOG_BIN=0 + // - prepended with SET FK_CHECKS=0 + // etc. + // + // Current known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql + // + + // err = func() error { + // tx, err := this.db.Begin() + // if err != nil { + // return err + // } + // if _, err := tx.Exec(query, args...); err != nil { + // return err + // } + // if err := tx.Commit(); err != nil { + // return err + // } + // return nil + // }() + _, err = sqlutils.Exec(this.db, query, args...) if err == nil { atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)