2016-04-04 10:27:51 +00:00
/*
   Copyright 2016 GitHub Inc.
   See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
gosql "database/sql"
"fmt"
"strings"
2016-07-29 08:40:23 +00:00
"sync/atomic"
2016-04-04 10:27:51 +00:00
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
2016-04-04 10:27:51 +00:00
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
type Inspector struct {
// connectionConfig describes the server being inspected; NewInspector takes
// it from the migration context's InspectorConnectionConfig.
connectionConfig * mysql . ConnectionConfig
// db is the connection handle to the inspected server; it is assigned by
// InitDBConnections and is nil until then.
db * gosql . DB
// migrationContext is the migration context obtained from
// base.GetMigrationContext(); inspection results are written into it.
migrationContext * base . MigrationContext
}
2016-04-04 13:29:02 +00:00
func NewInspector ( ) * Inspector {
2016-04-04 10:27:51 +00:00
return & Inspector {
2016-04-04 13:29:02 +00:00
connectionConfig : base . GetMigrationContext ( ) . InspectorConnectionConfig ,
2016-04-04 10:27:51 +00:00
migrationContext : base . GetMigrationContext ( ) ,
}
}
func ( this * Inspector ) InitDBConnections ( ) ( err error ) {
2016-04-04 13:29:02 +00:00
inspectorUri := this . connectionConfig . GetDBUri ( this . migrationContext . DatabaseName )
2016-04-04 10:27:51 +00:00
if this . db , _ , err = sqlutils . GetDB ( inspectorUri ) ; err != nil {
return err
}
if err := this . validateConnection ( ) ; err != nil {
return err
}
2016-06-19 15:55:37 +00:00
if impliedKey , err := mysql . GetInstanceKey ( this . db ) ; err != nil {
return err
} else {
this . connectionConfig . ImpliedKey = impliedKey
}
2016-04-04 10:27:51 +00:00
if err := this . validateGrants ( ) ; err != nil {
return err
}
2016-04-18 17:57:18 +00:00
if err := this . validateBinlogs ( ) ; err != nil {
2016-04-11 15:27:16 +00:00
return err
}
2016-04-18 17:57:18 +00:00
if err := this . applyBinlogFormat ( ) ; err != nil {
2016-04-04 10:27:51 +00:00
return err
}
2016-04-07 13:57:12 +00:00
return nil
}
func ( this * Inspector ) ValidateOriginalTable ( ) ( err error ) {
2016-04-04 10:27:51 +00:00
if err := this . validateTable ( ) ; err != nil {
return err
}
2016-04-04 16:19:46 +00:00
if err := this . validateTableForeignKeys ( ) ; err != nil {
return err
}
2016-08-11 12:10:35 +00:00
if err := this . validateTableTriggers ( ) ; err != nil {
return err
}
2016-06-06 10:33:05 +00:00
if err := this . estimateTableRowsViaExplain ( ) ; err != nil {
return err
2016-04-04 10:27:51 +00:00
}
return nil
}
2016-04-11 15:27:16 +00:00
func ( this * Inspector ) InspectTableColumnsAndUniqueKeys ( tableName string ) ( columns * sql . ColumnList , uniqueKeys [ ] ( * sql . UniqueKey ) , err error ) {
2016-04-08 12:35:06 +00:00
uniqueKeys , err = this . getCandidateUniqueKeys ( tableName )
2016-04-04 10:27:51 +00:00
if err != nil {
2016-04-08 12:35:06 +00:00
return columns , uniqueKeys , err
2016-04-04 10:27:51 +00:00
}
if len ( uniqueKeys ) == 0 {
2016-04-08 12:35:06 +00:00
return columns , uniqueKeys , fmt . Errorf ( "No PRIMARY nor UNIQUE key found in table! Bailing out" )
}
columns , err = this . getTableColumns ( this . migrationContext . DatabaseName , tableName )
if err != nil {
return columns , uniqueKeys , err
}
return columns , uniqueKeys , nil
}
// InspectOriginalTable reads the columns and candidate unique keys of the
// original table and stores them on the migration context
// (OriginalTableColumns, OriginalTableUniqueKeys).
// Any inspection error is returned to the caller; the previous code tested
// `err == nil` and therefore always returned nil, hiding failures such as a
// table without any unique key.
func (this *Inspector) InspectOriginalTable() (err error) {
	this.migrationContext.OriginalTableColumns, this.migrationContext.OriginalTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.OriginalTableName)
	if err != nil {
		return err
	}
	return nil
}
2016-06-06 10:33:05 +00:00
// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
2016-04-08 12:35:06 +00:00
func ( this * Inspector ) InspectOriginalAndGhostTables ( ) ( err error ) {
this . migrationContext . GhostTableColumns , this . migrationContext . GhostTableUniqueKeys , err = this . InspectTableColumnsAndUniqueKeys ( this . migrationContext . GetGhostTableName ( ) )
if err != nil {
return err
}
sharedUniqueKeys , err := this . getSharedUniqueKeys ( this . migrationContext . OriginalTableUniqueKeys , this . migrationContext . GhostTableUniqueKeys )
if err != nil {
return err
}
if len ( sharedUniqueKeys ) == 0 {
return fmt . Errorf ( "No shared unique key can be found after ALTER! Bailing out" )
2016-04-04 10:27:51 +00:00
}
2016-04-08 12:35:06 +00:00
this . migrationContext . UniqueKey = sharedUniqueKeys [ 0 ]
log . Infof ( "Chosen shared unique key is %s" , this . migrationContext . UniqueKey . Name )
2016-05-20 10:52:14 +00:00
if this . migrationContext . UniqueKey . HasNullable {
if this . migrationContext . NullableUniqueKeyAllowed {
log . Warningf ( "Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data" , this . migrationContext . UniqueKey )
} else {
return fmt . Errorf ( "Chosen key (%s) has nullable columns. Bailing out. To force this operation to continue, supply --allow-nullable-unique-key flag. Only do so if you are certain there are no actual NULL values in this key. As long as there aren't, migration should be fine. NULL values in columns of this key will corrupt migration's data" , this . migrationContext . UniqueKey )
}
}
2016-04-11 15:27:16 +00:00
if ! this . migrationContext . UniqueKey . IsPrimary ( ) {
2016-05-20 10:52:14 +00:00
if this . migrationContext . OriginalBinlogRowImage != "FULL" {
return fmt . Errorf ( "binlog_row_image is '%s' and chosen key is %s, which is not the primary key. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again" , this . migrationContext . OriginalBinlogRowImage , this . migrationContext . UniqueKey )
2016-04-11 15:27:16 +00:00
}
}
2016-04-08 12:35:06 +00:00
2016-06-17 06:03:18 +00:00
this . migrationContext . SharedColumns , this . migrationContext . MappedSharedColumns = this . getSharedColumns ( this . migrationContext . OriginalTableColumns , this . migrationContext . GhostTableColumns , this . migrationContext . ColumnRenameMap )
2016-04-08 12:35:06 +00:00
log . Infof ( "Shared columns are %s" , this . migrationContext . SharedColumns )
// By fact that a non-empty unique key exists we also know the shared columns are non-empty
2016-08-17 04:51:58 +00:00
// This additional step looks at which columns are unsigned. We could have merged this within
// the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel
// comfortable in doing this as a separate step.
2016-09-07 12:24:11 +00:00
this . applyColumnTypes ( this . migrationContext . DatabaseName , this . migrationContext . OriginalTableName , this . migrationContext . OriginalTableColumns , this . migrationContext . SharedColumns )
this . applyColumnTypes ( this . migrationContext . DatabaseName , this . migrationContext . GetGhostTableName ( ) , this . migrationContext . GhostTableColumns , this . migrationContext . MappedSharedColumns )
2016-08-17 04:51:58 +00:00
2016-04-08 12:35:06 +00:00
return nil
2016-04-04 10:27:51 +00:00
}
// validateConnection issues a simple can-connect to MySQL
func ( this * Inspector ) validateConnection ( ) error {
2016-04-04 13:29:02 +00:00
query := ` select @@global.port `
2016-04-04 10:27:51 +00:00
var port int
if err := this . db . QueryRow ( query ) . Scan ( & port ) ; err != nil {
return err
}
2016-04-04 13:29:02 +00:00
if port != this . connectionConfig . Key . Port {
2016-04-04 10:27:51 +00:00
return fmt . Errorf ( "Unexpected database port reported: %+v" , port )
}
2016-04-04 13:29:02 +00:00
log . Infof ( "connection validated on %+v" , this . connectionConfig . Key )
2016-04-04 10:27:51 +00:00
return nil
}
// validateGrants verifies the user by which we're executing has necessary grants
// to do its thang.
func ( this * Inspector ) validateGrants ( ) error {
2016-05-16 09:09:17 +00:00
query := ` show /* gh-ost */ grants for current_user() `
2016-04-04 10:27:51 +00:00
foundAll := false
foundSuper := false
2016-08-12 12:26:58 +00:00
foundReplicationClient := false
2016-04-04 10:27:51 +00:00
foundReplicationSlave := false
foundDBAll := false
err := sqlutils . QueryRowsMap ( this . db , query , func ( rowMap sqlutils . RowMap ) error {
for _ , grantData := range rowMap {
grant := grantData . String
if strings . Contains ( grant , ` GRANT ALL PRIVILEGES ON *.* ` ) {
foundAll = true
}
if strings . Contains ( grant , ` SUPER ` ) && strings . Contains ( grant , ` ON *.* ` ) {
foundSuper = true
}
2016-08-12 12:26:58 +00:00
if strings . Contains ( grant , ` REPLICATION CLIENT ` ) && strings . Contains ( grant , ` ON *.* ` ) {
foundReplicationClient = true
}
2016-04-04 10:27:51 +00:00
if strings . Contains ( grant , ` REPLICATION SLAVE ` ) && strings . Contains ( grant , ` ON *.* ` ) {
foundReplicationSlave = true
}
if strings . Contains ( grant , fmt . Sprintf ( "GRANT ALL PRIVILEGES ON `%s`.*" , this . migrationContext . DatabaseName ) ) {
foundDBAll = true
}
2016-06-17 06:03:18 +00:00
if base . StringContainsAll ( grant , ` ALTER ` , ` CREATE ` , ` DELETE ` , ` DROP ` , ` INDEX ` , ` INSERT ` , ` LOCK TABLES ` , ` SELECT ` , ` TRIGGER ` , ` UPDATE ` , ` ON *.* ` ) {
2016-06-16 14:06:26 +00:00
foundDBAll = true
}
2016-06-17 06:03:18 +00:00
if base . StringContainsAll ( grant , ` ALTER ` , ` CREATE ` , ` DELETE ` , ` DROP ` , ` INDEX ` , ` INSERT ` , ` LOCK TABLES ` , ` SELECT ` , ` TRIGGER ` , ` UPDATE ` , fmt . Sprintf ( " ON `%s`.*" , this . migrationContext . DatabaseName ) ) {
2016-06-16 14:06:26 +00:00
foundDBAll = true
}
2016-04-04 10:27:51 +00:00
}
return nil
} )
if err != nil {
2016-04-04 13:29:02 +00:00
return err
2016-04-04 10:27:51 +00:00
}
2016-08-12 12:26:58 +00:00
this . migrationContext . HasSuperPrivilege = foundSuper
2016-04-04 10:27:51 +00:00
if foundAll {
log . Infof ( "User has ALL privileges" )
return nil
}
if foundSuper && foundReplicationSlave && foundDBAll {
2016-08-05 09:41:36 +00:00
log . Infof ( "User has SUPER, REPLICATION SLAVE privileges, and has ALL privileges on %s.*" , sql . EscapeName ( this . migrationContext . DatabaseName ) )
2016-04-04 10:27:51 +00:00
return nil
}
2016-08-12 12:26:58 +00:00
if foundReplicationClient && foundReplicationSlave && foundDBAll {
log . Infof ( "User has REPLICATION CLIENT, REPLICATION SLAVE privileges, and has ALL privileges on %s.*" , sql . EscapeName ( this . migrationContext . DatabaseName ) )
return nil
}
log . Debugf ( "Privileges: Super: %t, REPLICATION CLIENT: %t, REPLICATION SLAVE: %t, ALL on *.*: %t, ALL on %s.*: %t" , foundSuper , foundReplicationClient , foundReplicationSlave , foundAll , sql . EscapeName ( this . migrationContext . DatabaseName ) , foundDBAll )
2016-08-29 10:39:31 +00:00
return log . Errorf ( "User has insufficient privileges for migration. Needed: SUPER|REPLICATION CLIENT, REPLICATION SLAVE and ALL on %s.*" , sql . EscapeName ( this . migrationContext . DatabaseName ) )
2016-04-04 10:27:51 +00:00
}
2016-04-11 15:27:16 +00:00
// restartReplication is required so that we are _certain_ the binlog format and
// row image settings have actually been applied to the replication thread.
// It is entriely possible, for example, that the replication is using 'STATEMENT'
// binlog format even as the variable says 'ROW'
func ( this * Inspector ) restartReplication ( ) error {
log . Infof ( "Restarting replication on %s:%d to make sure binlog settings apply to replication thread" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port )
2016-05-01 18:36:36 +00:00
masterKey , _ := mysql . GetMasterKeyFromSlaveStatus ( this . connectionConfig )
2016-04-14 11:37:56 +00:00
if masterKey == nil {
// This is not a replica
return nil
}
2016-04-11 15:27:16 +00:00
var stopError , startError error
_ , stopError = sqlutils . ExecNoPrepare ( this . db , ` stop slave ` )
_ , startError = sqlutils . ExecNoPrepare ( this . db , ` start slave ` )
if stopError != nil {
return stopError
}
if startError != nil {
return startError
}
log . Debugf ( "Replication restarted" )
return nil
}
2016-04-18 17:57:18 +00:00
// applyBinlogFormat sets ROW binlog format and restarts replication to make
// the replication thread apply it.
func ( this * Inspector ) applyBinlogFormat ( ) error {
if this . migrationContext . RequiresBinlogFormatChange ( ) {
2016-08-12 12:26:58 +00:00
if ! this . migrationContext . SwitchToRowBinlogFormat {
return fmt . Errorf ( "Existing binlog_format is %s. Am not switching it to ROW unless you specify --switch-to-rbr" , this . migrationContext . OriginalBinlogFormat )
}
2016-04-18 17:57:18 +00:00
if _ , err := sqlutils . ExecNoPrepare ( this . db , ` set global binlog_format='ROW' ` ) ; err != nil {
return err
}
if _ , err := sqlutils . ExecNoPrepare ( this . db , ` set session binlog_format='ROW' ` ) ; err != nil {
return err
}
2016-08-12 12:26:58 +00:00
if err := this . restartReplication ( ) ; err != nil {
return err
}
2016-04-18 17:57:18 +00:00
log . Debugf ( "'ROW' binlog format applied" )
2016-08-12 12:26:58 +00:00
return nil
2016-04-18 17:57:18 +00:00
}
2016-08-12 12:26:58 +00:00
// We already have RBR, no explicit switch
2016-08-15 09:05:51 +00:00
if ! this . migrationContext . AssumeRBR {
2016-08-12 12:26:58 +00:00
if err := this . restartReplication ( ) ; err != nil {
return err
}
2016-04-18 17:57:18 +00:00
}
return nil
}
2016-04-06 11:05:58 +00:00
// validateBinlogs checks that binary log configuration is good to go
2016-04-04 10:27:51 +00:00
func ( this * Inspector ) validateBinlogs ( ) error {
2016-08-11 12:49:14 +00:00
query := ` select @@global.log_bin, @@global.binlog_format `
var hasBinaryLogs bool
if err := this . db . QueryRow ( query ) . Scan ( & hasBinaryLogs , & this . migrationContext . OriginalBinlogFormat ) ; err != nil {
2016-04-04 10:27:51 +00:00
return err
}
if ! hasBinaryLogs {
2016-04-04 13:29:02 +00:00
return fmt . Errorf ( "%s:%d must have binary logs enabled" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port )
2016-04-04 10:27:51 +00:00
}
if this . migrationContext . RequiresBinlogFormatChange ( ) {
2016-04-18 17:57:18 +00:00
if ! this . migrationContext . SwitchToRowBinlogFormat {
return fmt . Errorf ( "You must be using ROW binlog format. I can switch it for you, provided --switch-to-rbr and that %s:%d doesn't have replicas" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port )
}
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` show /* gh-ost */ slave hosts ` )
2016-04-04 10:27:51 +00:00
countReplicas := 0
err := sqlutils . QueryRowsMap ( this . db , query , func ( rowMap sqlutils . RowMap ) error {
countReplicas ++
return nil
} )
if err != nil {
2016-04-04 13:29:02 +00:00
return err
2016-04-04 10:27:51 +00:00
}
if countReplicas > 0 {
2016-04-04 13:29:02 +00:00
return fmt . Errorf ( "%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port , this . migrationContext . OriginalBinlogFormat )
2016-04-04 10:27:51 +00:00
}
2016-04-18 17:57:18 +00:00
log . Infof ( "%s:%d has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure." , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port , this . migrationContext . OriginalBinlogFormat )
2016-04-04 10:27:51 +00:00
}
query = ` select @@global.binlog_row_image `
if err := this . db . QueryRow ( query ) . Scan ( & this . migrationContext . OriginalBinlogRowImage ) ; err != nil {
// Only as of 5.6. We wish to support 5.5 as well
2016-08-25 14:49:25 +00:00
this . migrationContext . OriginalBinlogRowImage = "FULL"
2016-04-04 10:27:51 +00:00
}
2016-05-20 10:52:14 +00:00
this . migrationContext . OriginalBinlogRowImage = strings . ToUpper ( this . migrationContext . OriginalBinlogRowImage )
2016-04-04 10:27:51 +00:00
2016-04-04 13:29:02 +00:00
log . Infof ( "binary logs validated on %s:%d" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port )
2016-04-04 10:27:51 +00:00
return nil
}
2016-08-11 15:37:50 +00:00
// validateLogSlaveUpdates checks that binary log log_slave_updates is set. This test is not required when migrating on replica or when migrating directly on master
func ( this * Inspector ) validateLogSlaveUpdates ( ) error {
2016-08-11 12:49:14 +00:00
query := ` select @@global.log_slave_updates `
var logSlaveUpdates bool
if err := this . db . QueryRow ( query ) . Scan ( & logSlaveUpdates ) ; err != nil {
return err
}
if ! logSlaveUpdates && ! this . migrationContext . InspectorIsAlsoApplier ( ) {
return fmt . Errorf ( "%s:%d must have log_slave_updates enabled" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port )
}
log . Infof ( "binary logs updates validated on %s:%d" , this . connectionConfig . Key . Hostname , this . connectionConfig . Key . Port )
return nil
2016-04-04 10:27:51 +00:00
}
// validateTable makes sure the table we need to operate on actually exists
func ( this * Inspector ) validateTable ( ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` show /* gh-ost */ table status from %s like '%s' ` , sql . EscapeName ( this . migrationContext . DatabaseName ) , this . migrationContext . OriginalTableName )
2016-04-04 10:27:51 +00:00
tableFound := false
err := sqlutils . QueryRowsMap ( this . db , query , func ( rowMap sqlutils . RowMap ) error {
this . migrationContext . TableEngine = rowMap . GetString ( "Engine" )
this . migrationContext . RowsEstimate = rowMap . GetInt64 ( "Rows" )
this . migrationContext . UsedRowsEstimateMethod = base . TableStatusRowsEstimate
if rowMap . GetString ( "Comment" ) == "VIEW" {
return fmt . Errorf ( "%s.%s is a VIEW, not a real table. Bailing out" , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
}
tableFound = true
return nil
} )
if err != nil {
2016-04-04 13:29:02 +00:00
return err
2016-04-04 10:27:51 +00:00
}
if ! tableFound {
return log . Errorf ( "Cannot find table %s.%s!" , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
}
log . Infof ( "Table found. Engine=%s" , this . migrationContext . TableEngine )
log . Debugf ( "Estimated number of rows via STATUS: %d" , this . migrationContext . RowsEstimate )
return nil
}
2016-06-19 15:55:37 +00:00
// validateTableForeignKeys makes sure no foreign keys exist on the migrated table
2016-04-04 16:19:46 +00:00
func ( this * Inspector ) validateTableForeignKeys ( ) error {
query := `
2016-08-03 08:19:27 +00:00
SELECT TABLE_SCHEMA , TABLE_NAME
2016-04-04 16:19:46 +00:00
FROM INFORMATION_SCHEMA . KEY_COLUMN_USAGE
WHERE
REFERENCED_TABLE_NAME IS NOT NULL
AND ( ( TABLE_SCHEMA = ? AND TABLE_NAME = ? )
OR ( REFERENCED_TABLE_SCHEMA = ? AND REFERENCED_TABLE_NAME = ? )
)
`
numForeignKeys := 0
err := sqlutils . QueryRowsMap ( this . db , query , func ( rowMap sqlutils . RowMap ) error {
2016-08-03 08:19:27 +00:00
fkSchema := rowMap . GetString ( "TABLE_SCHEMA" )
fkTable := rowMap . GetString ( "TABLE_NAME" )
log . Infof ( "Found foreign key on %s.%s related to %s.%s" , sql . EscapeName ( fkSchema ) , sql . EscapeName ( fkTable ) , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
numForeignKeys ++
2016-04-04 16:19:46 +00:00
return nil
} ,
this . migrationContext . DatabaseName ,
this . migrationContext . OriginalTableName ,
this . migrationContext . DatabaseName ,
this . migrationContext . OriginalTableName ,
)
if err != nil {
return err
}
if numForeignKeys > 0 {
2016-08-03 08:19:27 +00:00
return log . Errorf ( "Found %d foreign keys related to %s.%s. Foreign keys are not supported. Bailing out" , numForeignKeys , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
2016-04-04 16:19:46 +00:00
}
log . Debugf ( "Validated no foreign keys exist on table" )
return nil
}
2016-08-11 12:10:35 +00:00
// validateTableTriggers makes sure no triggers exist on the migrated table.
// It counts rows in INFORMATION_SCHEMA.TRIGGERS for the migrated schema and
// table and fails when the count is positive.
//
// The aggregate is written as COUNT(*) with no whitespace before the
// parenthesis: with MySQL's default SQL mode (IGNORE_SPACE off) a space
// between a built-in function name and "(" is a parse error.
func (this *Inspector) validateTableTriggers() error {
	query := `
		SELECT COUNT(*) AS num_triggers
			FROM INFORMATION_SCHEMA.TRIGGERS
			WHERE
				TRIGGER_SCHEMA = ?
				AND EVENT_OBJECT_TABLE = ?
	`
	numTriggers := 0
	err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
		numTriggers = rowMap.GetInt("num_triggers")
		return nil
	},
		this.migrationContext.DatabaseName,
		this.migrationContext.OriginalTableName,
	)
	if err != nil {
		return err
	}
	if numTriggers > 0 {
		return log.Errorf("Found triggers on %s.%s. Triggers are not supported at this time. Bailing out", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
	}
	log.Debugf("Validated no triggers exist on table")
	return nil
}
2016-06-19 15:55:37 +00:00
// estimateTableRowsViaExplain estimates number of rows on original table
2016-04-04 10:27:51 +00:00
func ( this * Inspector ) estimateTableRowsViaExplain ( ) error {
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` explain select /* gh-ost */ * from %s.%s where 1=1 ` , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
2016-04-04 10:27:51 +00:00
outputFound := false
err := sqlutils . QueryRowsMap ( this . db , query , func ( rowMap sqlutils . RowMap ) error {
this . migrationContext . RowsEstimate = rowMap . GetInt64 ( "rows" )
this . migrationContext . UsedRowsEstimateMethod = base . ExplainRowsEstimate
outputFound = true
return nil
} )
if err != nil {
2016-04-04 13:29:02 +00:00
return err
2016-04-04 10:27:51 +00:00
}
if ! outputFound {
return log . Errorf ( "Cannot run EXPLAIN on %s.%s!" , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
}
log . Infof ( "Estimated number of rows via EXPLAIN: %d" , this . migrationContext . RowsEstimate )
return nil
}
2016-06-19 15:55:37 +00:00
// CountTableRows counts exact number of rows on the original table
2016-06-06 10:33:05 +00:00
func ( this * Inspector ) CountTableRows ( ) error {
2016-07-29 08:40:23 +00:00
atomic . StoreInt64 ( & this . migrationContext . CountingRowsFlag , 1 )
defer atomic . StoreInt64 ( & this . migrationContext . CountingRowsFlag , 0 )
2016-04-04 10:27:51 +00:00
log . Infof ( "As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while" )
2016-07-29 08:40:23 +00:00
2016-05-16 09:09:17 +00:00
query := fmt . Sprintf ( ` select /* gh-ost */ count(*) as rows from %s.%s ` , sql . EscapeName ( this . migrationContext . DatabaseName ) , sql . EscapeName ( this . migrationContext . OriginalTableName ) )
2016-08-24 09:39:44 +00:00
var rowsEstimate int64
if err := this . db . QueryRow ( query ) . Scan ( & rowsEstimate ) ; err != nil {
2016-04-04 10:27:51 +00:00
return err
}
2016-08-24 09:39:44 +00:00
atomic . StoreInt64 ( & this . migrationContext . RowsEstimate , rowsEstimate )
2016-04-04 10:27:51 +00:00
this . migrationContext . UsedRowsEstimateMethod = base . CountRowsEstimate
2016-07-29 08:40:23 +00:00
2016-08-24 09:39:44 +00:00
log . Infof ( "Exact number of rows via COUNT: %d" , rowsEstimate )
2016-07-29 08:40:23 +00:00
2016-04-04 10:27:51 +00:00
return nil
}
2016-06-19 15:55:37 +00:00
// getTableColumns reads column list from given table
2016-04-11 15:27:16 +00:00
func ( this * Inspector ) getTableColumns ( databaseName , tableName string ) ( * sql . ColumnList , error ) {
2016-04-06 11:05:58 +00:00
query := fmt . Sprintf ( `
show columns from % s . % s
` ,
sql . EscapeName ( databaseName ) ,
sql . EscapeName ( tableName ) ,
)
2016-04-11 15:27:16 +00:00
columnNames := [ ] string { }
err := sqlutils . QueryRowsMap ( this . db , query , func ( rowMap sqlutils . RowMap ) error {
columnNames = append ( columnNames , rowMap . GetString ( "Field" ) )
2016-04-06 11:05:58 +00:00
return nil
} )
if err != nil {
2016-04-11 15:27:16 +00:00
return nil , err
2016-04-06 11:05:58 +00:00
}
2016-04-11 15:27:16 +00:00
if len ( columnNames ) == 0 {
return nil , log . Errorf ( "Found 0 columns on %s.%s. Bailing out" ,
2016-04-06 11:05:58 +00:00
sql . EscapeName ( databaseName ) ,
sql . EscapeName ( tableName ) ,
)
}
2016-04-11 15:27:16 +00:00
return sql . NewColumnList ( columnNames ) , nil
2016-04-06 11:05:58 +00:00
}
2016-09-07 12:24:11 +00:00
// applyColumnTypes
func ( this * Inspector ) applyColumnTypes ( databaseName , tableName string , columnsLists ... * sql . ColumnList ) error {
query := `
select
*
from
information_schema . columns
where
table_schema = ?
and table_name = ?
`
err := sqlutils . QueryRowsMap ( this . db , query , func ( m sqlutils . RowMap ) error {
columnName := m . GetString ( "COLUMN_NAME" )
if strings . Contains ( m . GetString ( "COLUMN_TYPE" ) , "unsigned" ) {
2016-08-17 04:51:58 +00:00
for _ , columnsList := range columnsLists {
columnsList . SetUnsigned ( columnName )
}
}
2016-09-07 12:24:11 +00:00
if charset := m . GetString ( "CHARACTER_SET_NAME" ) ; charset != "" {
for _ , columnsList := range columnsLists {
columnsList . SetCharset ( columnName , charset )
}
}
2016-08-17 04:51:58 +00:00
return nil
2016-09-07 12:24:11 +00:00
} , databaseName , tableName )
2016-08-17 04:51:58 +00:00
return err
}
2016-04-04 10:27:51 +00:00
// getCandidateUniqueKeys investigates a table and returns the list of unique keys
// candidate for chunking
func ( this * Inspector ) getCandidateUniqueKeys ( tableName string ) ( uniqueKeys [ ] ( * sql . UniqueKey ) , err error ) {
query := `
SELECT
COLUMNS . TABLE_SCHEMA ,
COLUMNS . TABLE_NAME ,
COLUMNS . COLUMN_NAME ,
UNIQUES . INDEX_NAME ,
UNIQUES . COLUMN_NAMES ,
UNIQUES . COUNT_COLUMN_IN_INDEX ,
COLUMNS . DATA_TYPE ,
COLUMNS . CHARACTER_SET_NAME ,
2016-04-18 17:57:18 +00:00
LOCATE ( ' auto_increment ' , EXTRA ) > 0 as is_auto_increment ,
2016-04-04 10:27:51 +00:00
has_nullable
FROM INFORMATION_SCHEMA . COLUMNS INNER JOIN (
SELECT
TABLE_SCHEMA ,
TABLE_NAME ,
INDEX_NAME ,
COUNT ( * ) AS COUNT_COLUMN_IN_INDEX ,
GROUP_CONCAT ( COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC ) AS COLUMN_NAMES ,
SUBSTRING_INDEX ( GROUP_CONCAT ( COLUMN_NAME ORDER BY SEQ_IN_INDEX ASC ) , ',' , 1 ) AS FIRST_COLUMN_NAME ,
SUM ( NULLABLE = ' YES ' ) > 0 AS has_nullable
FROM INFORMATION_SCHEMA . STATISTICS
2016-05-04 06:50:00 +00:00
WHERE
NON_UNIQUE = 0
AND TABLE_SCHEMA = ?
AND TABLE_NAME = ?
2016-04-04 10:27:51 +00:00
GROUP BY TABLE_SCHEMA , TABLE_NAME , INDEX_NAME
) AS UNIQUES
ON (
COLUMNS . TABLE_SCHEMA = UNIQUES . TABLE_SCHEMA AND
COLUMNS . TABLE_NAME = UNIQUES . TABLE_NAME AND
COLUMNS . COLUMN_NAME = UNIQUES . FIRST_COLUMN_NAME
)
WHERE
COLUMNS . TABLE_SCHEMA = ?
AND COLUMNS . TABLE_NAME = ?
ORDER BY
COLUMNS . TABLE_SCHEMA , COLUMNS . TABLE_NAME ,
CASE UNIQUES . INDEX_NAME
WHEN ' PRIMARY ' THEN 0
ELSE 1
END ,
CASE has_nullable
WHEN 0 THEN 0
ELSE 1
END ,
CASE IFNULL ( CHARACTER_SET_NAME , ' ' )
WHEN ' ' THEN 0
ELSE 1
END ,
CASE DATA_TYPE
WHEN ' tinyint ' THEN 0
WHEN ' smallint ' THEN 1
WHEN ' int ' THEN 2
WHEN ' bigint ' THEN 3
ELSE 100
END ,
COUNT_COLUMN_IN_INDEX
`
2016-04-18 17:57:18 +00:00
err = sqlutils . QueryRowsMap ( this . db , query , func ( m sqlutils . RowMap ) error {
2016-04-04 10:27:51 +00:00
uniqueKey := & sql . UniqueKey {
2016-04-18 17:57:18 +00:00
Name : m . GetString ( "INDEX_NAME" ) ,
Columns : * sql . ParseColumnList ( m . GetString ( "COLUMN_NAMES" ) ) ,
HasNullable : m . GetBool ( "has_nullable" ) ,
IsAutoIncrement : m . GetBool ( "is_auto_increment" ) ,
2016-04-04 10:27:51 +00:00
}
uniqueKeys = append ( uniqueKeys , uniqueKey )
return nil
2016-05-04 06:50:00 +00:00
} , this . migrationContext . DatabaseName , tableName , this . migrationContext . DatabaseName , tableName )
2016-04-04 10:27:51 +00:00
if err != nil {
return uniqueKeys , err
}
2016-04-18 17:57:18 +00:00
log . Debugf ( "Potential unique keys in %+v: %+v" , tableName , uniqueKeys )
2016-04-04 10:27:51 +00:00
return uniqueKeys , nil
}
2016-04-08 12:35:06 +00:00
// getSharedUniqueKeys returns the intersection of two given unique keys,
// testing by list of columns
func ( this * Inspector ) getSharedUniqueKeys ( originalUniqueKeys , ghostUniqueKeys [ ] ( * sql . UniqueKey ) ) ( uniqueKeys [ ] ( * sql . UniqueKey ) , err error ) {
2016-04-04 10:27:51 +00:00
// We actually do NOT rely on key name, just on the set of columns. This is because maybe
// the ALTER is on the name itself...
for _ , originalUniqueKey := range originalUniqueKeys {
for _ , ghostUniqueKey := range ghostUniqueKeys {
2016-09-07 12:24:11 +00:00
if originalUniqueKey . Columns . EqualsByNames ( & ghostUniqueKey . Columns ) {
2016-04-04 10:27:51 +00:00
uniqueKeys = append ( uniqueKeys , originalUniqueKey )
}
}
}
return uniqueKeys , nil
}
2016-04-04 13:29:02 +00:00
2016-04-08 12:35:06 +00:00
// getSharedColumns returns the intersection of two lists of columns in same order as the first list
2016-06-17 06:03:18 +00:00
func ( this * Inspector ) getSharedColumns ( originalColumns , ghostColumns * sql . ColumnList , columnRenameMap map [ string ] string ) ( * sql . ColumnList , * sql . ColumnList ) {
2016-04-08 12:35:06 +00:00
columnsInGhost := make ( map [ string ] bool )
2016-09-07 12:24:11 +00:00
for _ , ghostColumn := range ghostColumns . Names ( ) {
2016-04-08 12:35:06 +00:00
columnsInGhost [ ghostColumn ] = true
}
2016-04-11 15:27:16 +00:00
sharedColumnNames := [ ] string { }
2016-09-07 12:24:11 +00:00
for _ , originalColumn := range originalColumns . Names ( ) {
2016-06-17 06:03:18 +00:00
if columnsInGhost [ originalColumn ] || columnsInGhost [ columnRenameMap [ originalColumn ] ] {
2016-04-11 15:27:16 +00:00
sharedColumnNames = append ( sharedColumnNames , originalColumn )
2016-04-08 12:35:06 +00:00
}
}
2016-06-17 06:03:18 +00:00
mappedSharedColumnNames := [ ] string { }
for _ , columnName := range sharedColumnNames {
if mapped , ok := columnRenameMap [ columnName ] ; ok {
mappedSharedColumnNames = append ( mappedSharedColumnNames , mapped )
} else {
mappedSharedColumnNames = append ( mappedSharedColumnNames , columnName )
}
}
return sql . NewColumnList ( sharedColumnNames ) , sql . NewColumnList ( mappedSharedColumnNames )
2016-04-08 12:35:06 +00:00
}
2016-06-22 10:39:13 +00:00
// showCreateTable returns the `show create table` statement for given table
// in the migration's database. The first column of the result is scanned and
// discarded; the second column is what gets returned.
func (this *Inspector) showCreateTable(tableName string) (createTableStatement string, err error) {
	escapedDatabase := sql.EscapeName(this.migrationContext.DatabaseName)
	escapedTable := sql.EscapeName(tableName)
	query := fmt.Sprintf(` show /* gh-ost */ create table %s.%s `, escapedDatabase, escapedTable)

	var ignoredFirstColumn string
	row := this.db.QueryRow(query)
	err = row.Scan(&ignoredFirstColumn, &createTableStatement)
	return createTableStatement, err
}
2016-06-19 15:55:37 +00:00
// readChangelogState reads changelog hints
2016-04-14 11:37:56 +00:00
func ( this * Inspector ) readChangelogState ( ) ( map [ string ] string , error ) {
query := fmt . Sprintf ( `
select hint , value from % s . % s where id <= 255
` ,
sql . EscapeName ( this . migrationContext . DatabaseName ) ,
sql . EscapeName ( this . migrationContext . GetChangelogTableName ( ) ) ,
)
result := make ( map [ string ] string )
err := sqlutils . QueryRowsMap ( this . db , query , func ( m sqlutils . RowMap ) error {
result [ m . GetString ( "hint" ) ] = m . GetString ( "value" )
return nil
} )
return result , err
2016-04-04 13:29:02 +00:00
}
2016-04-14 11:37:56 +00:00
func ( this * Inspector ) getMasterConnectionConfig ( ) ( applierConfig * mysql . ConnectionConfig , err error ) {
visitedKeys := mysql . NewInstanceKeyMap ( )
2016-06-22 08:38:13 +00:00
return mysql . GetMasterConnectionConfigSafe ( this . connectionConfig , visitedKeys , this . migrationContext . AllowedMasterMaster )
2016-04-04 13:29:02 +00:00
}