Merge pull request #77 from github/cut-over-lock-table-names

Solved cut-over stall; change of table names
This commit is contained in:
Shlomi Noach 2016-06-21 12:56:53 +02:00 committed by GitHub
commit b6d88ddece
4 changed files with 47 additions and 15 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
# #
# #
RELEASE_VERSION="0.9.6" RELEASE_VERSION="0.9.7"
buildpath=/tmp/gh-ost buildpath=/tmp/gh-ost
target=gh-ost target=gh-ost

View File

@ -164,20 +164,23 @@ func GetMigrationContext() *MigrationContext {
// GetGhostTableName generates the name of ghost table, based on original table name // GetGhostTableName generates the name of ghost table, based on original table name
func (this *MigrationContext) GetGhostTableName() string { func (this *MigrationContext) GetGhostTableName() string {
return fmt.Sprintf("_%s_gst", this.OriginalTableName) return fmt.Sprintf("_%s_gho", this.OriginalTableName)
} }
// GetOldTableName generates the name of the "old" table, into which the original table is renamed. // GetOldTableName generates the name of the "old" table, into which the original table is renamed.
func (this *MigrationContext) GetOldTableName() string { func (this *MigrationContext) GetOldTableName() string {
// if this.TestOnReplica { if this.TestOnReplica {
// return fmt.Sprintf("_%s_tst", this.OriginalTableName) return fmt.Sprintf("_%s_ght", this.OriginalTableName)
// } }
return fmt.Sprintf("_%s_old", this.OriginalTableName) if this.MigrateOnReplica {
return fmt.Sprintf("_%s_ghr", this.OriginalTableName)
}
return fmt.Sprintf("_%s_del", this.OriginalTableName)
} }
// GetChangelogTableName generates the name of changelog table, based on original table name // GetChangelogTableName generates the name of changelog table, based on original table name
func (this *MigrationContext) GetChangelogTableName() string { func (this *MigrationContext) GetChangelogTableName() string {
return fmt.Sprintf("_%s_osc", this.OriginalTableName) return fmt.Sprintf("_%s_ghc", this.OriginalTableName)
} }
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout

View File

@ -607,6 +607,7 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc
var sessionId int64 var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
tableLocked <- err
return err return err
} }
sessionIdChan <- sessionId sessionIdChan <- sessionId
@ -616,7 +617,17 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc
lockName := this.GetSessionLockName(sessionId) lockName := this.GetSessionLockName(sessionId)
log.Infof("Grabbing voluntary lock: %s", lockName) log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 { if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
return fmt.Errorf("Unable to acquire lock %s", lockName) err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err
return err
}
tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
} }
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,

View File

@ -415,7 +415,6 @@ func (this *Migrator) Migrate() (err error) {
this.consumeRowCopyComplete() this.consumeRowCopyComplete()
log.Infof("Row copy complete") log.Infof("Row copy complete")
this.printStatus(ForcePrintStatusRule) this.printStatus(ForcePrintStatusRule)
this.migrationContext.MarkPointOfInterest()
if err := this.cutOver(); err != nil { if err := this.cutOver(); err != nil {
return err return err
@ -435,10 +434,12 @@ func (this *Migrator) cutOver() (err error) {
log.Debugf("Noop operation; not really swapping tables") log.Debugf("Noop operation; not really swapping tables")
return nil return nil
} }
this.migrationContext.MarkPointOfInterest()
this.throttle(func() { this.throttle(func() {
log.Debugf("throttling before swapping tables") log.Debugf("throttling before swapping tables")
}) })
this.migrationContext.MarkPointOfInterest()
this.sleepWhileTrue( this.sleepWhileTrue(
func() (bool, error) { func() (bool, error) {
if this.migrationContext.PostponeCutOverFlagFile == "" { if this.migrationContext.PostponeCutOverFlagFile == "" {
@ -454,6 +455,7 @@ func (this *Migrator) cutOver() (err error) {
}, },
) )
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
this.migrationContext.MarkPointOfInterest()
if this.migrationContext.TestOnReplica { if this.migrationContext.TestOnReplica {
// With `--test-on-replica` we stop replication thread, and then proceed to use // With `--test-on-replica` we stop replication thread, and then proceed to use
@ -478,15 +480,20 @@ func (this *Migrator) cutOver() (err error) {
return err return err
} }
if this.migrationContext.CutOverType == base.CutOverTwoStep { if this.migrationContext.CutOverType == base.CutOverTwoStep {
err := this.retryOperation(this.cutOverTwoStep) err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.cutOverTwoStep)
},
)
return err return err
} }
return nil return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
} }
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained. // make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) { func (this *Migrator) waitForEventsUpToLock() (err error) {
this.migrationContext.MarkPointOfInterest()
waitForEventsUpToLockStartTime := time.Now() waitForEventsUpToLockStartTime := time.Now()
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed) log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
@ -541,19 +548,28 @@ func (this *Migrator) safeCutOver() (err error) {
okToUnlockTable := make(chan bool, 2) okToUnlockTable := make(chan bool, 2)
originalTableRenamed := make(chan error, 1) originalTableRenamed := make(chan error, 1)
var originalTableRenameIntended int64
defer func() { defer func() {
log.Infof("Checking to see if we need to roll back")
// The following is to make sure we unlock the table no-matter-what! // The following is to make sure we unlock the table no-matter-what!
// There's enough buffer in the channel to support a redundant write here. // There's enough buffer in the channel to support a redundant write here.
okToUnlockTable <- true okToUnlockTable <- true
if atomic.LoadInt64(&originalTableRenameIntended) == 1 {
log.Infof("Waiting for original table rename result")
// We need to make sure we wait for the original-rename, successful or not, // We need to make sure we wait for the original-rename, successful or not,
// so as to be able to rollback in case the ghost-rename fails. // so as to be able to rollback in case the ghost-rename fails.
// But we only wait on this queue if there's actually going to be a rename.
// As an example, what happens should the initial `lock tables` fail? We would
// never proceed to rename the table, hence this queue is never written to.
<-originalTableRenamed <-originalTableRenamed
}
// Rollback operation // Rollback operation
if !this.applier.tableExists(this.migrationContext.OriginalTableName) { if !this.applier.tableExists(this.migrationContext.OriginalTableName) {
log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName) log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName)
err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName) err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName)
log.Errore(err) log.Errore(err)
} else {
log.Info("No need for rollback")
} }
}() }()
lockOriginalSessionIdChan := make(chan int64, 1) lockOriginalSessionIdChan := make(chan int64, 1)
@ -577,6 +593,8 @@ func (this *Migrator) safeCutOver() (err error) {
// We now attempt a RENAME on the original table, and expect it to block // We now attempt a RENAME on the original table, and expect it to block
renameOriginalSessionIdChan := make(chan int64, 1) renameOriginalSessionIdChan := make(chan int64, 1)
this.migrationContext.RenameTablesStartTime = time.Now() this.migrationContext.RenameTablesStartTime = time.Now()
atomic.StoreInt64(&originalTableRenameIntended, 1)
go func() { go func() {
this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed) this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed)
}() }()