replica-migration cleanup; updating allEventsUpToLockProcessedInjectedFlag

This commit is contained in:
Shlomi Noach 2016-07-26 12:06:20 +02:00
parent f3d78be28e
commit 7a70c24503

View File

@ -475,6 +475,7 @@ func (this *Migrator) cutOver() (err error) {
}
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
defer this.applier.RenameTablesRollback()
// We further proceed to do the cutover by normal means; the 'defer' above will rollback the swap
}
if this.migrationContext.CutOverType == base.CutOverAtomic {
// Atomic solution: we use low timeout and multiple attempts. But for
@ -525,6 +526,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
func (this *Migrator) cutOverTwoStep() (err error) {
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0)
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
return err
@ -555,6 +557,8 @@ func (this *Migrator) atomicCutOver() (err error) {
this.applier.DropAtomicCutOverSentryTableIfExists()
}()
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 0)
lockOriginalSessionIdChan := make(chan int64, 2)
tableLocked := make(chan error, 2)
okToUnlockTable := make(chan bool, 3)
@ -633,22 +637,6 @@ func (this *Migrator) atomicCutOver() (err error) {
return nil
}
// stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply
// what DML events are left, and that's it.
// This only applies in --test-on-replica. It leaves replication stopped, with both tables
// in sync. There is no table swap.
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopReplication); err != nil {
return err
}
this.waitForEventsUpToLock()
log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
return nil
}
// onServerCommand responds to a user's interactive command
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
defer writer.Flush()