Merge pull request #578 from MOON-CLJ/minor_changes

minor changes and typo correction
This commit is contained in:
Shlomi Noach 2019-01-23 08:13:04 +02:00 committed by GitHub
commit e48844de0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 20 additions and 20 deletions

View File

@ -26,7 +26,7 @@ func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry {
return binlogEntry return binlogEntry
} }
// NewBinlogEntry creates an empty, ready to go BinlogEntry object // NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry { func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
binlogEntry := &BinlogEntry{ binlogEntry := &BinlogEntry{
Coordinates: coordinates, Coordinates: coordinates,
@ -41,7 +41,7 @@ func (this *BinlogEntry) Duplicate() *BinlogEntry {
return binlogEntry return binlogEntry
} }
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned // String() returns a string representation of this binlog entry
func (this *BinlogEntry) String() string { func (this *BinlogEntry) String() string {
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent) return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
} }

View File

@ -112,7 +112,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
} }
} }
// The channel will do the throttling. Whoever is reding from the channel // The channel will do the throttling. Whoever is reading from the channel
// decides whether action is taken synchronously (meaning we wait before // decides whether action is taken synchronously (meaning we wait before
// next iteration) or asynchronously (we keep pushing more events) // next iteration) or asynchronously (we keep pushing more events)
// In reality, reads will be synchronous // In reality, reads will be synchronous

View File

@ -126,7 +126,6 @@ func (this *Applier) readTableColumns() (err error) {
// showTableStatus returns the output of `show table status like '...'` command // showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) { func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
rowMap = nil
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName) query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
rowMap = m rowMap = m
@ -482,6 +481,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer tx.Rollback()
sessionQuery := fmt.Sprintf(`SET sessionQuery := fmt.Sprintf(`SET
SESSION time_zone = '%s', SESSION time_zone = '%s',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
@ -1001,15 +1001,19 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if err != nil { if err != nil {
return err return err
} }
rollback := func(err error) error {
tx.Rollback()
return err
}
sessionQuery := `SET sessionQuery := `SET
SESSION time_zone = '+00:00', SESSION time_zone = '+00:00',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
` `
if _, err := tx.Exec(sessionQuery); err != nil { if _, err := tx.Exec(sessionQuery); err != nil {
return err return rollback(err)
} }
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil { if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
return err return rollback(err)
} }
if err := tx.Commit(); err != nil { if err := tx.Commit(); err != nil {
return err return err

View File

@ -692,14 +692,17 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
for _, ghostColumn := range ghostColumns.Names() { for _, ghostColumn := range ghostColumns.Names() {
if strings.EqualFold(originalColumn, ghostColumn) { if strings.EqualFold(originalColumn, ghostColumn) {
isSharedColumn = true isSharedColumn = true
break
} }
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) { if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
isSharedColumn = true isSharedColumn = true
break
} }
} }
for droppedColumn := range this.migrationContext.DroppedColumnsMap { for droppedColumn := range this.migrationContext.DroppedColumnsMap {
if strings.EqualFold(originalColumn, droppedColumn) { if strings.EqualFold(originalColumn, droppedColumn) {
isSharedColumn = false isSharedColumn = false
break
} }
} }
for _, virtualColumn := range originalVirtualColumns.Names() { for _, virtualColumn := range originalVirtualColumns.Names() {
@ -758,9 +761,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
} }
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) { func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
replicationLag, err = mysql.GetReplicationLag( replicationLag, err = mysql.GetReplicationLagFromSlaveStatus(
this.informationSchemaDb, this.informationSchemaDb,
this.migrationContext.InspectorConnectionConfig,
) )
return replicationLag, err return replicationLag, err
} }

View File

@ -78,7 +78,7 @@ type Migrator struct {
rowCopyCompleteFlag int64 rowCopyCompleteFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but // copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete // excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
copyRowsQueue chan tableWriteFunc copyRowsQueue chan tableWriteFunc
applyEventsQueue chan *applyEventStruct applyEventsQueue chan *applyEventStruct

View File

@ -140,8 +140,8 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica { if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
// when running on replica, the heartbeat injection is also done on the replica. // when running on replica, the heartbeat injection is also done on the replica.
// This means we will always get a good heartbeat value. // This means we will always get a good heartbeat value.
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output. // When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
if lag, err := mysql.GetReplicationLag(this.inspector.informationSchemaDb, this.inspector.connectionConfig); err != nil { if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil {
return log.Errore(err) return log.Errore(err)
} else { } else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag)) atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))

View File

@ -58,9 +58,8 @@ func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
return knownDBs[cacheKey], exists, nil return knownDBs[cacheKey], exists, nil
} }
// GetReplicationLag returns replication lag for a given connection config; either by explicit query // GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
// or via SHOW SLAVE STATUS func GetReplicationLagFromSlaveStatus(informationSchemaDb *gosql.DB) (replicationLag time.Duration, err error) {
func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error { err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error {
slaveIORunning := m.GetString("Slave_IO_Running") slaveIORunning := m.GetString("Slave_IO_Running")
slaveSQLRunning := m.GetString("Slave_SQL_Running") slaveSQLRunning := m.GetString("Slave_SQL_Running")
@ -84,9 +83,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
} }
defer db.Close() defer db.Close()
if err != nil {
return nil, err
}
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error { err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
// We wish to recognize the case where the topology's master actually has replication configuration. // We wish to recognize the case where the topology's master actually has replication configuration.
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`. // This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
@ -99,7 +95,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
slaveIORunning := rowMap.GetString("Slave_IO_Running") slaveIORunning := rowMap.GetString("Slave_IO_Running")
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running") slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
//
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" { if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.", return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
connectionConfig.Key, connectionConfig.Key,

View File

@ -140,13 +140,12 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},
comparisons := []string{} comparisons := []string{}
for i, column := range columns { for i, column := range columns {
//
value := values[i] value := values[i]
rangeComparison, err := BuildValueComparison(column, value, comparisonSign) rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
if err != nil { if err != nil {
return "", explodedArgs, err return "", explodedArgs, err
} }
if len(columns[0:i]) > 0 { if i > 0 {
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i]) equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
if err != nil { if err != nil {
return "", explodedArgs, err return "", explodedArgs, err