gh-ost/go/logic/applier.go

1197 lines
43 KiB
Go
Raw Normal View History

/*
Copyright 2022 GitHub Inc.
2016-05-16 09:09:17 +00:00
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
gosql "database/sql"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/openark/golib/log"
"github.com/openark/golib/sqlutils"
)
const (
2022-06-06 23:49:18 +00:00
GhostChangelogTableComment = "gh-ost changelog"
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)
type dmlBuildResult struct {
query string
args []interface{}
rowsDelta int64
err error
}
func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult {
return &dmlBuildResult{
query: query,
args: args,
rowsDelta: rowsDelta,
err: err,
}
}
func newDmlBuildResultError(err error) *dmlBuildResult {
return &dmlBuildResult{
err: err,
}
}
// Applier connects and writes the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
// Applier is the one to actually write row data and apply binlog events onto the ghost table.
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
type Applier struct {
2017-08-08 22:31:25 +00:00
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext
finishedMigrating int64
name string
}
2017-08-08 20:36:54 +00:00
func NewApplier(migrationContext *base.MigrationContext) *Applier {
return &Applier{
2017-08-08 22:31:25 +00:00
connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext,
finishedMigrating: 0,
name: "applier",
}
}
func (this *Applier) InitDBConnections() (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
Allow gh-ost to modify the server using extra port Both Percona and Maria allow MySQL to be configured to listen on an extra port when their thread pool is enable. * https://www.percona.com/doc/percona-server/5.7/performance/threadpool.html * https://mariadb.com/kb/en/the-mariadb-library/thread-pool-in-mariadb-51-53/ This is valuable because if the table has a lot of traffic (read or write load), gh-ost can end up starving the thread pool as incomming connections are immediately blocked. By using gh-ost on the extra port, MySQL locking will still behave the same, but MySQL will keep a dedicated thread for each gh-ost connection. When doing this, it's important to inspect the extra-max-connections variable. Both Percona and Maria default to 1, so gh-ost may easily exceed with its threads. An example local run using this ``` $ mysql -S /tmp/mysql_sandbox20393.sock -e "select @@global.port, @@global.extra_port" +---------------+---------------------+ | @@global.port | @@global.extra_port | +---------------+---------------------+ | 20393 | 30393 | +---------------+---------------------+ ./bin/gh-ost \ --initially-drop-ghost-table \ --initially-drop-old-table \ --assume-rbr \ --port="20395" \ --assume-master-host="127.0.0.1:30393" \ --max-load=Threads_running=25 \ --critical-load=Threads_running=1000 \ --chunk-size=1000 \ --max-lag-millis=1500 \ --user="gh-ost" \ --password="gh-ost" \ --database="test" \ --table="mytable" \ --verbose \ --alter="ADD mynewcol decimal(11,2) DEFAULT 0.0 NOT NULL" \ --exact-rowcount \ --concurrent-rowcount \ --default-retries=120 \ --panic-flag-file=/tmp/ghost.panic.flag \ --postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \ --execute ```
2017-09-06 18:25:35 +00:00
if err != nil {
return err
}
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil {
return err
}
Allow gh-ost to modify the server using extra port Both Percona and Maria allow MySQL to be configured to listen on an extra port when their thread pool is enable. * https://www.percona.com/doc/percona-server/5.7/performance/threadpool.html * https://mariadb.com/kb/en/the-mariadb-library/thread-pool-in-mariadb-51-53/ This is valuable because if the table has a lot of traffic (read or write load), gh-ost can end up starving the thread pool as incomming connections are immediately blocked. By using gh-ost on the extra port, MySQL locking will still behave the same, but MySQL will keep a dedicated thread for each gh-ost connection. When doing this, it's important to inspect the extra-max-connections variable. Both Percona and Maria default to 1, so gh-ost may easily exceed with its threads. An example local run using this ``` $ mysql -S /tmp/mysql_sandbox20393.sock -e "select @@global.port, @@global.extra_port" +---------------+---------------------+ | @@global.port | @@global.extra_port | +---------------+---------------------+ | 20393 | 30393 | +---------------+---------------------+ ./bin/gh-ost \ --initially-drop-ghost-table \ --initially-drop-old-table \ --assume-rbr \ --port="20395" \ --assume-master-host="127.0.0.1:30393" \ --max-load=Threads_running=25 \ --critical-load=Threads_running=1000 \ --chunk-size=1000 \ --max-lag-millis=1500 \ --user="gh-ost" \ --password="gh-ost" \ --database="test" \ --table="mytable" \ --verbose \ --alter="ADD mynewcol decimal(11,2) DEFAULT 0.0 NOT NULL" \ --exact-rowcount \ --concurrent-rowcount \ --default-retries=120 \ --panic-flag-file=/tmp/ghost.panic.flag \ --postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \ --execute ```
2017-09-06 18:25:35 +00:00
this.migrationContext.ApplierMySQLVersion = version
if err := this.validateAndReadTimeZone(); err != nil {
return err
}
2019-12-17 03:47:14 +00:00
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform && !this.migrationContext.AzureMySQL {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
this.connectionConfig.ImpliedKey = impliedKey
}
}
if err := this.readTableColumns(); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
return nil
}
// validateAndReadTimeZone potentially reads server time-zone
func (this *Applier) validateAndReadTimeZone() error {
2016-10-14 10:56:43 +00:00
query := `select @@global.time_zone`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.ApplierTimeZone); err != nil {
return err
}
2016-10-14 10:56:43 +00:00
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("will use time_zone='%s' on applier", this.migrationContext.ApplierTimeZone)
return nil
}
// generateSqlModeQuery return a `sql_mode = ...` query, to be wrapped with a `set session` or `set global`,
// based on gh-ost configuration:
// - User may skip strict mode
// - User may allow zero dats or zero in dates
func (this *Applier) generateSqlModeQuery() string {
sqlModeAddendum := []string{`NO_AUTO_VALUE_ON_ZERO`}
if !this.migrationContext.SkipStrictMode {
sqlModeAddendum = append(sqlModeAddendum, `STRICT_ALL_TABLES`)
}
sqlModeQuery := fmt.Sprintf("CONCAT(@@session.sql_mode, ',%s')", strings.Join(sqlModeAddendum, ","))
if this.migrationContext.AllowZeroInDate {
sqlModeQuery = fmt.Sprintf("REPLACE(REPLACE(%s, 'NO_ZERO_IN_DATE', ''), 'NO_ZERO_DATE', '')", sqlModeQuery)
}
return fmt.Sprintf("sql_mode = %s", sqlModeQuery)
}
// readTableColumns reads table columns on applier
func (this *Applier) readTableColumns() (err error) {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Examining table structure on applier")
this.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
if err != nil {
return err
}
return nil
}
// showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
rowMap = m
return nil
})
return rowMap
}
// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
m := this.showTableStatus(tableName)
return (m != nil)
}
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
// or attempts to drop them if instructed to.
func (this *Applier) ValidateOrDropExistingTables() error {
if this.migrationContext.InitiallyDropGhostTable {
if err := this.DropGhostTable(); err != nil {
return err
}
}
if this.tableExists(this.migrationContext.GetGhostTableName()) {
2016-08-22 06:49:27 +00:00
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-ghost-table to force dropping it, though I really prefer that you drop it or rename it away", sql.EscapeName(this.migrationContext.GetGhostTableName()))
}
if this.migrationContext.InitiallyDropOldTable {
if err := this.DropOldTable(); err != nil {
return err
}
}
2017-02-22 00:34:49 +00:00
if len(this.migrationContext.GetOldTableName()) > mysql.MaxTableNameLength {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Fatalf("--timestamp-old-table defined, but resulting table name (%s) is too long (only %d characters allowed)", this.migrationContext.GetOldTableName(), mysql.MaxTableNameLength)
2017-02-22 00:34:49 +00:00
}
if this.tableExists(this.migrationContext.GetOldTableName()) {
2016-08-22 06:49:27 +00:00
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-old-table to force dropping it, though I really prefer that you drop it or rename it away", sql.EscapeName(this.migrationContext.GetOldTableName()))
}
return nil
}
2022-11-10 03:11:49 +00:00
// AttemptInstantDDL attempts to use instant DDL (from MySQL 8.0, and earlier in Aurora and some others.)
// to apply the ALTER statement immediately. If it errors, the original
// gh-ost algorithm can be used. However, if it's successful -- a lot
// of time can potentially be saved. Instant operations include:
// - Adding a column
// - Dropping a column
// - Dropping an index
// - Extending a varchar column
// It is safer to attempt the change than try and parse the DDL, since
// there might be specifics about the table which make it not possible to apply instantly.
func (this *Applier) AttemptInstantDDL() error {
query := fmt.Sprintf(`ALTER /* gh-ost */ TABLE %s.%s %s, ALGORITHM=INSTANT`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
this.migrationContext.AlterStatementOptions,
)
2022-11-10 16:30:13 +00:00
this.migrationContext.Log.Infof("INSTANT DDL query is: %s", query)
// We don't need a trx, because for instant DDL the SQL mode doesn't matter.
_, err := this.db.Exec(query)
2022-11-10 03:11:49 +00:00
return err
}
// CreateGhostTable creates the ghost table on the applier host
func (this *Applier) CreateGhostTable() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Creating ghost table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
if _, err := tx.Exec(sessionQuery); err != nil {
return err
}
if _, err := tx.Exec(query); err != nil {
return err
}
this.migrationContext.Log.Infof("Ghost table created")
if err := tx.Commit(); err != nil {
// Neither SET SESSION nor ALTER are really transactional, so strictly speaking
// there's no need to commit; but let's do this the legit way anyway.
return err
}
return nil
}()
return err
}
// AlterGhost applies `alter` statement on ghost table
func (this *Applier) AlterGhost() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
2020-07-23 08:38:05 +00:00
this.migrationContext.AlterStatementOptions,
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Altering ghost table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("ALTER statement: %s", query)
err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
if _, err := tx.Exec(sessionQuery); err != nil {
return err
}
if _, err := tx.Exec(query); err != nil {
return err
}
this.migrationContext.Log.Infof("Ghost table altered")
if err := tx.Commit(); err != nil {
// Neither SET SESSION nor ALTER are really transactional, so strictly speaking
// there's no need to commit; but let's do this the legit way anyway.
return err
}
return nil
}()
return err
}
// AlterGhost applies `alter` statement on ghost table
func (this *Applier) AlterGhostAutoIncrement() error {
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s AUTO_INCREMENT=%d`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
this.migrationContext.OriginalTableAutoIncrement,
)
this.migrationContext.Log.Infof("Altering ghost table AUTO_INCREMENT value %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
this.migrationContext.Log.Debugf("AUTO_INCREMENT ALTER statement: %s", query)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Ghost table AUTO_INCREMENT altered")
return nil
}
// CreateChangelogTable creates the changelog table on the applier host
func (this *Applier) CreateChangelogTable() error {
if err := this.DropChangelogTable(); err != nil {
return err
}
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
2022-06-06 23:47:51 +00:00
id bigint unsigned auto_increment,
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
hint varchar(64) charset ascii not null,
2017-09-03 07:27:04 +00:00
value varchar(4096) charset ascii not null,
primary key(id),
unique key hint_uidx(hint)
2022-06-06 23:47:51 +00:00
) auto_increment=256 comment='%s'`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
2022-06-06 23:49:18 +00:00
GhostChangelogTableComment,
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Creating changelog table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Changelog table created")
return nil
}
// dropTable drops a given table on the applied host
func (this *Applier) dropTable(tableName string) error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Dropping table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Table dropped")
return nil
}
// DropChangelogTable drops the changelog table on the applier host
func (this *Applier) DropChangelogTable() error {
return this.dropTable(this.migrationContext.GetChangelogTableName())
}
// DropOldTable drops the _Old table on the applier host
func (this *Applier) DropOldTable() error {
return this.dropTable(this.migrationContext.GetOldTableName())
}
// DropGhostTable drops the ghost table on the applier host
func (this *Applier) DropGhostTable() error {
return this.dropTable(this.migrationContext.GetGhostTableName())
}
// WriteChangelog writes a value to the changelog table.
// It returns the hint as given, for convenience
func (this *Applier) WriteChangelog(hint, value string) (string, error) {
explicitId := 0
switch hint {
case "heartbeat":
explicitId = 1
case "state":
explicitId = 2
case "throttle":
explicitId = 3
}
query := fmt.Sprintf(`
2016-05-16 09:09:17 +00:00
insert /* gh-ost */ into %s.%s
(id, hint, value)
values
(NULLIF(?, 0), ?, ?)
on duplicate key update
last_update=NOW(),
value=VALUES(value)
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
_, err := sqlutils.ExecNoPrepare(this.db, query, explicitId, hint, value)
return hint, err
}
func (this *Applier) WriteAndLogChangelog(hint, value string) (string, error) {
this.WriteChangelog(hint, value)
return this.WriteChangelog(fmt.Sprintf("%s at %d", hint, time.Now().UnixNano()), value)
}
func (this *Applier) WriteChangelogState(value string) (string, error) {
return this.WriteAndLogChangelog("state", value)
}
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously
func (this *Applier) InitiateHeartbeat() {
var numSuccessiveFailures int64
injectHeartbeat := func() error {
2017-05-24 05:32:13 +00:00
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
return nil
}
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
numSuccessiveFailures++
if numSuccessiveFailures > this.migrationContext.MaxRetries() {
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
} else {
numSuccessiveFailures = 0
}
return nil
}
injectHeartbeat()
ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
defer ticker.Stop()
2022-07-07 03:05:37 +00:00
for range ticker.C {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
2017-08-08 22:31:25 +00:00
return
}
// Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop rather than spam the master in the event something
// goes wrong
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
continue
}
if err := injectHeartbeat(); err != nil {
return
}
}
}
// ExecuteThrottleQuery executes the `--throttle-query` and returns its results.
func (this *Applier) ExecuteThrottleQuery() (int64, error) {
throttleQuery := this.migrationContext.GetThrottleQuery()
if throttleQuery == "" {
return 0, nil
}
var result int64
if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil {
2019-10-07 15:10:36 +00:00
return 0, this.migrationContext.Log.Errore(err)
}
return result, nil
}
// readMigrationMinValues returns the minimum values to be iterated on rowcopy
func (this *Applier) readMigrationMinValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
2016-10-19 13:22:29 +00:00
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns)
if err != nil {
return err
}
rows, err := tx.Query(query)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
this.migrationContext.MigrationRangeMinValues = sql.NewColumnValues(uniqueKey.Len())
if err = rows.Scan(this.migrationContext.MigrationRangeMinValues.ValuesPointers...); err != nil {
return err
}
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
2020-04-21 16:50:23 +00:00
return rows.Err()
}
// readMigrationMaxValues returns the maximum values to be iterated on rowcopy
func (this *Applier) readMigrationMaxValues(tx *gosql.Tx, uniqueKey *sql.UniqueKey) error {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
2016-10-19 13:22:29 +00:00
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns)
if err != nil {
return err
}
rows, err := tx.Query(query)
if err != nil {
return err
}
defer rows.Close()
for rows.Next() {
this.migrationContext.MigrationRangeMaxValues = sql.NewColumnValues(uniqueKey.Len())
if err = rows.Scan(this.migrationContext.MigrationRangeMaxValues.ValuesPointers...); err != nil {
return err
}
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
2020-04-21 16:50:23 +00:00
return rows.Err()
}
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy.
// Before read min/max, write a changelog state into the ghc table to avoid lost data in mysql two-phase commit.
/*
Detail description of the lost data in mysql two-phase commit issue by @Fanduzi:
When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
if an INSERT statement is being committed but blocks due to an unmet ack count,
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
so the copy of the existing data in the table does not include the new row inserted by the transaction.
However, the binlog event for the transaction is already written to the binlog,
so the addDMLEventsListener only captures the binlog event after the transaction,
and thus the transaction's binlog event is not captured, resulting in data loss.
If write a changelog into ghc table before ReadMigrationRangeValues, and the transaction commit blocks
because the ack is not met, then the changelog will not be able to write, so the ReadMigrationRangeValues
will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the
newly inserted data, thus Avoiding data loss due to the above problem.
*/
func (this *Applier) ReadMigrationRangeValues() error {
if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil {
return err
}
tx, err := this.db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
if err := this.readMigrationMinValues(tx, this.migrationContext.UniqueKey); err != nil {
return err
}
if err := this.readMigrationMaxValues(tx, this.migrationContext.UniqueKey); err != nil {
return err
}
return tx.Commit()
}
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
2017-11-08 00:49:23 +00:00
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
}
2017-08-21 05:12:41 +00:00
for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
}
query, explodedArgs, err := buildFunc(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
}
2017-08-21 05:12:41 +00:00
rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
}
defer rows.Close()
2017-08-21 05:12:41 +00:00
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
}
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
return hasFurtherRange, err
}
2017-08-21 05:12:41 +00:00
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
}
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
}
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names(),
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
2016-10-19 13:22:29 +00:00
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(),
)
if err != nil {
return chunkSize, rowsAffected, duration, err
}
2016-10-08 09:06:27 +00:00
sqlResult, err := func() (gosql.Result, error) {
tx, err := this.db.Begin()
if err != nil {
return nil, err
}
2018-04-27 06:58:07 +00:00
defer tx.Rollback()
2019-02-14 14:58:49 +00:00
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
2019-03-24 09:32:42 +00:00
if _, err := tx.Exec(sessionQuery); err != nil {
2016-10-08 09:06:27 +00:00
return nil, err
}
result, err := tx.Exec(query, explodedArgs...)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return result, nil
}()
if err != nil {
return chunkSize, rowsAffected, duration, err
}
rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Since(startTime)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues,
this.migrationContext.GetIteration(),
chunkSize)
return chunkSize, rowsAffected, duration, nil
}
// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Table locked")
return nil
}
// UnlockTables makes tea. No wait, it unlocks tables.
func (this *Applier) UnlockTables() error {
2016-05-16 09:09:17 +00:00
query := `unlock /* gh-ost */ tables`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Unlocking tables")
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables unlocked")
return nil
}
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
// There is a point in time in between where the table does not exist.
func (this *Applier) SwapTablesQuickAndBumpy() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming original table")
this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
2016-05-16 09:09:17 +00:00
query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.RenameTablesEndTime = time.Now()
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables renamed")
return nil
}
// RenameTablesRollback renames back both table: original back to ghost,
// _old back to original. This is used by `--test-on-replica`
func (this *Applier) RenameTablesRollback() (renameError error) {
// Restoring tables to original names.
// We prefer the single, atomic operation:
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming back both tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil {
return nil
}
// But, if for some reason the above was impossible to do, we rename one by one.
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming back to ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
renameError = err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming back to original table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
renameError = err
}
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(renameError)
}
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
// We need to keep the SQL thread active so as to complete processing received events,
// and have them written to the binary log, so that we can then read them via streamer.
func (this *Applier) StopSlaveIOThread() error {
2016-05-16 09:09:17 +00:00
query := `stop /* gh-ost */ slave io_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Stopping replication IO thread")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication IO thread stopped")
return nil
}
// StartSlaveIOThread is applicable with --test-on-replica
func (this *Applier) StartSlaveIOThread() error {
query := `start /* gh-ost */ slave io_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Starting replication IO thread")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication IO thread started")
return nil
}
// StartSlaveSQLThread is applicable with --test-on-replica
func (this *Applier) StopSlaveSQLThread() error {
query := `stop /* gh-ost */ slave sql_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Verifying SQL thread is stopped")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("SQL thread stopped")
return nil
}
// StartSlaveSQLThread is applicable with --test-on-replica
func (this *Applier) StartSlaveSQLThread() error {
query := `start /* gh-ost */ slave sql_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Verifying SQL thread is running")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("SQL thread started")
return nil
}
// StopReplication is used by `--test-on-replica` and stops replication.
func (this *Applier) StopReplication() error {
if err := this.StopSlaveIOThread(); err != nil {
return err
}
if err := this.StopSlaveSQLThread(); err != nil {
return err
}
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
if err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
return nil
}
// StartReplication is used by `--test-on-replica` on cut-over failure
func (this *Applier) StartReplication() error {
if err := this.StartSlaveIOThread(); err != nil {
return err
}
if err := this.StartSlaveSQLThread(); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication started")
return nil
}
// GetSessionLockName returns a name for the special hint session voluntary lock
func (this *Applier) GetSessionLockName(sessionId int64) string {
return fmt.Sprintf("gh-ost.%d.lock", sessionId)
}
// ExpectUsedLock expects the special hint voluntary lock to exist on given session
func (this *Applier) ExpectUsedLock(sessionId int64) error {
var result int64
query := `select is_used_lock(?)`
lockName := this.GetSessionLockName(sessionId)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Checking session lock: %s", lockName)
if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId {
return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName)
}
return nil
}
// ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
found := false
query := `
select id
from information_schema.processlist
where
id != connection_id()
and ? in (0, id)
and state like concat('%', ?, '%')
and info like concat('%', ?, '%')
`
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
found = true
return nil
}, sessionId, stateHint, infoHint)
if err != nil {
return err
}
if !found {
return fmt.Errorf("Cannot find process. Hints: %s, %s", stateHint, infoHint)
}
return nil
}
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Looking for magic cut-over table")
tableName := this.migrationContext.GetOldTableName()
rowMap := this.showTableStatus(tableName)
if rowMap == nil {
// Table does not exist
return nil
}
if rowMap["Comment"].String != atomicCutOverMagicHint {
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Dropping magic cut-over table")
return this.dropTable(tableName)
}
2016-11-17 16:10:17 +00:00
// CreateAtomicCutOverSentryTable
func (this *Applier) CreateAtomicCutOverSentryTable() error {
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
return err
}
tableName := this.migrationContext.GetOldTableName()
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
id int auto_increment primary key
) engine=%s comment='%s'
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
this.migrationContext.TableEngine,
atomicCutOverMagicHint,
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Creating magic cut-over table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Magic cut-over table created")
return nil
}
// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
return err
}
defer func() {
sessionIdChan <- -1
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback()
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
tableLocked <- err
return err
}
sessionIdChan <- sessionId
lockResult := 0
query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err
return err
}
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
if err := this.CreateAtomicCutOverSentryTable(); err != nil {
tableLocked <- err
return err
}
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Locking %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables locked")
tableLocked <- nil // No error.
// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed.
<-okToUnlockTable
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Will now proceed to drop magic table and unlock tables")
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will:
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Dropping magic cut-over table")
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
dropCutOverSentryTableOnce.Do(func() {
if _, err := tx.Exec(query); err != nil {
this.migrationContext.Log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}
})
// Tables still locked
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
query = `unlock tables`
if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables unlocked")
tableUnlocked <- nil
return nil
}
// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
tablesRenamed <- nil
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables renamed")
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
return 0, err
}
return result, nil
}
2017-11-21 07:14:04 +00:00
// updateModifiesUniqueKeyColumns checks whether a UPDATE DML event actually
// modifies values of the migration's unique key (the iterated key). This will call
// for special handling.
func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) (modifiedColumn string, isModified bool) {
2017-11-08 09:11:17 +00:00
for _, column := range this.migrationContext.UniqueKey.Columns.Columns() {
tableOrdinal := this.migrationContext.OriginalTableColumns.Ordinals[column.Name]
whereColumnValue := dmlEvent.WhereColumnValues.AbstractValues()[tableOrdinal]
newColumnValue := dmlEvent.NewColumnValues.AbstractValues()[tableOrdinal]
if newColumnValue != whereColumnValue {
return column.Name, true
2017-11-08 09:11:17 +00:00
}
}
return "", false
2017-11-08 09:11:17 +00:00
}
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
}
case binlog.UpdateDML:
{
2017-11-21 07:14:04 +00:00
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
2017-11-08 09:11:17 +00:00
}
2016-08-22 06:49:27 +00:00
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResult(query, args, 0, err))
}
}
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
}
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}
rollback := func(err error) error {
tx.Rollback()
return err
}
2019-02-14 14:58:49 +00:00
sessionQuery := "SET SESSION time_zone = '+00:00'"
sessionQuery = fmt.Sprintf("%s, %s", sessionQuery, this.generateSqlModeQuery())
2019-03-24 09:32:42 +00:00
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
result, err := tx.Exec(buildResult.query, buildResult.args...)
if err != nil {
err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err)
}
rowsAffected, err := result.RowsAffected()
if err != nil {
log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err)
rowsAffected = 1
}
// each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1).
// multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event
totalDelta += buildResult.rowsDelta * rowsAffected
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}()
if err != nil {
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
// no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil
}
2017-08-28 22:53:47 +00:00
func (this *Applier) Teardown() {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Tearing down...")
2017-08-28 22:53:47 +00:00
this.db.Close()
this.singletonDB.Close()
atomic.StoreInt64(&this.finishedMigrating, 1)
2017-08-28 22:53:47 +00:00
}