- Added ok-to-drop-table flag

- Added `switch-to-rbr` flag; applying binlog format change if needed
- Using dedicated db instance for locking & renaming on applier (must be used from within same connection)
- Heartbeat now uses `time.RFC3339Nano`
- Swap tables works! Caveat: short table outage
- `--test-on-replica` works!
- retries: using `panicAbort`: from any goroutine, regardless of context, it is possible to terminate the operation
- Reintroduced changelog events listener on streamer. This is the correct implementation.
This commit is contained in:
Shlomi Noach 2016-04-18 10:57:18 -07:00
parent 75c3fe0bee
commit eeffa701d6
6 changed files with 295 additions and 112 deletions

View File

@ -37,31 +37,38 @@ type MigrationContext struct {
OriginalTableName string OriginalTableName string
AlterStatement string AlterStatement string
Noop bool CountTableRows bool
TestOnReplica bool AllowedRunningOnMaster bool
SwitchToRowBinlogFormat bool
TableEngine string
CountTableRows bool
RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
ChunkSize int64 ChunkSize int64
OriginalBinlogFormat string
OriginalBinlogRowImage string
AllowedRunningOnMaster bool
InspectorConnectionConfig *mysql.ConnectionConfig
ApplierConnectionConfig *mysql.ConnectionConfig
StartTime time.Time
RowCopyStartTime time.Time
CurrentLag int64
MaxLagMillisecondsThrottleThreshold int64 MaxLagMillisecondsThrottleThreshold int64
ThrottleFlagFile string ThrottleFlagFile string
ThrottleAdditionalFlagFile string ThrottleAdditionalFlagFile string
TotalRowsCopied int64
isThrottled bool
throttleReason string
throttleMutex *sync.Mutex
MaxLoad map[string]int64 MaxLoad map[string]int64
Noop bool
TestOnReplica bool
OkToDropTable bool
TableEngine string
RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
OriginalBinlogFormat string
OriginalBinlogRowImage string
InspectorConnectionConfig *mysql.ConnectionConfig
ApplierConnectionConfig *mysql.ConnectionConfig
StartTime time.Time
RowCopyStartTime time.Time
LockTablesStartTime time.Time
RenameTablesStartTime time.Time
RenameTablesEndTime time.Time
CurrentLag int64
TotalRowsCopied int64
isThrottled bool
throttleReason string
throttleMutex *sync.Mutex
OriginalTableColumns *sql.ColumnList OriginalTableColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey) OriginalTableUniqueKeys [](*sql.UniqueKey)
GhostTableColumns *sql.ColumnList GhostTableColumns *sql.ColumnList

View File

