diff --git a/go/base/context.go b/go/base/context.go index f32ff10..4a7029e 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -121,17 +121,17 @@ func GetMigrationContext() *MigrationContext { // GetGhostTableName generates the name of ghost table, based on original table name func (this *MigrationContext) GetGhostTableName() string { - return fmt.Sprintf("_%s_New", this.OriginalTableName) + return fmt.Sprintf("_%s_gst", this.OriginalTableName) } // GetOldTableName generates the name of the "old" table, into which the original table is renamed. func (this *MigrationContext) GetOldTableName() string { - return fmt.Sprintf("_%s_Old", this.OriginalTableName) + return fmt.Sprintf("_%s_old", this.OriginalTableName) } // GetChangelogTableName generates the name of changelog table, based on original table name func (this *MigrationContext) GetChangelogTableName() string { - return fmt.Sprintf("_%s_OSC", this.OriginalTableName) + return fmt.Sprintf("_%s_osc", this.OriginalTableName) } // GetVoluntaryLockName returns a name of a voluntary lock to be used throughout diff --git a/go/logic/applier.go b/go/logic/applier.go index 461db05..b34b20b 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -548,6 +548,39 @@ func (this *Applier) StopSlaveIOThread() error { return nil } +// MasterPosWait is applicable with --test-on-replica +func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error { + var appliedRows int64 + if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 1).Scan(&appliedRows); err != nil { + return err + } + if appliedRows < 0 { + return fmt.Errorf("Timeout waiting on master_pos_wait()") + } + return nil +} + +func (this *Applier) StopSlaveNicely() error { + if err := this.StopSlaveIOThread(); err != nil { + return err + } + binlogCoordinates, err := mysql.GetReadBinlogCoordinates(this.db) + if err != nil { + return err + } + log.Debugf("Replication stopped at %+v. Will wait for SQL thread to apply", *binlogCoordinates) + if err := this.MasterPosWait(binlogCoordinates); err != nil { + return err + } + log.Debugf("Replication SQL thread applied all events") + if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil { + return err + } else { + log.Debugf("Self binlog coordinates: %+v", *selfBinlogCoordinates) + } + return nil +} + // GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens // on a okToRelease in order to release it func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 83e4068..b667015 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -48,6 +48,7 @@ type Migrator struct { voluntaryLockAcquired chan bool panicAbort chan error + allEventsUpToLockProcessedFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan tableWriteFunc @@ -65,6 +66,8 @@ func NewMigrator() *Migrator { voluntaryLockAcquired: make(chan bool, 1), panicAbort: make(chan error), + allEventsUpToLockProcessedFlag: 0, + copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), handledChangelogStates: make(map[string]bool), @@ -106,7 +109,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) { if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond { return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()) } - if this.migrationContext.TestOnReplica { + if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedFlag) == 0) { replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery) if err != nil { return true, err.Error() @@ -198,6 +201,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err return nil } +// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then +// consumers and drops any further incoming events that may be left hanging. +func (this *Migrator) consumeRowCopyComplete() { + <-this.rowCopyComplete + go func() { + for <-this.rowCopyComplete { + } + }() +} + func (this *Migrator) canStopStreaming() bool { return false } @@ -215,33 +228,18 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er } case AllEventsUpToLockProcessed: { - this.allEventsUpToLockProcessed <- true - } - default: - { - return fmt.Errorf("Unknown changelog state: %+v", changelogState) - } - } - log.Debugf("Received state %+v", changelogState) - return nil -} - -func (this *Migrator) onChangelogState(stateValue string) (err error) { - log.Fatalf("I shouldn't be here") - if this.handledChangelogStates[stateValue] { - return nil - } - this.handledChangelogStates[stateValue] = true - - changelogState := ChangelogState(stateValue) - switch changelogState { - case TablesInPlace: - { - this.tablesInPlace <- true - } - case AllEventsUpToLockProcessed: - { - this.allEventsUpToLockProcessed <- true + applyEventFunc := func() error { + this.allEventsUpToLockProcessed <- true + return nil + } + // at this point we know all events up to lock have been read from the streamer, + // because the streamer works sequentially. So those events are either already handled, + // or have event functions in applyEventsQueue. + // So as not to create a potential deadlock, we write this func to applyEventsQueue + // asynchronously, understanding it doesn't really matter. + go func() { + this.applyEventsQueue <- applyEventFunc + }() } default: { @@ -295,6 +293,9 @@ func (this *Migrator) Migrate() (err error) { if err := this.inspector.InspectOriginalAndGhostTables(); err != nil { return err } + if err := this.addDMLEventsListener(); err != nil { + return err + } go this.initiateHeartbeatListener() if err := this.applier.ReadMigrationRangeValues(); err != nil { @@ -307,7 +308,7 @@ func (this *Migrator) Migrate() (err error) { go this.initiateStatus() log.Debugf("Operating until row copy is complete") - <-this.rowCopyComplete + this.consumeRowCopyComplete() log.Debugf("Row copy complete") this.printStatus() @@ -336,18 +337,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) { if this.migrationContext.QuickAndBumpySwapTables { return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() } - // Lock-based solution: we use low timeout and multiple attempts. But for - // each failed attempt, we throttle until replication lag is back to normal - if err := this.retryOperation( - func() error { - return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock) - }); err != nil { - return err - } - if err := this.dropOldTableIfRequired(); err != nil { - return err - } + { + // Lock-based solution: we use low timeout and multiple attempts. But for + // each failed attempt, we throttle until replication lag is back to normal + if err := this.retryOperation( + func() error { + return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock) + }); err != nil { + return err + } + if err := this.dropOldTableIfRequired(); err != nil { + return err + } + } return } @@ -364,6 +367,21 @@ func (this *Migrator) dropOldTableIfRequired() (err error) { return nil } +// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, +// make sure the queue is drained. +func (this *Migrator) waitForEventsUpToLock() (err error) { + if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil { + return err + } + log.Debugf("Waiting for events up to lock") + <-this.allEventsUpToLockProcessed + atomic.StoreInt64(&this.allEventsUpToLockProcessedFlag, 1) + log.Debugf("Done waiting for events up to lock") + this.printStatus() + + return nil +} + // stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute // what's left of last DML entries, and **non-atomically** swap original->old, then new->original. // There is a point in time where the "original" table does not exist and queries are non-blocked @@ -373,11 +391,9 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err return err } - this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) - log.Debugf("Waiting for events up to lock") - <-this.allEventsUpToLockProcessed - log.Debugf("Done waiting for events up to lock") - + if err := this.retryOperation(this.waitForEventsUpToLock); err != nil { + return err + } if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil { return err } @@ -438,10 +454,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error log.Infof("Found RENAME to be executing") // OK, at this time we know any newly incoming DML on original table is blocked. - this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) - log.Debugf("Waiting for events up to lock") - <-this.allEventsUpToLockProcessed - log.Debugf("Done waiting for events up to lock") + this.waitForEventsUpToLock() okToReleaseLock <- true // BAM: voluntary lock is released, blocking query is released, rename is released. @@ -466,14 +479,11 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error // in sync. There is no table swap. func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) { log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE") - if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil { + if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil { return err } - this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) - log.Debugf("Waiting for events up to lock") - <-this.allEventsUpToLockProcessed - log.Debugf("Done waiting for events up to lock") + this.waitForEventsUpToLock() log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation") return nil @@ -612,8 +622,18 @@ func (this *Migrator) initiateStreaming() error { return this.onChangelogStateEvent(dmlEvent) }, ) - this.eventsStreamer.AddListener( - true, + + go func() { + log.Debugf("Beginning streaming") + this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() }) + }() + return nil +} + +// addDMLEventsListener +func (this *Migrator) addDMLEventsListener() error { + err := this.eventsStreamer.AddListener( + false, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEvent *binlog.BinlogDMLEvent) error { @@ -624,12 +644,7 @@ func (this *Migrator) initiateStreaming() error { return nil }, ) - - go func() { - log.Debugf("Beginning streaming") - this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() }) - }() - return nil + return err } func (this *Migrator) initiateApplier() error { @@ -680,13 +695,16 @@ func (this *Migrator) iterateChunks() error { if !hasFurtherRange { return terminateRowIteration(nil) } - _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() - if err != nil { - return terminateRowIteration(err) + applyCopyRowsFunc := func() error { + _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() + if err != nil { + return terminateRowIteration(err) + } + atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) + atomic.AddInt64(&this.migrationContext.Iteration, 1) + return nil } - atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) - atomic.AddInt64(&this.migrationContext.Iteration, 1) - return nil + return this.retryOperation(applyCopyRowsFunc) } this.copyRowsQueue <- copyRowsFunc } @@ -714,7 +732,8 @@ func (this *Migrator) executeWriteFuncs() error { select { case copyRowsFunc := <-this.copyRowsQueue: { - if err := this.retryOperation(copyRowsFunc); err != nil { + // Retries are handled within the copyRowsFunc + if err := copyRowsFunc(); err != nil { return log.Errore(err) } } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 176ddf2..333c842 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -9,6 +9,7 @@ import ( gosql "database/sql" "fmt" "strings" + "sync" "github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/binlog" @@ -37,6 +38,7 @@ type EventsStreamer struct { migrationContext *base.MigrationContext nextBinlogCoordinates *mysql.BinlogCoordinates listeners [](*BinlogEventListener) + listenersMutex *sync.Mutex eventsChannel chan *binlog.BinlogEntry binlogReader binlog.BinlogReader } @@ -46,12 +48,17 @@ func NewEventsStreamer() *EventsStreamer { connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, migrationContext: base.GetMigrationContext(), listeners: [](*BinlogEventListener){}, + listenersMutex: &sync.Mutex{}, eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), } } func (this *EventsStreamer) AddListener( async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { + + this.listenersMutex.Lock() + defer this.listenersMutex.Unlock() + if databaseName == "" { return fmt.Errorf("Empty database name in AddListener") } @@ -69,6 +76,9 @@ func (this *EventsStreamer) AddListener( } func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { + this.listenersMutex.Lock() + defer this.listenersMutex.Unlock() + for _, listener := range this.listeners { if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) { continue diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 3240120..834f9d4 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -107,3 +107,25 @@ func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKe visitedKeys.AddKey(masterConfig.Key) return GetMasterConnectionConfigSafe(masterConfig, visitedKeys) } + +func GetReadBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, err error) { + err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { + readBinlogCoordinates = &BinlogCoordinates{ + LogFile: m.GetString("Master_Log_File"), + LogPos: m.GetInt64("Read_Master_Log_Pos"), + } + return nil + }) + return readBinlogCoordinates, err +} + +func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) { + err = sqlutils.QueryRowsMap(db, `show master status`, func(m sqlutils.RowMap) error { + selfBinlogCoordinates = &BinlogCoordinates{ + LogFile: m.GetString("File"), + LogPos: m.GetInt64("Position"), + } + return nil + }) + return selfBinlogCoordinates, err +}