diff --git a/go/base/context.go b/go/base/context.go index f82fb22..21c5758 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -156,6 +156,7 @@ type MigrationContext struct { CleanupImminentFlag int64 UserCommandedUnpostponeFlag int64 CutOverCompleteFlag int64 + InCutOverCriticalSectionFlag int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList @@ -438,6 +439,16 @@ func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonH func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() + + // we don't throttle when cutting over. We _do_ throttle: + // - during copy phase + // - just before cut-over + // - in between cut-over retries + // When cutting over, we need to be aggressive. Cut-over holds table locks. + // We need to release those asap. + if atomic.LoadInt64(&this.InCutOverCriticalSectionFlag) > 0 { + return false, "critical section", NoThrottleReasonHint + } return this.isThrottled, this.throttleReason, this.throttleReasonHint } diff --git a/go/logic/applier.go b/go/logic/applier.go index 73d98e8..a9735b6 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -694,8 +694,7 @@ func (this *Applier) DropAtomicCutOverSentryTableIfExists() error { return this.dropTable(tableName) } -// DropAtomicCutOverSentryTableIfExists checks if the "old" table name -// happens to be a cut-over magic table; if so, it drops it. +// CreateAtomicCutOverSentryTable func (this *Applier) CreateAtomicCutOverSentryTable() error { if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil { return err diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 88f5423..cf59972 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -11,6 +11,7 @@ import ( "math" "os" "os/signal" + "strings" "sync/atomic" "syscall" "time" @@ -30,6 +31,10 @@ const ( AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" ) +func ReadChangelogState(s string) ChangelogState { + return ChangelogState(strings.Split(s, ":")[0]) +} + type tableWriteFunc func() error const ( @@ -60,10 +65,9 @@ type Migrator struct { firstThrottlingCollected chan bool ghostTableMigrated chan bool rowCopyComplete chan bool - allEventsUpToLockProcessed chan bool + allEventsUpToLockProcessed chan string - rowCopyCompleteFlag int64 - inCutOverCriticalActionFlag int64 + rowCopyCompleteFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan tableWriteFunc @@ -79,7 +83,7 @@ func NewMigrator() *Migrator { ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 1), rowCopyComplete: make(chan bool), - allEventsUpToLockProcessed: make(chan bool), + allEventsUpToLockProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), @@ -180,7 +184,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" { return nil } - changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) + changelogStateString := dmlEvent.NewColumnValues.StringColumn(3) + changelogState := ReadChangelogState(changelogStateString) + log.Infof("Intercepted changelog state %s", changelogState) switch changelogState { case GhostTableMigrated: { @@ -189,7 +195,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er case AllEventsUpToLockProcessed: { applyEventFunc := func() error { - this.allEventsUpToLockProcessed <- true + this.allEventsUpToLockProcessed <- changelogStateString return nil } // at this point we know all events up to lock have been read from the streamer, @@ -206,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return fmt.Errorf("Unknown changelog state: %+v", changelogState) } } - log.Debugf("Received state %+v", changelogState) + log.Infof("Handled changelog state %s", changelogState) return nil } @@ -343,7 +349,7 @@ func (this *Migrator) Migrate() (err error) { if err := this.hooksExecutor.onBeforeCutOver(); err != nil { return err } - if err := this.cutOver(); err != nil { + if err := this.retryOperation(this.cutOver); err != nil { return err } atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1) @@ -377,16 +383,18 @@ func (this *Migrator) cutOver() (err error) { }) this.migrationContext.MarkPointOfInterest() + log.Debugf("checking for cut-over postpone") this.sleepWhileTrue( func() (bool, error) { if this.migrationContext.PostponeCutOverFlagFile == "" { return false, nil } if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 { + atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 0) return false, nil } if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) { - // Throttle file defined and exists! + // Postpone file defined and exists! if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) == 0 { if err := this.hooksExecutor.onBeginPostponed(); err != nil { return true, err @@ -400,6 +408,7 @@ func (this *Migrator) cutOver() (err error) { ) atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) this.migrationContext.MarkPointOfInterest() + log.Debugf("checking for cut-over postpone: complete") if this.migrationContext.TestOnReplica { // With `--test-on-replica` we stop replication thread, and then proceed to use @@ -424,20 +433,11 @@ func (this *Migrator) cutOver() (err error) { if this.migrationContext.CutOverType == base.CutOverAtomic { // Atomic solution: we use low timeout and multiple attempts. But for // each failed attempt, we throttle until replication lag is back to normal - err := this.retryOperation( - func() error { - return this.executeAndThrottleOnError(this.atomicCutOver) - }, - ) + err := this.atomicCutOver() return err } if this.migrationContext.CutOverType == base.CutOverTwoStep { - err := this.retryOperation( - func() error { - return this.executeAndThrottleOnError(this.cutOverTwoStep) - }, - ) - return err + return this.cutOverTwoStep() } return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } @@ -445,16 +445,35 @@ func (this *Migrator) cutOver() (err error) { // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + this.migrationContext.MarkPointOfInterest() waitForEventsUpToLockStartTime := time.Now() - log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed) - if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil { + allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano()) + log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge) + if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil { return err } log.Infof("Waiting for events up to lock") atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) - <-this.allEventsUpToLockProcessed + for found := false; !found; { + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for events up to lock") + } + case state := <-this.allEventsUpToLockProcessed: + { + if state == allEventsUpToLockProcessedChallenge { + log.Infof("Waiting for events up to lock: got %s", state) + found = true + } else { + log.Infof("Waiting for events up to lock: skipping %s", state) + } + } + } + } waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) @@ -468,8 +487,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { // There is a point in time where the "original" table does not exist and queries are non-blocked // and failing. func (this *Migrator) cutOverTwoStep() (err error) { - atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) - defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { @@ -494,10 +513,12 @@ func (this *Migrator) cutOverTwoStep() (err error) { // atomicCutOver func (this *Migrator) atomicCutOver() (err error) { - atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) - defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) + atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1) + defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0) + okToUnlockTable := make(chan bool, 4) defer func() { + okToUnlockTable <- true this.applier.DropAtomicCutOverSentryTableIfExists() }() @@ -505,7 +526,6 @@ func (this *Migrator) atomicCutOver() (err error) { lockOriginalSessionIdChan := make(chan int64, 2) tableLocked := make(chan error, 2) - okToUnlockTable := make(chan bool, 3) tableUnlocked := make(chan error, 2) go func() { if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil { @@ -519,7 +539,9 @@ func (this *Migrator) atomicCutOver() (err error) { log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId) // At this point we know the original table is locked. // We know any newly incoming DML on original table is blocked. - this.waitForEventsUpToLock() + if err := this.waitForEventsUpToLock(); err != nil { + return log.Errore(err) + } // Step 2 // We now attempt an atomic RENAME on original & ghost tables, and expect it to block. @@ -999,15 +1021,8 @@ func (this *Migrator) executeWriteFuncs() error { return nil } for { - if atomic.LoadInt64(&this.inCutOverCriticalActionFlag) == 0 { - // we don't throttle when cutting over. We _do_ throttle: - // - during copy phase - // - just before cut-over - // - in between cut-over retries - this.throttler.throttle(nil) - // When cutting over, we need to be aggressive. Cut-over holds table locks. - // We need to release those asap. - } + this.throttler.throttle(nil) + // We give higher priority to event processing, then secondary priority to // rowcopy select {