Merge pull request #1134 from github/changelog-migrated-state

Add `Migrated` changelog event state
This commit is contained in:
dm-2 2022-07-08 10:48:40 +01:00 committed by GitHub
commit 4648da15ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -25,8 +25,9 @@ import (
type ChangelogState string type ChangelogState string
const ( const (
GhostTableMigrated ChangelogState = "GhostTableMigrated"
AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed" AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
GhostTableMigrated ChangelogState = "GhostTableMigrated"
Migrated ChangelogState = "Migrated"
ReadMigrationRangeValues ChangelogState = "ReadMigrationRangeValues" ReadMigrationRangeValues ChangelogState = "ReadMigrationRangeValues"
) )
@ -219,6 +220,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
changelogState := ReadChangelogState(changelogStateString) changelogState := ReadChangelogState(changelogStateString)
this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState) this.migrationContext.Log.Infof("Intercepted changelog state %s", changelogState)
switch changelogState { switch changelogState {
case Migrated, ReadMigrationRangeValues:
// no-op event
case GhostTableMigrated: case GhostTableMigrated:
{ {
this.ghostTableMigrated <- true this.ghostTableMigrated <- true
@ -238,8 +241,6 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}() }()
} }
case ReadMigrationRangeValues:
// no-op event
default: default:
{ {
return fmt.Errorf("Unknown changelog state: %+v", changelogState) return fmt.Errorf("Unknown changelog state: %+v", changelogState)
@ -1312,6 +1313,11 @@ func (this *Migrator) executeWriteFuncs() error {
func (this *Migrator) finalCleanup() error { func (this *Migrator) finalCleanup() error {
atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1) atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1)
this.migrationContext.Log.Infof("Writing changelog state: %+v", Migrated)
if _, err := this.applier.WriteChangelogState(string(Migrated)); err != nil {
return err
}
if this.migrationContext.Noop { if this.migrationContext.Noop {
if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil { if createTableStatement, err := this.inspector.showCreateTable(this.migrationContext.GetGhostTableName()); err == nil {
this.migrationContext.Log.Infof("New table structure follows") this.migrationContext.Log.Infof("New table structure follows")