Retries, better visibility, documentation
- Rowcopy time is bounded by copy end-time - Retries are configurable via `--default-retries` (default: `60`) - `migrator` notes the hostname - `applier` and `inspector` note `impliedKey` (`@@hostname` and `@@port`) - Added lots of code comments - Adding documentation for "triggerless design"
This commit is contained in:
parent
23cb8ea7e9
commit
62b8a897e3
@ -0,0 +1,138 @@
|
||||
# Triggerless design
|
||||
|
||||
A breakdown of the logic and algorithm behind `gh-ost`'s triggerless design, followed by the implications, advantages and disadvantages of such design.
|
||||
|
||||
### Trigger-based migrations background
|
||||
|
||||
It is worthwhile to consider two popular existing online schema change solutions:
|
||||
|
||||
- [pt-online-schema-change](https://www.percona.com/doc/percona-toolkit/2.2/pt-online-schema-change.html)
|
||||
- [Facebook OSC](https://www.facebook.com/notes/mysql-at-facebook/online-schema-change-for-mysql/430801045932/)
|
||||
|
||||
The former uses a synchronous design: it adds three triggers (`AFTER INSERT`, `AFTER UPDATE`, `AFTER DELETE`) on the original table. Each such trigger relays the operation onto the ghost table. So for every `UPDATE` on the original table, an `UPDATE` executes on the ghost table. A `DELETE` on the original table triggers a `DELETE` on the ghost table. Same for `INSERT`. The triggers live in the same transaction space as the original query.
|
||||
|
||||
The latter uses an asynchronous design: it adds three triggers (`AFTER INSERT`, `AFTER UPDATE`, `AFTER DELETE`) on the original table. It also creates a _changelog_ table. The triggers do not relay operations directly to the ghost table. Instead, they each add an entry to the changelog table. An `UPDATE` on the original table makes for an `INSERT` on the changelog table saying "There was an UPDATE on the original table with this and that values"; likewise for `INSERT` and `DELETE`.
|
||||
A background process tails the changelog table and applies the changes onto the ghost table. This approach is asynchronous in that the applier does not live in the same transaction space as the original table, and may operate on a change event seconds or more after said event was written.
|
||||
It is noteworthy that the writes to the changelog table still live in the same transaction space as the writes on the original table.
|
||||
|
||||
### Triggerless based asynchronous migrations
|
||||
|
||||
`gh-ost`'s triggerless design uses an asynchronous approach. However it does not require triggers because it does not require having a _changelog_ table like the FB tool does. The reason it does not require a changelog table is that it finds the changelog in another place: the binary logs.
|
||||
|
||||
In particular, it reads Row Based Replication (RBR) entries (you can still [use it with Statement Based Replication!](migrating-with-sbr.md)) and searches for entries that apply to the original table.
|
||||
|
||||
RBR entries are very convenient for this job: they break complex statements, potentially multi-table, into distinct, per-table, per-row entries, which are easy to read and apply.
|
||||
|
||||
`gh-ost` pretends to be a MySQL replica: it connects to the MySQL server and begins requesting for binlog events as though it were a real replication server. Thus, it gets a continuous streaming of the binary logs, and filters out those events that apply to the original table.
|
||||
|
||||
`gh-ost` can connect directly to the master, but prefers to connect to one of its replicas. Such a replica would need to use `log-slave-updates` and use `binlog-format=ROW` (`gh-ost` can change the latter setting for you).
|
||||
|
||||
Reading from the binary log, specially in the case of reading those on a replica, further stresses the asynchronous nature of the algorithm. While the transaction _may_ (based on configuration) be synced with the binlog entry write, it will take time until `gh-ost` - pretending to be a replica - will get notification for that, copy the event downstream and apply it.
|
||||
|
||||
The asynchronous design implies many noteworthy outcomes, to be discussed later on.
|
||||
|
||||
### Workflow overview
|
||||
|
||||
The workflow includes reading table data from the server, reading event data from the binary log, checking for replication lag or other throttling parameters, applying changes onto the server (typically the master), sending hints through the binary log stream and more.
|
||||
|
||||
Some flow breakdown:
|
||||
|
||||
#### Initial setup & validation
|
||||
Initial setup is a no-concurrency operation
|
||||
|
||||
- Connecting to replica/master, detecting master identify
|
||||
- Pre-validating `alter` statement
|
||||
- Initial sanity: privileges, existence of tables
|
||||
- Creation of changelog and ghost tables.
|
||||
- Applying `alter` on ghost table
|
||||
- Comparing structure of original & ghost table. Looking for shared columns, shared unique keys, validating foreign keys. Choosing shared unique key, the key by which we chunk the table and process it.
|
||||
- Setting up the binlog listener; begin listening on changelog events
|
||||
- Injecting a "good to go" ebtry onto the changelog table (to be intercepted via binary logs)
|
||||
- Begin listening on binlog events for original table DMLs
|
||||
- Reading original table's chosen key min/max values
|
||||
|
||||
#### Copy flow
|
||||
This setup includes multiple moving parts, all acting concurrently with some coordination
|
||||
|
||||
- Setting up a heartbeat mechanism: frequent writes on the changelog table (we consider this to be low, negligible write load for throttling purposes)
|
||||
- Continuously updating status
|
||||
- Periodically (frequently) checking for potential throttle scenarios or hints
|
||||
- Work through the original table's rows range, chunk by chunk, queueing copy tasks onto the ghost table
|
||||
- Reading DML events from the binlogs, queueing apply tasks onto the ghost table
|
||||
- Processing the copy tasks queue and the apply tasks queue and sequentially applying onto ghost table
|
||||
- Suspending by throttle state
|
||||
- Injecting/intercepting "copy all done" once full row-copy range has been exhausted
|
||||
- Stall/postpone while `postpone-cut-over-flag-file` exists (we keep apply ongoing DMLs)
|
||||
|
||||
#### Cut-over and completion
|
||||
|
||||
- Locking the original table for writes, working on what remains on the binlog event backlog (recall this is an asynchronous operation, and so even as the table is locked, we still have unhandled events in our pipe).
|
||||
- Swapping the original table out, the ghost table in
|
||||
- Cleanup: potential drop of tables
|
||||
|
||||
### Asynchronous design implications
|
||||
|
||||
#### Cut-over phase
|
||||
|
||||
A complication the asynchronous approach presents is the cut-over phase: the swapping of the tables. In the synchronous approach, the two tables are kept in sync thanks to the transaction-space in which the triggers operate. Thus, a simple, atomic `rename table original to _original_old, ghost to original` suffices and is valid.
|
||||
|
||||
In the asynchronous approach, as we lock the original table, we often still have events in the pipeline, changes in the binary log we still need to apply onto the ghost table. An atomic swap would be a premature and incorrect solution, since it would imply the write load would immediately proceed to operate on what used to be the ghost table, even before we completed applying those last changes.
|
||||
|
||||
The Facebook solution uses an "outage", two-step rename:
|
||||
|
||||
- Lock the original table, work on backlog
|
||||
- Rename original table to `_old`
|
||||
- Rename ghost table to original
|
||||
|
||||
In between those two renames there's a point in time where the table does not exist, hence there's a "table outage".
|
||||
|
||||
`gh-ost` solves this by using an optimistic three-step locking algorithm. It is optimistic in that if no connection gets killed throughout this process, the cut-over is locking; queries are blocking on the original table and are unblocked after the ghost table has taken its place. Should any of the participating connections get killed throughout this process, the algorithm resort to "table outage" which is then rolled back.
|
||||
|
||||
Read more on the [cut-over](cut-over.md) documentation.
|
||||
|
||||
#### Decoupling
|
||||
|
||||
The most impacting change the triggerless, asynchronous approach provides is the decoupling of workload. With triggers, either synchronous or asynchronous, every write on your table implied an immediate write on another table.
|
||||
|
||||
We will break down the meaning of workload decoupling, shortly. But it is important to understand that `gh-ost` interprets the situation in its own time and acts in its own time, yet still makes this an online operation.
|
||||
|
||||
The decoupling is important not only as the tool's logic goes, but very importantly as the master server sees it. As far as the master knows, write to the table and writes to the ghost table are unrelated.
|
||||
|
||||
#### Writer load
|
||||
|
||||
Not using triggers means the master no longer needs to overload multiple, concurrent writes with stored routine interpretation combined with lock contention on the ghost table.
|
||||
|
||||
The responsibility for applying data to the ghost table is completely `gh-ost`'s. As such, `gh-ost` decides which data gets to be written to the ghost table and when. We are decoupled from the original table's write load, and choose to write to the ghost table in a single thread.
|
||||
|
||||
MySQL does not perform well on multiple concurrent massive writes to a specific table. Locking becomes an issue. This is why we choose to alternate between the massive row-copy and the ongoing binlog events backlog such that the server only sees writes from a single connection.
|
||||
|
||||
It is also interesting to observe that `gh-ost` is the only application writing to the ghost table. No one else is even aware of its existence. Thus, the trigger originated problem of high concurrency, high contention writes simply does not exist in `gh-ost`.
|
||||
|
||||
#### Pausability
|
||||
|
||||
When `gh-ost` pauses (throttles), it issues no writes on the ghost table. Because there are no triggers, write workload is decoupled from the `gh-ost` write workload. And because we're using an asynchronous approach, the algorithm already handles a time difference between a master write time and the ghost apply time. A difference of a few microseconds is no different from a difference of minutes or hours.
|
||||
|
||||
When `gh-ost` [throttles](throttle.md), either by replication lag, `max-load` setting or and explicit [interactive user command](interactive-commands.md), the master is back to normal. It sees no more writes on the ghost table.
|
||||
An exception is the ongoing heartbeat writes onto the changelog table, which we consider to be negligible.
|
||||
|
||||
#### Testability
|
||||
|
||||
We are able to test the migration process: as we've decoupled the migration operation from the master's workload, we are good to apply the changes not to the master, but to one of its replicas. We are able to migrate a table on a replica.
|
||||
|
||||
This in itself is a nice feature; but it also presents us with testability: just as we complete the migration, we stop replication on the replica. We cut-over but rollback again. We do not drop any table. The result is both the original and ghost table exist on the replica, which is not taking any further changes. We have time to examine the two tables and compare them to our satisfaction.
|
||||
|
||||
This is the method used by GitHub to continuously validate the tool's integrity: multiple production replicas are continuously and repeatedly doing a "trivial migration" (no actually change of column) on all our production tables. Each migration is followed by a checksum of the entire table data, on both original and ghost tables. We expect the checksums to be identical and we log the results. We expect zero failures.
|
||||
|
||||
#### Multiple, concurrent migrations
|
||||
|
||||
`gh-ost` was designed with having multiple concurrent migration running in parallel (no two on the same table, of course). The asynchronous approach supports that design by not caring when data is being shipped to the ghost table. The fact no triggers exist means multiple migrations appear to the master (or other migrated host) just as multiple connections, each writing to some otherwise unknown table. Each can throttle in its own time, or we can throttle all together.
|
||||
|
||||
#### Going outside the server space
|
||||
|
||||
More to come as we make progress.
|
||||
|
||||
#### Code complexity
|
||||
|
||||
With the synchronous, trigger based approach, the role of the migration tool is relatively small. A lot of the migration is based on the triggers doing their job within the transaction space. Issues such as rollback, datatypes, cut-over are implicitly taken care of by the database. With `gh-ost`'s asynchronous approach, the tool turns complex. It connects to the master and onto a replica; it imposes as a replicating server; it writes heartbeat events; it reads binlog data into the app to be written again onto the migrated host; it need to manage connection failures, replication lag, and more.
|
||||
|
||||
The tool has therefore a larger codebase and a more complicated asynchronous, concurrent logic. But we jumped the opportunity to add some [perks](perks.md) and completely redesign how an online migration tool should work.
|
@ -34,10 +34,6 @@ const (
|
||||
CutOverTwoStep = iota
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetries = 60
|
||||
)
|
||||
|
||||
// MigrationContext has the general, global state of migration. It is used by
|
||||
// all components throughout the migration process.
|
||||
type MigrationContext struct {
|
||||
@ -58,6 +54,7 @@ type MigrationContext struct {
|
||||
CliUser string
|
||||
CliPassword string
|
||||
|
||||
defaultNumRetries int64
|
||||
ChunkSize int64
|
||||
MaxLagMillisecondsThrottleThreshold int64
|
||||
ReplictionLagQuery string
|
||||
@ -92,6 +89,7 @@ type MigrationContext struct {
|
||||
ApplierConnectionConfig *mysql.ConnectionConfig
|
||||
StartTime time.Time
|
||||
RowCopyStartTime time.Time
|
||||
RowCopyEndTime time.Time
|
||||
LockTablesStartTime time.Time
|
||||
RenameTablesStartTime time.Time
|
||||
RenameTablesEndTime time.Time
|
||||
@ -143,6 +141,7 @@ func init() {
|
||||
|
||||
func newMigrationContext() *MigrationContext {
|
||||
return &MigrationContext{
|
||||
defaultNumRetries: 60,
|
||||
ChunkSize: 1000,
|
||||
InspectorConnectionConfig: mysql.NewConnectionConfig(),
|
||||
ApplierConnectionConfig: mysql.NewConnectionConfig(),
|
||||
@ -202,8 +201,18 @@ func (this *MigrationContext) HasMigrationRange() bool {
|
||||
return this.MigrationRangeMinValues != nil && this.MigrationRangeMaxValues != nil
|
||||
}
|
||||
|
||||
func (this *MigrationContext) MaxRetries() int {
|
||||
return maxRetries
|
||||
func (this *MigrationContext) SetDefaultNumRetries(retries int64) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
if retries > 0 {
|
||||
this.defaultNumRetries = retries
|
||||
}
|
||||
}
|
||||
func (this *MigrationContext) MaxRetries() int64 {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
retries := this.defaultNumRetries
|
||||
return retries
|
||||
}
|
||||
|
||||
func (this *MigrationContext) IsTransactionalTable() bool {
|
||||
@ -227,7 +236,20 @@ func (this *MigrationContext) ElapsedTime() time.Duration {
|
||||
|
||||
// ElapsedRowCopyTime returns time since starting to copy chunks of rows
|
||||
func (this *MigrationContext) ElapsedRowCopyTime() time.Duration {
|
||||
return time.Now().Sub(this.RowCopyStartTime)
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
if this.RowCopyEndTime.IsZero() {
|
||||
return time.Now().Sub(this.RowCopyStartTime)
|
||||
}
|
||||
return this.RowCopyEndTime.Sub(this.RowCopyStartTime)
|
||||
}
|
||||
|
||||
// ElapsedRowCopyTime returns time since starting to copy chunks of rows
|
||||
func (this *MigrationContext) MarkRowCopyEndTime() {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
this.RowCopyEndTime = time.Now()
|
||||
}
|
||||
|
||||
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
|
||||
|
@ -69,6 +69,7 @@ func main() {
|
||||
|
||||
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
|
||||
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
|
||||
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
|
||||
|
||||
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
|
||||
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
|
||||
@ -165,6 +166,7 @@ func main() {
|
||||
migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
|
||||
}
|
||||
migrationContext.SetChunkSize(*chunkSize)
|
||||
migrationContext.SetDefaultNumRetries(*defaultRetries)
|
||||
migrationContext.ApplyCredentials()
|
||||
|
||||
log.Infof("starting gh-ost %+v", AppVersion)
|
||||
|
@ -20,8 +20,11 @@ import (
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
|
||||
// Applier reads data from the read-MySQL-server (typically a replica, but can be the master)
|
||||
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
|
||||
// Applier connects and writes the the applier-server, which is the server where migration
|
||||
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
|
||||
// `--execute-on-replica` are given.
|
||||
// Applier is the one to actually write row data and apply binlog events onto the ghost table.
|
||||
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
|
||||
type Applier struct {
|
||||
connectionConfig *mysql.ConnectionConfig
|
||||
db *gosql.DB
|
||||
@ -52,6 +55,11 @@ func (this *Applier) InitDBConnections() (err error) {
|
||||
if err := this.validateConnection(this.singletonDB); err != nil {
|
||||
return err
|
||||
}
|
||||
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
|
||||
return err
|
||||
} else {
|
||||
this.connectionConfig.ImpliedKey = impliedKey
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -69,6 +77,7 @@ func (this *Applier) validateConnection(db *gosql.DB) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// tableExists checks if a given table exists in database
|
||||
func (this *Applier) tableExists(tableName string) (tableFound bool) {
|
||||
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
||||
|
||||
@ -79,6 +88,8 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) {
|
||||
return tableFound
|
||||
}
|
||||
|
||||
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
|
||||
// or attempts to drop them if instructed to.
|
||||
func (this *Applier) ValidateOrDropExistingTables() error {
|
||||
if this.migrationContext.InitiallyDropGhostTable {
|
||||
if err := this.DropGhostTable(); err != nil {
|
||||
@ -238,7 +249,7 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
|
||||
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
|
||||
// This is done asynchronously
|
||||
func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
|
||||
numSuccessiveFailures := 0
|
||||
var numSuccessiveFailures int64
|
||||
injectHeartbeat := func() error {
|
||||
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
|
||||
numSuccessiveFailures++
|
||||
@ -263,6 +274,7 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
|
||||
}
|
||||
}
|
||||
|
||||
// ExecuteThrottleQuery executes the `--throttle-query` and returns its results.
|
||||
func (this *Applier) ExecuteThrottleQuery() (int64, error) {
|
||||
throttleQuery := this.migrationContext.GetThrottleQuery()
|
||||
|
||||
@ -276,7 +288,7 @@ func (this *Applier) ExecuteThrottleQuery() (int64, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// ReadMigrationMinValues
|
||||
// ReadMigrationMinValues returns the minimum values to be iterated on rowcopy
|
||||
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
|
||||
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
|
||||
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names)
|
||||
@ -297,7 +309,7 @@ func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadMigrationMinValues
|
||||
// ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy
|
||||
func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
|
||||
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
|
||||
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names)
|
||||
@ -318,6 +330,7 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
|
||||
func (this *Applier) ReadMigrationRangeValues() error {
|
||||
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
|
||||
return err
|
||||
@ -328,49 +341,6 @@ func (this *Applier) ReadMigrationRangeValues() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// __unused_IterationIsComplete lets us know when the copy-iteration phase is complete, i.e.
|
||||
// we've exhausted all rows
|
||||
func (this *Applier) __unused_IterationIsComplete() (bool, error) {
|
||||
if !this.migrationContext.HasMigrationRange() {
|
||||
return false, nil
|
||||
}
|
||||
if this.migrationContext.MigrationIterationRangeMinValues == nil {
|
||||
return false, nil
|
||||
}
|
||||
args := sqlutils.Args()
|
||||
compareWithIterationRangeStart, explodedArgs, err := sql.BuildRangePreparedComparison(this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), sql.GreaterThanOrEqualsComparisonSign)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
args = append(args, explodedArgs...)
|
||||
compareWithRangeEnd, explodedArgs, err := sql.BuildRangePreparedComparison(this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.MigrationRangeMaxValues.AbstractValues(), sql.LessThanComparisonSign)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
args = append(args, explodedArgs...)
|
||||
query := fmt.Sprintf(`
|
||||
select /* gh-ost IterationIsComplete */ 1
|
||||
from %s.%s
|
||||
where (%s) and (%s)
|
||||
limit 1
|
||||
`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
compareWithIterationRangeStart,
|
||||
compareWithRangeEnd,
|
||||
)
|
||||
|
||||
moreRowsFound := false
|
||||
err = sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
|
||||
moreRowsFound = true
|
||||
return nil
|
||||
}, args...)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return !moreRowsFound, nil
|
||||
}
|
||||
|
||||
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
|
||||
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
|
||||
// no further chunk to work through, i.e. we're past the last chunk and are done with
|
||||
@ -412,6 +382,8 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
|
||||
return hasFurtherRange, nil
|
||||
}
|
||||
|
||||
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
|
||||
// data actually gets copied from original table.
|
||||
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
|
||||
startTime := time.Now()
|
||||
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
|
||||
@ -465,7 +437,7 @@ func (this *Applier) LockOriginalTable() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UnlockTables
|
||||
// UnlockTables makes tea. No wait, it unlocks tables.
|
||||
func (this *Applier) UnlockTables() error {
|
||||
query := `unlock /* gh-ost */ tables`
|
||||
log.Infof("Unlocking tables")
|
||||
@ -476,7 +448,10 @@ func (this *Applier) UnlockTables() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SwapTablesQuickAndBumpy
|
||||
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
|
||||
// - rename original table to _old
|
||||
// - rename ghost table to original
|
||||
// There is a point in time in between where the table does not exist.
|
||||
func (this *Applier) SwapTablesQuickAndBumpy() error {
|
||||
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
@ -503,6 +478,7 @@ func (this *Applier) SwapTablesQuickAndBumpy() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RenameTable makes coffee. No, wait. It renames a table.
|
||||
func (this *Applier) RenameTable(fromName, toName string) (err error) {
|
||||
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
@ -518,6 +494,8 @@ func (this *Applier) RenameTable(fromName, toName string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// RenameTablesRollback renames back both table: original back to ghost,
|
||||
// _old back to original. This is used by `--test-on-replica`
|
||||
func (this *Applier) RenameTablesRollback() (renameError error) {
|
||||
// Restoring tables to original names.
|
||||
// We prefer the single, atomic operation:
|
||||
@ -594,7 +572,8 @@ func (this *Applier) StartSlaveSQLThread() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Applier) StopSlaveNicely() error {
|
||||
// StopReplication is used by `--test-on-replica` and stops replication.
|
||||
func (this *Applier) StopReplication() error {
|
||||
if err := this.StopSlaveIOThread(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -609,6 +588,7 @@ func (this *Applier) StopSlaveNicely() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetSessionLockName returns a name for the special hint session voluntary lock
|
||||
func (this *Applier) GetSessionLockName(sessionId int64) string {
|
||||
return fmt.Sprintf("gh-ost.%d.lock", sessionId)
|
||||
}
|
||||
@ -747,6 +727,7 @@ func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRename
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExpectUsedLock expects the special hint voluntary lock to exist on given session
|
||||
func (this *Applier) ExpectUsedLock(sessionId int64) error {
|
||||
var result int64
|
||||
query := `select is_used_lock(?)`
|
||||
@ -758,6 +739,7 @@ func (this *Applier) ExpectUsedLock(sessionId int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics
|
||||
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
|
||||
found := false
|
||||
query := `
|
||||
@ -790,6 +772,8 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
|
||||
// event entry on the original table.
|
||||
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
|
||||
switch dmlEvent.DML {
|
||||
case binlog.DeleteDML:
|
||||
@ -813,6 +797,8 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query
|
||||
return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
|
||||
}
|
||||
|
||||
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
|
||||
// original-table binlog event
|
||||
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
|
||||
if err != nil {
|
||||
|
@ -41,6 +41,11 @@ func (this *Inspector) InitDBConnections() (err error) {
|
||||
if err := this.validateConnection(); err != nil {
|
||||
return err
|
||||
}
|
||||
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
|
||||
return err
|
||||
} else {
|
||||
this.connectionConfig.ImpliedKey = impliedKey
|
||||
}
|
||||
if err := this.validateGrants(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -303,6 +308,7 @@ func (this *Inspector) validateTable() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateTableForeignKeys makes sure no foreign keys exist on the migrated table
|
||||
func (this *Inspector) validateTableForeignKeys() error {
|
||||
query := `
|
||||
SELECT COUNT(*) AS num_foreign_keys
|
||||
@ -334,6 +340,7 @@ func (this *Inspector) validateTableForeignKeys() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// estimateTableRowsViaExplain estimates number of rows on original table
|
||||
func (this *Inspector) estimateTableRowsViaExplain() error {
|
||||
query := fmt.Sprintf(`explain select /* gh-ost */ * from %s.%s where 1=1`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
|
||||
@ -355,6 +362,7 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// CountTableRows counts exact number of rows on the original table
|
||||
func (this *Inspector) CountTableRows() error {
|
||||
log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
||||
query := fmt.Sprintf(`select /* gh-ost */ count(*) as rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
@ -366,6 +374,7 @@ func (this *Inspector) CountTableRows() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// getTableColumns reads column list from given table
|
||||
func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.ColumnList, error) {
|
||||
query := fmt.Sprintf(`
|
||||
show columns from %s.%s
|
||||
@ -507,6 +516,7 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
|
||||
return sql.NewColumnList(sharedColumnNames), sql.NewColumnList(mappedSharedColumnNames)
|
||||
}
|
||||
|
||||
// readChangelogState reads changelog hints
|
||||
func (this *Inspector) readChangelogState() (map[string]string, error) {
|
||||
query := fmt.Sprintf(`
|
||||
select hint, value from %s.%s where id <= 255
|
||||
|
@ -56,6 +56,7 @@ type Migrator struct {
|
||||
eventsStreamer *EventsStreamer
|
||||
server *Server
|
||||
migrationContext *base.MigrationContext
|
||||
hostname string
|
||||
|
||||
tablesInPlace chan bool
|
||||
rowCopyComplete chan bool
|
||||
@ -106,8 +107,9 @@ func (this *Migrator) acceptSignals() {
|
||||
}()
|
||||
}
|
||||
|
||||
// shouldThrottle performs checks to see whether we should currently be throttling.
|
||||
// It also checks for critical-load and panic aborts.
|
||||
func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
|
||||
// Regardless of throttle, we take opportunity to check for panic-abort
|
||||
if this.migrationContext.PanicFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.PanicFlagFile) {
|
||||
@ -177,6 +179,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
return false, ""
|
||||
}
|
||||
|
||||
// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
|
||||
func (this *Migrator) initiateThrottler() error {
|
||||
throttlerTick := time.Tick(1 * time.Second)
|
||||
|
||||
@ -235,7 +238,7 @@ func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
|
||||
// retryOperation attempts up to `count` attempts at running given function,
|
||||
// exiting as soon as it returns with non-error.
|
||||
func (this *Migrator) retryOperation(operation func() error) (err error) {
|
||||
maxRetries := this.migrationContext.MaxRetries()
|
||||
maxRetries := int(this.migrationContext.MaxRetries())
|
||||
for i := 0; i < maxRetries; i++ {
|
||||
if i != 0 {
|
||||
// sleep after previous iteration
|
||||
@ -266,6 +269,7 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
|
||||
func (this *Migrator) consumeRowCopyComplete() {
|
||||
<-this.rowCopyComplete
|
||||
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
|
||||
this.migrationContext.MarkRowCopyEndTime()
|
||||
go func() {
|
||||
for <-this.rowCopyComplete {
|
||||
}
|
||||
@ -276,6 +280,7 @@ func (this *Migrator) canStopStreaming() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
|
||||
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
|
||||
// Hey, I created the changlog table, I know the type of columns it has!
|
||||
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
|
||||
@ -311,6 +316,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
return nil
|
||||
}
|
||||
|
||||
// onChangelogHeartbeat is called when a heartbeat event is intercepted
|
||||
func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
|
||||
heartbeatTime, err := time.Parse(time.RFC3339Nano, heartbeatValue)
|
||||
if err != nil {
|
||||
@ -323,12 +329,15 @@ func (this *Migrator) onChangelogHeartbeat(heartbeatValue string) (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// listenOnPanicAbort aborts on abort request
|
||||
func (this *Migrator) listenOnPanicAbort() {
|
||||
err := <-this.panicAbort
|
||||
log.Fatale(err)
|
||||
}
|
||||
|
||||
// validateStatement validates the `alter` statement meets criteria.
|
||||
// At this time this means:
|
||||
// - column renames are approved
|
||||
func (this *Migrator) validateStatement() (err error) {
|
||||
if this.parser.HasNonTrivialRenames() && !this.migrationContext.SkipRenamedColumns {
|
||||
this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames()
|
||||
@ -340,9 +349,13 @@ func (this *Migrator) validateStatement() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Migrate executes the complete migration logic. This is *the* major gh-ost function.
|
||||
func (this *Migrator) Migrate() (err error) {
|
||||
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
this.migrationContext.StartTime = time.Now()
|
||||
if this.hostname, err = os.Hostname(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go this.listenOnPanicAbort()
|
||||
|
||||
@ -432,7 +445,7 @@ func (this *Migrator) cutOver() (err error) {
|
||||
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
|
||||
// Throttle file defined and exists!
|
||||
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 1)
|
||||
log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeCutOverFlagFile)
|
||||
//log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeCutOverFlagFile)
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
@ -446,7 +459,7 @@ func (this *Migrator) cutOver() (err error) {
|
||||
// and swap the tables.
|
||||
// The difference is that we will later swap the tables back.
|
||||
log.Debugf("testing on replica. Stopping replication IO thread")
|
||||
if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil {
|
||||
if err := this.retryOperation(this.applier.StopReplication); err != nil {
|
||||
return err
|
||||
}
|
||||
// We're merly testing, we don't want to keep this state. Rollback the renames as possible
|
||||
@ -621,7 +634,7 @@ func (this *Migrator) safeCutOver() (err error) {
|
||||
// in sync. There is no table swap.
|
||||
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
|
||||
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
|
||||
if err := this.retryOperation(this.applier.StopSlaveNicely); err != nil {
|
||||
if err := this.retryOperation(this.applier.StopReplication); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -631,6 +644,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// onServerCommand responds to a user's interactive command
|
||||
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
|
||||
defer writer.Flush()
|
||||
|
||||
@ -704,6 +718,7 @@ help # This message
|
||||
return nil
|
||||
}
|
||||
|
||||
// initiateServer begins listening on unix socket/tcp for incoming interactive commands
|
||||
func (this *Migrator) initiateServer() (err error) {
|
||||
this.server = NewServer(this.onServerCommand)
|
||||
if err := this.server.BindSocketFile(); err != nil {
|
||||
@ -717,6 +732,13 @@ func (this *Migrator) initiateServer() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// initiateInspector connects, validates and inspects the "inspector" server.
|
||||
// The "inspector" server is typically a replica; it is where we issue some
|
||||
// queries such as:
|
||||
// - table row count
|
||||
// - schema validation
|
||||
// - heartbeat
|
||||
// When `--allow-on-master` is supplied, the inspector is actually the master.
|
||||
func (this *Migrator) initiateInspector() (err error) {
|
||||
this.inspector = NewInspector()
|
||||
if err := this.inspector.InitDBConnections(); err != nil {
|
||||
@ -738,7 +760,7 @@ func (this *Migrator) initiateInspector() (err error) {
|
||||
return fmt.Errorf("Instructed to --test-on-replica or --migrate-on-replica, but the server we connect to doesn't seem to be a replica")
|
||||
}
|
||||
log.Infof("--test-on-replica or --migrate-on-replica given. Will not execute on master %+v but rather on replica %+v itself",
|
||||
this.migrationContext.ApplierConnectionConfig.Key, this.migrationContext.InspectorConnectionConfig.Key,
|
||||
*this.migrationContext.ApplierConnectionConfig.ImpliedKey, *this.migrationContext.InspectorConnectionConfig.ImpliedKey,
|
||||
)
|
||||
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
|
||||
if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
|
||||
@ -748,10 +770,11 @@ func (this *Migrator) initiateInspector() (err error) {
|
||||
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
|
||||
}
|
||||
|
||||
log.Infof("Master found to be %+v", this.migrationContext.ApplierConnectionConfig.Key)
|
||||
log.Infof("Master found to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
// initiateStatus sets and activates the printStatus() ticker
|
||||
func (this *Migrator) initiateStatus() error {
|
||||
this.printStatus(ForcePrintStatusAndHint)
|
||||
statusTick := time.Tick(1 * time.Second)
|
||||
@ -762,6 +785,10 @@ func (this *Migrator) initiateStatus() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// printMigrationStatusHint prints a detailed configuration dump, that is useful
|
||||
// to keep in mind; such as the name of migrated table, throttle params etc.
|
||||
// This gets printed at beginning and end of migration, every 10 minutes throughout
|
||||
// migration, and as reponse to the "status" interactive command.
|
||||
func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
||||
w := io.MultiWriter(writers...)
|
||||
fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s",
|
||||
@ -770,6 +797,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
||||
))
|
||||
fmt.Fprintln(w, fmt.Sprintf("# Migrating %+v; inspecting %+v; executing on %+v",
|
||||
*this.applier.connectionConfig.ImpliedKey,
|
||||
*this.inspector.connectionConfig.ImpliedKey,
|
||||
this.hostname,
|
||||
))
|
||||
fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v",
|
||||
this.migrationContext.StartTime.Format(time.RubyDate),
|
||||
))
|
||||
@ -814,6 +846,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
||||
}
|
||||
}
|
||||
|
||||
// printStatus prints the prgoress status, and optionally additionally detailed
|
||||
// dump of configuration.
|
||||
// `rule` indicates the type of output expected.
|
||||
// By default the status is written to standard output, but other writers can
|
||||
// be used as well.
|
||||
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
writers = append(writers, os.Stdout)
|
||||
|
||||
@ -878,11 +915,11 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
|
||||
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
||||
|
||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); streamer: %+v; ETA: %s",
|
||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; ETA: %s",
|
||||
totalRowsCopied, rowsEstimate, progressPct,
|
||||
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
||||
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
||||
base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
|
||||
base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()),
|
||||
currentBinlogCoordinates,
|
||||
eta,
|
||||
)
|
||||
@ -894,6 +931,9 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
fmt.Fprintln(w, status)
|
||||
}
|
||||
|
||||
// initiateHeartbeatListener listens for heartbeat events. gh-ost implements its own
|
||||
// heartbeat mechanism, whether your DB has or hasn't an existing heartbeat solution.
|
||||
// Heartbeat is supplied via the changelog table
|
||||
func (this *Migrator) initiateHeartbeatListener() {
|
||||
ticker := time.Tick((heartbeatIntervalMilliseconds * time.Millisecond) / 2)
|
||||
for range ticker {
|
||||
@ -944,13 +984,15 @@ func (this *Migrator) initiateStreaming() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// addDMLEventsListener
|
||||
// addDMLEventsListener begins listening for binlog events on the original table,
|
||||
// and creates & enqueues a write task per such event.
|
||||
func (this *Migrator) addDMLEventsListener() error {
|
||||
err := this.eventsStreamer.AddListener(
|
||||
false,
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.OriginalTableName,
|
||||
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
|
||||
applyEventFunc := func() error {
|
||||
return this.applier.ApplyDMLEventQuery(dmlEvent)
|
||||
}
|
||||
@ -987,6 +1029,8 @@ func (this *Migrator) initiateApplier() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// iterateChunks iterates the existing table rows, and generates a copy task of
|
||||
// a chunk of rows onto the ghost table.
|
||||
func (this *Migrator) iterateChunks() error {
|
||||
terminateRowIteration := func(err error) error {
|
||||
this.rowCopyComplete <- true
|
||||
@ -1000,6 +1044,7 @@ func (this *Migrator) iterateChunks() error {
|
||||
log.Debugf("No rows found in table. Rowcopy will be implicitly empty")
|
||||
return terminateRowIteration(nil)
|
||||
}
|
||||
// Iterate per chunk:
|
||||
for {
|
||||
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
|
||||
// Done
|
||||
@ -1013,6 +1058,7 @@ func (this *Migrator) iterateChunks() error {
|
||||
if !hasFurtherRange {
|
||||
return terminateRowIteration(nil)
|
||||
}
|
||||
// Copy task:
|
||||
applyCopyRowsFunc := func() error {
|
||||
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
|
||||
if err != nil {
|
||||
@ -1024,11 +1070,15 @@ func (this *Migrator) iterateChunks() error {
|
||||
}
|
||||
return this.retryOperation(applyCopyRowsFunc)
|
||||
}
|
||||
// Enqueue copy operation; to be executed by executeWriteFuncs()
|
||||
this.copyRowsQueue <- copyRowsFunc
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
|
||||
// This is where the ghost table gets the data. The function fills the data single-threaded.
|
||||
// Both event backlog and rowcopy events are polled; the backlog events have precedence.
|
||||
func (this *Migrator) executeWriteFuncs() error {
|
||||
if this.migrationContext.Noop {
|
||||
log.Debugf("Noop operation; not really executing write funcs")
|
||||
|
@ -59,6 +59,7 @@ func (this *Server) BindTCPPort() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serve begins listening & serving on whichever device was configured
|
||||
func (this *Server) Serve() (err error) {
|
||||
go func() {
|
||||
for {
|
||||
|
@ -55,6 +55,7 @@ func NewEventsStreamer() *EventsStreamer {
|
||||
}
|
||||
}
|
||||
|
||||
// AddListener registers a new listener for binlog events, on a per-table basis
|
||||
func (this *EventsStreamer) AddListener(
|
||||
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
|
||||
|
||||
@ -77,6 +78,8 @@ func (this *EventsStreamer) AddListener(
|
||||
return nil
|
||||
}
|
||||
|
||||
// notifyListeners will notify relevant listeners with given DML event. Only
|
||||
// listeners registered for changes on the table on which the DML operates are notified.
|
||||
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
|
||||
this.listenersMutex.Lock()
|
||||
defer this.listenersMutex.Unlock()
|
||||
@ -117,6 +120,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica
|
||||
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
|
||||
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig)
|
||||
if err != nil {
|
||||
@ -151,8 +155,7 @@ func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordin
|
||||
return &mysql.BinlogCoordinates{LogFile: this.GetCurrentBinlogCoordinates().LogFile, LogPos: 4}
|
||||
}
|
||||
|
||||
// validateGrants verifies the user by which we're executing has necessary grants
|
||||
// to do its thang.
|
||||
// readCurrentBinlogCoordinates reads master status from hooked server
|
||||
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
|
||||
query := `show /* gh-ost readCurrentBinlogCoordinates */ master status`
|
||||
foundMasterStatus := false
|
||||
|
@ -11,15 +11,17 @@ import (
|
||||
|
||||
// ConnectionConfig is the minimal configuration required to connect to a MySQL server
|
||||
type ConnectionConfig struct {
|
||||
Key InstanceKey
|
||||
User string
|
||||
Password string
|
||||
Key InstanceKey
|
||||
User string
|
||||
Password string
|
||||
ImpliedKey *InstanceKey
|
||||
}
|
||||
|
||||
func NewConnectionConfig() *ConnectionConfig {
|
||||
config := &ConnectionConfig{
|
||||
Key: InstanceKey{},
|
||||
}
|
||||
config.ImpliedKey = &config.Key
|
||||
return config
|
||||
}
|
||||
|
||||
@ -32,6 +34,7 @@ func (this *ConnectionConfig) Duplicate() *ConnectionConfig {
|
||||
User: this.User,
|
||||
Password: this.Password,
|
||||
}
|
||||
config.ImpliedKey = &config.Key
|
||||
return config
|
||||
}
|
||||
|
||||
@ -40,7 +43,7 @@ func (this *ConnectionConfig) String() string {
|
||||
}
|
||||
|
||||
func (this *ConnectionConfig) Equals(other *ConnectionConfig) bool {
|
||||
return this.Key.Equals(&other.Key)
|
||||
return this.Key.Equals(&other.Key) || this.ImpliedKey.Equals(other.ImpliedKey)
|
||||
}
|
||||
|
||||
func (this *ConnectionConfig) GetDBUri(databaseName string) string {
|
||||
|
@ -133,3 +133,10 @@ func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordi
|
||||
})
|
||||
return selfBinlogCoordinates, err
|
||||
}
|
||||
|
||||
// GetInstanceKey reads hostname and port on given DB
|
||||
func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
|
||||
instanceKey = &InstanceKey{}
|
||||
err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port)
|
||||
return instanceKey, err
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user