Merge branch 'master' into fix-variables-spelling
This commit is contained in:
commit
36bcc031ce
4
.github/PULL_REQUEST_TEMPLATE.md
vendored
4
.github/PULL_REQUEST_TEMPLATE.md
vendored
@ -16,6 +16,4 @@ This PR [briefly explain what is does]
|
||||
> In case this PR introduced Go code changes:
|
||||
|
||||
- [ ] contributed code is using same conventions as original code
|
||||
- [ ] code is formatted via `gofmt` (please avoid `goimports`)
|
||||
- [ ] code is built via `./build.sh`
|
||||
- [ ] code is tested via `./test.sh`
|
||||
- [ ] `script/cibuild` returns with no formatting errors, build errors or unit test errors.
|
||||
|
21
.travis.yml
21
.travis.yml
@ -1,7 +1,20 @@
|
||||
# http://docs.travis-ci.com/user/languages/go/
|
||||
language: go
|
||||
|
||||
go:
|
||||
- 1.6
|
||||
- tip
|
||||
go: 1.8
|
||||
|
||||
script: ./test.sh
|
||||
os:
|
||||
- linux
|
||||
|
||||
env:
|
||||
- MYSQL_USER=root
|
||||
|
||||
before_install:
|
||||
- mysql -e 'CREATE DATABASE IF NOT EXISTS test;'
|
||||
|
||||
install: true
|
||||
|
||||
script: script/cibuild
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
|
@ -1,5 +1,7 @@
|
||||
# gh-ost
|
||||
|
||||
[![build status](https://travis-ci.org/github/gh-ost.svg)](https://travis-ci.org/github/gh-ost) [![downloads](https://img.shields.io/github/downloads/github/gh-ost/total.svg)](https://github.com/github/gh-ost/releases) [![release](https://img.shields.io/github/release/github/gh-ost.svg)](https://github.com/github/gh-ost/releases)
|
||||
|
||||
#### GitHub's online schema migration for MySQL <img src="doc/images/gh-ost-logo-light-160.png" align="right">
|
||||
|
||||
`gh-ost` is a triggerless online schema migration solution for MySQL. It is testable and provides pausability, dynamic control/reconfiguration, auditing, and many operational perks.
|
||||
@ -62,6 +64,7 @@ Also see:
|
||||
- [what if?](doc/what-if.md)
|
||||
- [the fine print](doc/the-fine-print.md)
|
||||
- [Community questions](https://github.com/github/gh-ost/issues?q=label%3Aquestion)
|
||||
- [Using `gh-ost` on AWS RDS](doc/rds.md)
|
||||
|
||||
## What's in a name?
|
||||
|
||||
@ -81,6 +84,8 @@ But then a rare genetic mutation happened, and the `c` transformed into `t`. And
|
||||
|
||||
We develop `gh-ost` at GitHub and for the community. We may have different priorities than others. From time to time we may suggest a contribution that is not on our immediate roadmap but which may appeal to others.
|
||||
|
||||
Please see [Coding gh-ost](https://github.com/github/gh-ost/blob/develdocs/doc/coding-ghost.md) for a guide to getting started developing with gh-ost.
|
||||
|
||||
## Download/binaries/source
|
||||
|
||||
`gh-ost` is now GA and stable.
|
||||
@ -89,7 +94,9 @@ We develop `gh-ost` at GitHub and for the community. We may have different prior
|
||||
|
||||
[Download latest release here](https://github.com/github/gh-ost/releases/latest)
|
||||
|
||||
`gh-ost` is a Go project; it is built with Go 1.5 with "experimental vendor". Soon to migrate to Go 1.6. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own.
|
||||
`gh-ost` is a Go project; it is built with Go `1.8` (though `1.7` should work as well). To build on your own, use either:
|
||||
- [script/build](https://github.com/github/gh-ost/blob/master/script/build) - this is the same build script used by CI hence the authoritative; artifact is `./bin/gh-ost` binary.
|
||||
- [build.sh](https://github.com/github/gh-ost/blob/master/build.sh) for building `tar.gz` artifacts in `/tmp/gh-ost`
|
||||
|
||||
Generally speaking, `master` branch is stable, but only [releases](https://github.com/github/gh-ost/releases) are to be used in production.
|
||||
|
||||
|
@ -1 +1 @@
|
||||
1.0.35
|
||||
1.0.36
|
||||
|
19
doc/coding-ghost.md
Normal file
19
doc/coding-ghost.md
Normal file
@ -0,0 +1,19 @@
|
||||
# Getting started with gh-ost development.
|
||||
|
||||
## Overview
|
||||
|
||||
Getting started with gh-ost development is simple!
|
||||
|
||||
- First obtain the repository with `git clone` or `go get`.
|
||||
- From inside of the repository run `script/cibuild`
|
||||
- This will bootstrap the environment if needed, format the code, build the code, and then run the unit test.
|
||||
|
||||
## CI build workflow
|
||||
|
||||
`script/cibuild` performs the following actions will bootstrap the environment to build `gh-ost` correctly, build, perform syntax checks and run unit tests.
|
||||
|
||||
If additional steps are needed, please add them into this workflow so that the workflow remains simple.
|
||||
|
||||
## Notes:
|
||||
|
||||
Currently, `script/ensure-go-installed` will install `go` for Mac OS X and Linux. We welcome PR's to add other platforms.
|
@ -131,6 +131,14 @@ See `approve-renamed-columns`
|
||||
|
||||
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [testing-on-replica](testing-on-replica.md)
|
||||
|
||||
### throttle-control-replicas
|
||||
|
||||
Provide a command delimited list of replicas; `gh-ost` will throttle when any of the given replicas lag beyond `--max-lag-millis`. The list can be queried and updated dynamically via [interactive commands](interactive-commands.md)
|
||||
|
||||
### throttle-http
|
||||
|
||||
Provide a HTTP endpoint; `gh-ost` will issue `HEAD` requests on given URL and throttle whenever response status code is not `200`. The URL can be queried and updated dynamically via [interactive commands](interactive-commands.md). Empty URL disables the HTTP check.
|
||||
|
||||
### timestamp-old-table
|
||||
|
||||
Makes the _old_ table include a timestamp value. The _old_ table is what the original table is renamed to at the end of a successful migration. For example, if the table is `gh_ost_test`, then the _old_ table would normally be `_gh_ost_test_del`. With `--timestamp-old-table` it would be, for example, `_gh_ost_test_20170221103147_del`.
|
||||
|
@ -17,6 +17,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
|
||||
- `help`: shows a brief list of available commands
|
||||
- `status`: returns a detailed status summary of migration progress and configuration
|
||||
- `sup`: returns a brief status summary of migration progress
|
||||
- `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
|
||||
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
|
||||
- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
|
||||
- `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
|
||||
@ -31,6 +32,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
|
||||
- `nice-ratio=0.5` will cause `gh-ost` to sleep for `50ms` immediately following.
|
||||
- `nice-ratio=1` will cause `gh-ost` to sleep for `100ms`, effectively doubling runtime
|
||||
- value of `2` will effectively triple the runtime; etc.
|
||||
- `throttle-http`: change throttle HTTP endpoint
|
||||
- `throttle-query`: change throttle query
|
||||
- `throttle-control-replicas='replica1,replica2'`: change list of throttle-control replicas, these are replicas `gh-ost` will check. This takes a comma separated list of replica's to check and replaces the previous list.
|
||||
- `throttle`: force migration suspend
|
||||
|
45
doc/rds.md
Normal file
45
doc/rds.md
Normal file
@ -0,0 +1,45 @@
|
||||
`gh-ost` has been updated to work with Amazon RDS however due to GitHub not relying using AWS for databases, this documentation is community driven so if you find a bug please [open an issue][new_issue]!
|
||||
|
||||
# Amazon RDS
|
||||
|
||||
## Limitations
|
||||
|
||||
- No `SUPER` privileges.
|
||||
- `gh-ost` runs should be setup use [`--assume-rbr`][assume_rbr_docs] and use `binlog_format=ROW`.
|
||||
- Aurora does not allow editing of the `read_only` parameter. While it is defined as `{TrueIfReplica}`, the parameter is non-modifiable field.
|
||||
|
||||
## Aurora
|
||||
|
||||
#### Replication
|
||||
|
||||
In Aurora replication, you have separate reader and writer endpoints however because the cluster shares the underlying storage layer, `gh-ost` will detect it is running on the master. This becomes an issue when you wish to use [migrate/test on replica][migrate_test_on_replica_docs] because you won't be able to use a single cluster in the same way you would with MySQL RDS.
|
||||
|
||||
To work around this, you can follow along the [AWS replication between clusters documentation][aws_replication_docs] for Aurora with one small caveat. For the "Create a Snapshot of Your Replication Master" step, the binlog position is not available in the AWS console. You will need to issue the SQL query `SHOW SLAVE STATUS` or `aws rds describe-events` API call to get the correct position.
|
||||
|
||||
#### Percona Toolkit
|
||||
|
||||
If you use `pt-table-checksum` as a part of your data integrity checks, you might want to check out [this patch][percona_toolkit_patch] which will enable you to run `pt-table-checksum` with the `--no-binlog-format-check` flag and prevent errors like the following:
|
||||
|
||||
```
|
||||
03-24T12:51:06 Failed to /*!50108 SET @@binlog_format := 'STATEMENT'*/: DBD::mysql::db do failed: Access denied; you need (at least one of) the SUPER privilege(s) for this operation [for Statement "/*!50108 SET @@binlog_format := 'STATEMENT'*/"] at pt-table-checksum line 9292.
|
||||
|
||||
This tool requires binlog_format=STATEMENT, but the current binlog_format is set to ROW and an error occurred while attempting to change it. If running MySQL 5.1.29 or newer, setting binlog_format requires the SUPER privilege. You will need to manually set binlog_format to 'STATEMENT' before running this tool.
|
||||
```
|
||||
|
||||
#### Preflight checklist
|
||||
|
||||
Before trying to run any `gh-ost` migrations you will want to confirm the following:
|
||||
|
||||
- [ ] You have a secondary cluster available that will act as a replica. Rule of thumb here has been a 1 instance per cluster to mimic MySQL-style replication as opposed to Aurora style.
|
||||
- [ ] The database instance parameters and database cluster parameters are consistent between your master and replicas
|
||||
- [ ] Executing `SHOW SLAVE STATUS\G` on your replica cluster displays the correct master host, binlog position, etc.
|
||||
- [ ] Database backup retention is greater than 1 day to enable binlogs
|
||||
- [ ] You have setup [`hooks`][ghost_hooks] to issue RDS procedures for stopping and starting replication. (see [github/gh-ost#163][ghost_rds_issue_tracking] for examples)
|
||||
|
||||
[new_issue]: https://github.com/github/gh-ost/issues/new
|
||||
[assume_rbr_docs]: https://github.com/github/gh-ost/blob/master/doc/command-line-flags.md#assume-rbr
|
||||
[migrate_test_on_replica_docs]: https://github.com/github/gh-ost/blob/master/doc/cheatsheet.md#c-migratetest-on-replica
|
||||
[aws_replication_docs]: http://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Aurora.Overview.Replication.MySQLReplication.html
|
||||
[percona_toolkit_patch]: https://github.com/jacobbednarz/percona-toolkit/commit/0271ba6a094da446a5e5bb8d99b5c26f1777f2b9
|
||||
[ghost_hooks]: https://github.com/github/gh-ost/blob/master/doc/hooks.md
|
||||
[ghost_rds_issue_tracking]: https://github.com/github/gh-ost/issues/163
|
@ -40,8 +40,8 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. Th
|
||||
- It is not allowed to migrate a table where another table exists with same name and different upper/lower case.
|
||||
- For example, you may not migrate `MyTable` if another table called `MYtable` exists in the same schema.
|
||||
|
||||
- Amazon RDS and Google Cloud SQL are currently not supported
|
||||
- We began working towards removing this limitation. See tracking issue: https://github.com/github/gh-ost/issues/163
|
||||
- Amazon RDS works, but has it's own [limitations](rds.md).
|
||||
- Google Cloud SQL is currently not supported
|
||||
|
||||
- Multisource is not supported when migrating via replica. It _should_ work (but never tested) when connecting directly to master (`--allow-on-master`)
|
||||
|
||||
|
@ -42,6 +42,11 @@ type ThrottleReasonHint string
|
||||
const (
|
||||
NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint"
|
||||
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
|
||||
LeavingHibernationThrottleReasonHint = "LeavingHibernationThrottleReasonHint"
|
||||
)
|
||||
|
||||
const (
|
||||
HTTPStatusOK = 200
|
||||
)
|
||||
|
||||
var (
|
||||
@ -99,10 +104,13 @@ type MigrationContext struct {
|
||||
ThrottleFlagFile string
|
||||
ThrottleAdditionalFlagFile string
|
||||
throttleQuery string
|
||||
throttleHTTP string
|
||||
ThrottleCommandedByUser int64
|
||||
HibernateUntil int64
|
||||
maxLoad LoadMap
|
||||
criticalLoad LoadMap
|
||||
CriticalLoadIntervalMilliseconds int64
|
||||
CriticalLoadHibernateSeconds int64
|
||||
PostponeCutOverFlagFile string
|
||||
CutOverLockTimeoutSeconds int64
|
||||
ForceNamedCutOverCommand bool
|
||||
@ -148,6 +156,7 @@ type MigrationContext struct {
|
||||
pointOfInterestTime time.Time
|
||||
pointOfInterestTimeMutex *sync.Mutex
|
||||
CurrentLag int64
|
||||
ThrottleHTTPStatusCode int64
|
||||
controlReplicasLagResult mysql.ReplicationLagResult
|
||||
TotalRowsCopied int64
|
||||
TotalDMLEventsApplied int64
|
||||
@ -157,6 +166,7 @@ type MigrationContext struct {
|
||||
throttleReasonHint ThrottleReasonHint
|
||||
throttleGeneralCheckResult ThrottleCheckResult
|
||||
throttleMutex *sync.Mutex
|
||||
throttleHTTPMutex *sync.Mutex
|
||||
IsPostponingCutOver int64
|
||||
CountingRowsFlag int64
|
||||
AllEventsUpToLockProcessedInjectedFlag int64
|
||||
@ -174,6 +184,7 @@ type MigrationContext struct {
|
||||
UniqueKey *sql.UniqueKey
|
||||
SharedColumns *sql.ColumnList
|
||||
ColumnRenameMap map[string]string
|
||||
DroppedColumnsMap map[string]bool
|
||||
MappedSharedColumns *sql.ColumnList
|
||||
MigrationRangeMinValues *sql.ColumnValues
|
||||
MigrationRangeMaxValues *sql.ColumnValues
|
||||
@ -181,6 +192,8 @@ type MigrationContext struct {
|
||||
MigrationIterationRangeMinValues *sql.ColumnValues
|
||||
MigrationIterationRangeMaxValues *sql.ColumnValues
|
||||
|
||||
recentBinlogCoordinates mysql.BinlogCoordinates
|
||||
|
||||
CanStopStreaming func() bool
|
||||
}
|
||||
|
||||
@ -215,6 +228,7 @@ func newMigrationContext() *MigrationContext {
|
||||
maxLoad: NewLoadMap(),
|
||||
criticalLoad: NewLoadMap(),
|
||||
throttleMutex: &sync.Mutex{},
|
||||
throttleHTTPMutex: &sync.Mutex{},
|
||||
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
||||
configMutex: &sync.Mutex{},
|
||||
pointOfInterestTimeMutex: &sync.Mutex{},
|
||||
@ -472,12 +486,10 @@ func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetThrottleQuery() string {
|
||||
var query string
|
||||
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
query = this.throttleQuery
|
||||
var query = this.throttleQuery
|
||||
return query
|
||||
}
|
||||
|
||||
@ -488,6 +500,21 @@ func (this *MigrationContext) SetThrottleQuery(newQuery string) {
|
||||
this.throttleQuery = newQuery
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetThrottleHTTP() string {
|
||||
this.throttleHTTPMutex.Lock()
|
||||
defer this.throttleHTTPMutex.Unlock()
|
||||
|
||||
var throttleHTTP = this.throttleHTTP
|
||||
return throttleHTTP
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetThrottleHTTP(throttleHTTP string) {
|
||||
this.throttleHTTPMutex.Lock()
|
||||
defer this.throttleHTTPMutex.Unlock()
|
||||
|
||||
this.throttleHTTP = throttleHTTP
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetMaxLoad() LoadMap {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
@ -522,6 +549,19 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) {
|
||||
this.niceRatio = newRatio
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetRecentBinlogCoordinates() mysql.BinlogCoordinates {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
return this.recentBinlogCoordinates
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetRecentBinlogCoordinates(coordinates mysql.BinlogCoordinates) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
this.recentBinlogCoordinates = coordinates
|
||||
}
|
||||
|
||||
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
|
||||
// such as: 'Threads_running=100,Threads_connected=500'
|
||||
// It only applies changes in case there's no parsing error.
|
||||
|
@ -160,6 +160,10 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
|
||||
}
|
||||
|
||||
func (this *GoMySQLReader) Close() error {
|
||||
this.binlogSyncer.Close()
|
||||
// Historically there was a:
|
||||
// this.binlogSyncer.Close()
|
||||
// here. A new go-mysql version closes the binlog syncer connection independently.
|
||||
// I will go against the sacred rules of comments and just leave this here.
|
||||
// This is the year 2017. Let's see what year these comments get deleted.
|
||||
return nil
|
||||
}
|
||||
|
@ -93,6 +93,7 @@ func main() {
|
||||
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
|
||||
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
||||
throttleQuery := flag.String("throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
|
||||
throttleHTTP := flag.String("throttle-http", "", "when given, gh-ost checks given URL via HEAD request; any response code other than 200 (OK) causes throttling; make sure it has low latency response")
|
||||
heartbeatIntervalMillis := flag.Int64("heartbeat-interval-millis", 100, "how frequently would gh-ost inject a heartbeat value")
|
||||
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
||||
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
|
||||
@ -111,6 +112,7 @@ func main() {
|
||||
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
|
||||
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")
|
||||
flag.Int64Var(&migrationContext.CriticalLoadIntervalMilliseconds, "critical-load-interval-millis", 0, "When 0, migration immediately bails out upon meeting critical-load. When non-zero, a second check is done after given interval, and migration only bails out if 2nd check still meets critical load")
|
||||
flag.Int64Var(&migrationContext.CriticalLoadHibernateSeconds, "critical-load-hibernate-seconds", 0, "When nonzero, critical-load does not panic and bail out; instead, gh-ost goes into hibernate for the specified duration. It will not read/write anything to from/to any server")
|
||||
quiet := flag.Bool("quiet", false, "quiet")
|
||||
verbose := flag.Bool("verbose", false, "verbose")
|
||||
debug := flag.Bool("debug", false, "debug mode (very verbose)")
|
||||
@ -228,6 +230,7 @@ func main() {
|
||||
migrationContext.SetDMLBatchSize(*dmlBatchSize)
|
||||
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
|
||||
migrationContext.SetThrottleQuery(*throttleQuery)
|
||||
migrationContext.SetThrottleHTTP(*throttleHTTP)
|
||||
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
||||
migrationContext.ApplyCredentials()
|
||||
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
|
||||
|
@ -293,6 +293,9 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
|
||||
func (this *Applier) InitiateHeartbeat() {
|
||||
var numSuccessiveFailures int64
|
||||
injectHeartbeat := func() error {
|
||||
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
|
||||
return nil
|
||||
}
|
||||
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
|
||||
numSuccessiveFailures++
|
||||
if numSuccessiveFailures > this.migrationContext.MaxRetries() {
|
||||
|
@ -662,7 +662,14 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
|
||||
}
|
||||
sharedColumnNames := []string{}
|
||||
for _, originalColumn := range originalColumns.Names() {
|
||||
isSharedColumn := false
|
||||
if columnsInGhost[originalColumn] || columnsInGhost[columnRenameMap[originalColumn]] {
|
||||
isSharedColumn = true
|
||||
}
|
||||
if this.migrationContext.DroppedColumnsMap[originalColumn] {
|
||||
isSharedColumn = false
|
||||
}
|
||||
if isSharedColumn {
|
||||
sharedColumnNames = append(sharedColumnNames, originalColumn)
|
||||
}
|
||||
}
|
||||
|
@ -96,7 +96,7 @@ func NewMigrator() *Migrator {
|
||||
migrationContext: base.GetMigrationContext(),
|
||||
parser: sql.NewParser(),
|
||||
ghostTableMigrated: make(chan bool),
|
||||
firstThrottlingCollected: make(chan bool, 1),
|
||||
firstThrottlingCollected: make(chan bool, 3),
|
||||
rowCopyComplete: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan string),
|
||||
|
||||
@ -248,6 +248,7 @@ func (this *Migrator) validateStatement() (err error) {
|
||||
}
|
||||
log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames())
|
||||
}
|
||||
this.migrationContext.DroppedColumnsMap = this.parser.DroppedColumnsMap()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -952,6 +953,13 @@ func (this *Migrator) initiateStreaming() error {
|
||||
}
|
||||
log.Debugf("Done streaming")
|
||||
}()
|
||||
|
||||
go func() {
|
||||
ticker := time.Tick(1 * time.Second)
|
||||
for range ticker {
|
||||
this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates())
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -977,7 +985,8 @@ func (this *Migrator) initiateThrottler() error {
|
||||
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
||||
log.Infof("Waiting for first throttle metrics to be collected")
|
||||
<-this.firstThrottlingCollected // replication lag
|
||||
<-this.firstThrottlingCollected // other metrics
|
||||
<-this.firstThrottlingCollected // HTTP status
|
||||
<-this.firstThrottlingCollected // other, general metrics
|
||||
log.Infof("First throttle metrics collected")
|
||||
go this.throttler.initiateThrottlerChecks()
|
||||
|
||||
|
@ -98,8 +98,13 @@ func (this *Server) Serve() (err error) {
|
||||
}
|
||||
|
||||
func (this *Server) handleConnection(conn net.Conn) (err error) {
|
||||
if conn != nil {
|
||||
defer conn.Close()
|
||||
}
|
||||
command, _, err := bufio.NewReader(conn).ReadLine()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return this.onServerCommand(string(command), bufio.NewWriter(conn))
|
||||
}
|
||||
|
||||
@ -139,6 +144,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
|
||||
fmt.Fprintln(writer, `available commands:
|
||||
status # Print a detailed status message
|
||||
sup # Print a short status message
|
||||
coordinates # Print the currently inspected coordinates
|
||||
chunk-size=<newsize> # Set a new chunk-size
|
||||
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
|
||||
critical-load=<load> # Set a new set of max-load thresholds
|
||||
@ -146,6 +152,7 @@ max-lag-millis=<max-lag> # Set a new replication lag threshold
|
||||
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
|
||||
max-load=<load> # Set a new set of max-load thresholds
|
||||
throttle-query=<query> # Set a new throttle-query (no quotes)
|
||||
throttle-http=<URL> # Set a new throttle URL
|
||||
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
|
||||
throttle # Force throttling
|
||||
no-throttle # End forced throttling (other throttling may still apply)
|
||||
@ -159,6 +166,14 @@ help # This message
|
||||
return ForcePrintStatusOnlyRule, nil
|
||||
case "info", "status":
|
||||
return ForcePrintStatusAndHintRule, nil
|
||||
case "coordinates":
|
||||
{
|
||||
if argIsQuestion || arg == "" {
|
||||
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetRecentBinlogCoordinates())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
return NoPrintStatusRule, fmt.Errorf("coordinates are read-only")
|
||||
}
|
||||
case "chunk-size":
|
||||
{
|
||||
if argIsQuestion {
|
||||
@ -236,6 +251,16 @@ help # This message
|
||||
fmt.Fprintf(writer, throttleHint)
|
||||
return ForcePrintStatusAndHintRule, nil
|
||||
}
|
||||
case "throttle-http":
|
||||
{
|
||||
if argIsQuestion {
|
||||
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleHTTP())
|
||||
return NoPrintStatusRule, nil
|
||||
}
|
||||
this.migrationContext.SetThrottleHTTP(arg)
|
||||
fmt.Fprintf(writer, throttleHint)
|
||||
return ForcePrintStatusAndHintRule, nil
|
||||
}
|
||||
case "throttle-control-replicas":
|
||||
{
|
||||
if argIsQuestion {
|
||||
|
@ -7,6 +7,7 @@ package logic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
@ -37,10 +38,19 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
|
||||
// It merely observes the metrics collected by other components, it does not issue
|
||||
// its own metric collection.
|
||||
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
|
||||
if hibernateUntil := atomic.LoadInt64(&this.migrationContext.HibernateUntil); hibernateUntil > 0 {
|
||||
hibernateUntilTime := time.Unix(0, hibernateUntil)
|
||||
return true, fmt.Sprintf("critical-load-hibernate until %+v", hibernateUntilTime), base.NoThrottleReasonHint
|
||||
}
|
||||
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
|
||||
if generalCheckResult.ShouldThrottle {
|
||||
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
|
||||
}
|
||||
// HTTP throttle
|
||||
statusCode := atomic.LoadInt64(&this.migrationContext.ThrottleHTTPStatusCode)
|
||||
if statusCode != 0 && statusCode != http.StatusOK {
|
||||
return true, fmt.Sprintf("http=%d", statusCode), base.NoThrottleReasonHint
|
||||
}
|
||||
// Replication lag throttle
|
||||
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
|
||||
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
||||
@ -90,6 +100,9 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
|
||||
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
|
||||
return nil
|
||||
}
|
||||
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
|
||||
// when running on replica, the heartbeat injection is also done on the replica.
|
||||
@ -122,6 +135,10 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo
|
||||
// collectControlReplicasLag polls all the control replicas to get maximum lag value
|
||||
func (this *Throttler) collectControlReplicasLag() {
|
||||
|
||||
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
|
||||
return
|
||||
}
|
||||
|
||||
replicationLagQuery := fmt.Sprintf(`
|
||||
select value from %s.%s where hint = 'heartbeat' and id <= 255
|
||||
`,
|
||||
@ -213,8 +230,40 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
|
||||
return false, variableName, value, threshold, nil
|
||||
}
|
||||
|
||||
// collectReplicationLag reads the latest changelog heartbeat value
|
||||
func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- bool) {
|
||||
collectFunc := func() (sleep bool, err error) {
|
||||
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
|
||||
return true, nil
|
||||
}
|
||||
url := this.migrationContext.GetThrottleHTTP()
|
||||
if url == "" {
|
||||
return true, nil
|
||||
}
|
||||
resp, err := http.Head(url)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
collectFunc()
|
||||
firstThrottlingCollected <- true
|
||||
|
||||
ticker := time.Tick(100 * time.Millisecond)
|
||||
for range ticker {
|
||||
if sleep, _ := collectFunc(); sleep {
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
|
||||
func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
|
||||
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
|
||||
@ -232,6 +281,20 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
if err != nil {
|
||||
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
|
||||
}
|
||||
|
||||
if criticalLoadMet && this.migrationContext.CriticalLoadHibernateSeconds > 0 {
|
||||
hibernateDuration := time.Duration(this.migrationContext.CriticalLoadHibernateSeconds) * time.Second
|
||||
hibernateUntilTime := time.Now().Add(hibernateDuration)
|
||||
atomic.StoreInt64(&this.migrationContext.HibernateUntil, hibernateUntilTime.UnixNano())
|
||||
log.Errorf("critical-load met: %s=%d, >=%d. Will hibernate for the duration of %+v, until %+v", variableName, value, threshold, hibernateDuration, hibernateUntilTime)
|
||||
go func() {
|
||||
time.Sleep(hibernateDuration)
|
||||
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(true, "leaving hibernation", base.LeavingHibernationThrottleReasonHint))
|
||||
atomic.StoreInt64(&this.migrationContext.HibernateUntil, 0)
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
|
||||
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
||||
}
|
||||
@ -290,6 +353,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
|
||||
go this.collectReplicationLag(firstThrottlingCollected)
|
||||
go this.collectControlReplicasLag()
|
||||
go this.collectThrottleHTTPStatus(firstThrottlingCollected)
|
||||
|
||||
go func() {
|
||||
this.collectGeneralThrottleMetrics()
|
||||
|
@ -14,15 +14,18 @@ import (
|
||||
var (
|
||||
sanitizeQuotesRegexp = regexp.MustCompile("('[^']*')")
|
||||
renameColumnRegexp = regexp.MustCompile(`(?i)\bchange\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
|
||||
dropColumnRegexp = regexp.MustCompile(`(?i)\bdrop\s+(column\s+|)([\S]+)$`)
|
||||
)
|
||||
|
||||
type Parser struct {
|
||||
columnRenameMap map[string]string
|
||||
droppedColumns map[string]bool
|
||||
}
|
||||
|
||||
func NewParser() *Parser {
|
||||
return &Parser{
|
||||
columnRenameMap: make(map[string]string),
|
||||
droppedColumns: make(map[string]bool),
|
||||
}
|
||||
}
|
||||
|
||||
@ -59,10 +62,9 @@ func (this *Parser) sanitizeQuotesFromAlterStatement(alterStatement string) (str
|
||||
return strippedStatement
|
||||
}
|
||||
|
||||
func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
|
||||
alterTokens, _ := this.tokenizeAlterStatement(alterStatement)
|
||||
for _, alterToken := range alterTokens {
|
||||
alterToken = this.sanitizeQuotesFromAlterStatement(alterToken)
|
||||
func (this *Parser) parseAlterToken(alterToken string) (err error) {
|
||||
{
|
||||
// rename
|
||||
allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1)
|
||||
for _, submatch := range allStringSubmatch {
|
||||
if unquoted, err := strconv.Unquote(submatch[2]); err == nil {
|
||||
@ -71,10 +73,28 @@ func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
|
||||
if unquoted, err := strconv.Unquote(submatch[3]); err == nil {
|
||||
submatch[3] = unquoted
|
||||
}
|
||||
|
||||
this.columnRenameMap[submatch[2]] = submatch[3]
|
||||
}
|
||||
}
|
||||
{
|
||||
// drop
|
||||
allStringSubmatch := dropColumnRegexp.FindAllStringSubmatch(alterToken, -1)
|
||||
for _, submatch := range allStringSubmatch {
|
||||
if unquoted, err := strconv.Unquote(submatch[2]); err == nil {
|
||||
submatch[2] = unquoted
|
||||
}
|
||||
this.droppedColumns[submatch[2]] = true
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
|
||||
alterTokens, _ := this.tokenizeAlterStatement(alterStatement)
|
||||
for _, alterToken := range alterTokens {
|
||||
alterToken = this.sanitizeQuotesFromAlterStatement(alterToken)
|
||||
this.parseAlterToken(alterToken)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -91,3 +111,7 @@ func (this *Parser) GetNonTrivialRenames() map[string]string {
|
||||
func (this *Parser) HasNonTrivialRenames() bool {
|
||||
return len(this.GetNonTrivialRenames()) > 0
|
||||
}
|
||||
|
||||
func (this *Parser) DroppedColumnsMap() map[string]bool {
|
||||
return this.droppedColumns
|
||||
}
|
||||
|
@ -120,3 +120,42 @@ func TestSanitizeQuotesFromAlterStatement(t *testing.T) {
|
||||
test.S(t).ExpectEquals(strippedStatement, "change column i int ''")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseAlterStatementDroppedColumns(t *testing.T) {
|
||||
|
||||
{
|
||||
parser := NewParser()
|
||||
statement := "drop column b"
|
||||
err := parser.ParseAlterStatement(statement)
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(len(parser.droppedColumns), 1)
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["b"])
|
||||
}
|
||||
{
|
||||
parser := NewParser()
|
||||
statement := "drop column b, drop key c_idx, drop column `d`"
|
||||
err := parser.ParseAlterStatement(statement)
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(len(parser.droppedColumns), 2)
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["b"])
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["d"])
|
||||
}
|
||||
{
|
||||
parser := NewParser()
|
||||
statement := "drop column b, drop key c_idx, drop column `d`, drop `e`, drop primary key, drop foreign key fk_1"
|
||||
err := parser.ParseAlterStatement(statement)
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(len(parser.droppedColumns), 3)
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["b"])
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["d"])
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["e"])
|
||||
}
|
||||
{
|
||||
parser := NewParser()
|
||||
statement := "drop column b, drop bad statement, add column i int"
|
||||
err := parser.ParseAlterStatement(statement)
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectEquals(len(parser.droppedColumns), 1)
|
||||
test.S(t).ExpectTrue(parser.droppedColumns["b"])
|
||||
}
|
||||
}
|
||||
|
30
localtests/drop-null-add-not-null/create.sql
Normal file
30
localtests/drop-null-add-not-null/create.sql
Normal file
@ -0,0 +1,30 @@
|
||||
drop table if exists gh_ost_test;
|
||||
create table gh_ost_test (
|
||||
id int auto_increment,
|
||||
c1 int null,
|
||||
c2 int not null,
|
||||
primary key (id)
|
||||
) auto_increment=1;
|
||||
|
||||
insert into gh_ost_test values (null, null, 17);
|
||||
insert into gh_ost_test values (null, null, 19);
|
||||
|
||||
drop event if exists gh_ost_test;
|
||||
delimiter ;;
|
||||
create event gh_ost_test
|
||||
on schedule every 1 second
|
||||
starts current_timestamp
|
||||
ends current_timestamp + interval 60 second
|
||||
on completion not preserve
|
||||
enable
|
||||
do
|
||||
begin
|
||||
insert ignore into gh_ost_test values (101, 11, 23);
|
||||
insert ignore into gh_ost_test values (102, 13, 23);
|
||||
insert into gh_ost_test values (null, 17, 23);
|
||||
insert into gh_ost_test values (null, null, 29);
|
||||
set @last_insert_id := last_insert_id();
|
||||
-- update gh_ost_test set c2=c2+@last_insert_id where id=@last_insert_id order by id desc limit 1;
|
||||
delete from gh_ost_test where id=1;
|
||||
delete from gh_ost_test where c1=13; -- id=2
|
||||
end ;;
|
1
localtests/drop-null-add-not-null/extra_args
Normal file
1
localtests/drop-null-add-not-null/extra_args
Normal file
@ -0,0 +1 @@
|
||||
--alter="drop column c1, add column c1 int not null default 47"
|
1
localtests/drop-null-add-not-null/ghost_columns
Normal file
1
localtests/drop-null-add-not-null/ghost_columns
Normal file
@ -0,0 +1 @@
|
||||
c2
|
1
localtests/drop-null-add-not-null/orig_columns
Normal file
1
localtests/drop-null-add-not-null/orig_columns
Normal file
@ -0,0 +1 @@
|
||||
c2
|
Loading…
Reference in New Issue
Block a user