woohoo, logic complete
- Introduced `SwapTablesTimeoutSeconds`; `RENAME` is limited by this timeout
- If `RENAME` fails (due to the above), we throttle and retry
- `SwapTablesAtomic()` sets `lock_wait_timeout` and notifies with its connection id
- `GrabVoluntaryLock()` intentionally grabs (and later releases) a voluntary lock. It notifies when the lock is taken and awaits instructions as to when it can be released.
- `IssueBlockingQueryOnVoluntaryLock()` does what it says. It notifies with its connection_id so that it can be easily traced
- `stopWritesAndCompleteMigrationOnMasterViaLock()` does the thang.

Oh dear this was agonizing and the code is a pain to look at, though under the limitations I do believe it is as clean as I could hope for.
This commit is contained in:
parent
1ed1b0d156
commit
421ab0fc83
@ -46,6 +46,7 @@ type MigrationContext struct {
|
||||
ThrottleFlagFile string
|
||||
ThrottleAdditionalFlagFile string
|
||||
MaxLoad map[string]int64
|
||||
SwapTablesTimeoutSeconds int64
|
||||
|
||||
Noop bool
|
||||
TestOnReplica bool
|
||||
@ -98,8 +99,9 @@ func newMigrationContext() *MigrationContext {
|
||||
InspectorConnectionConfig: mysql.NewConnectionConfig(),
|
||||
ApplierConnectionConfig: mysql.NewConnectionConfig(),
|
||||
MaxLagMillisecondsThrottleThreshold: 1000,
|
||||
MaxLoad: make(map[string]int64),
|
||||
throttleMutex: &sync.Mutex{},
|
||||
SwapTablesTimeoutSeconds: 3,
|
||||
MaxLoad: make(map[string]int64),
|
||||
throttleMutex: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -429,20 +429,6 @@ func (this *Applier) UnlockTables() error {
|
||||
|
||||
// SwapTablesQuickAndBumpy
|
||||
func (this *Applier) SwapTablesQuickAndBumpy() error {
|
||||
// query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||
// sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
// sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
// sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
// sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
// sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
||||
// sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
// sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
// )
|
||||
// log.Infof("Renaming tables")
|
||||
// if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
||||
// return err
|
||||
// }
|
||||
query := fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
@ -468,9 +454,27 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SwapTablesQuickAndBumpy
|
||||
func (this *Applier) SwapTablesAtomic() error {
|
||||
query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||
// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
|
||||
// into original's place
|
||||
func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error {
|
||||
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Setting timeout for RENAME for %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
|
||||
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var sessionId int64
|
||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
||||
return err
|
||||
}
|
||||
sessionIdChan <- sessionId
|
||||
|
||||
query = fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
@ -481,14 +485,20 @@ func (this *Applier) SwapTablesAtomic() error {
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
)
|
||||
log.Infof("Renaming tables")
|
||||
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
||||
|
||||
this.migrationContext.RenameTablesStartTime = time.Now()
|
||||
if _, err := tx.Exec(query); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.RenameTablesEndTime = time.Now()
|
||||
tx.Commit()
|
||||
log.Infof("Tables renamed")
|
||||
return nil
|
||||
}
|
||||
|
||||
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread
|
||||
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
|
||||
// We need to keep the SQL thread active so as to complete processing received events,
|
||||
// and have them written to the binary log, so that we can then read them via streamer
|
||||
func (this *Applier) StopSlaveIOThread() error {
|
||||
query := `stop /* gh-osc */ slave io_thread`
|
||||
log.Infof("Stopping replication")
|
||||
@ -499,6 +509,116 @@ func (this *Applier) StopSlaveIOThread() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
|
||||
// on a okToRelease in order to release it
|
||||
func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
|
||||
lockName := this.migrationContext.GetVoluntaryLockName()
|
||||
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
lockGrabbed <- err
|
||||
return err
|
||||
}
|
||||
// Grab
|
||||
query := `select get_lock(?, 0)`
|
||||
lockResult := 0
|
||||
log.Infof("Grabbing voluntary lock: %s", lockName)
|
||||
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
|
||||
lockGrabbed <- err
|
||||
return err
|
||||
}
|
||||
if lockResult != 1 {
|
||||
err := fmt.Errorf("Lock was not acquired")
|
||||
lockGrabbed <- err
|
||||
return err
|
||||
}
|
||||
log.Infof("Voluntary lock grabbed")
|
||||
lockGrabbed <- nil // No error.
|
||||
|
||||
// Listeners on the above will proceed to submit the "all queries till lock have been found"
|
||||
// We will wait here till we're told to. This will happen once all DML events up till lock
|
||||
// have been appleid on the ghost table
|
||||
<-okToRelease
|
||||
// Release
|
||||
query = `select ifnull(release_lock(?),0)`
|
||||
log.Infof("Releasing voluntary lock")
|
||||
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
if lockResult != 1 {
|
||||
// Generally speaking we should never get this.
|
||||
return log.Errorf("release_lock result was %+v", lockResult)
|
||||
}
|
||||
tx.Rollback()
|
||||
|
||||
log.Infof("Voluntary lock released")
|
||||
return nil
|
||||
|
||||
}
|
||||
|
||||
// IssueBlockingQueryOnVoluntaryLock will SELECT on the original table using a
|
||||
// conditional on a known to be occupied lock. This query is expected to block,
|
||||
// and will further block the followup RENAME statement
|
||||
func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64) error {
|
||||
lockName := this.migrationContext.GetVoluntaryLockName()
|
||||
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var sessionId int64
|
||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
||||
return err
|
||||
}
|
||||
sessionIdChan <- sessionId
|
||||
|
||||
// Grab
|
||||
query := fmt.Sprintf(`
|
||||
select /* gh-osc blocking-query-%s */
|
||||
release_lock(?)
|
||||
from %s.%s
|
||||
where
|
||||
get_lock(?, 86400) >= 0
|
||||
limit 1
|
||||
`,
|
||||
lockName,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
)
|
||||
|
||||
dummyResult := 0
|
||||
log.Infof("Issuing blocking query")
|
||||
this.migrationContext.LockTablesStartTime = time.Now()
|
||||
tx.QueryRow(query, lockName, lockName).Scan(&dummyResult)
|
||||
tx.Rollback()
|
||||
log.Infof("Blocking query released")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
|
||||
found := false
|
||||
query := `
|
||||
select id
|
||||
from information_schema.processlist
|
||||
where
|
||||
id != connection_id()
|
||||
and ? in (0, id)
|
||||
and state like concat('%', ?, '%')
|
||||
and info like concat('%', ?, '%')
|
||||
`
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||
found = true
|
||||
return nil
|
||||
}, sessionId, stateHint, infoHint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !found {
|
||||
return fmt.Errorf("Cannot find process. Hints: %s, %s", stateHint, infoHint)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
|
||||
query := fmt.Sprintf(`show global status like '%s'`, variableName)
|
||||
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
|
||||
|
@ -44,6 +44,7 @@ type Migrator struct {
|
||||
tablesInPlace chan bool
|
||||
rowCopyComplete chan bool
|
||||
allEventsUpToLockProcessed chan bool
|
||||
voluntaryLockAcquired chan bool
|
||||
panicAbort chan error
|
||||
|
||||
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
||||
@ -60,6 +61,7 @@ func NewMigrator() *Migrator {
|
||||
tablesInPlace: make(chan bool),
|
||||
rowCopyComplete: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan bool),
|
||||
voluntaryLockAcquired: make(chan bool, 1),
|
||||
panicAbort: make(chan error),
|
||||
|
||||
copyRowsQueue: make(chan tableWriteFunc),
|
||||
@ -175,6 +177,16 @@ func (this *Migrator) retryOperation(operation func() error) (err error) {
|
||||
return err
|
||||
}
|
||||
|
||||
// executeAndThrottleOnError executes a given function. If it errors, it
|
||||
// throttles.
|
||||
func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
|
||||
if err := operation(); err != nil {
|
||||
this.throttle(nil)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// canStopStreaming currently always reports false.
// NOTE(review): looks like a placeholder; confirm the intended semantics with callers.
func (this *Migrator) canStopStreaming() bool {
|
||||
return false
|
||||
}
|
||||
@ -288,7 +300,9 @@ func (this *Migrator) Migrate() (err error) {
|
||||
log.Debugf("Row copy complete")
|
||||
this.printStatus()
|
||||
|
||||
this.stopWritesAndCompleteMigration()
|
||||
if err := this.stopWritesAndCompleteMigration(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -309,7 +323,16 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
||||
if this.migrationContext.QuickAndBumpySwapTables {
|
||||
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
|
||||
}
|
||||
return this.stopWritesAndCompleteMigrationOnMasterViaLock()
|
||||
// Lock-based solution: we use low timeout and multiple attempts. But for
|
||||
// each failed attempt, we throttle until replication lag is back to normal
|
||||
if err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
|
||||
@ -344,33 +367,66 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
|
||||
}
|
||||
|
||||
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
|
||||
if err := this.retryOperation(this.applier.LockTables); err != nil {
|
||||
lockGrabbed := make(chan error, 1)
|
||||
okToReleaseLock := make(chan bool, 1)
|
||||
swapResult := make(chan error, 1)
|
||||
go func() {
|
||||
if err := this.applier.GrabVoluntaryLock(lockGrabbed, okToReleaseLock); err != nil {
|
||||
log.Errore(err)
|
||||
}
|
||||
}()
|
||||
if err := <-lockGrabbed; err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
blockingQuerySessionIdChan := make(chan int64, 1)
|
||||
go func() {
|
||||
this.applier.IssueBlockingQueryOnVoluntaryLock(blockingQuerySessionIdChan)
|
||||
}()
|
||||
blockingQuerySessionId := <-blockingQuerySessionIdChan
|
||||
log.Infof("Intentional blocking query connection id is %+v", blockingQuerySessionId)
|
||||
|
||||
if err := this.retryOperation(
|
||||
func() error {
|
||||
return this.applier.ExpectProcess(blockingQuerySessionId, "User lock", this.migrationContext.GetVoluntaryLockName())
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Found blocking query to be executing")
|
||||
swapSessionIdChan := make(chan int64, 1)
|
||||
go func() {
|
||||
swapResult <- this.applier.SwapTablesAtomic(swapSessionIdChan)
|
||||
}()
|
||||
|
||||
swapSessionId := <-swapSessionIdChan
|
||||
log.Infof("RENAME connection id is %+v", swapSessionId)
|
||||
if err := this.retryOperation(
|
||||
func() error {
|
||||
return this.applier.ExpectProcess(swapSessionId, "metadata lock", "rename")
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Found RENAME to be executing")
|
||||
|
||||
// OK, at this time we know any newly incoming DML on original table is blocked.
|
||||
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
|
||||
log.Debugf("Waiting for events up to lock")
|
||||
<-this.allEventsUpToLockProcessed
|
||||
log.Debugf("Done waiting for events up to lock")
|
||||
|
||||
if err := this.retryOperation(this.applier.SwapTablesAtomic); err != nil {
|
||||
okToReleaseLock <- true
|
||||
// BAM: voluntary lock is released, blocking query is released, rename is released.
|
||||
// We now check RENAME result. We have lock_wait_timeout. We put it on purpose, to avoid
|
||||
// locking the tables for too long. If lock time exceeds said timeout, the RENAME fails
|
||||
// and returns a non-nil error, in which case tables have not been swapped, and we are
|
||||
// not really done. We are, however, good to go for more retries.
|
||||
if err := <-swapResult; err != nil {
|
||||
// Bummer. We shall rest a while and try again
|
||||
return err
|
||||
}
|
||||
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.OkToDropTable {
|
||||
dropTableFunc := func() error {
|
||||
return this.applier.dropTable(this.migrationContext.GetOldTableName())
|
||||
}
|
||||
if err := this.retryOperation(dropTableFunc); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// ooh nice! We're actually truly and thankfully done
|
||||
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
|
||||
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
|
||||
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
log.Debugf("Lock & rename duration: %s. Of this, rename time was %s. During rename time, queries on %s were blocked", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user