diff --git a/go/base/context.go b/go/base/context.go index 52568ba..d10b23e 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -5,7 +5,11 @@ package base -import () +import ( + "fmt" + + "github.com/github/gh-osc/go/mysql" +) type RowsEstimateMethod string @@ -16,17 +20,19 @@ const ( ) type MigrationContext struct { - DatabaseName string - OriginalTableName string - GhostTableName string - AlterStatement string - TableEngine string - CountTableRows bool - RowsEstimate int64 - UsedRowsEstimateMethod RowsEstimateMethod - ChunkSize int - OriginalBinlogFormat string - OriginalBinlogRowImage string + DatabaseName string + OriginalTableName string + AlterStatement string + TableEngine string + CountTableRows bool + RowsEstimate int64 + UsedRowsEstimateMethod RowsEstimateMethod + ChunkSize int + OriginalBinlogFormat string + OriginalBinlogRowImage string + AllowedRunningOnMaster bool + InspectorConnectionConfig *mysql.ConnectionConfig + MasterConnectionConfig *mysql.ConnectionConfig } var context *MigrationContext @@ -37,7 +43,9 @@ func init() { func newMigrationContext() *MigrationContext { return &MigrationContext{ - ChunkSize: 1000, + ChunkSize: 1000, + InspectorConnectionConfig: mysql.NewConnectionConfig(), + MasterConnectionConfig: mysql.NewConnectionConfig(), } } @@ -45,7 +53,16 @@ func GetMigrationContext() *MigrationContext { return context } +func (this *MigrationContext) GetGhostTableName() string { + return fmt.Sprintf("_%s_New", this.OriginalTableName) +} + // RequiresBinlogFormatChange func (this *MigrationContext) RequiresBinlogFormatChange() bool { return this.OriginalBinlogFormat != "ROW" } + +// RequiresBinlogFormatChange +func (this *MigrationContext) IsRunningOnMaster() bool { + return this.InspectorConnectionConfig.Equals(this.MasterConnectionConfig) +} diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 3e6db9d..bc96400 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -35,7 +35,7 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password - err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Hostname, uint16(connectionConfig.Port), connectionConfig.User, connectionConfig.Password) + err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Key.Hostname, uint16(connectionConfig.Key.Port), connectionConfig.User, connectionConfig.Password) if err != nil { return binlogReader, err } diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 2cba72a..7f73993 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -13,13 +13,11 @@ import ( "github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/binlog" "github.com/github/gh-osc/go/logic" - "github.com/github/gh-osc/go/mysql" "github.com/outbrain/golib/log" ) // main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. func main() { - var connectionConfig mysql.ConnectionConfig migrationContext := base.GetMigrationContext() // mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") @@ -27,15 +25,16 @@ func main() { internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment") binlogFile := flag.String("binlog-file", "", "Name of binary log file") - flag.StringVar(&connectionConfig.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") - flag.IntVar(&connectionConfig.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") - flag.StringVar(&connectionConfig.User, "user", "root", "MySQL user") - flag.StringVar(&connectionConfig.Password, "password", "", "MySQL password") + flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") + flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") + flag.StringVar(&migrationContext.InspectorConnectionConfig.User, "user", "root", "MySQL user") + flag.StringVar(&migrationContext.InspectorConnectionConfig.Password, "password", "", "MySQL password") flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)") flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") + flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") @@ -83,14 +82,14 @@ func main() { var err error //binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) - binlogReader, err = binlog.NewGoMySQLReader(&connectionConfig) + binlogReader, err = binlog.NewGoMySQLReader(migrationContext.InspectorConnectionConfig) if err != nil { log.Fatale(err) } binlogReader.ReadEntries(*binlogFile, 0, 0) return } - migrator := logic.NewMigrator(&connectionConfig) + migrator := logic.NewMigrator() err := migrator.Migrate() if err != nil { log.Fatale(err) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 8839849..9da7abc 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -26,15 +26,15 @@ type Inspector struct { migrationContext *base.MigrationContext } -func NewInspector(connectionConfig *mysql.ConnectionConfig) *Inspector { +func NewInspector() *Inspector { return &Inspector{ - connectionConfig: connectionConfig, + connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, migrationContext: base.GetMigrationContext(), } } func (this *Inspector) InitDBConnections() (err error) { - inspectorUri := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", this.connectionConfig.User, this.connectionConfig.Password, this.connectionConfig.Hostname, this.connectionConfig.Port, this.migrationContext.DatabaseName) + inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) if this.db, _, err = sqlutils.GetDB(inspectorUri); err != nil { return err } @@ -59,7 +59,6 @@ func (this *Inspector) InitDBConnections() (err error) { return err } } - return nil } @@ -76,15 +75,15 @@ func (this *Inspector) InspectTables() (err error) { // validateConnection issues a simple can-connect to MySQL func (this *Inspector) validateConnection() error { - query := `select @@port` + query := `select @@global.port` var port int if err := this.db.QueryRow(query).Scan(&port); err != nil { return err } - if port != this.connectionConfig.Port { + if port != this.connectionConfig.Key.Port { return fmt.Errorf("Unexpected database port reported: %+v", port) } - log.Infof("connection validated on port %+v", port) + log.Infof("connection validated on %+v", this.connectionConfig.Key) return nil } @@ -116,7 +115,7 @@ func (this *Inspector) validateGrants() error { return nil }) if err != nil { - return log.Errore(err) + return err } if foundAll { @@ -138,10 +137,10 @@ func (this *Inspector) validateBinlogs() error { return err } if !hasBinaryLogs { - return fmt.Errorf("%s:%d must have binary logs enabled", this.connectionConfig.Hostname, this.connectionConfig.Port) + return fmt.Errorf("%s:%d must have binary logs enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) } if !logSlaveUpdates { - return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Hostname, this.connectionConfig.Port) + return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) } if this.migrationContext.RequiresBinlogFormatChange() { query := fmt.Sprintf(`show /* gh-osc */ slave hosts`) @@ -151,12 +150,12 @@ func (this *Inspector) validateBinlogs() error { return nil }) if err != nil { - return log.Errore(err) + return err } if countReplicas > 0 { - return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Hostname, this.connectionConfig.Port, this.migrationContext.OriginalBinlogFormat) + return fmt.Errorf("%s:%d has %s binlog_format, but I'm too scared to change it to ROW because it has replicas. Bailing out", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) } - log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Hostname, this.connectionConfig.Port, this.migrationContext.OriginalBinlogFormat) + log.Infof("%s:%d has %s binlog_format. I will change it to ROW for the duration of this migration.", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port, this.migrationContext.OriginalBinlogFormat) } query = `select @@global.binlog_row_image` if err := this.db.QueryRow(query).Scan(&this.migrationContext.OriginalBinlogRowImage); err != nil { @@ -164,7 +163,7 @@ func (this *Inspector) validateBinlogs() error { this.migrationContext.OriginalBinlogRowImage = "" } - log.Infof("binary logs validated on %s:%d", this.connectionConfig.Hostname, this.connectionConfig.Port) + log.Infof("binary logs validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) return nil } @@ -185,7 +184,7 @@ func (this *Inspector) validateTable() error { return nil }) if err != nil { - return log.Errore(err) + return err } if !tableFound { return log.Errorf("Cannot find table %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) @@ -207,7 +206,7 @@ func (this *Inspector) estimateTableRowsViaExplain() error { return nil }) if err != nil { - return log.Errore(err) + return err } if !outputFound { return log.Errorf("Cannot run EXPLAIN on %s.%s!", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) @@ -308,7 +307,7 @@ func (this *Inspector) getSharedUniqueKeys() (uniqueKeys [](*sql.UniqueKey), err if err != nil { return uniqueKeys, err } - ghostUniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.GhostTableName) + ghostUniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.GetGhostTableName()) if err != nil { return uniqueKeys, err } @@ -323,3 +322,44 @@ func (this *Inspector) getSharedUniqueKeys() (uniqueKeys [](*sql.UniqueKey), err } return uniqueKeys, nil } + +func (this *Inspector) getMasterConnectionConfig() (masterConfig *mysql.ConnectionConfig, err error) { + visitedKeys := mysql.NewInstanceKeyMap() + return getMasterConnectionConfigSafe(this.connectionConfig, this.migrationContext.DatabaseName, visitedKeys) +} + +func getMasterConnectionConfigSafe(connectionConfig *mysql.ConnectionConfig, databaseName string, visitedKeys *mysql.InstanceKeyMap) (masterConfig *mysql.ConnectionConfig, err error) { + log.Debugf("Looking for master on %+v", connectionConfig.Key) + + currentUri := connectionConfig.GetDBUri(databaseName) + db, _, err := sqlutils.GetDB(currentUri) + if err != nil { + return nil, err + } + + hasMaster := false + masterConfig = connectionConfig.Duplicate() + err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error { + masterKey := mysql.InstanceKey{ + Hostname: rowMap.GetString("Master_Host"), + Port: rowMap.GetInt("Master_Port"), + } + if masterKey.IsValid() { + masterConfig.Key = masterKey + hasMaster = true + } + return nil + }) + if err != nil { + return nil, err + } + if hasMaster { + log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key) + if visitedKeys.HasKey(masterConfig.Key) { + return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key) + } + visitedKeys.AddKey(masterConfig.Key) + return getMasterConnectionConfigSafe(masterConfig, databaseName, visitedKeys) + } + return masterConfig, nil +} diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 40adab9..d35346e 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -6,28 +6,54 @@ package logic import ( - "github.com/github/gh-osc/go/mysql" + "fmt" + + "github.com/github/gh-osc/go/base" + + "github.com/outbrain/golib/log" ) // Migrator is the main schema migration flow manager. type Migrator struct { - connectionConfig *mysql.ConnectionConfig inspector *Inspector + applier *Applier + migrationContext *base.MigrationContext } -func NewMigrator(connectionConfig *mysql.ConnectionConfig) *Migrator { +func NewMigrator() *Migrator { return &Migrator{ - connectionConfig: connectionConfig, - inspector: NewInspector(connectionConfig), + migrationContext: base.GetMigrationContext(), } } -func (this *Migrator) Migrate() error { +func (this *Migrator) Migrate() (err error) { + this.inspector = NewInspector() if err := this.inspector.InitDBConnections(); err != nil { return err } + if this.migrationContext.MasterConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil { + return err + } + if this.migrationContext.IsRunningOnMaster() && !this.migrationContext.AllowedRunningOnMaster { + return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master") + } + log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key) if err := this.inspector.InspectTables(); err != nil { return err } + + this.applier = NewApplier() + if err := this.applier.InitDBConnections(); err != nil { + return err + } + if err := this.applier.CreateGhostTable(); err != nil { + log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + return err + } + if err := this.applier.AlterGhost(); err != nil { + log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") + return err + } + return nil } diff --git a/go/mysql/connection.go b/go/mysql/connection.go index fc4115b..4d15241 100644 --- a/go/mysql/connection.go +++ b/go/mysql/connection.go @@ -5,10 +5,44 @@ package mysql +import ( + "fmt" +) + // ConnectionConfig is the minimal configuration required to connect to a MySQL server type ConnectionConfig struct { - Hostname string - Port int + Key InstanceKey User string Password string } + +func NewConnectionConfig() *ConnectionConfig { + config := &ConnectionConfig{ + Key: InstanceKey{}, + } + return config +} + +func (this *ConnectionConfig) Duplicate() *ConnectionConfig { + config := &ConnectionConfig{ + Key: InstanceKey{ + Hostname: this.Key.Hostname, + Port: this.Key.Port, + }, + User: this.User, + Password: this.Password, + } + return config +} + +func (this *ConnectionConfig) String() string { + return fmt.Sprintf("%s, user=%s", this.Key.DisplayString(), this.User) +} + +func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool { + return this.Key.Equals(&other.Key) +} + +func (this *ConnectionConfig) GetDBUri(databaseName string) string { + return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", this.User, this.Password, this.Key.Hostname, this.Key.Port, databaseName) +}