From ee447ad56077e2e3421f21db99f2079491e2236e Mon Sep 17 00:00:00 2001
From: Shlomi Noach <shlomi-noach@github.com>
Date: Thu, 17 Nov 2016 15:20:44 +0100
Subject: [PATCH 1/5] waitForEventsUpToLock: add timeout; log more info on
 AllEventsUpToLockProcessed, before and after injecting/intercepting

---
 go/logic/migrator.go | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 88f5423..50ed38f 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -78,8 +78,8 @@ func NewMigrator() *Migrator {
 		parser:                     sql.NewParser(),
 		ghostTableMigrated:         make(chan bool),
 		firstThrottlingCollected:   make(chan bool, 1),
-		rowCopyComplete:            make(chan bool),
-		allEventsUpToLockProcessed: make(chan bool),
+		rowCopyComplete:            make(chan bool, 1),
+		allEventsUpToLockProcessed: make(chan bool, 1),
 
 		copyRowsQueue:          make(chan tableWriteFunc),
 		applyEventsQueue:       make(chan tableWriteFunc, applyEventsQueueBuffer),
@@ -181,6 +181,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 		return nil
 	}
 	changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
+	log.Infof("Intercepted changelog state %s", changelogState)
 	switch changelogState {
 	case GhostTableMigrated:
 		{
@@ -206,7 +207,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 			return fmt.Errorf("Unknown changelog state: %+v", changelogState)
 		}
 	}
-	log.Debugf("Received state %+v", changelogState)
+	log.Infof("Handled changelog state %s", changelogState)
 	return nil
 }
 
@@ -445,6 +446,8 @@ func (this *Migrator) cutOver() (err error) {
 // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
 // make sure the queue is drained.
 func (this *Migrator) waitForEventsUpToLock() (err error) {
+	timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
+
 	this.migrationContext.MarkPointOfInterest()
 	waitForEventsUpToLockStartTime := time.Now()
 
@@ -454,7 +457,16 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 	}
 	log.Infof("Waiting for events up to lock")
 	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
-	<-this.allEventsUpToLockProcessed
+	select {
+	case <-timeout.C:
+		{
+			return log.Errorf("Timeout while waiting for events up to lock")
+		}
+	case <-this.allEventsUpToLockProcessed:
+		{
+			log.Infof("Waiting for events up to lock: got allEventsUpToLockProcessed.")
+		}
+	}
 	waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
 
 	log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)

From ef874b855163270d4691b050a40a790b8c55819f Mon Sep 17 00:00:00 2001
From: Shlomi Noach <shlomi-noach@github.com>
Date: Thu, 17 Nov 2016 15:50:54 +0100
Subject: [PATCH 2/5] AllEventsUpToLockProcessed uses unique signature

---
 go/logic/migrator.go | 40 +++++++++++++++++++++++++---------------
 1 file changed, 25 insertions(+), 15 deletions(-)

diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 50ed38f..eabc5fa 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -11,6 +11,7 @@ import (
 	"math"
 	"os"
 	"os/signal"
+	"strings"
 	"sync/atomic"
 	"syscall"
 	"time"
@@ -60,7 +61,7 @@ type Migrator struct {
 	firstThrottlingCollected   chan bool
 	ghostTableMigrated         chan bool
 	rowCopyComplete            chan bool
-	allEventsUpToLockProcessed chan bool
+	allEventsUpToLockProcessed chan string
 
 	rowCopyCompleteFlag         int64
 	inCutOverCriticalActionFlag int64
@@ -78,8 +79,8 @@ func NewMigrator() *Migrator {
 		parser:                     sql.NewParser(),
 		ghostTableMigrated:         make(chan bool),
 		firstThrottlingCollected:   make(chan bool, 1),
-		rowCopyComplete:            make(chan bool, 1),
-		allEventsUpToLockProcessed: make(chan bool, 1),
+		rowCopyComplete:            make(chan bool),
+		allEventsUpToLockProcessed: make(chan string),
 
 		copyRowsQueue:          make(chan tableWriteFunc),
 		applyEventsQueue:       make(chan tableWriteFunc, applyEventsQueueBuffer),
@@ -180,7 +181,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 	if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
 		return nil
 	}
-	changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
+	changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
+	changelogState := ChangelogState(strings.Split(changelogStateString, ":")[0])
 	log.Infof("Intercepted changelog state %s", changelogState)
 	switch changelogState {
 	case GhostTableMigrated:
@@ -190,7 +192,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 	case AllEventsUpToLockProcessed:
 		{
 			applyEventFunc := func() error {
-				this.allEventsUpToLockProcessed <- true
+				this.allEventsUpToLockProcessed <- changelogStateString
 				return nil
 			}
 			// at this point we know all events up to lock have been read from the streamer,
@@ -451,20 +453,28 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 	this.migrationContext.MarkPointOfInterest()
 	waitForEventsUpToLockStartTime := time.Now()
 
-	log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
-	if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
+	allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano())
+	log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge)
+	if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil {
 		return err
 	}
 	log.Infof("Waiting for events up to lock")
 	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
-	select {
-	case <-timeout.C:
-		{
-			return log.Errorf("Timeout while waiting for events up to lock")
-		}
-	case <-this.allEventsUpToLockProcessed:
-		{
-			log.Infof("Waiting for events up to lock: got allEventsUpToLockProcessed.")
+	for found := false; !found; {
+		select {
+		case <-timeout.C:
+			{
+				return log.Errorf("Timeout while waiting for events up to lock")
+			}
+		case state := <-this.allEventsUpToLockProcessed:
+			{
+				if state == allEventsUpToLockProcessedChallenge {
+					log.Infof("Waiting for events up to lock: got %s", state)
+					found = true
+				} else {
+					log.Infof("Waiting for events up to lock: skipping %s", state)
+				}
+			}
 		}
 	}
 	waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
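
Patch 2 addresses the stale-hint case: on a retried cut-over, an
AllEventsUpToLockProcessed hint left over from a previous attempt could
satisfy the wait prematurely. Each injection now carries a unique
nanosecond-timestamp suffix, the channel becomes a chan string so the payload
travels with the signal, and the waiter loops, discarding any state that is
not its own challenge. A sketch of that challenge/response loop, again under
illustrative names:

    package main

    import (
        "errors"
        "fmt"
        "time"
    )

    // awaitChallenge drains states until the exact expected challenge arrives,
    // skipping hints left over from earlier attempts, or gives up on timeout.
    func awaitChallenge(states <-chan string, challenge string, d time.Duration) error {
        timeout := time.NewTimer(d)
        defer timeout.Stop()
        for {
            select {
            case <-timeout.C:
                return errors.New("timeout while waiting for events up to lock")
            case state := <-states:
                if state == challenge {
                    return nil // our own hint completed the round trip
                }
                // stale hint from a previous attempt; keep draining
            }
        }
    }

    func main() {
        challenge := fmt.Sprintf("AllEventsUpToLockProcessed:%d", time.Now().UnixNano())
        states := make(chan string, 2)
        states <- "AllEventsUpToLockProcessed:1" // stale, will be skipped
        states <- challenge
        fmt.Println(awaitChallenge(states, challenge, time.Second)) // <nil>
    }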

From 8d987b5aaf52db5428f54a7924b1f4671280aa46 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <shlomi-noach@github.com>
Date: Thu, 17 Nov 2016 15:56:59 +0100
Subject: [PATCH 3/5] extracted parsing of ChangelogState

---
 go/logic/migrator.go | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index eabc5fa..20475bf 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -31,6 +31,10 @@ const (
 	AllEventsUpToLockProcessed                = "AllEventsUpToLockProcessed"
 )
 
+func ReadChangelogState(s string) ChangelogState {
+	return ChangelogState(strings.Split(s, ":")[0])
+}
+
 type tableWriteFunc func() error
 
 const (
@@ -182,7 +186,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 		return nil
 	}
 	changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
-	changelogState := ChangelogState(strings.Split(changelogStateString, ":")[0])
+	changelogState := ReadChangelogState(changelogStateString)
 	log.Infof("Intercepted changelog state %s", changelogState)
 	switch changelogState {
 	case GhostTableMigrated:
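
Patch 3 is a pure extraction: the "State[:token]" parsing moves into
ReadChangelogState, so any consumer tolerates the optional unique suffix. The
helper, exactly as defined in the patch, behaves like this:

    package main

    import (
        "fmt"
        "strings"
    )

    type ChangelogState string

    // ReadChangelogState parses the state name, ignoring any ":<token>" suffix.
    func ReadChangelogState(s string) ChangelogState {
        return ChangelogState(strings.Split(s, ":")[0])
    }

    func main() {
        fmt.Println(ReadChangelogState("GhostTableMigrated"))
        // GhostTableMigrated
        fmt.Println(ReadChangelogState("AllEventsUpToLockProcessed:1479394844000000000"))
        // AllEventsUpToLockProcessed
    }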

From b00cae11fa408e9e3a58da34b50cdd79db53742d Mon Sep 17 00:00:00 2001
From: Shlomi Noach <shlomi-noach@github.com>
Date: Thu, 17 Nov 2016 17:10:17 +0100
Subject: [PATCH 4/5] retry cut-over

---
 go/logic/applier.go  |  3 +--
 go/logic/migrator.go | 27 ++++++++++++---------------
 2 files changed, 13 insertions(+), 17 deletions(-)

diff --git a/go/logic/applier.go b/go/logic/applier.go
index 73d98e8..a9735b6 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -694,8 +694,7 @@ func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
 	return this.dropTable(tableName)
 }
 
-// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
-// happens to be a cut-over magic table; if so, it drops it.
+// CreateAtomicCutOverSentryTable creates the cut-over magic (sentry) table
 func (this *Applier) CreateAtomicCutOverSentryTable() error {
 	if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
 		return err
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 20475bf..442c730 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -350,7 +350,7 @@ func (this *Migrator) Migrate() (err error) {
 	if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
 		return err
 	}
-	if err := this.cutOver(); err != nil {
+	if err := this.retryOperation(this.cutOver); err != nil {
 		return err
 	}
 	atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
@@ -384,16 +384,18 @@ func (this *Migrator) cutOver() (err error) {
 	})
 
 	this.migrationContext.MarkPointOfInterest()
+	log.Debugf("checking for cut-over postpone")
 	this.sleepWhileTrue(
 		func() (bool, error) {
 			if this.migrationContext.PostponeCutOverFlagFile == "" {
 				return false, nil
 			}
 			if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
+				atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 0)
 				return false, nil
 			}
 			if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
-				// Throttle file defined and exists!
+				// Postpone file defined and exists!
 				if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) == 0 {
 					if err := this.hooksExecutor.onBeginPostponed(); err != nil {
 						return true, err
@@ -431,20 +433,11 @@ func (this *Migrator) cutOver() (err error) {
 	if this.migrationContext.CutOverType == base.CutOverAtomic {
 		// Atomic solution: we use low timeout and multiple attempts. But for
 		// each failed attempt, we throttle until replication lag is back to normal
-		err := this.retryOperation(
-			func() error {
-				return this.executeAndThrottleOnError(this.atomicCutOver)
-			},
-		)
+		err := this.atomicCutOver()
 		return err
 	}
 	if this.migrationContext.CutOverType == base.CutOverTwoStep {
-		err := this.retryOperation(
-			func() error {
-				return this.executeAndThrottleOnError(this.cutOverTwoStep)
-			},
-		)
-		return err
+		return this.cutOverTwoStep()
 	}
 	return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
 }
@@ -452,6 +445,7 @@ func (this *Migrator) cutOver() (err error) {
 // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
 // make sure the queue is drained.
 func (this *Migrator) waitForEventsUpToLock() (err error) {
+	//	timeout := time.NewTimer(time.Minute * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
 	timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
 
 	this.migrationContext.MarkPointOfInterest()
@@ -523,7 +517,9 @@ func (this *Migrator) atomicCutOver() (err error) {
 	atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
 	defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
 
+	okToUnlockTable := make(chan bool, 4)
 	defer func() {
+		okToUnlockTable <- true
 		this.applier.DropAtomicCutOverSentryTableIfExists()
 	}()
 
@@ -531,7 +527,6 @@ func (this *Migrator) atomicCutOver() (err error) {
 
 	lockOriginalSessionIdChan := make(chan int64, 2)
 	tableLocked := make(chan error, 2)
-	okToUnlockTable := make(chan bool, 3)
 	tableUnlocked := make(chan error, 2)
 	go func() {
 		if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
@@ -545,7 +540,9 @@ func (this *Migrator) atomicCutOver() (err error) {
 	log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
 	// At this point we know the original table is locked.
 	// We know any newly incoming DML on original table is blocked.
-	this.waitForEventsUpToLock()
+	if err := this.waitForEventsUpToLock(); err != nil {
+		return log.Errore(err)
+	}
 
 	// Step 2
 	// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
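
Patch 4 hoists retrying out of cutOver and up to Migrate, so a failed attempt
re-enters cutOver from the top, including the postpone check and a fresh
waitForEventsUpToLock timeout. It also enlarges okToUnlockTable and sends into
it from a defer, so the lock-holding goroutine is released on every exit path,
and it stops discarding the error returned by waitForEventsUpToLock. The
retryOperation signature is not shown in these hunks; the sketch below is an
illustrative stand-in for the overall shape:

    package main

    import (
        "errors"
        "fmt"
    )

    // retry is a stand-in for gh-ost's retryOperation: rerun the whole
    // operation on failure, up to maxRetries attempts.
    func retry(operation func() error, maxRetries int) (err error) {
        for i := 0; i < maxRetries; i++ {
            if err = operation(); err == nil {
                return nil
            }
            // each retry re-runs the complete cut-over flow from scratch
        }
        return err
    }

    func main() {
        attempts := 0
        cutOver := func() error {
            attempts++
            if attempts < 3 {
                return errors.New("cut-over attempt failed")
            }
            return nil
        }
        fmt.Println(retry(cutOver, 5), "after", attempts, "attempts")
        // <nil> after 3 attempts
    }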

From 7ab6af8f5fcaac824c26c16b1fbd60847dc9ebda Mon Sep 17 00:00:00 2001
From: Shlomi Noach <shlomi-noach@github.com>
Date: Thu, 17 Nov 2016 17:22:13 +0100
Subject: [PATCH 5/5] never throttle inside cut-over critical section

---
 go/base/context.go   | 11 +++++++++++
 go/logic/migrator.go | 24 ++++++++----------------
 2 files changed, 19 insertions(+), 16 deletions(-)

diff --git a/go/base/context.go b/go/base/context.go
index f82fb22..21c5758 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -156,6 +156,7 @@ type MigrationContext struct {
 	CleanupImminentFlag                    int64
 	UserCommandedUnpostponeFlag            int64
 	CutOverCompleteFlag                    int64
+	InCutOverCriticalSectionFlag           int64
 	PanicAbort                             chan error
 
 	OriginalTableColumnsOnApplier    *sql.ColumnList
@@ -438,6 +439,16 @@ func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonH
 func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
 	this.throttleMutex.Lock()
 	defer this.throttleMutex.Unlock()
+
+	// we don't throttle when cutting over. We _do_ throttle:
+	// - during copy phase
+	// - just before cut-over
+	// - in between cut-over retries
+	// When cutting over, we need to be aggressive. Cut-over holds table locks.
+	// We need to release those asap.
+	if atomic.LoadInt64(&this.InCutOverCriticalSectionFlag) > 0 {
+		return false, "critical section", NoThrottleReasonHint
+	}
 	return this.isThrottled, this.throttleReason, this.throttleReasonHint
 }
 
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index 442c730..cf59972 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -67,8 +67,7 @@ type Migrator struct {
 	rowCopyComplete            chan bool
 	allEventsUpToLockProcessed chan string
 
-	rowCopyCompleteFlag         int64
-	inCutOverCriticalActionFlag int64
+	rowCopyCompleteFlag int64
 	// copyRowsQueue should not be buffered; if buffered some non-damaging but
 	//  excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
 	copyRowsQueue    chan tableWriteFunc
@@ -409,6 +408,7 @@ func (this *Migrator) cutOver() (err error) {
 	)
 	atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
 	this.migrationContext.MarkPointOfInterest()
+	log.Debugf("checking for cut-over postpone: complete")
 
 	if this.migrationContext.TestOnReplica {
 		// With `--test-on-replica` we stop replication thread, and then proceed to use
@@ -445,7 +445,6 @@ func (this *Migrator) cutOver() (err error) {
 // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
 // make sure the queue is drained.
 func (this *Migrator) waitForEventsUpToLock() (err error) {
-	//	timeout := time.NewTimer(time.Minute * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
 	timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
 
 	this.migrationContext.MarkPointOfInterest()
@@ -488,8 +487,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
 // There is a point in time where the "original" table does not exist and queries are non-blocked
 // and failing.
 func (this *Migrator) cutOverTwoStep() (err error) {
-	atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
-	defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
+	atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
+	defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
 	atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
 
 	if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
@@ -514,8 +513,8 @@ func (this *Migrator) cutOverTwoStep() (err error) {
 
 // atomicCutOver
 func (this *Migrator) atomicCutOver() (err error) {
-	atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
-	defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
+	atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
+	defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
 
 	okToUnlockTable := make(chan bool, 4)
 	defer func() {
@@ -1022,15 +1021,8 @@ func (this *Migrator) executeWriteFuncs() error {
 		return nil
 	}
 	for {
-		if atomic.LoadInt64(&this.inCutOverCriticalActionFlag) == 0 {
-			// we don't throttle when cutting over. We _do_ throttle:
-			// - during copy phase
-			// - just before cut-over
-			// - in between cut-over retries
-			this.throttler.throttle(nil)
-			// When cutting over, we need to be aggressive. Cut-over holds table locks.
-			// We need to release those asap.
-		}
+		this.throttler.throttle(nil)
+
 		// We give higher priority to event processing, then secondary priority to
 		// rowcopy
 		select {
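
Patch 5 promotes the migrator-local inCutOverCriticalActionFlag to a shared
MigrationContext.InCutOverCriticalSectionFlag and moves the "no throttling
during cut-over" decision into IsThrottled itself, so every throttle consumer
observes it, not just the write loop. A reduced sketch of the atomic-flag gate
(a simplified stand-in for MigrationContext):

    package main

    import (
        "fmt"
        "sync/atomic"
    )

    // Context models the relevant slice of MigrationContext: an atomic flag
    // that suppresses throttling for the duration of the critical section.
    type Context struct {
        InCutOverCriticalSectionFlag int64
        isThrottled                  bool
    }

    // IsThrottled reports false unconditionally while the critical-section
    // flag is raised, mirroring the check added in this patch.
    func (c *Context) IsThrottled() (bool, string) {
        if atomic.LoadInt64(&c.InCutOverCriticalSectionFlag) > 0 {
            return false, "critical section"
        }
        return c.isThrottled, "lag"
    }

    func main() {
        c := &Context{isThrottled: true}
        fmt.Println(c.IsThrottled()) // true lag

        atomic.StoreInt64(&c.InCutOverCriticalSectionFlag, 1)
        defer atomic.StoreInt64(&c.InCutOverCriticalSectionFlag, 0)
        fmt.Println(c.IsThrottled()) // false critical section
    }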