diff --git a/go/logic/applier.go b/go/logic/applier.go index 2a59f3f..e162b0c 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -30,16 +30,18 @@ const ( // Applier is the one to actually write row data and apply binlog events onto the ghost table. // It is where the ghost & changelog tables get created. It is where the cut-over phase happens. type Applier struct { - connectionConfig *mysql.ConnectionConfig - db *gosql.DB - singletonDB *gosql.DB - migrationContext *base.MigrationContext + connectionConfig *mysql.ConnectionConfig + db *gosql.DB + singletonDB *gosql.DB + migrationContext *base.MigrationContext + finishedMigrating bool } func NewApplier(migrationContext *base.MigrationContext) *Applier { return &Applier{ - connectionConfig: migrationContext.ApplierConnectionConfig, - migrationContext: migrationContext, + connectionConfig: migrationContext.ApplierConnectionConfig, + migrationContext: migrationContext, + finishedMigrating: false, } } @@ -288,6 +290,10 @@ func (this *Applier) WriteChangelogState(value string) (string, error) { return this.WriteAndLogChangelog("state", value) } +func (this *Applier) FinalCleanup() { + this.finishedMigrating = true +} + // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // This is done asynchronously func (this *Applier) InitiateHeartbeat() { @@ -310,6 +316,9 @@ func (this *Applier) InitiateHeartbeat() { heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) for range heartbeatTick { + if this.finishedMigrating { + return + } // Generally speaking, we would issue a goroutine, but I'd actually rather // have this block the loop rather than spam the master in the event something // goes wrong diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e63ce86..98e4ae2 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -83,6 +83,8 @@ type Migrator struct { applyEventsQueue chan *applyEventStruct handledChangelogStates map[string]bool + + finishedMigrating bool } func NewMigrator(context *base.MigrationContext) *Migrator { @@ -97,6 +99,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator { copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), handledChangelogStates: make(map[string]bool), + finishedMigrating: false, } return migrator } @@ -718,6 +721,9 @@ func (this *Migrator) initiateStatus() error { this.printStatus(ForcePrintStatusAndHintRule) statusTick := time.Tick(1 * time.Second) for range statusTick { + if this.finishedMigrating { + return nil + } go this.printStatus(HeuristicPrintStatusRule) } @@ -942,6 +948,9 @@ func (this *Migrator) initiateStreaming() error { go func() { ticker := time.Tick(1 * time.Second) for range ticker { + if this.finishedMigrating { + return + } this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates()) } }() @@ -1132,6 +1141,10 @@ func (this *Migrator) executeWriteFuncs() error { return nil } for { + if this.finishedMigrating { + return nil + } + this.throttler.throttle(nil) // We give higher priority to event processing, then secondary priority to @@ -1209,5 +1222,8 @@ func (this *Migrator) finalCleanup() error { } } + this.finishedMigrating = true + this.applier.FinalCleanup() + return nil }