diff --git a/go/base/context.go b/go/base/context.go index f82fb22..21c5758 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -156,6 +156,7 @@ type MigrationContext struct { CleanupImminentFlag int64 UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 + InCutOverCriticalSectionFlag int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList @@ -438,6 +439,16 @@ func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonH func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() + + // we don't throttle when cutting over. We _do_ throttle: + // - during copy phase + // - just before cut-over + // - in between cut-over retries + // When cutting over, we need to be aggressive. Cut-over holds table locks. + // We need to release those asap. + if atomic.LoadInt64(&this.InCutOverCriticalSectionFlag) > 0 { + return false, "critical section", NoThrottleReasonHint + } return this.isThrottled, this.throttleReason, this.throttleReasonHint } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 442c730..cf59972 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -67,8 +67,7 @@ type Migrator struct { rowCopyComplete chan bool allEventsUpToLockProcessed chan string - rowCopyCompleteFlag int64 - inCutOverCriticalActionFlag int64 + rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan tableWriteFunc @@ -409,6 +408,7 @@ func (this *Migrator) cutOver() (err error) { ) atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) this.migrationContext.MarkPointOfInterest() + log.Debugf("checking for cut-over postpone: complete") if this.migrationContext.TestOnReplica { // With `--test-on-replica` we stop replication thread, and then proceed to use @@ -445,7 +445,6 @@ func (this *Migrator) cutOver() (err error) { // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { - // timeout := time.NewTimer(time.Minute * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) this.migrationContext.MarkPointOfInterest() @@ -488,8 +487,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { // There is a point in time where the "original" table does not exist and queries are non-blocked // and failing. func (this *Migrator) cutOverTwoStep() (err error) { - atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) - defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { @@ -514,8 +513,8 @@ func (this *Migrator) cutOverTwoStep() (err error) { // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { - atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) - defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) okToUnlockTable := make(chan bool, 4) defer func() { @@ -1022,15 +1021,8 @@ func (this *Migrator) executeWriteFuncs() error { return nil } for { - if atomic.LoadInt64(&this.inCutOverCriticalActionFlag) == 0 { - // we don't throttle when cutting over. We _do_ throttle: - // - during copy phase - // - just before cut-over - // - in between cut-over retries - this.throttler.throttle(nil) - // When cutting over, we need to be aggressive. Cut-over holds table locks. - // We need to release those asap. - } + this.throttler.throttle(nil) + // We give higher priority to event processing, then secondary priority to // rowcopy select {