From eeffa701d6dde409f72599a12134ee2144a9d15a Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 18 Apr 2016 10:57:18 -0700 Subject: [PATCH] - Added `ok-to-drop-table` flag - Added `switch-to-rbr` flag; applying binlog format change if needed - Using dedicated db instance for locking & renaming on applier (must be used from within same connection) - Heartbeat now uses `time.RFC3339Nano` - Swap tables works! Caveat: short table outage - `--test-on-replica` works! - retries: using `panicAbort`: from any goroutine, regardless of context, it is possible to terminate the operation - Reintroduced changelog events listener on streamer. This is the correct implementation. --- go/base/context.go | 43 +++++---- go/cmd/gh-osc/main.go | 2 + go/logic/applier.go | 101 ++++++++++++++++++--- go/logic/inspect.go | 42 +++++++-- go/logic/migrator.go | 206 ++++++++++++++++++++++++++++-------------- go/sql/types.go | 13 ++- 6 files changed, 295 insertions(+), 112 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 5f1876b..e6c5c15 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -37,31 +37,38 @@ type MigrationContext struct { OriginalTableName string AlterStatement string - Noop bool - TestOnReplica bool + CountTableRows bool + AllowedRunningOnMaster bool + SwitchToRowBinlogFormat bool - TableEngine string - CountTableRows bool - RowsEstimate int64 - UsedRowsEstimateMethod RowsEstimateMethod ChunkSize int64 - OriginalBinlogFormat string - OriginalBinlogRowImage string - AllowedRunningOnMaster bool - InspectorConnectionConfig *mysql.ConnectionConfig - ApplierConnectionConfig *mysql.ConnectionConfig - StartTime time.Time - RowCopyStartTime time.Time - CurrentLag int64 MaxLagMillisecondsThrottleThreshold int64 ThrottleFlagFile string ThrottleAdditionalFlagFile string - TotalRowsCopied int64 - isThrottled bool - throttleReason string - throttleMutex *sync.Mutex MaxLoad map[string]int64 + Noop bool + TestOnReplica bool + OkToDropTable bool + + TableEngine string + RowsEstimate int64 + UsedRowsEstimateMethod RowsEstimateMethod + OriginalBinlogFormat string + OriginalBinlogRowImage string + InspectorConnectionConfig *mysql.ConnectionConfig + ApplierConnectionConfig *mysql.ConnectionConfig + StartTime time.Time + RowCopyStartTime time.Time + LockTablesStartTime time.Time + RenameTablesStartTime time.Time + RenameTablesEndTime time.Time + CurrentLag int64 + TotalRowsCopied int64 + isThrottled bool + throttleReason string + throttleMutex *sync.Mutex + OriginalTableColumns *sql.ColumnList OriginalTableUniqueKeys [](*sql.UniqueKey) GhostTableColumns *sql.ColumnList diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 09aa337..10ba760 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -32,7 +32,9 @@ func main() { executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-osc issues `STOP SLAVE` and you can compare the two tables for building trust") + flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?") + flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running") flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") if migrationContext.ChunkSize < 100 { migrationContext.ChunkSize = 100 diff --git a/go/logic/applier.go b/go/logic/applier.go index 5e7c83f..8d355b8 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -25,6 +25,7 @@ import ( type Applier struct { connectionConfig *mysql.ConnectionConfig db *gosql.DB + singletonDB *gosql.DB migrationContext *base.MigrationContext } @@ -36,21 +37,29 @@ func NewApplier() *Applier { } func (this *Applier) InitDBConnections() (err error) { - ApplierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) - if this.db, _, err = sqlutils.GetDB(ApplierUri); err != nil { + applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) + if this.db, _, err = sqlutils.GetDB(applierUri); err != nil { return err } - if err := this.validateConnection(); err != nil { + singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri) + if this.singletonDB, _, err = sqlutils.GetDB(singletonApplierUri); err != nil { + return err + } + this.singletonDB.SetMaxOpenConns(1) + if err := this.validateConnection(this.db); err != nil { + return err + } + if err := this.validateConnection(this.singletonDB); err != nil { return err } return nil } // validateConnection issues a simple can-connect to MySQL -func (this *Applier) validateConnection() error { +func (this *Applier) validateConnection(db *gosql.DB) error { query := `select @@global.port` var port int - if err := this.db.QueryRow(query).Scan(&port); err != nil { + if err := db.QueryRow(query).Scan(&port); err != nil { return err } if port != this.connectionConfig.Key.Port { @@ -125,6 +134,10 @@ func (this *Applier) CreateChangelogTable() error { // dropTable drops a given table on the applied host func (this *Applier) dropTable(tableName string) error { + if this.migrationContext.Noop { + log.Debugf("Noop operation; not really dropping table %s", sql.EscapeName(tableName)) + return nil + } query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(tableName), @@ -192,7 +205,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) { func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) { numSuccessiveFailures := 0 injectHeartbeat := func() error { - if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339)); err != nil { + if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil { numSuccessiveFailures++ if numSuccessiveFailures > this.migrationContext.MaxRetries() { return log.Errore(err) @@ -391,16 +404,21 @@ func (this *Applier) LockTables() error { return nil } - query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`, + // query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`, + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.OriginalTableName), + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.GetGhostTableName()), + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.GetChangelogTableName()), + // ) + query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), - sql.EscapeName(this.migrationContext.DatabaseName), - sql.EscapeName(this.migrationContext.GetGhostTableName()), - sql.EscapeName(this.migrationContext.DatabaseName), - sql.EscapeName(this.migrationContext.GetChangelogTableName()), ) log.Infof("Locking tables") - if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + this.migrationContext.LockTablesStartTime = time.Now() + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { return err } log.Infof("Tables locked") @@ -411,13 +429,70 @@ func (this *Applier) LockTables() error { func (this *Applier) UnlockTables() error { query := `unlock /* gh-osc */ tables` log.Infof("Unlocking tables") - if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { return err } log.Infof("Tables unlocked") return nil } +// LockTables +func (this *Applier) SwapTables() error { + if this.migrationContext.Noop { + log.Debugf("Noop operation; not really swapping tables") + return nil + } + + // query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`, + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.OriginalTableName), + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.GetOldTableName()), + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.GetGhostTableName()), + // sql.EscapeName(this.migrationContext.DatabaseName), + // sql.EscapeName(this.migrationContext.OriginalTableName), + // ) + // log.Infof("Renaming tables") + // if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + // return err + // } + query := fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.GetOldTableName()), + ) + log.Infof("Renaming original table") + this.migrationContext.RenameTablesStartTime = time.Now() + if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil { + return err + } + query = fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`, + sql.EscapeName(this.migrationContext.DatabaseName), + sql.EscapeName(this.migrationContext.GetGhostTableName()), + sql.EscapeName(this.migrationContext.OriginalTableName), + ) + log.Infof("Renaming ghost table") + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return err + } + this.migrationContext.RenameTablesEndTime = time.Now() + + log.Infof("Tables renamed") + return nil +} + +// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread +func (this *Applier) StopSlaveIOThread() error { + query := `stop /* gh-osc */ slave io_thread` + log.Infof("Stopping replication") + if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { + return err + } + log.Infof("Replication stopped") + return nil +} + func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) { query := fmt.Sprintf(`show global status like '%s'`, variableName) if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil { diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 902ccbe..5ca22f8 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -44,10 +44,13 @@ func (this *Inspector) InitDBConnections() (err error) { if err := this.validateGrants(); err != nil { return err } - if err := this.restartReplication(); err != nil { + // if err := this.restartReplication(); err != nil { + // return err + // } + if err := this.validateBinlogs(); err != nil { return err } - if err := this.validateBinlogs(); err != nil { + if err := this.applyBinlogFormat(); err != nil { return err } return nil @@ -204,6 +207,24 @@ func (this *Inspector) restartReplication() error { return nil } +// applyBinlogFormat sets ROW binlog format and restarts replication to make +// the replication thread apply it. +func (this *Inspector) applyBinlogFormat() error { + if this.migrationContext.RequiresBinlogFormatChange() { + if _, err := sqlutils.ExecNoPrepare(this.db, `set global binlog_format='ROW'`); err != nil { + return err + } + if _, err := sqlutils.ExecNoPrepare(this.db, `set session binlog_format='ROW'`); err != nil { + return err + } + log.Debugf("'ROW' binlog format applied") + } + if err := this.restartReplication(); err != nil { + return err + } + return nil +} + // validateBinlogs checks that binary log configuration is good to go func (this *Inspector) validateBinlogs() error { query := `select @@global.log_bin, @@global.log_slave_updates, @@global.binlog_format` @@ -218,6 +239,9 @@ func (this *Inspector) validateBinlogs() error { return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) } if this.migrationContext.RequiresBinlogFormatChange() { + if !this.migrationContext.SwitchToRowBinlogFormat { + return fmt.Errorf("You must be using ROW binlog format. I can switch it for you, provided --switch-to-rbr and that %s:%d doesn't have replicas", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + } query := fmt.Sprintf(`show /* gh-osc */ slave hosts`) countReplicas := 0 err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { @@ -230,7 +254,7 @@ func (this *Inspector) validateBinlogs() error { if countReplicas > 0 { return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) } - log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) + log.Infof("%s:%d has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) } query = `select @@global.binlog_row_image` if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil { @@ -369,6 +393,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* UNIQUES.COUNT_COLUMN_IN_INDEX, COLUMNS.DATA_TYPE, COLUMNS.CHARACTER_SET_NAME, + LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment, has_nullable FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN ( SELECT @@ -414,11 +439,12 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* END, COUNT_COLUMN_IN_INDEX ` - err = sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { + err = sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { uniqueKey := &sql.UniqueKey{ - Name: rowMap.GetString("INDEX_NAME"), - Columns: *sql.ParseColumnList(rowMap.GetString("COLUMN_NAMES")), - HasNullable: rowMap.GetBool("has_nullable"), + Name: m.GetString("INDEX_NAME"), + Columns: *sql.ParseColumnList(m.GetString("COLUMN_NAMES")), + HasNullable: m.GetBool("has_nullable"), + IsAutoIncrement: m.GetBool("is_auto_increment"), } uniqueKeys = append(uniqueKeys, uniqueKey) return nil @@ -426,7 +452,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](* if err != nil { return uniqueKeys, err } - log.Debugf("Potential unique keys: %+v", uniqueKeys) + log.Debugf("Potential unique keys in %+v: %+v", tableName, uniqueKeys) return uniqueKeys, nil } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 62fc6e3..1b5b406 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -9,13 +9,13 @@ import ( "fmt" "os" "os/signal" - "regexp" "sync/atomic" "syscall" "time" "github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/binlog" + "github.com/github/gh-osc/go/sql" "github.com/outbrain/golib/log" ) @@ -34,10 +34,6 @@ const ( heartbeatIntervalMilliseconds = 1000 ) -var ( - prettifyDurationRegexp = regexp.MustCompile("([.][0-9]+)") -) - // Migrator is the main schema migration flow manager. type Migrator struct { inspector *Inspector @@ -48,6 +44,7 @@ type Migrator struct { tablesInPlace chan bool rowCopyComplete chan bool allEventsUpToLockProcessed chan bool + panicAbort chan error // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete @@ -63,6 +60,7 @@ func NewMigrator() *Migrator { tablesInPlace: make(chan bool), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), + panicAbort: make(chan error), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), @@ -71,15 +69,6 @@ func NewMigrator() *Migrator { return migrator } -func prettifyDurationOutput(d time.Duration) string { - if d < time.Second { - return "0s" - } - result := fmt.Sprintf("%s", d) - result = prettifyDurationRegexp.ReplaceAllString(result, "") - return result -} - // acceptSignals registers for OS signals func (this *Migrator) acceptSignals() { c := make(chan os.Signal, 1) @@ -182,6 +171,7 @@ func (this *Migrator) retryOperation(operation func() error) (err error) { } // there's an error. Let's try again. } + this.panicAbort <- err return err } @@ -189,9 +179,34 @@ func (this *Migrator) canStopStreaming() bool { return false } +func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { + // Hey, I created the changlog table, I know the type of columns it has! + if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" { + return nil + } + changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) + switch changelogState { + case TablesInPlace: + { + this.tablesInPlace <- true + } + case AllEventsUpToLockProcessed: + { + this.allEventsUpToLockProcessed <- true + } + default: + { + return fmt.Errorf("Unknown changelog state: %+v", changelogState) + } + } + log.Debugf("Received state %+v", changelogState) + return nil +} + func (this *Migrator) onChangelogState(stateValue string) (err error) { + log.Fatalf("I shouldn't be here") if this.handledChangelogStates[stateValue] { - return + return nil } this.handledChangelogStates[stateValue] = true @@ -215,7 +230,7 @@ func (this *Migrator) onChangelogState(stateValue string) (err error) { } func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) { - heartbeatTime, err := time.Parse(time.RFC3339, heartbeatValue) + heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue) if err != nil { return log.Errore(err) } @@ -226,9 +241,105 @@ func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) { return nil } +// +func (this *Migrator) listenOnPanicAbort() { + err := <-this.panicAbort + log.Fatale(err) +} + func (this *Migrator) Migrate() (err error) { this.migrationContext.StartTime = time.Now() + go this.listenOnPanicAbort() + if err := this.initiateInspector(); err != nil { + return err + } + + if err := this.initiateStreaming(); err != nil { + return err + } + if err := this.initiateApplier(); err != nil { + return err + } + + log.Debugf("Waiting for tables to be in place") + <-this.tablesInPlace + log.Debugf("Tables are in place") + // Yay! We now know the Ghost and Changelog tables are good to examine! + // When running on replica, this means the replica has those tables. When running + // on master this is always true, of course, and yet it also implies this knowledge + // is in the binlogs. + if err := this.inspector.InspectOriginalAndGhostTables(); err != nil { + return err + } + go this.initiateHeartbeatListener() + + if err := this.applier.ReadMigrationRangeValues(); err != nil { + return err + } + go this.initiateThrottler() + go this.executeWriteFuncs() + go this.iterateChunks() + this.migrationContext.RowCopyStartTime = time.Now() + go this.initiateStatus() + + log.Debugf("Operating until row copy is complete") + <-this.rowCopyComplete + log.Debugf("Row copy complete") + this.printStatus() + + this.stopWritesAndCompleteMigration() + + return nil +} + +func (this *Migrator) stopWritesAndCompleteMigration() (err error) { + this.throttle(func() { + log.Debugf("throttling before LOCK TABLES") + }) + + if this.migrationContext.TestOnReplica { + log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE") + if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil { + return err + } + } else { + if err := this.retryOperation(this.applier.LockTables); err != nil { + return err + } + } + this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) + log.Debugf("Waiting for events up to lock") + <-this.allEventsUpToLockProcessed + log.Debugf("Done waiting for events up to lock") + + if this.migrationContext.TestOnReplica { + log.Info("Table duplicated with new schema. Am not touching the original table. You may now compare the two tables to gain trust into this tool's operation") + } else { + if err := this.retryOperation(this.applier.SwapTables); err != nil { + return err + } + // Unlock + if err := this.retryOperation(this.applier.UnlockTables); err != nil { + return err + } + // Drop old table + if this.migrationContext.OkToDropTable { + dropTableFunc := func() error { + return this.applier.dropTable(this.migrationContext.GetOldTableName()) + } + if err := this.retryOperation(dropTableFunc); err != nil { + return err + } + } + } + lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) + renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) + log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName)) + return nil +} + +func (this *Migrator) initiateInspector() (err error) { this.inspector = NewInspector() if err := this.inspector.InitDBConnections(); err != nil { return err @@ -257,53 +368,6 @@ func (this *Migrator) Migrate() (err error) { } log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key) - - go this.initiateChangelogListener() - - if err := this.initiateStreaming(); err != nil { - return err - } - if err := this.initiateApplier(); err != nil { - return err - } - - log.Debugf("Waiting for tables to be in place") - <-this.tablesInPlace - log.Debugf("Tables are in place") - // Yay! We now know the Ghost and Changelog tables are good to examine! - // When running on replica, this means the replica has those tables. When running - // on master this is always true, of course, and yet it also implies this knowledge - // is in the binlogs. - if err := this.inspector.InspectOriginalAndGhostTables(); err != nil { - return err - } - - if err := this.applier.ReadMigrationRangeValues(); err != nil { - return err - } - go this.initiateThrottler() - go this.executeWriteFuncs() - go this.iterateChunks() - this.migrationContext.RowCopyStartTime = time.Now() - go this.initiateStatus() - - log.Debugf("Operating until row copy is complete") - <-this.rowCopyComplete - log.Debugf("Row copy complete") - this.printStatus() - - this.throttle(func() { - log.Debugf("throttling on LOCK TABLES") - }) - // TODO retries!! - this.applier.LockTables() - this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)) - log.Debugf("Waiting for events up to lock") - <-this.allEventsUpToLockProcessed - log.Debugf("Done waiting for events up to lock") - // TODO retries!! - this.applier.UnlockTables() - return nil } @@ -347,7 +411,7 @@ func (this *Migrator) printStatus() { status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s", totalRowsCopied, rowsEstimate, progressPct, len(this.applyEventsQueue), cap(this.applyEventsQueue), - prettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), prettifyDurationOutput(elapsedTime), + base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime), eta, ) this.applier.WriteChangelog( @@ -357,7 +421,7 @@ func (this *Migrator) printStatus() { fmt.Println(status) } -func (this *Migrator) initiateChangelogListener() { +func (this *Migrator) initiateHeartbeatListener() { ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2) for range ticker { go func() error { @@ -367,10 +431,6 @@ func (this *Migrator) initiateChangelogListener() { } for hint, value := range changelogState { switch hint { - case "state": - { - this.onChangelogState(value) - } case "heartbeat": { this.onChangelogHeartbeat(value) @@ -392,6 +452,14 @@ func (this *Migrator) initiateStreaming() error { log.Debugf("Noop operation; not really listening on binlog events") return nil } + this.eventsStreamer.AddListener( + false, + this.migrationContext.DatabaseName, + this.migrationContext.GetChangelogTableName(), + func(dmlEvent *binlog.BinlogDMLEvent) error { + return this.onChangelogStateEvent(dmlEvent) + }, + ) this.eventsStreamer.AddListener( true, this.migrationContext.DatabaseName, diff --git a/go/sql/types.go b/go/sql/types.go index e82720c..ea73b3a 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -71,9 +71,10 @@ func (this *ColumnList) Len() int { // UniqueKey is the combination of a key's name and columns type UniqueKey struct { - Name string - Columns ColumnList - HasNullable bool + Name string + Columns ColumnList + HasNullable bool + IsAutoIncrement bool } // IsPrimary checks if this unique key is primary @@ -86,7 +87,11 @@ func (this *UniqueKey) Len() int { } func (this *UniqueKey) String() string { - return fmt.Sprintf("%s: %s; has nullable: %+v", this.Name, this.Columns, this.HasNullable) + description := this.Name + if this.IsAutoIncrement { + description = fmt.Sprintf("%s (auto_incrmenet)", description) + } + return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names, this.HasNullable) } type ColumnValues struct {