2016-04-04 10:27:51 +00:00
/*
   Copyright 2016 GitHub Inc.
   See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package logic
import (
2016-04-04 13:29:02 +00:00
"fmt"
2016-04-08 08:34:44 +00:00
"os"
2016-04-11 15:27:16 +00:00
"os/signal"
2016-04-08 12:35:06 +00:00
"regexp"
2016-04-07 13:57:12 +00:00
"sync/atomic"
2016-04-11 15:27:16 +00:00
"syscall"
2016-04-07 13:57:12 +00:00
"time"
2016-04-04 13:29:02 +00:00
"github.com/github/gh-osc/go/base"
2016-04-07 13:57:12 +00:00
"github.com/github/gh-osc/go/binlog"
2016-04-04 13:29:02 +00:00
"github.com/outbrain/golib/log"
2016-04-04 10:27:51 +00:00
)
2016-04-08 08:34:44 +00:00
// ChangelogState is a migration milestone recorded in the changelog table's
// "state" rows and read back when those rows arrive as events.
type ChangelogState string

// Known changelog states. Each constant is explicitly typed: in a const block,
// a spec with its own value but no type is an untyped constant and does NOT
// inherit the type of the preceding line.
const (
	TablesInPlace              ChangelogState = "TablesInPlace"
	AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed"
)
// tableWriteFunc is one unit of write work (a row-copy chunk or an applied
// event) handed to executeWriteFuncs through a queue.
type tableWriteFunc func() error

// applyEventsQueueBuffer is the capacity of Migrator.applyEventsQueue.
const applyEventsQueueBuffer = 100

// prettifyDurationRegexp matches a '.' followed by digits, i.e. the
// fractional-seconds part of a time.Duration's string form.
var prettifyDurationRegexp = regexp.MustCompile("([.][0-9]+)")
2016-04-04 10:27:51 +00:00
// Migrator is the main schema migration flow manager.
type Migrator struct {
inspector * Inspector
2016-04-04 13:29:02 +00:00
applier * Applier
2016-04-06 11:05:58 +00:00
eventsStreamer * EventsStreamer
2016-04-04 13:29:02 +00:00
migrationContext * base . MigrationContext
2016-04-07 13:57:12 +00:00
2016-04-08 08:34:44 +00:00
tablesInPlace chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc
applyEventsQueue chan tableWriteFunc
2016-04-04 10:27:51 +00:00
}
2016-04-04 13:29:02 +00:00
func NewMigrator ( ) * Migrator {
2016-04-07 13:57:12 +00:00
migrator := & Migrator {
2016-04-08 08:34:44 +00:00
migrationContext : base . GetMigrationContext ( ) ,
tablesInPlace : make ( chan bool ) ,
rowCopyComplete : make ( chan bool ) ,
allEventsUpToLockProcessed : make ( chan bool ) ,
copyRowsQueue : make ( chan tableWriteFunc ) ,
applyEventsQueue : make ( chan tableWriteFunc , applyEventsQueueBuffer ) ,
2016-04-07 13:57:12 +00:00
}
return migrator
}
2016-04-08 12:35:06 +00:00
func prettifyDurationOutput ( d time . Duration ) string {
if d < time . Second {
return "0s"
}
result := fmt . Sprintf ( "%s" , d )
result = prettifyDurationRegexp . ReplaceAllString ( result , "" )
return result
}
2016-04-11 15:27:16 +00:00
// acceptSignals registers for OS signals
func ( this * Migrator ) acceptSignals ( ) {
c := make ( chan os . Signal , 1 )
signal . Notify ( c , syscall . SIGHUP )
go func ( ) {
for sig := range c {
switch sig {
case syscall . SIGHUP :
log . Debugf ( "Received SIGHUP. Reloading configuration" )
}
}
} ( )
}
2016-04-08 12:35:06 +00:00
func ( this * Migrator ) shouldThrottle ( ) ( result bool , reason string ) {
2016-04-07 13:57:12 +00:00
lag := atomic . LoadInt64 ( & this . migrationContext . CurrentLag )
if time . Duration ( lag ) > time . Duration ( this . migrationContext . MaxLagMillisecondsThrottleThreshold ) * time . Millisecond {
2016-04-08 12:35:06 +00:00
return true , fmt . Sprintf ( "lag=%fs" , time . Duration ( lag ) . Seconds ( ) )
}
if this . migrationContext . ThrottleFlagFile != "" {
2016-04-08 08:34:44 +00:00
if _ , err := os . Stat ( this . migrationContext . ThrottleFlagFile ) ; err == nil {
2016-04-11 15:27:16 +00:00
// Throttle file defined and exists!
return true , "flag-file"
}
}
if this . migrationContext . ThrottleAdditionalFlagFile != "" {
if _ , err := os . Stat ( this . migrationContext . ThrottleAdditionalFlagFile ) ; err == nil {
// 2nd Throttle file defined and exists!
2016-04-08 12:35:06 +00:00
return true , "flag-file"
}
}
for variableName , threshold := range this . migrationContext . MaxLoad {
value , err := this . applier . ShowStatusVariable ( variableName )
if err != nil {
return true , fmt . Sprintf ( "%s %s" , variableName , err )
}
if value > threshold {
return true , fmt . Sprintf ( "%s=%d" , variableName , value )
2016-04-08 08:34:44 +00:00
}
2016-04-07 13:57:12 +00:00
}
2016-04-08 12:35:06 +00:00
return false , ""
}
2016-04-11 15:27:16 +00:00
// initiateThrottler re-evaluates shouldThrottle immediately and then once per
// second, forever. It stores the outcome in the migration context. Through the
// applier it writes a "throttle" changelog entry when throttling starts, when
// the throttling reason changes, and when throttling ends.
func (this *Migrator) initiateThrottler() error {
	ticker := time.Tick(1 * time.Second)
	evaluate := func() {
		wasThrottled, previousReason := this.migrationContext.IsThrottled()
		throttleNow, reason := this.shouldThrottle()
		switch {
		case throttleNow && (!wasThrottled || previousReason != reason):
			// Throttling just started, or continues for a different reason.
			this.applier.WriteAndLogChangelog("throttle", reason)
		case wasThrottled && !throttleNow:
			this.applier.WriteAndLogChangelog("throttle", "done throttling")
		}
		this.migrationContext.SetThrottled(throttleNow, reason)
	}

	evaluate()
	for {
		<-ticker
		evaluate()
	}
}
2016-04-08 12:35:06 +00:00
// throttle initiates a throttling event, if need be, updates the Context and
// calls callback functions, if any
2016-04-11 15:27:16 +00:00
func ( this * Migrator ) throttle ( onThrottled func ( ) ) {
2016-04-08 12:35:06 +00:00
for {
2016-04-11 15:27:16 +00:00
if shouldThrottle , _ := this . migrationContext . IsThrottled ( ) ; ! shouldThrottle {
return
2016-04-08 12:35:06 +00:00
}
2016-04-11 15:27:16 +00:00
if onThrottled != nil {
onThrottled ( )
2016-04-08 12:35:06 +00:00
}
time . Sleep ( time . Second )
}
}
// retryOperation attempts up to `count` attempts at running given function,
// exiting as soon as it returns with non-error.
func ( this * Migrator ) retryOperation ( operation func ( ) error ) ( err error ) {
maxRetries := this . migrationContext . MaxRetries ( )
for i := 0 ; i < maxRetries ; i ++ {
if i != 0 {
// sleep after previous iteration
time . Sleep ( 1 * time . Second )
}
err = operation ( )
if err == nil {
return nil
}
// there's an error. Let's try again.
}
return err
2016-04-07 13:57:12 +00:00
}
// canStopStreaming is the stop predicate handed to the events streamer.
// It currently always reports false, so streaming is never asked to stop.
func (this *Migrator) canStopStreaming() (canStop bool) {
	return canStop
}
func ( this * Migrator ) onChangelogStateEvent ( dmlEvent * binlog . BinlogDMLEvent ) ( err error ) {
// Hey, I created the changlog table, I know the type of columns it has!
if hint := dmlEvent . NewColumnValues . StringColumn ( 2 ) ; hint != "state" {
return
2016-04-04 10:27:51 +00:00
}
2016-04-07 13:57:12 +00:00
changelogState := ChangelogState ( dmlEvent . NewColumnValues . StringColumn ( 3 ) )
switch changelogState {
case TablesInPlace :
{
this . tablesInPlace <- true
}
2016-04-08 08:34:44 +00:00
case AllEventsUpToLockProcessed :
{
this . allEventsUpToLockProcessed <- true
}
2016-04-07 13:57:12 +00:00
default :
{
return fmt . Errorf ( "Unknown changelog state: %+v" , changelogState )
}
}
2016-04-08 12:35:06 +00:00
log . Debugf ( "Received state %+v" , changelogState )
2016-04-07 13:57:12 +00:00
return nil
}
func ( this * Migrator ) onChangelogHeartbeatEvent ( dmlEvent * binlog . BinlogDMLEvent ) ( err error ) {
if hint := dmlEvent . NewColumnValues . StringColumn ( 2 ) ; hint != "heartbeat" {
return nil
}
value := dmlEvent . NewColumnValues . StringColumn ( 3 )
heartbeatTime , err := time . Parse ( time . RFC3339 , value )
if err != nil {
return log . Errore ( err )
}
lag := time . Now ( ) . Sub ( heartbeatTime )
atomic . StoreInt64 ( & this . migrationContext . CurrentLag , int64 ( lag ) )
return nil
2016-04-04 10:27:51 +00:00
}
2016-04-04 13:29:02 +00:00
func ( this * Migrator ) Migrate ( ) ( err error ) {
2016-04-08 08:34:44 +00:00
this . migrationContext . StartTime = time . Now ( )
2016-04-04 13:29:02 +00:00
this . inspector = NewInspector ( )
2016-04-04 10:27:51 +00:00
if err := this . inspector . InitDBConnections ( ) ; err != nil {
return err
}
2016-04-07 13:57:12 +00:00
if err := this . inspector . ValidateOriginalTable ( ) ; err != nil {
return err
}
2016-04-08 12:35:06 +00:00
if err := this . inspector . InspectOriginalTable ( ) ; err != nil {
2016-04-07 13:57:12 +00:00
return err
}
// So far so good, table is accessible and valid.
2016-04-04 13:29:02 +00:00
if this . migrationContext . MasterConnectionConfig , err = this . inspector . getMasterConnectionConfig ( ) ; err != nil {
return err
}
if this . migrationContext . IsRunningOnMaster ( ) && ! this . migrationContext . AllowedRunningOnMaster {
return fmt . Errorf ( "It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master" )
}
log . Infof ( "Master found to be %+v" , this . migrationContext . MasterConnectionConfig . Key )
2016-04-08 08:34:44 +00:00
if err := this . initiateStreaming ( ) ; err != nil {
return err
}
if err := this . initiateApplier ( ) ; err != nil {
return err
}
log . Debugf ( "Waiting for tables to be in place" )
<- this . tablesInPlace
log . Debugf ( "Tables are in place" )
// Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
2016-04-08 12:35:06 +00:00
if err := this . inspector . InspectOriginalAndGhostTables ( ) ; err != nil {
return err
}
2016-04-08 08:34:44 +00:00
if err := this . applier . ReadMigrationRangeValues ( ) ; err != nil {
return err
}
2016-04-11 15:27:16 +00:00
go this . initiateThrottler ( )
2016-04-08 08:34:44 +00:00
go this . executeWriteFuncs ( )
go this . iterateChunks ( )
2016-04-08 12:35:06 +00:00
this . migrationContext . RowCopyStartTime = time . Now ( )
go this . initiateStatus ( )
2016-04-08 08:34:44 +00:00
log . Debugf ( "Operating until row copy is complete" )
<- this . rowCopyComplete
log . Debugf ( "Row copy complete" )
this . printStatus ( )
2016-04-11 15:27:16 +00:00
this . throttle ( func ( ) {
log . Debugf ( "throttling on LOCK TABLES" )
} )
2016-04-08 08:34:44 +00:00
// TODO retries!!
this . applier . LockTables ( )
2016-04-08 12:35:06 +00:00
this . applier . WriteChangelogState ( string ( AllEventsUpToLockProcessed ) )
2016-04-08 08:34:44 +00:00
log . Debugf ( "Waiting for events up to lock" )
<- this . allEventsUpToLockProcessed
log . Debugf ( "Done waiting for events up to lock" )
// TODO retries!!
this . applier . UnlockTables ( )
return nil
}
// initiateStatus prints a status line right away. After that it launches
// printStatus on its own goroutine once per second, forever. printStatus
// decides for itself whether a given tick produces output.
func (this *Migrator) initiateStatus() error {
	this.printStatus()
	ticker := time.Tick(1 * time.Second)
	for {
		<-ticker
		go this.printStatus()
	}
}
func ( this * Migrator ) printStatus ( ) {
elapsedTime := this . migrationContext . ElapsedTime ( )
elapsedSeconds := int64 ( elapsedTime . Seconds ( ) )
totalRowsCopied := this . migrationContext . GetTotalRowsCopied ( )
rowsEstimate := this . migrationContext . RowsEstimate
progressPct := 100.0 * float64 ( totalRowsCopied ) / float64 ( rowsEstimate )
shouldPrintStatus := false
if elapsedSeconds <= 60 {
shouldPrintStatus = true
} else if progressPct >= 99.0 {
shouldPrintStatus = true
} else if progressPct >= 95.0 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else if elapsedSeconds <= 120 {
shouldPrintStatus = ( elapsedSeconds % 5 == 0 )
} else {
shouldPrintStatus = ( elapsedSeconds % 30 == 0 )
}
if ! shouldPrintStatus {
return
}
2016-04-08 12:35:06 +00:00
eta := "N/A"
2016-04-11 15:27:16 +00:00
if isThrottled , throttleReason := this . migrationContext . IsThrottled ( ) ; isThrottled {
eta = fmt . Sprintf ( "throttled, %s" , throttleReason )
2016-04-08 12:35:06 +00:00
}
status := fmt . Sprintf ( "Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s" ,
2016-04-08 08:34:44 +00:00
totalRowsCopied , rowsEstimate , progressPct ,
len ( this . applyEventsQueue ) , cap ( this . applyEventsQueue ) ,
2016-04-08 12:35:06 +00:00
prettifyDurationOutput ( this . migrationContext . ElapsedRowCopyTime ( ) ) , prettifyDurationOutput ( elapsedTime ) ,
eta ,
)
this . applier . WriteChangelog (
fmt . Sprintf ( "copy iteration %d at %d" , this . migrationContext . GetIteration ( ) , time . Now ( ) . Unix ( ) ) ,
status ,
)
2016-04-08 08:34:44 +00:00
fmt . Println ( status )
}
func ( this * Migrator ) initiateStreaming ( ) error {
2016-04-06 11:05:58 +00:00
this . eventsStreamer = NewEventsStreamer ( )
if err := this . eventsStreamer . InitDBConnections ( ) ; err != nil {
return err
}
2016-04-07 13:57:12 +00:00
this . eventsStreamer . AddListener (
false ,
this . migrationContext . DatabaseName ,
this . migrationContext . GetChangelogTableName ( ) ,
func ( dmlEvent * binlog . BinlogDMLEvent ) error {
return this . onChangelogStateEvent ( dmlEvent )
} ,
)
this . eventsStreamer . AddListener (
false ,
this . migrationContext . DatabaseName ,
this . migrationContext . GetChangelogTableName ( ) ,
func ( dmlEvent * binlog . BinlogDMLEvent ) error {
return this . onChangelogHeartbeatEvent ( dmlEvent )
} ,
)
go func ( ) {
log . Debugf ( "Beginning streaming" )
this . eventsStreamer . StreamEvents ( func ( ) bool { return this . canStopStreaming ( ) } )
} ( )
2016-04-08 08:34:44 +00:00
return nil
}
2016-04-06 11:05:58 +00:00
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) initiateApplier ( ) error {
2016-04-04 13:29:02 +00:00
this . applier = NewApplier ( )
if err := this . applier . InitDBConnections ( ) ; err != nil {
return err
}
2016-04-06 11:05:58 +00:00
if err := this . applier . CreateGhostTable ( ) ; err != nil {
log . Errorf ( "Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out" )
return err
}
if err := this . applier . AlterGhost ( ) ; err != nil {
log . Errorf ( "Unable to ALTER ghost table, see further error details. Bailing out" )
return err
}
2016-04-07 13:57:12 +00:00
if err := this . applier . CreateChangelogTable ( ) ; err != nil {
log . Errorf ( "Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out" )
return err
}
2016-04-08 12:35:06 +00:00
this . applier . WriteChangelogState ( string ( TablesInPlace ) )
2016-04-07 13:57:12 +00:00
this . applier . InitiateHeartbeat ( )
2016-04-08 08:34:44 +00:00
return nil
}
2016-04-07 13:57:12 +00:00
2016-04-08 08:34:44 +00:00
func ( this * Migrator ) iterateChunks ( ) error {
terminateRowIteration := func ( err error ) error {
this . rowCopyComplete <- true
return log . Errore ( err )
}
for {
copyRowsFunc := func ( ) error {
hasFurtherRange , err := this . applier . CalculateNextIterationRangeEndValues ( )
if err != nil {
return terminateRowIteration ( err )
}
if ! hasFurtherRange {
return terminateRowIteration ( nil )
}
_ , rowsAffected , _ , err := this . applier . ApplyIterationInsertQuery ( )
if err != nil {
return terminateRowIteration ( err )
}
atomic . AddInt64 ( & this . migrationContext . TotalRowsCopied , rowsAffected )
2016-04-08 12:35:06 +00:00
atomic . AddInt64 ( & this . migrationContext . Iteration , 1 )
2016-04-08 08:34:44 +00:00
return nil
}
this . copyRowsQueue <- copyRowsFunc
2016-04-04 13:29:02 +00:00
}
2016-04-08 08:34:44 +00:00
return nil
}
func ( this * Migrator ) executeWriteFuncs ( ) error {
2016-04-05 07:14:22 +00:00
for {
2016-04-11 15:27:16 +00:00
this . throttle ( nil )
2016-04-08 08:34:44 +00:00
// We give higher priority to event processing, then secondary priority to
// rowcopy
select {
case applyEventFunc := <- this . applyEventsQueue :
{
2016-04-08 12:35:06 +00:00
if err := this . retryOperation ( applyEventFunc ) ; err != nil {
return log . Errore ( err )
}
2016-04-08 08:34:44 +00:00
}
default :
{
select {
case copyRowsFunc := <- this . copyRowsQueue :
{
2016-04-08 12:35:06 +00:00
if err := this . retryOperation ( copyRowsFunc ) ; err != nil {
return log . Errore ( err )
}
2016-04-08 08:34:44 +00:00
}
default :
{
// Hmmmmm... nothing in the queue; no events, but also no row copy.
// This is possible upon load. Let's just sleep it over.
log . Debugf ( "Getting nothing in the write queue. Sleeping..." )
time . Sleep ( time . Second )
}
}
}
2016-04-06 11:05:58 +00:00
}
2016-04-07 13:57:12 +00:00
}
2016-04-04 10:27:51 +00:00
return nil
}