diff --git a/build.sh b/build.sh index 1743b4d..9657437 100644 --- a/build.sh +++ b/build.sh @@ -1,7 +1,7 @@ #!/bin/bash # # -RELEASE_VERSION="0.9.6" +RELEASE_VERSION="0.9.7" buildpath=/tmp/gh-ost target=gh-ost diff --git a/go/base/context.go b/go/base/context.go index 6fcabad..d912df7 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -164,20 +164,23 @@ func GetMigrationContext() *MigrationContext { // GetGhostTableName generates the name of ghost table, based on original table name func (this *MigrationContext) GetGhostTableName() string { - return fmt.Sprintf("_%s_gst", this.OriginalTableName) + return fmt.Sprintf("_%s_gho", this.OriginalTableName) } // GetOldTableName generates the name of the "old" table, into which the original table is renamed. func (this *MigrationContext) GetOldTableName() string { - // if this.TestOnReplica { - // return fmt.Sprintf("_%s_tst", this.OriginalTableName) - // } - return fmt.Sprintf("_%s_old", this.OriginalTableName) + if this.TestOnReplica { + return fmt.Sprintf("_%s_ght", this.OriginalTableName) + } + if this.MigrateOnReplica { + return fmt.Sprintf("_%s_ghr", this.OriginalTableName) + } + return fmt.Sprintf("_%s_del", this.OriginalTableName) } // GetChangelogTableName generates the name of changelog table, based on original table name func (this *MigrationContext) GetChangelogTableName() string { - return fmt.Sprintf("_%s_osc", this.OriginalTableName) + return fmt.Sprintf("_%s_ghc", this.OriginalTableName) } // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout diff --git a/go/logic/applier.go b/go/logic/applier.go index 1f1e114..2a770ca 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -607,6 +607,7 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc var sessionId int64 if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { + tableLocked <- err return err } sessionIdChan <- sessionId @@ -616,7 +617,17 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc lockName := this.GetSessionLockName(sessionId) log.Infof("Grabbing voluntary lock: %s", lockName) if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 { - return fmt.Errorf("Unable to acquire lock %s", lockName) + err := fmt.Errorf("Unable to acquire lock %s", lockName) + tableLocked <- err + return err + } + + tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2 + log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) + query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) + if _, err := tx.Exec(query); err != nil { + tableLocked <- err + return err } query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`, diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4d80506..00ce4ce 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -415,7 +415,6 @@ func (this *Migrator) Migrate() (err error) { this.consumeRowCopyComplete() log.Infof("Row copy complete") this.printStatus(ForcePrintStatusRule) - this.migrationContext.MarkPointOfInterest() if err := this.cutOver(); err != nil { return err @@ -435,10 +434,12 @@ func (this *Migrator) cutOver() (err error) { log.Debugf("Noop operation; not really swapping tables") return nil } + this.migrationContext.MarkPointOfInterest() this.throttle(func() { log.Debugf("throttling before swapping tables") }) + this.migrationContext.MarkPointOfInterest() this.sleepWhileTrue( func() (bool, error) { if this.migrationContext.PostponeCutOverFlagFile == "" { @@ -454,6 +455,7 @@ func (this *Migrator) cutOver() (err error) { }, ) atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) + this.migrationContext.MarkPointOfInterest() if this.migrationContext.TestOnReplica { // With `--test-on-replica` we stop replication thread, and then proceed to use @@ -478,15 +480,20 @@ func (this *Migrator) cutOver() (err error) { return err } if this.migrationContext.CutOverType == base.CutOverTwoStep { - err := this.retryOperation(this.cutOverTwoStep) + err := this.retryOperation( + func() error { + return this.executeAndThrottleOnError(this.cutOverTwoStep) + }, + ) return err } - return nil + return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { + this.migrationContext.MarkPointOfInterest() waitForEventsUpToLockStartTime := time.Now() log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed) @@ -541,19 +548,28 @@ func (this *Migrator) safeCutOver() (err error) { okToUnlockTable := make(chan bool, 2) originalTableRenamed := make(chan error, 1) + var originalTableRenameIntended int64 defer func() { + log.Infof("Checking to see if we need to roll back") // The following is to make sure we unlock the table no-matter-what! // There's enough buffer in the channel to support a redundant write here. okToUnlockTable <- true - // We need to make sure we wait for the original-rename, successful or not, - // so as to be able to rollback in case the ghost-rename fails. - <-originalTableRenamed - + if atomic.LoadInt64(&originalTableRenameIntended) == 1 { + log.Infof("Waiting for original table rename result") + // We need to make sure we wait for the original-rename, successful or not, + // so as to be able to rollback in case the ghost-rename fails. + // But we only wait on this queue if there's actually going to be a rename. + // As an example, what happens should the initial `lock tables` fail? We would + // never proceed to rename the table, hence this queue is never written to. + <-originalTableRenamed + } // Rollback operation if !this.applier.tableExists(this.migrationContext.OriginalTableName) { log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName) err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName) log.Errore(err) + } else { + log.Info("No need for rollback") } }() lockOriginalSessionIdChan := make(chan int64, 1) @@ -577,6 +593,8 @@ func (this *Migrator) safeCutOver() (err error) { // We now attempt a RENAME on the original table, and expect it to block renameOriginalSessionIdChan := make(chan int64, 1) this.migrationContext.RenameTablesStartTime = time.Now() + atomic.StoreInt64(&originalTableRenameIntended, 1) + go func() { this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed) }()