Merge pull request #18 from github/ongoing-initial-work-3

Full cycle works!
This commit is contained in:
Shlomi Noach 2016-04-18 10:58:59 -07:00
commit bb264fdc2b
6 changed files with 295 additions and 112 deletions

View File

@ -37,31 +37,38 @@ type MigrationContext struct {
OriginalTableName string
AlterStatement string
Noop bool
TestOnReplica bool
CountTableRows bool
AllowedRunningOnMaster bool
SwitchToRowBinlogFormat bool
TableEngine string
CountTableRows bool
RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
ChunkSize int64
OriginalBinlogFormat string
OriginalBinlogRowImage string
AllowedRunningOnMaster bool
InspectorConnectionConfig *mysql.ConnectionConfig
ApplierConnectionConfig *mysql.ConnectionConfig
StartTime time.Time
RowCopyStartTime time.Time
CurrentLag int64
MaxLagMillisecondsThrottleThreshold int64
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
TotalRowsCopied int64
isThrottled bool
throttleReason string
throttleMutex *sync.Mutex
MaxLoad map[string]int64
Noop bool
TestOnReplica bool
OkToDropTable bool
TableEngine string
RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
OriginalBinlogFormat string
OriginalBinlogRowImage string
InspectorConnectionConfig *mysql.ConnectionConfig
ApplierConnectionConfig *mysql.ConnectionConfig
StartTime time.Time
RowCopyStartTime time.Time
LockTablesStartTime time.Time
RenameTablesStartTime time.Time
RenameTablesEndTime time.Time
CurrentLag int64
TotalRowsCopied int64
isThrottled bool
throttleReason string
throttleMutex *sync.Mutex
OriginalTableColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
GhostTableColumns *sql.ColumnList

View File

@ -32,7 +32,9 @@ func main() {
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-osc issues `STOP SLAVE` and you can compare the two tables for building trust")
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
if migrationContext.ChunkSize < 100 {
migrationContext.ChunkSize = 100

View File

@ -25,6 +25,7 @@ import (
type Applier struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext
}
@ -36,21 +37,29 @@ func NewApplier() *Applier {
}
func (this *Applier) InitDBConnections() (err error) {
ApplierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(ApplierUri); err != nil {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(applierUri); err != nil {
return err
}
if err := this.validateConnection(); err != nil {
singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
if this.singletonDB, _, err = sqlutils.GetDB(singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)
if err := this.validateConnection(this.db); err != nil {
return err
}
if err := this.validateConnection(this.singletonDB); err != nil {
return err
}
return nil
}
// validateConnection issues a simple can-connect to MySQL
func (this *Applier) validateConnection() error {
func (this *Applier) validateConnection(db *gosql.DB) error {
query := `select @@global.port`
var port int
if err := this.db.QueryRow(query).Scan(&port); err != nil {
if err := db.QueryRow(query).Scan(&port); err != nil {
return err
}
if port != this.connectionConfig.Key.Port {
@ -125,6 +134,10 @@ func (this *Applier) CreateChangelogTable() error {
// dropTable drops a given table on the applied host
func (this *Applier) dropTable(tableName string) error {
if this.migrationContext.Noop {
log.Debugf("Noop operation; not really dropping table %s", sql.EscapeName(tableName))
return nil
}
query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
@ -192,7 +205,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
numSuccessiveFailures := 0
injectHeartbeat := func() error {
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339)); err != nil {
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
numSuccessiveFailures++
if numSuccessiveFailures > this.migrationContext.MaxRetries() {
return log.Errore(err)
@ -391,16 +404,21 @@ func (this *Applier) LockTables() error {
return nil
}
query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`,
// query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`,
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetChangelogTableName()),
// )
query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
log.Infof("Locking tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
log.Infof("Tables locked")
@ -411,13 +429,70 @@ func (this *Applier) LockTables() error {
func (this *Applier) UnlockTables() error {
query := `unlock /* gh-osc */ tables`
log.Infof("Unlocking tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
log.Infof("Tables unlocked")
return nil
}
// LockTables
func (this *Applier) SwapTables() error {
if this.migrationContext.Noop {
log.Debugf("Noop operation; not really swapping tables")
return nil
}
// query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetOldTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// )
// log.Infof("Renaming tables")
// if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
// return err
// }
query := fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
log.Infof("Renaming original table")
this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
query = fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Renaming ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.RenameTablesEndTime = time.Now()
log.Infof("Tables renamed")
return nil
}
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread
func (this *Applier) StopSlaveIOThread() error {
query := `stop /* gh-osc */ slave io_thread`
log.Infof("Stopping replication")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Replication stopped")
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

View File

@ -44,10 +44,13 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateGrants(); err != nil {
return err
}
if err := this.restartReplication(); err != nil {
// if err := this.restartReplication(); err != nil {
// return err
// }
if err := this.validateBinlogs(); err != nil {
return err
}
if err := this.validateBinlogs(); err != nil {
if err := this.applyBinlogFormat(); err != nil {
return err
}
return nil
@ -204,6 +207,24 @@ func (this *Inspector) restartReplication() error {
return nil
}
// applyBinlogFormat sets ROW binlog format and restarts replication to make
// the replication thread apply it.
func (this *Inspector) applyBinlogFormat() error {
if this.migrationContext.RequiresBinlogFormatChange() {
if _, err := sqlutils.ExecNoPrepare(this.db, `set global binlog_format='ROW'`); err != nil {
return err
}
if _, err := sqlutils.ExecNoPrepare(this.db, `set session binlog_format='ROW'`); err != nil {
return err
}
log.Debugf("'ROW' binlog format applied")
}
if err := this.restartReplication(); err != nil {
return err
}
return nil
}
// validateBinlogs checks that binary log configuration is good to go
func (this *Inspector) validateBinlogs() error {
query := `select @@global.log_bin, @@global.log_slave_updates, @@global.binlog_format`
@ -218,6 +239,9 @@ func (this *Inspector) validateBinlogs() error {
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}
if this.migrationContext.RequiresBinlogFormatChange() {
if !this.migrationContext.SwitchToRowBinlogFormat {
return fmt.Errorf("You must be using ROW binlog format. I can switch it for you, provided --switch-to-rbr and that %s:%d doesn't have replicas", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}
query := fmt.Sprintf(`show /* gh-osc */ slave hosts`)
countReplicas := 0
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
@ -230,7 +254,7 @@ func (this *Inspector) validateBinlogs() error {
if countReplicas > 0 {
return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
}
log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
log.Infof("%s:%d has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
}
query = `select @@global.binlog_row_image`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil {
@ -369,6 +393,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
UNIQUES.COUNT_COLUMN_IN_INDEX,
COLUMNS.DATA_TYPE,
COLUMNS.CHARACTER_SET_NAME,
LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment,
has_nullable
FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN (
SELECT
@ -414,11 +439,12 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
END,
COUNT_COLUMN_IN_INDEX
`
err = sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
err = sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
uniqueKey := &sql.UniqueKey{
Name: rowMap.GetString("INDEX_NAME"),
Columns: *sql.ParseColumnList(rowMap.GetString("COLUMN_NAMES")),
HasNullable: rowMap.GetBool("has_nullable"),
Name: m.GetString("INDEX_NAME"),
Columns: *sql.ParseColumnList(m.GetString("COLUMN_NAMES")),
HasNullable: m.GetBool("has_nullable"),
IsAutoIncrement: m.GetBool("is_auto_increment"),
}
uniqueKeys = append(uniqueKeys, uniqueKey)
return nil
@ -426,7 +452,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
if err != nil {
return uniqueKeys, err
}
log.Debugf("Potential unique keys: %+v", uniqueKeys)
log.Debugf("Potential unique keys in %+v: %+v", tableName, uniqueKeys)
return uniqueKeys, nil
}

View File

@ -9,13 +9,13 @@ import (
"fmt"
"os"
"os/signal"
"regexp"
"sync/atomic"
"syscall"
"time"
"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/sql"
"github.com/outbrain/golib/log"
)
@ -34,10 +34,6 @@ const (
heartbeatIntervalMilliseconds = 1000
)
var (
prettifyDurationRegexp = regexp.MustCompile("([.][0-9]+)")
)
// Migrator is the main schema migration flow manager.
type Migrator struct {
inspector *Inspector
@ -48,6 +44,7 @@ type Migrator struct {
tablesInPlace chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
panicAbort chan error
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
@ -63,6 +60,7 @@ func NewMigrator() *Migrator {
tablesInPlace: make(chan bool),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
panicAbort: make(chan error),
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@ -71,15 +69,6 @@ func NewMigrator() *Migrator {
return migrator
}
func prettifyDurationOutput(d time.Duration) string {
if d < time.Second {
return "0s"
}
result := fmt.Sprintf("%s", d)
result = prettifyDurationRegexp.ReplaceAllString(result, "")
return result
}
// acceptSignals registers for OS signals
func (this *Migrator) acceptSignals() {
c := make(chan os.Signal, 1)
@ -182,6 +171,7 @@ func (this *Migrator) retryOperation(operation func() error) (err error) {
}
// there's an error. Let's try again.
}
this.panicAbort <- err
return err
}
@ -189,9 +179,34 @@ func (this *Migrator) canStopStreaming() bool {
return false
}
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
// Hey, I created the changlog table, I know the type of columns it has!
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
return nil
}
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
switch changelogState {
case TablesInPlace:
{
this.tablesInPlace <- true
}
case AllEventsUpToLockProcessed:
{
this.allEventsUpToLockProcessed <- true
}
default:
{
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
}
log.Debugf("Received state %+v", changelogState)
return nil
}
func (this *Migrator) onChangelogState(stateValue string) (err error) {
log.Fatalf("I shouldn't be here")
if this.handledChangelogStates[stateValue] {
return
return nil
}
this.handledChangelogStates[stateValue] = true
@ -215,7 +230,7 @@ func (this *Migrator) onChangelogState(stateValue string) (err error) {
}
func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
heartbeatTime, err := time.Parse(time.RFC3339, heartbeatValue)
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
if err != nil {
return log.Errore(err)
}
@ -226,9 +241,105 @@ func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
return nil
}
//
func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort
log.Fatale(err)
}
func (this *Migrator) Migrate() (err error) {
this.migrationContext.StartTime = time.Now()
go this.listenOnPanicAbort()
if err := this.initiateInspector(); err != nil {
return err
}
if err := this.initiateStreaming(); err != nil {
return err
}
if err := this.initiateApplier(); err != nil {
return err
}
log.Debugf("Waiting for tables to be in place")
<-this.tablesInPlace
log.Debugf("Tables are in place")
// Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
go this.initiateHeartbeatListener()
if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
}
go this.initiateThrottler()
go this.executeWriteFuncs()
go this.iterateChunks()
this.migrationContext.RowCopyStartTime = time.Now()
go this.initiateStatus()
log.Debugf("Operating until row copy is complete")
<-this.rowCopyComplete
log.Debugf("Row copy complete")
this.printStatus()
this.stopWritesAndCompleteMigration()
return nil
}
func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
this.throttle(func() {
log.Debugf("throttling before LOCK TABLES")
})
if this.migrationContext.TestOnReplica {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
return err
}
} else {
if err := this.retryOperation(this.applier.LockTables); err != nil {
return err
}
}
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
if this.migrationContext.TestOnReplica {
log.Info("Table duplicated with new schema. Am not touching the original table. You may now compare the two tables to gain trust into this tool's operation")
} else {
if err := this.retryOperation(this.applier.SwapTables); err != nil {
return err
}
// Unlock
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}
// Drop old table
if this.migrationContext.OkToDropTable {
dropTableFunc := func() error {
return this.applier.dropTable(this.migrationContext.GetOldTableName())
}
if err := this.retryOperation(dropTableFunc); err != nil {
return err
}
}
}
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
func (this *Migrator) initiateInspector() (err error) {
this.inspector = NewInspector()
if err := this.inspector.InitDBConnections(); err != nil {
return err
@ -257,53 +368,6 @@ func (this *Migrator) Migrate() (err error) {
}
log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key)
go this.initiateChangelogListener()
if err := this.initiateStreaming(); err != nil {
return err
}
if err := this.initiateApplier(); err != nil {
return err
}
log.Debugf("Waiting for tables to be in place")
<-this.tablesInPlace
log.Debugf("Tables are in place")
// Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
}
go this.initiateThrottler()
go this.executeWriteFuncs()
go this.iterateChunks()
this.migrationContext.RowCopyStartTime = time.Now()
go this.initiateStatus()
log.Debugf("Operating until row copy is complete")
<-this.rowCopyComplete
log.Debugf("Row copy complete")
this.printStatus()
this.throttle(func() {
log.Debugf("throttling on LOCK TABLES")
})
// TODO retries!!
this.applier.LockTables()
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
// TODO retries!!
this.applier.UnlockTables()
return nil
}
@ -347,7 +411,7 @@ func (this *Migrator) printStatus() {
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s",
totalRowsCopied, rowsEstimate, progressPct,
len(this.applyEventsQueue), cap(this.applyEventsQueue),
prettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), prettifyDurationOutput(elapsedTime),
base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
eta,
)
this.applier.WriteChangelog(
@ -357,7 +421,7 @@ func (this *Migrator) printStatus() {
fmt.Println(status)
}
func (this *Migrator) initiateChangelogListener() {
func (this *Migrator) initiateHeartbeatListener() {
ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
for range ticker {
go func() error {
@ -367,10 +431,6 @@ func (this *Migrator) initiateChangelogListener() {
}
for hint, value := range changelogState {
switch hint {
case "state":
{
this.onChangelogState(value)
}
case "heartbeat":
{
this.onChangelogHeartbeat(value)
@ -392,6 +452,14 @@ func (this *Migrator) initiateStreaming() error {
log.Debugf("Noop operation; not really listening on binlog events")
return nil
}
this.eventsStreamer.AddListener(
false,
this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error {
return this.onChangelogStateEvent(dmlEvent)
},
)
this.eventsStreamer.AddListener(
true,
this.migrationContext.DatabaseName,

View File

@ -71,9 +71,10 @@ func (this *ColumnList) Len() int {
// UniqueKey is the combination of a key's name and columns
type UniqueKey struct {
Name string
Columns ColumnList
HasNullable bool
Name string
Columns ColumnList
HasNullable bool
IsAutoIncrement bool
}
// IsPrimary checks if this unique key is primary
@ -86,7 +87,11 @@ func (this *UniqueKey) Len() int {
}
func (this *UniqueKey) String() string {
return fmt.Sprintf("%s: %s; has nullable: %+v", this.Name, this.Columns, this.HasNullable)
description := this.Name
if this.IsAutoIncrement {
description = fmt.Sprintf("%s (auto_incrmenet)", description)
}
return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names, this.HasNullable)
}
type ColumnValues struct {