@ -32,7 +32,9 @@ func main() {
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-osc issues `STOP SLAVE` and you can compare the two tables for building trust") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-osc issues `STOP SLAVE` and you can compare the two tables for building trust")
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
if migrationContext.ChunkSize < 100 { if migrationContext.ChunkSize < 100 {
migrationContext.ChunkSize = 100 migrationContext.ChunkSize = 100

View File

@ -25,6 +25,7 @@ import (
type Applier struct { type Applier struct {
connectionConfig *mysql.ConnectionConfig connectionConfig *mysql.ConnectionConfig
db *gosql.DB db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
} }
@ -36,21 +37,29 @@ func NewApplier() *Applier {
} }
func (this *Applier) InitDBConnections() (err error) { func (this *Applier) InitDBConnections() (err error) {
ApplierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(ApplierUri); err != nil { if this.db, _, err = sqlutils.GetDB(applierUri); err != nil {
return err return err
} }
if err := this.validateConnection(); err != nil { singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
if this.singletonDB, _, err = sqlutils.GetDB(singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)
if err := this.validateConnection(this.db); err != nil {
return err
}
if err := this.validateConnection(this.singletonDB); err != nil {
return err return err
} }
return nil return nil
} }
// validateConnection issues a simple can-connect to MySQL // validateConnection issues a simple can-connect to MySQL
func (this *Applier) validateConnection() error { func (this *Applier) validateConnection(db *gosql.DB) error {
query := `select @@global.port` query := `select @@global.port`
var port int var port int
if err := this.db.QueryRow(query).Scan(&port); err != nil { if err := db.QueryRow(query).Scan(&port); err != nil {
return err return err
} }
if port != this.connectionConfig.Key.Port { if port != this.connectionConfig.Key.Port {
@ -125,6 +134,10 @@ func (this *Applier) CreateChangelogTable() error {
// dropTable drops a given table on the applied host // dropTable drops a given table on the applied host
func (this *Applier) dropTable(tableName string) error { func (this *Applier) dropTable(tableName string) error {
if this.migrationContext.Noop {
log.Debugf("Noop operation; not really dropping table %s", sql.EscapeName(tableName))
return nil
}
query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`, query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName), sql.EscapeName(tableName),
@ -192,7 +205,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) { func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
numSuccessiveFailures := 0 numSuccessiveFailures := 0
injectHeartbeat := func() error { injectHeartbeat := func() error {
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339)); err != nil { if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
numSuccessiveFailures++ numSuccessiveFailures++
if numSuccessiveFailures > this.migrationContext.MaxRetries() { if numSuccessiveFailures > this.migrationContext.MaxRetries() {
return log.Errore(err) return log.Errore(err)
@ -391,16 +404,21 @@ func (this *Applier) LockTables() error {
return nil return nil
} }
query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`, // query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`,
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetChangelogTableName()),
// )
query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
) )
log.Infof("Locking tables") log.Infof("Locking tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err return err
} }
log.Infof("Tables locked") log.Infof("Tables locked")
@ -411,13 +429,70 @@ func (this *Applier) LockTables() error {
func (this *Applier) UnlockTables() error { func (this *Applier) UnlockTables() error {
query := `unlock /* gh-osc */ tables` query := `unlock /* gh-osc */ tables`
log.Infof("Unlocking tables") log.Infof("Unlocking tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil { if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err return err
} }
log.Infof("Tables unlocked") log.Infof("Tables unlocked")
return nil return nil
} }
// LockTables
func (this *Applier) SwapTables() error {
if this.migrationContext.Noop {
log.Debugf("Noop operation; not really swapping tables")
return nil
}
// query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetOldTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// )
// log.Infof("Renaming tables")
// if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
// return err
// }
query := fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
log.Infof("Renaming original table")
this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
query = fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Renaming ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.RenameTablesEndTime = time.Now()
log.Infof("Tables renamed")
return nil
}
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread
func (this *Applier) StopSlaveIOThread() error {
query := `stop /* gh-osc */ slave io_thread`
log.Infof("Stopping replication")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Replication stopped")
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) { func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName) query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil { if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

View File

@ -44,10 +44,13 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateGrants(); err != nil { if err := this.validateGrants(); err != nil {
return err return err
} }
if err := this.restartReplication(); err != nil { // if err := this.restartReplication(); err != nil {
// return err
// }
if err := this.validateBinlogs(); err != nil {
return err return err
} }
if err := this.validateBinlogs(); err != nil { if err := this.applyBinlogFormat(); err != nil {
return err return err
} }
return nil return nil
@ -204,6 +207,24 @@ func (this *Inspector) restartReplication() error {
return nil return nil
} }
// applyBinlogFormat sets ROW binlog format and restarts replication to make
// the replication thread apply it.
func (this *Inspector) applyBinlogFormat() error {
if this.migrationContext.RequiresBinlogFormatChange() {
if _, err := sqlutils.ExecNoPrepare(this.db, `set global binlog_format='ROW'`); err != nil {
return err
}
if _, err := sqlutils.ExecNoPrepare(this.db, `set session binlog_format='ROW'`); err != nil {
return err
}
log.Debugf("'ROW' binlog format applied")
}
if err := this.restartReplication(); err != nil {
return err
}
return nil
}
// validateBinlogs checks that binary log configuration is good to go // validateBinlogs checks that binary log configuration is good to go
func (this *Inspector) validateBinlogs() error { func (this *Inspector) validateBinlogs() error {
query := `select @@global.log_bin, @@global.log_slave_updates, @@global.binlog_format` query := `select @@global.log_bin, @@global.log_slave_updates, @@global.binlog_format`
@ -218,6 +239,9 @@ func (this *Inspector) validateBinlogs() error {
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
} }
if this.migrationContext.RequiresBinlogFormatChange() { if this.migrationContext.RequiresBinlogFormatChange() {
if !this.migrationContext.SwitchToRowBinlogFormat {
return fmt.Errorf("You must be using ROW binlog format. I can switch it for you, provided --switch-to-rbr and that %s:%d doesn't have replicas", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}
query := fmt.Sprintf(`show /* gh-osc */ slave hosts`) query := fmt.Sprintf(`show /* gh-osc */ slave hosts`)
countReplicas := 0 countReplicas := 0
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
@ -230,7 +254,7 @@ func (this *Inspector) validateBinlogs() error {
if countReplicas > 0 { if countReplicas > 0 {
return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
} }
log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) log.Infof("%s:%d has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
} }
query = `select @@global.binlog_row_image` query = `select @@global.binlog_row_image`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil { if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil {
@ -369,6 +393,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
UNIQUES.COUNT_COLUMN_IN_INDEX, UNIQUES.COUNT_COLUMN_IN_INDEX,
COLUMNS.DATA_TYPE, COLUMNS.DATA_TYPE,
COLUMNS.CHARACTER_SET_NAME, COLUMNS.CHARACTER_SET_NAME,
LOCATE('auto_increment', EXTRA) > 0 as is_auto_increment,
has_nullable has_nullable
FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN ( FROM INFORMATION_SCHEMA.COLUMNS INNER JOIN (
SELECT SELECT
@ -414,11 +439,12 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
END, END,
COUNT_COLUMN_IN_INDEX COUNT_COLUMN_IN_INDEX
` `
err = sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { err = sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
uniqueKey := &sql.UniqueKey{ uniqueKey := &sql.UniqueKey{
Name: rowMap.GetString("INDEX_NAME"), Name: m.GetString("INDEX_NAME"),
Columns: *sql.ParseColumnList(rowMap.GetString("COLUMN_NAMES")), Columns: *sql.ParseColumnList(m.GetString("COLUMN_NAMES")),
HasNullable: rowMap.GetBool("has_nullable"), HasNullable: m.GetBool("has_nullable"),
IsAutoIncrement: m.GetBool("is_auto_increment"),
} }
uniqueKeys = append(uniqueKeys, uniqueKey) uniqueKeys = append(uniqueKeys, uniqueKey)
return nil return nil
@ -426,7 +452,7 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
if err != nil { if err != nil {
return uniqueKeys, err return uniqueKeys, err
} }
log.Debugf("Potential unique keys: %+v", uniqueKeys) log.Debugf("Potential unique keys in %+v: %+v", tableName, uniqueKeys)
return uniqueKeys, nil return uniqueKeys, nil
} }

View File

@ -9,13 +9,13 @@ import (
"fmt" "fmt"
"os" "os"
"os/signal" "os/signal"
"regexp"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time" "time"
"github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog" "github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/sql"
"github.com/outbrain/golib/log" "github.com/outbrain/golib/log"
) )
@ -34,10 +34,6 @@ const (
heartbeatIntervalMilliseconds = 1000 heartbeatIntervalMilliseconds = 1000
) )
var (
prettifyDurationRegexp = regexp.MustCompile("([.][0-9]+)")
)
// Migrator is the main schema migration flow manager. // Migrator is the main schema migration flow manager.
type Migrator struct { type Migrator struct {
inspector *Inspector inspector *Inspector
@ -48,6 +44,7 @@ type Migrator struct {
tablesInPlace chan bool tablesInPlace chan bool
rowCopyComplete chan bool rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool allEventsUpToLockProcessed chan bool
panicAbort chan error
// copyRowsQueue should not be buffered; if buffered some non-damaging but // copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
@ -63,6 +60,7 @@ func NewMigrator() *Migrator {
tablesInPlace: make(chan bool), tablesInPlace: make(chan bool),
rowCopyComplete: make(chan bool), rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool), allEventsUpToLockProcessed: make(chan bool),
panicAbort: make(chan error),
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@ -71,15 +69,6 @@ func NewMigrator() *Migrator {
return migrator return migrator
} }
func prettifyDurationOutput(d time.Duration) string {
if d < time.Second {
return "0s"
}
result := fmt.Sprintf("%s", d)
result = prettifyDurationRegexp.ReplaceAllString(result, "")
return result
}
// acceptSignals registers for OS signals // acceptSignals registers for OS signals
func (this *Migrator) acceptSignals() { func (this *Migrator) acceptSignals() {
c := make(chan os.Signal, 1) c := make(chan os.Signal, 1)
@ -182,6 +171,7 @@ func (this *Migrator) retryOperation(operation func() error) (err error) {
} }
// there's an error. Let's try again. // there's an error. Let's try again.
} }
this.panicAbort <- err
return err return err
} }
@ -189,9 +179,34 @@ func (this *Migrator) canStopStreaming() bool {
return false return false
} }
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
// Hey, I created the changlog table, I know the type of columns it has!
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
return nil
}
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
switch changelogState {
case TablesInPlace:
{
this.tablesInPlace <- true
}
case AllEventsUpToLockProcessed:
{
this.allEventsUpToLockProcessed <- true
}
default:
{
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
}
log.Debugf("Received state %+v", changelogState)
return nil
}
func (this *Migrator) onChangelogState(stateValue string) (err error) { func (this *Migrator) onChangelogState(stateValue string) (err error) {
log.Fatalf("I shouldn't be here")
if this.handledChangelogStates[stateValue] { if this.handledChangelogStates[stateValue] {
return return nil
} }
this.handledChangelogStates[stateValue] = true this.handledChangelogStates[stateValue] = true
@ -215,7 +230,7 @@ func (this *Migrator) onChangelogState(stateValue string) (err error) {
} }
func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) { func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
heartbeatTime, err := time.Parse(time.RFC3339, heartbeatValue) heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
if err != nil { if err != nil {
return log.Errore(err) return log.Errore(err)
} }
@ -226,9 +241,105 @@ func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
return nil return nil
} }
//
func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort
log.Fatale(err)
}
func (this *Migrator) Migrate() (err error) { func (this *Migrator) Migrate() (err error) {
this.migrationContext.StartTime = time.Now() this.migrationContext.StartTime = time.Now()
go this.listenOnPanicAbort()
if err := this.initiateInspector(); err != nil {
return err
}
if err := this.initiateStreaming(); err != nil {
return err
}
if err := this.initiateApplier(); err != nil {
return err
}
log.Debugf("Waiting for tables to be in place")
<-this.tablesInPlace
log.Debugf("Tables are in place")
// Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
go this.initiateHeartbeatListener()
if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
}
go this.initiateThrottler()
go this.executeWriteFuncs()
go this.iterateChunks()
this.migrationContext.RowCopyStartTime = time.Now()
go this.initiateStatus()
log.Debugf("Operating until row copy is complete")
<-this.rowCopyComplete
log.Debugf("Row copy complete")
this.printStatus()
this.stopWritesAndCompleteMigration()
return nil
}
func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
this.throttle(func() {
log.Debugf("throttling before LOCK TABLES")
})
if this.migrationContext.TestOnReplica {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
return err
}
} else {
if err := this.retryOperation(this.applier.LockTables); err != nil {
return err
}
}
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
if this.migrationContext.TestOnReplica {
log.Info("Table duplicated with new schema. Am not touching the original table. You may now compare the two tables to gain trust into this tool's operation")
} else {
if err := this.retryOperation(this.applier.SwapTables); err != nil {
return err
}
// Unlock
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}
// Drop old table
if this.migrationContext.OkToDropTable {
dropTableFunc := func() error {
return this.applier.dropTable(this.migrationContext.GetOldTableName())
}
if err := this.retryOperation(dropTableFunc); err != nil {
return err
}
}
}
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
func (this *Migrator) initiateInspector() (err error) {
this.inspector = NewInspector() this.inspector = NewInspector()
if err := this.inspector.InitDBConnections(); err != nil { if err := this.inspector.InitDBConnections(); err != nil {
return err return err
@ -257,53 +368,6 @@ func (this *Migrator) Migrate() (err error) {
} }
log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key) log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key)
go this.initiateChangelogListener()
if err := this.initiateStreaming(); err != nil {
return err
}
if err := this.initiateApplier(); err != nil {
return err
}
log.Debugf("Waiting for tables to be in place")
<-this.tablesInPlace
log.Debugf("Tables are in place")
// Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
}
go this.initiateThrottler()
go this.executeWriteFuncs()
go this.iterateChunks()
this.migrationContext.RowCopyStartTime = time.Now()
go this.initiateStatus()
log.Debugf("Operating until row copy is complete")
<-this.rowCopyComplete
log.Debugf("Row copy complete")
this.printStatus()
this.throttle(func() {
log.Debugf("throttling on LOCK TABLES")
})
// TODO retries!!
this.applier.LockTables()
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
// TODO retries!!
this.applier.UnlockTables()
return nil return nil
} }
@ -347,7 +411,7 @@ func (this *Migrator) printStatus() {
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s", status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s",
totalRowsCopied, rowsEstimate, progressPct, totalRowsCopied, rowsEstimate, progressPct,
len(this.applyEventsQueue), cap(this.applyEventsQueue), len(this.applyEventsQueue), cap(this.applyEventsQueue),
prettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), prettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
eta, eta,
) )
this.applier.WriteChangelog( this.applier.WriteChangelog(
@ -357,7 +421,7 @@ func (this *Migrator) printStatus() {
fmt.Println(status) fmt.Println(status)
} }
func (this *Migrator) initiateChangelogListener() { func (this *Migrator) initiateHeartbeatListener() {
ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2) ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
for range ticker { for range ticker {
go func() error { go func() error {
@ -367,10 +431,6 @@ func (this *Migrator) initiateChangelogListener() {
} }
for hint, value := range changelogState { for hint, value := range changelogState {
switch hint { switch hint {
case "state":
{
this.onChangelogState(value)
}
case "heartbeat": case "heartbeat":
{ {
this.onChangelogHeartbeat(value) this.onChangelogHeartbeat(value)
@ -392,6 +452,14 @@ func (this *Migrator) initiateStreaming() error {
log.Debugf("Noop operation; not really listening on binlog events") log.Debugf("Noop operation; not really listening on binlog events")
return nil return nil
} }
this.eventsStreamer.AddListener(
false,
this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error {
return this.onChangelogStateEvent(dmlEvent)
},
)
this.eventsStreamer.AddListener( this.eventsStreamer.AddListener(
true, true,
this.migrationContext.DatabaseName, this.migrationContext.DatabaseName,

View File

@ -71,9 +71,10 @@ func (this *ColumnList) Len() int {
// UniqueKey is the combination of a key's name and columns // UniqueKey is the combination of a key's name and columns
type UniqueKey struct { type UniqueKey struct {
Name string Name string
Columns ColumnList Columns ColumnList
HasNullable bool HasNullable bool
IsAutoIncrement bool
} }
// IsPrimary checks if this unique key is primary // IsPrimary checks if this unique key is primary
@ -86,7 +87,11 @@ func (this *UniqueKey) Len() int {
} }
func (this *UniqueKey) String() string { func (this *UniqueKey) String() string {
return fmt.Sprintf("%s: %s; has nullable: %+v", this.Name, this.Columns, this.HasNullable) description := this.Name
if this.IsAutoIncrement {
description = fmt.Sprintf("%s (auto_incrmenet)", description)
}
return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names, this.HasNullable)
} }
type ColumnValues struct { type ColumnValues struct {