From af99b247f9525a46a9e20f3b8711c457fab73da2 Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Sat, 4 Jun 2022 23:59:00 +0200 Subject: [PATCH] Add `Migrated` changelog event --- go/logic/migrator.go | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index c5cb244..13108bc 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -24,8 +24,9 @@ import ( type ChangelogState string const ( + AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed" GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + Migrated ChangelogState = "Migrated" ) func ReadChangelogState(s string) ChangelogState { @@ -215,6 +216,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er changelogState := ReadChangelogState(changelogStateString) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState) switch changelogState { + case Migrated: + // no-op event case GhostTableMigrated: { this.ghostTableMigrated <- true @@ -534,19 +537,20 @@ func (this *Migrator) cutOver() (err error) { } } } - if this.migrationContext.CutOverType == base.CutOverAtomic { + + switch this.migrationContext.CutOverType { + case base.CutOverAtomic: // Atomic solution: we use low timeout and multiple attempts. But for // each failed attempt, we throttle until replication lag is back to normal - err := this.atomicCutOver() - this.handleCutOverResult(err) - return err + err = this.atomicCutOver() + case base.CutOverTwoStep: + err = this.cutOverTwoStep() + default: + return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) } - if this.migrationContext.CutOverType == base.CutOverTwoStep { - err := this.cutOverTwoStep() - this.handleCutOverResult(err) - return err - } - return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) + + this.handleCutOverResult(err) + return err } // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, @@ -1296,6 +1300,11 @@ func (this *Migrator) executeWriteFuncs() error { func (this *Migrator) finalCleanup() error { atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1) + this.migrationContext.Log.Infof("Writing changelog state: %+v", Migrated) + if _, err := this.applier.WriteChangelogState(string(Migrated)); err != nil { + return err + } + if this.migrationContext.Noop { if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil { this.migrationContext.Log.Infof("New table structure follows")