fix: because lock is not release, drop cutover sentry table is hanged

This commit is contained in:
lmtwga 2022-09-16 17:49:09 +08:00
parent 1df37c207f
commit 321e5847ba
2 changed files with 7 additions and 14 deletions

View File

@ -9,7 +9,6 @@ import (
gosql "database/sql" gosql "database/sql"
"fmt" "fmt"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -904,7 +903,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
} }
// AtomicCutOverMagicLock // AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once) error { func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin() tx, err := this.db.Begin()
if err != nil { if err != nil {
tableLocked <- err tableLocked <- err
@ -915,6 +914,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads") tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads") tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback() tx.Rollback()
this.DropAtomicCutOverSentryTableIfExists()
}() }()
var sessionId int64 var sessionId int64
@ -983,12 +983,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
sql.EscapeName(this.migrationContext.GetOldTableName()), sql.EscapeName(this.migrationContext.GetOldTableName()),
) )
dropCutOverSentryTableOnce.Do(func() { if _, err := tx.Exec(query); err != nil {
if _, err := tx.Exec(query); err != nil { this.migrationContext.Log.Errore(err)
this.migrationContext.Log.Errore(err) // We DO NOT return here because we must `UNLOCK TABLES`!
// We DO NOT return here because we must `UNLOCK TABLES`! }
}
})
// Tables still locked // Tables still locked
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s", this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",

View File

@ -13,7 +13,6 @@ import (
"math" "math"
"os" "os"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -639,12 +638,8 @@ func (this *Migrator) atomicCutOver() (err error) {
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
okToUnlockTable := make(chan bool, 4) okToUnlockTable := make(chan bool, 4)
var dropCutOverSentryTableOnce sync.Once
defer func() { defer func() {
okToUnlockTable <- true okToUnlockTable <- true
dropCutOverSentryTableOnce.Do(func() {
this.applier.DropAtomicCutOverSentryTableIfExists()
})
}() }()
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
@ -653,7 +648,7 @@ func (this *Migrator) atomicCutOver() (err error) {
tableLocked := make(chan error, 2) tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2) tableUnlocked := make(chan error, 2)
go func() { go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &dropCutOverSentryTableOnce); err != nil { if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
this.migrationContext.Log.Errore(err) this.migrationContext.Log.Errore(err)
} }
}() }()