diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go index bb70bc5..5650acc 100644 --- a/go/binlog/binlog_entry.go +++ b/go/binlog/binlog_entry.go @@ -26,7 +26,7 @@ func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry { return binlogEntry } -// NewBinlogEntry creates an empty, ready to go BinlogEntry object +// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry { binlogEntry := &BinlogEntry{ Coordinates: coordinates, @@ -41,7 +41,7 @@ func (this *BinlogEntry) Duplicate() *BinlogEntry { return binlogEntry } -// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned +// String() returns a string representation of this binlog entry func (this *BinlogEntry) String() string { return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent) } diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 106f73d..794d545 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -112,7 +112,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) } } - // The channel will do the throttling. Whoever is reding from the channel + // The channel will do the throttling. Whoever is reading from the channel // decides whether action is taken synchronously (meaning we wait before // next iteration) or asynchronously (we keep pushing more events) // In reality, reads will be synchronous diff --git a/go/logic/applier.go b/go/logic/applier.go index ee9e066..140b1a4 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -126,7 +126,6 @@ func (this *Applier) readTableColumns() (err error) { // showTableStatus returns the output of `show table status like '...'` command func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) { - rowMap = nil query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName) sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { rowMap = m @@ -482,6 +481,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected if err != nil { return nil, err } + defer tx.Rollback() sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s', sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') @@ -1001,15 +1001,19 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { if err != nil { return err } + rollback := func(err error) error { + tx.Rollback() + return err + } sessionQuery := `SET SESSION time_zone = '+00:00', sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') ` if _, err := tx.Exec(sessionQuery); err != nil { - return err + return rollback(err) } if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { - return err + return rollback(err) } if err := tx.Commit(); err != nil { return err diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 5f875a0..9b2da44 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -692,14 +692,17 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum for _, ghostColumn := range ghostColumns.Names() { if strings.EqualFold(originalColumn, ghostColumn) { isSharedColumn = true + break } if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) { isSharedColumn = true + break } } for droppedColumn := range this.migrationContext.DroppedColumnsMap { if strings.EqualFold(originalColumn, droppedColumn) { isSharedColumn = false + break } } for _, virtualColumn := range originalVirtualColumns.Names() { @@ -758,9 +761,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect } func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) { - replicationLag, err = mysql.GetReplicationLag( + replicationLag, err = mysql.GetReplicationLagFromSlaveStatus( this.informationSchemaDb, - this.migrationContext.InspectorConnectionConfig, ) return replicationLag, err } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4178f19..86de5ac 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -78,7 +78,7 @@ type Migrator struct { rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but - // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete + // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete copyRowsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 624956a..6f9f4bb 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -140,8 +140,8 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica { // when running on replica, the heartbeat injection is also done on the replica. // This means we will always get a good heartbeat value. - // When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output. - if lag, err := mysql.GetReplicationLag(this.inspector.informationSchemaDb, this.inspector.connectionConfig); err != nil { + // When running on replica, we should instead check the `SHOW SLAVE STATUS` output. + if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil { return log.Errore(err) } else { atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 67ed7f8..17bb5fc 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -58,9 +58,8 @@ func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) { return knownDBs[cacheKey], exists, nil } -// GetReplicationLag returns replication lag for a given connection config; either by explicit query -// or via SHOW SLAVE STATUS -func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) { +// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS +func GetReplicationLagFromSlaveStatus(informationSchemaDb *gosql.DB) (replicationLag time.Duration, err error) { err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error { slaveIORunning := m.GetString("Slave_IO_Running") slaveSQLRunning := m.GetString("Slave_SQL_Running") @@ -84,9 +83,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey } defer db.Close() - if err != nil { - return nil, err - } err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error { // We wish to recognize the case where the topology's master actually has replication configuration. // This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`. @@ -99,7 +95,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey slaveIORunning := rowMap.GetString("Slave_IO_Running") slaveSQLRunning := rowMap.GetString("Slave_SQL_Running") - // if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" { return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.", connectionConfig.Key, diff --git a/go/sql/builder.go b/go/sql/builder.go index c3a6229..2c5a7ae 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -140,13 +140,12 @@ func BuildRangeComparison(columns []string, values []string, args []interface{}, comparisons := []string{} for i, column := range columns { - // value := values[i] rangeComparison, err := BuildValueComparison(column, value, comparisonSign) if err != nil { return "", explodedArgs, err } - if len(columns[0:i]) > 0 { + if i > 0 { equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i]) if err != nil { return "", explodedArgs, err