2016-04-04 13:30:49 +00:00
/ *
Copyright 2016 GitHub Inc .
2016-05-16 09:09:17 +00:00
See https : //github.com/github/gh-ost/blob/master/LICENSE
2016-04-04 13:30:49 +00:00
* /
package logic
import (
gosql "database/sql"
"fmt"
2016-04-08 08:34:44 +00:00
"sync/atomic"
2016-04-07 13:57:12 +00:00
"time"
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
2016-04-04 13:30:49 +00:00
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
2016-06-27 09:08:06 +00:00
// atomicCutOverMagicHint is the table comment that marks a table as the
// cut-over sentry ("magic") table created by CreateAtomicCutOverSentryTable.
const atomicCutOverMagicHint = "ghost-cut-over-sentry"
2016-06-19 15:55:37 +00:00
// Applier connects and writes to the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
// Applier is the one to actually write row data and apply binlog events onto the ghost table.
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
2016-04-04 13:30:49 +00:00
type Applier struct {
// connectionConfig is taken from the migration context's ApplierConnectionConfig.
connectionConfig * mysql . ConnectionConfig
// db is the general-purpose connection pool, opened by InitDBConnections.
db * gosql . DB
2016-04-18 17:57:18 +00:00
// singletonDB is limited to a single open connection by InitDBConnections;
// LOCK/UNLOCK TABLES statements are issued through it.
singletonDB * gosql . DB
2016-04-04 13:30:49 +00:00
// migrationContext is the migration context obtained from base.GetMigrationContext().
migrationContext * base . MigrationContext
}
func NewApplier ( ) * Applier {
return & Applier {
2016-04-14 11:37:56 +00:00
connectionConfig : base . GetMigrationContext ( ) . ApplierConnectionConfig ,
2016-04-04 13:30:49 +00:00
migrationContext : base . GetMigrationContext ( ) ,
}
}
func ( this * Applier ) InitDBConnections ( ) ( err error ) {
2016-04-18 17:57:18 +00:00
applierUri := this . connectionConfig . GetDBUri ( this . migrationContext . DatabaseName )
if this . db , _ , err = sqlutils . GetDB ( applierUri ) ; err != nil {
2016-04-04 13:30:49 +00:00
return err
}
2016-04-18 17:57:18 +00:00
singletonApplierUri := fmt . Sprintf ( "%s?timeout=0" , applierUri )
if this . singletonDB , _ , err = sqlutils . GetDB ( singletonApplierUri ) ; err != nil {
return err
}
this . singletonDB . SetMaxOpenConns ( 1 )
if err := this . validateConnection ( this . db ) ; err != nil {
return err
}
if err := this . validateConnection ( this . singletonDB ) ; err != nil {
2016-04-04 13:30:49 +00:00
return err
}
2016-10-11 14:00:26 +00:00
if err := this . validateAndReadTimeZone ( ) ; err != nil {
return err
}
2016-06-19 15:55:37 +00:00
if impliedKey , err := mysql . GetInstanceKey ( this . db ) ; err != nil {
return err
} else {
this . connectionConfig . ImpliedKey = impliedKey
}
2016-10-20 09:29:30 +00:00
if err := this . readTableColumns ( ) ; err != nil {
return err
}
2016-04-04 13:30:49 +00:00
return nil
}
// validateConnection issues a simple can-connect to MySQL
2016-04-18 17:57:18 +00:00
func ( this * Applier ) validateConnection ( db * gosql . DB ) error {
2016-04-04 13:30:49 +00:00
query := ` select @@global.port `
var port int
2016-04-18 17:57:18 +00:00
if err := db . QueryRow ( query ) . Scan ( & port ) ; err != nil {
2016-04-04 13:30:49 +00:00
return err
}
if port != this . connectionConfig . Key . Port {
return fmt . Errorf ( "Unexpected database port reported: %+v" , port )
}
log . Infof ( "connection validated on %+v" , this . connectionConfig . Key )
return nil
}
2016-10-11 14:00:26 +00:00
// validateAndReadTimeZone potentially reads server time-zone
func ( this * Applier ) validateAndReadTimeZone ( ) error {
2016-10-14 10:56:43 +00:00
query := ` select @@global.time_zone `
if err := this . db . QueryRow ( query ) . Scan ( & this . migrationContext . ApplierTimeZone ) ; err != nil {
return err
2016-10-11 14:00:26 +00:00
}
2016-10-14 10:56:43 +00:00
2016-10-11 14:00:26 +00:00
log . Infof ( "will use time_zone='%s' on applier" , this . migrationContext . ApplierTimeZone )
return nil
}
2016-10-20 09:29:30 +00:00
// readTableColumns loads the original table's columns as seen on the applier
// and stores them in the migration context (OriginalTableColumnsOnApplier).
func (this *Applier) readTableColumns() (err error) {
	log.Infof("Examining table structure on applier")
	migrationContext := this.migrationContext
	migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(
		this.db,
		migrationContext.DatabaseName,
		migrationContext.OriginalTableName,
	)
	return err
}
2016-06-27 09:08:06 +00:00
// showTableStatus returns the output of `show table status like '...'` command
func ( this * Applier ) showTableStatus ( tableName string ) ( rowMap sqlutils . RowMap ) {
rowMap = nil
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` show /* gh-ost */ table status from %s like '%s' ` , sql . EscapeName ( this . migrationContext . DatabaseName ) , tableName )
2016-05-03 09:55:17 +00:00
sqlutils . QueryRowsMap ( this . db , query , func ( m sqlutils . RowMap ) error {
2016-06-27 09:08:06 +00:00
rowMap = m
2016-05-03 09:55:17 +00:00
return nil
} )
2016-06-27 09:08:06 +00:00
return rowMap
}
// tableExists checks if a given table exists in database
func ( this * Applier ) tableExists ( tableName string ) ( tableFound bool ) {
m := this . showTableStatus ( tableName )
return ( m != nil )
2016-05-03 09:55:17 +00:00
}
2016-06-19 15:55:37 +00:00
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
// or attempts to drop them if instructed to.
2016-05-03 09:55:17 +00:00
func ( this * Applier ) ValidateOrDropExistingTables ( ) error {
if this . migrationContext . InitiallyDropGhostTable {
if err := this . DropGhostTable ( ) ; err != nil {
return err
}
}
if this . tableExists ( this . migrationContext . GetGhostTableName ( ) ) {
2016-08-22 06:49:27 +00:00
return fmt . Errorf ( "Table %s already exists. Panicking. Use --initially-drop-ghost-table to force dropping it, though I really prefer that you drop it or rename it away" , sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) )
2016-05-03 09:55:17 +00:00
}
if this . migrationContext . InitiallyDropOldTable {
if err := this . DropOldTable ( ) ; err != nil {
return err
}
}
if this . tableExists ( this . migrationContext . GetOldTableName ( ) ) {
2016-08-22 06:49:27 +00:00
return fmt . Errorf ( "Table %s already exists. Panicking. Use --initially-drop-old-table to force dropping it, though I really prefer that you drop it or rename it away" , sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) )
2016-05-03 09:55:17 +00:00
}
return nil
}
2016-04-11 15:27:16 +00:00
// CreateGhostTable creates the ghost table on the applier host
2016-04-04 13:30:49 +00:00
func ( this * Applier ) CreateGhostTable ( ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` create /* gh-ost */ table %s.%s like %s.%s ` ,
2016-04-04 13:30:49 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
log . Infof ( "Creating ghost table %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
)
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
2016-04-07 13:57:12 +00:00
log . Infof ( "Ghost table created" )
2016-04-04 13:30:49 +00:00
return nil
}
2016-04-11 15:27:16 +00:00
// AlterGhost applies `alter` statement on ghost table
2016-04-04 13:30:49 +00:00
func ( this * Applier ) AlterGhost ( ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` alter /* gh-ost */ table %s.%s %s ` ,
2016-04-04 13:30:49 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
this . migrationContext . AlterStatement ,
)
log . Infof ( "Altering ghost table %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
)
log . Debugf ( "ALTER statement: %s" , query )
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
2016-04-07 13:57:12 +00:00
log . Infof ( "Ghost table altered" )
return nil
}
2016-04-11 15:27:16 +00:00
// CreateChangelogTable creates the changelog table on the applier host.
// The changelog table is dropped first if it already exists (DropChangelogTable).
// The table has an auto-increment `id` primary key, a `last_update` timestamp
// that refreshes on update, and a unique `hint` with its `value`. Auto-generated
// ids start at 256; WriteChangelog assigns the explicit ids 1-3 to its well-known
// hints (presumably the offset keeps generated ids clear of those).
// Returns the first error encountered.
2016-04-07 13:57:12 +00:00
func ( this * Applier ) CreateChangelogTable ( ) error {
2016-05-03 09:55:17 +00:00
// Remove any existing changelog table before re-creating it.
if err := this . DropChangelogTable ( ) ; err != nil {
return err
}
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` create /* gh-ost */ table % s . % s (
2016-04-08 12:35:06 +00:00
id bigint auto_increment ,
2016-04-07 13:57:12 +00:00
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP ,
hint varchar ( 64 ) charset ascii not null ,
2016-04-08 12:35:06 +00:00
value varchar ( 255 ) charset ascii not null ,
2016-04-07 13:57:12 +00:00
primary key ( id ) ,
unique key hint_uidx ( hint )
2016-04-11 15:27:16 +00:00
) auto_increment = 256
2016-04-07 13:57:12 +00:00
` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetChangelogTableName ( ) ) ,
)
log . Infof ( "Creating changelog table %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetChangelogTableName ( ) ) ,
)
// DDL is executed directly, without a prepared statement.
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
log . Infof ( "Changelog table created" )
return nil
}
2016-04-11 15:27:16 +00:00
// dropTable drops a given table on the applied host
func ( this * Applier ) dropTable ( tableName string ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` drop /* gh-ost */ table if exists %s.%s ` ,
2016-04-07 13:57:12 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
2016-04-11 15:27:16 +00:00
sql . EscapeName ( tableName ) ,
2016-04-07 13:57:12 +00:00
)
2016-04-11 15:27:16 +00:00
log . Infof ( "Droppping table %s.%s" ,
2016-04-07 13:57:12 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
2016-04-11 15:27:16 +00:00
sql . EscapeName ( tableName ) ,
2016-04-07 13:57:12 +00:00
)
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
2016-04-11 15:27:16 +00:00
log . Infof ( "Table dropped" )
2016-04-04 13:30:49 +00:00
return nil
}
2016-04-04 16:19:46 +00:00
2016-04-11 15:27:16 +00:00
// DropChangelogTable drops the changelog table (if it exists) on the applier host.
func (this *Applier) DropChangelogTable() error {
	changelogTableName := this.migrationContext.GetChangelogTableName()
	return this.dropTable(changelogTableName)
}
2016-05-03 09:55:17 +00:00
// DropOldTable drops the "old" table (if it exists) on the applier host.
func (this *Applier) DropOldTable() error {
	oldTableName := this.migrationContext.GetOldTableName()
	return this.dropTable(oldTableName)
}
2016-04-11 15:27:16 +00:00
// DropGhostTable drops the ghost table (if it exists) on the applier host.
func (this *Applier) DropGhostTable() error {
	ghostTableName := this.migrationContext.GetGhostTableName()
	return this.dropTable(ghostTableName)
}
2016-04-07 13:57:12 +00:00
// WriteChangelog writes a value to the changelog table, inserting a row for
// the hint or, on a duplicate key, updating that row's value and last_update.
// It returns the hint as given, for convenience, along with any error from
// executing the statement.
func ( this * Applier ) WriteChangelog ( hint , value string ) ( string , error ) {
2016-04-14 11:37:56 +00:00
// The well-known hints get fixed row ids; any other hint keeps 0, which the
// statement turns into NULL (NULLIF) so that the id is auto-generated.
explicitId := 0
switch hint {
case "heartbeat" :
explicitId = 1
case "state" :
explicitId = 2
case "throttle" :
explicitId = 3
}
2016-04-07 13:57:12 +00:00
query := fmt . Sprintf ( `
2016-05-16 09:09:17 +00:00
insert /* gh-ost */ into % s . % s
2016-04-07 13:57:12 +00:00
( id , hint , value )
values
2016-04-14 11:37:56 +00:00
( NULLIF ( ? , 0 ) , ? , ? )
2016-04-07 13:57:12 +00:00
on duplicate key update
last_update = NOW ( ) ,
value = VALUES ( value )
` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetChangelogTableName ( ) ) ,
)
2016-04-14 11:37:56 +00:00
// Placeholders are bound in order: id, hint, value.
_ , err := sqlutils . Exec ( this . db , query , explicitId , hint , value )
2016-04-07 13:57:12 +00:00
return hint , err
}
2016-04-11 15:27:16 +00:00
func ( this * Applier ) WriteAndLogChangelog ( hint , value string ) ( string , error ) {
2016-04-08 12:35:06 +00:00
this . WriteChangelog ( hint , value )
return this . WriteChangelog ( fmt . Sprintf ( "%s at %d" , hint , time . Now ( ) . UnixNano ( ) ) , value )
}
2016-04-11 15:27:16 +00:00
// WriteChangelogState records the given value under the "state" hint via
// WriteAndLogChangelog.
func (this *Applier) WriteChangelogState(value string) (string, error) {
	const stateHint = "state"
	return this.WriteAndLogChangelog(stateHint, value)
}
2016-04-07 13:57:12 +00:00
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously
2016-08-30 07:41:59 +00:00
func ( this * Applier ) InitiateHeartbeat ( ) {
2016-06-19 15:55:37 +00:00
var numSuccessiveFailures int64
2016-04-14 11:37:56 +00:00
injectHeartbeat := func ( ) error {
2016-04-18 17:57:18 +00:00
if _ , err := this . WriteChangelog ( "heartbeat" , time . Now ( ) . Format ( time . RFC3339Nano ) ) ; err != nil {
2016-04-14 11:37:56 +00:00
numSuccessiveFailures ++
if numSuccessiveFailures > this . migrationContext . MaxRetries ( ) {
return log . Errore ( err )
2016-04-07 13:57:12 +00:00
}
2016-04-14 11:37:56 +00:00
} else {
numSuccessiveFailures = 0
2016-04-07 13:57:12 +00:00
}
2016-04-14 11:37:56 +00:00
return nil
}
injectHeartbeat ( )
2016-08-30 07:41:59 +00:00
heartbeatTick := time . Tick ( time . Duration ( this . migrationContext . HeartbeatIntervalMilliseconds ) * time . Millisecond )
2016-04-14 11:37:56 +00:00
for range heartbeatTick {
// Generally speaking, we would issue a goroutine, but I'd actually rather
2016-08-30 07:41:59 +00:00
// have this block the loop rather than spam the master in the event something
2016-04-14 11:37:56 +00:00
// goes wrong
2016-10-27 12:51:38 +00:00
if throttle , _ , reasonHint := this . migrationContext . IsThrottled ( ) ; throttle && ( reasonHint == base . UserCommandThrottleReasonHint ) {
continue
}
2016-04-14 11:37:56 +00:00
if err := injectHeartbeat ( ) ; err != nil {
return
2016-04-07 13:57:12 +00:00
}
2016-04-14 11:37:56 +00:00
}
2016-04-07 13:57:12 +00:00
}
2016-06-19 15:55:37 +00:00
// ExecuteThrottleQuery executes the `--throttle-query` and returns its results.
2016-06-18 19:12:07 +00:00
func ( this * Applier ) ExecuteThrottleQuery ( ) ( int64 , error ) {
throttleQuery := this . migrationContext . GetThrottleQuery ( )
if throttleQuery == "" {
return 0 , nil
}
var result int64
if err := this . db . QueryRow ( throttleQuery ) . Scan ( & result ) ; err != nil {
return 0 , log . Errore ( err )
}
return result , nil
}
2016-06-19 15:55:37 +00:00
// ReadMigrationMinValues returns the minimum values to be iterated on rowcopy
2016-04-04 16:19:46 +00:00
func ( this * Applier ) ReadMigrationMinValues ( uniqueKey * sql . UniqueKey ) error {
log . Debugf ( "Reading migration range according to key: %s" , uniqueKey . Name )
2016-10-19 13:22:29 +00:00
query , err := sql . BuildUniqueKeyMinValuesPreparedQuery ( this . migrationContext . DatabaseName , this . migrationContext . OriginalTableName , & uniqueKey . Columns )
2016-04-04 16:19:46 +00:00
if err != nil {
return err
}
rows , err := this . db . Query ( query )
if err != nil {
return err
}
for rows . Next ( ) {
2016-04-11 15:27:16 +00:00
this . migrationContext . MigrationRangeMinValues = sql . NewColumnValues ( uniqueKey . Len ( ) )
2016-04-04 16:19:46 +00:00
if err = rows . Scan ( this . migrationContext . MigrationRangeMinValues . ValuesPointers ... ) ; err != nil {
return err
}
}
log . Infof ( "Migration min values: [%s]" , this . migrationContext . MigrationRangeMinValues )
return err
}
2016-06-19 15:55:37 +00:00
// ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy
2016-04-04 16:19:46 +00:00
func ( this * Applier ) ReadMigrationMaxValues ( uniqueKey * sql . UniqueKey ) error {
log . Debugf ( "Reading migration range according to key: %s" , uniqueKey . Name )
2016-10-19 13:22:29 +00:00
query , err := sql . BuildUniqueKeyMaxValuesPreparedQuery ( this . migrationContext . DatabaseName , this . migrationContext . OriginalTableName , & uniqueKey . Columns )
2016-04-04 16:19:46 +00:00
if err != nil {
return err
}
rows , err := this . db . Query ( query )
if err != nil {
return err
}
for rows . Next ( ) {
2016-04-11 15:27:16 +00:00
this . migrationContext . MigrationRangeMaxValues = sql . NewColumnValues ( uniqueKey . Len ( ) )
2016-04-04 16:19:46 +00:00
if err = rows . Scan ( this . migrationContext . MigrationRangeMaxValues . ValuesPointers ... ) ; err != nil {
return err
}
}
log . Infof ( "Migration max values: [%s]" , this . migrationContext . MigrationRangeMaxValues )
return err
}
2016-06-19 15:55:37 +00:00
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
2016-04-05 07:14:22 +00:00
func ( this * Applier ) ReadMigrationRangeValues ( ) error {
if err := this . ReadMigrationMinValues ( this . migrationContext . UniqueKey ) ; err != nil {
2016-04-04 16:19:46 +00:00
return err
}
2016-04-05 07:14:22 +00:00
if err := this . ReadMigrationMaxValues ( this . migrationContext . UniqueKey ) ; err != nil {
2016-04-04 16:19:46 +00:00
return err
}
return nil
}
2016-04-08 08:34:44 +00:00
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
// itrating the range (and this done with copying row chunks)
func ( this * Applier ) CalculateNextIterationRangeEndValues ( ) ( hasFurtherRange bool , err error ) {
2016-04-05 07:14:22 +00:00
this . migrationContext . MigrationIterationRangeMinValues = this . migrationContext . MigrationIterationRangeMaxValues
2016-04-06 11:05:58 +00:00
if this . migrationContext . MigrationIterationRangeMinValues == nil {
this . migrationContext . MigrationIterationRangeMinValues = this . migrationContext . MigrationRangeMinValues
2016-04-05 07:14:22 +00:00
}
2016-04-05 17:50:49 +00:00
query , explodedArgs , err := sql . BuildUniqueKeyRangeEndPreparedQuery (
this . migrationContext . DatabaseName ,
this . migrationContext . OriginalTableName ,
2016-10-19 13:22:29 +00:00
& this . migrationContext . UniqueKey . Columns ,
2016-04-06 11:05:58 +00:00
this . migrationContext . MigrationIterationRangeMinValues . AbstractValues ( ) ,
2016-04-05 17:50:49 +00:00
this . migrationContext . MigrationRangeMaxValues . AbstractValues ( ) ,
2016-06-07 09:59:17 +00:00
atomic . LoadInt64 ( & this . migrationContext . ChunkSize ) ,
2016-05-18 12:53:09 +00:00
this . migrationContext . GetIteration ( ) == 0 ,
2016-04-08 12:35:06 +00:00
fmt . Sprintf ( "iteration:%d" , this . migrationContext . GetIteration ( ) ) ,
2016-04-05 17:50:49 +00:00
)
if err != nil {
2016-04-08 08:34:44 +00:00
return hasFurtherRange , err
2016-04-05 17:50:49 +00:00
}
rows , err := this . db . Query ( query , explodedArgs ... )
2016-04-05 07:14:22 +00:00
if err != nil {
2016-04-08 08:34:44 +00:00
return hasFurtherRange , err
2016-04-05 07:14:22 +00:00
}
2016-04-11 15:27:16 +00:00
iterationRangeMaxValues := sql . NewColumnValues ( this . migrationContext . UniqueKey . Len ( ) )
2016-04-05 07:14:22 +00:00
for rows . Next ( ) {
if err = rows . Scan ( iterationRangeMaxValues . ValuesPointers ... ) ; err != nil {
2016-04-08 08:34:44 +00:00
return hasFurtherRange , err
2016-04-05 07:14:22 +00:00
}
2016-04-08 08:34:44 +00:00
hasFurtherRange = true
2016-04-05 07:14:22 +00:00
}
2016-04-08 08:34:44 +00:00
if ! hasFurtherRange {
2016-04-14 11:37:56 +00:00
log . Debugf ( "Iteration complete: no further range to iterate" )
2016-04-08 08:34:44 +00:00
return hasFurtherRange , nil
2016-04-05 07:14:22 +00:00
}
this . migrationContext . MigrationIterationRangeMaxValues = iterationRangeMaxValues
2016-04-08 08:34:44 +00:00
return hasFurtherRange , nil
2016-04-06 11:05:58 +00:00
}
2016-06-19 15:55:37 +00:00
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
2016-04-08 08:34:44 +00:00
func ( this * Applier ) ApplyIterationInsertQuery ( ) ( chunkSize int64 , rowsAffected int64 , duration time . Duration , err error ) {
startTime := time . Now ( )
chunkSize = atomic . LoadInt64 ( & this . migrationContext . ChunkSize )
2016-04-06 11:05:58 +00:00
query , explodedArgs , err := sql . BuildRangeInsertPreparedQuery (
this . migrationContext . DatabaseName ,
this . migrationContext . OriginalTableName ,
this . migrationContext . GetGhostTableName ( ) ,
2016-09-07 12:24:11 +00:00
this . migrationContext . SharedColumns . Names ( ) ,
this . migrationContext . MappedSharedColumns . Names ( ) ,
2016-04-06 11:05:58 +00:00
this . migrationContext . UniqueKey . Name ,
2016-10-19 13:22:29 +00:00
& this . migrationContext . UniqueKey . Columns ,
2016-04-06 11:05:58 +00:00
this . migrationContext . MigrationIterationRangeMinValues . AbstractValues ( ) ,
this . migrationContext . MigrationIterationRangeMaxValues . AbstractValues ( ) ,
2016-04-08 12:35:06 +00:00
this . migrationContext . GetIteration ( ) == 0 ,
2016-04-08 08:34:44 +00:00
this . migrationContext . IsTransactionalTable ( ) ,
2016-04-06 11:05:58 +00:00
)
if err != nil {
2016-04-08 08:34:44 +00:00
return chunkSize , rowsAffected , duration , err
2016-04-06 11:05:58 +00:00
}
2016-10-08 09:06:27 +00:00
sqlResult , err := func ( ) ( gosql . Result , error ) {
tx , err := this . db . Begin ( )
if err != nil {
return nil , err
}
2016-10-10 09:39:57 +00:00
sessionQuery := fmt . Sprintf ( ` SET
SESSION time_zone = ' % s ' ,
2016-10-08 09:06:27 +00:00
sql_mode = CONCAT ( @ @ session . sql_mode , ' , STRICT_ALL_TABLES ' )
2016-10-11 14:00:26 +00:00
` , this . migrationContext . ApplierTimeZone )
2016-10-10 09:39:57 +00:00
if _ , err := tx . Exec ( sessionQuery ) ; err != nil {
2016-10-08 09:06:27 +00:00
return nil , err
}
result , err := tx . Exec ( query , explodedArgs ... )
if err != nil {
return nil , err
}
if err := tx . Commit ( ) ; err != nil {
return nil , err
}
return result , nil
} ( )
2016-04-08 08:34:44 +00:00
if err != nil {
return chunkSize , rowsAffected , duration , err
2016-04-06 11:05:58 +00:00
}
2016-04-08 08:34:44 +00:00
rowsAffected , _ = sqlResult . RowsAffected ( )
2016-08-02 12:38:56 +00:00
duration = time . Since ( startTime )
2016-04-06 11:05:58 +00:00
log . Debugf (
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d" ,
this . migrationContext . MigrationIterationRangeMinValues ,
this . migrationContext . MigrationIterationRangeMaxValues ,
2016-04-08 12:35:06 +00:00
this . migrationContext . GetIteration ( ) ,
2016-04-08 08:34:44 +00:00
chunkSize )
return chunkSize , rowsAffected , duration , nil
2016-04-05 07:14:22 +00:00
}
2016-06-14 06:35:07 +00:00
// LockOriginalTable places a write lock on the original table
func ( this * Applier ) LockOriginalTable ( ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` lock /* gh-ost */ tables %s.%s write ` ,
2016-04-08 08:34:44 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
2016-06-14 06:35:07 +00:00
log . Infof ( "Locking %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
2016-04-18 17:57:18 +00:00
this . migrationContext . LockTablesStartTime = time . Now ( )
if _ , err := sqlutils . ExecNoPrepare ( this . singletonDB , query ) ; err != nil {
2016-04-04 16:19:46 +00:00
return err
}
2016-06-14 06:35:07 +00:00
log . Infof ( "Table locked" )
2016-04-08 08:34:44 +00:00
return nil
}
2016-04-04 16:19:46 +00:00
2016-06-19 15:55:37 +00:00
// UnlockTables makes tea. No wait, it unlocks tables.
2016-04-08 08:34:44 +00:00
func ( this * Applier ) UnlockTables ( ) error {
2016-05-16 09:09:17 +00:00
query := ` unlock /* gh-ost */ tables `
2016-04-08 08:34:44 +00:00
log . Infof ( "Unlocking tables" )
2016-04-18 17:57:18 +00:00
if _ , err := sqlutils . ExecNoPrepare ( this . singletonDB , query ) ; err != nil {
2016-04-04 16:19:46 +00:00
return err
}
2016-04-08 08:34:44 +00:00
log . Infof ( "Tables unlocked" )
2016-04-04 16:19:46 +00:00
return nil
}
2016-04-08 12:35:06 +00:00
2016-06-19 15:55:37 +00:00
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
// There is a point in time in between where the table does not exist.
2016-04-22 20:41:20 +00:00
func ( this * Applier ) SwapTablesQuickAndBumpy ( ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` alter /* gh-ost */ table %s.%s rename %s ` ,
2016-04-18 17:57:18 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
)
log . Infof ( "Renaming original table" )
this . migrationContext . RenameTablesStartTime = time . Now ( )
if _ , err := sqlutils . ExecNoPrepare ( this . singletonDB , query ) ; err != nil {
return err
}
2016-05-16 09:09:17 +00:00
query = fmt . Sprintf ( ` alter /* gh-ost */ table %s.%s rename %s ` ,
2016-04-18 17:57:18 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
log . Infof ( "Renaming ghost table" )
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
this . migrationContext . RenameTablesEndTime = time . Now ( )
log . Infof ( "Tables renamed" )
return nil
}
2016-06-19 15:55:37 +00:00
// RenameTablesRollback renames back both table: original back to ghost,
// _old back to original. This is used by `--test-on-replica`
2016-06-14 07:01:06 +00:00
func ( this * Applier ) RenameTablesRollback ( ) ( renameError error ) {
// Restoring tables to original names.
// We prefer the single, atomic operation:
query := fmt . Sprintf ( ` rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s ` ,
2016-04-22 20:41:20 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
2016-06-14 07:01:06 +00:00
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
2016-04-22 20:41:20 +00:00
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
2016-06-14 07:01:06 +00:00
log . Infof ( "Renaming back both tables" )
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err == nil {
return nil
2016-04-22 20:41:20 +00:00
}
2016-06-14 07:01:06 +00:00
// But, if for some reason the above was impossible to do, we rename one by one.
query = fmt . Sprintf ( ` rename /* gh-ost */ table %s.%s to %s.%s ` ,
2016-06-10 09:15:11 +00:00
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
)
log . Infof ( "Renaming back to ghost table" )
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
renameError = err
}
query = fmt . Sprintf ( ` rename /* gh-ost */ table %s.%s to %s.%s ` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
log . Infof ( "Renaming back to original table" )
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
renameError = err
}
return log . Errore ( renameError )
}
2016-04-23 02:46:34 +00:00
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
// We need to keep the SQL thread active so as to complete processing received events,
2016-06-14 07:01:06 +00:00
// and have them written to the binary log, so that we can then read them via streamer.
2016-04-18 17:57:18 +00:00
func ( this * Applier ) StopSlaveIOThread ( ) error {
2016-05-16 09:09:17 +00:00
query := ` stop /* gh-ost */ slave io_thread `
2016-04-18 17:57:18 +00:00
log . Infof ( "Stopping replication" )
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
log . Infof ( "Replication stopped" )
return nil
}
2016-06-14 10:50:07 +00:00
// StopSlaveSQLThread is applicable with --test-on-replica; it stops the
// replication SQL thread.
func (this *Applier) StopSlaveSQLThread() error {
	log.Infof("Verifying SQL thread is stopped")
	_, err := sqlutils.ExecNoPrepare(this.db, ` stop /* gh-ost */ slave sql_thread `)
	if err != nil {
		return err
	}
	log.Infof("SQL thread stopped")
	return nil
}
2016-05-20 10:52:14 +00:00
// StartSlaveSQLThread is applicable with --test-on-replica; it starts the
// replication SQL thread.
func (this *Applier) StartSlaveSQLThread() error {
	log.Infof("Verifying SQL thread is running")
	_, err := sqlutils.ExecNoPrepare(this.db, ` start /* gh-ost */ slave sql_thread `)
	if err != nil {
		return err
	}
	log.Infof("SQL thread started")
	return nil
}
2016-06-19 15:55:37 +00:00
// StopReplication is used by `--test-on-replica` and stops replication.
func ( this * Applier ) StopReplication ( ) error {
2016-05-16 09:03:15 +00:00
if err := this . StopSlaveIOThread ( ) ; err != nil {
return err
}
2016-06-14 10:50:07 +00:00
if err := this . StopSlaveSQLThread ( ) ; err != nil {
2016-05-20 10:52:14 +00:00
return err
}
2016-08-15 16:56:17 +00:00
2016-05-19 13:11:36 +00:00
readBinlogCoordinates , executeBinlogCoordinates , err := mysql . GetReplicationBinlogCoordinates ( this . db )
2016-05-16 09:03:15 +00:00
if err != nil {
return err
}
2016-05-19 13:11:36 +00:00
log . Infof ( "Replication IO thread at %+v. SQL thread is at %+v" , * readBinlogCoordinates , * executeBinlogCoordinates )
2016-05-16 09:03:15 +00:00
return nil
}
2016-06-19 15:55:37 +00:00
// GetSessionLockName returns a name for the special hint session voluntary lock
2016-06-14 06:35:07 +00:00
func ( this * Applier ) GetSessionLockName ( sessionId int64 ) string {
return fmt . Sprintf ( "gh-ost.%d.lock" , sessionId )
}
2016-06-19 15:55:37 +00:00
// ExpectUsedLock expects the special hint voluntary lock to exist on given session
2016-06-14 06:35:07 +00:00
func ( this * Applier ) ExpectUsedLock ( sessionId int64 ) error {
var result int64
query := ` select is_used_lock(?) `
lockName := this . GetSessionLockName ( sessionId )
log . Infof ( "Checking session lock: %s" , lockName )
if err := this . db . QueryRow ( query , lockName ) . Scan ( & result ) ; err != nil || result != sessionId {
return fmt . Errorf ( "Session lock %s expected to be found but wasn't" , lockName )
}
return nil
}
2016-06-19 15:55:37 +00:00
// ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics:
// its state contains stateHint, its info contains infoHint, and its id equals
// sessionId (a sessionId of 0 matches any id). The connection running this
// check is itself excluded. Returns an error if the query fails or if no such
// process is found.
2016-04-23 02:46:34 +00:00
func ( this * Applier ) ExpectProcess ( sessionId int64 , stateHint , infoHint string ) error {
found := false
query := `
select id
from information_schema . processlist
where
id != connection_id ( )
and ? in ( 0 , id )
and state like concat ( '%' , ? , '%' )
and info like concat ( '%' , ? , '%' )
`
// Any returned row is a match; the row contents are not inspected.
err := sqlutils . QueryRowsMap ( this . db , query , func ( m sqlutils . RowMap ) error {
found = true
return nil
} , sessionId , stateHint , infoHint )
if err != nil {
return err
}
if ! found {
return fmt . Errorf ( "Cannot find process. Hints: %s, %s" , stateHint , infoHint )
}
return nil
}
2016-06-27 09:08:06 +00:00
// DropAtomicCutOverSentryTableIfExists looks at the table carrying the "old"
// table name. If there is no such table, nothing is done. If it exists and
// its comment is the cut-over magic hint, it is dropped; if it exists with any
// other comment, an error is returned and the table is left alone.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
	log.Infof("Looking for magic cut-over table")
	tableName := this.migrationContext.GetOldTableName()
	tableStatus := this.showTableStatus(tableName)

	switch {
	case tableStatus == nil:
		// Table does not exist
		return nil
	case tableStatus["Comment"].String != atomicCutOverMagicHint:
		return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
	}

	log.Infof("Dropping magic cut-over table")
	return this.dropTable(tableName)
}
2016-11-17 16:10:17 +00:00
// CreateAtomicCutOverSentryTable
2016-06-27 09:08:06 +00:00
func ( this * Applier ) CreateAtomicCutOverSentryTable ( ) error {
if err := this . DropAtomicCutOverSentryTableIfExists ( ) ; err != nil {
return err
}
tableName := this . migrationContext . GetOldTableName ( )
query := fmt . Sprintf ( ` create /* gh-ost */ table % s . % s (
id int auto_increment primary key
2016-12-01 08:04:04 +00:00
) engine = % s comment = ' % s '
2016-06-27 09:08:06 +00:00
` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( tableName ) ,
2016-12-02 03:56:29 +00:00
this . migrationContext . TableEngine ,
2016-06-27 09:08:06 +00:00
atomicCutOverMagicHint ,
)
log . Infof ( "Creating magic cut-over table %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( tableName ) ,
)
if _ , err := sqlutils . ExecNoPrepare ( this . db , query ) ; err != nil {
return err
}
log . Infof ( "Magic cut-over table created" )
return nil
}
// AtomicCutOverMagicLock
func ( this * Applier ) AtomicCutOverMagicLock ( sessionIdChan chan int64 , tableLocked chan <- error , okToUnlockTable <- chan bool , tableUnlocked chan <- error ) error {
tx , err := this . db . Begin ( )
if err != nil {
tableLocked <- err
return err
}
defer func ( ) {
sessionIdChan <- - 1
tableLocked <- fmt . Errorf ( "Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads" )
tableUnlocked <- fmt . Errorf ( "Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads" )
tx . Rollback ( )
} ( )
var sessionId int64
if err := tx . QueryRow ( ` select connection_id() ` ) . Scan ( & sessionId ) ; err != nil {
tableLocked <- err
return err
}
sessionIdChan <- sessionId
lockResult := 0
query := ` select get_lock(?, 0) `
lockName := this . GetSessionLockName ( sessionId )
log . Infof ( "Grabbing voluntary lock: %s" , lockName )
if err := tx . QueryRow ( query , lockName ) . Scan ( & lockResult ) ; err != nil || lockResult != 1 {
err := fmt . Errorf ( "Unable to acquire lock %s" , lockName )
tableLocked <- err
return err
}
2016-07-08 08:14:58 +00:00
tableLockTimeoutSeconds := this . migrationContext . CutOverLockTimeoutSeconds * 2
2016-06-27 09:08:06 +00:00
log . Infof ( "Setting LOCK timeout as %d seconds" , tableLockTimeoutSeconds )
query = fmt . Sprintf ( ` set session lock_wait_timeout:=%d ` , tableLockTimeoutSeconds )
if _ , err := tx . Exec ( query ) ; err != nil {
tableLocked <- err
return err
}
if err := this . CreateAtomicCutOverSentryTable ( ) ; err != nil {
tableLocked <- err
return err
}
query = fmt . Sprintf ( ` lock /* gh-ost */ tables %s.%s write, %s.%s write ` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
)
log . Infof ( "Locking %s.%s, %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
)
this . migrationContext . LockTablesStartTime = time . Now ( )
if _ , err := tx . Exec ( query ) ; err != nil {
tableLocked <- err
return err
}
log . Infof ( "Tables locked" )
tableLocked <- nil // No error.
// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed.
<- okToUnlockTable
log . Infof ( "Will now proceed to drop magic table and unlock tables" )
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will:
log . Infof ( "Dropping magic cut-over table" )
query = fmt . Sprintf ( ` drop /* gh-ost */ table if exists %s.%s ` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
)
if _ , err := tx . Exec ( query ) ; err != nil {
log . Errore ( err )
// We DO NOT return here because we must `UNLOCK TABLES`!
}
// Tables still locked
log . Infof ( "Releasing lock from %s.%s, %s.%s" ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
)
query = ` unlock tables `
if _ , err := tx . Exec ( query ) ; err != nil {
tableUnlocked <- err
return log . Errore ( err )
}
log . Infof ( "Tables unlocked" )
tableUnlocked <- nil
return nil
}
2016-07-16 14:12:19 +00:00
// AtomicCutoverRename
2016-06-27 09:08:06 +00:00
func ( this * Applier ) AtomicCutoverRename ( sessionIdChan chan int64 , tablesRenamed chan <- error ) error {
tx , err := this . db . Begin ( )
if err != nil {
return err
}
defer func ( ) {
tx . Rollback ( )
sessionIdChan <- - 1
tablesRenamed <- fmt . Errorf ( "Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads" )
} ( )
var sessionId int64
if err := tx . QueryRow ( ` select connection_id() ` ) . Scan ( & sessionId ) ; err != nil {
return err
}
sessionIdChan <- sessionId
2016-07-08 08:14:58 +00:00
log . Infof ( "Setting RENAME timeout as %d seconds" , this . migrationContext . CutOverLockTimeoutSeconds )
query := fmt . Sprintf ( ` set session lock_wait_timeout:=%d ` , this . migrationContext . CutOverLockTimeoutSeconds )
2016-06-27 09:08:06 +00:00
if _ , err := tx . Exec ( query ) ; err != nil {
return err
}
query = fmt . Sprintf ( ` rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s ` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetOldTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetGhostTableName ( ) ) ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . OriginalTableName ) ,
)
log . Infof ( "Issuing and expecting this to block: %s" , query )
if _ , err := tx . Exec ( query ) ; err != nil {
tablesRenamed <- err
return log . Errore ( err )
}
tablesRenamed <- nil
log . Infof ( "Tables renamed" )
return nil
}
2016-04-08 12:35:06 +00:00
// ShowStatusVariable runs `show global status like '<variableName>'` and returns
// the integer value of the first matching row. On any query or scan error it
// returns 0 together with that error.
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
	statusQuery := fmt.Sprintf(` show global status like '%s' `, variableName)
	row := this.db.QueryRow(statusQuery)
	// Two columns are scanned: the first goes back into variableName, the second is the result.
	if scanErr := row.Scan(&variableName, &result); scanErr != nil {
		return 0, scanErr
	}
	return result, nil
}
2016-04-11 15:27:16 +00:00
2016-06-19 15:55:37 +00:00
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
2016-05-04 05:23:34 +00:00
func ( this * Applier ) buildDMLEventQuery ( dmlEvent * binlog . BinlogDMLEvent ) ( query string , args [ ] interface { } , rowsDelta int64 , err error ) {
2016-04-11 15:27:16 +00:00
switch dmlEvent . DML {
case binlog . DeleteDML :
{
2016-04-14 11:37:56 +00:00
query , uniqueKeyArgs , err := sql . BuildDMLDeleteQuery ( dmlEvent . DatabaseName , this . migrationContext . GetGhostTableName ( ) , this . migrationContext . OriginalTableColumns , & this . migrationContext . UniqueKey . Columns , dmlEvent . WhereColumnValues . AbstractValues ( ) )
2016-05-04 05:23:34 +00:00
return query , uniqueKeyArgs , - 1 , err
2016-04-14 11:37:56 +00:00
}
case binlog . InsertDML :
{
2016-08-22 14:00:15 +00:00
query , sharedArgs , err := sql . BuildDMLInsertQuery ( dmlEvent . DatabaseName , this . migrationContext . GetGhostTableName ( ) , this . migrationContext . OriginalTableColumns , this . migrationContext . SharedColumns , this . migrationContext . MappedSharedColumns , dmlEvent . NewColumnValues . AbstractValues ( ) )
2016-05-04 05:23:34 +00:00
return query , sharedArgs , 1 , err
2016-04-14 11:37:56 +00:00
}
case binlog . UpdateDML :
{
2016-08-22 06:49:27 +00:00
query , sharedArgs , uniqueKeyArgs , err := sql . BuildDMLUpdateQuery ( dmlEvent . DatabaseName , this . migrationContext . GetGhostTableName ( ) , this . migrationContext . OriginalTableColumns , this . migrationContext . SharedColumns , this . migrationContext . MappedSharedColumns , & this . migrationContext . UniqueKey . Columns , dmlEvent . NewColumnValues . AbstractValues ( ) , dmlEvent . WhereColumnValues . AbstractValues ( ) )
2016-04-14 11:37:56 +00:00
args = append ( args , sharedArgs ... )
args = append ( args , uniqueKeyArgs ... )
2016-05-04 05:23:34 +00:00
return query , args , 0 , err
2016-04-11 15:27:16 +00:00
}
}
2016-05-04 05:23:34 +00:00
return "" , args , 0 , fmt . Errorf ( "Unknown dml event type: %+v" , dmlEvent . DML )
2016-04-14 11:37:56 +00:00
}
2016-06-19 15:55:37 +00:00
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
// original-table binlog event
2016-04-14 11:37:56 +00:00
func ( this * Applier ) ApplyDMLEventQuery ( dmlEvent * binlog . BinlogDMLEvent ) error {
2016-05-04 05:23:34 +00:00
query , args , rowDelta , err := this . buildDMLEventQuery ( dmlEvent )
2016-04-14 11:37:56 +00:00
if err != nil {
return err
}
2016-08-18 11:31:53 +00:00
// TODO The below is in preparation for transactional writes on the ghost tables.
2016-08-17 08:50:41 +00:00
// Such writes would be, for example:
// - prepended with sql_mode setup
2016-08-18 11:31:53 +00:00
// - prepended with time zone setup
2016-08-17 08:50:41 +00:00
// - prepended with SET SQL_LOG_BIN=0
// - prepended with SET FK_CHECKS=0
// etc.
//
2016-08-18 11:31:53 +00:00
// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
// is solved by silently converting unsigned bigints to string values.
2016-08-17 08:50:41 +00:00
//
2016-08-18 11:31:53 +00:00
err = func ( ) error {
tx , err := this . db . Begin ( )
if err != nil {
return err
}
2016-10-13 11:08:02 +00:00
sessionQuery := ` SET
SESSION time_zone = ' + 00 : 00 ' ,
2016-08-23 09:58:52 +00:00
sql_mode = CONCAT ( @ @ session . sql_mode , ' , STRICT_ALL_TABLES ' )
2016-10-13 11:08:02 +00:00
`
2016-10-10 09:39:57 +00:00
if _ , err := tx . Exec ( sessionQuery ) ; err != nil {
2016-08-19 07:06:00 +00:00
return err
}
2016-08-18 11:31:53 +00:00
if _ , err := tx . Exec ( query , args ... ) ; err != nil {
return err
}
if err := tx . Commit ( ) ; err != nil {
return err
}
return nil
} ( )
2016-08-18 11:38:23 +00:00
if err != nil {
err = fmt . Errorf ( "%s; query=%s; args=%+v" , err . Error ( ) , query , args )
return log . Errore ( err )
2016-04-19 11:25:32 +00:00
}
2016-08-18 11:38:23 +00:00
// no error
atomic . AddInt64 ( & this . migrationContext . TotalDMLEventsApplied , 1 )
2016-05-04 05:23:34 +00:00
if this . migrationContext . CountTableRows {
2016-08-24 10:16:34 +00:00
atomic . AddInt64 ( & this . migrationContext . RowsDeltaEstimate , rowDelta )
2016-05-04 05:23:34 +00:00
}
2016-08-18 11:38:23 +00:00
return nil
2016-04-11 15:27:16 +00:00
}
2017-01-03 11:44:52 +00:00
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func ( this * Applier ) ApplyDMLEventQueries ( dmlEvents [ ] ( * binlog . BinlogDMLEvent ) ) error {
var totalDelta int64
err := func ( ) error {
tx , err := this . db . Begin ( )
if err != nil {
return err
}
2017-01-04 06:44:04 +00:00
rollback := func ( err error ) error {
tx . Rollback ( )
return err
}
2017-01-03 11:44:52 +00:00
sessionQuery := ` SET
SESSION time_zone = ' + 00 : 00 ' ,
sql_mode = CONCAT ( @ @ session . sql_mode , ' , STRICT_ALL_TABLES ' )
`
if _ , err := tx . Exec ( sessionQuery ) ; err != nil {
2017-01-04 06:44:04 +00:00
return rollback ( err )
2017-01-03 11:44:52 +00:00
}
for _ , dmlEvent := range dmlEvents {
query , args , rowDelta , err := this . buildDMLEventQuery ( dmlEvent )
if err != nil {
2017-01-04 06:44:04 +00:00
return rollback ( err )
2017-01-03 11:44:52 +00:00
}
if _ , err := tx . Exec ( query , args ... ) ; err != nil {
err = fmt . Errorf ( "%s; query=%s; args=%+v" , err . Error ( ) , query , args )
2017-01-04 06:44:04 +00:00
return rollback ( err )
2017-01-03 11:44:52 +00:00
}
totalDelta += rowDelta
}
if err := tx . Commit ( ) ; err != nil {
return err
}
return nil
} ( )
if err != nil {
return log . Errore ( err )
}
// no error
atomic . AddInt64 ( & this . migrationContext . TotalDMLEventsApplied , int64 ( len ( dmlEvents ) ) )
if this . migrationContext . CountTableRows {
atomic . AddInt64 ( & this . migrationContext . RowsDeltaEstimate , totalDelta )
}
2017-01-03 12:31:19 +00:00
log . Debugf ( "ApplyDMLEventQueries() applied %d events in one transaction" , len ( dmlEvents ) )
2017-01-03 11:44:52 +00:00
return nil
}