fix issue 887

This commit is contained in:
thsun 2021-06-11 16:28:39 +08:00
parent fef83af378
commit acd616a17e
2 changed files with 116 additions and 4 deletions

View File

@ -790,7 +790,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
}
// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once) error {
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable, unlockGhostTableDone <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once, okToUnlockGhostTable chan<- bool) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
@ -876,6 +876,17 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
}
})
okToUnlockGhostTable <- true
// release original table lock at last,
// should send unlockGhostTableDone channel in exception scenario to make sure unlock original table lock success.
select {
case <-unlockGhostTableDone:
this.migrationContext.Log.Infof("Receive ghost table unlocked channel, unlock tables now")
case <-time.After(time.Duration(time.Second)):
this.migrationContext.Log.Errorf("Wait unlock ghost table timeout, force unlock tables now")
}
// Tables still locked
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
@ -894,7 +905,21 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
}
// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
func (this *Applier) AtomicCutoverRename(sessionIdChan, lockGhostSessionIdChan chan int64, tablesRenamed, ghostTableLocked, ghostTableUnlocked chan error, okToUnlockGhostTable, unlockGhostTableDone chan bool) error {
// lock gho table before rename, after lock open a goroutine wait okToUnlockGhoTable channel
// lock original&magic table (session1) -> lock ghost table (session2) -> cut-over table (session3 #blocked) ->
// drop magic table (session1) -> unlock ghost table (session2) -> unlock original table (session1)
go func() {
if err := this.AtomicCutOverGhostLock(ghostTableLocked, ghostTableUnlocked, lockGhostSessionIdChan, okToUnlockGhostTable, unlockGhostTableDone); err != nil {
this.migrationContext.Log.Errore(err)
}
}()
if err := <-ghostTableLocked; err != nil {
sessionIdChan <- -1
return this.migrationContext.Log.Errore(err)
}
tx, err := this.db.Begin()
if err != nil {
return err
@ -936,6 +961,77 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
return nil
}
// AtomicCutOverGhostLock
func (this *Applier) AtomicCutOverGhostLock(ghostTableLocked, ghostTableUnlocked chan<- error, lockGhostSessionIdChan chan int64, okToUnlockGhostTable <-chan bool, unlockGhostTableDone chan<- bool) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
unlockGhostTableDone <- true
lockGhostSessionIdChan <- -1
ghostTableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverGhostLock(), injected to release blocking channel reads")
ghostTableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverGhostLock(), injected to release blocking channel reads")
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
lockGhostSessionIdChan <- sessionId
lockResult := 0
query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId)
this.migrationContext.Log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName)
ghostTableLocked <- err
return err
}
this.migrationContext.Log.Infof("Setting lock ghost table timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
this.migrationContext.Log.Infof("Issuing and expecting get the %s.%s table lock: %s", this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), query)
if _, err := tx.Exec(query); err != nil {
ghostTableLocked <- err
return this.migrationContext.Log.Errore(err)
}
this.migrationContext.Log.Infof("Ghost table locked")
ghostTableLocked <- nil // No error.
<-okToUnlockGhostTable
// release gho table lock after drop magic cut-over table
this.migrationContext.Log.Infof("Will now proceed to unlock ghost table")
this.migrationContext.Log.Infof("Releasing lock from %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
query = `unlock tables`
if _, err := tx.Exec(query); err != nil {
ghostTableUnlocked <- err
}
unlockGhostTableDone <- true
this.migrationContext.Log.Infof("Ghost table unlocked")
ghostTableUnlocked <- nil
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

View File

@ -634,9 +634,12 @@ func (this *Migrator) atomicCutOver() (err error) {
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
okToUnlockTable := make(chan bool, 4)
okToUnlockGhostTable := make(chan bool, 4)
unlockGhostTableDone := make(chan bool, 4)
var dropCutOverSentryTableOnce sync.Once
defer func() {
okToUnlockTable <- true
unlockGhostTableDone <- true
dropCutOverSentryTableOnce.Do(func() {
this.applier.DropAtomicCutOverSentryTableIfExists()
})
@ -648,7 +651,7 @@ func (this *Migrator) atomicCutOver() (err error) {
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &dropCutOverSentryTableOnce); err != nil {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, unlockGhostTableDone, tableUnlocked, &dropCutOverSentryTableOnce, okToUnlockGhostTable); err != nil {
this.migrationContext.Log.Errore(err)
}
}()
@ -670,13 +673,23 @@ func (this *Migrator) atomicCutOver() (err error) {
var tableRenameKnownToHaveFailed int64
renameSessionIdChan := make(chan int64, 2)
tablesRenamed := make(chan error, 2)
ghostTableLocked := make(chan error, 2)
ghostTableUnlocked := make(chan error, 2)
lockGhostSessionIdChan := make(chan int64, 2)
go func() {
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, lockGhostSessionIdChan, tablesRenamed, ghostTableLocked, ghostTableUnlocked, okToUnlockGhostTable, unlockGhostTableDone); err != nil {
// Abort! Release the lock
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
okToUnlockTable <- true
okToUnlockGhostTable <- true
unlockGhostTableDone <- true
}
}()
lockGhoSessionId := <-lockGhostSessionIdChan
this.migrationContext.Log.Infof("Session locking ghost table is %+v", lockGhoSessionId)
renameSessionId := <-renameSessionIdChan
this.migrationContext.Log.Infof("Session renaming tables is %+v", renameSessionId)
@ -709,6 +722,9 @@ func (this *Migrator) atomicCutOver() (err error) {
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
if err := <-ghostTableUnlocked; err != nil {
return this.migrationContext.Log.Errore(err)
}
if err := <-tableUnlocked; err != nil {
return this.migrationContext.Log.Errore(err)
}