diff --git a/build.sh b/build.sh index 947a68d..c12b6a1 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ # # -RELEASE_VERSION="1.0.23" +RELEASE_VERSION="1.0.26" function build { osname=$1 diff --git a/go/base/context.go b/go/base/context.go index d291717..fe201e4 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -147,6 +147,7 @@ type MigrationContext struct { UserCommandedUnpostponeFlag int64 PanicAbort chan error + OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumns *sql.ColumnList OriginalTableUniqueKeys [](*sql.UniqueKey) GhostTableColumns *sql.ColumnList diff --git a/go/logic/applier.go b/go/logic/applier.go index eb23af8..99f7098 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -67,6 +67,9 @@ func (this *Applier) InitDBConnections() (err error) { } else { this.connectionConfig.ImpliedKey = impliedKey } + if err := this.readTableColumns(); err != nil { + return err + } return nil } @@ -95,6 +98,16 @@ func (this *Applier) validateAndReadTimeZone() error { return nil } +// readTableColumns reads table columns on applier +func (this *Applier) readTableColumns() (err error) { + log.Infof("Examining table structure on applier") + this.migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName) + if err != nil { + return err + } + return nil +} + // showTableStatus returns the output of `show table status like '...'` command func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) { rowMap = nil diff --git a/go/logic/inspect.go b/go/logic/inspect.go index ceb5c16..79a9c91 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -8,6 +8,7 @@ package logic import ( gosql "database/sql" "fmt" + "reflect" "strings" "sync/atomic" @@ -83,7 +84,7 @@ func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (colum if len(uniqueKeys) == 0 { return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out") } - columns, err = this.getTableColumns(this.migrationContext.DatabaseName, tableName) + columns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName) if err != nil { return columns, uniqueKeys, err } @@ -99,9 +100,15 @@ func (this *Inspector) InspectOriginalTable() (err error) { return nil } -// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration +// inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration // makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key -func (this *Inspector) InspectOriginalAndGhostTables() (err error) { +func (this *Inspector) inspectOriginalAndGhostTables() (err error) { + originalNamesOnApplier := this.migrationContext.OriginalTableColumnsOnApplier.Names() + originalNames := this.migrationContext.OriginalTableColumns.Names() + if !reflect.DeepEqual(originalNames, originalNamesOnApplier) { + return fmt.Errorf("It seems like table structure is not identical between master and replica. This scenario is not supported.") + } + this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName()) if err != nil { return err @@ -478,31 +485,6 @@ func (this *Inspector) CountTableRows() error { return nil } -// getTableColumns reads column list from given table -func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.ColumnList, error) { - query := fmt.Sprintf(` - show columns from %s.%s - `, - sql.EscapeName(databaseName), - sql.EscapeName(tableName), - ) - columnNames := []string{} - err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { - columnNames = append(columnNames, rowMap.GetString("Field")) - return nil - }) - if err != nil { - return nil, err - } - if len(columnNames) == 0 { - return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out", - sql.EscapeName(databaseName), - sql.EscapeName(tableName), - ) - } - return sql.NewColumnList(columnNames), nil -} - // applyColumnTypes func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error { query := ` diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7c96871..b0a6246 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -26,7 +26,7 @@ import ( type ChangelogState string const ( - TablesInPlace ChangelogState = "TablesInPlace" + GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" ) @@ -58,7 +58,7 @@ type Migrator struct { migrationContext *base.MigrationContext firstThrottlingCollected chan bool - tablesInPlace chan bool + ghostTableMigrated chan bool rowCopyComplete chan bool allEventsUpToLockProcessed chan bool @@ -76,7 +76,7 @@ func NewMigrator() *Migrator { migrator := &Migrator{ migrationContext: base.GetMigrationContext(), parser: sql.NewParser(), - tablesInPlace: make(chan bool), + ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 1), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), @@ -182,9 +182,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er } changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) switch changelogState { - case TablesInPlace: + case GhostTableMigrated: { - this.tablesInPlace <- true + this.ghostTableMigrated <- true } case AllEventsUpToLockProcessed: { @@ -291,14 +291,14 @@ func (this *Migrator) Migrate() (err error) { return err } - log.Infof("Waiting for tables to be in place") - <-this.tablesInPlace - log.Debugf("Tables are in place") + log.Infof("Waiting for ghost table to be migrated") + <-this.ghostTableMigrated + log.Debugf("ghost table migrated") // Yay! We now know the Ghost and Changelog tables are good to examine! // When running on replica, this means the replica has those tables. When running // on master this is always true, of course, and yet it also implies this knowledge // is in the binlogs. - if err := this.inspector.InspectOriginalAndGhostTables(); err != nil { + if err := this.inspector.inspectOriginalAndGhostTables(); err != nil { return err } // Validation complete! We're good to execute this migration @@ -926,12 +926,13 @@ func (this *Migrator) initiateApplier() error { log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err } + if err := this.applier.AlterGhost(); err != nil { log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") return err } - this.applier.WriteChangelogState(string(TablesInPlace)) + this.applier.WriteChangelogState(string(GhostTableMigrated)) go this.applier.InitiateHeartbeat() return nil } diff --git a/go/mysql/utils.go b/go/mysql/utils.go index cf1ce54..ae22e3a 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -10,6 +10,8 @@ import ( "fmt" "time" + "github.com/github/gh-ost/go/sql" + "github.com/outbrain/golib/log" "github.com/outbrain/golib/sqlutils" ) @@ -149,3 +151,28 @@ func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) { err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port) return instanceKey, err } + +// GetTableColumns reads column list from given table +func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, error) { + query := fmt.Sprintf(` + show columns from %s.%s + `, + sql.EscapeName(databaseName), + sql.EscapeName(tableName), + ) + columnNames := []string{} + err := sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error { + columnNames = append(columnNames, rowMap.GetString("Field")) + return nil + }) + if err != nil { + return nil, err + } + if len(columnNames) == 0 { + return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out", + sql.EscapeName(databaseName), + sql.EscapeName(tableName), + ) + } + return sql.NewColumnList(columnNames), nil +}