Safe cut-over

- Supporting multi-step, safe cut-over phase, where queries are blocked throughout the phase, and worst case scenario is table outage (no data corruption)
- Self-rollsback in case of failure (restored original table)
This commit is contained in:
Shlomi Noach 2016-06-14 08:35:07 +02:00
parent e4ed801df5
commit 8292f5608f
2 changed files with 293 additions and 20 deletions

View File

@ -433,26 +433,21 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
return chunkSize, rowsAffected, duration, nil return chunkSize, rowsAffected, duration, nil
} }
// LockTables // LockOriginalTable places a write lock on the original table
func (this *Applier) LockTables() error { func (this *Applier) LockOriginalTable() error {
// query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write, %s.%s write`,
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetChangelogTableName()),
// )
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
) )
log.Infof("Locking tables") log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.LockTablesStartTime = time.Now() this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err return err
} }
log.Infof("Tables locked") log.Infof("Table locked")
return nil return nil
} }
@ -494,6 +489,21 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
return nil return nil
} }
func (this *Applier) RenameTable(fromName, toName string) (err error) {
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(fromName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(toName),
)
log.Infof("Renaming %s to %s", fromName, toName)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return log.Errore(err)
}
log.Infof("Table renamed")
return nil
}
// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table // SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
// into original's place // into original's place
func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error { func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error {
@ -587,6 +597,144 @@ func (this *Applier) StopSlaveNicely() error {
return nil return nil
} }
func (this *Applier) GetSessionLockName(sessionId int64) string {
return fmt.Sprintf("gh-ost.%d.lock", sessionId)
}
// LockOriginalTableAndWait locks the original table, notifies the lock is in
// place, and awaits further instruction
func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
return err
}
defer func() {
tx.Rollback()
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
query := `select get_lock(?, 0)`
lockResult := 0
lockName := this.GetSessionLockName(sessionId)
log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
return fmt.Errorf("Unable to acquire lock %s", lockName)
}
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
log.Infof("Table locked")
tableLocked <- nil // No error.
// The cut-over phase will proceed to apply remaining backlon onto ghost table,
// and issue RENAMEs. We wait here until told to proceed.
<-okToUnlockTable
// Release
query = `unlock tables`
log.Infof("Releasing lock from %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err
return log.Errore(err)
}
log.Infof("Table unlocked")
tableUnlocked <- nil
return nil
}
// RenameOriginalTable will attempt renaming the original table into _old
func (this *Applier) RenameOriginalTable(sessionIdChan chan int64, originalTableRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
originalTableRenamed <- nil
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
return log.Errore(err)
}
log.Infof("Original table renamed")
return nil
}
// RenameGhostTable will attempt renaming the ghost table into original
func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
ghostTableRenamed <- err
return log.Errore(err)
}
log.Infof("Ghost table renamed")
ghostTableRenamed <- nil
return nil
}
// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens // GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
// on a okToRelease in order to release it // on a okToRelease in order to release it
func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error { func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
@ -673,6 +821,17 @@ func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64)
return nil return nil
} }
func (this *Applier) ExpectUsedLock(sessionId int64) error {
var result int64
query := `select is_used_lock(?)`
lockName := this.GetSessionLockName(sessionId)
log.Infof("Checking session lock: %s", lockName)
if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId {
return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName)
}
return nil
}
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error { func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
found := false found := false
query := ` query := `

View File

@ -51,7 +51,6 @@ type Migrator struct {
tablesInPlace chan bool tablesInPlace chan bool
rowCopyComplete chan bool rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool allEventsUpToLockProcessed chan bool
voluntaryLockAcquired chan bool
panicAbort chan error panicAbort chan error
rowCopyCompleteFlag int64 rowCopyCompleteFlag int64
@ -71,7 +70,6 @@ func NewMigrator() *Migrator {
tablesInPlace: make(chan bool), tablesInPlace: make(chan bool),
rowCopyComplete: make(chan bool), rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool), allEventsUpToLockProcessed: make(chan bool),
voluntaryLockAcquired: make(chan bool, 1),
panicAbort: make(chan error), panicAbort: make(chan error),
allEventsUpToLockProcessedInjectedFlag: 0, allEventsUpToLockProcessedInjectedFlag: 0,
@ -394,6 +392,17 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
return this.stopWritesAndCompleteMigrationOnReplica() return this.stopWritesAndCompleteMigrationOnReplica()
} }
// Running on master // Running on master
{
// Lock-based solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.safeCutOver)
},
)
return err
}
if this.migrationContext.CutOverType == base.CutOverTwoStep { if this.migrationContext.CutOverType == base.CutOverTwoStep {
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
} }
@ -414,6 +423,8 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained. // make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) { func (this *Migrator) waitForEventsUpToLock() (err error) {
waitForEventsUpToLockStartTime := time.Now()
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed) log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil { if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
return err return err
@ -421,7 +432,9 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
log.Infof("Waiting for events up to lock") log.Infof("Waiting for events up to lock")
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1) atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
<-this.allEventsUpToLockProcessed <-this.allEventsUpToLockProcessed
log.Infof("Done waiting for events up to lock") waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime)
log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
this.printStatus() this.printStatus()
return nil return nil
@ -432,7 +445,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
// There is a point in time where the "original" table does not exist and queries are non-blocked // There is a point in time where the "original" table does not exist and queries are non-blocked
// and failing. // and failing.
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) { func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
if err := this.retryOperation(this.applier.LockTables); err != nil { if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
return err return err
} }
@ -515,6 +528,107 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
return nil return nil
} }
// cutOverSafe performs a safe cut over, where normally (no failure) the original table
// is being locked until swapped, hence DML queries being locked and unaware of the cut-over.
// In the worst case, there will ba a minor outage, where the original table would not exist.
func (this *Migrator) safeCutOver() (err error) {
okToUnlockTable := make(chan bool, 2)
originalTableRenamed := make(chan error, 1)
defer func() {
// The following is to make sure we unlock the table no-matter-what!
// There's enough buffer in the channel to support a redundant write here.
okToUnlockTable <- true
// We need to make sure we wait for the original-rename, successful or not,
// so as to be able to rollback in case the ghost-rename fails.
<-originalTableRenamed
// Rollback operation
if !this.applier.tableExists(this.migrationContext.OriginalTableName) {
log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName)
err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName)
log.Errore(err)
}
}()
lockOriginalSessionIdChan := make(chan int64, 1)
tableLocked := make(chan error, 1)
tableUnlocked := make(chan error, 1)
go func() {
if err := this.applier.LockOriginalTableAndWait(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
log.Errore(err)
}
}()
if err := <-tableLocked; err != nil {
return log.Errore(err)
}
lockOriginalSessionId := <-lockOriginalSessionIdChan
log.Infof("Session locking original table is %+v", lockOriginalSessionId)
// At this point we know the table is locked.
// We know any newly incoming DML on original table is blocked.
this.waitForEventsUpToLock()
// Step 2
// We now attempt a RENAME on the original table, and expect it to block
renameOriginalSessionIdChan := make(chan int64, 1)
this.migrationContext.RenameTablesStartTime = time.Now()
go func() {
this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed)
}()
renameOriginalSessionId := <-renameOriginalSessionIdChan
log.Infof("Session renaming original table is %+v", renameOriginalSessionId)
if err := this.retryOperation(
func() error {
return this.applier.ExpectProcess(renameOriginalSessionId, "metadata lock", "rename")
}); err != nil {
return err
}
log.Infof("Found RENAME on original table to be blocking, as expected. Double checking original is still being locked")
if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
// Abort operation; but make sure to unlock table!
return log.Errore(err)
}
log.Infof("Connection holding lock on original table still exists")
// Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to renaming ghost table.
// Step 3
// We now attempt a RENAME on the ghost table, and expect it to block
renameGhostSessionIdChan := make(chan int64, 1)
ghostTableRenamed := make(chan error, 1)
go func() {
this.applier.RenameGhostTable(renameGhostSessionIdChan, ghostTableRenamed)
}()
renameGhostSessionId := <-renameGhostSessionIdChan
log.Infof("Session renaming ghost table is %+v", renameGhostSessionId)
if err := this.retryOperation(
func() error {
return this.applier.ExpectProcess(renameGhostSessionId, "metadata lock", "rename")
}); err != nil {
return err
}
log.Infof("Found RENAME on ghost table to be blocking, as expected. Will next release lock on original table")
// Step 4
okToUnlockTable <- true
// BAM! original table lock is released, RENAME original->old released,
// RENAME ghost->original is released, queries on original are unblocked.
// (that is, assuming all went well)
if err := <-tableUnlocked; err != nil {
return log.Errore(err)
}
if err := <-ghostTableRenamed; err != nil {
return log.Errore(err)
}
this.migrationContext.RenameTablesEndTime = time.Now()
// ooh nice! We're actually truly and thankfully done
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
// stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply // stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply
// what DML events are left, and that's it. // what DML events are left, and that's it.
// This only applies in --test-on-replica. It leaves replication stopped, with both tables // This only applies in --test-on-replica. It leaves replication stopped, with both tables
@ -831,6 +945,10 @@ func (this *Migrator) initiateApplier() error {
if err := this.applier.ValidateOrDropExistingTables(); err != nil { if err := this.applier.ValidateOrDropExistingTables(); err != nil {
return err return err
} }
if err := this.applier.CreateChangelogTable(); err != nil {
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
return err
}
if err := this.applier.CreateGhostTable(); err != nil { if err := this.applier.CreateGhostTable(); err != nil {
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
return err return err
@ -839,10 +957,6 @@ func (this *Migrator) initiateApplier() error {
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
return err return err
} }
if err := this.applier.CreateChangelogTable(); err != nil {
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
return err
}
this.applier.WriteChangelogState(string(TablesInPlace)) this.applier.WriteChangelogState(string(TablesInPlace))
go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds) go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds)