detecting master (includes sanity checks). Introducing Applier. Creating and altering ghost table

This commit is contained in:
Shlomi Noach 2016-04-04 15:29:02 +02:00
parent bba352922a
commit cf87d16044
6 changed files with 163 additions and 47 deletions

View File

@ -5,7 +5,11 @@
package base package base
import () import (
"fmt"
"github.com/github/gh-osc/go/mysql"
)
type RowsEstimateMethod string type RowsEstimateMethod string
@ -16,17 +20,19 @@ const (
) )
type MigrationContext struct { type MigrationContext struct {
DatabaseName string DatabaseName string
OriginalTableName string OriginalTableName string
GhostTableName string AlterStatement string
AlterStatement string TableEngine string
TableEngine string CountTableRows bool
CountTableRows bool RowsEstimate int64
RowsEstimate int64 UsedRowsEstimateMethod RowsEstimateMethod
UsedRowsEstimateMethod RowsEstimateMethod ChunkSize int
ChunkSize int OriginalBinlogFormat string
OriginalBinlogFormat string OriginalBinlogRowImage string
OriginalBinlogRowImage string AllowedRunningOnMaster bool
InspectorConnectionConfig *mysql.ConnectionConfig
MasterConnectionConfig *mysql.ConnectionConfig
} }
var context *MigrationContext var context *MigrationContext
@ -37,7 +43,9 @@ func init() {
func newMigrationContext() *MigrationContext { func newMigrationContext() *MigrationContext {
return &MigrationContext{ return &MigrationContext{
ChunkSize: 1000, ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
MasterConnectionConfig: mysql.NewConnectionConfig(),
} }
} }
@ -45,7 +53,16 @@ func GetMigrationContext() *MigrationContext {
return context return context
} }
func (this *MigrationContext) GetGhostTableName() string {
return fmt.Sprintf("_%s_New", this.OriginalTableName)
}
// RequiresBinlogFormatChange // RequiresBinlogFormatChange
func (this *MigrationContext) RequiresBinlogFormatChange() bool { func (this *MigrationContext) RequiresBinlogFormatChange() bool {
return this.OriginalBinlogFormat != "ROW" return this.OriginalBinlogFormat != "ROW"
} }
// RequiresBinlogFormatChange
func (this *MigrationContext) IsRunningOnMaster() bool {
return this.InspectorConnectionConfig.Equals(this.MasterConnectionConfig)
}

View File

@ -35,7 +35,7 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
// Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Hostname, uint16(connectionConfig.Port), connectionConfig.User, connectionConfig.Password) err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Key.Hostname, uint16(connectionConfig.Key.Port), connectionConfig.User, connectionConfig.Password)
if err != nil { if err != nil {
return binlogReader, err return binlogReader, err
} }

View File

@ -13,13 +13,11 @@ import (
"github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog" "github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/logic" "github.com/github/gh-osc/go/logic"
"github.com/github/gh-osc/go/mysql"
"github.com/outbrain/golib/log" "github.com/outbrain/golib/log"
) )
// main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. // main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces.
func main() { func main() {
var connectionConfig mysql.ConnectionConfig
migrationContext := base.GetMigrationContext() migrationContext := base.GetMigrationContext()
// mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") // mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
@ -27,15 +25,16 @@ func main() {
internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment") internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment")
binlogFile := flag.String("binlog-file", "", "Name of binary log file") binlogFile := flag.String("binlog-file", "", "Name of binary log file")
flag.StringVar(&connectionConfig.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
flag.IntVar(&connectionConfig.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
flag.StringVar(&connectionConfig.User, "user", "root", "MySQL user") flag.StringVar(&migrationContext.InspectorConnectionConfig.User, "user", "root", "MySQL user")
flag.StringVar(&connectionConfig.Password, "password", "", "MySQL password") flag.StringVar(&migrationContext.InspectorConnectionConfig.Password, "password", "", "MySQL password")
flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)") flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
quiet := flag.Bool("quiet", false, "quiet") quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose") verbose := flag.Bool("verbose", false, "verbose")
@ -83,14 +82,14 @@ func main() {
var err error var err error
//binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) //binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
binlogReader, err = binlog.NewGoMySQLReader(&connectionConfig) binlogReader, err = binlog.NewGoMySQLReader(migrationContext.InspectorConnectionConfig)
if err != nil { if err != nil {
log.Fatale(err) log.Fatale(err)
} }
binlogReader.ReadEntries(*binlogFile, 0, 0) binlogReader.ReadEntries(*binlogFile, 0, 0)
return return
} }
migrator := logic.NewMigrator(&connectionConfig) migrator := logic.NewMigrator()
err := migrator.Migrate() err := migrator.Migrate()
if err != nil { if err != nil {
log.Fatale(err) log.Fatale(err)

View File

@ -26,15 +26,15 @@ type Inspector struct {
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
} }
func NewInspector(connectionConfig *mysql.ConnectionConfig) *Inspector { func NewInspector() *Inspector {
return &Inspector{ return &Inspector{
connectionConfig: connectionConfig, connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(), migrationContext: base.GetMigrationContext(),
} }
} }
func (this *Inspector) InitDBConnections() (err error) { func (this *Inspector) InitDBConnections() (err error) {
inspectorUri := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", this.connectionConfig.User, this.connectionConfig.Password, this.connectionConfig.Hostname, this.connectionConfig.Port, this.migrationContext.DatabaseName) inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(inspectorUri); err != nil { if this.db, _, err = sqlutils.GetDB(inspectorUri); err != nil {
return err return err
} }
@ -59,7 +59,6 @@ func (this *Inspector) InitDBConnections() (err error) {
return err return err
} }
} }
return nil return nil
} }
@ -76,15 +75,15 @@ func (this *Inspector) InspectTables() (err error) {
// validateConnection issues a simple can-connect to MySQL // validateConnection issues a simple can-connect to MySQL
func (this *Inspector) validateConnection() error { func (this *Inspector) validateConnection() error {
query := `select @@port` query := `select @@global.port`
var port int var port int
if err := this.db.QueryRow(query).Scan(&port); err != nil { if err := this.db.QueryRow(query).Scan(&port); err != nil {
return err return err
} }
if port != this.connectionConfig.Port { if port != this.connectionConfig.Key.Port {
return fmt.Errorf("Unexpected database port reported: %+v", port) return fmt.Errorf("Unexpected database port reported: %+v", port)
} }
log.Infof("connection validated on port %+v", port) log.Infof("connection validated on %+v", this.connectionConfig.Key)
return nil return nil
} }
@ -116,7 +115,7 @@ func (this *Inspector) validateGrants() error {
return nil return nil
}) })
if err != nil { if err != nil {
return log.Errore(err) return err
} }
if foundAll { if foundAll {
@ -138,10 +137,10 @@ func (this *Inspector) validateBinlogs() error {
return err return err
} }
if !hasBinaryLogs { if !hasBinaryLogs {
return fmt.Errorf("%s:%d must have binary logs enabled", this.connectionConfig.Hostname, this.connectionConfig.Port) return fmt.Errorf("%s:%d must have binary logs enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
} }
if !logSlaveUpdates { if !logSlaveUpdates {
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Hostname, this.connectionConfig.Port) return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
} }
if this.migrationContext.RequiresBinlogFormatChange() { if this.migrationContext.RequiresBinlogFormatChange() {
query := fmt.Sprintf(`show /* gh-osc */ slave hosts`) query := fmt.Sprintf(`show /* gh-osc */ slave hosts`)
@ -151,12 +150,12 @@ func (this *Inspector) validateBinlogs() error {
return nil return nil
}) })
if err != nil { if err != nil {
return log.Errore(err) return err
} }
if countReplicas > 0 { if countReplicas > 0 {
return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Hostname, this.connectionConfig.Port, this.migrationContext.OriginalBinlogFormat) return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
} }
log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Hostname, this.connectionConfig.Port, this.migrationContext.OriginalBinlogFormat) log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat)
} }
query = `select @@global.binlog_row_image` query = `select @@global.binlog_row_image`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil { if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil {
@ -164,7 +163,7 @@ func (this *Inspector) validateBinlogs() error {
this.migrationContext.OriginalBinlogRowImage = "" this.migrationContext.OriginalBinlogRowImage = ""
} }
log.Infof("binary logs validated on %s:%d", this.connectionConfig.Hostname, this.connectionConfig.Port) log.Infof("binary logs validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }
@ -185,7 +184,7 @@ func (this *Inspector) validateTable() error {
return nil return nil
}) })
if err != nil { if err != nil {
return log.Errore(err) return err
} }
if !tableFound { if !tableFound {
return log.Errorf("Cannot find table %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return log.Errorf("Cannot find table %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
@ -207,7 +206,7 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
return nil return nil
}) })
if err != nil { if err != nil {
return log.Errore(err) return err
} }
if !outputFound { if !outputFound {
return log.Errorf("Cannot run EXPLAIN on %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return log.Errorf("Cannot run EXPLAIN on %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
@ -308,7 +307,7 @@ func (this *Inspector) getSharedUniqueKeys() (uniqueKeys [](*sql.UniqueKey), err
if err != nil { if err != nil {
return uniqueKeys, err return uniqueKeys, err
} }
ghostUniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.GhostTableName) ghostUniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.GetGhostTableName())
if err != nil { if err != nil {
return uniqueKeys, err return uniqueKeys, err
} }
@ -323,3 +322,44 @@ func (this *Inspector) getSharedUniqueKeys() (uniqueKeys [](*sql.UniqueKey), err
} }
return uniqueKeys, nil return uniqueKeys, nil
} }
func (this *Inspector) getMasterConnectionConfig() (masterConfig *mysql.ConnectionConfig, err error) {
visitedKeys := mysql.NewInstanceKeyMap()
return getMasterConnectionConfigSafe(this.connectionConfig, this.migrationContext.DatabaseName, visitedKeys)
}
func getMasterConnectionConfigSafe(connectionConfig *mysql.ConnectionConfig, databaseName string, visitedKeys *mysql.InstanceKeyMap) (masterConfig *mysql.ConnectionConfig, err error) {
log.Debugf("Looking for master on %+v", connectionConfig.Key)
currentUri := connectionConfig.GetDBUri(databaseName)
db, _, err := sqlutils.GetDB(currentUri)
if err != nil {
return nil, err
}
hasMaster := false
masterConfig = connectionConfig.Duplicate()
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
masterKey := mysql.InstanceKey{
Hostname: rowMap.GetString("Master_Host"),
Port: rowMap.GetInt("Master_Port"),
}
if masterKey.IsValid() {
masterConfig.Key = masterKey
hasMaster = true
}
return nil
})
if err != nil {
return nil, err
}
if hasMaster {
log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key)
if visitedKeys.HasKey(masterConfig.Key) {
return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key)
}
visitedKeys.AddKey(masterConfig.Key)
return getMasterConnectionConfigSafe(masterConfig, databaseName, visitedKeys)
}
return masterConfig, nil
}

