an atomic cut-over implementation, as per issue #82

This commit is contained in:
Shlomi Noach 2016-06-27 11:08:06 +02:00
parent ad25e60e0c
commit 0191b2897d
5 changed files with 312 additions and 11 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.9.7"
RELEASE_VERSION="0.9.8"
buildpath=/tmp/gh-ost
target=gh-ost

View File

@ -30,7 +30,8 @@ const (
type CutOver int
const (
CutOverSafe CutOver = iota
CutOverAtomic CutOver = iota
CutOverSafe = iota
CutOverTwoStep = iota
)

View File

@ -66,7 +66,7 @@ func main() {
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)")
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (atomic, two-step, voluntary-lock)")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
@ -144,7 +144,9 @@ func main() {
}
switch *cutOver {
case "safe", "default", "":
case "atomic", "default", "":
migrationContext.CutOverType = base.CutOverAtomic
case "safe":
migrationContext.CutOverType = base.CutOverSafe
case "two-step":
migrationContext.CutOverType = base.CutOverTwoStep

View File

@ -20,6 +20,10 @@ import (
"github.com/outbrain/golib/sqlutils"
)
const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)
// Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
@ -77,15 +81,21 @@ func (this *Applier) validateConnection(db *gosql.DB) error {
return nil
}
// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
// showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
rowMap = nil
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
tableFound = true
rowMap = m
return nil
})
return tableFound
return rowMap
}
// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
m := this.showTableStatus(tableName)
return (m != nil)
}
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
@ -775,6 +785,195 @@ func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string)
return nil
}
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
log.Infof("Looking for magic cut-over table")
tableName := this.migrationContext.GetOldTableName()
rowMap := this.showTableStatus(tableName)
if rowMap == nil {
// Table does not exist
return nil
}
if rowMap["Comment"].String != atomicCutOverMagicHint {
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
}
log.Infof("Dropping magic cut-over table")
return this.dropTable(tableName)
}
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) CreateAtomicCutOverSentryTable() error {
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
return err
}
tableName := this.migrationContext.GetOldTableName()
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
id int auto_increment primary key
) comment='%s'
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
atomicCutOverMagicHint,
)
log.Infof("Creating magic cut-over table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Magic cut-over table created")
return nil
}
// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
return err
}
defer func() {
sessionIdChan <- -1
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback()
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
tableLocked <- err
return err
}
sessionIdChan <- sessionId
lockResult := 0
query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId)
log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err
return err
}
tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
if err := this.CreateAtomicCutOverSentryTable(); err != nil {
tableLocked <- err
return err
}
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
log.Infof("Locking %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
log.Infof("Tables locked")
tableLocked <- nil // No error.
// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed.
<-okToUnlockTable
log.Infof("Will now proceed to drop magic table and unlock tables")
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will:
log.Infof("Dropping magic cut-over table")
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
if _, err := tx.Exec(query); err != nil {
log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}
// Tables still locked
log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
query = `unlock tables`
if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err
return log.Errore(err)
}
log.Infof("Tables unlocked")
tableUnlocked <- nil
return nil
}
// RenameOriginalTable will attempt renaming the original table into _old
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
return log.Errore(err)
}
tablesRenamed <- nil
log.Infof("Tables renamed")
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

View File

@ -238,7 +238,7 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
// retryOperation attempts up to `count` attempts at running given function,
// exiting as soon as it returns with non-error.
func (this *Migrator) retryOperation(operation func() error) (err error) {
func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
maxRetries := int(this.migrationContext.MaxRetries())
for i := 0; i < maxRetries; i++ {
if i != 0 {
@ -251,7 +251,9 @@ func (this *Migrator) retryOperation(operation func() error) (err error) {
}
// there's an error. Let's try again.
}
this.panicAbort <- err
if len(notFatalHint) == 0 {
this.panicAbort <- err
}
return err
}
@ -469,6 +471,16 @@ func (this *Migrator) cutOver() (err error) {
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
defer this.applier.RenameTablesRollback()
}
if this.migrationContext.CutOverType == base.CutOverAtomic {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.atomicCutOver)
},
)
return err
}
if this.migrationContext.CutOverType == base.CutOverSafe {
// Lock-based solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
@ -539,6 +551,93 @@ func (this *Migrator) cutOverTwoStep() (err error) {
return nil
}
// atomicCutOver
func (this *Migrator) atomicCutOver() (err error) {
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
defer func() {
this.applier.DropAtomicCutOverSentryTableIfExists()
}()
lockOriginalSessionIdChan := make(chan int64, 2)
tableLocked := make(chan error, 2)
okToUnlockTable := make(chan bool, 3)
tableUnlocked := make(chan error, 2)
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
log.Errore(err)
}
}()
if err := <-tableLocked; err != nil {
return log.Errore(err)
}
lockOriginalSessionId := <-lockOriginalSessionIdChan
log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
// At this point we know the original table is locked.
// We know any newly incoming DML on original table is blocked.
this.waitForEventsUpToLock()
// Step 2
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
this.migrationContext.RenameTablesStartTime = time.Now()
var tableRenameKnownToHaveFailed int64
renameSessionIdChan := make(chan int64, 2)
tablesRenamed := make(chan error, 2)
go func() {
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
// Abort! Release the lock
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
okToUnlockTable <- true
}
}()
renameSessionId := <-renameSessionIdChan
log.Infof("Session renaming tables is %+v", renameSessionId)
waitForRename := func() error {
if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 1 {
// We return `nil` here so as to avoid the `retry`. The RENAME has failed,
// it won't show up in PROCESSLIST, no point in waiting
return nil
}
return this.applier.ExpectProcess(renameSessionId, "metadata lock", "rename")
}
// Wait for the RENAME to appear in PROCESSLIST
if err := this.retryOperation(waitForRename, true); err != nil {
// Abort! Release the lock
okToUnlockTable <- true
return err
}
if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 0 {
log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)")
}
if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
// Abort operation. Just make sure to drop the magic table.
return log.Errore(err)
}
log.Infof("Connection holding lock on original table still exists")
// Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to release the lock
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
if err := <-tableUnlocked; err != nil {
return log.Errore(err)
}
if err := <-tablesRenamed; err != nil {
return log.Errore(err)
}
this.migrationContext.RenameTablesEndTime = time.Now()
// ooh nice! We're actually truly and thankfully done
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
// cutOverSafe performs a safe cut over, where normally (no failure) the original table
// is being locked until swapped, hence DML queries being locked and unaware of the cut-over.
// In the worst case, there will ba a minor outage, where the original table would not exist.