Merge 47a3d3b22d
into 0a033c76c1
This commit is contained in:
commit
b5fe440de5
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -163,18 +163,15 @@ type MigrationContext struct {
|
|||
|
||||
Hostname string
|
||||
AssumeMasterHostname string
|
||||
ApplierTimeZone string
|
||||
TableEngine string
|
||||
RowsEstimate int64
|
||||
RowsDeltaEstimate int64
|
||||
UsedRowsEstimateMethod RowsEstimateMethod
|
||||
HasSuperPrivilege bool
|
||||
OriginalBinlogFormat string
|
||||
OriginalBinlogRowImage string
|
||||
InspectorConnectionConfig *mysql.ConnectionConfig
|
||||
InspectorMySQLVersion string
|
||||
InspectorServerInfo *mysql.ServerInfo
|
||||
ApplierConnectionConfig *mysql.ConnectionConfig
|
||||
ApplierMySQLVersion string
|
||||
ApplierServerInfo *mysql.ServerInfo
|
||||
StartTime time.Time
|
||||
RowCopyStartTime time.Time
|
||||
RowCopyEndTime time.Time
|
||||
|
@ -359,11 +356,6 @@ func (this *MigrationContext) GetVoluntaryLockName() string {
|
|||
return fmt.Sprintf("%s.%s.lock", this.DatabaseName, this.OriginalTableName)
|
||||
}
|
||||
|
||||
// RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW`
|
||||
func (this *MigrationContext) RequiresBinlogFormatChange() bool {
|
||||
return this.OriginalBinlogFormat != "ROW"
|
||||
}
|
||||
|
||||
// GetApplierHostname is a safe access method to the applier hostname
|
||||
func (this *MigrationContext) GetApplierHostname() string {
|
||||
if this.ApplierConnectionConfig == nil {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -12,8 +12,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
gosql "database/sql"
|
||||
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
)
|
||||
|
||||
|
@ -61,35 +59,22 @@ func StringContainsAll(s string, substrings ...string) bool {
|
|||
return nonEmptyStringsFound
|
||||
}
|
||||
|
||||
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
|
||||
versionQuery := `select @@global.version`
|
||||
var port, extraPort int
|
||||
var version string
|
||||
if err := db.QueryRow(versionQuery).Scan(&version); err != nil {
|
||||
return "", err
|
||||
}
|
||||
extraPortQuery := `select @@global.extra_port`
|
||||
if err := db.QueryRow(extraPortQuery).Scan(&extraPort); err != nil { // nolint:staticcheck
|
||||
// swallow this error. not all servers support extra_port
|
||||
}
|
||||
// ValidateConnection confirms the database server info matches the provided connection config.
|
||||
func ValidateConnection(serverInfo *mysql.ServerInfo, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) error {
|
||||
// AliyunRDS set users port to "NULL", replace it by gh-ost param
|
||||
// GCP set users port to "NULL", replace it by gh-ost param
|
||||
// Azure MySQL set users port to a different value by design, replace it by gh-ost para
|
||||
// Azure MySQL set users port to a different value by design, replace it by gh-ost param
|
||||
if migrationContext.AliyunRDS || migrationContext.GoogleCloudPlatform || migrationContext.AzureMySQL {
|
||||
port = connectionConfig.Key.Port
|
||||
} else {
|
||||
portQuery := `select @@global.port`
|
||||
if err := db.QueryRow(portQuery).Scan(&port); err != nil {
|
||||
return "", err
|
||||
}
|
||||
serverInfo.Port.Int64 = connectionConfig.Key.Port
|
||||
serverInfo.Port.Valid = connectionConfig.Key.Port > 0
|
||||
}
|
||||
|
||||
if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
|
||||
migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
|
||||
return version, nil
|
||||
} else if extraPort == 0 {
|
||||
return "", fmt.Errorf("Unexpected database port reported: %+v", port)
|
||||
} else {
|
||||
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
|
||||
if !serverInfo.Port.Valid && !serverInfo.ExtraPort.Valid {
|
||||
return fmt.Errorf("Unexpected database port reported: %+v", serverInfo.Port.Int64)
|
||||
} else if connectionConfig.Key.Port != serverInfo.Port.Int64 && connectionConfig.Key.Port != serverInfo.ExtraPort.Int64 {
|
||||
return fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", serverInfo.Port.Int64, serverInfo.ExtraPort.Int64)
|
||||
}
|
||||
|
||||
migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,13 +1,15 @@
|
|||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package base
|
||||
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"testing"
|
||||
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
"github.com/openark/golib/log"
|
||||
test "github.com/openark/golib/tests"
|
||||
)
|
||||
|
@ -16,6 +18,10 @@ func init() {
|
|||
log.SetLevel(log.ERROR)
|
||||
}
|
||||
|
||||
func newMysqlPort(port int64) gosql.NullInt64 {
|
||||
return gosql.NullInt64{Int64: port, Valid: port > 0}
|
||||
}
|
||||
|
||||
func TestStringContainsAll(t *testing.T) {
|
||||
s := `insert,delete,update`
|
||||
|
||||
|
@ -27,3 +33,80 @@ func TestStringContainsAll(t *testing.T) {
|
|||
test.S(t).ExpectTrue(StringContainsAll(s, "insert", ""))
|
||||
test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete"))
|
||||
}
|
||||
|
||||
func TestValidateConnection(t *testing.T) {
|
||||
connectionConfig := &mysql.ConnectionConfig{
|
||||
Key: mysql.InstanceKey{
|
||||
Hostname: t.Name(),
|
||||
Port: mysql.DefaultInstancePort,
|
||||
},
|
||||
}
|
||||
|
||||
// check valid port matching connectionConfig validates
|
||||
{
|
||||
migrationContext := &MigrationContext{Log: NewDefaultLogger()}
|
||||
serverInfo := &mysql.ServerInfo{
|
||||
Port: newMysqlPort(mysql.DefaultInstancePort),
|
||||
ExtraPort: newMysqlPort(mysql.DefaultInstancePort + 1),
|
||||
}
|
||||
test.S(t).ExpectNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check NULL port validates when AliyunRDS=true
|
||||
{
|
||||
migrationContext := &MigrationContext{
|
||||
Log: NewDefaultLogger(),
|
||||
AliyunRDS: true,
|
||||
}
|
||||
serverInfo := &mysql.ServerInfo{}
|
||||
test.S(t).ExpectNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check NULL port validates when AzureMySQL=true
|
||||
{
|
||||
migrationContext := &MigrationContext{
|
||||
Log: NewDefaultLogger(),
|
||||
AzureMySQL: true,
|
||||
}
|
||||
serverInfo := &mysql.ServerInfo{}
|
||||
test.S(t).ExpectNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check NULL port validates when GoogleCloudPlatform=true
|
||||
{
|
||||
migrationContext := &MigrationContext{
|
||||
Log: NewDefaultLogger(),
|
||||
GoogleCloudPlatform: true,
|
||||
}
|
||||
serverInfo := &mysql.ServerInfo{}
|
||||
test.S(t).ExpectNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check extra_port validates when port=NULL
|
||||
{
|
||||
migrationContext := &MigrationContext{Log: NewDefaultLogger()}
|
||||
serverInfo := &mysql.ServerInfo{
|
||||
ExtraPort: newMysqlPort(mysql.DefaultInstancePort),
|
||||
}
|
||||
test.S(t).ExpectNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check extra_port validates when port does not match but extra_port does
|
||||
{
|
||||
migrationContext := &MigrationContext{Log: NewDefaultLogger()}
|
||||
serverInfo := &mysql.ServerInfo{
|
||||
Port: newMysqlPort(12345),
|
||||
ExtraPort: newMysqlPort(mysql.DefaultInstancePort),
|
||||
}
|
||||
test.S(t).ExpectNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check validation fails when valid port does not match connectionConfig
|
||||
{
|
||||
migrationContext := &MigrationContext{Log: NewDefaultLogger()}
|
||||
serverInfo := &mysql.ServerInfo{
|
||||
Port: newMysqlPort(9999),
|
||||
}
|
||||
test.S(t).ExpectNotNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
// check validation fails when port and extra_port are invalid
|
||||
{
|
||||
migrationContext := &MigrationContext{Log: NewDefaultLogger()}
|
||||
serverInfo := &mysql.ServerInfo{}
|
||||
test.S(t).ExpectNotNil(ValidateConnection(serverInfo, connectionConfig, migrationContext, "test"))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -49,7 +49,7 @@ func main() {
|
|||
migrationContext := base.NewMigrationContext()
|
||||
flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
|
||||
flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unable to determine the master")
|
||||
flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
|
||||
flag.Int64Var(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
|
||||
flag.Float64Var(&migrationContext.InspectorConnectionConfig.Timeout, "mysql-timeout", 0.0, "Connect, read and write timeout for MySQL")
|
||||
flag.StringVar(&migrationContext.CliUser, "user", "", "MySQL user")
|
||||
flag.StringVar(&migrationContext.CliPassword, "password", "", "MySQL password")
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -71,25 +71,24 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
|
|||
}
|
||||
}
|
||||
|
||||
func (this *Applier) ServerInfo() *mysql.ServerInfo {
|
||||
return this.migrationContext.ApplierServerInfo
|
||||
}
|
||||
|
||||
func (this *Applier) InitDBConnections() (err error) {
|
||||
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.ApplierServerInfo, err = mysql.GetServerInfo(this.db); err != nil {
|
||||
return err
|
||||
}
|
||||
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
|
||||
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
|
||||
return err
|
||||
}
|
||||
this.singletonDB.SetMaxOpenConns(1)
|
||||
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.ApplierMySQLVersion = version
|
||||
if err := this.validateAndReadTimeZone(); err != nil {
|
||||
if err = base.ValidateConnection(this.ServerInfo(), this.connectionConfig, this.migrationContext, this.name); err != nil {
|
||||
return err
|
||||
}
|
||||
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
|
||||
|
@ -102,18 +101,8 @@ func (this *Applier) InitDBConnections() (err error) {
|
|||
if err := this.readTableColumns(); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateAndReadTimeZone potentially reads server time-zone
|
||||
func (this *Applier) validateAndReadTimeZone() error {
|
||||
query := `select @@global.time_zone`
|
||||
if err := this.db.QueryRow(query).Scan(&this.migrationContext.ApplierTimeZone); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.migrationContext.Log.Infof("will use time_zone='%s' on applier", this.migrationContext.ApplierTimeZone)
|
||||
this.migrationContext.Log.Infof("Applier initiated on %+v, version %+v (%+v)", this.connectionConfig.ImpliedKey,
|
||||
this.ServerInfo().Version, this.ServerInfo().VersionComment)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -238,7 +227,7 @@ func (this *Applier) CreateGhostTable() error {
|
|||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
|
||||
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.ServerInfo().TimeZone)
|
||||
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
|
||||
|
||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||
|
@ -279,7 +268,7 @@ func (this *Applier) AlterGhost() error {
|
|||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
|
||||
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.ServerInfo().TimeZone)
|
||||
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
|
||||
|
||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||
|
@ -639,7 +628,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
|
|||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
|
||||
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.ServerInfo().TimeZone)
|
||||
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
|
||||
|
||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -42,6 +42,10 @@ func NewInspector(migrationContext *base.MigrationContext) *Inspector {
|
|||
}
|
||||
}
|
||||
|
||||
func (this *Inspector) ServerInfo() *mysql.ServerInfo {
|
||||
return this.migrationContext.InspectorServerInfo
|
||||
}
|
||||
|
||||
func (this *Inspector) InitDBConnections() (err error) {
|
||||
inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, inspectorUri); err != nil {
|
||||
|
@ -72,10 +76,19 @@ func (this *Inspector) InitDBConnections() (err error) {
|
|||
if err := this.applyBinlogFormat(); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.Log.Infof("Inspector initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.InspectorMySQLVersion)
|
||||
this.migrationContext.Log.Infof("Inspector initiated on %+v, version %+v (%+v)", this.connectionConfig.ImpliedKey,
|
||||
this.ServerInfo().Version, this.ServerInfo().VersionComment)
|
||||
return nil
|
||||
}
|
||||
|
||||
// RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW`
|
||||
func (this *Inspector) RequiresBinlogFormatChange() bool {
|
||||
if this.ServerInfo() == nil {
|
||||
return true
|
||||
}
|
||||
return this.ServerInfo().BinlogFormat != "ROW"
|
||||
}
|
||||
|
||||
func (this *Inspector) ValidateOriginalTable() (err error) {
|
||||
if err := this.validateTable(); err != nil {
|
||||
return err
|
||||
|
@ -184,7 +197,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
|
|||
column := this.migrationContext.SharedColumns.Columns()[i]
|
||||
mappedColumn := this.migrationContext.MappedSharedColumns.Columns()[i]
|
||||
if column.Name == mappedColumn.Name && column.Type == sql.DateTimeColumnType && mappedColumn.Type == sql.TimestampColumnType {
|
||||
this.migrationContext.MappedSharedColumns.SetConvertDatetimeToTimestamp(column.Name, this.migrationContext.ApplierTimeZone)
|
||||
this.migrationContext.MappedSharedColumns.SetConvertDatetimeToTimestamp(column.Name, this.ServerInfo().TimeZone)
|
||||
}
|
||||
if column.Name == mappedColumn.Name && column.Type == sql.EnumColumnType && mappedColumn.Charset != "" {
|
||||
this.migrationContext.MappedSharedColumns.SetEnumToTextConversion(column.Name)
|
||||
|
@ -209,14 +222,16 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
|
|||
}
|
||||
|
||||
// validateConnection issues a simple can-connect to MySQL
|
||||
func (this *Inspector) validateConnection() error {
|
||||
func (this *Inspector) validateConnection() (err error) {
|
||||
if len(this.connectionConfig.Password) > mysql.MaxReplicationPasswordLength {
|
||||
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
|
||||
}
|
||||
|
||||
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
|
||||
this.migrationContext.InspectorMySQLVersion = version
|
||||
return err
|
||||
if this.migrationContext.InspectorServerInfo, err = mysql.GetServerInfo(this.db); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return base.ValidateConnection(this.ServerInfo(), this.connectionConfig, this.migrationContext, this.name)
|
||||
}
|
||||
|
||||
// validateGrants verifies the user by which we're executing has necessary grants
|
||||
|
@ -311,9 +326,10 @@ func (this *Inspector) restartReplication() error {
|
|||
// applyBinlogFormat sets ROW binlog format and restarts replication to make
|
||||
// the replication thread apply it.
|
||||
func (this *Inspector) applyBinlogFormat() error {
|
||||
if this.migrationContext.RequiresBinlogFormatChange() {
|
||||
if this.RequiresBinlogFormatChange() {
|
||||
if !this.migrationContext.SwitchToRowBinlogFormat {
|
||||
return fmt.Errorf("Existing binlog_format is %s. Am not switching it to ROW unless you specify --switch-to-rbr", this.migrationContext.OriginalBinlogFormat)
|
||||
return fmt.Errorf("Existing binlog_format is %s. Am not switching it to ROW unless you specify --switch-to-rbr",
|
||||
this.ServerInfo().BinlogFormat)
|
||||
}
|
||||
if _, err := sqlutils.ExecNoPrepare(this.db, `set global binlog_format='ROW'`); err != nil {
|
||||
return err
|
||||
|
@ -338,15 +354,11 @@ func (this *Inspector) applyBinlogFormat() error {
|
|||
|
||||
// validateBinlogs checks that binary log configuration is good to go
|
||||
func (this *Inspector) validateBinlogs() error {
|
||||
query := `select @@global.log_bin, @@global.binlog_format`
|
||||
var hasBinaryLogs bool
|
||||
if err := this.db.QueryRow(query).Scan(&hasBinaryLogs, &this.migrationContext.OriginalBinlogFormat); err != nil {
|
||||
return err
|
||||
}
|
||||
if !hasBinaryLogs {
|
||||
if !this.ServerInfo().LogBin {
|
||||
return fmt.Errorf("%s must have binary logs enabled", this.connectionConfig.Key.String())
|
||||
}
|
||||
if this.migrationContext.RequiresBinlogFormatChange() {
|
||||
|
||||
if this.RequiresBinlogFormatChange() {
|
||||
if !this.migrationContext.SwitchToRowBinlogFormat {
|
||||
return fmt.Errorf("You must be using ROW binlog format. I can switch it for you, provided --switch-to-rbr and that %s doesn't have replicas", this.connectionConfig.Key.String())
|
||||
}
|
||||
|
@ -360,17 +372,13 @@ func (this *Inspector) validateBinlogs() error {
|
|||
return err
|
||||
}
|
||||
if countReplicas > 0 {
|
||||
return fmt.Errorf("%s has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.String(), this.migrationContext.OriginalBinlogFormat)
|
||||
return fmt.Errorf("%s has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.String(), this.ServerInfo().BinlogFormat)
|
||||
}
|
||||
this.migrationContext.Log.Infof("%s has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.String(), this.migrationContext.OriginalBinlogFormat)
|
||||
this.migrationContext.Log.Infof("%s has %s binlog_format. I will change it to ROW, and will NOT change it back, even in the event of failure.", this.connectionConfig.Key.String(), this.ServerInfo().BinlogFormat)
|
||||
}
|
||||
query = `select @@global.binlog_row_image`
|
||||
if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.OriginalBinlogRowImage = strings.ToUpper(this.migrationContext.OriginalBinlogRowImage)
|
||||
if this.migrationContext.OriginalBinlogRowImage != "FULL" {
|
||||
return fmt.Errorf("%s has '%s' binlog_row_image, and only 'FULL' is supported. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again", this.connectionConfig.Key.String(), this.migrationContext.OriginalBinlogRowImage)
|
||||
|
||||
if strings.ToUpper(this.ServerInfo().BinlogRowImage) != "FULL" {
|
||||
return fmt.Errorf("%s has '%s' binlog_row_image, and only 'FULL' is supported. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again", this.connectionConfig.Key.String(), this.ServerInfo().BinlogRowImage)
|
||||
}
|
||||
|
||||
this.migrationContext.Log.Infof("binary logs validated on %s", this.connectionConfig.Key.String())
|
||||
|
@ -379,13 +387,7 @@ func (this *Inspector) validateBinlogs() error {
|
|||
|
||||
// validateLogSlaveUpdates checks that binary log log_slave_updates is set. This test is not required when migrating on replica or when migrating directly on master
|
||||
func (this *Inspector) validateLogSlaveUpdates() error {
|
||||
query := `select @@global.log_slave_updates`
|
||||
var logSlaveUpdates bool
|
||||
if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if logSlaveUpdates {
|
||||
if this.ServerInfo().LogSlaveUpdates {
|
||||
this.migrationContext.Log.Infof("log_slave_updates validated on %s", this.connectionConfig.Key.String())
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -10,6 +10,8 @@ import (
|
|||
|
||||
test "github.com/openark/golib/tests"
|
||||
|
||||
"github.com/github/gh-ost/go/base"
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
"github.com/github/gh-ost/go/sql"
|
||||
)
|
||||
|
||||
|
@ -29,3 +31,30 @@ func TestInspectGetSharedUniqueKeys(t *testing.T) {
|
|||
test.S(t).ExpectEquals(sharedUniqKeys[0].Columns.String(), "id,item_id")
|
||||
test.S(t).ExpectEquals(sharedUniqKeys[1].Columns.String(), "id,org_id")
|
||||
}
|
||||
|
||||
func TestRequiresBinlogFormatChange(t *testing.T) {
|
||||
migrationContext := &base.MigrationContext{
|
||||
InspectorServerInfo: &mysql.ServerInfo{},
|
||||
}
|
||||
inspector := &Inspector{migrationContext: migrationContext}
|
||||
{
|
||||
migrationContext.InspectorServerInfo.BinlogFormat = "ROW"
|
||||
test.S(t).ExpectFalse(inspector.RequiresBinlogFormatChange())
|
||||
}
|
||||
{
|
||||
migrationContext.InspectorServerInfo.BinlogFormat = ""
|
||||
test.S(t).ExpectTrue(inspector.RequiresBinlogFormatChange())
|
||||
}
|
||||
{
|
||||
migrationContext.InspectorServerInfo.BinlogFormat = "MINIMAL"
|
||||
test.S(t).ExpectTrue(inspector.RequiresBinlogFormatChange())
|
||||
}
|
||||
{
|
||||
migrationContext.InspectorServerInfo.BinlogFormat = "MIXED"
|
||||
test.S(t).ExpectTrue(inspector.RequiresBinlogFormatChange())
|
||||
}
|
||||
{
|
||||
migrationContext.InspectorServerInfo = nil
|
||||
test.S(t).ExpectTrue(inspector.RequiresBinlogFormatChange())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -178,18 +178,22 @@ help # This message
|
|||
return NoPrintStatusRule, fmt.Errorf("coordinates are read-only")
|
||||
}
|
||||
case "applier":
|
||||
if this.migrationContext.ApplierConnectionConfig != nil && this.migrationContext.ApplierConnectionConfig.ImpliedKey != nil {
|
||||
fmt.Fprintf(writer, "Host: %s, Version: %s\n",
|
||||
if this.migrationContext.ApplierConnectionConfig != nil && this.migrationContext.ApplierConnectionConfig.ImpliedKey != nil &&
|
||||
this.migrationContext.ApplierServerInfo != nil {
|
||||
fmt.Fprintf(writer, "Host: %s, Version: %s (%s)\n",
|
||||
this.migrationContext.ApplierConnectionConfig.ImpliedKey.String(),
|
||||
this.migrationContext.ApplierMySQLVersion,
|
||||
this.migrationContext.ApplierServerInfo.Version,
|
||||
this.migrationContext.ApplierServerInfo.VersionComment,
|
||||
)
|
||||
}
|
||||
return NoPrintStatusRule, nil
|
||||
case "inspector":
|
||||
if this.migrationContext.InspectorConnectionConfig != nil && this.migrationContext.InspectorConnectionConfig.ImpliedKey != nil {
|
||||
fmt.Fprintf(writer, "Host: %s, Version: %s\n",
|
||||
if this.migrationContext.InspectorConnectionConfig != nil && this.migrationContext.InspectorConnectionConfig.ImpliedKey != nil &&
|
||||
this.migrationContext.InspectorServerInfo != nil {
|
||||
fmt.Fprintf(writer, "Host: %s, Version: %s (%s)\n",
|
||||
this.migrationContext.InspectorConnectionConfig.ImpliedKey.String(),
|
||||
this.migrationContext.InspectorMySQLVersion,
|
||||
this.migrationContext.InspectorServerInfo.Version,
|
||||
this.migrationContext.InspectorServerInfo.VersionComment,
|
||||
)
|
||||
}
|
||||
return NoPrintStatusRule, nil
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -107,7 +107,8 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
|||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil {
|
||||
if err := base.ValidateConnection(this.migrationContext.InspectorServerInfo, this.connectionConfig,
|
||||
this.migrationContext, this.name); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -24,9 +24,9 @@ func init() {
|
|||
func TestNewConnectionConfig(t *testing.T) {
|
||||
c := NewConnectionConfig()
|
||||
test.S(t).ExpectEquals(c.Key.Hostname, "")
|
||||
test.S(t).ExpectEquals(c.Key.Port, 0)
|
||||
test.S(t).ExpectEquals(c.Key.Port, int64(0))
|
||||
test.S(t).ExpectEquals(c.ImpliedKey.Hostname, "")
|
||||
test.S(t).ExpectEquals(c.ImpliedKey.Port, 0)
|
||||
test.S(t).ExpectEquals(c.ImpliedKey.Port, int64(0))
|
||||
test.S(t).ExpectEquals(c.User, "")
|
||||
test.S(t).ExpectEquals(c.Password, "")
|
||||
test.S(t).ExpectEquals(c.TransactionIsolation, "")
|
||||
|
@ -45,9 +45,9 @@ func TestDuplicateCredentials(t *testing.T) {
|
|||
|
||||
dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310})
|
||||
test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost")
|
||||
test.S(t).ExpectEquals(dup.Key.Port, 3310)
|
||||
test.S(t).ExpectEquals(dup.Key.Port, int64(3310))
|
||||
test.S(t).ExpectEquals(dup.ImpliedKey.Hostname, "otherhost")
|
||||
test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3310)
|
||||
test.S(t).ExpectEquals(dup.ImpliedKey.Port, int64(3310))
|
||||
test.S(t).ExpectEquals(dup.User, "gromit")
|
||||
test.S(t).ExpectEquals(dup.Password, "penguin")
|
||||
test.S(t).ExpectEquals(dup.tlsConfig, c.tlsConfig)
|
||||
|
@ -63,9 +63,9 @@ func TestDuplicate(t *testing.T) {
|
|||
|
||||
dup := c.Duplicate()
|
||||
test.S(t).ExpectEquals(dup.Key.Hostname, "myhost")
|
||||
test.S(t).ExpectEquals(dup.Key.Port, 3306)
|
||||
test.S(t).ExpectEquals(dup.Key.Port, int64(3306))
|
||||
test.S(t).ExpectEquals(dup.ImpliedKey.Hostname, "myhost")
|
||||
test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3306)
|
||||
test.S(t).ExpectEquals(dup.ImpliedKey.Port, int64(3306))
|
||||
test.S(t).ExpectEquals(dup.User, "gromit")
|
||||
test.S(t).ExpectEquals(dup.Password, "penguin")
|
||||
test.S(t).ExpectEquals(dup.TransactionIsolation, transactionIsolation)
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
/*
|
||||
Copyright 2015 Shlomi Noach, courtesy Booking.com
|
||||
Copyright 2022 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -13,7 +13,7 @@ import (
|
|||
"strings"
|
||||
)
|
||||
|
||||
const DefaultInstancePort = 3306
|
||||
const DefaultInstancePort int64 = 3306
|
||||
|
||||
var (
|
||||
ipv4HostPortRegexp = regexp.MustCompile("^([^:]+):([0-9]+)$")
|
||||
|
@ -28,7 +28,7 @@ var (
|
|||
// InstanceKey is an instance indicator, identified by hostname and port
|
||||
type InstanceKey struct {
|
||||
Hostname string
|
||||
Port int
|
||||
Port int64
|
||||
}
|
||||
|
||||
const detachHint = "//"
|
||||
|
@ -52,7 +52,7 @@ func NewRawInstanceKey(hostPort string) (*InstanceKey, error) {
|
|||
instanceKey := &InstanceKey{Hostname: hostname, Port: DefaultInstancePort}
|
||||
if port != "" {
|
||||
var err error
|
||||
if instanceKey.Port, err = strconv.Atoi(port); err != nil {
|
||||
if instanceKey.Port, err = strconv.ParseInt(port, 10, 64); err != nil {
|
||||
return instanceKey, fmt.Errorf("Invalid port: %s", port)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -21,43 +21,43 @@ func TestParseInstanceKey(t *testing.T) {
|
|||
key, err := ParseInstanceKey("myhost:1234")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "myhost")
|
||||
test.S(t).ExpectEquals(key.Port, 1234)
|
||||
test.S(t).ExpectEquals(key.Port, int64(1234))
|
||||
}
|
||||
{
|
||||
key, err := ParseInstanceKey("myhost")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "myhost")
|
||||
test.S(t).ExpectEquals(key.Port, 3306)
|
||||
test.S(t).ExpectEquals(key.Port, DefaultInstancePort)
|
||||
}
|
||||
{
|
||||
key, err := ParseInstanceKey("10.0.0.3:3307")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "10.0.0.3")
|
||||
test.S(t).ExpectEquals(key.Port, 3307)
|
||||
test.S(t).ExpectEquals(key.Port, int64(3307))
|
||||
}
|
||||
{
|
||||
key, err := ParseInstanceKey("10.0.0.3")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "10.0.0.3")
|
||||
test.S(t).ExpectEquals(key.Port, 3306)
|
||||
test.S(t).ExpectEquals(key.Port, DefaultInstancePort)
|
||||
}
|
||||
{
|
||||
key, err := ParseInstanceKey("[2001:db8:1f70::999:de8:7648:6e8]:3308")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "2001:db8:1f70::999:de8:7648:6e8")
|
||||
test.S(t).ExpectEquals(key.Port, 3308)
|
||||
test.S(t).ExpectEquals(key.Port, int64(3308))
|
||||
}
|
||||
{
|
||||
key, err := ParseInstanceKey("::1")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "::1")
|
||||
test.S(t).ExpectEquals(key.Port, 3306)
|
||||
test.S(t).ExpectEquals(key.Port, DefaultInstancePort)
|
||||
}
|
||||
{
|
||||
key, err := ParseInstanceKey("0:0:0:0:0:0:0:0")
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(key.Hostname, "0:0:0:0:0:0:0:0")
|
||||
test.S(t).ExpectEquals(key.Port, 3306)
|
||||
test.S(t).ExpectEquals(key.Port, DefaultInstancePort)
|
||||
}
|
||||
{
|
||||
_, err := ParseInstanceKey("[2001:xxxx:1f70::999:de8:7648:6e8]:3308")
|
||||
|
|
46
go/mysql/server_info.go
Normal file
46
go/mysql/server_info.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
/*
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package mysql
|
||||
|
||||
import gosql "database/sql"
|
||||
|
||||
// ServerInfo represents the online config of a MySQL server.
|
||||
type ServerInfo struct {
|
||||
Version string
|
||||
VersionComment string
|
||||
Hostname string
|
||||
Port gosql.NullInt64
|
||||
BinlogFormat string
|
||||
BinlogRowImage string
|
||||
LogBin bool
|
||||
LogSlaveUpdates bool
|
||||
SQLMode string
|
||||
TimeZone string
|
||||
|
||||
// @@global.extra_port is Percona/MariaDB-only
|
||||
ExtraPort gosql.NullInt64
|
||||
}
|
||||
|
||||
// GetServerInfo returns a ServerInfo struct representing
|
||||
// the online config of a MySQL server.
|
||||
func GetServerInfo(db *gosql.DB) (*ServerInfo, error) {
|
||||
var info ServerInfo
|
||||
query := `select /* gh-ost */ @@global.version, @@global.version_comment, @@global.hostname,
|
||||
@@global.port, @@global.binlog_format, @@global.binlog_row_image, @@global.log_bin,
|
||||
@@global.log_slave_updates, @@global.sql_mode, @@global.time_zone`
|
||||
if err := db.QueryRow(query).Scan(&info.Version, &info.VersionComment, &info.Hostname,
|
||||
&info.Port, &info.BinlogFormat, &info.BinlogRowImage, &info.LogBin,
|
||||
&info.LogSlaveUpdates, &info.SQLMode, &info.TimeZone,
|
||||
); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
extraPortQuery := `select @@global.extra_port`
|
||||
// swallow possible error. not all servers support extra_port
|
||||
_ = db.QueryRow(extraPortQuery).Scan(&info.ExtraPort)
|
||||
|
||||
return &info, nil
|
||||
}
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
Copyright 2023 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
|
@ -107,7 +107,7 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
|
|||
|
||||
masterKey = &InstanceKey{
|
||||
Hostname: rowMap.GetString("Master_Host"),
|
||||
Port: rowMap.GetInt("Master_Port"),
|
||||
Port: rowMap.GetInt64("Master_Port"),
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue
Block a user