retry cut-over
This commit is contained in:
parent
8d987b5aaf
commit
b00cae11fa
@ -694,8 +694,7 @@ func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
|
||||
return this.dropTable(tableName)
|
||||
}
|
||||
|
||||
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
|
||||
// happens to be a cut-over magic table; if so, it drops it.
|
||||
// CreateAtomicCutOverSentryTable
|
||||
func (this *Applier) CreateAtomicCutOverSentryTable() error {
|
||||
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
|
||||
return err
|
||||
|
@ -350,7 +350,7 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.cutOver(); err != nil {
|
||||
if err := this.retryOperation(this.cutOver); err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
|
||||
@ -384,16 +384,18 @@ func (this *Migrator) cutOver() (err error) {
|
||||
})
|
||||
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
log.Debugf("checking for cut-over postpone")
|
||||
this.sleepWhileTrue(
|
||||
func() (bool, error) {
|
||||
if this.migrationContext.PostponeCutOverFlagFile == "" {
|
||||
return false, nil
|
||||
}
|
||||
if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
|
||||
atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 0)
|
||||
return false, nil
|
||||
}
|
||||
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
|
||||
// Throttle file defined and exists!
|
||||
// Postpone file defined and exists!
|
||||
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) == 0 {
|
||||
if err := this.hooksExecutor.onBeginPostponed(); err != nil {
|
||||
return true, err
|
||||
@ -431,20 +433,11 @@ func (this *Migrator) cutOver() (err error) {
|
||||
if this.migrationContext.CutOverType == base.CutOverAtomic {
|
||||
// Atomic solution: we use low timeout and multiple attempts. But for
|
||||
// each failed attempt, we throttle until replication lag is back to normal
|
||||
err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.atomicCutOver)
|
||||
},
|
||||
)
|
||||
err := this.atomicCutOver()
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverTwoStep {
|
||||
err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.cutOverTwoStep)
|
||||
},
|
||||
)
|
||||
return err
|
||||
return this.cutOverTwoStep()
|
||||
}
|
||||
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
|
||||
}
|
||||
@ -452,6 +445,7 @@ func (this *Migrator) cutOver() (err error) {
|
||||
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
|
||||
// make sure the queue is drained.
|
||||
func (this *Migrator) waitForEventsUpToLock() (err error) {
|
||||
// timeout := time.NewTimer(time.Minute * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
|
||||
timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
|
||||
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
@ -523,7 +517,9 @@ func (this *Migrator) atomicCutOver() (err error) {
|
||||
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
|
||||
|
||||
okToUnlockTable := make(chan bool, 4)
|
||||
defer func() {
|
||||
okToUnlockTable <- true
|
||||
this.applier.DropAtomicCutOverSentryTableIfExists()
|
||||
}()
|
||||
|
||||
@ -531,7 +527,6 @@ func (this *Migrator) atomicCutOver() (err error) {
|
||||
|
||||
lockOriginalSessionIdChan := make(chan int64, 2)
|
||||
tableLocked := make(chan error, 2)
|
||||
okToUnlockTable := make(chan bool, 3)
|
||||
tableUnlocked := make(chan error, 2)
|
||||
go func() {
|
||||
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
|
||||
@ -545,7 +540,9 @@ func (this *Migrator) atomicCutOver() (err error) {
|
||||
log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
|
||||
// At this point we know the original table is locked.
|
||||
// We know any newly incoming DML on original table is blocked.
|
||||
this.waitForEventsUpToLock()
|
||||
if err := this.waitForEventsUpToLock(); err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
|
||||
// Step 2
|
||||
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
|
||||
|
Loading…
x
Reference in New Issue
Block a user