Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed. - Change of table names. Heres the story: - Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision. - "old" table name is now `_tbl_del`, "del" standing for "delete" - ghost table name is now `_tbl_gho` - when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet). `--test-on-replica` uses `_tbl_ght` (ghost-test) - similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica) - changelog table is now `_tbl_ghc` (ghost-changelog) - To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc. - just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
This commit is contained in:
parent
dc8d27466c
commit
96e8419a35
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
RELEASE_VERSION="0.9.6"
|
||||
RELEASE_VERSION="0.9.7"
|
||||
|
||||
buildpath=/tmp/gh-ost
|
||||
target=gh-ost
|
||||
|
@ -164,20 +164,23 @@ func GetMigrationContext() *MigrationContext {
|
||||
|
||||
// GetGhostTableName generates the name of ghost table, based on original table name
|
||||
func (this *MigrationContext) GetGhostTableName() string {
|
||||
return fmt.Sprintf("_%s_gst", this.OriginalTableName)
|
||||
return fmt.Sprintf("_%s_gho", this.OriginalTableName)
|
||||
}
|
||||
|
||||
// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
|
||||
func (this *MigrationContext) GetOldTableName() string {
|
||||
// if this.TestOnReplica {
|
||||
// return fmt.Sprintf("_%s_tst", this.OriginalTableName)
|
||||
// }
|
||||
return fmt.Sprintf("_%s_old", this.OriginalTableName)
|
||||
if this.TestOnReplica {
|
||||
return fmt.Sprintf("_%s_ght", this.OriginalTableName)
|
||||
}
|
||||
if this.MigrateOnReplica {
|
||||
return fmt.Sprintf("_%s_ghr", this.OriginalTableName)
|
||||
}
|
||||
return fmt.Sprintf("_%s_del", this.OriginalTableName)
|
||||
}
|
||||
|
||||
// GetChangelogTableName generates the name of changelog table, based on original table name
|
||||
func (this *MigrationContext) GetChangelogTableName() string {
|
||||
return fmt.Sprintf("_%s_osc", this.OriginalTableName)
|
||||
return fmt.Sprintf("_%s_ghc", this.OriginalTableName)
|
||||
}
|
||||
|
||||
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
|
||||
|
@ -607,6 +607,7 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc
|
||||
|
||||
var sessionId int64
|
||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
sessionIdChan <- sessionId
|
||||
@ -616,7 +617,17 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc
|
||||
lockName := this.GetSessionLockName(sessionId)
|
||||
log.Infof("Grabbing voluntary lock: %s", lockName)
|
||||
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
|
||||
return fmt.Errorf("Unable to acquire lock %s", lockName)
|
||||
err := fmt.Errorf("Unable to acquire lock %s", lockName)
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
|
||||
tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2
|
||||
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
|
||||
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
tableLocked <- err
|
||||
return err
|
||||
}
|
||||
|
||||
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
|
||||
|
@ -415,7 +415,6 @@ func (this *Migrator) Migrate() (err error) {
|
||||
this.consumeRowCopyComplete()
|
||||
log.Infof("Row copy complete")
|
||||
this.printStatus(ForcePrintStatusRule)
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
|
||||
if err := this.cutOver(); err != nil {
|
||||
return err
|
||||
@ -435,10 +434,12 @@ func (this *Migrator) cutOver() (err error) {
|
||||
log.Debugf("Noop operation; not really swapping tables")
|
||||
return nil
|
||||
}
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
this.throttle(func() {
|
||||
log.Debugf("throttling before swapping tables")
|
||||
})
|
||||
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
this.sleepWhileTrue(
|
||||
func() (bool, error) {
|
||||
if this.migrationContext.PostponeCutOverFlagFile == "" {
|
||||
@ -454,6 +455,7 @@ func (this *Migrator) cutOver() (err error) {
|
||||
},
|
||||
)
|
||||
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
|
||||
if this.migrationContext.TestOnReplica {
|
||||
// With `--test-on-replica` we stop replication thread, and then proceed to use
|
||||
@ -478,15 +480,20 @@ func (this *Migrator) cutOver() (err error) {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverTwoStep {
|
||||
err := this.retryOperation(this.cutOverTwoStep)
|
||||
err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.cutOverTwoStep)
|
||||
},
|
||||
)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
|
||||
}
|
||||
|
||||
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
|
||||
// make sure the queue is drained.
|
||||
func (this *Migrator) waitForEventsUpToLock() (err error) {
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
waitForEventsUpToLockStartTime := time.Now()
|
||||
|
||||
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
|
||||
@ -541,19 +548,28 @@ func (this *Migrator) safeCutOver() (err error) {
|
||||
|
||||
okToUnlockTable := make(chan bool, 2)
|
||||
originalTableRenamed := make(chan error, 1)
|
||||
var originalTableRenameIntended int64
|
||||
defer func() {
|
||||
log.Infof("Checking to see if we need to roll back")
|
||||
// The following is to make sure we unlock the table no-matter-what!
|
||||
// There's enough buffer in the channel to support a redundant write here.
|
||||
okToUnlockTable <- true
|
||||
// We need to make sure we wait for the original-rename, successful or not,
|
||||
// so as to be able to rollback in case the ghost-rename fails.
|
||||
<-originalTableRenamed
|
||||
|
||||
if atomic.LoadInt64(&originalTableRenameIntended) == 1 {
|
||||
log.Infof("Waiting for original table rename result")
|
||||
// We need to make sure we wait for the original-rename, successful or not,
|
||||
// so as to be able to rollback in case the ghost-rename fails.
|
||||
// But we only wait on this queue if there's actually going to be a rename.
|
||||
// As an example, what happens should the initial `lock tables` fail? We would
|
||||
// never proceed to rename the table, hence this queue is never written to.
|
||||
<-originalTableRenamed
|
||||
}
|
||||
// Rollback operation
|
||||
if !this.applier.tableExists(this.migrationContext.OriginalTableName) {
|
||||
log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName)
|
||||
err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName)
|
||||
log.Errore(err)
|
||||
} else {
|
||||
log.Info("No need for rollback")
|
||||
}
|
||||
}()
|
||||
lockOriginalSessionIdChan := make(chan int64, 1)
|
||||
@ -577,6 +593,8 @@ func (this *Migrator) safeCutOver() (err error) {
|
||||
// We now attempt a RENAME on the original table, and expect it to block
|
||||
renameOriginalSessionIdChan := make(chan int64, 1)
|
||||
this.migrationContext.RenameTablesStartTime = time.Now()
|
||||
atomic.StoreInt64(&originalTableRenameIntended, 1)
|
||||
|
||||
go func() {
|
||||
this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed)
|
||||
}()
|
||||
|
Loading…
x
Reference in New Issue
Block a user