ongoing development:

- accepts --max-load
- accepts multiple conditions in --max-load
- throttle includes reason
- chunk-size sanity check
- change log state writes both in appending (history) mode and in replacing (current) mode
- more atomic checks
- inspecting ghost table columns, unique key
- comparing unique keys between tables; sanity
- intersecting columns between tables
- prettify status
- refactored throttle() and retries()
This commit is contained in:
Shlomi Noach 2016-04-08 14:35:06 +02:00
parent 4652bb7728
commit a1a34b8150
6 changed files with 277 additions and 88 deletions

View File

@ -7,6 +7,7 @@ package base
import ( import (
"fmt" "fmt"
"strconv"
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
@ -44,20 +45,29 @@ type MigrationContext struct {
AllowedRunningOnMaster bool AllowedRunningOnMaster bool
InspectorConnectionConfig *mysql.ConnectionConfig InspectorConnectionConfig *mysql.ConnectionConfig
MasterConnectionConfig *mysql.ConnectionConfig MasterConnectionConfig *mysql.ConnectionConfig
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
UniqueKey *sql.UniqueKey
StartTime time.Time StartTime time.Time
RowCopyStartTime time.Time RowCopyStartTime time.Time
CurrentLag int64 CurrentLag int64
MaxLagMillisecondsThrottleThreshold int64 MaxLagMillisecondsThrottleThreshold int64
ThrottleFlagFile string ThrottleFlagFile string
TotalRowsCopied int64 TotalRowsCopied int64
isThrottled int64
ThrottleReason string
MaxLoad map[string]int64
OriginalTableColumns sql.ColumnList
OriginalTableColumnsMap sql.ColumnsMap
OriginalTableUniqueKeys [](*sql.UniqueKey)
GhostTableColumns sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
IsThrottled func() bool
CanStopStreaming func() bool CanStopStreaming func() bool
} }
@ -73,6 +83,7 @@ func newMigrationContext() *MigrationContext {
InspectorConnectionConfig: mysql.NewConnectionConfig(), InspectorConnectionConfig: mysql.NewConnectionConfig(),
MasterConnectionConfig: mysql.NewConnectionConfig(), MasterConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1000, MaxLagMillisecondsThrottleThreshold: 1000,
MaxLoad: make(map[string]int64),
} }
} }
@ -141,3 +152,41 @@ func (this *MigrationContext) ElapsedRowCopyTime() time.Duration {
func (this *MigrationContext) GetTotalRowsCopied() int64 { func (this *MigrationContext) GetTotalRowsCopied() int64 {
return atomic.LoadInt64(&this.TotalRowsCopied) return atomic.LoadInt64(&this.TotalRowsCopied)
} }
// GetIteration returns the current chunk-copy iteration counter, read
// atomically since it is incremented from the row-copy goroutine while
// being reported from others (e.g. status printing).
func (this *MigrationContext) GetIteration() int64 {
	return atomic.LoadInt64(&this.Iteration)
}
// SetThrottled flags this migration as currently throttled (or not).
// The flag is stored atomically so concurrent readers (see IsThrottled)
// observe a consistent value.
func (this *MigrationContext) SetThrottled(throttle bool) {
	var flagValue int64
	if throttle {
		flagValue = 1
	}
	atomic.StoreInt64(&this.isThrottled, flagValue)
}
// IsThrottled atomically reports whether the migration is currently in a
// throttled (paused) state, as last recorded via SetThrottled.
func (this *MigrationContext) IsThrottled() bool {
	return atomic.LoadInt64(&this.isThrottled) != 0
}
// ReadMaxLoad parses the `--max-load` flag value: a comma-delimited list of
// status-variable=threshold conditions, e.g.
// "Threads_running=100,Threads_connected=500". Parsed conditions are stored
// in this.MaxLoad. An empty input is a no-op; any malformed condition
// aborts parsing with an error.
//
// Whitespace around conditions, variable names and thresholds is now
// tolerated (previously "a=1, b=2" failed on the space).
func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error {
	if maxLoadList == "" {
		return nil
	}
	for _, maxLoadCondition := range strings.Split(maxLoadList, ",") {
		maxLoadCondition = strings.TrimSpace(maxLoadCondition)
		maxLoadTokens := strings.Split(maxLoadCondition, "=")
		if len(maxLoadTokens) != 2 {
			return fmt.Errorf("Error parsing max-load condition: %s", maxLoadCondition)
		}
		variableName := strings.TrimSpace(maxLoadTokens[0])
		if variableName == "" {
			return fmt.Errorf("Error parsing status variable in max-load condition: %s", maxLoadCondition)
		}
		// bitSize 0 == native int width; thresholds are stored as int64 anyway.
		threshold, err := strconv.ParseInt(strings.TrimSpace(maxLoadTokens[1]), 10, 0)
		if err != nil {
			return fmt.Errorf("Error parsing numeric value in max-load condition: %s", maxLoadCondition)
		}
		this.MaxLoad[variableName] = threshold
	}
	return nil
}

View File

@ -19,11 +19,6 @@ import (
func main() { func main() {
migrationContext := base.GetMigrationContext() migrationContext := base.GetMigrationContext()
// mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
// mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)")
internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment")
binlogFile := flag.String("binlog-file", "", "Name of binary log file")
flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
flag.StringVar(&migrationContext.InspectorConnectionConfig.User, "user", "root", "MySQL user") flag.StringVar(&migrationContext.InspectorConnectionConfig.User, "user", "root", "MySQL user")
@ -35,9 +30,16 @@ func main() {
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration") flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists") if migrationContext.ChunkSize < 100 {
migrationContext.ChunkSize = 100
}
if migrationContext.ChunkSize > 100000 {
migrationContext.ChunkSize = 100000
}
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1000, "replication lag at which to throttle operation")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "/tmp/gh-osc.throttle", "operation pauses when this file exists")
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
quiet := flag.Bool("quiet", false, "quiet") quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose") verbose := flag.Bool("verbose", false, "verbose")
debug := flag.Bool("debug", false, "debug mode (very verbose)") debug := flag.Bool("debug", false, "debug mode (very verbose)")
@ -75,23 +77,12 @@ func main() {
if migrationContext.AlterStatement == "" { if migrationContext.AlterStatement == "" {
log.Fatalf("--alter must be provided and statement must not be empty") log.Fatalf("--alter must be provided and statement must not be empty")
} }
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
log.Fatale(err)
}
log.Info("starting gh-osc") log.Info("starting gh-osc")
if *internalExperiment {
log.Debug("starting experiment with %+v", *binlogFile)
//binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
// binlogReader, err := binlog.NewGoMySQLReader(migrationContext.InspectorConnectionConfig)
// if err != nil {
// log.Fatale(err)
// }
// if err := binlogReader.ConnectBinlogStreamer(mysql.BinlogCoordinates{LogFile: *binlogFile, LogPos: 0}); err != nil {
// log.Fatale(err)
// }
// binlogReader.StreamEvents(func() bool { return false })
// return
}
migrator := logic.NewMigrator() migrator := logic.NewMigrator()
err := migrator.Migrate() err := migrator.Migrate()
if err != nil { if err != nil {

View File

@ -104,10 +104,10 @@ func (this *Applier) AlterGhost() error {
// CreateChangelogTable creates the changelog table on the master // CreateChangelogTable creates the changelog table on the master
func (this *Applier) CreateChangelogTable() error { func (this *Applier) CreateChangelogTable() error {
query := fmt.Sprintf(`create /* gh-osc */ table %s.%s ( query := fmt.Sprintf(`create /* gh-osc */ table %s.%s (
id int auto_increment, id bigint auto_increment,
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
hint varchar(64) charset ascii not null, hint varchar(64) charset ascii not null,
value varchar(64) charset ascii not null, value varchar(255) charset ascii not null,
primary key(id), primary key(id),
unique key hint_uidx(hint) unique key hint_uidx(hint)
) auto_increment=2 ) auto_increment=2
@ -162,6 +162,12 @@ func (this *Applier) WriteChangelog(hint, value string) (string, error) {
return hint, err return hint, err
} }
// WriteChangelogState records the migration state in the changelog table
// twice: once under the fixed "state" hint (replacing mode: the row always
// reflects the latest state) and once under a timestamped hint (appending
// mode: preserves a history of states).
func (this *Applier) WriteChangelogState(value string) (string, error) {
	hint := "state"
	// Check the replace-mode write's error; it was previously dropped,
	// silently masking failures of the "current state" update.
	if _, err := this.WriteChangelog(hint, value); err != nil {
		return hint, err
	}
	return this.WriteChangelog(fmt.Sprintf("%s at %d", hint, time.Now().UnixNano()), value)
}
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously // This is done asynchronously
func (this *Applier) InitiateHeartbeat() { func (this *Applier) InitiateHeartbeat() {
@ -315,7 +321,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(), this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
this.migrationContext.ChunkSize, this.migrationContext.ChunkSize,
fmt.Sprintf("iteration:%d", this.migrationContext.Iteration), fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
) )
if err != nil { if err != nil {
return hasFurtherRange, err return hasFurtherRange, err
@ -336,13 +342,13 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
return hasFurtherRange, nil return hasFurtherRange, nil
} }
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
log.Debugf( // log.Debugf(
"column values: [%s]..[%s]; iteration: %d; chunk-size: %d", // "column values: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues, // this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues, // this.migrationContext.MigrationIterationRangeMaxValues,
this.migrationContext.Iteration, // this.migrationContext.GetIteration(),
this.migrationContext.ChunkSize, // this.migrationContext.ChunkSize,
) // )
return hasFurtherRange, nil return hasFurtherRange, nil
} }
@ -354,12 +360,12 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.DatabaseName, this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(), this.migrationContext.GetGhostTableName(),
this.migrationContext.UniqueKey.Columns, this.migrationContext.SharedColumns,
this.migrationContext.UniqueKey.Name, this.migrationContext.UniqueKey.Name,
this.migrationContext.UniqueKey.Columns, this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.Iteration == 0, this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(), this.migrationContext.IsTransactionalTable(),
) )
if err != nil { if err != nil {
@ -371,15 +377,11 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
} }
rowsAffected, _ = sqlResult.RowsAffected() rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Now().Sub(startTime) duration = time.Now().Sub(startTime)
this.WriteChangelog(
fmt.Sprintf("copy iteration %d", this.migrationContext.Iteration),
fmt.Sprintf("chunk: %d; affected: %d; duration: %d", chunkSize, rowsAffected, duration),
)
log.Debugf( log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d", "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues, this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues, this.migrationContext.MigrationIterationRangeMaxValues,
this.migrationContext.Iteration, this.migrationContext.GetIteration(),
chunkSize) chunkSize)
return chunkSize, rowsAffected, duration, nil return chunkSize, rowsAffected, duration, nil
} }
@ -412,3 +414,11 @@ func (this *Applier) UnlockTables() error {
log.Infof("Tables unlocked") log.Infof("Tables unlocked")
return nil return nil
} }
// ShowStatusVariable returns the integer value of a global MySQL status
// variable (e.g. Threads_running), read via SHOW GLOBAL STATUS.
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
	// NOTE(review): variableName is interpolated directly into the query; it
	// originates from the --max-load flag, so a quote character in that flag
	// would break the statement. SHOW statements don't accept bind
	// placeholders, so consider validating the name (e.g. [A-Za-z0-9_]+)
	// at parse time — confirm upstream.
	query := fmt.Sprintf(`show global status like '%s'`, variableName)
	// The result row is (Variable_name, Value); the name column is scanned
	// back into variableName and discarded.
	if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
		return 0, err
	}
	return result, nil
}

