diff --git a/go/base/context.go b/go/base/context.go index 33e52f2..cf36157 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -46,6 +46,7 @@ type MigrationContext struct { ThrottleFlagFile string ThrottleAdditionalFlagFile string MaxLoad map[string]int64 + SwapTablesTimeoutSeconds int64 Noop bool TestOnReplica bool @@ -98,8 +99,9 @@ func newMigrationContext() *MigrationContext { InspectorConnectionConfig: mysql.NewConnectionConfig(), ApplierConnectionConfig: mysql.NewConnectionConfig(), MaxLagMillisecondsThrottleThreshold: 1000, - MaxLoad: make(map[string]int64), - throttleMutex: &sync.Mutex{}, + SwapTablesTimeoutSeconds: 3, + MaxLoad: make(map[string]int64), + throttleMutex: &sync.Mutex{}, } } diff --git a/go/logic/applier.go b/go/logic/applier.go index 314d683..bdbc25b 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -429,20 +429,6 @@ func (this *Applier) UnlockTables() error { // SwapTablesQuickAndBumpy func (this *Applier) SwapTablesQuickAndBumpy() error { - // query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`, - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.OriginalTableName), - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.GetOldTableName()), - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.GetGhostTableName()), - // sql.EscapeName(this.migrationContext.DatabaseName), - // sql.EscapeName(this.migrationContext.OriginalTableName), - // ) - // log.Infof("Renaming tables") - // if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { - // return err - // } query := fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), @@ -468,9 +454,27 @@ func (this *Applier) SwapTablesQuickAndBumpy() error { return nil } -// SwapTablesQuickAndBumpy -func (this *Applier) SwapTablesAtomic() error { - query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`, +// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table +// into original's place +func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error { + + tx, err := this.db.Begin() + if err != nil { + return err + } + log.Infof("Setting timeout for RENAME for %d seconds", this.migrationContext.SwapTablesTimeoutSeconds) + query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds) + if _, err := tx.Exec(query); err != nil { + return err + } + + var sessionId int64 + if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId + + query = fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.DatabaseName), @@ -481,14 +485,20 @@ func (this *Applier) SwapTablesAtomic() error { sql.EscapeName(this.migrationContext.OriginalTableName), ) log.Infof("Renaming tables") - if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + + this.migrationContext.RenameTablesStartTime = time.Now() + if _, err := tx.Exec(query); err != nil { return err } + this.migrationContext.RenameTablesEndTime = time.Now() + tx.Commit() log.Infof("Tables renamed") return nil } -// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread +// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh. +// We need to keep the SQL thread active so as to complete processing received events, +// and have them written to the binary log, so that we can then read them via streamer func (this *Applier) StopSlaveIOThread() error { query := `stop /* gh-osc */ slave io_thread` log.Infof("Stopping replication") @@ -499,6 +509,116 @@ func (this *Applier) StopSlaveIOThread() error { return nil } +// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens +// on a okToRelease in order to release it +func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error { + lockName := this.migrationContext.GetVoluntaryLockName() + + tx, err := this.db.Begin() + if err != nil { + lockGrabbed <- err + return err + } + // Grab + query := `select get_lock(?, 0)` + lockResult := 0 + log.Infof("Grabbing voluntary lock: %s", lockName) + if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil { + lockGrabbed <- err + return err + } + if lockResult != 1 { + err := fmt.Errorf("Lock was not acquired") + lockGrabbed <- err + return err + } + log.Infof("Voluntary lock grabbed") + lockGrabbed <- nil // No error. + + // Listeners on the above will proceed to submit the "all queries till lock have been found" + // We will wait here till we're told to. This will happen once all DML events up till lock + // have been appleid on the ghost table + <-okToRelease + // Release + query = `select ifnull(release_lock(?),0)` + log.Infof("Releasing voluntary lock") + if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil { + return log.Errore(err) + } + if lockResult != 1 { + // Generally speaking we should never get this. + return log.Errorf("release_lock result was %+v", lockResult) + } + tx.Rollback() + + log.Infof("Voluntary lock released") + return nil + +} + +// IssueBlockingQueryOnVoluntaryLock will SELECT on the original table using a +// conditional on a known to be occupied lock. This query is expected to block, +// and will further block the followup RENAME statement +func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64) error { + lockName := this.migrationContext.GetVoluntaryLockName() + + tx, err := this.db.Begin() + if err != nil { + return err + } + var sessionId int64 + if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil { + return err + } + sessionIdChan <- sessionId + + // Grab + query := fmt.Sprintf(` + select /* gh-osc blocking-query-%s */ + release_lock(?) + from %s.%s + where + get_lock(?, 86400) >= 0 + limit 1 + `, + lockName, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + + dummyResult := 0 + log.Infof("Issuing blocking query") + this.migrationContext.LockTablesStartTime = time.Now() + tx.QueryRow(query, lockName, lockName).Scan(&dummyResult) + tx.Rollback() + log.Infof("Blocking query released") + return nil +} + +func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error { + found := false + query := ` + select id + from information_schema.processlist + where + id != connection_id() + and ? in (0, id) + and state like concat('%', ?, '%') + and info like concat('%', ?, '%') + ` + err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { + found = true + return nil + }, sessionId, stateHint, infoHint) + if err != nil { + return err + } + if !found { + return fmt.Errorf("Cannot find process. Hints: %s, %s", stateHint, infoHint) + } + return nil +} + func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) { query := fmt.Sprintf(`show global status like '%s'`, variableName) if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index df6eaba..fdabc3a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -44,6 +44,7 @@ type Migrator struct { tablesInPlace chan bool rowCopyComplete chan bool allEventsUpToLockProcessed chan bool + voluntaryLockAcquired chan bool panicAbort chan error // copyRowsQueue should not be buffered; if buffered some non-damaging but @@ -60,6 +61,7 @@ func NewMigrator() *Migrator { tablesInPlace: make(chan bool), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), + voluntaryLockAcquired: make(chan bool, 1), panicAbort: make(chan error), copyRowsQueue: make(chan tableWriteFunc), @@ -175,6 +177,16 @@ func (this *Migrator) retryOperation(operation func() error) (err error) { return err } +// executeAndThrottleOnError executes a given function. If it errors, it +// throttles. +func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) { + if err := operation(); err != nil { + this.throttle(nil) + return err + } + return nil +} + func (this *Migrator) canStopStreaming() bool { return false } @@ -288,7 +300,9 @@ func (this *Migrator) Migrate() (err error) { log.Debugf("Row copy complete") this.printStatus() - this.stopWritesAndCompleteMigration() + if err := this.stopWritesAndCompleteMigration(); err != nil { + return err + } return nil } @@ -309,7 +323,16 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) { if this.migrationContext.QuickAndBumpySwapTables { return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() } - return this.stopWritesAndCompleteMigrationOnMasterViaLock() + // Lock-based solution: we use low timeout and multiple attempts. But for + // each failed attempt, we throttle until replication lag is back to normal + if err := this.retryOperation( + func() error { + return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock) + }); err != nil { + return err + } + + return } func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) { @@ -344,33 +367,66 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err } func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) { - if err := this.retryOperation(this.applier.LockTables); err != nil { + lockGrabbed := make(chan error, 1) + okToReleaseLock := make(chan bool, 1) + swapResult := make(chan error, 1) + go func() { + if err := this.applier.GrabVoluntaryLock(lockGrabbed, okToReleaseLock); err != nil { + log.Errore(err) + } + }() + if err := <-lockGrabbed; err != nil { + return log.Errore(err) + } + blockingQuerySessionIdChan := make(chan int64, 1) + go func() { + this.applier.IssueBlockingQueryOnVoluntaryLock(blockingQuerySessionIdChan) + }() + blockingQuerySessionId := <-blockingQuerySessionIdChan + log.Infof("Intentional blocking query connection id is %+v", blockingQuerySessionId) + + if err := this.retryOperation( + func() error { + return this.applier.ExpectProcess(blockingQuerySessionId, "User lock", this.migrationContext.GetVoluntaryLockName()) + }); err != nil { return err } + log.Infof("Found blocking query to be executing") + swapSessionIdChan := make(chan int64, 1) + go func() { + swapResult <- this.applier.SwapTablesAtomic(swapSessionIdChan) + }() + swapSessionId := <-swapSessionIdChan + log.Infof("RENAME connection id is %+v", swapSessionId) + if err := this.retryOperation( + func() error { + return this.applier.ExpectProcess(swapSessionId, "metadata lock", "rename") + }); err != nil { + return err + } + log.Infof("Found RENAME to be executing") + + // OK, at this time we know any newly incoming DML on original table is blocked. this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) log.Debugf("Waiting for events up to lock") <-this.allEventsUpToLockProcessed log.Debugf("Done waiting for events up to lock") - if err := this.retryOperation(this.applier.SwapTablesAtomic); err != nil { + okToReleaseLock <- true + // BAM: voluntary lock is released, blocking query is released, rename is released. + // We now check RENAME result. We have lock_wait_timeout. We put it on purpose, to avoid + // locking the tables for too long. If lock time exceeds said timeout, the RENAME fails + // and returns a non-nil error, in which case tables have not been swapped, and we are + // not really done. We are, however, good to go for more retries. + if err := <-swapResult; err != nil { + // Bummer. We shall rest a while and try again return err } - if err := this.retryOperation(this.applier.UnlockTables); err != nil { - return err - } - if this.migrationContext.OkToDropTable { - dropTableFunc := func() error { - return this.applier.dropTable(this.migrationContext.GetOldTableName()) - } - if err := this.retryOperation(dropTableFunc); err != nil { - return err - } - } - + // ooh nice! We're actually truly and thankfully done lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) - log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + log.Debugf("Lock & rename duration: %s. Of this, rename time was %s. During rename time, queries on %s were blocked", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) return nil }