Merge pull request #64 from github/safe-cut-over

Safe cut over
Shlomi Noach 2016-06-15 10:14:07 +02:00 committed by GitHub
commit 574c372fba
5 changed files with 301 additions and 216 deletions

View File

@@ -1,7 +1,7 @@
 #!/bin/bash
 #
 #
-RELEASE_VERSION="0.8.9"
+RELEASE_VERSION="0.9.1"
 buildpath=/tmp/gh-ost
 target=gh-ost

View File

@@ -31,9 +31,8 @@ const (
 type CutOver int
 const (
-	CutOverTwoStep CutOver = iota
-	CutOverVoluntaryLock = iota
-	CutOverUdfWait = iota
+	CutOverSafe CutOver = iota
+	CutOverTwoStep = iota
 )
 const (
@@ -100,6 +99,7 @@ type MigrationContext struct {
 	isThrottled     bool
 	throttleReason  string
 	throttleMutex   *sync.Mutex
+	IsPostponingCutOver int64
 	OriginalTableColumns    *sql.ColumnList
 	OriginalTableUniqueKeys [](*sql.UniqueKey)

View File

@@ -128,12 +128,10 @@ func main() {
 		log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
 	}
 	switch *cutOver {
+	case "safe", "default", "":
+		migrationContext.CutOverType = base.CutOverSafe
 	case "two-step":
 		migrationContext.CutOverType = base.CutOverTwoStep
-	case "voluntary-lock":
-		migrationContext.CutOverType = base.CutOverVoluntaryLock
-	case "":
-		log.Fatalf("--cut-over must be specified")
 	default:
 		log.Fatalf("Unknown cut-over: %s", *cutOver)
 	}

View File

@@ -433,26 +433,21 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
 	return chunkSize, rowsAffected, duration, nil
 }
 
-// LockTables
-func (this *Applier) LockTables() error {
-	// query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write, %s.%s write`,
-	// 	sql.EscapeName(this.migrationContext.DatabaseName),
-	// 	sql.EscapeName(this.migrationContext.OriginalTableName),
-	// 	sql.EscapeName(this.migrationContext.DatabaseName),
-	// 	sql.EscapeName(this.migrationContext.GetGhostTableName()),
-	// 	sql.EscapeName(this.migrationContext.DatabaseName),
-	// 	sql.EscapeName(this.migrationContext.GetChangelogTableName()),
-	// )
+// LockOriginalTable places a write lock on the original table
+func (this *Applier) LockOriginalTable() error {
 	query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
 		sql.EscapeName(this.migrationContext.DatabaseName),
 		sql.EscapeName(this.migrationContext.OriginalTableName),
 	)
-	log.Infof("Locking tables")
+	log.Infof("Locking %s.%s",
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+	)
 	this.migrationContext.LockTablesStartTime = time.Now()
 	if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
 		return err
 	}
-	log.Infof("Tables locked")
+	log.Infof("Table locked")
 	return nil
 }
@@ -494,51 +489,40 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
 	return nil
 }
 
-// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
-// into original's place
-func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error {
-	tx, err := this.db.Begin()
-	if err != nil {
-		return err
-	}
-	log.Infof("Setting timeout for RENAME for %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
-	query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
-	if _, err := tx.Exec(query); err != nil {
-		return err
-	}
-	var sessionId int64
-	if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
-		return err
-	}
-	sessionIdChan <- sessionId
-	query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
+func (this *Applier) RenameTable(fromName, toName string) (err error) {
+	query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
 		sql.EscapeName(this.migrationContext.DatabaseName),
-		sql.EscapeName(this.migrationContext.OriginalTableName),
+		sql.EscapeName(fromName),
 		sql.EscapeName(this.migrationContext.DatabaseName),
-		sql.EscapeName(this.migrationContext.GetOldTableName()),
-		sql.EscapeName(this.migrationContext.DatabaseName),
-		sql.EscapeName(this.migrationContext.GetGhostTableName()),
-		sql.EscapeName(this.migrationContext.DatabaseName),
-		sql.EscapeName(this.migrationContext.OriginalTableName),
+		sql.EscapeName(toName),
 	)
-	log.Infof("Renaming tables")
-	this.migrationContext.RenameTablesStartTime = time.Now()
-	if _, err := tx.Exec(query); err != nil {
-		return err
+	log.Infof("Renaming %s to %s", fromName, toName)
+	if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
+		return log.Errore(err)
 	}
-	this.migrationContext.RenameTablesEndTime = time.Now()
-	tx.Commit()
-	log.Infof("Tables renamed")
+	log.Infof("Table renamed")
 	return nil
 }
 
 func (this *Applier) RenameTablesRollback() (renameError error) {
 	// Restoring tables to original names.
-	query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
+	// We prefer the single, atomic operation:
+	query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.GetGhostTableName()),
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.GetOldTableName()),
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+	)
+	log.Infof("Renaming back both tables")
+	if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil {
+		return nil
+	}
+	// But, if for some reason the above was impossible to do, we rename one by one.
+	query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
 		sql.EscapeName(this.migrationContext.DatabaseName),
 		sql.EscapeName(this.migrationContext.OriginalTableName),
 		sql.EscapeName(this.migrationContext.DatabaseName),
@@ -563,7 +547,7 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
 // StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
 // We need to keep the SQL thread active so as to complete processing received events,
-// and have them written to the binary log, so that we can then read them via streamer
+// and have them written to the binary log, so that we can then read them via streamer.
 func (this *Applier) StopSlaveIOThread() error {
 	query := `stop /* gh-ost */ slave io_thread`
 	log.Infof("Stopping replication")
@@ -574,6 +558,17 @@ func (this *Applier) StopSlaveIOThread() error {
 	return nil
 }
 
+// StopSlaveSQLThread is applicable with --test-on-replica
+func (this *Applier) StopSlaveSQLThread() error {
+	query := `stop /* gh-ost */ slave sql_thread`
+	log.Infof("Verifying SQL thread is stopped")
+	if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
+		return err
+	}
+	log.Infof("SQL thread stopped")
+	return nil
+}
+
 // StartSlaveSQLThread is applicable with --test-on-replica
 func (this *Applier) StartSlaveSQLThread() error {
 	query := `start /* gh-ost */ slave sql_thread`
@@ -585,23 +580,11 @@ func (this *Applier) StartSlaveSQLThread() error {
 	return nil
 }
 
-// MasterPosWait is applicable with --test-on-replica
-func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
-	var appliedRows int64
-	if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 3).Scan(&appliedRows); err != nil {
-		return log.Errore(err)
-	}
-	if appliedRows < 0 {
-		return fmt.Errorf("Timeout waiting on master_pos_wait()")
-	}
-	return nil
-}
-
 func (this *Applier) StopSlaveNicely() error {
 	if err := this.StopSlaveIOThread(); err != nil {
 		return err
 	}
-	if err := this.StartSlaveSQLThread(); err != nil {
+	if err := this.StopSlaveSQLThread(); err != nil {
 		return err
 	}
 	readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
@@ -612,89 +595,152 @@ func (this *Applier) StopSlaveNicely() error {
 	return nil
 }
 
-// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
-// on a okToRelease in order to release it
-func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
-	lockName := this.migrationContext.GetVoluntaryLockName()
-	tx, err := this.db.Begin()
-	if err != nil {
-		lockGrabbed <- err
-		return err
-	}
-	// Grab
-	query := `select get_lock(?, 0)`
-	lockResult := 0
-	log.Infof("Grabbing voluntary lock: %s", lockName)
-	if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
-		lockGrabbed <- err
-		return err
-	}
-	if lockResult != 1 {
-		err := fmt.Errorf("Lock was not acquired")
-		lockGrabbed <- err
-		return err
-	}
-	log.Infof("Voluntary lock grabbed")
-	lockGrabbed <- nil // No error.
-	// Listeners on the above will proceed to submit the "all queries till lock have been found"
-	// We will wait here till we're told to. This will happen once all DML events up till lock
-	// have been applied on the ghost table
-	<-okToRelease
-	// Release
-	query = `select ifnull(release_lock(?),0)`
-	log.Infof("Releasing voluntary lock")
-	if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
-		return log.Errore(err)
-	}
-	if lockResult != 1 {
-		// Generally speaking we should never get this.
-		return log.Errorf("release_lock result was %+v", lockResult)
-	}
-	tx.Rollback()
-	log.Infof("Voluntary lock released")
-	return nil
+func (this *Applier) GetSessionLockName(sessionId int64) string {
+	return fmt.Sprintf("gh-ost.%d.lock", sessionId)
 }
 
-// IssueBlockingQueryOnVoluntaryLock will SELECT on the original table using a
-// conditional on a known to be occupied lock. This query is expected to block,
-// and will further block the followup RENAME statement
-func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64) error {
-	lockName := this.migrationContext.GetVoluntaryLockName()
+// LockOriginalTableAndWait locks the original table, notifies the lock is in
+// place, and awaits further instruction
+func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
 	tx, err := this.db.Begin()
 	if err != nil {
+		tableLocked <- err
 		return err
 	}
+	defer func() {
+		tx.Rollback()
+	}()
 	var sessionId int64
 	if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
 		return err
 	}
 	sessionIdChan <- sessionId
-	// Grab
-	query := fmt.Sprintf(`
-		select /* gh-ost blocking-query-%s */
-				release_lock(?)
-			from %s.%s
-			where
-				get_lock(?, 86400) >= 0
-			limit 1
-		`,
-		lockName,
-		sql.EscapeName(this.migrationContext.DatabaseName),
-		sql.EscapeName(this.migrationContext.OriginalTableName),
-	)
-	dummyResult := 0
-	log.Infof("Issuing blocking query")
+	query := `select get_lock(?, 0)`
+	lockResult := 0
+	lockName := this.GetSessionLockName(sessionId)
+	log.Infof("Grabbing voluntary lock: %s", lockName)
+	if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
+		return fmt.Errorf("Unable to acquire lock %s", lockName)
+	}
+	query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+	)
+	log.Infof("Locking %s.%s",
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+	)
 	this.migrationContext.LockTablesStartTime = time.Now()
-	tx.QueryRow(query, lockName, lockName).Scan(&dummyResult)
-	tx.Rollback()
-	log.Infof("Blocking query released")
+	if _, err := tx.Exec(query); err != nil {
+		tableLocked <- err
+		return err
+	}
+	log.Infof("Table locked")
+	tableLocked <- nil // No error.
+	// The cut-over phase will proceed to apply the remaining backlog onto the ghost table,
+	// and issue RENAMEs. We wait here until told to proceed.
+	<-okToUnlockTable
+	// Release
+	query = `unlock tables`
+	log.Infof("Releasing lock from %s.%s",
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+	)
+	if _, err := tx.Exec(query); err != nil {
+		tableUnlocked <- err
+		return log.Errore(err)
+	}
+	log.Infof("Table unlocked")
+	tableUnlocked <- nil
+	return nil
+}
+
+// RenameOriginalTable will attempt renaming the original table into _old
+func (this *Applier) RenameOriginalTable(sessionIdChan chan int64, originalTableRenamed chan<- error) error {
+	tx, err := this.db.Begin()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		tx.Rollback()
+		originalTableRenamed <- nil
+	}()
+	var sessionId int64
+	if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
+		return err
+	}
+	sessionIdChan <- sessionId
+	log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
+	query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
+	if _, err := tx.Exec(query); err != nil {
+		return err
+	}
+	query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.GetOldTableName()),
+	)
+	log.Infof("Issuing and expecting this to block: %s", query)
+	if _, err := tx.Exec(query); err != nil {
+		return log.Errore(err)
+	}
+	log.Infof("Original table renamed")
+	return nil
+}
+
+// RenameGhostTable will attempt renaming the ghost table into original
+func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRenamed chan<- error) error {
+	tx, err := this.db.Begin()
+	if err != nil {
+		return err
+	}
+	defer func() {
+		tx.Rollback()
+	}()
+	var sessionId int64
+	if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
+		return err
+	}
+	sessionIdChan <- sessionId
+	log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
+	query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
+	if _, err := tx.Exec(query); err != nil {
+		return err
+	}
+	query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.GetGhostTableName()),
+		sql.EscapeName(this.migrationContext.DatabaseName),
+		sql.EscapeName(this.migrationContext.OriginalTableName),
+	)
+	log.Infof("Issuing and expecting this to block: %s", query)
+	if _, err := tx.Exec(query); err != nil {
+		ghostTableRenamed <- err
+		return log.Errore(err)
+	}
+	log.Infof("Ghost table renamed")
+	ghostTableRenamed <- nil
+	return nil
+}
+
+func (this *Applier) ExpectUsedLock(sessionId int64) error {
+	var result int64
+	query := `select is_used_lock(?)`
+	lockName := this.GetSessionLockName(sessionId)
+	log.Infof("Checking session lock: %s", lockName)
+	if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId {
+		return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName)
+	}
 	return nil
 }
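Note: the new ExpectUsedLock above relies on MySQL's IS_USED_LOCK(), which returns the connection id of the session currently holding a named user-level lock, or NULL when the lock is free. The following is a minimal standalone sketch of that check; the package, function name and arguments are illustrative and not part of this commit.

package example

import (
	"database/sql"
	"fmt"
)

// lockHeldBySession returns an error unless the named user lock is currently
// held by the expected session. IS_USED_LOCK() yields the holder's connection
// id, or NULL when nobody holds the lock.
func lockHeldBySession(db *sql.DB, lockName string, sessionId int64) error {
	var holder sql.NullInt64
	if err := db.QueryRow(`select is_used_lock(?)`, lockName).Scan(&holder); err != nil {
		return err
	}
	if !holder.Valid || holder.Int64 != sessionId {
		return fmt.Errorf("lock %s is not held by session %d", lockName, sessionId)
	}
	return nil
}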

View File

@@ -51,7 +51,6 @@ type Migrator struct {
 	tablesInPlace              chan bool
 	rowCopyComplete            chan bool
 	allEventsUpToLockProcessed chan bool
-	voluntaryLockAcquired      chan bool
 	panicAbort                 chan error
 	rowCopyCompleteFlag int64
@@ -71,7 +70,6 @@ func NewMigrator() *Migrator {
 		tablesInPlace:              make(chan bool),
 		rowCopyComplete:            make(chan bool),
 		allEventsUpToLockProcessed: make(chan bool),
-		voluntaryLockAcquired:      make(chan bool, 1),
 		panicAbort:                 make(chan error),
 		allEventsUpToLockProcessedInjectedFlag: 0,
@@ -352,7 +350,7 @@ func (this *Migrator) Migrate() (err error) {
 	log.Infof("Row copy complete")
 	this.printStatus()
-	if err := this.stopWritesAndCompleteMigration(); err != nil {
+	if err := this.cutOver(); err != nil {
 		return err
 	}
@@ -363,9 +361,9 @@
 	return nil
 }
 
-// stopWritesAndCompleteMigration performs the final step of migration, based on migration
+// cutOver performs the final step of migration, based on migration
 // type (on replica? bumpy? safe?)
-func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
+func (this *Migrator) cutOver() (err error) {
 	if this.migrationContext.Noop {
 		log.Debugf("Noop operation; not really swapping tables")
 		return nil
@@ -381,16 +379,16 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
 		}
 		if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
 			// Throttle file defined and exists!
+			atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 1)
 			log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeCutOverFlagFile)
 			return true, nil
 		}
 		return false, nil
 	},
 	)
+	atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
 	if this.migrationContext.TestOnReplica {
-		// return this.stopWritesAndCompleteMigrationOnReplica()
 		// With `--test-on-replica` we stop replication thread, and then proceed to use
 		// the same cut-over phase as the master would use. That means we take locks
 		// and swap the tables.
@@ -402,27 +400,28 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
 		// We're merely testing, we don't want to keep this state. Rollback the renames as possible
 		defer this.applier.RenameTablesRollback()
 	}
-	if this.migrationContext.CutOverType == base.CutOverTwoStep {
-		return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
-	}
-	if this.migrationContext.CutOverType == base.CutOverVoluntaryLock {
+	if this.migrationContext.CutOverType == base.CutOverSafe {
 		// Lock-based solution: we use low timeout and multiple attempts. But for
 		// each failed attempt, we throttle until replication lag is back to normal
-		if err := this.retryOperation(
+		err := this.retryOperation(
 			func() error {
-				return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
-			}); err != nil {
-			return err
-		}
+				return this.executeAndThrottleOnError(this.safeCutOver)
+			},
+		)
+		return err
 	}
-	return
+	if this.migrationContext.CutOverType == base.CutOverTwoStep {
+		err := this.retryOperation(this.cutOverTwoStep)
+		return err
+	}
+	return nil
 }
 
 // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
 // make sure the queue is drained.
 func (this *Migrator) waitForEventsUpToLock() (err error) {
+	waitForEventsUpToLockStartTime := time.Now()
 	log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
 	if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
 		return err
@@ -430,18 +429,21 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 	log.Infof("Waiting for events up to lock")
 	atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
 	<-this.allEventsUpToLockProcessed
-	log.Infof("Done waiting for events up to lock")
+	waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime)
+	log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
+	this.printMigrationStatusHint()
 	this.printStatus()
 	return nil
 }
 
-// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
+// cutOverTwoStep will lock down the original table, execute
 // what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
 // There is a point in time where the "original" table does not exist and queries are non-blocked
 // and failing.
-func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
-	if err := this.retryOperation(this.applier.LockTables); err != nil {
+func (this *Migrator) cutOverTwoStep() (err error) {
+	if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
 		return err
 	}
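As the comment above notes, the two-step cut-over is not atomic: between the two RENAMEs there is a window in which the original table name does not exist and incoming queries fail. A minimal sketch of that sequence, with placeholder table names rather than gh-ost's generated names:

package example

import "database/sql"

// twoStepSwap illustrates the non-atomic variant: two separate RENAMEs with a
// window in between where `tbl` does not exist. Table names are placeholders.
func twoStepSwap(db *sql.DB) error {
	if _, err := db.Exec("rename /* sketch */ table tbl to _tbl_old"); err != nil {
		return err
	}
	// At this point there is no `tbl`: queries against it fail until the next
	// statement completes. This is the brief outage the safe cut-over avoids.
	if _, err := db.Exec("rename /* sketch */ table _tbl_gho to tbl"); err != nil {
		return err
	}
	return nil
}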
@@ -461,66 +463,104 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
 	return nil
 }
 
-// stopWritesAndCompleteMigrationOnMasterViaLock will lock down the original table, execute
-// what's left of last DML entries, and atomically swap & unlock (original->old && new->original)
-func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
-	lockGrabbed := make(chan error, 1)
-	okToReleaseLock := make(chan bool, 1)
-	swapResult := make(chan error, 1)
-	go func() {
-		if err := this.applier.GrabVoluntaryLock(lockGrabbed, okToReleaseLock); err != nil {
+// safeCutOver performs a safe cut over, where normally (no failure) the original table
+// is being locked until swapped, hence DML queries being locked and unaware of the cut-over.
+// In the worst case, there will be a minor outage, where the original table would not exist.
+func (this *Migrator) safeCutOver() (err error) {
+	okToUnlockTable := make(chan bool, 2)
+	originalTableRenamed := make(chan error, 1)
+	defer func() {
+		// The following is to make sure we unlock the table no-matter-what!
+		// There's enough buffer in the channel to support a redundant write here.
+		okToUnlockTable <- true
+		// We need to make sure we wait for the original-rename, successful or not,
+		// so as to be able to rollback in case the ghost-rename fails.
+		<-originalTableRenamed
+		// Rollback operation
+		if !this.applier.tableExists(this.migrationContext.OriginalTableName) {
+			log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName)
+			err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName)
 			log.Errore(err)
 		}
 	}()
-	if err := <-lockGrabbed; err != nil {
+	lockOriginalSessionIdChan := make(chan int64, 1)
+	tableLocked := make(chan error, 1)
+	tableUnlocked := make(chan error, 1)
+	go func() {
+		if err := this.applier.LockOriginalTableAndWait(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
+			log.Errore(err)
+		}
+	}()
+	if err := <-tableLocked; err != nil {
 		return log.Errore(err)
 	}
-	blockingQuerySessionIdChan := make(chan int64, 1)
-	go func() {
-		this.applier.IssueBlockingQueryOnVoluntaryLock(blockingQuerySessionIdChan)
-	}()
-	blockingQuerySessionId := <-blockingQuerySessionIdChan
-	log.Infof("Intentional blocking query connection id is %+v", blockingQuerySessionId)
-	if err := this.retryOperation(
-		func() error {
-			return this.applier.ExpectProcess(blockingQuerySessionId, "User lock", this.migrationContext.GetVoluntaryLockName())
-		}); err != nil {
-		return err
-	}
-	log.Infof("Found blocking query to be executing")
-	swapSessionIdChan := make(chan int64, 1)
-	go func() {
-		swapResult <- this.applier.SwapTablesAtomic(swapSessionIdChan)
-	}()
-	swapSessionId := <-swapSessionIdChan
-	log.Infof("RENAME connection id is %+v", swapSessionId)
-	if err := this.retryOperation(
-		func() error {
-			return this.applier.ExpectProcess(swapSessionId, "metadata lock", "rename")
-		}); err != nil {
-		return err
-	}
-	log.Infof("Found RENAME to be executing")
-	// OK, at this time we know any newly incoming DML on original table is blocked.
+	lockOriginalSessionId := <-lockOriginalSessionIdChan
+	log.Infof("Session locking original table is %+v", lockOriginalSessionId)
+	// At this point we know the table is locked.
+	// We know any newly incoming DML on original table is blocked.
 	this.waitForEventsUpToLock()
-	okToReleaseLock <- true
-	// BAM: voluntary lock is released, blocking query is released, rename is released.
-	// We now check RENAME result. We have lock_wait_timeout. We put it on purpose, to avoid
-	// locking the tables for too long. If lock time exceeds said timeout, the RENAME fails
-	// and returns a non-nil error, in which case tables have not been swapped, and we are
-	// not really done. We are, however, good to go for more retries.
-	if err := <-swapResult; err != nil {
-		// Bummer. We shall rest a while and try again
+	// Step 2
+	// We now attempt a RENAME on the original table, and expect it to block
+	renameOriginalSessionIdChan := make(chan int64, 1)
+	this.migrationContext.RenameTablesStartTime = time.Now()
+	go func() {
+		this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed)
+	}()
+	renameOriginalSessionId := <-renameOriginalSessionIdChan
+	log.Infof("Session renaming original table is %+v", renameOriginalSessionId)
+	if err := this.retryOperation(
+		func() error {
+			return this.applier.ExpectProcess(renameOriginalSessionId, "metadata lock", "rename")
+		}); err != nil {
 		return err
 	}
+	log.Infof("Found RENAME on original table to be blocking, as expected. Double checking original is still being locked")
+	if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
+		// Abort operation; but make sure to unlock table!
+		return log.Errore(err)
+	}
+	log.Infof("Connection holding lock on original table still exists")
+	// Now that we've found the RENAME blocking, AND the locking connection still alive,
+	// we know it is safe to proceed to renaming ghost table.
+	// Step 3
+	// We now attempt a RENAME on the ghost table, and expect it to block
+	renameGhostSessionIdChan := make(chan int64, 1)
+	ghostTableRenamed := make(chan error, 1)
+	go func() {
+		this.applier.RenameGhostTable(renameGhostSessionIdChan, ghostTableRenamed)
+	}()
+	renameGhostSessionId := <-renameGhostSessionIdChan
+	log.Infof("Session renaming ghost table is %+v", renameGhostSessionId)
+	if err := this.retryOperation(
+		func() error {
+			return this.applier.ExpectProcess(renameGhostSessionId, "metadata lock", "rename")
+		}); err != nil {
+		return err
+	}
+	log.Infof("Found RENAME on ghost table to be blocking, as expected. Will next release lock on original table")
+	// Step 4
+	okToUnlockTable <- true
+	// BAM! original table lock is released, RENAME original->old released,
+	// RENAME ghost->original is released, queries on original are unblocked.
+	// (that is, assuming all went well)
+	if err := <-tableUnlocked; err != nil {
+		return log.Errore(err)
+	}
+	if err := <-ghostTableRenamed; err != nil {
+		return log.Errore(err)
+	}
+	this.migrationContext.RenameTablesEndTime = time.Now()
 	// ooh nice! We're actually truly and thankfully done
 	lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
-	renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
-	log.Debugf("Lock & rename duration: %s. Of this, rename time was %s. During rename time, queries on %s were blocked", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
+	log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
 	return nil
 }
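The safe cut-over above hinges on one MySQL behavior: a RENAME issued from one connection while another connection holds a write lock on the table blocks on the metadata lock, and executes the instant the lock is released. Below is a minimal standalone sketch of that behavior using two connections and a single two-table RENAME. The DSN and table names are placeholders, and this deliberately simplifies the commit's actual flow, which issues two separate single-table RENAMEs from separate sessions and verifies each is blocked (via the processlist and a GET_LOCK check) before releasing the lock.

package main

import (
	"database/sql"
	"log"
	"time"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// Placeholder DSN; assumes `tbl` and `_tbl_gho` exist and `_tbl_old` does not.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/test")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	tableLocked := make(chan error, 1)
	okToUnlock := make(chan bool, 1)

	// Connection #1: grab a write lock on the original table and hold it.
	go func() {
		tx, err := db.Begin() // a transaction pins one connection from the pool
		if err != nil {
			tableLocked <- err
			return
		}
		defer tx.Rollback()
		if _, err := tx.Exec("lock /* sketch */ tables tbl write"); err != nil {
			tableLocked <- err
			return
		}
		tableLocked <- nil
		<-okToUnlock
		// Releasing the lock lets the blocked RENAME below execute immediately.
		tx.Exec("unlock tables")
	}()
	if err := <-tableLocked; err != nil {
		log.Fatal(err)
	}

	// Connection #2: this RENAME blocks behind the write lock held by connection #1.
	renameDone := make(chan error, 1)
	go func() {
		_, err := db.Exec("rename /* sketch */ table tbl to _tbl_old, _tbl_gho to tbl")
		renameDone <- err
	}()

	// Give the RENAME a moment to queue up behind the lock, then release it.
	time.Sleep(time.Second)
	okToUnlock <- true

	if err := <-renameDone; err != nil {
		log.Fatal(err)
	}
	log.Println("tables swapped; DML on tbl was blocked, not failing, throughout")
}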
@@ -536,7 +576,6 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
 	this.waitForEventsUpToLock()
-	this.printMigrationStatusHint()
 	log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
 	return nil
 }
@@ -710,7 +749,9 @@ func (this *Migrator) printStatus(writers ...io.Writer) {
 	var etaSeconds float64 = math.MaxFloat64
 	eta := "N/A"
-	if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
+	if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
+		eta = "postponing cut-over"
+	} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
 		eta = fmt.Sprintf("throttled, %s", throttleReason)
 	} else if progressPct > 100.0 {
 		eta = "Due"
@@ -838,6 +879,10 @@ func (this *Migrator) initiateApplier() error {
 	if err := this.applier.ValidateOrDropExistingTables(); err != nil {
 		return err
 	}
+	if err := this.applier.CreateChangelogTable(); err != nil {
+		log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
+		return err
+	}
 	if err := this.applier.CreateGhostTable(); err != nil {
 		log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
 		return err
@@ -846,10 +891,6 @@ func (this *Migrator) initiateApplier() error {
 		log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
 		return err
 	}
-	if err := this.applier.CreateChangelogTable(); err != nil {
-		log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
-		return err
-	}
 	this.applier.WriteChangelogState(string(TablesInPlace))
 	go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds)