Merge pull request #51 from github/cut-over
cut-over flag and other stuff
This commit is contained in:
commit
57347ec5b0
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
RELEASE_VERSION="0.8.2"
|
RELEASE_VERSION="0.8.3"
|
||||||
|
|
||||||
buildpath=/tmp/gh-ost
|
buildpath=/tmp/gh-ost
|
||||||
target=gh-ost
|
target=gh-ost
|
||||||
|
@ -28,6 +28,14 @@ const (
|
|||||||
CountRowsEstimate = "CountRowsEstimate"
|
CountRowsEstimate = "CountRowsEstimate"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type CutOver int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CutOverTwoStep CutOver = 1
|
||||||
|
CutOverVoluntaryLock
|
||||||
|
CutOverUdfWait
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxRetries = 10
|
maxRetries = 10
|
||||||
)
|
)
|
||||||
@ -63,9 +71,9 @@ type MigrationContext struct {
|
|||||||
Noop bool
|
Noop bool
|
||||||
TestOnReplica bool
|
TestOnReplica bool
|
||||||
OkToDropTable bool
|
OkToDropTable bool
|
||||||
QuickAndBumpySwapTables bool
|
|
||||||
InitiallyDropOldTable bool
|
InitiallyDropOldTable bool
|
||||||
InitiallyDropGhostTable bool
|
InitiallyDropGhostTable bool
|
||||||
|
CutOverType CutOver
|
||||||
|
|
||||||
TableEngine string
|
TableEngine string
|
||||||
RowsEstimate int64
|
RowsEstimate int64
|
||||||
|
@ -59,9 +59,9 @@ func main() {
|
|||||||
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
|
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
|
||||||
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-ost issues `STOP SLAVE` and you can compare the two tables for building trust")
|
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-ost issues `STOP SLAVE` and you can compare the two tables for building trust")
|
||||||
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
|
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
|
||||||
flag.BoolVar(&migrationContext.QuickAndBumpySwapTables, "quick-and-bumpy-swap-tables", false, "Shall the tool issue a faster swapping of tables at end of operation, at the cost of causing a brief period of time when the table does not exist? This will cause queries on table to fail with error (as opposed to being locked for a longer duration of a swap)")
|
|
||||||
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
|
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
|
||||||
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
|
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
|
||||||
|
cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)")
|
||||||
|
|
||||||
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
|
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
|
||||||
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
|
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
|
||||||
@ -129,8 +129,15 @@ func main() {
|
|||||||
if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica {
|
if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica {
|
||||||
log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
|
log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
|
||||||
}
|
}
|
||||||
if migrationContext.QuickAndBumpySwapTables && migrationContext.TestOnReplica {
|
switch *cutOver {
|
||||||
log.Fatalf("--quick-and-bumpy-swap-tables and --test-on-replica are mutually exclusive (the former implies migrating on master)")
|
case "two-step":
|
||||||
|
migrationContext.CutOverType = base.CutOverTwoStep
|
||||||
|
case "voluntary-lock":
|
||||||
|
migrationContext.CutOverType = base.CutOverVoluntaryLock
|
||||||
|
case "":
|
||||||
|
log.Fatalf("--cut-over must be specified")
|
||||||
|
default:
|
||||||
|
log.Fatalf("Unknown cut-over: %s", *cutOver)
|
||||||
}
|
}
|
||||||
if err := migrationContext.ReadConfigFile(); err != nil {
|
if err := migrationContext.ReadConfigFile(); err != nil {
|
||||||
log.Fatale(err)
|
log.Fatale(err)
|
||||||
|
@ -63,15 +63,9 @@ func (this *Inspector) ValidateOriginalTable() (err error) {
|
|||||||
if err := this.validateTableForeignKeys(); err != nil {
|
if err := this.validateTableForeignKeys(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if this.migrationContext.CountTableRows {
|
|
||||||
if err := this.countTableRows(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if err := this.estimateTableRowsViaExplain(); err != nil {
|
if err := this.estimateTableRowsViaExplain(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,6 +93,8 @@ func (this *Inspector) InspectOriginalTable() (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
|
||||||
|
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
|
||||||
func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
|
func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
|
||||||
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
|
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -353,7 +349,7 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Inspector) countTableRows() error {
|
func (this *Inspector) CountTableRows() error {
|
||||||
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
||||||
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||||
if err := this.db.QueryRow(query).Scan(&this.migrationContext.RowsEstimate); err != nil {
|
if err := this.db.QueryRow(query).Scan(&this.migrationContext.RowsEstimate); err != nil {
|
||||||
|
@ -51,6 +51,7 @@ type Migrator struct {
|
|||||||
|
|
||||||
rowCopyCompleteFlag int64
|
rowCopyCompleteFlag int64
|
||||||
allEventsUpToLockProcessedInjectedFlag int64
|
allEventsUpToLockProcessedInjectedFlag int64
|
||||||
|
cleanupImminentFlag int64
|
||||||
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
||||||
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
||||||
copyRowsQueue chan tableWriteFunc
|
copyRowsQueue chan tableWriteFunc
|
||||||
@ -312,6 +313,14 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
|
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if this.migrationContext.CountTableRows {
|
||||||
|
if this.migrationContext.Noop {
|
||||||
|
log.Debugf("Noop operation; not really counting table rows")
|
||||||
|
} else if err := this.inspector.CountTableRows(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err := this.addDMLEventsListener(); err != nil {
|
if err := this.addDMLEventsListener(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -371,7 +380,7 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
|||||||
return this.stopWritesAndCompleteMigrationOnReplica()
|
return this.stopWritesAndCompleteMigrationOnReplica()
|
||||||
}
|
}
|
||||||
// Running on master
|
// Running on master
|
||||||
if this.migrationContext.QuickAndBumpySwapTables {
|
if this.migrationContext.CutOverType == base.CutOverTwoStep {
|
||||||
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
|
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -555,14 +564,30 @@ func (this *Migrator) initiateStatus() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) printMigrationStatusHint() {
|
func (this *Migrator) printMigrationStatusHint() {
|
||||||
hint := fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s; migration started at %+v",
|
fmt.Println(fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s",
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
||||||
|
))
|
||||||
|
fmt.Println(fmt.Sprintf("# Migration started at %+v",
|
||||||
this.migrationContext.StartTime.Format(time.RubyDate),
|
this.migrationContext.StartTime.Format(time.RubyDate),
|
||||||
)
|
))
|
||||||
fmt.Println(hint)
|
fmt.Println(fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v",
|
||||||
|
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
||||||
|
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
|
||||||
|
this.migrationContext.MaxLoad,
|
||||||
|
))
|
||||||
|
if this.migrationContext.ThrottleFlagFile != "" {
|
||||||
|
fmt.Println(fmt.Sprintf("# Throttle flag file: %+v",
|
||||||
|
this.migrationContext.ThrottleFlagFile,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
||||||
|
fmt.Println(fmt.Sprintf("# Throttle additional flag file: %+v",
|
||||||
|
this.migrationContext.ThrottleAdditionalFlagFile,
|
||||||
|
))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) printStatus() {
|
func (this *Migrator) printStatus() {
|
||||||
@ -638,6 +663,9 @@ func (this *Migrator) initiateHeartbeatListener() {
|
|||||||
ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
|
ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
|
||||||
for range ticker {
|
for range ticker {
|
||||||
go func() error {
|
go func() error {
|
||||||
|
if atomic.LoadInt64(&this.cleanupImminentFlag) > 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
changelogState, err := this.inspector.readChangelogState()
|
changelogState, err := this.inspector.readChangelogState()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return log.Errore(err)
|
return log.Errore(err)
|
||||||
@ -808,6 +836,7 @@ func (this *Migrator) executeWriteFuncs() error {
|
|||||||
|
|
||||||
// finalCleanup takes actions at very end of migration, dropping tables etc.
|
// finalCleanup takes actions at very end of migration, dropping tables etc.
|
||||||
func (this *Migrator) finalCleanup() error {
|
func (this *Migrator) finalCleanup() error {
|
||||||
|
atomic.StoreInt64(&this.cleanupImminentFlag, 1)
|
||||||
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
|
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user