Merge branch 'master' into row-image-minimal

Shlomi Noach 2017-02-12 17:59:10 +02:00 committed by GitHub
commit 88fc345c28
34 changed files with 546 additions and 55 deletions


@ -58,9 +58,10 @@ More tips:
Also see:
- [requirements and limitations](doc/requirements-and-limitations.md)
- [common questions](doc/questions.md)
- [what if?](doc/what-if.md)
- [the fine print](doc/the-fine-print.md)
- [Questions](https://github.com/github/gh-ost/issues?q=label%3Aquestion)
- [Community questions](https://github.com/github/gh-ost/issues?q=label%3Aquestion)
## What's in a name?


@ -1 +1 @@
1.0.32
1.0.35


@ -146,8 +146,12 @@ gh-ost --allow-master-master --assume-master-host=a.specific.master.com
Topologies using _tungsten replicator_ are peculiar in that the participating servers are not actually aware they are replicating. The _tungsten replicator_ looks just like another app issuing queries on those hosts. `gh-ost` is unable to identify that a server participates in a _tungsten_ topology.
If you choose to migrate directly on master (see above), there's nothing special you need to do. If you choose to migrate via replica, then you must supply the identity of the master, and indicate this is a tungsten setup, as follows:
If you choose to migrate directly on master (see above), there's nothing special you need to do.
If you choose to migrate via replica, then you need to make sure Tungsten is configured with the `log-slave-updates` parameter (note this is different from MySQL's own `log-slave-updates` parameter); otherwise changes will not be written to the replica's binlog, causing data to be corrupted after the table swap. You must also supply the identity of the master, and indicate this is a tungsten setup, as follows:
```
gh-ost --tungsten --assume-master-host=the.topology.master.com
```
Also note that `--switch-to-rbr` does not work for a Tungsten setup as the replication process is external, so you need to make sure `binlog_format` is set to ROW before Tungsten Replicator connects to the server and starts applying events from the master.


@ -65,6 +65,17 @@ At this time (10-2016) `gh-ost` does not support foreign keys on migrated tables
See also: [`skip-foreign-key-checks`](#skip-foreign-key-checks)
### dml-batch-size
`gh-ost` reads events from the binary log and applies them onto the _ghost_ table. It does so in batched writes: grouping multiple events to apply in a single transaction. This gives better write throughput as we don't need to sync the transaction log to disk for each event.
The `--dml-batch-size` flag controls the size of the batched write. Allowed values are `1 - 100`, where `1` means no batching (every event from the binary log is applied onto the _ghost_ table in its own transaction). Default value is `10`.
Why is this behavior configurable? Different workloads have different characteristics. Some workloads have very large writes, such that aggregating even `50` writes into a transaction makes for a significant transaction size. On other workloads the write rate is so high that one just can't allow for a hundred more syncs to disk per second. The default value of `10` is a modest compromise that should probably work very well for most workloads. Your mileage may vary.
Noteworthy is that setting `--dml-batch-size` to a higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.
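The "upper limit, never wait" behavior can be illustrated with a small, self-contained Go sketch. It is not `gh-ost`'s actual code: the `drainBatch` helper and the use of plain strings as events are assumptions made for illustration only.

```go
package main

import "fmt"

// drainBatch (hypothetical helper) takes one event that was already received
// plus up to batchSize-1 more events that are immediately available on the
// queue. It never blocks waiting for further events to arrive.
func drainBatch(first string, queue chan string, batchSize int) []string {
	if batchSize < 1 {
		batchSize = 1
	}
	batch := []string{first}
	for len(batch) < batchSize {
		select {
		case ev := <-queue:
			batch = append(batch, ev)
		default:
			// The queue is empty right now: apply what we already have.
			return batch
		}
	}
	return batch
}

func main() {
	queue := make(chan string, 100)
	for _, ev := range []string{"e2", "e3", "e4", "e5"} {
		queue <- ev
	}
	// Plenty of events are waiting: the batch is capped at the batch size.
	fmt.Println(drainBatch("e1", queue, 3)) // [e1 e2 e3]
	// Fewer events than the batch size: take what is there and move on.
	fmt.Println(drainBatch(<-queue, queue, 10)) // [e4 e5]
}
```

With a batch size of `1` the helper returns only the first event, which corresponds to the "no batching" setting described above.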
### exact-rowcount
A `gh-ost` execution needs to copy whatever rows you have in your existing table onto the ghost table. This can be, and often is, a large number. Exactly what is that number?


@ -26,9 +26,9 @@ Both interfaces may serve at the same time. Both respond to simple text command,
- The `critical-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`
- For example: `Threads_running=1000,threads_connected=5000`, and you would then write/echo `critical-load=Threads_running=1000,threads_connected=5000` to the socket.
- `nice-ratio=<ratio>`: change _nice_ ratio: 0 for aggressive (not nice, not sleeping), positive integer `n`:
- For any `1ms` spent copying rows, spend `n*1ms` units of time sleeping.
- Examples: assume a single rows chunk copy takes `100ms` to complete.
- `nice-ratio=0.5` will cause `gh-ost` to sleep for `50ms` immediately following.
- `nice-ratio=1` will cause `gh-ost` to sleep for `100ms`, effectively doubling runtime
- value of `2` will effectively triple the runtime; etc.
- `throttle-query`: change throttle query
@ -38,6 +38,10 @@ Both interfaces may serve at the same time. Both respond to simple text command,
- `unpostpone`: at a time where `gh-ost` is postponing the [cut-over](cut-over.md) phase, instruct `gh-ost` to stop postponing and proceed immediately to cut-over.
- `panic`: immediately panic and abort operation
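The `nice-ratio` arithmetic described in the list above can be checked with a short Go sketch. The `niceSleep` function is a hypothetical helper written for this illustration and is not taken from the `gh-ost` source.

```go
package main

import (
	"fmt"
	"time"
)

// niceSleep (hypothetical helper) returns how long to sleep after a chunk
// copy that took copyTime, for a given nice ratio.
// A ratio of 0 (or less) means no sleeping at all.
func niceSleep(copyTime time.Duration, niceRatio float64) time.Duration {
	if niceRatio <= 0 {
		return 0
	}
	return time.Duration(float64(copyTime) * niceRatio)
}

func main() {
	chunk := 100 * time.Millisecond
	for _, ratio := range []float64{0, 0.5, 1, 2} {
		sleep := niceSleep(chunk, ratio)
		fmt.Printf("nice-ratio=%v: sleep %v, total %v\n", ratio, sleep, chunk+sleep)
	}
}
```

For a `100ms` chunk, a ratio of `1` yields a `100ms` sleep (double the runtime) and a ratio of `2` yields `200ms` (triple the runtime), matching the examples in the list.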
### Querying for data
For commands that accept an argument as value, pass `?` (question mark) to _get_ current value rather than _set_ a new one.
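As a rough illustration of the get-versus-set convention, the following Go sketch splits a `command=value` line and reports whether the value is `?`. The `parseCommand` function is a hypothetical helper for this example; it only mirrors the idea and is not the server's actual parsing code.

```go
package main

import (
	"fmt"
	"strings"
)

// parseCommand (hypothetical helper) splits an interactive command of the
// form "name=value" and reports whether the value is "?", i.e. a request to
// read the current setting rather than change it.
func parseCommand(line string) (name, arg string, isQuery bool) {
	// Split on the first "=" only, so values may themselves contain "=".
	tokens := strings.SplitN(strings.TrimSpace(line), "=", 2)
	name = strings.TrimSpace(tokens[0])
	if len(tokens) > 1 {
		arg = strings.TrimSpace(tokens[1])
	}
	return name, arg, arg == "?"
}

func main() {
	for _, line := range []string{"chunk-size=?", "chunk-size=250", "throttle"} {
		name, arg, isQuery := parseCommand(line)
		fmt.Printf("command=%q arg=%q query=%v\n", name, arg, isQuery)
	}
}
```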
### Examples
While migration is running:
@ -63,6 +67,11 @@ $ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
# Serving on TCP port: 10001
```
```shell
$ echo "chunk-size=?" | nc -U /tmp/gh-ost.test.sample_data_0.sock
250
```
```shell
$ echo throttle | nc -U /tmp/gh-ost.test.sample_data_0.sock

26
doc/questions.md Normal file

@ -0,0 +1,26 @@
# How?
### How does the cut-over work? Is it really atomic?
The cut-over phase, where the original table is swapped away, and the _ghost_ table takes its place, is an atomic, blocking, controlled operation.
- Atomic: the tables are swapped together. There is no gap where your table does not exist.
- Blocking: all app queries involving the migrated (original) table either operate on the original table, or are blocked, or proceed to operate on the _new_ table (formerly the _ghost_ table, now swapped in).
- Controlled: the cut-over times out at pre-defined threshold, and is atomically aborted, then re-attempted. Cut-over only takes place when no lags are present, and otherwise no throttling reason is found. Cut-over step itself gets high priority and is never throttled.
Read more on [cut-over](cut-over.md) and on the [cut-over design Issue](https://github.com/github/gh-ost/issues/82)
# Is it possible to?
### Is it possible to add a UNIQUE KEY?
Adding a `UNIQUE KEY` is possible, on the condition that no violation will occur. That is, you must make sure there aren't any violating rows in your table before and during the migration.
At this time there is no equivalent to `ALTER IGNORE`, where duplicates are implicitly and silently thrown away. The MySQL `5.7` docs say:
> As of MySQL 5.7.4, the IGNORE clause for ALTER TABLE is removed and its use produces an error.
It is therefore unlikely that `gh-ost` will support this behavior.
# Why


@ -30,11 +30,12 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. Th
- MySQL 5.7 `JSON` columns are not supported. They are likely to be supported shortly.
- The two _before_ & _after_ tables must share some `UNIQUE KEY`. Such key would be used by `gh-ost` to iterate the table.
- As an example, if your table has a single `UNIQUE KEY` and no `PRIMARY KEY`, and you wish to replace it with a `PRIMARY KEY`, you will need two migrations: one to add the `PRIMARY KEY` (this migration will use the existing `UNIQUE KEY`), another to drop the now redundant `UNIQUE KEY` (this migration will use the `PRIMARY KEY`).
- The chosen migration key must not include columns with `NULL` values.
- `gh-ost` will do its best to pick a migration key with non-nullable columns. It will by default refuse a migration where the only possible `UNIQUE KEY` includes nullable-columns. You may override this refusal via `--allow-nullable-unique-key` but **you must** be sure there are no actual `NULL` values in those columns. Such `NULL` values would cause a data integrity problem and potentially a corrupted migration.
- The two _before_ & _after_ tables must share a `PRIMARY KEY` or other `UNIQUE KEY`. This key will be used by `gh-ost` to iterate through the table rows when copying. [Read more](shared-key.md)
- The migration key must not include columns with NULL values. This means either:
1. The columns are `NOT NULL`, or
2. The columns are nullable but don't contain any NULL values.
- by default, `gh-ost` will not run if the only `UNIQUE KEY` includes nullable columns.
- You may override this via `--allow-nullable-unique-key` but make sure there are no actual `NULL` values in those columns. Existing NULL values can't guarantee data integrity on the migrated table.
- It is not allowed to migrate a table where another table exists with same name and different upper/lower case.
- For example, you may not migrate `MyTable` if another table called `MYtable` exists in the same schema.
@ -48,4 +49,4 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. Th
- If you have an `enum` field as part of your migration key (typically the `PRIMARY KEY`), migration performance will be degraded and potentially bad. [Read more](https://github.com/github/gh-ost/pull/277#issuecomment-254811520)
- Migrating a `FEDERATED` table is unsupported and is irrelevant to the problem `gh-ost` tackles.

68
doc/shared-key.md Normal file

@ -0,0 +1,68 @@
# Shared key
A requirement for a migration to run is that the two _before_ and _after_ tables have a shared unique key. This is to elaborate and illustrate on the matter.
### Introduction
Consider a classic, simple migration. The table is any normal table:
```
CREATE TABLE tbl (
id bigint unsigned not null auto_increment,
data varchar(255),
more_data int,
PRIMARY KEY(id)
)
```
And the migration is a simple `add column ts timestamp`.
In such migration there is no change in indexes, and in particular no change to any unique key, and specifically no change to the `PRIMARY KEY`. To run this migration, `gh-ost` would iterate the `tbl` table using the primary key, copy rows from `tbl` to the _ghost_ table `_tbl_gho` by order of `id`, and then apply binlog events onto `_tbl_gho`.
Applying the binlog events assumes the existence of a shared unique key. For example, an `UPDATE` statement in the binary log translates to a `REPLACE` statement which `gh-ost` applies to the _ghost_ table. Such a statement expects to add or replace an existing row based on the given row data. In particular, it would _replace_ an existing row if a unique key violation is met.
So `gh-ost` correlates `tbl` and `_tbl_gho` rows using a unique key. In the above example that would be the `PRIMARY KEY`.
### Rules
There must be a shared set of not-null columns for which there is a unique constraint in both the original table and the migration (_ghost_) table.
### Interpreting the rules
The same columns must be covered by a unique key in both tables. This doesn't have to be the `PRIMARY KEY`. This doesn't have to be a key of the same name.
Upon migration, `gh-ost` inspects both the original and _ghost_ table and attempts to find at least one such unique key (or rather, a set of columns) that is shared between the two. Typically this would just be the `PRIMARY KEY`, but sometimes you may change the `PRIMARY KEY` itself, in which case `gh-ost` will look for other options.
`gh-ost` expects unique keys where no `NULL` values are found, i.e. all columns covered by the unique key are defined as `NOT NULL`. This is implicitly true for `PRIMARY KEY`s. If no such key can be found, `gh-ost` bails out. In the event there is no such key, but you happen to _know_ your columns have no `NULL` values even though they're `NULL`-able, you may take responsibility and pass the `--allow-nullable-unique-key`. The migration will run well as long as no `NULL` values are found in the unique key's columns. Any actual `NULL`s may corrupt the migration.
### Examples: allowed and not allowed
```
create table some_table (
id int auto_increment,
ts timestamp,
name varchar(128) not null,
owner_id int not null,
loc_id int,
primary key(id),
unique key name_uidx(name)
)
```
Following are examples of migrations that are _good to run_:
- `add column i int`
- `add key owner_idx(owner_id)`
- `add unique key owner_name_idx(owner_id, name)` - though you need to make sure to not write conflicting rows while this migration runs
- `drop key name_uidx` - `primary key` is shared between the tables
- `drop primary key, add primary key(owner_id, loc_id)` - `name_uidx` is shared between the tables and is used for migration
- `change id bigint unsigned` - the `primary key` is used. The change of type still makes the `primary key` workable.
- `drop primary key, drop key name_uidx, add primary key(name), add unique key id_uidx(id)` - swapping the two keys. `gh-ost` is still happy because `id` is still unique in both tables. So is `name`.
Following are examples of migrations that _cannot run_:
- `drop primary key, drop key name_uidx` - no unique key on the _ghost_ table, so clearly cannot run
- `drop primary key, drop key name_uidx, add primary key(name, owner_id)` - no unique key covers the same columns in both tables. Even though `name` exists in the _ghost_ table's `primary key`, it is only part of the key and in itself does not guarantee uniqueness in the _ghost_ table.
Also, you cannot run a migration on a table that doesn't have some form of `unique key` in the first place, such as `some_table (id int, ts timestamp)`.
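The rule and the examples above can be sketched in Go. This is only an illustration under stated assumptions: the `uniqueKey` type and the `sharedUniqueKey` function are hypothetical, and the sketch compares column lists in their declared order, which is a conservative simplification rather than a description of how `gh-ost` itself matches keys.

```go
package main

import (
	"fmt"
	"strings"
)

// uniqueKey is a hypothetical, simplified description of a unique index.
type uniqueKey struct {
	Name     string
	Columns  []string
	Nullable bool // true if any covered column is NULL-able
}

// sharedUniqueKey returns the first unique key of the original table whose
// columns are also covered by a unique key on the ghost table. Keys with
// NULL-able columns are skipped unless allowNullable is set.
func sharedUniqueKey(original, ghost []uniqueKey, allowNullable bool) (uniqueKey, bool) {
	ghostColumns := map[string]bool{}
	for _, k := range ghost {
		if k.Nullable && !allowNullable {
			continue
		}
		ghostColumns[strings.Join(k.Columns, ",")] = true
	}
	for _, k := range original {
		if k.Nullable && !allowNullable {
			continue
		}
		if ghostColumns[strings.Join(k.Columns, ",")] {
			return k, true
		}
	}
	return uniqueKey{}, false
}

func main() {
	original := []uniqueKey{
		{Name: "PRIMARY", Columns: []string{"id"}},
		{Name: "name_uidx", Columns: []string{"name"}},
	}
	// Ghost table after dropping the primary key but keeping name_uidx.
	ghost := []uniqueKey{{Name: "name_uidx", Columns: []string{"name"}}}
	if k, ok := sharedUniqueKey(original, ghost, false); ok {
		fmt.Println("migrating via", k.Name)
	}
	// Ghost table whose only unique key is a primary key on (name, owner_id).
	ghost = []uniqueKey{{Name: "PRIMARY", Columns: []string{"name", "owner_id"}}}
	if _, ok := sharedUniqueKey(original, ghost, false); !ok {
		fmt.Println("no shared unique key; cannot migrate")
	}
}
```

Matching on columns rather than on key names reflects the point made earlier: the shared key does not have to be the `PRIMARY KEY`, nor carry the same name in both tables.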


@ -135,7 +135,9 @@ type MigrationContext struct {
OriginalBinlogFormat string
OriginalBinlogRowImage string
InspectorConnectionConfig *mysql.ConnectionConfig
InspectorMySQLVersion string
ApplierConnectionConfig *mysql.ConnectionConfig
ApplierMySQLVersion string
StartTime time.Time
RowCopyStartTime time.Time
RowCopyEndTime time.Time
@ -148,6 +150,7 @@ type MigrationContext struct {
controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64
TotalDMLEventsApplied int64
DMLBatchSize int64
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint
@ -207,6 +210,7 @@ func newMigrationContext() *MigrationContext {
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
@ -417,6 +421,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
atomic.StoreInt64(&this.ChunkSize, chunkSize)
}
func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
if batchSize < 1 {
batchSize = 1
}
if batchSize > 100 {
batchSize = 100
}
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
}
func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
@ -547,7 +561,11 @@ func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLag
func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.controlReplicasLagResult = *lagResult
if lagResult == nil {
this.controlReplicasLagResult = *mysql.NewNoReplicationLagResult()
} else {
this.controlReplicasLagResult = *lagResult
}
}
func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {


@ -83,6 +83,7 @@ func main() {
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, even after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
@ -223,6 +224,7 @@ func main() {
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetThrottleQuery(*throttleQuery)
migrationContext.SetDefaultNumRetries(*defaultRetries)
@ -240,5 +242,5 @@ func main() {
migrator.ExecOnFailureHook()
log.Fatale(err)
}
log.Info("Done")
fmt.Fprintf(os.Stdout, "# Done\n")
}


@ -70,14 +70,15 @@ func (this *Applier) InitDBConnections() (err error) {
if err := this.readTableColumns(); err != nil {
return err
}
log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
return nil
}
// validateConnection issues a simple can-connect to MySQL
func (this *Applier) validateConnection(db *gosql.DB) error {
query := `select @@global.port`
query := `select @@global.port, @@global.version`
var port int
if err := db.QueryRow(query).Scan(&port); err != nil {
if err := db.QueryRow(query).Scan(&port, &this.migrationContext.ApplierMySQLVersion); err != nil {
return err
}
if port != this.connectionConfig.Key.Port {
@ -971,3 +972,55 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
}
return nil
}
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}
rollback := func(err error) error {
tx.Rollback()
return err
}
sessionQuery := `SET
SESSION time_zone = '+00:00',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
`
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
for _, dmlEvent := range dmlEvents {
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
if err != nil {
return rollback(err)
}
if _, err := tx.Exec(query, args...); err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
return rollback(err)
}
totalDelta += rowDelta
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}()
if err != nil {
return log.Errore(err)
}
// no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
}
log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil
}


@ -60,6 +60,7 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.applyBinlogFormat(); err != nil {
return err
}
log.Infof("Inspector initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.InspectorMySQLVersion)
return nil
}
@ -168,9 +169,9 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
// validateConnection issues a simple can-connect to MySQL
func (this *Inspector) validateConnection() error {
query := `select @@global.port`
query := `select @@global.port, @@global.version`
var port int
if err := this.db.QueryRow(query).Scan(&port); err != nil {
if err := this.db.QueryRow(query).Scan(&port, &this.migrationContext.InspectorMySQLVersion); err != nil {
return err
}
if port != this.connectionConfig.Key.Port {
@ -529,6 +530,11 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
columnsList.SetUnsigned(columnName)
}
}
if strings.Contains(columnType, "mediumint") {
for _, columnsList := range columnsLists {
columnsList.GetColumn(columnName).Type = sql.MediumIntColumnType
}
}
if strings.Contains(columnType, "timestamp") {
for _, columnsList := range columnsLists {
columnsList.GetColumn(columnName).Type = sql.TimestampColumnType
@ -541,7 +547,7 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
}
if strings.HasPrefix(columnType, "enum") {
for _, columnsList := range columnsLists {
columnsList.GetColumn(columnName).Type = sql.EnumColumnValue
columnsList.GetColumn(columnName).Type = sql.EnumColumnType
}
}
if charset := m.GetString("CHARACTER_SET_NAME"); charset != "" {


@ -37,6 +37,21 @@ func ReadChangelogState(s string) ChangelogState {
type tableWriteFunc func() error
type applyEventStruct struct {
writeFunc *tableWriteFunc
dmlEvent *binlog.BinlogDMLEvent
}
func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
result := &applyEventStruct{writeFunc: writeFunc}
return result
}
func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct {
result := &applyEventStruct{dmlEvent: dmlEvent}
return result
}
const (
applyEventsQueueBuffer = 100
)
@ -71,7 +86,7 @@ type Migrator struct {
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
copyRowsQueue chan tableWriteFunc
applyEventsQueue chan tableWriteFunc
applyEventsQueue chan *applyEventStruct
handledChangelogStates map[string]bool
}
@ -86,7 +101,7 @@ func NewMigrator() *Migrator {
allEventsUpToLockProcessed: make(chan string),
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool),
}
return migrator
@ -194,7 +209,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
}
case AllEventsUpToLockProcessed:
{
applyEventFunc := func() error {
var applyEventFunc tableWriteFunc = func() error {
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
@ -204,7 +219,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
// So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter.
go func() {
this.applyEventsQueue <- applyEventFunc
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}()
}
default:
@ -746,6 +761,12 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
throttleQuery,
))
}
if throttleControlReplicaKeys := this.migrationContext.GetThrottleControlReplicaKeys(); throttleControlReplicaKeys.Len() > 0 {
fmt.Fprintln(w, fmt.Sprintf("# throttle-control-replicas count: %+v",
throttleControlReplicaKeys.Len(),
))
}
if this.migrationContext.PostponeCutOverFlagFile != "" {
setIndicator := ""
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
@ -912,11 +933,7 @@ func (this *Migrator) addDMLEventsListener() error {
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
func(dmlEvent *binlog.BinlogDMLEvent) error {
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
applyEventFunc := func() error {
return this.applier.ApplyDMLEventQuery(dmlEvent)
}
this.applyEventsQueue <- applyEventFunc
this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent)
return nil
},
)
@ -929,7 +946,9 @@ func (this *Migrator) initiateThrottler() error {
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected")
<-this.firstThrottlingCollected
<-this.firstThrottlingCollected // replication lag
<-this.firstThrottlingCollected // other metrics
log.Infof("First throttle metrics collected")
go this.throttler.initiateThrottlerChecks()
return nil
@ -1013,6 +1032,55 @@ func (this *Migrator) iterateChunks() error {
return nil
}
func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
if eventStruct.writeFunc != nil {
if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
return log.Errore(err)
}
}
return nil
}
if eventStruct.dmlEvent == nil {
return handleNonDMLEventStruct(eventStruct)
}
if eventStruct.dmlEvent != nil {
dmlEvents := [](*binlog.BinlogDMLEvent){}
dmlEvents = append(dmlEvents, eventStruct.dmlEvent)
var nonDmlStructToApply *applyEventStruct
availableEvents := len(this.applyEventsQueue)
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
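if availableEvents > batchSize-1 {
// The "- 1" accounts for the event already consumed above (eventStruct.dmlEvent),
// so a batch never exceeds batchSize and a batch size of 1 means no batching.
availableEvents = batchSize - 1
}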
for i := 0; i < availableEvents; i++ {
additionalStruct := <-this.applyEventsQueue
if additionalStruct.dmlEvent == nil {
// Not a DML. We don't group this, and we don't batch any further
nonDmlStructToApply = additionalStruct
break
}
dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
}
// Create a task to apply the DML events; this will be executed by executeWriteFuncs()
var applyEventFunc tableWriteFunc = func() error {
return this.applier.ApplyDMLEventQueries(dmlEvents)
}
if err := this.retryOperation(applyEventFunc); err != nil {
return log.Errore(err)
}
if nonDmlStructToApply != nil {
// We pulled DML events from the queue, and then we hit a non-DML event. Wait!
// We need to handle it!
if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil {
return log.Errore(err)
}
}
}
return nil
}
// executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
// This is where the ghost table gets the data. The function fills the data single-threaded.
// Both event backlog and rowcopy events are polled; the backlog events have precedence.
@ -1027,10 +1095,10 @@ func (this *Migrator) executeWriteFuncs() error {
// We give higher priority to event processing, then secondary priority to
// rowcopy
select {
case applyEventFunc := <-this.applyEventsQueue:
case eventStruct := <-this.applyEventsQueue:
{
if err := this.retryOperation(applyEventFunc); err != nil {
return log.Errore(err)
if err := this.onApplyEventStruct(eventStruct); err != nil {
return err
}
}
default:


@ -126,7 +126,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
if len(tokens) > 1 {
arg = strings.TrimSpace(tokens[1])
}
argIsQuestion := (arg == "?")
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
@ -152,6 +152,7 @@ no-throttle # End forced throttling (other throttling m
unpostpone # Bail out a cut-over postpone; proceed to cut-over
panic # panic and quit without cleanup
help # This message
- use '?' (question mark) as argument to get info rather than set. e.g. "max-load=?" will just print out current max-load.
`)
}
case "sup":
@ -160,6 +161,10 @@ help # This message
return ForcePrintStatusAndHintRule, nil
case "chunk-size":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize))
return NoPrintStatusRule, nil
}
if chunkSize, err := strconv.Atoi(arg); err != nil {
return NoPrintStatusRule, err
} else {
@ -169,6 +174,10 @@ help # This message
}
case "max-lag-millis":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold))
return NoPrintStatusRule, nil
}
if maxLagMillis, err := strconv.Atoi(arg); err != nil {
return NoPrintStatusRule, err
} else {
@ -182,6 +191,10 @@ help # This message
}
case "nice-ratio":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetNiceRatio())
return NoPrintStatusRule, nil
}
if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
return NoPrintStatusRule, err
} else {
@ -191,6 +204,11 @@ help # This message
}
case "max-load":
{
if argIsQuestion {
maxLoad := this.migrationContext.GetMaxLoad()
fmt.Fprintf(writer, "%s\n", maxLoad.String())
return NoPrintStatusRule, nil
}
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
return NoPrintStatusRule, err
}
@ -198,6 +216,11 @@ help # This message
}
case "critical-load":
{
if argIsQuestion {
criticalLoad := this.migrationContext.GetCriticalLoad()
fmt.Fprintf(writer, "%s\n", criticalLoad.String())
return NoPrintStatusRule, nil
}
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
return NoPrintStatusRule, err
}
@ -205,12 +228,20 @@ help # This message
}
case "throttle-query":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetThrottleQuery())
return NoPrintStatusRule, nil
}
this.migrationContext.SetThrottleQuery(arg)
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "throttle-control-replicas":
{
if argIsQuestion {
fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
return NoPrintStatusRule, nil
}
if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
return NoPrintStatusRule, err
}


@ -84,21 +84,38 @@ func (this *Throttler) parseChangelogHeartbeat(heartbeatValue string) (err error
}
}
// collectHeartbeat reads the latest changelog heartbeat value
func (this *Throttler) collectHeartbeat() {
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range ticker {
go func() error {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
// collectReplicationLag reads the latest changelog heartbeat value, or,
// when running on a replica, the replication lag reported by the replica itself
func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- bool) {
collectFunc := func() error {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
}
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
// when running on replica, the heartbeat injection is also done on the replica.
// This means we will always get a good heartbeat value.
// When running on replica, we should instead check the `SHOW SLAVE STATUS` output.
if lag, err := mysql.GetReplicationLag(this.inspector.connectionConfig); err != nil {
return log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
}
} else {
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err)
} else {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil
}()
}
return nil
}
collectFunc()
firstThrottlingCollected <- true
ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range ticker {
go collectFunc()
}
}
@ -114,6 +131,7 @@ func (this *Throttler) collectControlReplicasLag() {
readReplicaLag := func(connectionConfig *mysql.ConnectionConfig) (lag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var heartbeatValue string
if db, _, err := sqlutils.GetDB(dbUri); err != nil {
return lag, err
@ -158,9 +176,7 @@ func (this *Throttler) collectControlReplicasLag() {
// No need to read lag
return
}
if result := readControlReplicasLag(); result != nil {
this.migrationContext.SetControlReplicasLagResult(result)
}
this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag())
}
aggressiveTicker := time.Tick(100 * time.Millisecond)
relaxedFactor := 10
@ -272,13 +288,14 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// that may affect throttling. There are several components, all running independently,
// that collect such metrics.
func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan<- bool) {
go this.collectHeartbeat()
go this.collectReplicationLag(firstThrottlingCollected)
go this.collectControlReplicasLag()
go func() {
throttlerMetricsTick := time.Tick(1 * time.Second)
this.collectGeneralThrottleMetrics()
firstThrottlingCollected <- true
throttlerMetricsTick := time.Tick(1 * time.Second)
for range throttlerMetricsTick {
this.collectGeneralThrottleMetrics()
}

View File

@ -22,6 +22,14 @@ type ReplicationLagResult struct {
Err error
}
func NewNoReplicationLagResult() *ReplicationLagResult {
return &ReplicationLagResult{Lag: 0, Err: nil}
}
func (this *ReplicationLagResult) HasLag() bool {
return this.Lag > 0
}
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
// or via SHOW SLAVE STATUS
func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.Duration, err error) {
@ -32,9 +40,11 @@ func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.
}
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
slaveIORunning := m.GetString("Slave_IO_Running")
slaveSQLRunning := m.GetString("Slave_SQL_Running")
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
if !secondsBehindMaster.Valid {
return fmt.Errorf("replication not running")
return fmt.Errorf("replication not running; Slave_IO_Running=%+v, Slave_SQL_Running=%+v", slaveIORunning, slaveSQLRunning)
}
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
return nil

View File

@ -258,7 +258,7 @@ func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueK
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
for i, column := range uniqueKeyColumns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnValue {
if column.Type == EnumColumnType {
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
uniqueKeyColumnDescending[i] = fmt.Sprintf("concat(%s) desc", uniqueKeyColumnNames[i])
} else {
@ -309,7 +309,7 @@ func buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName string, uni
uniqueKeyColumnOrder := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
for i, column := range uniqueKeyColumns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnValue {
if column.Type == EnumColumnType {
uniqueKeyColumnOrder[i] = fmt.Sprintf("concat(%s) %s", uniqueKeyColumnNames[i], order)
} else {
uniqueKeyColumnOrder[i] = fmt.Sprintf("%s %s", uniqueKeyColumnNames[i], order)

View File

@ -18,9 +18,12 @@ const (
UnknownColumnType ColumnType = iota
TimestampColumnType = iota
DateTimeColumnType = iota
EnumColumnValue = iota
EnumColumnType = iota
MediumIntColumnType = iota
)
const maxMediumintUnsigned int32 = 16777215
type TimezoneConvertion struct {
ToTimezone string
}
@ -50,6 +53,14 @@ func (this *Column) convertArg(arg interface{}) interface{} {
return uint16(i)
}
if i, ok := arg.(int32); ok {
if this.Type == MediumIntColumnType {
// problem with mediumint is that it's a 3-byte type. There is no compatible golang type to match that.
// So to convert from negative to positive we'd need to convert the value manually
if i >= 0 {
return i
}
return uint32(maxMediumintUnsigned + i + 1)
}
return uint32(i)
}
if i, ok := arg.(int64); ok {
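
The mediumint branch above can be checked with a little arithmetic. A 3-byte unsigned value above 8388607 has its high bit set, so when it arrives as a signed `int32` it is negative, and adding 2^24 (written as `maxMediumintUnsigned + 1`) restores it. The sketch below is a standalone illustration; `mediumintToUnsigned` is a hypothetical name, and unlike the hunk it returns `uint32` for non-negative input as well.

```go
package main

import "fmt"

const maxMediumintUnsigned int32 = 16777215 // 2^24 - 1

// mediumintToUnsigned is a hypothetical helper: it maps a 24-bit value that was
// read as signed back to its unsigned interpretation.
func mediumintToUnsigned(i int32) uint32 {
	if i >= 0 {
		return uint32(i)
	}
	// e.g. -6777216 + 16777215 + 1 = 10000000
	return uint32(maxMediumintUnsigned + i + 1)
}

func main() {
	fmt.Println(mediumintToUnsigned(8388607))  // 8388607
	fmt.Println(mediumintToUnsigned(-6777216)) // 10000000
	fmt.Println(mediumintToUnsigned(-1))       // 16777215
}
```

The value 10000000 is the same one the mediumint test fixture in this commit inserts into its `mediumint(8) unsigned` column.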

View File

@ -0,0 +1,22 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
ts timestamp,
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 11, now());
insert into gh_ost_test values (null, 13, now());
insert into gh_ost_test values (null, 17, now());
end ;;

View File

@ -0,0 +1 @@
No PRIMARY nor UNIQUE key found in table

View File

@ -0,0 +1 @@
--alter="change id id int, drop primary key"

View File

@ -0,0 +1,22 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
ts timestamp,
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 11, now());
insert into gh_ost_test values (null, 13, now());
insert into gh_ost_test values (null, 17, now());
end ;;

View File

@ -0,0 +1 @@
No shared unique key can be found after ALTER

View File

@ -0,0 +1 @@
--alter="drop primary key, add primary key (id, i)"

View File

@ -0,0 +1,24 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id bigint,
i int not null,
ts timestamp(6),
primary key(id),
unique key its_uidx(i, ts)
) ;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values ((unix_timestamp() << 2) + 0, 11, now(6));
insert into gh_ost_test values ((unix_timestamp() << 2) + 1, 13, now(6));
insert into gh_ost_test values ((unix_timestamp() << 2) + 2, 17, now(6));
insert into gh_ost_test values ((unix_timestamp() << 2) + 3, 19, now(6));
end ;;

View File

@ -0,0 +1 @@
--alter="drop primary key, drop key its_uidx, add primary key (i, ts), add unique key id_uidx(id)"

View File

@ -0,0 +1 @@
id

View File

@ -0,0 +1,24 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id bigint,
i int not null,
ts timestamp(6),
unique key id_uidx(id),
unique key its_uidx(i, ts)
) ;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values ((unix_timestamp() << 2) + 0, 11, now(6));
insert into gh_ost_test values ((unix_timestamp() << 2) + 1, 13, now(6));
insert into gh_ost_test values ((unix_timestamp() << 2) + 2, 17, now(6));
insert into gh_ost_test values ((unix_timestamp() << 2) + 3, 19, now(6));
end ;;

View File

@ -0,0 +1 @@
--alter="drop key id_uidx, drop key its_uidx, add unique key its2_uidx(i, ts), add unique key id2_uidx(id)"

View File

@ -0,0 +1 @@
id

View File

@ -0,0 +1,22 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
ts timestamp,
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 11, now());
insert into gh_ost_test values (null, 13, now());
insert into gh_ost_test values (null, 17, now());
end ;;

View File

@ -0,0 +1 @@
--alter="drop primary key, add unique key(id)"

View File

@ -49,7 +49,7 @@ test_single() {
echo -n "Testing: $test_name"
echo_dot
gh-ost-test-mysql-replica -e "start slave"
gh-ost-test-mysql-replica -e "stop slave; start slave; do sleep(1)"
echo_dot
gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql
@ -59,12 +59,16 @@ test_single() {
fi
orig_columns="*"
ghost_columns="*"
order_by=""
if [ -f $tests_path/$test_name/orig_columns ] ; then
orig_columns=$(cat $tests_path/$test_name/orig_columns)
fi
if [ -f $tests_path/$test_name/ghost_columns ] ; then
ghost_columns=$(cat $tests_path/$test_name/ghost_columns)
fi
if [ -f $tests_path/$test_name/order_by ] ; then
order_by="order by $(cat $tests_path/$test_name/order_by)"
fi
# graceful sleep for replica to catch up
echo_dot
sleep 1
@ -84,7 +88,7 @@ test_single() {
--throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc' \
--serve-socket-file=/tmp/gh-ost.test.sock \
--initially-drop-socket-file \
--postpone-cut-over-flag-file="" \
--postpone-cut-over-flag-file=/tmp/gh-ost.test.postpone.flag \
--test-on-replica \
--default-retries=1 \
--verbose \
@ -129,8 +133,8 @@ test_single() {
fi
echo_dot
orig_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${orig_columns} from gh_ost_test" -ss | md5sum)
ghost_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${ghost_columns} from _gh_ost_test_gho" -ss | md5sum)
orig_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${orig_columns} from gh_ost_test ${order_by}" -ss | md5sum)
ghost_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${ghost_columns} from _gh_ost_test_gho ${order_by}" -ss | md5sum)
if [ "$orig_checksum" != "$ghost_checksum" ] ; then
echo "ERROR $test_name: checksum mismatch"
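
The new `order_by` file matters because the comparison checksums the raw row stream: identical rows returned in a different order produce a different digest. That is presumably why the tests that replace the primary key ship an `order_by` file containing `id`. A minimal Go sketch of the effect, with made-up rows and hypothetical helper names:

```go
package main

import (
	"crypto/md5"
	"fmt"
	"sort"
	"strings"
)

// checksum hashes the rows in the order given, like piping query output to md5sum.
func checksum(rows []string) string {
	return fmt.Sprintf("%x", md5.Sum([]byte(strings.Join(rows, "\n"))))
}

// orderedChecksum sorts a copy of the rows first, playing the role of ORDER BY.
func orderedChecksum(rows []string) string {
	sorted := append([]string(nil), rows...)
	sort.Strings(sorted)
	return checksum(sorted)
}

func main() {
	// Illustrative rows only: same content, different retrieval order.
	orig := []string{"1\t11", "2\t13", "3\t17"}
	ghost := []string{"3\t17", "1\t11", "2\t13"}

	fmt.Println(checksum(orig) == checksum(ghost))               // false
	fmt.Println(orderedChecksum(orig) == orderedChecksum(ghost)) // true
}
```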

View File

@ -0,0 +1,28 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id bigint(20) NOT NULL AUTO_INCREMENT,
column1 int(11) NOT NULL,
column2 smallint(5) unsigned NOT NULL,
column3 mediumint(8) unsigned NOT NULL,
column4 tinyint(3) unsigned NOT NULL,
column5 int(11) NOT NULL,
column6 int(11) NOT NULL,
PRIMARY KEY (id),
KEY c12_ix (column1, column2)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
-- mediumint maxvalue: 16777215 (unsigned), 8388607 (signed)
insert into gh_ost_test values (NULL, 13382498, 536, 8388607, 3, 1483892217, 1483892218);
insert into gh_ost_test values (NULL, 13382498, 536, 8388607, 250, 1483892217, 1483892218);
insert into gh_ost_test values (NULL, 13382498, 536, 10000000, 3, 1483892217, 1483892218);
end ;;