819 lines
26 KiB
Go
819 lines
26 KiB
Go
/*
   Copyright 2016 GitHub Inc.
   See https://github.com/github/gh-ost/blob/master/LICENSE
*/
|
|
|
|
package logic
|
|
|
|
import (
|
|
"fmt"
|
|
"math"
|
|
"os"
|
|
"os/signal"
|
|
"sync/atomic"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/github/gh-ost/go/base"
|
|
"github.com/github/gh-ost/go/binlog"
|
|
"github.com/github/gh-ost/go/mysql"
|
|
"github.com/github/gh-ost/go/sql"
|
|
|
|
"github.com/outbrain/golib/log"
|
|
)
|
|
|
|
// ChangelogState is a state hint that the migrator writes to the changelog
// table and later reads back from the binary log stream.
type ChangelogState string

// Known changelog states. Every constant is explicitly typed as
// ChangelogState: a constant without a type in a parenthesized const block is
// an untyped string constant, not a ChangelogState.
const (
	TablesInPlace              ChangelogState = "TablesInPlace"
	AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
)
|
|
|
|
// tableWriteFunc is a queued unit of work; executeWriteFuncs pulls these off
// the migrator's queues and runs them.
type tableWriteFunc func() error

// applyEventsQueueBuffer is the capacity of the migrator's applyEventsQueue.
const applyEventsQueueBuffer = 100

