Merge pull request #84 from github/cut-over-atomic
an atomic cut-over implementation
This commit is contained in:
commit
b583458bc2
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
RELEASE_VERSION="0.9.7"
|
||||
RELEASE_VERSION="0.9.8"
|
||||
|
||||
buildpath=/tmp/gh-ost
|
||||
target=gh-ost
|
||||
|
@ -30,7 +30,8 @@ const (
|
||||
type CutOver int
|
||||
|
||||
const (
|
||||
CutOverSafe CutOver = iota
|
||||
CutOverAtomic CutOver = iota
|
||||
CutOverSafe = iota
|
||||
CutOverTwoStep = iota
|
||||
)
|
||||
|
||||
|
@ -66,7 +66,7 @@ func main() {
|
||||
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
|
||||
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
|
||||
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
|
||||
cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)")
|
||||
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (atomic, two-step, voluntary-lock)")
|
||||
|
||||
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
|
||||
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
|
||||
@ -144,7 +144,9 @@ func main() {
|
||||
}
|
||||
|
||||
switch *cutOver {
|
||||
case "safe", "default", "":
|
||||
case "atomic", "default", "":
|
||||
migrationContext.CutOverType = base.CutOverAtomic
|
||||
case "safe":
|
||||
migrationContext.CutOverType = base.CutOverSafe
|
||||
case "two-step":
|
||||
migrationContext.CutOverType = base.CutOverTwoStep
|
||||
|
@ -20,6 +20,10 @@ import (
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
|
||||
const (
|
||||
atomicCutOverMagicHint = "ghost-cut-over-sentry"
|
||||
)
|
||||
|
||||
// Applier connects and writes the the applier-server, which is the server where migration
|
||||
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
|
||||
// `--execute-on-replica` are given.
|
||||
@ -77,15 +81,21 @@ func (this *Applier) validateConnection(db *gosql.DB) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// tableExists checks if a given table exists in database
|
||||
func (this *Applier) tableExists(tableName string) (tableFound bool) {
|
||||
// showTableStatus returns the output of `show table status like '...'` command
|
||||
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
||||
rowMap = nil
|
||||
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
||||
|
||||
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||
tableFound = true
|
||||
rowMap = m
|
||||
return nil
|
||||
})
|
||||
return tableFound
|
||||
return rowMap
|
||||
}
|
||||
|
||||
// tableExists checks if a given table exists in database
|
||||
func (this *Applier) tableExists(tableName string) (tableFound bool) {
|
||||
m := this.showTableStatus(tableName)
|
||||
return (m != nil)
|
||||
}
|
||||
|
||||
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
|
||||
@ -775,6 +785,195 @@ func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
|
||||
// happens to be a cut-over magic table; if so, it drops it.
|
||||
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
|
||||
log.Infof("Looking for magic cut-over table")
|
||||
tableName := this.migrationContext.GetOldTableName()
|
||||
rowMap := this.showTableStatus(tableName)
|
||||
if rowMap == nil {
|
||||
// Table does not exist
|
||||
return nil
|
||||
}
|
||||
if rowMap["Comment"].String != atomicCutOverMagicHint {
|
||||
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
|
||||
}
|
||||
log.Infof("Dropping magic cut-over table")
|
||||
return this.dropTable(tableName)
|
||||
}
|
||||
|
||||
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
|
||||
// happens to be a cut-over magic table; if so, it drops it.
|
||||
func (this *Applier) CreateAtomicCutOverSentryTable() error {
|
||||
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
|
||||
return err
|
||||
}
|
||||
tableName := this.migrationContext.GetOldTableName()
|
||||
|
||||
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
|
||||
id int auto_increment primary key
|
||||
) comment='%s'
|
||||
`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(tableName),
|
||||
atomicCutOverMagicHint,
|
||||
)
|
||||
log.Infof("Creating magic cut-over table %s.%s",
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Magic cut-over table created")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// AtomicCutOverMagicLock
|
||||
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
sessionIdChan <- -1
|
||||
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
|
||||
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
|
||||
tx.Rollback()
|
||||
}()
|
||||
|
||||
var sessionId int64
|
||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
sessionIdChan <- sessionId
|
||||
|
||||
lockResult := 0
|
||||
query := `select get_lock(?, 0)`
|
||||
lockName := this.GetSessionLockName(sessionId)
|
||||
log.Infof("Grabbing voluntary lock: %s", lockName)
|
||||
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
|
||||
err := fmt.Errorf("Unable to acquire lock %s", lockName)
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
|
||||
tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2
|
||||
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
|
||||
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.CreateAtomicCutOverSentryTable(); err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
|
||||
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
)
|
||||
log.Infof("Locking %s.%s, %s.%s",
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
)
|
||||
this.migrationContext.LockTablesStartTime = time.Now()
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
log.Infof("Tables locked")
|
||||
tableLocked <- nil // No error.
|
||||
|
||||
// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
|
||||
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)
|
||||
|
||||
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
|
||||
// and issue RENAME. We wait here until told to proceed.
|
||||
<-okToUnlockTable
|
||||
log.Infof("Will now proceed to drop magic table and unlock tables")
|
||||
|
||||
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
|
||||
// And in fact, we will:
|
||||
log.Infof("Dropping magic cut-over table")
|
||||
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
)
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
log.Errore(err)
|
||||
// We DO NOT return here because we must `UNLOCK TABLES`!
|
||||
}
|
||||
|
||||
// Tables still locked
|
||||
log.Infof("Releasing lock from %s.%s, %s.%s",
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
)
|
||||
query = `unlock tables`
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
tableUnlocked <- err
|
||||
return log.Errore(err)
|
||||
}
|
||||
log.Infof("Tables unlocked")
|
||||
tableUnlocked <- nil
|
||||
return nil
|
||||
}
|
||||
|
||||
// RenameOriginalTable will attempt renaming the original table into _old
|
||||
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
tx.Rollback()
|
||||
sessionIdChan <- -1
|
||||
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
|
||||
}()
|
||||
var sessionId int64
|
||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
||||
return err
|
||||
}
|
||||
sessionIdChan <- sessionId
|
||||
|
||||
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
|
||||
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
)
|
||||
log.Infof("Issuing and expecting this to block: %s", query)
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
tablesRenamed <- err
|
||||
return log.Errore(err)
|
||||
}
|
||||
tablesRenamed <- nil
|
||||
log.Infof("Tables renamed")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
|
||||
query := fmt.Sprintf(`show global status like '%s'`, variableName)
|
||||
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
|
||||
|
@ -238,7 +238,7 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
|
||||
|
||||
// retryOperation attempts up to `count` attempts at running given function,
|
||||
// exiting as soon as it returns with non-error.
|
||||
func (this *Migrator) retryOperation(operation func() error) (err error) {
|
||||
func (this *Migrator) retryOperation(operation func() error, notFatalHint ...bool) (err error) {
|
||||
maxRetries := int(this.migrationContext.MaxRetries())
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
if i != 0 {
|
||||
@ -251,7 +251,9 @@ func (this *Migrator) retryOperation(operation func() error) (err error) {
|
||||
}
|
||||
// there's an error. Let's try again.
|
||||
}
|
||||
this.panicAbort <- err
|
||||
if len(notFatalHint) == 0 {
|
||||
this.panicAbort <- err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
@ -469,6 +471,16 @@ func (this *Migrator) cutOver() (err error) {
|
||||
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
|
||||
defer this.applier.RenameTablesRollback()
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverAtomic {
|
||||
// Atomic solution: we use low timeout and multiple attempts. But for
|
||||
// each failed attempt, we throttle until replication lag is back to normal
|
||||
err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.atomicCutOver)
|
||||
},
|
||||
)
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverSafe {
|
||||
// Lock-based solution: we use low timeout and multiple attempts. But for
|
||||
// each failed attempt, we throttle until replication lag is back to normal
|
||||
@ -539,6 +551,93 @@ func (this *Migrator) cutOverTwoStep() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// atomicCutOver
|
||||
func (this *Migrator) atomicCutOver() (err error) {
|
||||
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
|
||||
|
||||
defer func() {
|
||||
this.applier.DropAtomicCutOverSentryTableIfExists()
|
||||
}()
|
||||
|
||||
lockOriginalSessionIdChan := make(chan int64, 2)
|
||||
tableLocked := make(chan error, 2)
|
||||
okToUnlockTable := make(chan bool, 3)
|
||||
tableUnlocked := make(chan error, 2)
|
||||
go func() {
|
||||
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
|
||||
log.Errore(err)
|
||||
}
|
||||
}()
|
||||
if err := <-tableLocked; err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
lockOriginalSessionId := <-lockOriginalSessionIdChan
|
||||
log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
|
||||
// At this point we know the original table is locked.
|
||||
// We know any newly incoming DML on original table is blocked.
|
||||
this.waitForEventsUpToLock()
|
||||
|
||||
// Step 2
|
||||
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
|
||||
this.migrationContext.RenameTablesStartTime = time.Now()
|
||||
|
||||
var tableRenameKnownToHaveFailed int64
|
||||
renameSessionIdChan := make(chan int64, 2)
|
||||
tablesRenamed := make(chan error, 2)
|
||||
go func() {
|
||||
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
|
||||
// Abort! Release the lock
|
||||
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
|
||||
okToUnlockTable <- true
|
||||
}
|
||||
}()
|
||||
renameSessionId := <-renameSessionIdChan
|
||||
log.Infof("Session renaming tables is %+v", renameSessionId)
|
||||
|
||||
waitForRename := func() error {
|
||||
if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 1 {
|
||||
// We return `nil` here so as to avoid the `retry`. The RENAME has failed,
|
||||
// it won't show up in PROCESSLIST, no point in waiting
|
||||
return nil
|
||||
}
|
||||
return this.applier.ExpectProcess(renameSessionId, "metadata lock", "rename")
|
||||
}
|
||||
// Wait for the RENAME to appear in PROCESSLIST
|
||||
if err := this.retryOperation(waitForRename, true); err != nil {
|
||||
// Abort! Release the lock
|
||||
okToUnlockTable <- true
|
||||
return err
|
||||
}
|
||||
if atomic.LoadInt64(&tableRenameKnownToHaveFailed) == 0 {
|
||||
log.Infof("Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)")
|
||||
}
|
||||
if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
|
||||
// Abort operation. Just make sure to drop the magic table.
|
||||
return log.Errore(err)
|
||||
}
|
||||
log.Infof("Connection holding lock on original table still exists")
|
||||
|
||||
// Now that we've found the RENAME blocking, AND the locking connection still alive,
|
||||
// we know it is safe to proceed to release the lock
|
||||
|
||||
okToUnlockTable <- true
|
||||
// BAM! magic table dropped, original table lock is released
|
||||
// -> RENAME released -> queries on original are unblocked.
|
||||
if err := <-tableUnlocked; err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
if err := <-tablesRenamed; err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
this.migrationContext.RenameTablesEndTime = time.Now()
|
||||
|
||||
// ooh nice! We're actually truly and thankfully done
|
||||
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
|
||||
log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// cutOverSafe performs a safe cut over, where normally (no failure) the original table
|
||||
// is being locked until swapped, hence DML queries being locked and unaware of the cut-over.
|
||||
// In the worst case, there will ba a minor outage, where the original table would not exist.
|
||||
|
Loading…
Reference in New Issue
Block a user