Merge branch 'master' into named-panic
This commit is contained in:
commit
ff17d2f844
@ -1 +1 @@
|
|||||||
1.0.47
|
1.0.48
|
||||||
|
4
build.sh
4
build.sh
@ -42,8 +42,8 @@ function build {
|
|||||||
builddir=$(setuptree)
|
builddir=$(setuptree)
|
||||||
cp $buildpath/$target $builddir/gh-ost/usr/bin
|
cp $buildpath/$target $builddir/gh-ost/usr/bin
|
||||||
cd $buildpath
|
cd $buildpath
|
||||||
fpm -v "${RELEASE_VERSION}" --epoch 1 -f -s dir -n gh-ost -m shlomi-noach --description "GitHub's Online Schema Migrations for MySQL " --url "https://github.com/github/gh-ost" --vendor "GitHub" --license "Apache 2.0" -C $builddir/gh-ost --prefix=/ -t rpm .
|
fpm -v "${RELEASE_VERSION}" --epoch 1 -f -s dir -n gh-ost -m 'shlomi-noach <shlomi-noach+gh-ost-deb@github.com>' --description "GitHub's Online Schema Migrations for MySQL " --url "https://github.com/github/gh-ost" --vendor "GitHub" --license "Apache 2.0" -C $builddir/gh-ost --prefix=/ -t rpm .
|
||||||
fpm -v "${RELEASE_VERSION}" --epoch 1 -f -s dir -n gh-ost -m shlomi-noach --description "GitHub's Online Schema Migrations for MySQL " --url "https://github.com/github/gh-ost" --vendor "GitHub" --license "Apache 2.0" -C $builddir/gh-ost --prefix=/ -t deb --deb-no-default-config-files .
|
fpm -v "${RELEASE_VERSION}" --epoch 1 -f -s dir -n gh-ost -m 'shlomi-noach <shlomi-noach+gh-ost-deb@github.com>' --description "GitHub's Online Schema Migrations for MySQL " --url "https://github.com/github/gh-ost" --vendor "GitHub" --license "Apache 2.0" -C $builddir/gh-ost --prefix=/ -t deb --deb-no-default-config-files .
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -185,6 +185,18 @@ By default `gh-ost` verifies no foreign keys exist on the migrated table. On ser
|
|||||||
|
|
||||||
See [`approve-renamed-columns`](#approve-renamed-columns)
|
See [`approve-renamed-columns`](#approve-renamed-columns)
|
||||||
|
|
||||||
|
### ssl
|
||||||
|
|
||||||
|
By default `gh-ost` does not use ssl/tls connections to the database servers when performing migrations. This flag instructs `gh-ost` to use encrypted connections. If enabled, `gh-ost` will use the system's ca certificate pool for server certificate verification. If a different certificate is needed for server verification, see `--ssl-ca`. If you wish to skip server verification, but still use encrypted connections, use with `--ssl-allow-insecure`.
|
||||||
|
|
||||||
|
### ssl-allow-insecure
|
||||||
|
|
||||||
|
Allows `gh-ost` to connect to the MySQL servers using encrypted connections, but without verifying the validity of the certificate provided by the server during the connection. Requires `--ssl`.
|
||||||
|
|
||||||
|
### ssl-ca
|
||||||
|
|
||||||
|
`--ssl-ca=/path/to/ca-cert.pem`: ca certificate file (in PEM format) to use for server certificate verification. If specified, the default system ca cert pool will not be used for verification, only the ca cert provided here. Requires `--ssl`.
|
||||||
|
|
||||||
### test-on-replica
|
### test-on-replica
|
||||||
|
|
||||||
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md)
|
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md)
|
||||||
|
@ -69,6 +69,8 @@ The following variables are available on all hooks:
|
|||||||
- `GH_OST_INSPECTED_HOST`
|
- `GH_OST_INSPECTED_HOST`
|
||||||
- `GH_OST_EXECUTING_HOST`
|
- `GH_OST_EXECUTING_HOST`
|
||||||
- `GH_OST_HOOKS_HINT` - copy of `--hooks-hint` value
|
- `GH_OST_HOOKS_HINT` - copy of `--hooks-hint` value
|
||||||
|
- `GH_OST_HOOKS_HINT_OWNER` - copy of `--hooks-hint-owner` value
|
||||||
|
- `GH_OST_HOOKS_HINT_TOKEN` - copy of `--hooks-hint-token` value
|
||||||
- `GH_OST_DRY_RUN` - whether or not the `gh-ost` run is a dry run
|
- `GH_OST_DRY_RUN` - whether or not the `gh-ost` run is a dry run
|
||||||
|
|
||||||
The following variable are available on particular hooks:
|
The following variable are available on particular hooks:
|
||||||
|
@ -28,3 +28,9 @@ It is therefore unlikely that `gh-ost` will support this behavior.
|
|||||||
Yes. TL;DR if running all on same replica/master, make sure to provide `--replica-server-id`. [Read more](cheatsheet.md#concurrent-migrations)
|
Yes. TL;DR if running all on same replica/master, make sure to provide `--replica-server-id`. [Read more](cheatsheet.md#concurrent-migrations)
|
||||||
|
|
||||||
# Why
|
# Why
|
||||||
|
|
||||||
|
### Why Is the "Connect to Replica" mode preferred?
|
||||||
|
|
||||||
|
To avoid placing extra load on the master. `gh-ost` connects as a replication client. Each additional replica adds some load to the master.
|
||||||
|
|
||||||
|
To monitor replication lag from a replica. This makes the replication lag throttle, `--max-lag-millis`, more representative of the lag experienced by other replicas following the master (perhaps N levels deep in a tree of replicas).
|
||||||
|
@ -99,6 +99,9 @@ type MigrationContext struct {
|
|||||||
ConfigFile string
|
ConfigFile string
|
||||||
CliUser string
|
CliUser string
|
||||||
CliPassword string
|
CliPassword string
|
||||||
|
UseTLS bool
|
||||||
|
TLSAllowInsecure bool
|
||||||
|
TLSCACertificate string
|
||||||
CliMasterUser string
|
CliMasterUser string
|
||||||
CliMasterPassword string
|
CliMasterPassword string
|
||||||
|
|
||||||
@ -127,6 +130,8 @@ type MigrationContext struct {
|
|||||||
PanicFlagFile string
|
PanicFlagFile string
|
||||||
HooksPath string
|
HooksPath string
|
||||||
HooksHintMessage string
|
HooksHintMessage string
|
||||||
|
HooksHintOwner string
|
||||||
|
HooksHintToken string
|
||||||
|
|
||||||
DropServeSocket bool
|
DropServeSocket bool
|
||||||
ServeSocketFile string
|
ServeSocketFile string
|
||||||
@ -696,6 +701,13 @@ func (this *MigrationContext) ApplyCredentials() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) SetupTLS() error {
|
||||||
|
if this.UseTLS {
|
||||||
|
return this.InspectorConnectionConfig.UseTLS(this.TLSCACertificate, this.TLSAllowInsecure)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// ReadConfigFile attempts to read the config file, if it exists
|
// ReadConfigFile attempts to read the config file, if it exists
|
||||||
func (this *MigrationContext) ReadConfigFile() error {
|
func (this *MigrationContext) ReadConfigFile() error {
|
||||||
this.configMutex.Lock()
|
this.configMutex.Lock()
|
||||||
|
@ -26,7 +26,7 @@ func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry {
|
|||||||
return binlogEntry
|
return binlogEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
|
// NewBinlogEntryAt creates an empty, ready to go BinlogEntry object
|
||||||
func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
|
func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry {
|
||||||
binlogEntry := &BinlogEntry{
|
binlogEntry := &BinlogEntry{
|
||||||
Coordinates: coordinates,
|
Coordinates: coordinates,
|
||||||
@ -41,7 +41,7 @@ func (this *BinlogEntry) Duplicate() *BinlogEntry {
|
|||||||
return binlogEntry
|
return binlogEntry
|
||||||
}
|
}
|
||||||
|
|
||||||
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
|
// String() returns a string representation of this binlog entry
|
||||||
func (this *BinlogEntry) String() string {
|
func (this *BinlogEntry) String() string {
|
||||||
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
|
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
|
||||||
}
|
}
|
||||||
|
@ -46,6 +46,7 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *Go
|
|||||||
Port: uint16(binlogReader.connectionConfig.Key.Port),
|
Port: uint16(binlogReader.connectionConfig.Key.Port),
|
||||||
User: binlogReader.connectionConfig.User,
|
User: binlogReader.connectionConfig.User,
|
||||||
Password: binlogReader.connectionConfig.Password,
|
Password: binlogReader.connectionConfig.Password,
|
||||||
|
TLSConfig: binlogReader.connectionConfig.TLSConfig(),
|
||||||
UseDecimal: true,
|
UseDecimal: true,
|
||||||
}
|
}
|
||||||
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)
|
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)
|
||||||
@ -112,7 +113,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
|
|||||||
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// The channel will do the throttling. Whoever is reding from the channel
|
// The channel will do the throttling. Whoever is reading from the channel
|
||||||
// decides whether action is taken synchronously (meaning we wait before
|
// decides whether action is taken synchronously (meaning we wait before
|
||||||
// next iteration) or asynchronously (we keep pushing more events)
|
// next iteration) or asynchronously (we keep pushing more events)
|
||||||
// In reality, reads will be synchronous
|
// In reality, reads will be synchronous
|
||||||
|
@ -14,6 +14,7 @@ import (
|
|||||||
|
|
||||||
"github.com/github/gh-ost/go/base"
|
"github.com/github/gh-ost/go/base"
|
||||||
"github.com/github/gh-ost/go/logic"
|
"github.com/github/gh-ost/go/logic"
|
||||||
|
_ "github.com/go-sql-driver/mysql"
|
||||||
"github.com/outbrain/golib/log"
|
"github.com/outbrain/golib/log"
|
||||||
|
|
||||||
"golang.org/x/crypto/ssh/terminal"
|
"golang.org/x/crypto/ssh/terminal"
|
||||||
@ -54,6 +55,10 @@ func main() {
|
|||||||
flag.StringVar(&migrationContext.ConfigFile, "conf", "", "Config file")
|
flag.StringVar(&migrationContext.ConfigFile, "conf", "", "Config file")
|
||||||
askPass := flag.Bool("ask-pass", false, "prompt for MySQL password")
|
askPass := flag.Bool("ask-pass", false, "prompt for MySQL password")
|
||||||
|
|
||||||
|
flag.BoolVar(&migrationContext.UseTLS, "ssl", false, "Enable SSL encrypted connections to MySQL hosts")
|
||||||
|
flag.StringVar(&migrationContext.TLSCACertificate, "ssl-ca", "", "CA certificate in PEM format for TLS connections to MySQL hosts. Requires --ssl")
|
||||||
|
flag.BoolVar(&migrationContext.TLSAllowInsecure, "ssl-allow-insecure", false, "Skips verification of MySQL hosts' certificate chain and host name. Requires --ssl")
|
||||||
|
|
||||||
flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
|
flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
|
||||||
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
|
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
|
||||||
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
||||||
@ -110,6 +115,8 @@ func main() {
|
|||||||
|
|
||||||
flag.StringVar(&migrationContext.HooksPath, "hooks-path", "", "directory where hook files are found (default: empty, ie. hooks disabled). Hook files found on this path, and conforming to hook naming conventions will be executed")
|
flag.StringVar(&migrationContext.HooksPath, "hooks-path", "", "directory where hook files are found (default: empty, ie. hooks disabled). Hook files found on this path, and conforming to hook naming conventions will be executed")
|
||||||
flag.StringVar(&migrationContext.HooksHintMessage, "hooks-hint", "", "arbitrary message to be injected to hooks via GH_OST_HOOKS_HINT, for your convenience")
|
flag.StringVar(&migrationContext.HooksHintMessage, "hooks-hint", "", "arbitrary message to be injected to hooks via GH_OST_HOOKS_HINT, for your convenience")
|
||||||
|
flag.StringVar(&migrationContext.HooksHintOwner, "hooks-hint-owner", "", "arbitrary name of owner to be injected to hooks via GH_OST_HOOKS_HINT_OWNER, for your convenience")
|
||||||
|
flag.StringVar(&migrationContext.HooksHintToken, "hooks-hint-token", "", "arbitrary token to be injected to hooks via GH_OST_HOOKS_HINT_TOKEN, for your convenience")
|
||||||
|
|
||||||
flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
|
flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
|
||||||
|
|
||||||
@ -195,6 +202,12 @@ func main() {
|
|||||||
if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
|
if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
|
||||||
log.Fatalf("--master-password requires --assume-master-host")
|
log.Fatalf("--master-password requires --assume-master-host")
|
||||||
}
|
}
|
||||||
|
if migrationContext.TLSCACertificate != "" && !migrationContext.UseTLS {
|
||||||
|
log.Fatalf("--ssl-ca requires --ssl")
|
||||||
|
}
|
||||||
|
if migrationContext.TLSAllowInsecure && !migrationContext.UseTLS {
|
||||||
|
log.Fatalf("--ssl-allow-insecure requires --ssl")
|
||||||
|
}
|
||||||
if *replicationLagQuery != "" {
|
if *replicationLagQuery != "" {
|
||||||
log.Warningf("--replication-lag-query is deprecated")
|
log.Warningf("--replication-lag-query is deprecated")
|
||||||
}
|
}
|
||||||
@ -239,6 +252,9 @@ func main() {
|
|||||||
migrationContext.SetThrottleHTTP(*throttleHTTP)
|
migrationContext.SetThrottleHTTP(*throttleHTTP)
|
||||||
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
||||||
migrationContext.ApplyCredentials()
|
migrationContext.ApplyCredentials()
|
||||||
|
if err := migrationContext.SetupTLS(); err != nil {
|
||||||
|
log.Fatale(err)
|
||||||
|
}
|
||||||
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
|
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
|
||||||
log.Errore(err)
|
log.Errore(err)
|
||||||
}
|
}
|
||||||
|
@ -73,7 +73,7 @@ func (this *Applier) InitDBConnections() (err error) {
|
|||||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
|
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
|
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
|
||||||
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
|
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -126,7 +126,6 @@ func (this *Applier) readTableColumns() (err error) {
|
|||||||
|
|
||||||
// showTableStatus returns the output of `show table status like '...'` command
|
// showTableStatus returns the output of `show table status like '...'` command
|
||||||
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
||||||
rowMap = nil
|
|
||||||
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
||||||
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||||
rowMap = m
|
rowMap = m
|
||||||
@ -482,6 +481,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
defer tx.Rollback()
|
||||||
sessionQuery := fmt.Sprintf(`SET
|
sessionQuery := fmt.Sprintf(`SET
|
||||||
SESSION time_zone = '%s',
|
SESSION time_zone = '%s',
|
||||||
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
||||||
@ -1001,15 +1001,19 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
rollback := func(err error) error {
|
||||||
|
tx.Rollback()
|
||||||
|
return err
|
||||||
|
}
|
||||||
sessionQuery := `SET
|
sessionQuery := `SET
|
||||||
SESSION time_zone = '+00:00',
|
SESSION time_zone = '+00:00',
|
||||||
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
||||||
`
|
`
|
||||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||||
return err
|
return rollback(err)
|
||||||
}
|
}
|
||||||
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
|
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
|
||||||
return err
|
return rollback(err)
|
||||||
}
|
}
|
||||||
if err := tx.Commit(); err != nil {
|
if err := tx.Commit(); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -64,6 +64,8 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
|
|||||||
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
|
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
|
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
|
||||||
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
|
||||||
|
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
|
||||||
env = append(env, fmt.Sprintf("GH_OST_DRY_RUN=%t", this.migrationContext.Noop))
|
env = append(env, fmt.Sprintf("GH_OST_DRY_RUN=%t", this.migrationContext.Noop))
|
||||||
|
|
||||||
for _, variable := range extraVariables {
|
for _, variable := range extraVariables {
|
||||||
|
@ -622,8 +622,6 @@ func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*
|
|||||||
GROUP BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME
|
GROUP BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME
|
||||||
) AS UNIQUES
|
) AS UNIQUES
|
||||||
ON (
|
ON (
|
||||||
COLUMNS.TABLE_SCHEMA = UNIQUES.TABLE_SCHEMA AND
|
|
||||||
COLUMNS.TABLE_NAME = UNIQUES.TABLE_NAME AND
|
|
||||||
COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME
|
COLUMNS.COLUMN_NAME = UNIQUES.FIRST_COLUMN_NAME
|
||||||
)
|
)
|
||||||
WHERE
|
WHERE
|
||||||
@ -692,14 +690,17 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
|
|||||||
for _, ghostColumn := range ghostColumns.Names() {
|
for _, ghostColumn := range ghostColumns.Names() {
|
||||||
if strings.EqualFold(originalColumn, ghostColumn) {
|
if strings.EqualFold(originalColumn, ghostColumn) {
|
||||||
isSharedColumn = true
|
isSharedColumn = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
|
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
|
||||||
isSharedColumn = true
|
isSharedColumn = true
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for droppedColumn := range this.migrationContext.DroppedColumnsMap {
|
for droppedColumn := range this.migrationContext.DroppedColumnsMap {
|
||||||
if strings.EqualFold(originalColumn, droppedColumn) {
|
if strings.EqualFold(originalColumn, droppedColumn) {
|
||||||
isSharedColumn = false
|
isSharedColumn = false
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, virtualColumn := range originalVirtualColumns.Names() {
|
for _, virtualColumn := range originalVirtualColumns.Names() {
|
||||||
@ -758,9 +759,8 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
|
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
|
||||||
replicationLag, err = mysql.GetReplicationLag(
|
replicationLag, err = mysql.GetReplicationLagFromSlaveStatus(
|
||||||
this.informationSchemaDb,
|
this.informationSchemaDb,
|
||||||
this.migrationContext.InspectorConnectionConfig,
|
|
||||||
)
|
)
|
||||||
return replicationLag, err
|
return replicationLag, err
|
||||||
}
|
}
|
||||||
|
@ -78,7 +78,7 @@ type Migrator struct {
|
|||||||
|
|
||||||
rowCopyCompleteFlag int64
|
rowCopyCompleteFlag int64
|
||||||
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
||||||
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
|
||||||
copyRowsQueue chan tableWriteFunc
|
copyRowsQueue chan tableWriteFunc
|
||||||
applyEventsQueue chan *applyEventStruct
|
applyEventsQueue chan *applyEventStruct
|
||||||
|
|
||||||
|
@ -140,8 +140,8 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
|
|||||||
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
|
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
|
||||||
// when running on replica, the heartbeat injection is also done on the replica.
|
// when running on replica, the heartbeat injection is also done on the replica.
|
||||||
// This means we will always get a good heartbeat value.
|
// This means we will always get a good heartbeat value.
|
||||||
// When runnign on replica, we should instead check the `SHOW SLAVE STATUS` output.
|
// When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
|
||||||
if lag, err := mysql.GetReplicationLag(this.inspector.informationSchemaDb, this.inspector.connectionConfig); err != nil {
|
if lag, err := mysql.GetReplicationLagFromSlaveStatus(this.inspector.informationSchemaDb); err != nil {
|
||||||
return log.Errore(err)
|
return log.Errore(err)
|
||||||
} else {
|
} else {
|
||||||
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
|
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
|
||||||
|
@ -6,8 +6,14 @@
|
|||||||
package mysql
|
package mysql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
|
"github.com/go-sql-driver/mysql"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConnectionConfig is the minimal configuration required to connect to a MySQL server
|
// ConnectionConfig is the minimal configuration required to connect to a MySQL server
|
||||||
@ -16,6 +22,7 @@ type ConnectionConfig struct {
|
|||||||
User string
|
User string
|
||||||
Password string
|
Password string
|
||||||
ImpliedKey *InstanceKey
|
ImpliedKey *InstanceKey
|
||||||
|
tlsConfig *tls.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConnectionConfig() *ConnectionConfig {
|
func NewConnectionConfig() *ConnectionConfig {
|
||||||
@ -29,9 +36,10 @@ func NewConnectionConfig() *ConnectionConfig {
|
|||||||
// DuplicateCredentials creates a new connection config with given key and with same credentials as this config
|
// DuplicateCredentials creates a new connection config with given key and with same credentials as this config
|
||||||
func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionConfig {
|
func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionConfig {
|
||||||
config := &ConnectionConfig{
|
config := &ConnectionConfig{
|
||||||
Key: key,
|
Key: key,
|
||||||
User: this.User,
|
User: this.User,
|
||||||
Password: this.Password,
|
Password: this.Password,
|
||||||
|
tlsConfig: this.tlsConfig,
|
||||||
}
|
}
|
||||||
config.ImpliedKey = &config.Key
|
config.ImpliedKey = &config.Key
|
||||||
return config
|
return config
|
||||||
@ -42,13 +50,47 @@ func (this *ConnectionConfig) Duplicate() *ConnectionConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *ConnectionConfig) String() string {
|
func (this *ConnectionConfig) String() string {
|
||||||
return fmt.Sprintf("%s, user=%s", this.Key.DisplayString(), this.User)
|
return fmt.Sprintf("%s, user=%s, usingTLS=%t", this.Key.DisplayString(), this.User, this.tlsConfig != nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool {
|
func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool {
|
||||||
return this.Key.Equals(&other.Key) || this.ImpliedKey.Equals(other.ImpliedKey)
|
return this.Key.Equals(&other.Key) || this.ImpliedKey.Equals(other.ImpliedKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *ConnectionConfig) UseTLS(caCertificatePath string, allowInsecure bool) error {
|
||||||
|
var rootCertPool *x509.CertPool
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if !allowInsecure {
|
||||||
|
if caCertificatePath == "" {
|
||||||
|
rootCertPool, err = x509.SystemCertPool()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
rootCertPool = x509.NewCertPool()
|
||||||
|
pem, err := ioutil.ReadFile(caCertificatePath)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if ok := rootCertPool.AppendCertsFromPEM(pem); !ok {
|
||||||
|
return errors.New("could not add ca certificate to cert pool")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
this.tlsConfig = &tls.Config{
|
||||||
|
RootCAs: rootCertPool,
|
||||||
|
InsecureSkipVerify: allowInsecure,
|
||||||
|
}
|
||||||
|
|
||||||
|
return mysql.RegisterTLSConfig(this.Key.StringCode(), this.tlsConfig)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *ConnectionConfig) TLSConfig() *tls.Config {
|
||||||
|
return this.tlsConfig
|
||||||
|
}
|
||||||
|
|
||||||
func (this *ConnectionConfig) GetDBUri(databaseName string) string {
|
func (this *ConnectionConfig) GetDBUri(databaseName string) string {
|
||||||
hostname := this.Key.Hostname
|
hostname := this.Key.Hostname
|
||||||
var ip = net.ParseIP(hostname)
|
var ip = net.ParseIP(hostname)
|
||||||
@ -57,5 +99,11 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
|
|||||||
hostname = fmt.Sprintf("[%s]", hostname)
|
hostname = fmt.Sprintf("[%s]", hostname)
|
||||||
}
|
}
|
||||||
interpolateParams := true
|
interpolateParams := true
|
||||||
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1", this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams)
|
// go-mysql-driver defaults to false if tls param is not provided; explicitly setting here to
|
||||||
|
// simplify construction of the DSN below.
|
||||||
|
tlsOption := "false"
|
||||||
|
if this.tlsConfig != nil {
|
||||||
|
tlsOption = this.Key.StringCode()
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=%t&autocommit=true&charset=utf8mb4,utf8,latin1&tls=%s", this.User, this.Password, hostname, this.Key.Port, databaseName, interpolateParams, tlsOption)
|
||||||
}
|
}
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
package mysql
|
package mysql
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/tls"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/outbrain/golib/log"
|
"github.com/outbrain/golib/log"
|
||||||
@ -31,6 +32,10 @@ func TestDuplicateCredentials(t *testing.T) {
|
|||||||
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
||||||
c.User = "gromit"
|
c.User = "gromit"
|
||||||
c.Password = "penguin"
|
c.Password = "penguin"
|
||||||
|
c.tlsConfig = &tls.Config{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
ServerName: "feathers",
|
||||||
|
}
|
||||||
|
|
||||||
dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310})
|
dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310})
|
||||||
test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost")
|
test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost")
|
||||||
@ -39,6 +44,7 @@ func TestDuplicateCredentials(t *testing.T) {
|
|||||||
test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3310)
|
test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3310)
|
||||||
test.S(t).ExpectEquals(dup.User, "gromit")
|
test.S(t).ExpectEquals(dup.User, "gromit")
|
||||||
test.S(t).ExpectEquals(dup.Password, "penguin")
|
test.S(t).ExpectEquals(dup.Password, "penguin")
|
||||||
|
test.S(t).ExpectEquals(dup.tlsConfig, c.tlsConfig)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDuplicate(t *testing.T) {
|
func TestDuplicate(t *testing.T) {
|
||||||
@ -63,5 +69,16 @@ func TestGetDBUri(t *testing.T) {
|
|||||||
c.Password = "penguin"
|
c.Password = "penguin"
|
||||||
|
|
||||||
uri := c.GetDBUri("test")
|
uri := c.GetDBUri("test")
|
||||||
test.S(t).ExpectEquals(uri, "gromit:penguin@tcp(myhost:3306)/test?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1")
|
test.S(t).ExpectEquals(uri, "gromit:penguin@tcp(myhost:3306)/test?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1&tls=false")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetDBUriWithTLSSetup(t *testing.T) {
|
||||||
|
c := NewConnectionConfig()
|
||||||
|
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
||||||
|
c.User = "gromit"
|
||||||
|
c.Password = "penguin"
|
||||||
|
c.tlsConfig = &tls.Config{}
|
||||||
|
|
||||||
|
uri := c.GetDBUri("test")
|
||||||
|
test.S(t).ExpectEquals(uri, "gromit:penguin@tcp(myhost:3306)/test?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1&tls=myhost:3306")
|
||||||
}
|
}
|
||||||
|
@ -58,9 +58,8 @@ func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
|
|||||||
return knownDBs[cacheKey], exists, nil
|
return knownDBs[cacheKey], exists, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
|
// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS
|
||||||
// or via SHOW SLAVE STATUS
|
func GetReplicationLagFromSlaveStatus(informationSchemaDb *gosql.DB) (replicationLag time.Duration, err error) {
|
||||||
func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
|
|
||||||
err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error {
|
err = sqlutils.QueryRowsMap(informationSchemaDb, `show slave status`, func(m sqlutils.RowMap) error {
|
||||||
slaveIORunning := m.GetString("Slave_IO_Running")
|
slaveIORunning := m.GetString("Slave_IO_Running")
|
||||||
slaveSQLRunning := m.GetString("Slave_SQL_Running")
|
slaveSQLRunning := m.GetString("Slave_SQL_Running")
|
||||||
@ -84,9 +83,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
|
|||||||
}
|
}
|
||||||
defer db.Close()
|
defer db.Close()
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
|
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
|
||||||
// We wish to recognize the case where the topology's master actually has replication configuration.
|
// We wish to recognize the case where the topology's master actually has replication configuration.
|
||||||
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
|
// This can happen when a DBA issues a `RESET SLAVE` instead of `RESET SLAVE ALL`.
|
||||||
@ -99,7 +95,6 @@ func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey
|
|||||||
slaveIORunning := rowMap.GetString("Slave_IO_Running")
|
slaveIORunning := rowMap.GetString("Slave_IO_Running")
|
||||||
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
|
slaveSQLRunning := rowMap.GetString("Slave_SQL_Running")
|
||||||
|
|
||||||
//
|
|
||||||
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
|
if slaveIORunning != "Yes" || slaveSQLRunning != "Yes" {
|
||||||
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
|
return fmt.Errorf("Replication on %+v is broken: Slave_IO_Running: %s, Slave_SQL_Running: %s. Please make sure replication runs before using gh-ost.",
|
||||||
connectionConfig.Key,
|
connectionConfig.Key,
|
||||||
|
@ -140,13 +140,12 @@ func BuildRangeComparison(columns []string, values []string, args []interface{},
|
|||||||
comparisons := []string{}
|
comparisons := []string{}
|
||||||
|
|
||||||
for i, column := range columns {
|
for i, column := range columns {
|
||||||
//
|
|
||||||
value := values[i]
|
value := values[i]
|
||||||
rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
|
rangeComparison, err := BuildValueComparison(column, value, comparisonSign)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", explodedArgs, err
|
return "", explodedArgs, err
|
||||||
}
|
}
|
||||||
if len(columns[0:i]) > 0 {
|
if i > 0 {
|
||||||
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
|
equalitiesComparison, err := BuildEqualsComparison(columns[0:i], values[0:i])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", explodedArgs, err
|
return "", explodedArgs, err
|
||||||
|
@ -17,7 +17,7 @@ create event gh_ost_test
|
|||||||
starts current_timestamp
|
starts current_timestamp
|
||||||
ends current_timestamp + interval 60 second
|
ends current_timestamp + interval 60 second
|
||||||
on completion not preserve
|
on completion not preserve
|
||||||
enable
|
disable on slave
|
||||||
do
|
do
|
||||||
begin
|
begin
|
||||||
insert into gh_ost_test values (null, 11, now(), now(), now(), 0);
|
insert into gh_ost_test values (null, 11, now(), now(), now(), 0);
|
||||||
|
@ -213,6 +213,7 @@ test_single() {
|
|||||||
diff $orig_content_output_file $ghost_content_output_file
|
diff $orig_content_output_file $ghost_content_output_file
|
||||||
|
|
||||||
echo "diff $orig_content_output_file $ghost_content_output_file"
|
echo "diff $orig_content_output_file $ghost_content_output_file"
|
||||||
|
|
||||||
return 1
|
return 1
|
||||||
fi
|
fi
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user