// heartbeatIntervalMilliseconds is the interval handed to the applier's
// heartbeat; the heartbeat listener polls at half this interval.
const heartbeatIntervalMilliseconds = 1000
|
|
|
|
// Migrator is the main schema migration flow manager.
|
|
type Migrator struct {
|
|
inspector *Inspector
|
|
applier *Applier
|
|
eventsStreamer *EventsStreamer
|
|
migrationContext *base.MigrationContext
|
|
|
|
tablesInPlace chan bool
|
|
rowCopyComplete chan bool
|
|
allEventsUpToLockProcessed chan bool
|
|
voluntaryLockAcquired chan bool
|
|
panicAbort chan error
|
|
|
|
rowCopyCompleteFlag int64
|
|
allEventsUpToLockProcessedInjectedFlag int64
|
|
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
|
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
|
copyRowsQueue chan tableWriteFunc
|
|
applyEventsQueue chan tableWriteFunc
|
|
|
|
handledChangelogStates map[string]bool
|
|
}
|
|
|
|
func NewMigrator() *Migrator {
|
|
migrator := &Migrator{
|
|
migrationContext: base.GetMigrationContext(),
|
|
tablesInPlace: make(chan bool),
|
|
rowCopyComplete: make(chan bool),
|
|
allEventsUpToLockProcessed: make(chan bool),
|
|
voluntaryLockAcquired: make(chan bool, 1),
|
|
panicAbort: make(chan error),
|
|
|
|
allEventsUpToLockProcessedInjectedFlag: 0,
|
|
|
|
copyRowsQueue: make(chan tableWriteFunc),
|
|
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
|
handledChangelogStates: make(map[string]bool),
|
|
}
|
|
return migrator
|
|
}
|
|
|
|
// acceptSignals registers for OS signals
|
|
func (this *Migrator) acceptSignals() {
|
|
c := make(chan os.Signal, 1)
|
|
|
|
signal.Notify(c, syscall.SIGHUP)
|
|
go func() {
|
|
for sig := range c {
|
|
switch sig {
|
|
case syscall.SIGHUP:
|
|
log.Debugf("Received SIGHUP. Reloading configuration")
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
|
// User-based throttle
|
|
if this.migrationContext.ThrottleFlagFile != "" {
|
|
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
|
// Throttle file defined and exists!
|
|
return true, "flag-file"
|
|
}
|
|
}
|
|
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
|
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
|
|
// 2nd Throttle file defined and exists!
|
|
return true, "flag-file"
|
|
}
|
|
}
|
|
// Replication lag throttle
|
|
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
|
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
|
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
|
|
}
|
|
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) == 0) {
|
|
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
|
|
if err != nil {
|
|
return true, err.Error()
|
|
}
|
|
if replicationLag > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
|
return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
|
|
}
|
|
}
|
|
|
|
for variableName, threshold := range this.migrationContext.MaxLoad {
|
|
value, err := this.applier.ShowStatusVariable(variableName)
|
|
if err != nil {
|
|
return true, fmt.Sprintf("%s %s", variableName, err)
|
|
}
|
|
if value > threshold {
|
|
return true, fmt.Sprintf("%s=%d", variableName, value)
|
|
}
|
|
}
|
|
|
|
return false, ""
|
|
}
|
|
|
|
func (this *Migrator) initiateThrottler() error {
|
|
throttlerTick := time.Tick(1 * time.Second)
|
|
|
|
throttlerFunction := func() {
|
|
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
|
|
shouldThrottle, throttleReason := this.shouldThrottle()
|
|
if shouldThrottle && !alreadyThrottling {
|
|
// New throttling
|
|
this.applier.WriteAndLogChangelog("throttle", throttleReason)
|
|
} else if shouldThrottle && alreadyThrottling && (currentReason != throttleReason) {
|
|
// Change of reason
|
|
this.applier.WriteAndLogChangelog("throttle", throttleReason)
|
|
} else if alreadyThrottling && !shouldThrottle {
|
|
// End of throttling
|
|
this.applier.WriteAndLogChangelog("throttle", "done throttling")
|
|
}
|
|
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
|
|
}
|
|
throttlerFunction()
|
|
for range throttlerTick {
|
|
throttlerFunction()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// throttle initiates a throttling event, if need be, updates the Context and
|
|
// calls callback functions, if any
|
|
func (this *Migrator) throttle(onThrottled func()) {
|
|
for {
|
|
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
|
|
return
|
|
}
|
|
if onThrottled != nil {
|
|
onThrottled()
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
|
|
// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
|
|
// (or fails with error)
|
|
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
|
|
for {
|
|
shouldSleep, err := operation()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !shouldSleep {
|
|
return nil
|
|
}
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
|
|
// retryOperation attempts up to `count` attempts at running given function,
|
|
// exiting as soon as it returns with non-error.
|
|
func (this *Migrator) retryOperation(operation func() error) (err error) {
|
|
maxRetries := this.migrationContext.MaxRetries()
|
|
for i := 0; i < maxRetries; i++ {
|
|
if i != 0 {
|
|
// sleep after previous iteration
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
err = operation()
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
// there's an error. Let's try again.
|
|
}
|
|
this.panicAbort <- err
|
|
return err
|
|
}
|
|
|
|
// executeAndThrottleOnError executes a given function. If it errors, it
|
|
// throttles.
|
|
func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
|
|
if err := operation(); err != nil {
|
|
this.throttle(nil)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
|
|
// consumers and drops any further incoming events that may be left hanging.
|
|
func (this *Migrator) consumeRowCopyComplete() {
|
|
<-this.rowCopyComplete
|
|
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
|
|
go func() {
|
|
for <-this.rowCopyComplete {
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (this *Migrator) canStopStreaming() bool {
|
|
return false
|
|
}
|
|
|
|
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
|
|
// Hey, I created the changlog table, I know the type of columns it has!
|
|
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
|
|
return nil
|
|
}
|
|
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
|
|
switch changelogState {
|
|
case TablesInPlace:
|
|
{
|
|
this.tablesInPlace <- true
|
|
}
|
|
case AllEventsUpToLockProcessed:
|
|
{
|
|
applyEventFunc := func() error {
|
|
this.allEventsUpToLockProcessed <- true
|
|
return nil
|
|
}
|
|
// at this point we know all events up to lock have been read from the streamer,
|
|
// because the streamer works sequentially. So those events are either already handled,
|
|
// or have event functions in applyEventsQueue.
|
|
// So as not to create a potential deadlock, we write this func to applyEventsQueue
|
|
// asynchronously, understanding it doesn't really matter.
|
|
go func() {
|
|
this.applyEventsQueue <- applyEventFunc
|
|
}()
|
|
}
|
|
default:
|
|
{
|
|
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
|
|
}
|
|
}
|
|
log.Debugf("Received state %+v", changelogState)
|
|
return nil
|
|
}
|
|
|
|
func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
|
|
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
|
|
if err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
lag := time.Now().Sub(heartbeatTime)
|
|
|
|
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
|
|
|
|
return nil
|
|
}
|
|
|
|
//
|
|
func (this *Migrator) listenOnPanicAbort() {
|
|
err := <-this.panicAbort
|
|
log.Fatale(err)
|
|
}
|
|
|
|
func (this *Migrator) Migrate() (err error) {
|
|
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
|
this.migrationContext.StartTime = time.Now()
|
|
|
|
go this.listenOnPanicAbort()
|
|
if err := this.initiateInspector(); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := this.initiateStreaming(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.initiateApplier(); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Debugf("Waiting for tables to be in place")
|
|
<-this.tablesInPlace
|
|
log.Debugf("Tables are in place")
|
|
// Yay! We now know the Ghost and Changelog tables are good to examine!
|
|
// When running on replica, this means the replica has those tables. When running
|
|
// on master this is always true, of course, and yet it also implies this knowledge
|
|
// is in the binlogs.
|
|
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.addDMLEventsListener(); err != nil {
|
|
return err
|
|
}
|
|
go this.initiateHeartbeatListener()
|
|
|
|
if err := this.applier.ReadMigrationRangeValues(); err != nil {
|
|
return err
|
|
}
|
|
go this.initiateThrottler()
|
|
go this.executeWriteFuncs()
|
|
go this.iterateChunks()
|
|
this.migrationContext.RowCopyStartTime = time.Now()
|
|
go this.initiateStatus()
|
|
|
|
log.Debugf("Operating until row copy is complete")
|
|
this.consumeRowCopyComplete()
|
|
log.Infof("Row copy complete")
|
|
this.printStatus()
|
|
|
|
if err := this.stopWritesAndCompleteMigration(); err != nil {
|
|
return err
|
|
}
|
|
|
|
log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
|
return nil
|
|
}
|
|
|
|
// stopWritesAndCompleteMigration performs the final step of migration, based on migration
|
|
// type (on replica? bumpy? safe?)
|
|
func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
|
if this.migrationContext.Noop {
|
|
log.Debugf("Noop operation; not really swapping tables")
|
|
return nil
|
|
}
|
|
this.throttle(func() {
|
|
log.Debugf("throttling before swapping tables")
|
|
})
|
|
|
|
this.sleepWhileTrue(
|
|
func() (bool, error) {
|
|
if this.migrationContext.PostponeSwapTablesFlagFile == "" {
|
|
return false, nil
|
|
}
|
|
if base.FileExists(this.migrationContext.PostponeSwapTablesFlagFile) {
|
|
// Throttle file defined and exists!
|
|
log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeSwapTablesFlagFile)
|
|
return true, nil
|
|
}
|
|
return false, nil
|
|
},
|
|
)
|
|
|
|
if this.migrationContext.TestOnReplica {
|
|
return this.stopWritesAndCompleteMigrationOnReplica()
|
|
}
|
|
// Running on master
|
|
if this.migrationContext.QuickAndBumpySwapTables {
|
|
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
|
|
}
|
|
|
|
{
|
|
// Lock-based solution: we use low timeout and multiple attempts. But for
|
|
// each failed attempt, we throttle until replication lag is back to normal
|
|
if err := this.retryOperation(
|
|
func() error {
|
|
return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
if err := this.dropOldTableIfRequired(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (this *Migrator) dropOldTableIfRequired() (err error) {
|
|
if !this.migrationContext.OkToDropTable {
|
|
return nil
|
|
}
|
|
dropTableFunc := func() error {
|
|
return this.applier.dropTable(this.migrationContext.GetOldTableName())
|
|
}
|
|
if err := this.retryOperation(dropTableFunc); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
|
|
// make sure the queue is drained.
|
|
func (this *Migrator) waitForEventsUpToLock() (err error) {
|
|
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
|
|
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Waiting for events up to lock")
|
|
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
|
|
<-this.allEventsUpToLockProcessed
|
|
log.Infof("Done waiting for events up to lock")
|
|
this.printStatus()
|
|
|
|
return nil
|
|
}
|
|
|
|
// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
|
|
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
|
|
// There is a point in time where the "original" table does not exist and queries are non-blocked
|
|
// and failing.
|
|
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
|
|
if err := this.retryOperation(this.applier.LockTables); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
|
|
return err
|
|
}
|
|
if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil {
|
|
return err
|
|
}
|
|
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
|
|
return err
|
|
}
|
|
if err := this.dropOldTableIfRequired(); err != nil {
|
|
return err
|
|
}
|
|
|
|
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
|
|
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
|
|
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
|
|
return nil
|
|
}
|
|
|
|
// stopWritesAndCompleteMigrationOnMasterViaLock will lock down the original table, execute
|
|
// what's left of last DML entries, and atomically swap & unlock (original->old && new->original)
|
|
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
|
|
lockGrabbed := make(chan error, 1)
|
|
okToReleaseLock := make(chan bool, 1)
|
|
swapResult := make(chan error, 1)
|
|
go func() {
|
|
if err := this.applier.GrabVoluntaryLock(lockGrabbed, okToReleaseLock); err != nil {
|
|
log.Errore(err)
|
|
}
|
|
}()
|
|
if err := <-lockGrabbed; err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
blockingQuerySessionIdChan := make(chan int64, 1)
|
|
go func() {
|
|
this.applier.IssueBlockingQueryOnVoluntaryLock(blockingQuerySessionIdChan)
|
|
}()
|
|
blockingQuerySessionId := <-blockingQuerySessionIdChan
|
|
log.Infof("Intentional blocking query connection id is %+v", blockingQuerySessionId)
|
|
|
|
if err := this.retryOperation(
|
|
func() error {
|
|
return this.applier.ExpectProcess(blockingQuerySessionId, "User lock", this.migrationContext.GetVoluntaryLockName())
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Found blocking query to be executing")
|
|
swapSessionIdChan := make(chan int64, 1)
|
|
go func() {
|
|
swapResult <- this.applier.SwapTablesAtomic(swapSessionIdChan)
|
|
}()
|
|
|
|
swapSessionId := <-swapSessionIdChan
|
|
log.Infof("RENAME connection id is %+v", swapSessionId)
|
|
if err := this.retryOperation(
|
|
func() error {
|
|
return this.applier.ExpectProcess(swapSessionId, "metadata lock", "rename")
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Found RENAME to be executing")
|
|
|
|
// OK, at this time we know any newly incoming DML on original table is blocked.
|
|
this.waitForEventsUpToLock()
|
|
|
|
okToReleaseLock <- true
|
|
// BAM: voluntary lock is released, blocking query is released, rename is released.
|
|
// We now check RENAME result. We have lock_wait_timeout. We put it on purpose, to avoid
|
|
// locking the tables for too long. If lock time exceeds said timeout, the RENAME fails
|
|
// and returns a non-nil error, in which case tables have not been swapped, and we are
|
|
// not really done. We are, however, good to go for more retries.
|
|
if err := <-swapResult; err != nil {
|
|
// Bummer. We shall rest a while and try again
|
|
return err
|
|
}
|
|
// ooh nice! We're actually truly and thankfully done
|
|
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
|
|
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
|
|
log.Debugf("Lock & rename duration: %s. Of this, rename time was %s. During rename time, queries on %s were blocked", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
|
|
return nil
|
|
}
|
|
|
|
// stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply
|
|
// what DML events are left, and that's it.
|
|
// This only applies in --test-on-replica. It leaves replication stopped, with both tables
|
|
// in sync. There is no table swap.
|
|
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
|
|
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
|
|
if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil {
|
|
return err
|
|
}
|
|
|
|
this.waitForEventsUpToLock()
|
|
|
|
log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
|
|
return nil
|
|
}
|
|
|
|
func (this *Migrator) initiateInspector() (err error) {
|
|
this.inspector = NewInspector()
|
|
if err := this.inspector.InitDBConnections(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.inspector.ValidateOriginalTable(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.inspector.InspectOriginalTable(); err != nil {
|
|
return err
|
|
}
|
|
// So far so good, table is accessible and valid.
|
|
// Let's get master connection config
|
|
if this.migrationContext.ApplierConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil {
|
|
return err
|
|
}
|
|
if this.migrationContext.TestOnReplica {
|
|
if this.migrationContext.InspectorIsAlsoApplier() {
|
|
return fmt.Errorf("Instructed to --test-on-replica, but the server we connect to doesn't seem to be a replica")
|
|
}
|
|
log.Infof("--test-on-replica given. Will not execute on master %+v but rather on replica %+v itself",
|
|
this.migrationContext.ApplierConnectionConfig.Key, this.migrationContext.InspectorConnectionConfig.Key,
|
|
)
|
|
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
|
|
if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
|
|
this.migrationContext.ThrottleControlReplicaKeys.AddKey(this.migrationContext.InspectorConnectionConfig.Key)
|
|
}
|
|
} else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster {
|
|
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
|
|
}
|
|
|
|
log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key)
|
|
return nil
|
|
}
|
|
|
|
func (this *Migrator) initiateStatus() error {
|
|
this.printStatus()
|
|
statusTick := time.Tick(1 * time.Second)
|
|
for range statusTick {
|
|
go this.printStatus()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *Migrator) printStatus() {
|
|
elapsedTime := this.migrationContext.ElapsedTime()
|
|
elapsedSeconds := int64(elapsedTime.Seconds())
|
|
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
|
|
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate)
|
|
var progressPct float64
|
|
if rowsEstimate > 0 {
|
|
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
|
|
}
|
|
|
|
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
|
|
shouldPrintCourtesyReminder := (elapsedSeconds%600 == 0)
|
|
if shouldPrintCourtesyReminder {
|
|
courtesyReminder := fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s; migration started at %+v",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
this.migrationContext.StartTime.Format(time.RubyDate),
|
|
)
|
|
fmt.Println(courtesyReminder)
|
|
}
|
|
|
|
var etaSeconds float64 = math.MaxFloat64
|
|
eta := "N/A"
|
|
if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
|
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
|
} else if progressPct > 100.0 {
|
|
eta = "Due"
|
|
} else if progressPct >= 1.0 {
|
|
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
|
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
|
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
|
if etaSeconds >= 0 {
|
|
etaDuration := time.Duration(etaSeconds) * time.Second
|
|
eta = base.PrettifyDurationOutput(etaDuration)
|
|
} else {
|
|
eta = "Due"
|
|
}
|
|
}
|
|
|
|
shouldPrintStatus := false
|
|
if elapsedSeconds <= 60 {
|
|
shouldPrintStatus = true
|
|
} else if etaSeconds <= 60 {
|
|
shouldPrintStatus = true
|
|
} else if etaSeconds <= 180 {
|
|
shouldPrintStatus = (elapsedSeconds%5 == 0)
|
|
} else if elapsedSeconds <= 180 {
|
|
shouldPrintStatus = (elapsedSeconds%5 == 0)
|
|
} else if this.migrationContext.TimeSincePointOfInterest().Seconds() <= 60 {
|
|
shouldPrintStatus = (elapsedSeconds%5 == 0)
|
|
} else {
|
|
shouldPrintStatus = (elapsedSeconds%30 == 0)
|
|
}
|
|
if !shouldPrintStatus {
|
|
return
|
|
}
|
|
|
|
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
|
|
|
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); streamer: %+v; ETA: %s",
|
|
totalRowsCopied, rowsEstimate, progressPct,
|
|
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
|
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
|
base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
|
|
currentBinlogCoordinates,
|
|
eta,
|
|
)
|
|
this.applier.WriteChangelog(
|
|
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
|
|
status,
|
|
)
|
|
fmt.Println(status)
|
|
}
|
|
|
|
func (this *Migrator) initiateHeartbeatListener() {
|
|
ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
|
|
for range ticker {
|
|
go func() error {
|
|
changelogState, err := this.inspector.readChangelogState()
|
|
if err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
for hint, value := range changelogState {
|
|
switch hint {
|
|
case "heartbeat":
|
|
{
|
|
this.onChangelogHeartbeat(value)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}()
|
|
}
|
|
}
|
|
|
|
// initiateStreaming begins treaming of binary log events and registers listeners for such events
|
|
func (this *Migrator) initiateStreaming() error {
|
|
this.eventsStreamer = NewEventsStreamer()
|
|
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
|
return err
|
|
}
|
|
this.eventsStreamer.AddListener(
|
|
false,
|
|
this.migrationContext.DatabaseName,
|
|
this.migrationContext.GetChangelogTableName(),
|
|
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
|
return this.onChangelogStateEvent(dmlEvent)
|
|
},
|
|
)
|
|
|
|
go func() {
|
|
log.Debugf("Beginning streaming")
|
|
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
|
|
if err != nil {
|
|
this.panicAbort <- err
|
|
}
|
|
log.Debugf("Done streaming")
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
// addDMLEventsListener
|
|
func (this *Migrator) addDMLEventsListener() error {
|
|
err := this.eventsStreamer.AddListener(
|
|
false,
|
|
this.migrationContext.DatabaseName,
|
|
this.migrationContext.OriginalTableName,
|
|
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
|
applyEventFunc := func() error {
|
|
return this.applier.ApplyDMLEventQuery(dmlEvent)
|
|
}
|
|
this.applyEventsQueue <- applyEventFunc
|
|
return nil
|
|
},
|
|
)
|
|
return err
|
|
}
|
|
|
|
func (this *Migrator) initiateApplier() error {
|
|
this.applier = NewApplier()
|
|
if err := this.applier.InitDBConnections(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.applier.ValidateOrDropExistingTables(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.applier.CreateGhostTable(); err != nil {
|
|
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
|
return err
|
|
}
|
|
if err := this.applier.AlterGhost(); err != nil {
|
|
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
|
return err
|
|
}
|
|
if err := this.applier.CreateChangelogTable(); err != nil {
|
|
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
|
|
return err
|
|
}
|
|
|
|
this.applier.WriteChangelogState(string(TablesInPlace))
|
|
go this.applier.InitiateHeartbeat(heartbeatIntervalMilliseconds)
|
|
return nil
|
|
}
|
|
|
|
func (this *Migrator) iterateChunks() error {
|
|
terminateRowIteration := func(err error) error {
|
|
this.rowCopyComplete <- true
|
|
return log.Errore(err)
|
|
}
|
|
if this.migrationContext.Noop {
|
|
log.Debugf("Noop operation; not really copying data")
|
|
return terminateRowIteration(nil)
|
|
}
|
|
if this.migrationContext.MigrationRangeMinValues == nil {
|
|
log.Debugf("No rows found in table. Rowcopy will be implicitly empty")
|
|
return terminateRowIteration(nil)
|
|
}
|
|
for {
|
|
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
|
|
// Done
|
|
return nil
|
|
}
|
|
copyRowsFunc := func() error {
|
|
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
|
|
if err != nil {
|
|
return terminateRowIteration(err)
|
|
}
|
|
if !hasFurtherRange {
|
|
return terminateRowIteration(nil)
|
|
}
|
|
applyCopyRowsFunc := func() error {
|
|
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
|
|
if err != nil {
|
|
return terminateRowIteration(err)
|
|
}
|
|
atomic.AddInt64(&this.migrationContext.TotalRowsCopied, rowsAffected)
|
|
atomic.AddInt64(&this.migrationContext.Iteration, 1)
|
|
return nil
|
|
}
|
|
return this.retryOperation(applyCopyRowsFunc)
|
|
}
|
|
this.copyRowsQueue <- copyRowsFunc
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (this *Migrator) executeWriteFuncs() error {
|
|
if this.migrationContext.Noop {
|
|
log.Debugf("Noop operation; not really executing write funcs")
|
|
return nil
|
|
}
|
|
for {
|
|
this.throttle(nil)
|
|
// We give higher priority to event processing, then secondary priority to
|
|
// rowcopy
|
|
select {
|
|
case applyEventFunc := <-this.applyEventsQueue:
|
|
{
|
|
if err := this.retryOperation(applyEventFunc); err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
}
|
|
default:
|
|
{
|
|
select {
|
|
case copyRowsFunc := <-this.copyRowsQueue:
|
|
{
|
|
// Retries are handled within the copyRowsFunc
|
|
if err := copyRowsFunc(); err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
}
|
|
default:
|
|
{
|
|
// Hmmmmm... nothing in the queue; no events, but also no row copy.
|
|
// This is possible upon load. Let's just sleep it over.
|
|
log.Debugf("Getting nothing in the write queue. Sleeping...")
|
|
time.Sleep(time.Second)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|