/*
   Copyright 2016 GitHub Inc.
   See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package logic
import (
	"bufio"
	"fmt"
	"io"
	"math"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"sync/atomic"
	"syscall"
	"time"

	"github.com/github/gh-ost/go/base"
	"github.com/github/gh-ost/go/binlog"
	"github.com/github/gh-ost/go/mysql"
	"github.com/github/gh-ost/go/sql"

	"github.com/outbrain/golib/log"
)

type ChangelogState string

const (
	TablesInPlace              ChangelogState = "TablesInPlace"
	AllEventsUpToLockProcessed                = "AllEventsUpToLockProcessed"
)
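
// Changelog states are written as rows to the changelog table and read back off the binlog
// stream (see onChangelogStateEvent); this round trip is how the migrator knows the streamer
// has caught up to the point at which a given state marker was written.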

type tableWriteFunc func() error

const (
	applyEventsQueueBuffer        = 100
	heartbeatIntervalMilliseconds = 1000
)

type PrintStatusRule int

const (
	HeuristicPrintStatusRule PrintStatusRule = iota
	ForcePrintStatusRule                     = iota
	ForcePrintStatusAndHint                  = iota
)

// Migrator is the main schema migration flow manager.
type Migrator struct {
	parser           *sql.Parser
	inspector        *Inspector
	applier          *Applier
	eventsStreamer   *EventsStreamer
	server           *Server
	migrationContext *base.MigrationContext

	hostname string

	tablesInPlace              chan bool
	rowCopyComplete            chan bool
	allEventsUpToLockProcessed chan bool
	panicAbort                 chan error

	rowCopyCompleteFlag                    int64
	allEventsUpToLockProcessedInjectedFlag int64
	inCutOverCriticalActionFlag            int64
	cleanupImminentFlag                    int64
	// copyRowsQueue should not be buffered; if buffered some non-damaging but
	// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
	copyRowsQueue    chan tableWriteFunc
	applyEventsQueue chan tableWriteFunc

	handledChangelogStates map[string]bool
}
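
// NewMigrator creates and initializes a Migrator, reading the global migration context and
// allocating the channels and queues that coordinate row copy, binlog apply and cut-over.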
func NewMigrator() *Migrator {
	migrator := &Migrator{
		migrationContext:           base.GetMigrationContext(),
		parser:                     sql.NewParser(),
		tablesInPlace:              make(chan bool),
		rowCopyComplete:            make(chan bool),
		allEventsUpToLockProcessed: make(chan bool),
		panicAbort:                 make(chan error),

		allEventsUpToLockProcessedInjectedFlag: 0,

		copyRowsQueue:          make(chan tableWriteFunc),
		applyEventsQueue:       make(chan tableWriteFunc, applyEventsQueueBuffer),
		handledChangelogStates: make(map[string]bool),
	}
	return migrator
}

// acceptSignals registers for OS signals
func (this *Migrator) acceptSignals() {
	c := make(chan os.Signal, 1)

	signal.Notify(c, syscall.SIGHUP)
	go func() {
		for sig := range c {
			switch sig {
			case syscall.SIGHUP:
				log.Debugf("Received SIGHUP. Reloading configuration")
			}
		}
	}()
}

// shouldThrottle performs checks to see whether we should currently be throttling.
// It also checks for critical-load and panic aborts.
func (this *Migrator) shouldThrottle() (result bool, reason string) {
	// Regardless of throttle, we take the opportunity to check for panic-abort
	if this.migrationContext.PanicFlagFile != "" {
		if base.FileExists(this.migrationContext.PanicFlagFile) {
			this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
		}
	}
	criticalLoad := this.migrationContext.GetCriticalLoad()
	for variableName, threshold := range criticalLoad {
		value, err := this.applier.ShowStatusVariable(variableName)
		if err != nil {
			return true, fmt.Sprintf("%s %s", variableName, err)
		}
		if value >= threshold {
			this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
		}
	}

	// Back to throttle considerations

	// User-based throttle
	if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
		return true, "commanded by user"
	}
	if this.migrationContext.ThrottleFlagFile != "" {
		if base.FileExists(this.migrationContext.ThrottleFlagFile) {
			// Throttle file defined and exists!
			return true, "flag-file"
		}
	}
	if this.migrationContext.ThrottleAdditionalFlagFile != "" {
		if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
			// 2nd throttle file defined and exists!
			return true, "flag-file"
		}
	}
	// Replication lag throttle
	lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
	if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
	}
	if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) == 0) {
		replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
		if err != nil {
			return true, err.Error()
		}
		if replicationLag > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
			return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
		}
	}

	maxLoad := this.migrationContext.GetMaxLoad()
	for variableName, threshold := range maxLoad {
		value, err := this.applier.ShowStatusVariable(variableName)
		if err != nil {
			return true, fmt.Sprintf("%s %s", variableName, err)
		}
		if value >= threshold {
			return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)
		}
	}
	if this.migrationContext.GetThrottleQuery() != "" {
		if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
			return true, "throttle-query"
		}
	}

	return false, ""
}

// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
func (this *Migrator) initiateThrottler() error {
	throttlerTick := time.Tick(1 * time.Second)

	throttlerFunction := func() {
		alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
		shouldThrottle, throttleReason := this.shouldThrottle()
		if shouldThrottle && !alreadyThrottling {
			// New throttling
			this.applier.WriteAndLogChangelog("throttle", throttleReason)
		} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
			// Change of reason
			this.applier.WriteAndLogChangelog("throttle", throttleReason)
		} else if alreadyThrottling && !shouldThrottle {
			// End of throttling
			this.applier.WriteAndLogChangelog("throttle", "done throttling")
		}
		this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
	}
	throttlerFunction()
	for range throttlerTick {
		throttlerFunction()
	}
	return nil
}

// throttle sleeps in a loop for as long as the migration is throttled, calling the
// onThrottled callback (if given) on each iteration. It returns once throttling ends.
func (this *Migrator) throttle(onThrottled func()) {
	for {
		if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
			return
		}
		if onThrottled != nil {
			onThrottled()
		}
		time.Sleep(time.Second)
	}
}

// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error)
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
	for {
		shouldSleep, err := operation()
		if err != nil {
			return err
		}
		if !shouldSleep {
			return nil
		}
		time.Sleep(time.Second)
	}
}

// retryOperation attempts up to MaxRetries() runs of the given function,
// exiting as soon as one returns with non-error. If all attempts fail,
// the last error is sent to the panic-abort channel.
func (this *Migrator) retryOperation(operation func() error) (err error) {
	maxRetries := int(this.migrationContext.MaxRetries())
	for i := 0; i < maxRetries; i++ {
		if i != 0 {
			// sleep after previous iteration
			time.Sleep(1 * time.Second)
		}
		err = operation()
		if err == nil {
			return nil
		}
		// there's an error. Let's try again.
	}
	this.panicAbort <- err
	return err
}

// executeAndThrottleOnError executes a given function. If it errors, it
// throttles.
func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
	if err := operation(); err != nil {
		this.throttle(nil)
		return err
	}
	return nil
}

// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
// consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() {
	<-this.rowCopyComplete
	atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
	this.migrationContext.MarkRowCopyEndTime()
	go func() {
		for <-this.rowCopyComplete {
		}
	}()
}
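
// canStopStreaming returns true when the binlog streamer may stop. At this time it always
// returns false: streaming is kept running for the entire duration of the migration.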
func (this *Migrator) canStopStreaming() bool {
	return false
}

// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
	// Hey, I created the changelog table, I know the type of columns it has!
	if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
		return nil
	}
	changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
	switch changelogState {
	case TablesInPlace:
		{
			this.tablesInPlace <- true
		}
	case AllEventsUpToLockProcessed:
		{
			applyEventFunc := func() error {
				this.allEventsUpToLockProcessed <- true
				return nil
			}
			// at this point we know all events up to lock have been read from the streamer,
			// because the streamer works sequentially. So those events are either already handled,
			// or have event functions in applyEventsQueue.
			// So as not to create a potential deadlock, we write this func to applyEventsQueue
			// asynchronously, understanding it doesn't really matter.
			go func() {
				this.applyEventsQueue <- applyEventFunc
			}()
		}
	default:
		{
			return fmt.Errorf("Unknown changelog state: %+v", changelogState)
		}
	}
	log.Debugf("Received state %+v", changelogState)
	return nil
}

// onChangelogHeartbeat is called when a heartbeat event is intercepted
func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
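	// The heartbeat value is an RFC3339Nano timestamp; the gap between "now" and that timestamp
	// approximates replication lag, and is stored as nanoseconds (a time.Duration cast to int64)
	// in CurrentLag for shouldThrottle() to consult.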
	heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
	if err != nil {
		return log.Errore(err)
	}
	lag := time.Now().Sub(heartbeatTime)

	atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))

	return nil
}

// listenOnPanicAbort terminates the migration when a panic-abort request is received
func (this *Migrator) listenOnPanicAbort() {
	err := <-this.panicAbort
	log.Fatale(err)
}

// validateStatement validates the `alter` statement meets criteria.
// At this time this means:
// - column renames are approved
func (this *Migrator) validateStatement() (err error) {
	if this.parser.HasNonTrivialRenames() && !this.migrationContext.SkipRenamedColumns {
		this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames()
		if !this.migrationContext.ApproveRenamedColumns {
			return fmt.Errorf("Alter statement has column(s) renamed. gh-ost suspects the following renames: %v; but to proceed you must approve via `--approve-renamed-columns` (or you can skip renamed columns via `--skip-renamed-columns`)", this.parser.GetNonTrivialRenames())
		}
		log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames())
	}
	return nil
}

// Migrate executes the complete migration logic. This is *the* major gh-ost function.
func (this *Migrator) Migrate() (err error) {
	log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
	this.migrationContext.StartTime = time.Now()
	if this.hostname, err = os.Hostname(); err != nil {
		return err
	}

	go this.listenOnPanicAbort()

	if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
		return err
	}
	if err := this.validateStatement(); err != nil {
		return err
	}
	if err := this.initiateInspector(); err != nil {
		return err
	}
	if err := this.initiateStreaming(); err != nil {
		return err
	}
	if err := this.initiateApplier(); err != nil {
		return err
	}

	log.Debugf("Waiting for tables to be in place")
	<-this.tablesInPlace
	log.Debugf("Tables are in place")
	// Yay! We now know the Ghost and Changelog tables are good to examine!
	// When running on replica, this means the replica has those tables. When running
	// on master this is always true, of course, and yet it also implies this knowledge
	// is in the binlogs.
	if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
		return err
	}
	if this.migrationContext.CountTableRows {
		if this.migrationContext.Noop {
			log.Debugf("Noop operation; not really counting table rows")
		} else if err := this.inspector.CountTableRows(); err != nil {
			return err
		}
	}

	if err := this.initiateServer(); err != nil {
		return err
	}
	if err := this.addDMLEventsListener(); err != nil {
		return err
	}
	go this.initiateHeartbeatListener()

	if err := this.applier.ReadMigrationRangeValues(); err != nil {
		return err
	}
	go this.initiateThrottler()
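	// executeWriteFuncs() and iterateChunks() (defined elsewhere in this package) drive the copy
	// from here on: iterateChunks() feeds chunked row-copy tasks into copyRowsQueue, and
	// executeWriteFuncs() consumes the copyRowsQueue/applyEventsQueue tasks and executes them.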
	go this.executeWriteFuncs()
	go this.iterateChunks()
	this.migrationContext.RowCopyStartTime = time.Now()
	go this.initiateStatus()

	log.Debugf("Operating until row copy is complete")
	this.consumeRowCopyComplete()
	log.Infof("Row copy complete")
	this.printStatus(ForcePrintStatusRule)

	if err := this.cutOver(); err != nil {
		return err
	}

	if err := this.finalCleanup(); err != nil {
		return nil
	}
	log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
	return nil
}

// cutOver performs the final step of migration, based on migration
// type (on replica? bumpy? safe?)
func (this *Migrator) cutOver() (err error) {
	if this.migrationContext.Noop {
		log.Debugf("Noop operation; not really swapping tables")
		return nil
	}
	this.migrationContext.MarkPointOfInterest()
	this.throttle(func() {
		log.Debugf("throttling before swapping tables")
	})

	this.migrationContext.MarkPointOfInterest()
	this.sleepWhileTrue(
		func() (bool, error) {
			if this.migrationContext.PostponeCutOverFlagFile == "" {
				return false, nil
			}
			if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
				// Postpone-cut-over flag file defined and exists!
				atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 1)
				//log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeCutOverFlagFile)
				return true, nil
			}
			return false, nil
		},
	)
	atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
	this.migrationContext.MarkPointOfInterest()

	if this.migrationContext.TestOnReplica {
		// With `--test-on-replica` we stop replication thread, and then proceed to use
		// the same cut-over phase as the master would use. That means we take locks
		// and swap the tables.
		// The difference is that we will later swap the tables back.
		log.Debugf("testing on replica. Stopping replication IO thread")
		if err := this.retryOperation(this.applier.StopReplication); err != nil {
			return err
		}
		// We're merely testing; we don't want to keep this state. Roll back the renames as much as possible
		defer this.applier.RenameTablesRollback()
	}
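
	// Pick a cut-over strategy. CutOverSafe keeps the original table locked while the renames
	// queue up behind that lock, so queries block rather than fail; CutOverTwoStep swaps the
	// tables via two non-atomic renames, accepting a brief window in which the original table
	// does not exist. See safeCutOver() and cutOverTwoStep() below.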
	if this.migrationContext.CutOverType == base.CutOverSafe {
		// Lock-based solution: we use low timeout and multiple attempts. But for
		// each failed attempt, we throttle until replication lag is back to normal
		err := this.retryOperation(
			func() error {
				return this.executeAndThrottleOnError(this.safeCutOver)
			},
		)
		return err
	}
	if this.migrationContext.CutOverType == base.CutOverTwoStep {
		err := this.retryOperation(
			func() error {
				return this.executeAndThrottleOnError(this.cutOverTwoStep)
			},
		)
		return err
	}
	return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
}

// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) {
	this.migrationContext.MarkPointOfInterest()
	waitForEventsUpToLockStartTime := time.Now()

	log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
	if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
		return err
	}
	log.Infof("Waiting for events up to lock")
	atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
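	// The state hint just written to the changelog table travels through the binlog back to our
	// own streamer; onChangelogStateEvent() then queues a function that signals
	// allEventsUpToLockProcessed, so the receive below completes only once the events preceding
	// the hint have been worked through the apply-events queue.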
	<-this.allEventsUpToLockProcessed
	waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime)

	log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
	this.printStatus(ForcePrintStatusAndHint)

	return nil
}

// cutOverTwoStep will lock down the original table, execute
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
// There is a point in time where the "original" table does not exist and queries are not
// blocked, but instead fail.
func (this *Migrator) cutOverTwoStep() (err error) {
	atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
	defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)

	if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
		return err
	}
	if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
		return err
	}
	if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil {
		return err
	}
	if err := this.retryOperation(this.applier.UnlockTables); err != nil {
		return err
	}

	lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
	renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
	log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
	return nil
}

// safeCutOver performs a safe cut over, where normally (no failure) the original table
// is being locked until swapped, hence DML queries being locked and unaware of the cut-over.
// In the worst case, there will be a minor outage, where the original table would not exist.
func (this *Migrator) safeCutOver() (err error) {
	atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
	defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
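
	// The choreography, in broad strokes: lock the original table; drain events up to the lock;
	// issue a RENAME original->old from one connection and a RENAME ghost->original from another,
	// both of which block behind the table lock; then release the lock so the renames resolve in
	// that order. The deferred function below attempts to restore the original table if we bail
	// out midway.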
	okToUnlockTable := make(chan bool, 2)
	originalTableRenamed := make(chan error, 1)
	var originalTableRenameIntended int64
	defer func() {
		log.Infof("Checking to see if we need to roll back")
		// The following is to make sure we unlock the table no-matter-what!
		// There's enough buffer in the channel to support a redundant write here.
		okToUnlockTable <- true
		if atomic.LoadInt64(&originalTableRenameIntended) == 1 {
			log.Infof("Waiting for original table rename result")
			// We need to make sure we wait for the original-rename, successful or not,
			// so as to be able to rollback in case the ghost-rename fails.
			// But we only wait on this queue if there's actually going to be a rename.
			// As an example, what happens should the initial `lock tables` fail? We would
			// never proceed to rename the table, hence this queue is never written to.
			<-originalTableRenamed
		}
		// Rollback operation
		if !this.applier.tableExists(this.migrationContext.OriginalTableName) {
			log.Infof("Cannot find %s, rolling back", this.migrationContext.OriginalTableName)
			err := this.applier.RenameTable(this.migrationContext.GetOldTableName(), this.migrationContext.OriginalTableName)
			log.Errore(err)
		} else {
			log.Info("No need for rollback")
		}
	}()
	lockOriginalSessionIdChan := make(chan int64, 1)
	tableLocked := make(chan error, 1)
	tableUnlocked := make(chan error, 1)
	go func() {
		if err := this.applier.LockOriginalTableAndWait(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
			log.Errore(err)
		}
	}()
	if err := <-tableLocked; err != nil {
		return log.Errore(err)
	}
	lockOriginalSessionId := <-lockOriginalSessionIdChan
	log.Infof("Session locking original table is %+v", lockOriginalSessionId)
	// At this point we know the table is locked.
	// We know any newly incoming DML on original table is blocked.
	this.waitForEventsUpToLock()

	// Step 2
	// We now attempt a RENAME on the original table, and expect it to block
	renameOriginalSessionIdChan := make(chan int64, 1)
	this.migrationContext.RenameTablesStartTime = time.Now()
	atomic.StoreInt64(&originalTableRenameIntended, 1)
	go func() {
		this.applier.RenameOriginalTable(renameOriginalSessionIdChan, originalTableRenamed)
	}()
	renameOriginalSessionId := <-renameOriginalSessionIdChan
	log.Infof("Session renaming original table is %+v", renameOriginalSessionId)

	if err := this.retryOperation(
		func() error {
			return this.applier.ExpectProcess(renameOriginalSessionId, "metadata lock", "rename")
		}); err != nil {
		return err
	}
	log.Infof("Found RENAME on original table to be blocking, as expected. Double checking original is still being locked")
	if err := this.applier.ExpectUsedLock(lockOriginalSessionId); err != nil {
		// Abort operation; but make sure to unlock table!
		return log.Errore(err)
	}
	log.Infof("Connection holding lock on original table still exists")

	// Now that we've found the RENAME blocking, AND the locking connection still alive,
	// we know it is safe to proceed to renaming ghost table.

	// Step 3
	// We now attempt a RENAME on the ghost table, and expect it to block
	renameGhostSessionIdChan := make(chan int64, 1)
	ghostTableRenamed := make(chan error, 1)
	go func() {
		this.applier.RenameGhostTable(renameGhostSessionIdChan, ghostTableRenamed)
	}()
	renameGhostSessionId := <-renameGhostSessionIdChan
	log.Infof("Session renaming ghost table is %+v", renameGhostSessionId)

	if err := this.retryOperation(
		func() error {
			return this.applier.ExpectProcess(renameGhostSessionId, "metadata lock", "rename")
		}); err != nil {
		return err
	}
	log.Infof("Found RENAME on ghost table to be blocking, as expected. Will next release lock on original table")

	// Step 4
	okToUnlockTable <- true
	// BAM! original table lock is released, RENAME original->old released,
	// RENAME ghost->original is released, queries on original are unblocked.
	// (that is, assuming all went well)
	if err := <-tableUnlocked; err != nil {
		return log.Errore(err)
	}
	if err := <-ghostTableRenamed; err != nil {
		return log.Errore(err)
	}
	this.migrationContext.RenameTablesEndTime = time.Now()

	// ooh nice! We're actually truly and thankfully done
	lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
	log.Infof("Lock & rename duration: %s. During this time, queries on %s were blocked", lockAndRenameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
	return nil
}

// stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply
// what DML events are left, and that's it.
// This only applies in --test-on-replica. It leaves replication stopped, with both tables
// in sync. There is no table swap.
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
	log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
	if err := this.retryOperation(this.applier.StopReplication); err != nil {
		return err
	}

	this.waitForEventsUpToLock()

	log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
	return nil
}

// onServerCommand responds to a user's interactive command
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
	defer writer.Flush()

	tokens := strings.SplitN(command, "=", 2)
	command = strings.TrimSpace(tokens[0])
	arg := ""
	if len(tokens) > 1 {
		arg = strings.TrimSpace(tokens[1])
	}
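	// Commands arrive either as a bare verb ("status", "throttle") or as a "name=value" pair
	// such as "chunk-size=2000"; everything after the first '=' is the argument.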

	switch command {
	case "help":
		{
			fmt.Fprintln(writer, `available commands:
status                               # Print a status message
chunk-size=<newsize>                 # Set a new chunk-size
critical-load=<load>                 # Set a new set of critical-load thresholds
max-load=<load>                      # Set a new set of max-load thresholds
throttle-query=<query>               # Set a new throttle-query
throttle-control-replicas=<replicas> # Set a new list of throttle control replicas
throttle                             # Force throttling
no-throttle                          # End forced throttling (other throttling may still apply)
panic                                # panic and quit without cleanup
help                                 # This message
`)
		}
	case "info", "status":
		this.printStatus(ForcePrintStatusAndHint, writer)
	case "chunk-size":
		{
			if chunkSize, err := strconv.Atoi(arg); err != nil {
				fmt.Fprintf(writer, "%s\n", err.Error())
				return log.Errore(err)
			} else {
				this.migrationContext.SetChunkSize(int64(chunkSize))
				this.printStatus(ForcePrintStatusAndHint, writer)
			}
		}
	case "max-load":
		{
			if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
				fmt.Fprintf(writer, "%s\n", err.Error())
				return log.Errore(err)
			}
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "critical-load":
		{
			if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
				fmt.Fprintf(writer, "%s\n", err.Error())
				return log.Errore(err)
			}
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "throttle-query":
		{
			this.migrationContext.SetThrottleQuery(arg)
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "throttle-control-replicas":
		{
			if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
				fmt.Fprintf(writer, "%s\n", err.Error())
				return log.Errore(err)
			}
			fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "throttle", "pause", "suspend":
		{
			atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
		}
	case "no-throttle", "unthrottle", "resume", "continue":
		{
			atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
		}
	case "panic":
		{
			err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
			fmt.Fprintf(writer, "%s\n", err.Error())
			this.panicAbort <- err
		}
	default:
		err = fmt.Errorf("Unknown command: %s", command)
		fmt.Fprintf(writer, "%s\n", err.Error())
		return err
	}
	return nil
}

// initiateServer begins listening on unix socket/tcp for incoming interactive commands
func (this *Migrator) initiateServer() (err error) {
	this.server = NewServer(this.onServerCommand)
	if err := this.server.BindSocketFile(); err != nil {
		return err
	}
	if err := this.server.BindTCPPort(); err != nil {
		return err
	}
	go this.server.Serve()
	return nil
}

// initiateInspector connects, validates and inspects the "inspector" server.
// The "inspector" server is typically a replica; it is where we issue some
// queries such as:
// - table row count
// - schema validation
// - heartbeat
// When `--allow-on-master` is supplied, the inspector is actually the master.
func (this *Migrator) initiateInspector() (err error) {
	this.inspector = NewInspector()
	if err := this.inspector.InitDBConnections(); err != nil {
		return err
	}
	if err := this.inspector.ValidateOriginalTable(); err != nil {
		return err
	}
	if err := this.inspector.InspectOriginalTable(); err != nil {
		return err
	}
	// So far so good, table is accessible and valid.
	// Let's get master connection config
	if this.migrationContext.ApplierConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil {
		return err
	}
	if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
		if this.migrationContext.InspectorIsAlsoApplier() {
			return fmt.Errorf("Instructed to --test-on-replica or --migrate-on-replica, but the server we connect to doesn't seem to be a replica")
		}
		log.Infof("--test-on-replica or --migrate-on-replica given. Will not execute on master %+v but rather on replica %+v itself",
			*this.migrationContext.ApplierConnectionConfig.ImpliedKey, *this.migrationContext.InspectorConnectionConfig.ImpliedKey,
		)
		this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
		if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
			this.migrationContext.ThrottleControlReplicaKeys.AddKey(this.migrationContext.InspectorConnectionConfig.Key)
		}
	} else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster {
		return fmt.Errorf("It seems like this migration attempts to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
	}

	log.Infof("Master found to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey)
	return nil
}

// initiateStatus sets and activates the printStatus() ticker
func (this *Migrator) initiateStatus() error {
	this.printStatus(ForcePrintStatusAndHint)
	statusTick := time.Tick(1 * time.Second)
	for range statusTick {
		go this.printStatus(HeuristicPrintStatusRule)
	}
	return nil
}
2016-06-19 15:55:37 +00:00
// printMigrationStatusHint prints a detailed configuration dump, that is useful
// to keep in mind; such as the name of migrated table, throttle params etc.
// This gets printed at beginning and end of migration, every 10 minutes throughout
// migration, and as reponse to the "status" interactive command.
2016-06-07 09:59:17 +00:00
func ( this * Migrator ) printMigrationStatusHint ( writers ... io . Writer ) {
w := io . MultiWriter ( writers ... )
fmt . Fprintln ( w , fmt . Sprintf ( "# Migrating %s.%s; Ghost table is %s.%s" ,
2016-06-01 08:40:49 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
2016-06-06 10:33:05 +00:00
) )
2016-06-19 15:55:37 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Migrating %+v; inspecting %+v; executing on %+v" ,
* this . applier . connectionConfig . ImpliedKey ,
* this . inspector . connectionConfig . ImpliedKey ,
this . hostname ,
) )
2016-06-07 09:59:17 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Migration started at %+v" ,
2016-06-01 08:40:49 +00:00
this . migrationContext . StartTime . Format ( time . RubyDate ) ,
2016-06-06 10:33:05 +00:00
) )
2016-06-09 09:25:01 +00:00
maxLoad := this . migrationContext . GetMaxLoad ( )
2016-06-18 19:12:07 +00:00
criticalLoad := this . migrationContext . GetCriticalLoad ( )
fmt . Fprintln ( w , fmt . Sprintf ( "# chunk-size: %+v; max lag: %+vms; max-load: %s; critical-load: %s" ,
2016-06-06 10:33:05 +00:00
atomic . LoadInt64 ( & this . migrationContext . ChunkSize ) ,
atomic . LoadInt64 ( & this . migrationContext . MaxLagMillisecondsThrottleThreshold ) ,
2016-06-18 19:12:07 +00:00
maxLoad . String ( ) ,
criticalLoad . String ( ) ,
2016-06-06 10:33:05 +00:00
) )
if this . migrationContext . ThrottleFlagFile != "" {
2016-06-07 09:59:17 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Throttle flag file: %+v" ,
2016-06-06 10:33:05 +00:00
this . migrationContext . ThrottleFlagFile ,
) )
}
if this . migrationContext . ThrottleAdditionalFlagFile != "" {
2016-06-07 09:59:17 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Throttle additional flag file: %+v" ,
2016-06-06 10:33:05 +00:00
this . migrationContext . ThrottleAdditionalFlagFile ,
) )
}
if throttleQuery := this . migrationContext . GetThrottleQuery ( ) ; throttleQuery != "" {
fmt . Fprintln ( w , fmt . Sprintf ( "# Throttle query: %+v" ,
throttleQuery ,
) )
}
if this . migrationContext . PostponeCutOverFlagFile != "" {
fmt . Fprintln ( w , fmt . Sprintf ( "# Postpone cut-over flag file: %+v" ,
this . migrationContext . PostponeCutOverFlagFile ,
) )
}
if this . migrationContext . PanicFlagFile != "" {
fmt . Fprintln ( w , fmt . Sprintf ( "# Panic flag file: %+v" ,
this . migrationContext . PanicFlagFile ,
) )
}
fmt . Fprintln ( w , fmt . Sprintf ( "# Serving on unix socket: %+v" ,
this . migrationContext . ServeSocketFile ,
) )
if this . migrationContext . ServeTCPPort != 0 {
fmt . Fprintln ( w , fmt . Sprintf ( "# Serving on TCP port: %+v" , this . migrationContext . ServeTCPPort ) )
}
}
// printStatus prints the progress status, and optionally an additional, detailed
// dump of the configuration.
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
func ( this * Migrator ) printStatus ( rule PrintStatusRule , writers ... io . Writer ) {
writers = append ( writers , os . Stdout )
elapsedTime := this . migrationContext . ElapsedTime ( )
elapsedSeconds := int64 ( elapsedTime . Seconds ( ) )
totalRowsCopied := this . migrationContext . GetTotalRowsCopied ( )
rowsEstimate := atomic . LoadInt64 ( & this . migrationContext . RowsEstimate )
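// Compute progress only when a row estimate is available, to avoid division by zero.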
var progressPct float64
if rowsEstimate > 0 {
progressPct = 100.0 * float64 ( totalRowsCopied ) / float64 ( rowsEstimate )
}
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := ( elapsedSeconds % 600 == 0 )
if rule == ForcePrintStatusAndHint {
shouldPrintMigrationStatusHint = true
}
if shouldPrintMigrationStatusHint {
this . printMigrationStatusHint ( writers ... )
}
var etaSeconds float64 = math . MaxFloat64
eta := "N/A"
if atomic . LoadInt64 ( & this . migrationContext . IsPostponingCutOver ) > 0 {
eta = "postponing cut-over"
} else if isThrottled , throttleReason := this . migrationContext . IsThrottled ( ) ; isThrottled {
eta = fmt . Sprintf ( "throttled, %s" , throttleReason )
} else if progressPct > 100.0 {
eta = "Due"
} else if progressPct >= 1.0 {
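// Extrapolate the ETA: scale the elapsed row-copy time by the ratio of estimated rows to rows copied so far.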
elapsedRowCopySeconds := this . migrationContext . ElapsedRowCopyTime ( ) . Seconds ( )
totalExpectedSeconds := elapsedRowCopySeconds * float64 ( rowsEstimate ) / float64 ( totalRowsCopied )
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time . Duration ( etaSeconds ) * time . Second
eta = base . PrettifyDurationOutput ( etaDuration )
} else {
eta = "Due"
}
}
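// Heuristic print frequency: print on every tick during the first minute and when the ETA
// is under a minute; every 5 seconds when close to start/completion or near a point of
// interest; otherwise every 30 seconds.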
shouldPrintStatus := false
if elapsedSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 180 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else if elapsedSeconds <= 180 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else if this . migrationContext . TimeSincePointOfInterest ( ) . Seconds ( ) <= 60 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else {
shouldPrintStatus = ( elapsedSeconds % 30 == 0 )
}
if rule == ForcePrintStatusRule || rule == ForcePrintStatusAndHint {
shouldPrintStatus = true
}
if ! shouldPrintStatus {
return
}
currentBinlogCoordinates := * this . eventsStreamer . GetCurrentBinlogCoordinates ( )
status := fmt . Sprintf ( "Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; ETA: %s" ,
totalRowsCopied , rowsEstimate , progressPct ,
atomic . LoadInt64 ( & this . migrationContext . TotalDMLEventsApplied ) ,
len ( this . applyEventsQueue ) , cap ( this . applyEventsQueue ) ,
base . PrettifyDurationOutput ( elapsedTime ) , base . PrettifyDurationOutput ( this . migrationContext . ElapsedRowCopyTime ( ) ) ,
currentBinlogCoordinates ,
eta ,
)
this . applier . WriteChangelog (
fmt . Sprintf ( "copy iteration %d at %d" , this . migrationContext . GetIteration ( ) , time . Now ( ) . Unix ( ) ) ,
status ,
)
w := io . MultiWriter ( writers ... )
fmt . Fprintln ( w , status )
}
// initiateHeartbeatListener listens for heartbeat events. gh-ost implements its own
// heartbeat mechanism, regardless of whether your DB already has a heartbeat solution.
// Heartbeat is supplied via the changelog table.
func ( this * Migrator ) initiateHeartbeatListener ( ) {
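// Poll the changelog at half the heartbeat injection interval, so heartbeats are picked up promptly.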
ticker := time . Tick ( ( heartbeatIntervalMilliseconds * time . Millisecond ) / 2 )
for range ticker {
go func ( ) error {
if atomic . LoadInt64 ( & this . cleanupImminentFlag ) > 0 {
return nil
}
changelogState , err := this . inspector . readChangelogState ( )
if err != nil {
return log . Errore ( err )
}
for hint , value := range changelogState {
switch hint {
case "heartbeat" :
{
this . onChangelogHeartbeat ( value )
}
}
}
return nil
} ( )
}
}
// initiateStreaming begins streaming of binary log events and registers listeners for such events
func ( this * Migrator ) initiateStreaming ( ) error {
this . eventsStreamer = NewEventsStreamer ( )
if err := this . eventsStreamer . InitDBConnections ( ) ; err != nil {
return err
}
this . eventsStreamer . AddListener (
false ,
this . migrationContext . DatabaseName ,
this . migrationContext . GetChangelogTableName ( ) ,
func ( dmlEvent * binlog . BinlogDMLEvent ) error {
return this . onChangelogStateEvent ( dmlEvent )
} ,
)
go func ( ) {
log . Debugf ( "Beginning streaming" )
err := this . eventsStreamer . StreamEvents ( this . canStopStreaming )
if err != nil {
this . panicAbort <- err
}
log . Debugf ( "Done streaming" )
} ( )
return nil
}
// addDMLEventsListener begins listening for binlog events on the original table,
// and creates & enqueues a write task per such event.
func ( this * Migrator ) addDMLEventsListener ( ) error {
err := this . eventsStreamer . AddListener (
false ,
this . migrationContext . DatabaseName ,
this . migrationContext . OriginalTableName ,
func ( dmlEvent * binlog . BinlogDMLEvent ) error {
// Create a task to apply the DML event; this will be executed by executeWriteFuncs()
applyEventFunc := func ( ) error {
return this . applier . ApplyDMLEventQuery ( dmlEvent )
}
this . applyEventsQueue <- applyEventFunc
return nil
} ,
)
return err
}
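// initiateApplier connects the applier, creates the changelog and ghost tables, applies the
// ALTER onto the ghost table, and initiates heartbeat injection.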
func ( this * Migrator ) initiateApplier ( ) error {
this . applier = NewApplier ( )
if err := this . applier . InitDBConnections ( ) ; err != nil {
return err
}
if err := this . applier . ValidateOrDropExistingTables ( ) ; err != nil {
return err
}
if err := this . applier . CreateChangelogTable ( ) ; err != nil {
log . Errorf ( "Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out" )
return err
}
if err := this . applier . CreateGhostTable ( ) ; err != nil {
log . Errorf ( "Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out" )
return err
}
if err := this . applier . AlterGhost ( ) ; err != nil {
log . Errorf ( "Unable to ALTER ghost table, see further error details. Bailing out" )
return err
}
this . applier . WriteChangelogState ( string ( TablesInPlace ) )
go this . applier . InitiateHeartbeat ( heartbeatIntervalMilliseconds )
return nil
}
// iterateChunks iterates over the existing table rows and, per chunk of rows,
// generates a copy task that writes that chunk onto the ghost table.
func ( this * Migrator ) iterateChunks ( ) error {
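// terminateRowIteration signals that the row copy is complete and relays any iteration error.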
terminateRowIteration := func ( err error ) error {
this . rowCopyComplete <- true
return log . Errore ( err )
}
if this . migrationContext . Noop {
log . Debugf ( "Noop operation; not really copying data" )
return terminateRowIteration ( nil )
}
if this . migrationContext . MigrationRangeMinValues == nil {
log . Debugf ( "No rows found in table. Rowcopy will be implicitly empty" )
return terminateRowIteration ( nil )
}
// Iterate per chunk:
for {
if atomic . LoadInt64 ( & this . rowCopyCompleteFlag ) == 1 {
// Done
return nil
}
copyRowsFunc := func ( ) error {
hasFurtherRange , err := this . applier . CalculateNextIterationRangeEndValues ( )
if err != nil {
return terminateRowIteration ( err )
}
if ! hasFurtherRange {
return terminateRowIteration ( nil )
}
// Copy task:
applyCopyRowsFunc := func ( ) error {
_ , rowsAffected , _ , err := this . applier . ApplyIterationInsertQuery ( )
if err != nil {
return terminateRowIteration ( err )
}
atomic . AddInt64 ( & this . migrationContext . TotalRowsCopied , rowsAffected )
atomic . AddInt64 ( & this . migrationContext . Iteration , 1 )
return nil
}
return this . retryOperation ( applyCopyRowsFunc )
}
// Enqueue copy operation; to be executed by executeWriteFuncs()
this . copyRowsQueue <- copyRowsFunc
}
return nil
}
// executeWriteFuncs writes data via the applier: both the rowcopy and the events backlog.
// This is where the ghost table gets its data. Writes are applied single-threaded.
// Both the event backlog and rowcopy tasks are polled; backlog events take precedence.
func ( this * Migrator ) executeWriteFuncs ( ) error {
if this . migrationContext . Noop {
log . Debugf ( "Noop operation; not really executing write funcs" )
return nil
}
for {
if atomic . LoadInt64 ( & this . inCutOverCriticalActionFlag ) == 0 {
// we don't throttle when cutting over. We _do_ throttle:
// - during copy phase
// - just before cut-over
// - in between cut-over retries
this . throttle ( nil )
// When cutting over, we need to be aggressive. Cut-over holds table locks.
// We need to release those asap.
}
// We give higher priority to event processing, then secondary priority to
// rowcopy
select {
case applyEventFunc := <- this . applyEventsQueue :
{
if err := this . retryOperation ( applyEventFunc ) ; err != nil {
return log . Errore ( err )
}
}
default :
{
select {
case copyRowsFunc := <- this . copyRowsQueue :
{
// Retries are handled within the copyRowsFunc
if err := copyRowsFunc ( ) ; err != nil {
return log . Errore ( err )
}
}
default :
{
// Hmmmmm... nothing in the queue; no events, but also no row copy.
// This is possible upon load. Let's just sleep it over.
log . Debugf ( "Getting nothing in the write queue. Sleeping..." )
time . Sleep ( time . Second )
}
}
}
}
}
return nil
}
// finalCleanup takes actions at the very end of the migration: dropping tables, etc.
func ( this * Migrator ) finalCleanup ( ) error {
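// Let the heartbeat listener know cleanup is imminent, so it stops reading the changelog table before it is dropped.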
atomic . StoreInt64 ( & this . cleanupImminentFlag , 1 )
if this . migrationContext . Noop {
if createTableStatement , err := this . inspector . showCreateTable ( this . migrationContext . GetGhostTableName ( ) ) ; err == nil {
log . Infof ( "New table structure follows" )
fmt . Println ( createTableStatement )
} else {
log . Errore ( err )
}
}
if err := this . retryOperation ( this . applier . DropChangelogTable ) ; err != nil {
return err
}
if this . migrationContext . OkToDropTable && ! this . migrationContext . TestOnReplica {
if err := this . retryOperation ( this . applier . DropOldTable ) ; err != nil {
return err
}
}
if this . migrationContext . Noop {
if err := this . retryOperation ( this . applier . DropGhostTable ) ; err != nil {
return err
}
}
return nil
}