From 8292f5608f4f8af1c345b40d9c1245b0cbf7df22 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 14 Jun 2016 08:35:07 +0200 Subject: [PATCH] Safe cut-over - Supporting multi-step, safe cut-over phase, where queries are blocked throughout the phase, and worst case scenario is table outage (no data corruption) - Self-rollsback in case of failure (restored original table) --- go/logic/applier.go | 183 ++++++++++++++++++++++++++++++++++++++++--- go/logic/migrator.go | 130 ++++++++++++++++++++++++++++-- 2 files changed, 293 insertions(+), 20 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index dcf2539..02468da 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -433,26 +433,21 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected return chunkSize, rowsAffected, duration, nil } -// LockTables -func (this *Applier) LockTables() error { - // query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write, %s.%s write`, - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.OriginalTableName), - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.GetGhostTableName()), - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.GetChangelogTableName()), - // ) +// LockOriginalTable places a write lock on the original table +func (this *Applier) LockOriginalTable() error { query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), ) - log.Infof("Locking tables") + log.Infof("Locking %s.%s", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) this.migrationContext.LockTablesStartTime = time.Now() if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { return err } - log.Infof("Tables 
locked") + log.Infof("Table locked") return nil } @@ -494,6 +489,21 @@ func (this *Applier) SwapTablesQuickAndBumpy() error { return nil } +func (this *Applier) RenameTable(fromName, toName string) (err error) { + query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(fromName), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(toName), + ) + log.Infof("Renaming %s to %s", fromName, toName) + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return log.Errore(err) + } + log.Infof("Table renamed") + return nil +} + // SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table // into original's place func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error { @@ -587,6 +597,144 @@ func (this *Applier) StopSlaveNicely() error { return nil } +func (this *Applier) GetSessionLockName(sessionId int64) string { + return fmt.Sprintf("gh-ost.%d.lock", sessionId) +} + +// LockOriginalTableAndWait locks the original table, notifies the lock is in +// place, and awaits further instruction +func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error { + tx, err := this.db.Begin() + if err != nil { + tableLocked <- err + return err + } + defer func() { + tx.Rollback() + }() + + var sessionId int64 + if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId + + query := `select get_lock(?, 0)` + lockResult := 0 + lockName := this.GetSessionLockName(sessionId) + log.Infof("Grabbing voluntary lock: %s", lockName) + if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 { + return fmt.Errorf("Unable to acquire lock %s", lockName) + } + + query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, + 
sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Locking %s.%s", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + this.migrationContext.LockTablesStartTime = time.Now() + if _, err := tx.Exec(query); err != nil { + tableLocked <- err + return err + } + log.Infof("Table locked") + tableLocked <- nil // No error. + + // The cut-over phase will proceed to apply remaining backlog onto ghost table, + // and issue RENAMEs. We wait here until told to proceed. + <-okToUnlockTable + // Release + query = `unlock tables` + log.Infof("Releasing lock from %s.%s", + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + if _, err := tx.Exec(query); err != nil { + tableUnlocked <- err + return log.Errore(err) + } + log.Infof("Table unlocked") + tableUnlocked <- nil + return nil +} + +// RenameOriginalTable will attempt renaming the original table into _old +func (this *Applier) RenameOriginalTable(sessionIdChan chan int64, originalTableRenamed chan<- error) error { + tx, err := this.db.Begin() + if err != nil { + return err + } + defer func() { + tx.Rollback() + originalTableRenamed <- nil + }() + var sessionId int64 + if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId + + log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds) + query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds) + if _, err := tx.Exec(query); err != nil { + return err + } + + query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), + 
sql.EscapeName(this.migrationContext.GetOldTableName()), + ) + log.Infof("Issuing and expecting this to block: %s", query) + if _, err := tx.Exec(query); err != nil { + return log.Errore(err) + } + log.Infof("Original table renamed") + return nil +} + +// RenameGhostTable will attempt renaming the ghost table into original +func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRenamed chan<- error) error { + tx, err := this.db.Begin() + if err != nil { + return err + } + defer func() { + tx.Rollback() + }() + var sessionId int64 + if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId + + log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds) + query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds) + if _, err := tx.Exec(query); err != nil { + return err + } + + query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Issuing and expecting this to block: %s", query) + if _, err := tx.Exec(query); err != nil { + ghostTableRenamed <- err + return log.Errore(err) + } + log.Infof("Ghost table renamed") + ghostTableRenamed <- nil + + return nil +} + // GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens // on a okToRelease in order to release it func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error { @@ -673,6 +821,17 @@ func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64) return nil } +func (this *Applier) ExpectUsedLock(sessionId int64) error { + var result int64 + query := `select is_used_lock(?)` + lockName := this.GetSessionLockName(sessionId) + 
log.Infof("Checking session lock: %s", lockName) + if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId { + return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName) + } + return nil +} + func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error { found := false query := ` diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c4dea8c..05e132a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -51,7 +51,6 @@ type Migrator struct { tablesInPlace chan bool rowCopyComplete chan bool allEventsUpToLockProcessed chan bool - voluntaryLockAcquired chan bool panicAbort chan error rowCopyCompleteFlag int64 @@ -71,7 +70,6 @@ func NewMigrator() *Migrator { tablesInPlace: make(chan bool), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), - voluntaryLockAcquired: make(chan bool, 1), panicAbort: make(chan error), allEventsUpToLockProcessedInjectedFlag: 0, @@ -394,6 +392,17 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) { return this.stopWritesAndCompleteMigrationOnReplica() } // Running on master + + { + // Lock-based solution: we use low timeout and multiple attempts. But for + // each failed attempt, we throttle until replication lag is back to normal + err := this.retryOperation( + func() error { + return this.executeAndThrottleOnError(this.safeCutOver) + }, + ) + return err + } if this.migrationContext.CutOverType == base.CutOverTwoStep { return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() } @@ -414,6 +423,8 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) { // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. 
func (this *Migrator) waitForEventsUpToLock() (err error) { + waitForEventsUpToLockStartTime := time.Now() + log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed) if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil { return err @@ -421,7 +432,9 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { log.Infof("Waiting for events up to lock") atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1) <-this.allEventsUpToLockProcessed - log.Infof("Done waiting for events up to lock") + waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime) + + log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) this.printStatus() return nil @@ -432,7 +445,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { // There is a point in time where the "original" table does not exist and queries are non-blocked // and failing. func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) { - if err := this.retryOperation(this.applier.LockTables); err != nil { + if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { return err } @@ -515,6 +528,107 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error return nil } +// safeCutOver performs a safe cut over, where normally (no failure) the original table +// is being locked until swapped, hence DML queries being locked and unaware of the cut-over. +// In the worst case, there will be a minor outage, where the original table would not exist. +func (this *Migrator) safeCutOver() (err error) { + okToUnlockTable := make(chan bool, 2) + originalTableRenamed := make(chan error, 1) + defer func() { + // The following is to make sure we unlock the table no-matter-what! + // There's enough buffer in the channel to support a redundant write here. 
+ okToUnlockTable <- true + // We need to make sure we wait for the original-rename, successful or not, + // so as to be able to rollback in case the ghost-rename fails. + <-originalTableRenamed + + // Rollback operation + if !this.applier.tableExists(this.migrationContext.OriginalTableName) { + log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName) + err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName) + log.Errore(err) + } + }() + lockOriginalSessionIdChan := make(chan int64, 1) + tableLocked := make(chan error, 1) + tableUnlocked := make(chan error, 1) + go func() { + if err := this.applier.LockOriginalTableAndWait(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil { + log.Errore(err) + } + }() + if err := <-tableLocked; err != nil { + return log.Errore(err) + } + lockOriginalSessionId := <-lockOriginalSessionIdChan + log.Infof("Session locking original table is %+v", lockOriginalSessionId) + // At this point we know the table is locked. + // We know any newly incoming DML on original table is blocked. + this.waitForEventsUpToLock() + + // Step 2 + // We now attempt a RENAME on the original table, and expect it to block + renameOriginalSessionIdChan := make(chan int64, 1) + this.migrationContext.RenameTablesStartTime = time.Now() + go func() { + this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed) + }() + renameOriginalSessionId := <-renameOriginalSessionIdChan + log.Infof("Session renaming original table is %+v", renameOriginalSessionId) + + if err := this.retryOperation( + func() error { + return this.applier.ExpectProcess(renameOriginalSessionId, "metadata lock", "rename") + }); err != nil { + return err + } + log.Infof("Found RENAME on original table to be blocking, as expected. 
Double checking original is still being locked") + if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil { + // Abort operation; but make sure to unlock table! + return log.Errore(err) + } + log.Infof("Connection holding lock on original table still exists") + + // Now that we've found the RENAME blocking, AND the locking connection still alive, + // we know it is safe to proceed to renaming ghost table. + + // Step 3 + // We now attempt a RENAME on the ghost table, and expect it to block + renameGhostSessionIdChan := make(chan int64, 1) + ghostTableRenamed := make(chan error, 1) + go func() { + this.applier.RenameGhostTable(renameGhostSessionIdChan, ghostTableRenamed) + }() + renameGhostSessionId := <-renameGhostSessionIdChan + log.Infof("Session renaming ghost table is %+v", renameGhostSessionId) + + if err := this.retryOperation( + func() error { + return this.applier.ExpectProcess(renameGhostSessionId, "metadata lock", "rename") + }); err != nil { + return err + } + log.Infof("Found RENAME on ghost table to be blocking, as expected. Will next release lock on original table") + + // Step 4 + okToUnlockTable <- true + // BAM! original table lock is released, RENAME original->old released, + // RENAME ghost->original is released, queries on original are unblocked. + // (that is, assuming all went well) + if err := <-tableUnlocked; err != nil { + return log.Errore(err) + } + if err := <-ghostTableRenamed; err != nil { + return log.Errore(err) + } + this.migrationContext.RenameTablesEndTime = time.Now() + + // ooh nice! We're actually truly and thankfully done + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + log.Infof("Lock & rename duration: %s. 
During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + // stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply // what DML events are left, and that's it. // This only applies in --test-on-replica. It leaves replication stopped, with both tables @@ -831,6 +945,10 @@ func (this *Migrator) initiateApplier() error { if err := this.applier.ValidateOrDropExistingTables(); err != nil { return err } + if err := this.applier.CreateChangelogTable(); err != nil { + log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") + return err + } if err := this.applier.CreateGhostTable(); err != nil { log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") return err @@ -839,10 +957,6 @@ func (this *Migrator) initiateApplier() error { log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") return err } - if err := this.applier.CreateChangelogTable(); err != nil { - log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out") - return err - } this.applier.WriteChangelogState(string(TablesInPlace)) go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds)