minor changes and typo correction
This commit is contained in:
parent
bba8b257d1
commit
21d455013e
@ -26,7 +26,7 @@ func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry {
|
|||||||
return binlogEntry
|
return binlogEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
|
// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
|
||||||
func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
|
func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
|
||||||
binlogEntry := &BinlogEntry{
|
binlogEntry := &BinlogEntry{
|
||||||
Coordinates: coordinates,
|
Coordinates: coordinates,
|
||||||
@ -41,7 +41,7 @@ func (this *BinlogEntry) Duplicate() *BinlogEntry {
|
|||||||
return binlogEntry
|
return binlogEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
|
// String() returns a string representation of this binlog entry
|
||||||
func (this *BinlogEntry) String() string {
|
func (this *BinlogEntry) String() string {
|
||||||
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
|
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
|
||||||
}
|
}
|
||||||
|
@ -111,7 +111,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
|
|||||||
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// The channel will do the throttling. Whoever is reding from the channel
|
// The channel will do the throttling. Whoever is reading from the channel
|
||||||
// decides whether action is taken synchronously (meaning we wait before
|
// decides whether action is taken synchronously (meaning we wait before
|
||||||
// next iteration) or asynchronously (we keep pushing more events)
|
// next iteration) or asynchronously (we keep pushing more events)
|
||||||
// In reality, reads will be synchronous
|
// In reality, reads will be synchronous
|
||||||
|
@ -126,7 +126,6 @@ func (this *Applier) readTableColumns() (err error) {
|
|||||||
|
|
||||||
// showTableStatus returns the output of `show table status like '...'` command
|
// showTableStatus returns the output of `show table status like '...'` command
|
||||||
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
||||||
rowMap = nil
|
|
||||||
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
||||||
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||||
rowMap = m
|
rowMap = m
|
||||||
@ -482,6 +481,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
sessionQuery := fmt.Sprintf(`SET
|
sessionQuery := fmt.Sprintf(`SET
|
||||||
SESSION time_zone = '%s',
|
SESSION time_zone = '%s',
|
||||||
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
||||||
@ -1001,15 +1001,19 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
rollback := func(err error) error {
|
||||||
|
tx.Rollback()
|
||||||
|
return err
|
||||||
|
}
|
||||||
sessionQuery := `SET
|
sessionQuery := `SET
|
||||||
SESSION time_zone = '+00:00',
|
SESSION time_zone = '+00:00',
|
||||||
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
||||||
`
|
`
|
||||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||||
return err
|
return rollback(err)
|
||||||
}
|
}
|
||||||
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
|
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
|
||||||
return err
|
return rollback(err)
|
||||||
}
|
}
|
||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -699,14 +699,17 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
|
|||||||
for _, ghostColumn := range ghostColumns.Names() {
|
for _, ghostColumn := range ghostColumns.Names() {
|
||||||
if strings.EqualFold(originalColumn, ghostColumn) {
|
if strings.EqualFold(originalColumn, ghostColumn) {
|
||||||
isSharedColumn = true
|
isSharedColumn = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
|
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
|
||||||
isSharedColumn = true
|
isSharedColumn = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for droppedColumn := range this.migrationContext.DroppedColumnsMap {
|
for droppedColumn := range this.migrationContext.DroppedColumnsMap {
|
||||||
if strings.EqualFold(originalColumn, droppedColumn) {
|
if strings.EqualFold(originalColumn, droppedColumn) {
|
||||||
isSharedColumn = false
|
isSharedColumn = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if isSharedColumn {
|
if isSharedColumn {
|
||||||
|
@ -78,7 +78,7 @@ type Migrator struct {
|
|||||||
|
|
||||||
rowCopyCompleteFlag int64
|
rowCopyCompleteFlag int64
|
||||||
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
||||||
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
|
||||||
copyRowsQueue chan tableWriteFunc
|
copyRowsQueue chan tableWriteFunc
|
||||||
applyEventsQueue chan *applyEventStruct
|
applyEventsQueue chan *applyEventStruct
|
||||||
|
|
||||||
|
@ -83,9 +83,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
|
|||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
|
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
|
||||||
// We wish to recognize the case where the topology's master actually has replication configuration.
|
// We wish to recognize the case where the topology's master actually has replication configuration.
|
||||||
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
|
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
|
||||||
@ -98,7 +95,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
|
|||||||
slaveIORunning := rowMap.GetString("Slave_IO_Running")
|
slaveIORunning := rowMap.GetString("Slave_IO_Running")
|
||||||
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
|
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
|
||||||
|
|
||||||
//
|
|
||||||
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
|
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
|
||||||
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
|
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
|
||||||
connectionConfig.Key,
|
connectionConfig.Key,
|
||||||
|
@ -140,13 +140,12 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},
|
|||||||
comparisons := []string{}
|
comparisons := []string{}
|
||||||
|
|
||||||
for i, column := range columns {
|
for i, column := range columns {
|
||||||
//
|
|
||||||
value := values[i]
|
value := values[i]
|
||||||
rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
|
rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", explodedArgs, err
|
return "", explodedArgs, err
|
||||||
}
|
}
|
||||||
if len(columns[0:i]) > 0 {
|
if i > 0 {
|
||||||
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
|
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", explodedArgs, err
|
return "", explodedArgs, err
|
||||||
|
Loading…
Reference in New Issue
Block a user