Add Migrated changelog event

This commit is contained in:
Tim Vaillancourt 2022-06-04 23:59:00 +02:00
parent ed46138c06
commit af99b247f9

View File

@ -24,8 +24,9 @@ import (
type ChangelogState string type ChangelogState string
const ( const (
AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
GhostTableMigrated ChangelogState = "GhostTableMigrated" GhostTableMigrated ChangelogState = "GhostTableMigrated"
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" Migrated ChangelogState = "Migrated"
) )
func ReadChangelogState(s string) ChangelogState { func ReadChangelogState(s string) ChangelogState {
@ -215,6 +216,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
changelogState := ReadChangelogState(changelogStateString) changelogState := ReadChangelogState(changelogStateString)
this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState)
switch changelogState { switch changelogState {
case Migrated:
// no-op event
case GhostTableMigrated: case GhostTableMigrated:
{ {
this.ghostTableMigrated <- true this.ghostTableMigrated <- true
@ -534,19 +537,20 @@ func (this *Migrator) cutOver() (err error) {
} }
} }
} }
if this.migrationContext.CutOverType == base.CutOverAtomic {
switch this.migrationContext.CutOverType {
case base.CutOverAtomic:
// Atomic solution: we use low timeout and multiple attempts. But for // Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal // each failed attempt, we throttle until replication lag is back to normal
err := this.atomicCutOver() err = this.atomicCutOver()
this.handleCutOverResult(err) case base.CutOverTwoStep:
return err err = this.cutOverTwoStep()
default:
return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
} }
if this.migrationContext.CutOverType == base.CutOverTwoStep {
err := this.cutOverTwoStep() this.handleCutOverResult(err)
this.handleCutOverResult(err) return err
return err
}
return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
} }
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
@ -1296,6 +1300,11 @@ func (this *Migrator) executeWriteFuncs() error {
func (this *Migrator) finalCleanup() error { func (this *Migrator) finalCleanup() error {
atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1) atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1)
this.migrationContext.Log.Infof("Writing changelog state: %+v", Migrated)
if _, err := this.applier.WriteChangelogState(string(Migrated)); err != nil {
return err
}
if this.migrationContext.Noop { if this.migrationContext.Noop {
if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil { if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil {
this.migrationContext.Log.Infof("New table structure follows") this.migrationContext.Log.Infof("New table structure follows")