From 1e10f1f29ebc48ab28edee49e5077e741ff2e7f9 Mon Sep 17 00:00:00 2001
From: Shlomi Noach <shlomi-noach@github.com>
Date: Mon, 16 May 2016 11:03:15 +0200
Subject: [PATCH] Solved various race conditions

- Operation would terminate after the events lock was noticed but before all
  events were applied: a race condition in which the event was captured
  asynchronously. The event is now handled sequentially with the DML events,
  hence safe.
- Multiple rowcopy operations would still write to the `rowCopyComplete`
  channel. This is still the case, but we now only wait for the first write,
  then just flush (read and discard) any others, to avoid blocking.
- The DML events listener is now only added after table creation: the problem
  was that with very busy tables, the events func buffer would fill up, and
  the "tables-created" event would be blocked.
- `waitForEventsUpToLock()` unifies the waiting on all variants of
  complete-migration.
- With `--test-on-replica`, replication is now stopped "nicely", using
  `master_pos_wait()`.
- With `--test-on-replica`, no longer throttling on replication lag once
  replication is stopped (duh).
- More debug output.
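
As an aside, a minimal standalone sketch of that wait-once-then-flush channel
pattern (illustrative names, not the patch's own):

package main

import "fmt"

// consumeFirstThenDrain blocks until the first completion signal, then keeps
// draining the channel in the background so that late senders never block.
func consumeFirstThenDrain(done chan bool) {
	<-done
	go func() {
		for range done {
		}
	}()
}

func main() {
	done := make(chan bool)
	// Several rowcopy-like workers may each report completion; only the
	// first report matters.
	for i := 0; i < 3; i++ {
		go func() { done <- true }()
	}
	consumeFirstThenDrain(done)
	fmt.Println("row copy complete")
}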

---
 go/base/context.go   |   6 +-
 go/logic/applier.go  |  33 +++++++++
 go/logic/migrator.go | 155 ++++++++++++++++++++++++-------------------
 go/logic/streamer.go |  10 +++
 go/mysql/utils.go    |  22 ++++++
 5 files changed, 155 insertions(+), 71 deletions(-)

diff --git a/go/base/context.go b/go/base/context.go
index f32ff10..4a7029e 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -121,17 +121,17 @@ func GetMigrationContext() *MigrationContext {
 
 // GetGhostTableName generates the name of the ghost table, based on the original table name
 func (this *MigrationContext) GetGhostTableName() string {
-	return fmt.Sprintf("_%s_New", this.OriginalTableName)
+	return fmt.Sprintf("_%s_gst", this.OriginalTableName)
 }
 
 // GetOldTableName generates the name of the "old" table, into which the original table is renamed.
 func (this *MigrationContext) GetOldTableName() string {
-	return fmt.Sprintf("_%s_Old", this.OriginalTableName)
+	return fmt.Sprintf("_%s_old", this.OriginalTableName)
 }
 
 // GetChangelogTableName generates the name of the changelog table, based on the original table name
 func (this *MigrationContext) GetChangelogTableName() string {
-	return fmt.Sprintf("_%s_OSC", this.OriginalTableName)
+	return fmt.Sprintf("_%s_osc", this.OriginalTableName)
 }
 
 // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
diff --git a/go/logic/applier.go b/go/logic/applier.go
index 461db05..b34b20b 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -548,6 +548,39 @@ func (this *Applier) StopSlaveIOThread() error {
 	return nil
 }
 
+// MasterPosWait waits (via master_pos_wait()) until the replication SQL thread has executed events up to the given master binlog coordinates; applicable with --test-on-replica
+func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
+	var eventsWaited int64
+	if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 1).Scan(&eventsWaited); err != nil {
+		return err
+	}
+	if eventsWaited < 0 {
+		return fmt.Errorf("Timeout waiting on master_pos_wait()")
+	}
+	return nil
+}
+
+// StopSlaveNicely stops the replica's IO thread, then waits for the SQL
+// thread to apply everything already read from the master, so that
+// replication halts at a consistent, fully-applied position.
+func (this *Applier) StopSlaveNicely() error {
+	if err := this.StopSlaveIOThread(); err != nil {
+		return err
+	}
+	binlogCoordinates, err := mysql.GetReadBinlogCoordinates(this.db)
+	if err != nil {
+		return err
+	}
+	log.Debugf("Replication stopped at %+v. Will wait for SQL thread to apply", *binlogCoordinates)
+	if err := this.MasterPosWait(binlogCoordinates); err != nil {
+		return err
+	}
+	log.Debugf("Replication SQL thread applied all events")
+	if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil {
+		return err
+	} else {
+		log.Debugf("Self binlog coordinates: %+v", *selfBinlogCoordinates)
+	}
+	return nil
+}
+
 // GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
 // on an okToRelease in order to release it
 func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
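
For context on the MasterPosWait helper above: master_pos_wait() returns the
number of events waited for, -1 on timeout, and NULL when the SQL thread is
not running. A minimal sketch of a variant that also surfaces the NULL case
(hypothetical names and package, not part of this patch):

package mysqlutil

import (
	gosql "database/sql"
	"fmt"
)

// masterPosWait is a hypothetical variant of the patch's MasterPosWait: it
// scans into sql.NullInt64 so the NULL return ("SQL thread not running") is
// reported distinctly from the -1 timeout return.
func masterPosWait(db *gosql.DB, logFile string, logPos int64, timeoutSeconds int) error {
	var waited gosql.NullInt64
	if err := db.QueryRow(`select master_pos_wait(?, ?, ?)`, logFile, logPos, timeoutSeconds).Scan(&waited); err != nil {
		return err
	}
	if !waited.Valid {
		return fmt.Errorf("master_pos_wait(): replication SQL thread is not running")
	}
	if waited.Int64 < 0 {
		return fmt.Errorf("Timeout waiting on master_pos_wait()")
	}
	return nil
}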
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index c234652..b667015 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -48,6 +48,7 @@ type Migrator struct {
 	voluntaryLockAcquired      chan bool
 	panicAbort                 chan error
 
+	allEventsUpToLockProcessedFlag int64
 	// copyRowsQueue should not be buffered; if buffered some non-damaging but
 	//  excessive work happens at the end of the iteration as new copy-jobs arrive before we realize the copy is complete
 	copyRowsQueue    chan tableWriteFunc
@@ -65,6 +66,8 @@ func NewMigrator() *Migrator {
 		voluntaryLockAcquired:      make(chan bool, 1),
 		panicAbort:                 make(chan error),
 
+		allEventsUpToLockProcessedFlag: 0,
+
 		copyRowsQueue:          make(chan tableWriteFunc),
 		applyEventsQueue:       make(chan tableWriteFunc, applyEventsQueueBuffer),
 		handledChangelogStates: make(map[string]bool),
@@ -106,7 +109,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
 	if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
 		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
 	}
-	if this.migrationContext.TestOnReplica {
+	if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedFlag) == 0) {
 		replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
 		if err != nil {
 			return true, err.Error()
@@ -198,6 +201,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
 	return nil
 }
 
+// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
+// consumes and discards any further incoming notifications, so that late senders never block.
+func (this *Migrator) consumeRowCopyComplete() {
+	<-this.rowCopyComplete
+	go func() {
+		for <-this.rowCopyComplete {
+		}
+	}()
+}
+
 func (this *Migrator) canStopStreaming() bool {
 	return false
 }
@@ -215,33 +228,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 		}
 	case AllEventsUpToLockProcessed:
 		{
-			this.allEventsUpToLockProcessed <- true
-		}
-	default:
-		{
-			return fmt.Errorf("Unknown changelog state: %+v", changelogState)
-		}
-	}
-	log.Debugf("Received state %+v", changelogState)
-	return nil
-}
-
-func (this *Migrator) onChangelogState(stateValue string) (err error) {
-	log.Fatalf("I shouldn't be here")
-	if this.handledChangelogStates[stateValue] {
-		return nil
-	}
-	this.handledChangelogStates[stateValue] = true
-
-	changelogState := ChangelogState(stateValue)
-	switch changelogState {
-	case TablesInPlace:
-		{
-			this.tablesInPlace <- true
-		}
-	case AllEventsUpToLockProcessed:
-		{
-			this.allEventsUpToLockProcessed <- true
+			applyEventFunc := func() error {
+				this.allEventsUpToLockProcessed <- true
+				return nil
+			}
+			// At this point we know all events up to lock have been read from the streamer,
+			// because the streamer works sequentially. So those events are either already handled,
+			// or have event functions waiting in applyEventsQueue.
+			// So as not to create a potential deadlock, we write this func to applyEventsQueue
+			// asynchronously; the queue is consumed in order, so the exact enqueue timing does not matter.
+			go func() {
+				this.applyEventsQueue <- applyEventFunc
+			}()
 		}
 	default:
 		{
@@ -295,6 +293,9 @@ func (this *Migrator) Migrate() (err error) {
 	if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
 		return err
 	}
+	if err := this.addDMLEventsListener(); err != nil {
+		return err
+	}
 	go this.initiateHeartbeatListener()
 
 	if err := this.applier.ReadMigrationRangeValues(); err != nil {
@@ -307,7 +308,7 @@ func (this *Migrator) Migrate() (err error) {
 	go this.initiateStatus()
 
 	log.Debugf("Operating until row copy is complete")
-	<-this.rowCopyComplete
+	this.consumeRowCopyComplete()
 	log.Debugf("Row copy complete")
 	this.printStatus()
 
@@ -336,18 +337,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
 	if this.migrationContext.QuickAndBumpySwapTables {
 		return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
 	}
-	// Lock-based solution: we use low timeout and multiple attempts. But for
-	// each failed attempt, we throttle until replication lag is back to normal
-	if err := this.retryOperation(
-		func() error {
-			return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
-		}); err != nil {
-		return err
-	}
-	if err := this.dropOldTableIfRequired(); err != nil {
-		return err
-	}
 
+	{
+		// Lock-based solution: we use low timeout and multiple attempts. But for
+		// each failed attempt, we throttle until replication lag is back to normal
+		if err := this.retryOperation(
+			func() error {
+				return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
+			}); err != nil {
+			return err
+		}
+		if err := this.dropOldTableIfRequired(); err != nil {
+			return err
+		}
+	}
 	return
 }
 
@@ -364,6 +367,21 @@ func (this *Migrator) dropOldTableIfRequired() (err error) {
 	return nil
 }
 
+// waitForEventsUpToLock injects the "AllEventsUpToLockProcessed" state hint, waits for it to appear
+// in the binary logs, and thus makes sure the apply-events queue is drained up to the lock.
+func (this *Migrator) waitForEventsUpToLock() (err error) {
+	if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
+		return err
+	}
+	log.Debugf("Waiting for events up to lock")
+	<-this.allEventsUpToLockProcessed
+	atomic.StoreInt64(&this.allEventsUpToLockProcessedFlag, 1)
+	log.Debugf("Done waiting for events up to lock")
+	this.printStatus()
+
+	return nil
+}
+
 // stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
 // what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
 // There is a point in time where the "original" table does not exist and queries are non-blocked
@@ -373,11 +391,9 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
 		return err
 	}
 
-	this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
-	log.Debugf("Waiting for events up to lock")
-	<-this.allEventsUpToLockProcessed
-	log.Debugf("Done waiting for events up to lock")
-
+	if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
+		return err
+	}
 	if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil {
 		return err
 	}
@@ -438,10 +454,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
 	log.Infof("Found RENAME to be executing")
 
 	// OK, at this time we know any newly incoming DML on the original table is blocked.
-	this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
-	log.Debugf("Waiting for events up to lock")
-	<-this.allEventsUpToLockProcessed
-	log.Debugf("Done waiting for events up to lock")
+	if err := this.waitForEventsUpToLock(); err != nil {
+		return err
+	}
 
 	okToReleaseLock <- true
 	// BAM: voluntary lock is released, blocking query is released, rename is released.
@@ -466,14 +479,11 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
 // in sync. There is no table swap.
 func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
 	log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
-	if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
+	if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil {
 		return err
 	}
 
-	this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
-	log.Debugf("Waiting for events up to lock")
-	<-this.allEventsUpToLockProcessed
-	log.Debugf("Done waiting for events up to lock")
+	if err := this.waitForEventsUpToLock(); err != nil {
+		return err
+	}
 
 	log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
 	return nil
@@ -612,7 +622,17 @@ func (this *Migrator) initiateStreaming() error {
 			return this.onChangelogStateEvent(dmlEvent)
 		},
 	)
-	this.eventsStreamer.AddListener(
+
+	go func() {
+		log.Debugf("Beginning streaming")
+		this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() })
+	}()
+	return nil
+}
+
+// addDMLEventsListener registers a listener that queues the original table's DML events onto applyEventsQueue; it is deliberately only called after the tables are created, since with very busy tables the events func buffer would otherwise fill up and block the "tables-created" event.
+func (this *Migrator) addDMLEventsListener() error {
+	err := this.eventsStreamer.AddListener(
 		false,
 		this.migrationContext.DatabaseName,
 		this.migrationContext.OriginalTableName,
@@ -624,12 +644,7 @@ func (this *Migrator) initiateStreaming() error {
 			return nil
 		},
 	)
-
-	go func() {
-		log.Debugf("Beginning streaming")
-		this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() })
-	}()
-	return nil
+	return err
 }
 
 func (this *Migrator) initiateApplier() error {
@@ -680,13 +695,16 @@ func (this *Migrator) iterateChunks() error {
 			if !hasFurtherRange {
 				return terminateRowIteration(nil)
 			}
-			_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
-			if err != nil {
-				return terminateRowIteration(err)
+			applyCopyRowsFunc := func() error {
+				_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
+				if err != nil {
+					return terminateRowIteration(err)
+				}
+				atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
+				atomic.AddInt64(&this.migrationContext.Iteration, 1)
+				return nil
 			}
-			atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
-			atomic.AddInt64(&this.migrationContext.Iteration, 1)
-			return nil
+			return this.retryOperation(applyCopyRowsFunc)
 		}
 		this.copyRowsQueue <- copyRowsFunc
 	}
@@ -714,7 +732,8 @@ func (this *Migrator) executeWriteFuncs() error {
 				select {
 				case copyRowsFunc := <-this.copyRowsQueue:
 					{
-						if err := this.retryOperation(copyRowsFunc); err != nil {
+						// Retries are handled within the copyRowsFunc
+						if err := copyRowsFunc(); err != nil {
 							return log.Errore(err)
 						}
 					}
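
To make the AllEventsUpToLockProcessed handling above concrete, a minimal
standalone sketch (illustrative names, not the migrator's types) of the
sentinel-on-the-same-queue trick: because the queue is consumed sequentially,
the sentinel func only runs once every event enqueued before it has been
applied, and enqueueing it asynchronously means a full queue cannot deadlock
the producer.

package main

import "fmt"

type writeFunc func() error

func main() {
	applyQueue := make(chan writeFunc, 100)
	allProcessed := make(chan bool)

	// Producer: some pending DML work, followed by the sentinel.
	for i := 0; i < 3; i++ {
		i := i
		applyQueue <- func() error { fmt.Println("apply event", i); return nil }
	}
	go func() {
		// Asynchronous enqueue: never blocks the state-event handler.
		applyQueue <- func() error { allProcessed <- true; return nil }
	}()

	// Consumer: applies funcs strictly in order.
	go func() {
		for f := range applyQueue {
			_ = f()
		}
	}()

	<-allProcessed
	fmt.Println("all events up to lock processed")
}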
diff --git a/go/logic/streamer.go b/go/logic/streamer.go
index 176ddf2..333c842 100644
--- a/go/logic/streamer.go
+++ b/go/logic/streamer.go
@@ -9,6 +9,7 @@ import (
 	gosql "database/sql"
 	"fmt"
 	"strings"
+	"sync"
 
 	"github.com/github/gh-osc/go/base"
 	"github.com/github/gh-osc/go/binlog"
@@ -37,6 +38,7 @@ type EventsStreamer struct {
 	migrationContext      *base.MigrationContext
 	nextBinlogCoordinates *mysql.BinlogCoordinates
 	listeners             [](*BinlogEventListener)
+	listenersMutex        *sync.Mutex
 	eventsChannel         chan *binlog.BinlogEntry
 	binlogReader          binlog.BinlogReader
 }
@@ -46,12 +48,17 @@ func NewEventsStreamer() *EventsStreamer {
 		connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
 		migrationContext: base.GetMigrationContext(),
 		listeners:        [](*BinlogEventListener){},
+		listenersMutex:   &sync.Mutex{},
 		eventsChannel:    make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
 	}
 }
 
 func (this *EventsStreamer) AddListener(
 	async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
+
+	this.listenersMutex.Lock()
+	defer this.listenersMutex.Unlock()
+
 	if databaseName == "" {
 		return fmt.Errorf("Empty database name in AddListener")
 	}
@@ -69,6 +76,9 @@ func (this *EventsStreamer) AddListener(
 }
 
 func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
+	this.listenersMutex.Lock()
+	defer this.listenersMutex.Unlock()
+
 	for _, listener := range this.listeners {
 		if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
 			continue
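
The mutex above closes a race between listener registration and notification.
A minimal standalone sketch of the pattern (illustrative names, not the
streamer's types):

package main

import (
	"fmt"
	"sync"
)

// registry is an illustrative stand-in for the streamer's listener list:
// registration and notification take the same mutex, so a listener added
// mid-stream never races with the iteration in notifyListeners().
type registry struct {
	listenersMutex sync.Mutex
	listeners      []func(event string)
}

func (r *registry) addListener(f func(event string)) {
	r.listenersMutex.Lock()
	defer r.listenersMutex.Unlock()
	r.listeners = append(r.listeners, f)
}

func (r *registry) notifyListeners(event string) {
	r.listenersMutex.Lock()
	defer r.listenersMutex.Unlock()
	for _, f := range r.listeners {
		f(event)
	}
}

func main() {
	r := &registry{}
	r.addListener(func(event string) { fmt.Println("got", event) })
	r.notifyListeners("dml")
}

Holding the lock for the duration of notification is the simple choice here;
it does mean a listener must not itself call addListener, which holds in this
patch since the DML listener only enqueues onto applyEventsQueue.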
diff --git a/go/mysql/utils.go b/go/mysql/utils.go
index 3240120..834f9d4 100644
--- a/go/mysql/utils.go
+++ b/go/mysql/utils.go
@@ -107,3 +107,25 @@ func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKe
 	visitedKeys.AddKey(masterConfig.Key)
 	return GetMasterConnectionConfigSafe(masterConfig, visitedKeys)
 }
+
+// GetReadBinlogCoordinates returns the coordinates up to which a replica's
+// IO thread has read from its master, per `show slave status`.
+func GetReadBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, err error) {
+	err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
+		readBinlogCoordinates = &BinlogCoordinates{
+			LogFile: m.GetString("Master_Log_File"),
+			LogPos:  m.GetInt64("Read_Master_Log_Pos"),
+		}
+		return nil
+	})
+	return readBinlogCoordinates, err
+}
+
+// GetSelfBinlogCoordinates returns the coordinates at which this server is
+// currently writing its own binary log, per `show master status`.
+func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) {
+	err = sqlutils.QueryRowsMap(db, `show master status`, func(m sqlutils.RowMap) error {
+		selfBinlogCoordinates = &BinlogCoordinates{
+			LogFile: m.GetString("File"),
+			LogPos:  m.GetInt64("Position"),
+		}
+		return nil
+	})
+	return selfBinlogCoordinates, err
+}
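
A possible companion helper, shown only as a hypothetical sketch assuming the
same package and imports (gosql, sqlutils, BinlogCoordinates) as
go/mysql/utils.go: the coordinates the SQL thread has executed, as opposed to
what the IO thread has read; after a successful master_pos_wait() the two
should coincide.

// GetExecutedBinlogCoordinates is a hypothetical companion (not part of this
// patch): it returns the master coordinates the replica's SQL thread has
// executed, whereas GetReadBinlogCoordinates returns what the IO thread has read.
func GetExecutedBinlogCoordinates(db *gosql.DB) (executedBinlogCoordinates *BinlogCoordinates, err error) {
	err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
		executedBinlogCoordinates = &BinlogCoordinates{
			LogFile: m.GetString("Relay_Master_Log_File"),
			LogPos:  m.GetInt64("Exec_Master_Log_Pos"),
		}
		return nil
	})
	return executedBinlogCoordinates, err
}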