Merge pull request #283 from github/validate-table-master-replica

validating table structure on applier and migrator
This commit is contained in:
Shlomi Noach 2016-10-24 07:44:42 +02:00 committed by GitHub
commit b718bf6938
6 changed files with 63 additions and 39 deletions

View File

@ -2,7 +2,7 @@
# #
# #
RELEASE_VERSION="1.0.23" RELEASE_VERSION="1.0.26"
function build { function build {
osname=$1 osname=$1

View File

@ -147,6 +147,7 @@ type MigrationContext struct {
UserCommandedUnpostponeFlag int64 UserCommandedUnpostponeFlag int64
PanicAbort chan error PanicAbort chan error
OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList OriginalTableColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey) OriginalTableUniqueKeys [](*sql.UniqueKey)
GhostTableColumns *sql.ColumnList GhostTableColumns *sql.ColumnList

View File

@ -67,6 +67,9 @@ func (this *Applier) InitDBConnections() (err error) {
} else { } else {
this.connectionConfig.ImpliedKey = impliedKey this.connectionConfig.ImpliedKey = impliedKey
} }
if err := this.readTableColumns(); err != nil {
return err
}
return nil return nil
} }
@ -95,6 +98,16 @@ func (this *Applier) validateAndReadTimeZone() error {
return nil return nil
} }
// readTableColumns reads table columns on applier
func (this *Applier) readTableColumns() (err error) {
log.Infof("Examining table structure on applier")
this.migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
if err != nil {
return err
}
return nil
}
// showTableStatus returns the output of `show table status like '...'` command // showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) { func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
rowMap = nil rowMap = nil

View File

@ -8,6 +8,7 @@ package logic
import ( import (
gosql "database/sql" gosql "database/sql"
"fmt" "fmt"
"reflect"
"strings" "strings"
"sync/atomic" "sync/atomic"
@ -83,7 +84,7 @@ func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (colum
if len(uniqueKeys) == 0 { if len(uniqueKeys) == 0 {
return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out") return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
} }
columns, err = this.getTableColumns(this.migrationContext.DatabaseName, tableName) columns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName)
if err != nil { if err != nil {
return columns, uniqueKeys, err return columns, uniqueKeys, err
} }
@ -99,9 +100,15 @@ func (this *Inspector) InspectOriginalTable() (err error) {
return nil return nil
} }
// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration // inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key // makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
func (this *Inspector) InspectOriginalAndGhostTables() (err error) { func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
originalNamesOnApplier := this.migrationContext.OriginalTableColumnsOnApplier.Names()
originalNames := this.migrationContext.OriginalTableColumns.Names()
if !reflect.DeepEqual(originalNames, originalNamesOnApplier) {
return fmt.Errorf("It seems like table structure is not identical between master and replica. This scenario is not supported.")
}
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName()) this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
if err != nil { if err != nil {
return err return err
@ -478,31 +485,6 @@ func (this *Inspector) CountTableRows() error {
return nil return nil
} }
// getTableColumns reads column list from given table
func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.ColumnList, error) {
query := fmt.Sprintf(`
show columns from %s.%s
`,
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
columnNames := []string{}
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
columnNames = append(columnNames, rowMap.GetString("Field"))
return nil
})
if err != nil {
return nil, err
}
if len(columnNames) == 0 {
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
}
return sql.NewColumnList(columnNames), nil
}
// applyColumnTypes // applyColumnTypes
func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error { func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
query := ` query := `

View File

@ -26,7 +26,7 @@ import (
type ChangelogState string type ChangelogState string
const ( const (
TablesInPlace ChangelogState = "TablesInPlace" GhostTableMigrated ChangelogState = "GhostTableMigrated"
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
) )
@ -58,7 +58,7 @@ type Migrator struct {
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
firstThrottlingCollected chan bool firstThrottlingCollected chan bool
tablesInPlace chan bool ghostTableMigrated chan bool
rowCopyComplete chan bool rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool allEventsUpToLockProcessed chan bool
@ -76,7 +76,7 @@ func NewMigrator() *Migrator {
migrator := &Migrator{ migrator := &Migrator{
migrationContext: base.GetMigrationContext(), migrationContext: base.GetMigrationContext(),
parser: sql.NewParser(), parser: sql.NewParser(),
tablesInPlace: make(chan bool), ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 1), firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool), rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool), allEventsUpToLockProcessed: make(chan bool),
@ -182,9 +182,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
} }
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
switch changelogState { switch changelogState {
case TablesInPlace: case GhostTableMigrated:
{ {
this.tablesInPlace <- true this.ghostTableMigrated <- true
} }
case AllEventsUpToLockProcessed: case AllEventsUpToLockProcessed:
{ {
@ -291,14 +291,14 @@ func (this *Migrator) Migrate() (err error) {
return err return err
} }
log.Infof("Waiting for tables to be in place") log.Infof("Waiting for ghost table to be migrated")
<-this.tablesInPlace <-this.ghostTableMigrated
log.Debugf("Tables are in place") log.Debugf("ghost table migrated")
// Yay! We now know the Ghost and Changelog tables are good to examine! // Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running // When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge // on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs. // is in the binlogs.
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil { if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err return err
} }
// Validation complete! We're good to execute this migration // Validation complete! We're good to execute this migration
@ -926,12 +926,13 @@ func (this *Migrator) initiateApplier() error {
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
return err return err
} }
if err := this.applier.AlterGhost(); err != nil { if err := this.applier.AlterGhost(); err != nil {
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
return err return err
} }
this.applier.WriteChangelogState(string(TablesInPlace)) this.applier.WriteChangelogState(string(GhostTableMigrated))
go this.applier.InitiateHeartbeat() go this.applier.InitiateHeartbeat()
return nil return nil
} }

View File

@ -10,6 +10,8 @@ import (
"fmt" "fmt"
"time" "time"
"github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/log" "github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils" "github.com/outbrain/golib/sqlutils"
) )
@ -149,3 +151,28 @@ func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port) err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port)
return instanceKey, err return instanceKey, err
} }
// GetTableColumns reads column list from given table
func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, error) {
query := fmt.Sprintf(`
show columns from %s.%s
`,
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
columnNames := []string{}
err := sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error {
columnNames = append(columnNames, rowMap.GetString("Field"))
return nil
})
if err != nil {
return nil, err
}
if len(columnNames) == 0 {
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
}
return sql.NewColumnList(columnNames), nil
}