diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index bc96400..b07900d 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -78,9 +78,9 @@ func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos for _, rows := range rowsEvent.Rows { for j, d := range rows { if _, ok := d.([]byte); ok { - fmt.Print(fmt.Sprintf("yesbin %d:%q, %+v\n", j, d, reflect.TypeOf(d))) + fmt.Print(fmt.Sprintf("%d:%q, %+v\n", j, d, reflect.TypeOf(d))) } else { - fmt.Print(fmt.Sprintf("notbin %d:%#v, %+v\n", j, d, reflect.TypeOf(d))) + fmt.Print(fmt.Sprintf("%d:%#v, %+v\n", j, d, reflect.TypeOf(d))) } } fmt.Println("---") diff --git a/go/logic/applier.go b/go/logic/applier.go index d82bf0c..b35cc88 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -190,16 +190,15 @@ func (this *Applier) IterationIsComplete() (bool, error) { } func (this *Applier) CalculateNextIterationRangeEndValues() error { - startingFromValues := this.migrationContext.MigrationRangeMinValues this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues - if this.migrationContext.MigrationIterationRangeMinValues != nil { - startingFromValues = this.migrationContext.MigrationIterationRangeMinValues + if this.migrationContext.MigrationIterationRangeMinValues == nil { + this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues } query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.UniqueKey.Columns, - startingFromValues.AbstractValues(), + this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationRangeMaxValues.AbstractValues(), this.migrationContext.ChunkSize, fmt.Sprintf("iteration:%d", this.migrationContext.Iteration), @@ -224,7 +223,40 @@ func (this *Applier) CalculateNextIterationRangeEndValues() error { return nil } this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues - log.Debugf("column values: %s; iteration: %d; chunk-size: %d", this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.Iteration, this.migrationContext.ChunkSize) + log.Debugf( + "column values: [%s]..[%s]; iteration: %d; chunk-size: %d", + this.migrationContext.MigrationIterationRangeMinValues, + this.migrationContext.MigrationIterationRangeMaxValues, + this.migrationContext.Iteration, + this.migrationContext.ChunkSize, + ) + return nil +} + +func (this *Applier) ApplyIterationInsertQuery() error { + query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery( + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.GetGhostTableName(), + this.migrationContext.UniqueKey.Columns, + this.migrationContext.UniqueKey.Name, + this.migrationContext.UniqueKey.Columns, + this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), + this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), + this.migrationContext.Iteration == 0, + ) + if err != nil { + return err + } + if _, err := sqlutils.Exec(this.db, query, explodedArgs...); err != nil { + return err + } + log.Debugf( + "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", + this.migrationContext.MigrationIterationRangeMinValues, + this.migrationContext.MigrationIterationRangeMaxValues, + this.migrationContext.Iteration, + this.migrationContext.ChunkSize) return nil } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index b793432..7cd8b3c 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -65,7 +65,7 @@ func (this *Inspector) InitDBConnections() (err error) { return nil } -func (this *Inspector) InspectTables() (uniqueKeys [](*sql.UniqueKey), err error) { +func (this *Inspector) InspectOriginalTable() (uniqueKeys [](*sql.UniqueKey), err error) { uniqueKeys, err = this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName) if err != nil { return uniqueKeys, err @@ -132,7 +132,7 @@ func (this *Inspector) validateGrants() error { return log.Errorf("User has insufficient privileges for migration.") } -// validateConnection issues a simple can-connect to MySQL +// validateBinlogs checks that binary log configuration is good to go func (this *Inspector) validateBinlogs() error { query := `select @@global.log_bin, @@global.log_slave_updates, @@global.binlog_format` var hasBinaryLogs, logSlaveUpdates bool @@ -260,6 +260,29 @@ func (this *Inspector) countTableRows() error { return nil } +func (this *Inspector) getTableColumns(databaseName, tableName string) (columns sql.ColumnList, err error) { + query := fmt.Sprintf(` + show columns from %s.%s + `, + sql.EscapeName(databaseName), + sql.EscapeName(tableName), + ) + err = sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { + columns = append(columns, rowMap.GetString("Field")) + return nil + }) + if err != nil { + return columns, err + } + if len(columns) == 0 { + return columns, log.Errorf("Found 0 columns on %s.%s. Bailing out", + sql.EscapeName(databaseName), + sql.EscapeName(tableName), + ) + } + return columns, nil +} + // getCandidateUniqueKeys investigates a table and returns the list of unique keys // candidate for chunking func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*sql.UniqueKey), err error) { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 63a73b4..63bcd16 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -17,6 +17,7 @@ import ( type Migrator struct { inspector *Inspector applier *Applier + eventsStreamer *EventsStreamer migrationContext *base.MigrationContext } @@ -38,24 +39,29 @@ func (this *Migrator) Migrate() (err error) { return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master") } log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key) - uniqueKeys, err := this.inspector.InspectTables() + uniqueKeys, err := this.inspector.InspectOriginalTable() if err != nil { return err } + this.eventsStreamer = NewEventsStreamer() + if err := this.eventsStreamer.InitDBConnections(); err != nil { + return err + } + this.applier = NewApplier() if err := this.applier.InitDBConnections(); err != nil { return err } - // if err := this.applier.CreateGhostTable(); err != nil { - // log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") - // return err - // } - // if err := this.applier.AlterGhost(); err != nil { - // log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") - // return err - // } - this.migrationContext.UniqueKey = uniqueKeys[0] + if err := this.applier.CreateGhostTable(); err != nil { + log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + if err := this.applier.AlterGhost(); err != nil { + log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") + return err + } + this.migrationContext.UniqueKey = uniqueKeys[0] // TODO. Need to wait on replica till the ghost table exists and get shared keys if err := this.applier.ReadMigrationRangeValues(); err != nil { return err } @@ -67,15 +73,17 @@ func (this *Migrator) Migrate() (err error) { if isComplete { break } - err = this.applier.CalculateNextIterationRangeEndValues() - if err != nil { + if err = this.applier.CalculateNextIterationRangeEndValues(); err != nil { + return err + } + if err = this.applier.ApplyIterationInsertQuery(); err != nil { return err } this.migrationContext.Iteration++ } - if err := this.applier.IterateTable(uniqueKeys[0]); err != nil { - return err - } + // if err := this.applier.IterateTable(uniqueKeys[0]); err != nil { + // return err + // } return nil } diff --git a/go/mysql/instance_key.go b/go/mysql/instance_key.go index 93f2cd5..06f6bd5 100644 --- a/go/mysql/instance_key.go +++ b/go/mysql/instance_key.go @@ -1,17 +1,6 @@ /* Copyright 2015 Shlomi Noach, courtesy Booking.com - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + See https://github.com/github/gh-osc/blob/master/LICENSE */ package mysql diff --git a/go/mysql/instance_key_map.go b/go/mysql/instance_key_map.go index ca2be67..85e3b6a 100644 --- a/go/mysql/instance_key_map.go +++ b/go/mysql/instance_key_map.go @@ -1,17 +1,6 @@ /* Copyright 2015 Shlomi Noach, courtesy Booking.com - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + See https://github.com/github/gh-osc/blob/master/LICENSE */ package mysql