/*
   Copyright 2016 GitHub Inc.
   See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
2016-06-07 09:59:17 +00:00
"bufio"
2016-04-04 13:29:02 +00:00
"fmt"
2016-06-07 09:59:17 +00:00
"io"
2016-05-19 13:11:36 +00:00
"math"
2016-04-08 08:34:44 +00:00
"os"
2016-04-11 15:27:16 +00:00
"os/signal"
2016-06-07 09:59:17 +00:00
"strconv"
"strings"
2016-04-07 13:57:12 +00:00
"sync/atomic"
2016-04-11 15:27:16 +00:00
"syscall"
2016-04-07 13:57:12 +00:00
"time"
2016-04-04 13:29:02 +00:00
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
2016-04-04 13:29:02 +00:00
"github.com/outbrain/golib/log"
2016-04-04 10:27:51 +00:00
)
2016-04-08 08:34:44 +00:00
// ChangelogState is the type of the state hints written to (and read back from)
// the changelog table; see onChangelogStateEvent.
type ChangelogState string

const (
	// TablesInPlace signals that the ghost/changelog tables are in place.
	TablesInPlace ChangelogState = "TablesInPlace"
	// AllEventsUpToLockProcessed signals that all binlog events up to the lock
	// have been processed. Explicitly typed as ChangelogState: without the type,
	// this const would silently be an untyped string constant.
	AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
)

// tableWriteFunc is a unit of table-writing work (row copy or event apply),
// returning an error on failure.
type tableWriteFunc func() error

const (
	// applyEventsQueueBuffer is the buffer size of the binlog-events apply queue.
	applyEventsQueueBuffer = 100
)
2016-06-18 19:12:07 +00:00
// PrintStatusRule selects how/when printStatus emits output.
type PrintStatusRule int

const (
	// All constants are explicitly PrintStatusRule via the iota chain; the
	// original repeated `= iota`, which left all but the first untyped.
	HeuristicPrintStatusRule PrintStatusRule = iota
	ForcePrintStatusRule
	ForcePrintStatusOnlyRule
	ForcePrintStatusAndHintRule
)
2016-04-04 10:27:51 +00:00
// Migrator is the main schema migration flow manager.
type Migrator struct {
	parser           *sql.Parser            // parses the ALTER statement (rename detection; see validateStatement)
	inspector        *Inspector             // inspects original/ghost tables, row counting
	applier          *Applier               // applies writes: changelog entries, throttle queries, replication control
	eventsStreamer   *EventsStreamer        // binlog events source
	server           *Server                // interactive server; owns the socket file removed on exit
	hooksExecutor    *HooksExecutor         // runs external hooks (onStartup, onFailure, ...)
	migrationContext *base.MigrationContext // shared migration configuration & runtime state

	tablesInPlace              chan bool  // signaled when the "TablesInPlace" changelog state arrives
	rowCopyComplete            chan bool  // signaled when row copy finishes; drained by consumeRowCopyComplete
	allEventsUpToLockProcessed chan bool  // signaled once the "AllEventsUpToLockProcessed" hint is applied
	panicAbort                 chan error // writing here aborts the migration (see listenOnPanicAbort)

	// Atomic flags (0/1), accessed via sync/atomic:
	rowCopyCompleteFlag                    int64 // set once rowCopyComplete has been consumed
	allEventsUpToLockProcessedInjectedFlag int64 // set once the final state hint is injected; relaxes replica-lag throttling
	inCutOverCriticalActionFlag            int64 // NOTE(review): not referenced in this chunk — presumably set during cut-over critical section; confirm
	cleanupImminentFlag                    int64 // NOTE(review): not referenced in this chunk — confirm usage
	userCommandedUnpostponeFlag            int64 // set by user command; ends cut-over postponement

	// copyRowsQueue should not be buffered; if buffered some non-damaging but
	// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
	copyRowsQueue    chan tableWriteFunc
	applyEventsQueue chan tableWriteFunc // buffered (applyEventsQueueBuffer)

	handledChangelogStates map[string]bool
}
2016-04-04 13:29:02 +00:00
func NewMigrator ( ) * Migrator {
2016-04-07 13:57:12 +00:00
migrator := & Migrator {
2016-04-08 08:34:44 +00:00
migrationContext : base . GetMigrationContext ( ) ,
2016-06-17 06:03:18 +00:00
parser : sql . NewParser ( ) ,
2016-04-08 08:34:44 +00:00
tablesInPlace : make ( chan bool ) ,
rowCopyComplete : make ( chan bool ) ,
allEventsUpToLockProcessed : make ( chan bool ) ,
2016-04-18 17:57:18 +00:00
panicAbort : make ( chan error ) ,
2016-04-08 08:34:44 +00:00
2016-05-17 12:40:37 +00:00
allEventsUpToLockProcessedInjectedFlag : 0 ,
2016-05-16 09:03:15 +00:00
2016-04-14 11:37:56 +00:00
copyRowsQueue : make ( chan tableWriteFunc ) ,
applyEventsQueue : make ( chan tableWriteFunc , applyEventsQueueBuffer ) ,
handledChangelogStates : make ( map [ string ] bool ) ,
2016-04-07 13:57:12 +00:00
}
return migrator
}
2016-04-11 15:27:16 +00:00
// acceptSignals registers for OS signals
func (this *Migrator) acceptSignals() {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGHUP)
	go func() {
		for received := range sigChan {
			if received == syscall.SIGHUP {
				log.Debugf("Received SIGHUP. Reloading configuration")
			}
		}
	}()
}
2016-08-19 12:52:49 +00:00
// initiateHooksExecutor
func ( this * Migrator ) initiateHooksExecutor ( ) ( err error ) {
this . hooksExecutor = NewHooksExecutor ( )
2016-08-20 06:24:20 +00:00
if err := this . hooksExecutor . initHooks ( ) ; err != nil {
2016-08-19 12:52:49 +00:00
return err
}
return nil
}
2016-06-19 15:55:37 +00:00
// shouldThrottle performs checks to see whether we should currently be throttling.
// It also checks for critical-load and panic aborts.
// Returns (true, reason) when throttling should apply, (false, "") otherwise.
// Note: panic-file and critical-load checks do not merely throttle — they push
// to panicAbort, terminating the migration.
func (this *Migrator) shouldThrottle() (result bool, reason string) {
	// Regardless of throttle, we take opportunity to check for panic-abort
	if this.migrationContext.PanicFlagFile != "" {
		if base.FileExists(this.migrationContext.PanicFlagFile) {
			this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
		}
	}
	// Critical load: any configured status variable at/over its threshold aborts.
	criticalLoad := this.migrationContext.GetCriticalLoad()
	for variableName, threshold := range criticalLoad {
		value, err := this.applier.ShowStatusVariable(variableName)
		if err != nil {
			// Can't read the variable: err on the side of caution and throttle.
			return true, fmt.Sprintf("%s %s", variableName, err)
		}
		if value >= threshold {
			this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
		}
	}

	// Back to throttle considerations

	// User-based throttle
	if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
		return true, "commanded by user"
	}
	if this.migrationContext.ThrottleFlagFile != "" {
		if base.FileExists(this.migrationContext.ThrottleFlagFile) {
			// Throttle file defined and exists!
			return true, "flag-file"
		}
	}
	if this.migrationContext.ThrottleAdditionalFlagFile != "" {
		if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
			// 2nd Throttle file defined and exists!
			return true, "flag-file"
		}
	}
	// Replication lag throttle
	maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
	lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
	// lag is stored as int64 nanoseconds (see parseChangelogHeartbeat); the
	// threshold is configured in milliseconds.
	if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
		return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
	}
	// Once the final events-up-to-lock hint has been injected on a replica
	// test/migration, control-replica lag is no longer a reason to throttle.
	checkThrottleControlReplicas := true
	if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) {
		checkThrottleControlReplicas = false
	}
	if checkThrottleControlReplicas {
		lagResult := this.migrationContext.GetControlReplicasLagResult()
		if lagResult.Err != nil {
			return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
		}
		if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
			return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
		}
	}

	// max-load: like critical-load, but throttles rather than aborts.
	maxLoad := this.migrationContext.GetMaxLoad()
	for variableName, threshold := range maxLoad {
		value, err := this.applier.ShowStatusVariable(variableName)
		if err != nil {
			return true, fmt.Sprintf("%s %s", variableName, err)
		}
		if value >= threshold {
			return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)
		}
	}
	// User-provided throttle query: positive result means "throttle".
	// Query error is deliberately ignored (best-effort).
	if this.migrationContext.GetThrottleQuery() != "" {
		if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
			return true, "throttle-query"
		}
	}

	return false, ""
}
2016-06-19 15:55:37 +00:00
// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
2016-04-11 15:27:16 +00:00
func ( this * Migrator ) initiateThrottler ( ) error {
throttlerTick := time . Tick ( 1 * time . Second )
throttlerFunction := func ( ) {
alreadyThrottling , currentReason := this . migrationContext . IsThrottled ( )
shouldThrottle , throttleReason := this . shouldThrottle ( )
if shouldThrottle && ! alreadyThrottling {
// New throttling
this . applier . WriteAndLogChangelog ( "throttle" , throttleReason )
} else if shouldThrottle && alreadyThrottling && ( currentReason != throttleReason ) {
// Change of reason
this . applier . WriteAndLogChangelog ( "throttle" , throttleReason )
} else if alreadyThrottling && ! shouldThrottle {
// End of throttling
this . applier . WriteAndLogChangelog ( "throttle" , "done throttling" )
}
this . migrationContext . SetThrottled ( shouldThrottle , throttleReason )
}
throttlerFunction ( )
for range throttlerTick {
throttlerFunction ( )
}
return nil
}
2016-04-08 12:35:06 +00:00
// throttle initiates a throttling event, if need be, updates the Context and
// calls callback functions, if any
2016-04-11 15:27:16 +00:00
func ( this * Migrator ) throttle ( onThrottled func ( ) ) {
2016-04-08 12:35:06 +00:00
for {
2016-07-27 07:59:46 +00:00
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
2016-04-11 15:27:16 +00:00
if shouldThrottle , _ := this . migrationContext . IsThrottled ( ) ; ! shouldThrottle {
return
2016-04-08 12:35:06 +00:00
}
2016-04-11 15:27:16 +00:00
if onThrottled != nil {
onThrottled ( )
2016-04-08 12:35:06 +00:00
}
2016-07-27 07:59:46 +00:00
time . Sleep ( 250 * time . Millisecond )
2016-04-08 12:35:06 +00:00
}
}
2016-05-17 12:40:37 +00:00
// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error). Polls once per second.
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
	for {
		keepSleeping, err := operation()
		switch {
		case err != nil:
			return err
		case !keepSleeping:
			return nil
		}
		time.Sleep(time.Second)
	}
}
2016-04-08 12:35:06 +00:00
// retryOperation attempts up to `count` attempts at running given function,
// exiting as soon as it returns with non-error.
2016-06-27 09:08:06 +00:00
func ( this * Migrator ) retryOperation ( operation func ( ) error , notFatalHint ... bool ) ( err error ) {
2016-06-19 15:55:37 +00:00
maxRetries := int ( this . migrationContext . MaxRetries ( ) )
2016-04-08 12:35:06 +00:00
for i := 0 ; i < maxRetries ; i ++ {
if i != 0 {
// sleep after previous iteration
time . Sleep ( 1 * time . Second )
}
err = operation ( )
if err == nil {
return nil
}
// there's an error. Let's try again.
}
2016-06-27 09:08:06 +00:00
if len ( notFatalHint ) == 0 {
this . panicAbort <- err
}
2016-04-08 12:35:06 +00:00
return err
2016-04-07 13:57:12 +00:00
}
2016-04-23 02:46:34 +00:00
// executeAndThrottleOnError executes a given function. If it errors, it
// throttles (blocking until throttling subsides) and returns the error.
func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
	err = operation()
	if err != nil {
		this.throttle(nil)
	}
	return err
}
2016-05-16 09:03:15 +00:00
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
// consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() {
	<-this.rowCopyComplete
	atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
	this.migrationContext.MarkRowCopyEndTime()
	// Keep draining so late senders never block.
	// NOTE(review): `for <-ch {}` exits if a `false` value is ever received —
	// presumably only `true` is written to this channel; confirm against senders.
	go func() {
		for <-this.rowCopyComplete {
		}
	}()
}
2016-04-07 13:57:12 +00:00
// canStopStreaming reports whether the events streamer may stop streaming.
// Currently always returns false: streaming is never auto-stopped.
func (this *Migrator) canStopStreaming() bool {
	return false
}
2016-06-19 15:55:37 +00:00
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
// It reacts to the well-known state values by signaling the matching coordination channel.
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
	// Hey, I created the changelog table, I know the type of columns it has!
	// Column 2 is the hint type; only "state" rows are handled here.
	if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
		return nil
	}
	// Column 3 carries the state value itself.
	changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
	switch changelogState {
	case TablesInPlace:
		{
			this.tablesInPlace <- true
		}
	case AllEventsUpToLockProcessed:
		{
			applyEventFunc := func() error {
				this.allEventsUpToLockProcessed <- true
				return nil
			}
			// at this point we know all events up to lock have been read from the streamer,
			// because the streamer works sequentially. So those events are either already handled,
			// or have event functions in applyEventsQueue.
			// So as not to create a potential deadlock, we write this func to applyEventsQueue
			// asynchronously, understanding it doesn't really matter.
			go func() {
				this.applyEventsQueue <- applyEventFunc
			}()
		}
	default:
		{
			return fmt.Errorf("Unknown changelog state: %+v", changelogState)
		}
	}
	log.Debugf("Received state %+v", changelogState)
	return nil
}
2016-08-30 07:41:59 +00:00
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
func ( this * Migrator ) parseChangelogHeartbeat ( heartbeatValue string ) ( err error ) {
2016-04-18 17:57:18 +00:00
heartbeatTime , err := time . Parse ( time . RFC3339Nano , heartbeatValue )
2016-04-07 13:57:12 +00:00
if err != nil {
return log . Errore ( err )
}
2016-08-02 12:38:56 +00:00
lag := time . Since ( heartbeatTime )
2016-04-07 13:57:12 +00:00
atomic . StoreInt64 ( & this . migrationContext . CurrentLag , int64 ( lag ) )
return nil
2016-04-04 10:27:51 +00:00
}
2016-06-19 15:55:37 +00:00
// listenOnPanicAbort aborts on abort request.
// Blocks until an error arrives on panicAbort, then passes it to log.Fatale
// (golib convention: presumably logs and terminates the process — confirm).
// Run as a goroutine.
func (this *Migrator) listenOnPanicAbort() {
	err := <-this.panicAbort
	log.Fatale(err)
}
2016-06-17 09:40:08 +00:00
2016-06-19 15:55:37 +00:00
// validateStatement validates the `alter` statement meets criteria.
// At this time this means:
// - column renames are approved
2016-06-17 06:03:18 +00:00
func ( this * Migrator ) validateStatement ( ) ( err error ) {
if this . parser . HasNonTrivialRenames ( ) && ! this . migrationContext . SkipRenamedColumns {
this . migrationContext . ColumnRenameMap = this . parser . GetNonTrivialRenames ( )
if ! this . migrationContext . ApproveRenamedColumns {
2016-08-22 06:49:27 +00:00
return fmt . Errorf ( "gh-ost believes the ALTER statement renames columns, as follows: %v; as precation, you are asked to confirm gh-ost is correct, and provide with `--approve-renamed-columns`, and we're all happy. Or you can skip renamed columns via `--skip-renamed-columns`, in which case column data may be lost" , this . parser . GetNonTrivialRenames ( ) )
2016-06-17 06:03:18 +00:00
}
2016-06-18 19:12:07 +00:00
log . Infof ( "Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds." , this . parser . GetNonTrivialRenames ( ) )
2016-06-17 06:03:18 +00:00
}
return nil
}
2016-04-18 17:57:18 +00:00
2016-08-24 09:39:44 +00:00
func ( this * Migrator ) countTableRows ( ) ( err error ) {
if ! this . migrationContext . CountTableRows {
// Not counting; we stay with an estimate
return nil
}
if this . migrationContext . Noop {
log . Debugf ( "Noop operation; not really counting table rows" )
return nil
}
2016-08-29 07:58:31 +00:00
countRowsFunc := func ( ) error {
if err := this . inspector . CountTableRows ( ) ; err != nil {
return err
}
if err := this . hooksExecutor . onRowCountComplete ( ) ; err != nil {
return err
}
return nil
}
2016-08-24 09:39:44 +00:00
if this . migrationContext . ConcurrentCountTableRows {
log . Infof ( "As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on" )
2016-08-29 07:58:31 +00:00
go countRowsFunc ( )
2016-08-24 09:39:44 +00:00
// and we ignore errors, because this turns to be a background job
return nil
}
2016-08-29 07:58:31 +00:00
return countRowsFunc ( )
2016-08-24 09:39:44 +00:00
}
2016-06-19 15:55:37 +00:00
// Migrate executes the complete migration logic. This is *the* major gh-ost function.
// Sequence: setup (hooks, parse/validate, inspector, streamer, applier) ->
// wait for tables in place -> inspect/validate -> server & counting ->
// row copy (with concurrent event application & throttling) -> cut-over ->
// cleanup & success hooks.
func (this *Migrator) Migrate() (err error) {
	log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
	this.migrationContext.StartTime = time.Now()
	if this.migrationContext.Hostname, err = os.Hostname(); err != nil {
		return err
	}

	// From here on, a write to panicAbort terminates the migration.
	go this.listenOnPanicAbort()

	if err := this.initiateHooksExecutor(); err != nil {
		return err
	}
	if err := this.hooksExecutor.onStartup(); err != nil {
		return err
	}
	if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
		return err
	}
	if err := this.validateStatement(); err != nil {
		return err
	}
	if err := this.initiateInspector(); err != nil {
		return err
	}
	if err := this.initiateStreaming(); err != nil {
		return err
	}
	if err := this.initiateApplier(); err != nil {
		return err
	}

	log.Infof("Waiting for tables to be in place")
	<-this.tablesInPlace
	log.Debugf("Tables are in place")
	// Yay! We now know the Ghost and Changelog tables are good to examine!
	// When running on replica, this means the replica has those tables. When running
	// on master this is always true, of course, and yet it also implies this knowledge
	// is in the binlogs.
	if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
		return err
	}
	// Validation complete! We're good to execute this migration
	if err := this.hooksExecutor.onValidated(); err != nil {
		return err
	}

	if err := this.initiateServer(); err != nil {
		return err
	}
	defer this.server.RemoveSocketFile()

	if err := this.countTableRows(); err != nil {
		return err
	}
	if err := this.addDMLEventsListener(); err != nil {
		return err
	}
	go this.initiateHeartbeatReader()
	go this.initiateControlReplicasReader()
	if err := this.applier.ReadMigrationRangeValues(); err != nil {
		return err
	}
	go this.initiateThrottler()
	if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
		return err
	}
	// Concurrent workers: writers (row copy + event apply), chunk iterator,
	// status reporter.
	go this.executeWriteFuncs()
	go this.iterateChunks()
	this.migrationContext.MarkRowCopyStartTime()
	go this.initiateStatus()

	log.Debugf("Operating until row copy is complete")
	this.consumeRowCopyComplete()
	log.Infof("Row copy complete")
	if err := this.hooksExecutor.onRowCopyComplete(); err != nil {
		return err
	}
	this.printStatus(ForcePrintStatusRule)

	if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
		return err
	}
	if err := this.cutOver(); err != nil {
		return err
	}

	// NOTE(review): a finalCleanup error is swallowed here (`return nil`, not
	// `return err`) — confirm this is intentional (migration already succeeded).
	if err := this.finalCleanup(); err != nil {
		return nil
	}
	if err := this.hooksExecutor.onSuccess(); err != nil {
		return err
	}
	log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
	return nil
}
2016-08-23 09:35:48 +00:00
// ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external
// hook access point. Callers must ensure the migrator has been initialized
// (hooksExecutor is set in Migrate via initiateHooksExecutor).
func (this *Migrator) ExecOnFailureHook() (err error) {
	return this.hooksExecutor.onFailure()
}
2016-06-15 08:13:06 +00:00
// cutOver performs the final step of migration, based on migration
2016-05-01 18:36:36 +00:00
// type (on replica? bumpy? safe?)
2016-06-15 08:13:06 +00:00
func ( this * Migrator ) cutOver ( ) ( err error ) {
2016-04-19 11:25:32 +00:00
if this . migrationContext . Noop {
log . Debugf ( "Noop operation; not really swapping tables" )
return nil
}
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
this . migrationContext . MarkPointOfInterest ( )
2016-04-11 15:27:16 +00:00
this . throttle ( func ( ) {
2016-04-22 20:18:56 +00:00
log . Debugf ( "throttling before swapping tables" )
2016-04-11 15:27:16 +00:00
} )
2016-04-18 17:57:18 +00:00
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
this . migrationContext . MarkPointOfInterest ( )
2016-05-17 12:40:37 +00:00
this . sleepWhileTrue (
func ( ) ( bool , error ) {
2016-06-07 12:05:25 +00:00
if this . migrationContext . PostponeCutOverFlagFile == "" {
2016-05-17 12:40:37 +00:00
return false , nil
}
2016-07-01 08:59:09 +00:00
if atomic . LoadInt64 ( & this . userCommandedUnpostponeFlag ) > 0 {
return false , nil
}
2016-06-07 12:05:25 +00:00
if base . FileExists ( this . migrationContext . PostponeCutOverFlagFile ) {
2016-05-17 12:40:37 +00:00
// Throttle file defined and exists!
2016-08-19 12:52:49 +00:00
if atomic . LoadInt64 ( & this . migrationContext . IsPostponingCutOver ) == 0 {
if err := this . hooksExecutor . onBeginPostponed ( ) ; err != nil {
return true , err
}
}
2016-06-13 16:36:29 +00:00
atomic . StoreInt64 ( & this . migrationContext . IsPostponingCutOver , 1 )
2016-05-17 12:40:37 +00:00
return true , nil
}
return false , nil
} ,
)
2016-06-13 16:36:29 +00:00
atomic . StoreInt64 ( & this . migrationContext . IsPostponingCutOver , 0 )
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
this . migrationContext . MarkPointOfInterest ( )
2016-05-17 12:40:37 +00:00
2016-04-18 17:57:18 +00:00
if this . migrationContext . TestOnReplica {
2016-06-10 09:15:11 +00:00
// With `--test-on-replica` we stop replication thread, and then proceed to use
// the same cut-over phase as the master would use. That means we take locks
// and swap the tables.
// The difference is that we will later swap the tables back.
2016-08-23 09:35:48 +00:00
this . hooksExecutor . onStopReplication ( )
2016-08-23 22:34:10 +00:00
if this . migrationContext . TestOnReplicaSkipReplicaStop {
log . Warningf ( "--test-on-replica-skip-replica-stop enabled, we are not stopping replication." )
} else {
log . Debugf ( "testing on replica. Stopping replication IO thread" )
if err := this . retryOperation ( this . applier . StopReplication ) ; err != nil {
return err
}
2016-06-10 09:15:11 +00:00
}
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
defer this . applier . RenameTablesRollback ( )
2016-07-26 10:06:20 +00:00
// We further proceed to do the cutover by normal means; the 'defer' above will rollback the swap
2016-04-22 20:18:56 +00:00
}
2016-06-27 09:08:06 +00:00
if this . migrationContext . CutOverType == base . CutOverAtomic {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err := this . retryOperation (
func ( ) error {
return this . executeAndThrottleOnError ( this . atomicCutOver )
} ,
)
return err
}
2016-06-06 10:33:05 +00:00
if this . migrationContext . CutOverType == base . CutOverTwoStep {
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
err := this . retryOperation (
func ( ) error {
return this . executeAndThrottleOnError ( this . cutOverTwoStep )
} ,
)
2016-06-14 07:00:56 +00:00
return err
2016-05-16 09:03:15 +00:00
}
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
return log . Fatalf ( "Unknown cut-over type: %d; should never get here!" , this . migrationContext . CutOverType )
2016-04-22 20:18:56 +00:00
}
2016-05-16 09:03:15 +00:00
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained.
func ( this * Migrator ) waitForEventsUpToLock ( ) ( err error ) {
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
this . migrationContext . MarkPointOfInterest ( )
2016-06-14 06:35:07 +00:00
waitForEventsUpToLockStartTime := time . Now ( )
2016-05-19 13:11:36 +00:00
log . Infof ( "Writing changelog state: %+v" , AllEventsUpToLockProcessed )
2016-05-16 09:03:15 +00:00
if _ , err := this . applier . WriteChangelogState ( string ( AllEventsUpToLockProcessed ) ) ; err != nil {
return err
}
2016-05-19 13:11:36 +00:00
log . Infof ( "Waiting for events up to lock" )
2016-05-17 12:40:37 +00:00
atomic . StoreInt64 ( & this . allEventsUpToLockProcessedInjectedFlag , 1 )
2016-05-16 09:03:15 +00:00
<- this . allEventsUpToLockProcessed
2016-08-02 12:38:56 +00:00
waitForEventsUpToLockDuration := time . Since ( waitForEventsUpToLockStartTime )
2016-06-14 06:35:07 +00:00
log . Infof ( "Done waiting for events up to lock; duration=%+v" , waitForEventsUpToLockDuration )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule )
2016-05-16 09:03:15 +00:00
return nil
}
2016-06-14 07:00:56 +00:00
// cutOverTwoStep will lock down the original table, execute
2016-05-01 18:36:36 +00:00
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
// There is a point in time where the "original" table does not exist and queries are non-blocked
// and failing.
2016-06-14 07:00:56 +00:00
func ( this * Migrator ) cutOverTwoStep ( ) ( err error ) {
2016-06-21 07:21:58 +00:00
atomic . StoreInt64 ( & this . inCutOverCriticalActionFlag , 1 )
defer atomic . StoreInt64 ( & this . inCutOverCriticalActionFlag , 0 )
2016-07-26 10:06:20 +00:00
atomic . StoreInt64 ( & this . allEventsUpToLockProcessedInjectedFlag , 0 )
2016-06-21 07:21:58 +00:00
2016-06-14 06:35:07 +00:00
if err := this . retryOperation ( this . applier . LockOriginalTable ) ; err != nil {
2016-04-22 20:18:56 +00:00
return err
}
2016-05-16 09:03:15 +00:00
if err := this . retryOperation ( this . waitForEventsUpToLock ) ; err != nil {
return err
}
2016-04-22 20:41:20 +00:00
if err := this . retryOperation ( this . applier . SwapTablesQuickAndBumpy ) ; err != nil {
2016-04-22 20:18:56 +00:00
return err
}
if err := this . retryOperation ( this . applier . UnlockTables ) ; err != nil {
return err
}
lockAndRenameDuration := this . migrationContext . RenameTablesEndTime . Sub ( this . migrationContext . LockTablesStartTime )
renameDuration := this . migrationContext . RenameTablesEndTime . Sub ( this . migrationContext . RenameTablesStartTime )
log . Debugf ( "Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing" , lockAndRenameDuration , renameDuration , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
2016-04-18 17:57:18 +00:00
return nil
}
2016-06-27 09:08:06 +00:00
// atomicCutOver
func ( this * Migrator ) atomicCutOver ( ) ( err error ) {
atomic . StoreInt64 ( & this . inCutOverCriticalActionFlag , 1 )
defer atomic . StoreInt64 ( & this . inCutOverCriticalActionFlag , 0 )
defer func ( ) {
this . applier . DropAtomicCutOverSentryTableIfExists ( )
} ( )
2016-07-26 10:06:20 +00:00
atomic . StoreInt64 ( & this . allEventsUpToLockProcessedInjectedFlag , 0 )
2016-06-27 09:08:06 +00:00
lockOriginalSessionIdChan := make ( chan int64 , 2 )
tableLocked := make ( chan error , 2 )
okToUnlockTable := make ( chan bool , 3 )
tableUnlocked := make ( chan error , 2 )
go func ( ) {
if err := this . applier . AtomicCutOverMagicLock ( lockOriginalSessionIdChan , tableLocked , okToUnlockTable , tableUnlocked ) ; err != nil {
log . Errore ( err )
}
} ( )
if err := <- tableLocked ; err != nil {
return log . Errore ( err )
}
lockOriginalSessionId := <- lockOriginalSessionIdChan
log . Infof ( "Session locking original & magic tables is %+v" , lockOriginalSessionId )
// At this point we know the original table is locked.
// We know any newly incoming DML on original table is blocked.
this . waitForEventsUpToLock ( )
// Step 2
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
this . migrationContext . RenameTablesStartTime = time . Now ( )
var tableRenameKnownToHaveFailed int64
renameSessionIdChan := make ( chan int64 , 2 )
tablesRenamed := make ( chan error , 2 )
go func ( ) {
if err := this . applier . AtomicCutoverRename ( renameSessionIdChan , tablesRenamed ) ; err != nil {
// Abort! Release the lock
atomic . StoreInt64 ( & tableRenameKnownToHaveFailed , 1 )
okToUnlockTable <- true
}
} ( )
renameSessionId := <- renameSessionIdChan
log . Infof ( "Session renaming tables is %+v" , renameSessionId )
waitForRename := func ( ) error {
if atomic . LoadInt64 ( & tableRenameKnownToHaveFailed ) == 1 {
// We return `nil` here so as to avoid the `retry`. The RENAME has failed,
// it won't show up in PROCESSLIST, no point in waiting
return nil
}
return this . applier . ExpectProcess ( renameSessionId , "metadata lock" , "rename" )
}
// Wait for the RENAME to appear in PROCESSLIST
if err := this . retryOperation ( waitForRename , true ) ; err != nil {
// Abort! Release the lock
okToUnlockTable <- true
return err
}
if atomic . LoadInt64 ( & tableRenameKnownToHaveFailed ) == 0 {
log . Infof ( "Found atomic RENAME to be blocking, as expected. Double checking the lock is still in place (though I don't strictly have to)" )
}
if err := this . applier . ExpectUsedLock ( lockOriginalSessionId ) ; err != nil {
// Abort operation. Just make sure to drop the magic table.
return log . Errore ( err )
}
log . Infof ( "Connection holding lock on original table still exists" )
// Now that we've found the RENAME blocking, AND the locking connection still alive,
// we know it is safe to proceed to release the lock
okToUnlockTable <- true
// BAM! magic table dropped, original table lock is released
// -> RENAME released -> queries on original are unblocked.
if err := <- tableUnlocked ; err != nil {
return log . Errore ( err )
}
if err := <- tablesRenamed ; err != nil {
return log . Errore ( err )
}
this . migrationContext . RenameTablesEndTime = time . Now ( )
// ooh nice! We're actually truly and thankfully done
lockAndRenameDuration := this . migrationContext . RenameTablesEndTime . Sub ( this . migrationContext . LockTablesStartTime )
2016-06-14 06:35:07 +00:00
log . Infof ( "Lock & rename duration: %s. During this time, queries on %s were blocked" , lockAndRenameDuration , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
return nil
}
2016-06-19 15:55:37 +00:00
// onServerCommand responds to a user's interactive command
2016-06-07 09:59:17 +00:00
func ( this * Migrator ) onServerCommand ( command string , writer * bufio . Writer ) ( err error ) {
2016-06-09 09:25:01 +00:00
defer writer . Flush ( )
tokens := strings . SplitN ( command , "=" , 2 )
2016-06-07 09:59:17 +00:00
command = strings . TrimSpace ( tokens [ 0 ] )
arg := ""
if len ( tokens ) > 1 {
arg = strings . TrimSpace ( tokens [ 1 ] )
}
2016-06-17 09:40:08 +00:00
2016-07-27 08:45:22 +00:00
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
2016-08-19 12:52:49 +00:00
if err := this . hooksExecutor . onInteractiveCommand ( command ) ; err != nil {
return err
}
2016-06-07 09:59:17 +00:00
switch command {
case "help" :
{
fmt . Fprintln ( writer , ` available commands :
2016-08-19 07:16:17 +00:00
status # Print a detailed status message
sup # Print a short status message
2016-06-20 10:09:04 +00:00
chunk - size = < newsize > # Set a new chunk - size
2016-08-18 11:13:51 +00:00
nice - ratio = < ratio > # Set a new nice - ratio , immediate sleep after each row - copy operation , float ( examples : 0 is agrressive , 0.7 adds 70 % runtime , 1.0 doubles runtime , 2.0 triples runtime , ... )
2016-06-20 10:09:04 +00:00
critical - load = < load > # Set a new set of max - load thresholds
2016-07-13 07:44:00 +00:00
max - lag - millis = < max - lag > # Set a new replication lag threshold
2016-07-26 12:14:25 +00:00
replication - lag - query = < query > # Set a new query that determines replication lag ( no quotes )
2016-06-20 10:09:04 +00:00
max - load = < load > # Set a new set of max - load thresholds
2016-07-26 12:14:25 +00:00
throttle - query = < query > # Set a new throttle - query ( no quotes )
throttle - control - replicas = < replicas > # Set a new comma delimited list of throttle control replicas
2016-06-20 10:09:04 +00:00
throttle # Force throttling
no - throttle # End forced throttling ( other throttling may still apply )
2016-07-01 08:59:09 +00:00
unpostpone # Bail out a cut - over postpone ; proceed to cut - over
2016-06-20 10:09:04 +00:00
panic # panic and quit without cleanup
help # This message
2016-06-07 09:59:17 +00:00
` )
}
2016-08-19 07:16:17 +00:00
case "sup" :
this . printStatus ( ForcePrintStatusOnlyRule , writer )
2016-06-07 09:59:17 +00:00
case "info" , "status" :
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-07 09:59:17 +00:00
case "chunk-size" :
{
if chunkSize , err := strconv . Atoi ( arg ) ; err != nil {
2016-06-09 09:25:01 +00:00
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
2016-06-07 09:59:17 +00:00
return log . Errore ( err )
} else {
this . migrationContext . SetChunkSize ( int64 ( chunkSize ) )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-07 09:59:17 +00:00
}
}
2016-07-13 07:44:00 +00:00
case "max-lag-millis" :
{
if maxLagMillis , err := strconv . Atoi ( arg ) ; err != nil {
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
return log . Errore ( err )
} else {
this . migrationContext . SetMaxLagMillisecondsThrottleThreshold ( int64 ( maxLagMillis ) )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-07-13 07:44:00 +00:00
}
}
2016-07-26 12:14:25 +00:00
case "replication-lag-query" :
{
this . migrationContext . SetReplicationLagQuery ( arg )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-07-26 12:14:25 +00:00
}
2016-07-04 12:29:09 +00:00
case "nice-ratio" :
{
2016-07-28 12:37:17 +00:00
if niceRatio , err := strconv . ParseFloat ( arg , 64 ) ; err != nil {
2016-07-04 12:29:09 +00:00
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
return log . Errore ( err )
} else {
2016-07-28 12:37:17 +00:00
this . migrationContext . SetNiceRatio ( niceRatio )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-07-04 12:29:09 +00:00
}
}
2016-06-09 09:25:01 +00:00
case "max-load" :
{
if err := this . migrationContext . ReadMaxLoad ( arg ) ; err != nil {
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
return log . Errore ( err )
}
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-18 19:12:07 +00:00
}
case "critical-load" :
{
if err := this . migrationContext . ReadCriticalLoad ( arg ) ; err != nil {
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
return log . Errore ( err )
}
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-18 19:12:07 +00:00
}
case "throttle-query" :
{
this . migrationContext . SetThrottleQuery ( arg )
2016-07-27 08:45:22 +00:00
fmt . Fprintf ( writer , throttleHint )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-09 09:25:01 +00:00
}
2016-06-20 10:09:04 +00:00
case "throttle-control-replicas" :
{
if err := this . migrationContext . ReadThrottleControlReplicaKeys ( arg ) ; err != nil {
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
return log . Errore ( err )
}
fmt . Fprintf ( writer , "%s\n" , this . migrationContext . GetThrottleControlReplicaKeys ( ) . ToCommaDelimitedList ( ) )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-20 10:09:04 +00:00
}
2016-06-07 09:59:17 +00:00
case "throttle" , "pause" , "suspend" :
{
atomic . StoreInt64 ( & this . migrationContext . ThrottleCommandedByUser , 1 )
2016-07-27 08:45:22 +00:00
fmt . Fprintf ( writer , throttleHint )
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule , writer )
2016-06-07 09:59:17 +00:00
}
case "no-throttle" , "unthrottle" , "resume" , "continue" :
{
atomic . StoreInt64 ( & this . migrationContext . ThrottleCommandedByUser , 0 )
}
2016-07-01 08:59:09 +00:00
case "unpostpone" , "no-postpone" , "cut-over" :
{
if atomic . LoadInt64 ( & this . migrationContext . IsPostponingCutOver ) > 0 {
atomic . StoreInt64 ( & this . userCommandedUnpostponeFlag , 1 )
fmt . Fprintf ( writer , "Unpostponed\n" )
} else {
fmt . Fprintf ( writer , "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n" )
}
}
2016-06-20 04:38:29 +00:00
case "panic" :
{
err := fmt . Errorf ( "User commanded 'panic'. I will now panic, without cleanup. PANIC!" )
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
this . panicAbort <- err
}
2016-06-07 09:59:17 +00:00
default :
2016-06-09 09:25:01 +00:00
err = fmt . Errorf ( "Unknown command: %s" , command )
fmt . Fprintf ( writer , "%s\n" , err . Error ( ) )
return err
2016-06-07 09:59:17 +00:00
}
return nil
}
2016-06-19 15:55:37 +00:00
// initiateServer begins listening on unix socket/tcp for incoming interactive commands
2016-06-07 09:59:17 +00:00
func ( this * Migrator ) initiateServer ( ) ( err error ) {
this . server = NewServer ( this . onServerCommand )
if err := this . server . BindSocketFile ( ) ; err != nil {
return err
}
if err := this . server . BindTCPPort ( ) ; err != nil {
return err
}
go this . server . Serve ( )
return nil
}
2016-06-19 15:55:37 +00:00
// initiateInspector connects, validates and inspects the "inspector" server.
// The "inspector" server is typically a replica; it is where we issue some
// queries such as:
// - table row count
// - schema validation
// - heartbeat
// When `--allow-on-master` is supplied, the inspector is actually the master.
2016-04-18 17:57:18 +00:00
func ( this * Migrator ) initiateInspector ( ) ( err error ) {
this . inspector = NewInspector ( )
if err := this . inspector . InitDBConnections ( ) ; err != nil {
return err
}
if err := this . inspector . ValidateOriginalTable ( ) ; err != nil {
return err
}
if err := this . inspector . InspectOriginalTable ( ) ; err != nil {
return err
}
// So far so good, table is accessible and valid.
// Let's get master connection config
if this . migrationContext . ApplierConnectionConfig , err = this . inspector . getMasterConnectionConfig ( ) ; err != nil {
return err
}
2016-06-15 10:18:59 +00:00
if this . migrationContext . TestOnReplica || this . migrationContext . MigrateOnReplica {
2016-04-18 17:57:18 +00:00
if this . migrationContext . InspectorIsAlsoApplier ( ) {
2016-06-15 10:18:59 +00:00
return fmt . Errorf ( "Instructed to --test-on-replica or --migrate-on-replica, but the server we connect to doesn't seem to be a replica" )
2016-04-18 17:57:18 +00:00
}
2016-06-15 10:18:59 +00:00
log . Infof ( "--test-on-replica or --migrate-on-replica given. Will not execute on master %+v but rather on replica %+v itself" ,
2016-06-19 15:55:37 +00:00
* this . migrationContext . ApplierConnectionConfig . ImpliedKey , * this . migrationContext . InspectorConnectionConfig . ImpliedKey ,
2016-04-18 17:57:18 +00:00
)
this . migrationContext . ApplierConnectionConfig = this . migrationContext . InspectorConnectionConfig . Duplicate ( )
2016-07-26 12:14:25 +00:00
if this . migrationContext . GetThrottleControlReplicaKeys ( ) . Len ( ) == 0 {
this . migrationContext . AddThrottleControlReplicaKey ( this . migrationContext . InspectorConnectionConfig . Key )
2016-05-01 18:36:36 +00:00
}
2016-04-18 17:57:18 +00:00
} else if this . migrationContext . InspectorIsAlsoApplier ( ) && ! this . migrationContext . AllowedRunningOnMaster {
return fmt . Errorf ( "It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master" )
}
2016-08-11 15:37:50 +00:00
if err := this . inspector . validateLogSlaveUpdates ( ) ; err != nil {
2016-08-11 12:49:14 +00:00
return err
}
2016-04-18 17:57:18 +00:00
2016-06-19 15:55:37 +00:00
log . Infof ( "Master found to be %+v" , * this . migrationContext . ApplierConnectionConfig . ImpliedKey )
2016-04-08 08:34:44 +00:00
return nil
}
2016-06-19 15:55:37 +00:00
// initiateStatus sets and activates the printStatus() ticker
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) initiateStatus ( ) error {
2016-08-19 07:16:17 +00:00
this . printStatus ( ForcePrintStatusAndHintRule )
2016-04-08 08:34:44 +00:00
statusTick := time . Tick ( 1 * time . Second )
for range statusTick {
2016-06-18 19:12:07 +00:00
go this . printStatus ( HeuristicPrintStatusRule )
2016-04-08 08:34:44 +00:00
}
return nil
}
2016-06-19 15:55:37 +00:00
// printMigrationStatusHint prints a detailed configuration dump, that is useful
// to keep in mind; such as the name of migrated table, throttle params etc.
// This gets printed at beginning and end of migration, every 10 minutes throughout
// migration, and as reponse to the "status" interactive command.
2016-06-07 09:59:17 +00:00
func ( this * Migrator ) printMigrationStatusHint ( writers ... io . Writer ) {
w := io . MultiWriter ( writers ... )
fmt . Fprintln ( w , fmt . Sprintf ( "# Migrating %s.%s; Ghost table is %s.%s" ,
2016-06-01 08:40:49 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
2016-06-06 10:33:05 +00:00
) )
2016-06-19 15:55:37 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Migrating %+v; inspecting %+v; executing on %+v" ,
* this . applier . connectionConfig . ImpliedKey ,
* this . inspector . connectionConfig . ImpliedKey ,
2016-08-23 09:35:48 +00:00
this . migrationContext . Hostname ,
2016-06-19 15:55:37 +00:00
) )
2016-06-07 09:59:17 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Migration started at %+v" ,
2016-06-01 08:40:49 +00:00
this . migrationContext . StartTime . Format ( time . RubyDate ) ,
2016-06-06 10:33:05 +00:00
) )
2016-06-09 09:25:01 +00:00
maxLoad := this . migrationContext . GetMaxLoad ( )
2016-06-18 19:12:07 +00:00
criticalLoad := this . migrationContext . GetCriticalLoad ( )
2016-07-28 12:37:17 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# chunk-size: %+v; max-lag-millis: %+vms; max-load: %s; critical-load: %s; nice-ratio: %f" ,
2016-06-06 10:33:05 +00:00
atomic . LoadInt64 ( & this . migrationContext . ChunkSize ) ,
atomic . LoadInt64 ( & this . migrationContext . MaxLagMillisecondsThrottleThreshold ) ,
2016-06-18 19:12:07 +00:00
maxLoad . String ( ) ,
criticalLoad . String ( ) ,
2016-07-28 12:37:17 +00:00
this . migrationContext . GetNiceRatio ( ) ,
2016-06-06 10:33:05 +00:00
) )
2016-07-26 12:14:25 +00:00
if replicationLagQuery := this . migrationContext . GetReplicationLagQuery ( ) ; replicationLagQuery != "" {
2016-07-29 07:20:00 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# replication-lag-query: %+v" ,
2016-07-26 12:14:25 +00:00
replicationLagQuery ,
) )
}
2016-06-06 10:33:05 +00:00
if this . migrationContext . ThrottleFlagFile != "" {
2016-07-27 08:36:24 +00:00
setIndicator := ""
if base . FileExists ( this . migrationContext . ThrottleFlagFile ) {
setIndicator = "[set]"
}
2016-07-29 07:20:00 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# throttle-flag-file: %+v %+v" ,
2016-07-27 08:36:24 +00:00
this . migrationContext . ThrottleFlagFile , setIndicator ,
2016-06-06 10:33:05 +00:00
) )
}
if this . migrationContext . ThrottleAdditionalFlagFile != "" {
2016-07-27 08:36:24 +00:00
setIndicator := ""
if base . FileExists ( this . migrationContext . ThrottleAdditionalFlagFile ) {
setIndicator = "[set]"
}
2016-07-29 07:20:00 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# throttle-additional-flag-file: %+v %+v" ,
2016-07-27 08:36:24 +00:00
this . migrationContext . ThrottleAdditionalFlagFile , setIndicator ,
2016-06-06 10:33:05 +00:00
) )
}
2016-06-18 19:12:07 +00:00
if throttleQuery := this . migrationContext . GetThrottleQuery ( ) ; throttleQuery != "" {
2016-07-29 07:20:00 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# throttle-query: %+v" ,
2016-06-18 19:12:07 +00:00
throttleQuery ,
) )
}
2016-06-17 09:40:08 +00:00
if this . migrationContext . PostponeCutOverFlagFile != "" {
2016-07-27 08:36:24 +00:00
setIndicator := ""
if base . FileExists ( this . migrationContext . PostponeCutOverFlagFile ) {
setIndicator = "[set]"
}
2016-07-29 07:20:00 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# postpone-cut-over-flag-file: %+v %+v" ,
2016-07-27 08:36:24 +00:00
this . migrationContext . PostponeCutOverFlagFile , setIndicator ,
2016-06-17 09:40:08 +00:00
) )
}
if this . migrationContext . PanicFlagFile != "" {
2016-07-29 07:20:00 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# panic-flag-file: %+v" ,
2016-06-17 09:40:08 +00:00
this . migrationContext . PanicFlagFile ,
) )
}
2016-06-07 09:59:17 +00:00
fmt . Fprintln ( w , fmt . Sprintf ( "# Serving on unix socket: %+v" ,
this . migrationContext . ServeSocketFile ,
) )
if this . migrationContext . ServeTCPPort != 0 {
fmt . Fprintln ( w , fmt . Sprintf ( "# Serving on TCP port: %+v" , this . migrationContext . ServeTCPPort ) )
}
2016-06-01 08:40:49 +00:00
}
2016-06-19 15:55:37 +00:00
// printStatus prints the prgoress status, and optionally additionally detailed
// dump of configuration.
// `rule` indicates the type of output expected.
// By default the status is written to standard output, but other writers can
// be used as well.
2016-06-18 19:12:07 +00:00
func ( this * Migrator ) printStatus ( rule PrintStatusRule , writers ... io . Writer ) {
writers = append ( writers , os . Stdout )
2016-04-08 08:34:44 +00:00
elapsedTime := this . migrationContext . ElapsedTime ( )
elapsedSeconds := int64 ( elapsedTime . Seconds ( ) )
totalRowsCopied := this . migrationContext . GetTotalRowsCopied ( )
2016-08-24 10:16:34 +00:00
rowsEstimate := atomic . LoadInt64 ( & this . migrationContext . RowsEstimate ) + atomic . LoadInt64 ( & this . migrationContext . RowsDeltaEstimate )
2016-05-19 13:11:36 +00:00
var progressPct float64
2016-08-18 11:20:09 +00:00
if rowsEstimate == 0 {
progressPct = 100.0
} else {
2016-05-19 13:11:36 +00:00
progressPct = 100.0 * float64 ( totalRowsCopied ) / float64 ( rowsEstimate )
2016-04-08 08:34:44 +00:00
}
2016-05-26 12:25:32 +00:00
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
2016-06-01 08:40:49 +00:00
shouldPrintMigrationStatusHint := ( elapsedSeconds % 600 == 0 )
2016-08-19 07:16:17 +00:00
if rule == ForcePrintStatusAndHintRule {
2016-06-18 19:12:07 +00:00
shouldPrintMigrationStatusHint = true
}
2016-08-19 07:16:17 +00:00
if rule == ForcePrintStatusOnlyRule {
shouldPrintMigrationStatusHint = false
}
2016-06-01 08:40:49 +00:00
if shouldPrintMigrationStatusHint {
2016-06-18 19:12:07 +00:00
this . printMigrationStatusHint ( writers ... )
2016-05-26 12:25:32 +00:00
}
var etaSeconds float64 = math . MaxFloat64
2016-04-08 12:35:06 +00:00
eta := "N/A"
2016-08-24 09:39:44 +00:00
if atomic . LoadInt64 ( & this . migrationContext . CountingRowsFlag ) > 0 && ! this . migrationContext . ConcurrentCountTableRows {
2016-07-29 08:40:23 +00:00
eta = "counting rows"
} else if atomic . LoadInt64 ( & this . migrationContext . IsPostponingCutOver ) > 0 {
2016-06-13 16:36:29 +00:00
eta = "postponing cut-over"
} else if isThrottled , throttleReason := this . migrationContext . IsThrottled ( ) ; isThrottled {
2016-04-11 15:27:16 +00:00
eta = fmt . Sprintf ( "throttled, %s" , throttleReason )
2016-05-05 06:18:19 +00:00
} else if progressPct > 100.0 {
eta = "Due"
2016-05-19 13:11:36 +00:00
} else if progressPct >= 1.0 {
2016-05-05 06:18:19 +00:00
elapsedRowCopySeconds := this . migrationContext . ElapsedRowCopyTime ( ) . Seconds ( )
totalExpectedSeconds := elapsedRowCopySeconds * float64 ( rowsEstimate ) / float64 ( totalRowsCopied )
2016-05-19 13:11:36 +00:00
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
if etaSeconds >= 0 {
etaDuration := time . Duration ( etaSeconds ) * time . Second
2016-05-05 06:18:19 +00:00
eta = base . PrettifyDurationOutput ( etaDuration )
} else {
eta = "Due"
}
2016-04-08 12:35:06 +00:00
}
2016-05-19 13:11:36 +00:00
shouldPrintStatus := false
2016-08-19 07:41:25 +00:00
if rule == HeuristicPrintStatusRule {
if elapsedSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 60 {
shouldPrintStatus = true
} else if etaSeconds <= 180 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else if elapsedSeconds <= 180 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else if this . migrationContext . TimeSincePointOfInterest ( ) . Seconds ( ) <= 60 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else {
shouldPrintStatus = ( elapsedSeconds % 30 == 0 )
}
2016-05-19 13:11:36 +00:00
} else {
2016-08-19 07:41:25 +00:00
// Not heuristic
2016-06-18 19:12:07 +00:00
shouldPrintStatus = true
}
2016-05-19 13:11:36 +00:00
if ! shouldPrintStatus {
return
}
currentBinlogCoordinates := * this . eventsStreamer . GetCurrentBinlogCoordinates ( )
2016-06-19 15:55:37 +00:00
status := fmt . Sprintf ( "Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; ETA: %s" ,
2016-04-08 08:34:44 +00:00
totalRowsCopied , rowsEstimate , progressPct ,
2016-04-19 11:25:32 +00:00
atomic . LoadInt64 ( & this . migrationContext . TotalDMLEventsApplied ) ,
2016-04-08 08:34:44 +00:00
len ( this . applyEventsQueue ) , cap ( this . applyEventsQueue ) ,
2016-06-19 15:55:37 +00:00
base . PrettifyDurationOutput ( elapsedTime ) , base . PrettifyDurationOutput ( this . migrationContext . ElapsedRowCopyTime ( ) ) ,
2016-05-19 13:11:36 +00:00
currentBinlogCoordinates ,
2016-04-08 12:35:06 +00:00
eta ,
)
this . applier . WriteChangelog (
fmt . Sprintf ( "copy iteration %d at %d" , this . migrationContext . GetIteration ( ) , time . Now ( ) . Unix ( ) ) ,
status ,
)
2016-06-07 09:59:17 +00:00
w := io . MultiWriter ( writers ... )
fmt . Fprintln ( w , status )
2016-08-22 14:24:41 +00:00
if elapsedSeconds % 60 == 0 {
2016-08-29 08:44:43 +00:00
this . hooksExecutor . onStatus ( status )
2016-08-22 14:24:41 +00:00
}
2016-04-08 08:34:44 +00:00
}
2016-08-30 07:41:59 +00:00
// initiateHeartbeatReader listens for heartbeat events. gh-ost implements its own
2016-06-19 15:55:37 +00:00
// heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution.
// Heartbeat is supplied via the changelog table
2016-08-30 07:41:59 +00:00
func ( this * Migrator ) initiateHeartbeatReader ( ) {
ticker := time . Tick ( time . Duration ( this . migrationContext . HeartbeatIntervalMilliseconds ) * time . Millisecond )
2016-04-14 11:37:56 +00:00
for range ticker {
go func ( ) error {
2016-06-06 10:33:05 +00:00
if atomic . LoadInt64 ( & this . cleanupImminentFlag ) > 0 {
return nil
}
2016-04-14 11:37:56 +00:00
changelogState , err := this . inspector . readChangelogState ( )
if err != nil {
return log . Errore ( err )
}
2016-08-30 07:41:59 +00:00
if heartbeatValue , ok := changelogState [ "heartbeat" ] ; ok {
this . parseChangelogHeartbeat ( heartbeatValue )
2016-04-14 11:37:56 +00:00
}
return nil
} ( )
}
}
2016-08-30 07:41:59 +00:00
// initiateControlReplicasReader periodically samples replication lag on the throttle
// control replicas, storing the result on the migration context. Sampling is once per
// second by default, or once per 100ms when a sub-second lag threshold is configured.
func (this *Migrator) initiateControlReplicasReader() {
	readControlReplicasLag := func(replicationLagQuery string) {
		onReplica := this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica
		if onReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) > 0) {
			// During cut-over on a replica, replication is deliberately stopped; skip.
			return
		}
		lagResult := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.GetThrottleControlReplicaKeys(), replicationLagQuery)
		this.migrationContext.SetControlReplicasLagResult(lagResult)
	}

	const relaxedFactor = 10
	counter := 0
	shouldReadLagAggressively := false
	replicationLagQuery := ""
	for range time.Tick(100 * time.Millisecond) {
		if counter%relaxedFactor == 0 {
			// we only check if we wish to be aggressive once per second. The parameters for being aggressive
			// do not typically change at all throughout the migration, but nonetheless we check them.
			counter = 0
			maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
			replicationLagQuery = this.migrationContext.GetReplicationLagQuery()
			shouldReadLagAggressively = (replicationLagQuery != "" && maxLagMillisecondsThrottleThreshold < 1000)
		}
		if counter == 0 || shouldReadLagAggressively {
			// We check replication lag every so often, or if we wish to be aggressive
			readControlReplicasLag(replicationLagQuery)
		}
		counter++
	}
}
2016-04-14 11:37:56 +00:00
// initiateStreaming begins treaming of binary log events and registers listeners for such events
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) initiateStreaming ( ) error {
2016-04-06 11:05:58 +00:00
this . eventsStreamer = NewEventsStreamer ( )
if err := this . eventsStreamer . InitDBConnections ( ) ; err != nil {
return err
}
2016-04-18 17:57:18 +00:00
this . eventsStreamer . AddListener (
false ,
this . migrationContext . DatabaseName ,
this . migrationContext . GetChangelogTableName ( ) ,
func ( dmlEvent * binlog . BinlogDMLEvent ) error {
return this . onChangelogStateEvent ( dmlEvent )
} ,
)
2016-05-16 09:03:15 +00:00
go func ( ) {
log . Debugf ( "Beginning streaming" )
2016-05-19 13:11:36 +00:00
err := this . eventsStreamer . StreamEvents ( this . canStopStreaming )
if err != nil {
this . panicAbort <- err
}
log . Debugf ( "Done streaming" )
2016-05-16 09:03:15 +00:00
} ( )
return nil
}
2016-06-19 15:55:37 +00:00
// addDMLEventsListener begins listening for binlog events on the original table,
// and creates & enqueues a write task per such event.
2016-05-16 09:03:15 +00:00
func ( this * Migrator ) addDMLEventsListener ( ) error {
err := this . eventsStreamer . AddListener (
2016-05-05 14:14:55 +00:00
false ,
2016-04-07 13:57:12 +00:00
this . migrationContext . DatabaseName ,
2016-04-14 11:37:56 +00:00
this . migrationContext . OriginalTableName ,
2016-04-07 13:57:12 +00:00
func ( dmlEvent * binlog . BinlogDMLEvent ) error {
2016-06-19 15:55:37 +00:00
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
2016-04-14 11:37:56 +00:00
applyEventFunc := func ( ) error {
return this . applier . ApplyDMLEventQuery ( dmlEvent )
}
this . applyEventsQueue <- applyEventFunc
return nil
2016-04-07 13:57:12 +00:00
} ,
)
2016-05-16 09:03:15 +00:00
return err
2016-04-08 08:34:44 +00:00
}
2016-04-06 11:05:58 +00:00
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) initiateApplier ( ) error {
2016-04-04 13:29:02 +00:00
this . applier = NewApplier ( )
if err := this . applier . InitDBConnections ( ) ; err != nil {
return err
}
2016-05-03 09:55:17 +00:00
if err := this . applier . ValidateOrDropExistingTables ( ) ; err != nil {
return err
}
2016-06-14 06:35:07 +00:00
if err := this . applier . CreateChangelogTable ( ) ; err != nil {
log . Errorf ( "Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out" )
return err
}
2016-04-06 11:05:58 +00:00
if err := this . applier . CreateGhostTable ( ) ; err != nil {
log . Errorf ( "Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out" )
return err
}
if err := this . applier . AlterGhost ( ) ; err != nil {
log . Errorf ( "Unable to ALTER ghost table, see further error details. Bailing out" )
return err
}
2016-04-07 13:57:12 +00:00
2016-04-08 12:35:06 +00:00
this . applier . WriteChangelogState ( string ( TablesInPlace ) )
2016-08-30 07:41:59 +00:00
go this . applier . InitiateHeartbeat ( )
2016-04-08 08:34:44 +00:00
return nil
}
2016-04-07 13:57:12 +00:00
2016-06-19 15:55:37 +00:00
// iterateChunks iterates the existing table rows, and generates a copy task of
// a chunk of rows onto the ghost table.
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) iterateChunks ( ) error {
terminateRowIteration := func ( err error ) error {
this . rowCopyComplete <- true
return log . Errore ( err )
}
2016-04-14 11:37:56 +00:00
if this . migrationContext . Noop {
log . Debugf ( "Noop operation; not really copying data" )
return terminateRowIteration ( nil )
}
if this . migrationContext . MigrationRangeMinValues == nil {
log . Debugf ( "No rows found in table. Rowcopy will be implicitly empty" )
return terminateRowIteration ( nil )
}
2016-06-19 15:55:37 +00:00
// Iterate per chunk:
2016-04-08 08:34:44 +00:00
for {
2016-05-17 12:40:37 +00:00
if atomic . LoadInt64 ( & this . rowCopyCompleteFlag ) == 1 {
// Done
return nil
}
2016-04-08 08:34:44 +00:00
copyRowsFunc := func ( ) error {
2016-08-23 12:26:47 +00:00
if atomic . LoadInt64 ( & this . rowCopyCompleteFlag ) == 1 {
// Done
return nil
}
2016-04-08 08:34:44 +00:00
hasFurtherRange , err := this . applier . CalculateNextIterationRangeEndValues ( )
if err != nil {
return terminateRowIteration ( err )
}
if ! hasFurtherRange {
return terminateRowIteration ( nil )
}
2016-06-19 15:55:37 +00:00
// Copy task:
2016-05-16 09:03:15 +00:00
applyCopyRowsFunc := func ( ) error {
_ , rowsAffected , _ , err := this . applier . ApplyIterationInsertQuery ( )
if err != nil {
return terminateRowIteration ( err )
}
atomic . AddInt64 ( & this . migrationContext . TotalRowsCopied , rowsAffected )
atomic . AddInt64 ( & this . migrationContext . Iteration , 1 )
return nil
2016-04-08 08:34:44 +00:00
}
2016-05-16 09:03:15 +00:00
return this . retryOperation ( applyCopyRowsFunc )
2016-04-08 08:34:44 +00:00
}
2016-06-19 15:55:37 +00:00
// Enqueue copy operation; to be executed by executeWriteFuncs()
2016-04-08 08:34:44 +00:00
this . copyRowsQueue <- copyRowsFunc
2016-04-04 13:29:02 +00:00
}
2016-04-08 08:34:44 +00:00
return nil
}
2016-06-19 15:55:37 +00:00
// executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
// This is where the ghost table gets the data. The function fills the data single-threaded.
// Both event backlog and rowcopy events are polled; the backlog events have precedence.
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) executeWriteFuncs ( ) error {
2016-04-14 11:37:56 +00:00
if this . migrationContext . Noop {
2016-04-19 11:25:32 +00:00
log . Debugf ( "Noop operation; not really executing write funcs" )
2016-04-14 11:37:56 +00:00
return nil
}
2016-04-05 07:14:22 +00:00
for {
2016-06-21 07:21:58 +00:00
if atomic . LoadInt64 ( & this . inCutOverCriticalActionFlag ) == 0 {
// we don't throttle when cutting over. We _do_ throttle:
// - during copy phase
// - just before cut-over
// - in between cut-over retries
this . throttle ( nil )
// When cutting over, we need to be aggressive. Cut-over holds table locks.
// We need to release those asap.
}
2016-04-08 08:34:44 +00:00
// We give higher priority to event processing, then secondary priority to
// rowcopy
select {
case applyEventFunc := <- this . applyEventsQueue :
{
2016-04-08 12:35:06 +00:00
if err := this . retryOperation ( applyEventFunc ) ; err != nil {
return log . Errore ( err )
}
2016-04-08 08:34:44 +00:00
}
default :
{
select {
case copyRowsFunc := <- this . copyRowsQueue :
{
2016-07-04 12:29:09 +00:00
copyRowsStartTime := time . Now ( )
2016-05-16 09:03:15 +00:00
// Retries are handled within the copyRowsFunc
if err := copyRowsFunc ( ) ; err != nil {
2016-04-08 12:35:06 +00:00
return log . Errore ( err )
}
2016-07-28 12:37:17 +00:00
if niceRatio := this . migrationContext . GetNiceRatio ( ) ; niceRatio > 0 {
2016-08-02 12:38:56 +00:00
copyRowsDuration := time . Since ( copyRowsStartTime )
2016-07-28 12:37:17 +00:00
sleepTimeNanosecondFloat64 := niceRatio * float64 ( copyRowsDuration . Nanoseconds ( ) )
sleepTime := time . Duration ( time . Duration ( int64 ( sleepTimeNanosecondFloat64 ) ) * time . Nanosecond )
2016-07-04 12:29:09 +00:00
time . Sleep ( sleepTime )
}
2016-04-08 08:34:44 +00:00
}
default :
{
// Hmmmmm... nothing in the queue; no events, but also no row copy.
// This is possible upon load. Let's just sleep it over.
log . Debugf ( "Getting nothing in the write queue. Sleeping..." )
time . Sleep ( time . Second )
}
}
}
2016-04-06 11:05:58 +00:00
}
2016-04-07 13:57:12 +00:00
}
2016-04-04 10:27:51 +00:00
return nil
}
2016-06-01 08:40:49 +00:00
// finalCleanup takes actions at very end of migration, dropping tables etc.
func ( this * Migrator ) finalCleanup ( ) error {
2016-06-06 10:33:05 +00:00
atomic . StoreInt64 ( & this . cleanupImminentFlag , 1 )
2016-06-22 10:39:13 +00:00
if this . migrationContext . Noop {
if createTableStatement , err := this . inspector . showCreateTable ( this . migrationContext . GetGhostTableName ( ) ) ; err == nil {
log . Infof ( "New table structure follows" )
fmt . Println ( createTableStatement )
} else {
log . Errore ( err )
}
}
2016-06-01 08:40:49 +00:00
if err := this . retryOperation ( this . applier . DropChangelogTable ) ; err != nil {
return err
}
if this . migrationContext . OkToDropTable && ! this . migrationContext . TestOnReplica {
2016-06-22 08:48:17 +00:00
if err := this . retryOperation ( this . applier . DropOldTable ) ; err != nil {
return err
2016-06-01 08:40:49 +00:00
}
2016-06-27 06:49:26 +00:00
} else {
if ! this . migrationContext . Noop {
log . Infof ( "Am not dropping old table because I want this operation to be as live as possible. If you insist I should do it, please add `--ok-to-drop-table` next time. But I prefer you do not. To drop the old table, issue:" )
log . Infof ( "-- drop table %s.%s" , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) )
}
2016-06-22 08:48:17 +00:00
}
if this . migrationContext . Noop {
if err := this . retryOperation ( this . applier . DropGhostTable ) ; err != nil {
2016-06-01 08:40:49 +00:00
return err
}
}
return nil
}