validating table structure on applier and migrator
- reading column list on applier - comparing original table on applier and migrator, expecting exact column list - or else bailing out
This commit is contained in:
parent
8fbff6519f
commit
bf92eec214
2
build.sh
2
build.sh
@ -2,7 +2,7 @@
|
||||
#
|
||||
#
|
||||
|
||||
RELEASE_VERSION="1.0.23"
|
||||
RELEASE_VERSION="1.0.26"
|
||||
|
||||
function build {
|
||||
osname=$1
|
||||
|
@ -147,6 +147,7 @@ type MigrationContext struct {
|
||||
UserCommandedUnpostponeFlag int64
|
||||
PanicAbort chan error
|
||||
|
||||
OriginalTableColumnsOnApplier *sql.ColumnList
|
||||
OriginalTableColumns *sql.ColumnList
|
||||
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
||||
GhostTableColumns *sql.ColumnList
|
||||
|
@ -67,6 +67,9 @@ func (this *Applier) InitDBConnections() (err error) {
|
||||
} else {
|
||||
this.connectionConfig.ImpliedKey = impliedKey
|
||||
}
|
||||
if err := this.readTableColumns(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -95,6 +98,16 @@ func (this *Applier) validateAndReadTimeZone() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// readTableColumns reads table columns on applier
|
||||
func (this *Applier) readTableColumns() (err error) {
|
||||
log.Infof("Examining table structure on applier")
|
||||
this.migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// showTableStatus returns the output of `show table status like '...'` command
|
||||
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
||||
rowMap = nil
|
||||
|
@ -8,6 +8,7 @@ package logic
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
||||
@ -83,7 +84,7 @@ func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (colum
|
||||
if len(uniqueKeys) == 0 {
|
||||
return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
|
||||
}
|
||||
columns, err = this.getTableColumns(this.migrationContext.DatabaseName, tableName)
|
||||
columns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName)
|
||||
if err != nil {
|
||||
return columns, uniqueKeys, err
|
||||
}
|
||||
@ -99,9 +100,15 @@ func (this *Inspector) InspectOriginalTable() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
|
||||
// inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
|
||||
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
|
||||
func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
|
||||
func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
|
||||
originalNamesOnApplier := this.migrationContext.OriginalTableColumnsOnApplier.Names()
|
||||
originalNames := this.migrationContext.OriginalTableColumns.Names()
|
||||
if !reflect.DeepEqual(originalNames, originalNamesOnApplier) {
|
||||
return fmt.Errorf("It seems like table structure is not identical between master and replica. This scenario is not supported.")
|
||||
}
|
||||
|
||||
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
|
||||
if err != nil {
|
||||
return err
|
||||
@ -478,31 +485,6 @@ func (this *Inspector) CountTableRows() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// getTableColumns reads column list from given table
|
||||
func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.ColumnList, error) {
|
||||
query := fmt.Sprintf(`
|
||||
show columns from %s.%s
|
||||
`,
|
||||
sql.EscapeName(databaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
columnNames := []string{}
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
|
||||
columnNames = append(columnNames, rowMap.GetString("Field"))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(columnNames) == 0 {
|
||||
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
|
||||
sql.EscapeName(databaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
}
|
||||
return sql.NewColumnList(columnNames), nil
|
||||
}
|
||||
|
||||
// applyColumnTypes
|
||||
func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
|
||||
query := `
|
||||
|
@ -26,7 +26,7 @@ import (
|
||||
type ChangelogState string
|
||||
|
||||
const (
|
||||
TablesInPlace ChangelogState = "TablesInPlace"
|
||||
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||
)
|
||||
|
||||
@ -58,7 +58,7 @@ type Migrator struct {
|
||||
migrationContext *base.MigrationContext
|
||||
|
||||
firstThrottlingCollected chan bool
|
||||
tablesInPlace chan bool
|
||||
ghostTableMigrated chan bool
|
||||
rowCopyComplete chan bool
|
||||
allEventsUpToLockProcessed chan bool
|
||||
|
||||
@ -76,7 +76,7 @@ func NewMigrator() *Migrator {
|
||||
migrator := &Migrator{
|
||||
migrationContext: base.GetMigrationContext(),
|
||||
parser: sql.NewParser(),
|
||||
tablesInPlace: make(chan bool),
|
||||
ghostTableMigrated: make(chan bool),
|
||||
firstThrottlingCollected: make(chan bool, 1),
|
||||
rowCopyComplete: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan bool),
|
||||
@ -182,9 +182,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
}
|
||||
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
|
||||
switch changelogState {
|
||||
case TablesInPlace:
|
||||
case GhostTableMigrated:
|
||||
{
|
||||
this.tablesInPlace <- true
|
||||
this.ghostTableMigrated <- true
|
||||
}
|
||||
case AllEventsUpToLockProcessed:
|
||||
{
|
||||
@ -291,14 +291,14 @@ func (this *Migrator) Migrate() (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Waiting for tables to be in place")
|
||||
<-this.tablesInPlace
|
||||
log.Debugf("Tables are in place")
|
||||
log.Infof("Waiting for ghost table to be migrated")
|
||||
<-this.ghostTableMigrated
|
||||
log.Debugf("ghost table migrated")
|
||||
// Yay! We now know the Ghost and Changelog tables are good to examine!
|
||||
// When running on replica, this means the replica has those tables. When running
|
||||
// on master this is always true, of course, and yet it also implies this knowledge
|
||||
// is in the binlogs.
|
||||
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
|
||||
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
|
||||
return err
|
||||
}
|
||||
// Validation complete! We're good to execute this migration
|
||||
@ -926,12 +926,13 @@ func (this *Migrator) initiateApplier() error {
|
||||
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.applier.AlterGhost(); err != nil {
|
||||
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
||||
return err
|
||||
}
|
||||
|
||||
this.applier.WriteChangelogState(string(TablesInPlace))
|
||||
this.applier.WriteChangelogState(string(GhostTableMigrated))
|
||||
go this.applier.InitiateHeartbeat()
|
||||
return nil
|
||||
}
|
||||
|
@ -10,6 +10,8 @@ import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/github/gh-ost/go/sql"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
@ -149,3 +151,28 @@ func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
|
||||
err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port)
|
||||
return instanceKey, err
|
||||
}
|
||||
|
||||
// GetTableColumns reads column list from given table
|
||||
func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, error) {
|
||||
query := fmt.Sprintf(`
|
||||
show columns from %s.%s
|
||||
`,
|
||||
sql.EscapeName(databaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
columnNames := []string{}
|
||||
err := sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error {
|
||||
columnNames = append(columnNames, rowMap.GetString("Field"))
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(columnNames) == 0 {
|
||||
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
|
||||
sql.EscapeName(databaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
}
|
||||
return sql.NewColumnList(columnNames), nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user