View File

@ -69,15 +69,50 @@ func (this *Inspector) ValidateOriginalTable() (err error) {
return nil return nil
} }
func (this *Inspector) InspectOriginalTable() (uniqueKeys [](*sql.UniqueKey), err error) { func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns sql.ColumnList, uniqueKeys [](*sql.UniqueKey), err error) {
uniqueKeys, err = this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName) uniqueKeys, err = this.getCandidateUniqueKeys(tableName)
if err != nil { if err != nil {
return uniqueKeys, err return columns, uniqueKeys, err
} }
if len(uniqueKeys) == 0 { if len(uniqueKeys) == 0 {
return uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out") return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
} }
return uniqueKeys, err columns, err = this.getTableColumns(this.migrationContext.DatabaseName, tableName)
if err != nil {
return columns, uniqueKeys, err
}
return columns, uniqueKeys, nil
}
// InspectOriginalTable reads the original table's columns and unique keys
// into the migration context, then builds the name->ordinal map of those
// columns.
func (this *Inspector) InspectOriginalTable() (err error) {
	this.migrationContext.OriginalTableColumns, this.migrationContext.OriginalTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.OriginalTableName)
	// Bug fix: the condition was inverted (`err == nil` returned early on
	// success, skipping the columns-map build, while errors fell through to
	// `return nil` and were swallowed).
	if err != nil {
		return err
	}
	this.migrationContext.OriginalTableColumnsMap = sql.NewColumnsMap(this.migrationContext.OriginalTableColumns)
	return nil
}
// InspectOriginalAndGhostTables reads column and unique-key metadata for the
// ghost table, selects a unique key shared by original and ghost tables
// (required for chunked row copying), and computes the columns common to
// both tables.
func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
	this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
	if err != nil {
		return err
	}
	// Keys are compared by column set, not by name (the ALTER may rename a
	// key); with no shared key there is no safe chunking iterator.
	sharedUniqueKeys, err := this.getSharedUniqueKeys(this.migrationContext.OriginalTableUniqueKeys, this.migrationContext.GhostTableUniqueKeys)
	if err != nil {
		return err
	}
	if len(sharedUniqueKeys) == 0 {
		return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
	}
	// Pick the first shared key; presumably getCandidateUniqueKeys orders
	// candidates by preference — TODO confirm.
	this.migrationContext.UniqueKey = sharedUniqueKeys[0]
	log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
	this.migrationContext.SharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns)
	log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
	// By fact that a non-empty unique key exists we also know the shared columns are non-empty
	return nil
} }
// validateConnection issues a simple can-connect to MySQL // validateConnection issues a simple can-connect to MySQL
@ -361,17 +396,9 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
return uniqueKeys, nil return uniqueKeys, nil
} }
// getCandidateUniqueKeys investigates a table and returns the list of unique keys // getSharedUniqueKeys returns the intersection of two given unique keys,
// candidate for chunking // testing by list of columns
func (this *Inspector) getSharedUniqueKeys() (uniqueKeys [](*sql.UniqueKey), err error) { func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) {
originalUniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName)
if err != nil {
return uniqueKeys, err
}
ghostUniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.GetGhostTableName())
if err != nil {
return uniqueKeys, err
}
// We actually do NOT rely on key name, just on the set of columns. This is because maybe // We actually do NOT rely on key name, just on the set of columns. This is because maybe
// the ALTER is on the name itself... // the ALTER is on the name itself...
for _, originalUniqueKey := range originalUniqueKeys { for _, originalUniqueKey := range originalUniqueKeys {
@ -384,6 +411,20 @@ func (this *Inspector) getSharedUniqueKeys() (uniqueKeys [](*sql.UniqueKey), err
return uniqueKeys, nil return uniqueKeys, nil
} }
// getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns sql.ColumnList) (sharedColumns sql.ColumnList) {
	// Index the ghost table's columns for O(1) membership tests.
	ghostColumnSet := make(map[string]bool, len(ghostColumns))
	for _, column := range ghostColumns {
		ghostColumnSet[column] = true
	}
	// Walk the original columns so the result preserves their order.
	for _, column := range originalColumns {
		if ghostColumnSet[column] {
			sharedColumns = append(sharedColumns, column)
		}
	}
	return sharedColumns
}
func (this *Inspector) getMasterConnectionConfig() (masterConfig *mysql.ConnectionConfig, err error) { func (this *Inspector) getMasterConnectionConfig() (masterConfig *mysql.ConnectionConfig, err error) {
visitedKeys := mysql.NewInstanceKeyMap() visitedKeys := mysql.NewInstanceKeyMap()
return getMasterConnectionConfigSafe(this.connectionConfig, this.migrationContext.DatabaseName, visitedKeys) return getMasterConnectionConfigSafe(this.connectionConfig, this.migrationContext.DatabaseName, visitedKeys)

View File

@ -8,6 +8,7 @@ package logic
import ( import (
"fmt" "fmt"
"os" "os"
"regexp"
"sync/atomic" "sync/atomic"
"time" "time"
@ -30,6 +31,10 @@ const (
applyEventsQueueBuffer = 100 applyEventsQueueBuffer = 100
) )
var (
	// prettifyDurationRegexp matches the fractional-seconds portion of a
	// time.Duration's String() output (e.g. the ".3" in "1m30.3s") so it
	// can be stripped for terser status display.
	prettifyDurationRegexp = regexp.MustCompile("([.][0-9]+)")
)
// Migrator is the main schema migration flow manager. // Migrator is the main schema migration flow manager.
type Migrator struct { type Migrator struct {
inspector *Inspector inspector *Inspector
@ -57,25 +62,94 @@ func NewMigrator() *Migrator {
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
} }
migrator.migrationContext.IsThrottled = func() bool {
return migrator.shouldThrottle()
}
return migrator return migrator
} }
func (this *Migrator) shouldThrottle() bool { func prettifyDurationOutput(d time.Duration) string {
if d < time.Second {
return "0s"
}
result := fmt.Sprintf("%s", d)
result = prettifyDurationRegexp.ReplaceAllString(result, "")
return result
}
func (this *Migrator) shouldThrottle() (result bool, reason string) {
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
shouldThrottle := false
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond { if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
shouldThrottle = true return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
} else if this.migrationContext.ThrottleFlagFile != "" { }
if this.migrationContext.ThrottleFlagFile != "" {
if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil { if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil {
//Throttle file defined and exists! //Throttle file defined and exists!
shouldThrottle = true return true, "flag-file"
} }
} }
return shouldThrottle
for variableName, threshold := range this.migrationContext.MaxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return true, fmt.Sprintf("%s %s", variableName, err)
}
if value > threshold {
return true, fmt.Sprintf("%s=%d", variableName, value)
}
}
return false, ""
}
// throttle initiates a throttling event, if need be, updates the Context and
// calls callback functions, if any.
//
// Callback contract: onStartThrottling fires once when throttling begins;
// onContinuousThrottling fires roughly once per second while throttling
// persists (including after the first second); onEndThrottling fires once
// when throttling ends. Any callback may be nil. If shouldThrottle() is
// false on entry the function returns immediately without invoking anything.
func (this *Migrator) throttle(
	onStartThrottling func(),
	onContinuousThrottling func(),
	onEndThrottling func(),
) {
	hasThrottledYet := false
	for {
		shouldThrottle, reason := this.shouldThrottle()
		if !shouldThrottle {
			break
		}
		// NOTE(review): ThrottleReason is a plain string written here and
		// read from the status-printing goroutine — looks like an unguarded
		// data race; confirm and consider atomic/mutex protection.
		this.migrationContext.ThrottleReason = reason
		if !hasThrottledYet {
			// First iteration of this throttling event: announce and flag it.
			hasThrottledYet = true
			if onStartThrottling != nil {
				onStartThrottling()
			}
			this.migrationContext.SetThrottled(true)
		}
		// Poll the throttle condition at one-second resolution.
		time.Sleep(time.Second)
		if onContinuousThrottling != nil {
			onContinuousThrottling()
		}
	}
	// Only emit the end-of-throttling transition if we actually throttled.
	if hasThrottledYet {
		if onEndThrottling != nil {
			onEndThrottling()
		}
		this.migrationContext.SetThrottled(false)
	}
}
// retryOperation attempts up to `count` attempts at running given function,
// exiting as soon as it returns with non-error.
func (this *Migrator) retryOperation(operation func() error) (err error) {
maxRetries := this.migrationContext.MaxRetries()
for i := 0; i < maxRetries; i++ {
if i != 0 {
// sleep after previous iteration
time.Sleep(1 * time.Second)
}
err = operation()
if err == nil {
return nil
}
// there's an error. Let's try again.
}
return err
} }
func (this *Migrator) canStopStreaming() bool { func (this *Migrator) canStopStreaming() bool {
@ -102,7 +176,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return fmt.Errorf("Unknown changelog state: %+v", changelogState) return fmt.Errorf("Unknown changelog state: %+v", changelogState)
} }
} }
log.Debugf("---- - - - - - state %+v", changelogState) log.Debugf("Received state %+v", changelogState)
return nil return nil
} }
@ -132,8 +206,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.ValidateOriginalTable(); err != nil { if err := this.inspector.ValidateOriginalTable(); err != nil {
return err return err
} }
uniqueKeys, err := this.inspector.InspectOriginalTable() if err := this.inspector.InspectOriginalTable(); err != nil {
if err != nil {
return err return err
} }
// So far so good, table is accessible and valid. // So far so good, table is accessible and valid.
@ -159,22 +232,24 @@ func (this *Migrator) Migrate() (err error) {
// When running on replica, this means the replica has those tables. When running // When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge // on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs. // is in the binlogs.
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
this.migrationContext.UniqueKey = uniqueKeys[0] // TODO. Need to wait on replica till the ghost table exists and get shared keys
if err := this.applier.ReadMigrationRangeValues(); err != nil { if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err return err
} }
go this.initiateStatus()
go this.executeWriteFuncs() go this.executeWriteFuncs()
go this.iterateChunks() go this.iterateChunks()
this.migrationContext.RowCopyStartTime = time.Now()
go this.initiateStatus()
log.Debugf("Operating until row copy is complete") log.Debugf("Operating until row copy is complete")
<-this.rowCopyComplete <-this.rowCopyComplete
log.Debugf("Row copy complete") log.Debugf("Row copy complete")
this.printStatus() this.printStatus()
throttleMigration( this.throttle(
this.migrationContext,
func() { func() {
log.Debugf("throttling before LOCK TABLES") log.Debugf("throttling before LOCK TABLES")
}, },
@ -185,7 +260,7 @@ func (this *Migrator) Migrate() (err error) {
) )
// TODO retries!! // TODO retries!!
this.applier.LockTables() this.applier.LockTables()
this.applier.WriteChangelog("state", string(AllEventsUpToLockProcessed)) this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock") log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed <-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock") log.Debugf("Done waiting for events up to lock")
@ -228,10 +303,20 @@ func (this *Migrator) printStatus() {
return return
} }
status := fmt.Sprintf("Copy: %d/%d %.1f%% Backlog: %d/%d Elapsed: %+v(copy), %+v(total) ETA: N/A", eta := "N/A"
if this.migrationContext.IsThrottled() {
eta = fmt.Sprintf("throttled, %s", this.migrationContext.ThrottleReason)
}
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s",
totalRowsCopied, rowsEstimate, progressPct, totalRowsCopied, rowsEstimate, progressPct,
len(this.applyEventsQueue), cap(this.applyEventsQueue), len(this.applyEventsQueue), cap(this.applyEventsQueue),
this.migrationContext.ElapsedRowCopyTime(), elapsedTime) prettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), prettifyDurationOutput(elapsedTime),
eta,
)
this.applier.WriteChangelog(
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
status,
)
fmt.Println(status) fmt.Println(status)
} }
@ -281,13 +366,12 @@ func (this *Migrator) initiateApplier() error {
return err return err
} }
this.applier.WriteChangelog("state", string(TablesInPlace)) this.applier.WriteChangelogState(string(TablesInPlace))
this.applier.InitiateHeartbeat() this.applier.InitiateHeartbeat()
return nil return nil
} }
func (this *Migrator) iterateChunks() error { func (this *Migrator) iterateChunks() error {
this.migrationContext.RowCopyStartTime = time.Now()
terminateRowIteration := func(err error) error { terminateRowIteration := func(err error) error {
this.rowCopyComplete <- true this.rowCopyComplete <- true
return log.Errore(err) return log.Errore(err)
@ -306,7 +390,7 @@ func (this *Migrator) iterateChunks() error {
return terminateRowIteration(err) return terminateRowIteration(err)
} }
atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected) atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
this.migrationContext.Iteration++ atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil return nil
} }
this.copyRowsQueue <- copyRowsFunc this.copyRowsQueue <- copyRowsFunc
@ -316,8 +400,7 @@ func (this *Migrator) iterateChunks() error {
func (this *Migrator) executeWriteFuncs() error { func (this *Migrator) executeWriteFuncs() error {
for { for {
throttleMigration( this.throttle(
this.migrationContext,
func() { func() {
log.Debugf("throttling writes") log.Debugf("throttling writes")
}, },
@ -331,14 +414,18 @@ func (this *Migrator) executeWriteFuncs() error {
select { select {
case applyEventFunc := <-this.applyEventsQueue: case applyEventFunc := <-this.applyEventsQueue:
{ {
retryOperation(applyEventFunc, this.migrationContext.MaxRetries()) if err := this.retryOperation(applyEventFunc); err != nil {
return log.Errore(err)
}
} }
default: default:
{ {
select { select {
case copyRowsFunc := <-this.copyRowsQueue: case copyRowsFunc := <-this.copyRowsQueue:
{ {
retryOperation(copyRowsFunc, this.migrationContext.MaxRetries()) if err := this.retryOperation(copyRowsFunc); err != nil {
return log.Errore(err)
}
} }
default: default:
{ {

View File

@ -28,6 +28,17 @@ func (this *ColumnList) Equals(other *ColumnList) bool {
return reflect.DeepEqual(*this, *other) return reflect.DeepEqual(*this, *other)
} }
// ColumnsMap maps a column onto its ordinal position
type ColumnsMap map[string]int

// NewColumnsMap builds a ColumnsMap from an ordered column list, mapping
// each column name to its index within the list.
func NewColumnsMap(columnList ColumnList) ColumnsMap {
	// Pre-size the map and build it as a ColumnsMap directly, instead of
	// allocating a bare map and converting on return.
	columnsMap := make(ColumnsMap, len(columnList))
	for i, column := range columnList {
		columnsMap[column] = i
	}
	return columnsMap
}
// UniqueKey is the combination of a key's name and columns // UniqueKey is the combination of a key's name and columns
type UniqueKey struct { type UniqueKey struct {
Name string Name string