diff --git a/go/logic/applier.go b/go/logic/applier.go
index d1ad331..45e92f3 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -588,11 +588,22 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
 // and have them written to the binary log, so that we can then read them via streamer.
 func (this *Applier) StopSlaveIOThread() error {
 	query := `stop /* gh-ost */ slave io_thread`
-	log.Infof("Stopping replication")
+	log.Infof("Stopping replication IO thread")
 	if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
 		return err
 	}
-	log.Infof("Replication stopped")
+	log.Infof("Replication IO thread stopped")
+	return nil
+}
+
+// StartSlaveIOThread is applicable with --test-on-replica
+func (this *Applier) StartSlaveIOThread() error {
+	query := `start /* gh-ost */ slave io_thread`
+	log.Infof("Starting replication IO thread")
+	if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
+		return err
+	}
+	log.Infof("Replication IO thread started")
 	return nil
 }
 
@@ -635,6 +646,18 @@ func (this *Applier) StopReplication() error {
 	return nil
 }
 
+// StartReplication is used by `--test-on-replica` on cut-over failure
+func (this *Applier) StartReplication() error {
+	if err := this.StartSlaveIOThread(); err != nil {
+		return err
+	}
+	if err := this.StartSlaveSQLThread(); err != nil {
+		return err
+	}
+	log.Infof("Replication started")
+	return nil
+}
+
 // GetSessionLockName returns a name for the special hint session voluntary lock
 func (this *Applier) GetSessionLockName(sessionId int64) string {
 	return fmt.Sprintf("gh-ost.%d.lock", sessionId)
diff --git a/go/logic/hooks.go b/go/logic/hooks.go
index 25d745a..89dbd65 100644
--- a/go/logic/hooks.go
+++ b/go/logic/hooks.go
@@ -30,6 +30,7 @@ const (
 	onFailure = "gh-ost-on-failure"
 	onStatus = "gh-ost-on-status"
 	onStopReplication = "gh-ost-on-stop-replication"
+	onStartReplication = "gh-ost-on-start-replication"
 )
 
 type HooksExecutor struct {
@@ -152,3 +153,7 @@ func (this *HooksExecutor) onStatus(statusMessage string) error {
 func (this *HooksExecutor) onStopReplication() error {
 	return this.executeHooks(onStopReplication)
 }
+
+func (this *HooksExecutor) onStartReplication() error {
+	return this.executeHooks(onStartReplication)
+}
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 61db6db..58b5e79 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -385,8 +385,38 @@ func (this *Migrator) ExecOnFailureHook() (err error) {
 	return this.hooksExecutor.onFailure()
 }
 
+func (this *Migrator) handleCutOverResult(cutOverError error) (err error) {
+	if this.migrationContext.TestOnReplica {
+		// We're merely testing, we don't want to keep this state. Rollback the renames as possible
+		this.applier.RenameTablesRollback()
+	}
+	if cutOverError == nil {
+		return nil
+	}
+	// Only on error:
+
+	if this.migrationContext.TestOnReplica {
+		// With `--test-on-replica` we stop replication thread, and then proceed to use
+		// the same cut-over phase as the master would use. That means we take locks
+		// and swap the tables.
+		// The difference is that we will later swap the tables back.
+		if err := this.hooksExecutor.onStartReplication(); err != nil {
+			return log.Errore(err)
+		}
+		if this.migrationContext.TestOnReplicaSkipReplicaStop {
+			log.Warningf("--test-on-replica-skip-replica-stop enabled, we are not starting replication.")
+		} else {
+			log.Debugf("testing on replica. Starting replication IO thread after cut-over failure")
+			if err := this.retryOperation(this.applier.StartReplication); err != nil {
+				return log.Errore(err)
+			}
+		}
+	}
+	return nil
+}
+
 // cutOver performs the final step of migration, based on migration
-// type (on replica? bumpy? safe?)
+// type (on replica? atomic? safe?)
 func (this *Migrator) cutOver() (err error) {
 	if this.migrationContext.Noop {
 		log.Debugf("Noop operation; not really swapping tables")
@@ -441,18 +471,18 @@ func (this *Migrator) cutOver() (err error) {
 				return err
 			}
 		}
-		// We're merly testing, we don't want to keep this state. Rollback the renames as possible
-		defer this.applier.RenameTablesRollback()
-		// We further proceed to do the cutover by normal means; the 'defer' above will rollback the swap
 	}
 	if this.migrationContext.CutOverType == base.CutOverAtomic {
 		// Atomic solution: we use low timeout and multiple attempts. But for
 		// each failed attempt, we throttle until replication lag is back to normal
 		err := this.atomicCutOver()
+		this.handleCutOverResult(err)
 		return err
 	}
 	if this.migrationContext.CutOverType == base.CutOverTwoStep {
-		return this.cutOverTwoStep()
+		err := this.cutOverTwoStep()
+		this.handleCutOverResult(err)
+		return err
 	}
 	return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
 }
@@ -1043,8 +1073,10 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
 		availableEvents := len(this.applyEventsQueue)
 		batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
-		if availableEvents > batchSize {
-			availableEvents = batchSize
+		if availableEvents > batchSize-1 {
+			// The "- 1" is because we already consumed one event: the original event that led to this function getting called.
+			// So, if DMLBatchSize==1 we wish to not process any further events
+			availableEvents = batchSize - 1
 		}
 		for i := 0; i < availableEvents; i++ {
 			additionalStruct := <-this.applyEventsQueue