View File

@ -6,28 +6,54 @@
package logic package logic
import ( import (
"github.com/github/gh-osc/go/mysql" "fmt"
"github.com/github/gh-osc/go/base"
"github.com/outbrain/golib/log"
) )
// Migrator is the main schema migration flow manager. // Migrator is the main schema migration flow manager.
type Migrator struct { type Migrator struct {
connectionConfig *mysql.ConnectionConfig
inspector *Inspector inspector *Inspector
applier *Applier
migrationContext *base.MigrationContext
} }
func NewMigrator(connectionConfig *mysql.ConnectionConfig) *Migrator { func NewMigrator() *Migrator {
return &Migrator{ return &Migrator{
connectionConfig: connectionConfig, migrationContext: base.GetMigrationContext(),
inspector: NewInspector(connectionConfig),
} }
} }
func (this *Migrator) Migrate() error { func (this *Migrator) Migrate() (err error) {
this.inspector = NewInspector()
if err := this.inspector.InitDBConnections(); err != nil { if err := this.inspector.InitDBConnections(); err != nil {
return err return err
} }
if this.migrationContext.MasterConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil {
return err
}
if this.migrationContext.IsRunningOnMaster() && !this.migrationContext.AllowedRunningOnMaster {
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
}
log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key)
if err := this.inspector.InspectTables(); err != nil { if err := this.inspector.InspectTables(); err != nil {
return err return err
} }
this.applier = NewApplier()
if err := this.applier.InitDBConnections(); err != nil {
return err
}
if err := this.applier.CreateGhostTable(); err != nil {
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
return err
}
if err := this.applier.AlterGhost(); err != nil {
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
return err
}
return nil return nil
} }

View File

@ -5,10 +5,44 @@
package mysql package mysql
import (
"fmt"
)
// ConnectionConfig is the minimal configuration required to connect to a MySQL server // ConnectionConfig is the minimal configuration required to connect to a MySQL server
type ConnectionConfig struct { type ConnectionConfig struct {
Hostname string Key InstanceKey
Port int
User string User string
Password string Password string
} }
func NewConnectionConfig() *ConnectionConfig {
config := &ConnectionConfig{
Key: InstanceKey{},
}
return config
}
func (this *ConnectionConfig) Duplicate() *ConnectionConfig {
config := &ConnectionConfig{
Key: InstanceKey{
Hostname: this.Key.Hostname,
Port: this.Key.Port,
},
User: this.User,
Password: this.Password,
}
return config
}
func (this *ConnectionConfig) String() string {
return fmt.Sprintf("%s, user=%s", this.Key.DisplayString(), this.User)
}
func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool {
return this.Key.Equals(&other.Key)
}
func (this *ConnectionConfig) GetDBUri(databaseName string) string {
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", this.User, this.Password, this.Key.Hostname, this.Key.Port, databaseName)
}