gh-ost/go/logic/applier.go

1071 lines
38 KiB
Go
Raw Normal View History

/*
Copyright 2016 GitHub Inc.
2016-05-16 09:09:17 +00:00
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
gosql "database/sql"
"fmt"
"sync/atomic"
"time"
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
"github.com/outbrain/golib/sqlutils"
)
const (
atomicCutOverMagicHint = "ghost-cut-over-sentry"
)
type dmlBuildResult struct {
query string
args []interface{}
rowsDelta int64
err error
}
func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult {
return &dmlBuildResult{
query: query,
args: args,
rowsDelta: rowsDelta,
err: err,
}
}
func newDmlBuildResultError(err error) *dmlBuildResult {
return &dmlBuildResult{
err: err,
}
}
// Applier connects and writes the the applier-server, which is the server where migration
// happens. This is typically the master, but could be a replica when `--test-on-replica` or
// `--execute-on-replica` are given.
// Applier is the one to actually write row data and apply binlog events onto the ghost table.
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
type Applier struct {
2017-08-08 22:31:25 +00:00
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
singletonDB *gosql.DB
migrationContext *base.MigrationContext
finishedMigrating int64
}
2017-08-08 20:36:54 +00:00
func NewApplier(migrationContext *base.MigrationContext) *Applier {
return &Applier{
2017-08-08 22:31:25 +00:00
connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext,
finishedMigrating: 0,
}
}
func (this *Applier) InitDBConnections() (err error) {
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
return err
}
singletonApplierUri := fmt.Sprintf("%s&timeout=0", applierUri)
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
return err
}
this.singletonDB.SetMaxOpenConns(1)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext)
Allow gh-ost to modify the server using extra port Both Percona and Maria allow MySQL to be configured to listen on an extra port when their thread pool is enable. * https://www.percona.com/doc/percona-server/5.7/performance/threadpool.html * https://mariadb.com/kb/en/the-mariadb-library/thread-pool-in-mariadb-51-53/ This is valuable because if the table has a lot of traffic (read or write load), gh-ost can end up starving the thread pool as incomming connections are immediately blocked. By using gh-ost on the extra port, MySQL locking will still behave the same, but MySQL will keep a dedicated thread for each gh-ost connection. When doing this, it's important to inspect the extra-max-connections variable. Both Percona and Maria default to 1, so gh-ost may easily exceed with its threads. An example local run using this ``` $ mysql -S /tmp/mysql_sandbox20393.sock -e "select @@global.port, @@global.extra_port" +---------------+---------------------+ | @@global.port | @@global.extra_port | +---------------+---------------------+ | 20393 | 30393 | +---------------+---------------------+ ./bin/gh-ost \ --initially-drop-ghost-table \ --initially-drop-old-table \ --assume-rbr \ --port="20395" \ --assume-master-host="127.0.0.1:30393" \ --max-load=Threads_running=25 \ --critical-load=Threads_running=1000 \ --chunk-size=1000 \ --max-lag-millis=1500 \ --user="gh-ost" \ --password="gh-ost" \ --database="test" \ --table="mytable" \ --verbose \ --alter="ADD mynewcol decimal(11,2) DEFAULT 0.0 NOT NULL" \ --exact-rowcount \ --concurrent-rowcount \ --default-retries=120 \ --panic-flag-file=/tmp/ghost.panic.flag \ --postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \ --execute ```
2017-09-06 18:25:35 +00:00
if err != nil {
return err
}
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext); err != nil {
return err
}
Allow gh-ost to modify the server using extra port Both Percona and Maria allow MySQL to be configured to listen on an extra port when their thread pool is enable. * https://www.percona.com/doc/percona-server/5.7/performance/threadpool.html * https://mariadb.com/kb/en/the-mariadb-library/thread-pool-in-mariadb-51-53/ This is valuable because if the table has a lot of traffic (read or write load), gh-ost can end up starving the thread pool as incomming connections are immediately blocked. By using gh-ost on the extra port, MySQL locking will still behave the same, but MySQL will keep a dedicated thread for each gh-ost connection. When doing this, it's important to inspect the extra-max-connections variable. Both Percona and Maria default to 1, so gh-ost may easily exceed with its threads. An example local run using this ``` $ mysql -S /tmp/mysql_sandbox20393.sock -e "select @@global.port, @@global.extra_port" +---------------+---------------------+ | @@global.port | @@global.extra_port | +---------------+---------------------+ | 20393 | 30393 | +---------------+---------------------+ ./bin/gh-ost \ --initially-drop-ghost-table \ --initially-drop-old-table \ --assume-rbr \ --port="20395" \ --assume-master-host="127.0.0.1:30393" \ --max-load=Threads_running=25 \ --critical-load=Threads_running=1000 \ --chunk-size=1000 \ --max-lag-millis=1500 \ --user="gh-ost" \ --password="gh-ost" \ --database="test" \ --table="mytable" \ --verbose \ --alter="ADD mynewcol decimal(11,2) DEFAULT 0.0 NOT NULL" \ --exact-rowcount \ --concurrent-rowcount \ --default-retries=120 \ --panic-flag-file=/tmp/ghost.panic.flag \ --postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \ --execute ```
2017-09-06 18:25:35 +00:00
this.migrationContext.ApplierMySQLVersion = version
if err := this.validateAndReadTimeZone(); err != nil {
return err
}
if !this.migrationContext.AliyunRDS && !this.migrationContext.GoogleCloudPlatform {
if impliedKey, err := mysql.GetInstanceKey(this.db); err != nil {
return err
} else {
this.connectionConfig.ImpliedKey = impliedKey
}
}
if err := this.readTableColumns(); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Applier initiated on %+v, version %+v", this.connectionConfig.ImpliedKey, this.migrationContext.ApplierMySQLVersion)
return nil
}
// validateAndReadTimeZone potentially reads server time-zone
func (this *Applier) validateAndReadTimeZone() error {
2016-10-14 10:56:43 +00:00
query := `select @@global.time_zone`
if err := this.db.QueryRow(query).Scan(&this.migrationContext.ApplierTimeZone); err != nil {
return err
}
2016-10-14 10:56:43 +00:00
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("will use time_zone='%s' on applier", this.migrationContext.ApplierTimeZone)
return nil
}
// readTableColumns reads table columns on applier
func (this *Applier) readTableColumns() (err error) {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Examining table structure on applier")
this.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
if err != nil {
return err
}
return nil
}
// showTableStatus returns the output of `show table status like '...'` command
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
rowMap = m
return nil
})
return rowMap
}
// tableExists checks if a given table exists in database
func (this *Applier) tableExists(tableName string) (tableFound bool) {
m := this.showTableStatus(tableName)
return (m != nil)
}
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
// or attempts to drop them if instructed to.
func (this *Applier) ValidateOrDropExistingTables() error {
if this.migrationContext.InitiallyDropGhostTable {
if err := this.DropGhostTable(); err != nil {
return err
}
}
if this.tableExists(this.migrationContext.GetGhostTableName()) {
2016-08-22 06:49:27 +00:00
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-ghost-table to force dropping it, though I really prefer that you drop it or rename it away", sql.EscapeName(this.migrationContext.GetGhostTableName()))
}
if this.migrationContext.InitiallyDropOldTable {
if err := this.DropOldTable(); err != nil {
return err
}
}
2017-02-22 00:34:49 +00:00
if len(this.migrationContext.GetOldTableName()) > mysql.MaxTableNameLength {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Fatalf("--timestamp-old-table defined, but resulting table name (%s) is too long (only %d characters allowed)", this.migrationContext.GetOldTableName(), mysql.MaxTableNameLength)
2017-02-22 00:34:49 +00:00
}
if this.tableExists(this.migrationContext.GetOldTableName()) {
2016-08-22 06:49:27 +00:00
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-old-table to force dropping it, though I really prefer that you drop it or rename it away", sql.EscapeName(this.migrationContext.GetOldTableName()))
}
return nil
}
// CreateGhostTable creates the ghost table on the applier host
func (this *Applier) CreateGhostTable() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Creating ghost table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Ghost table created")
return nil
}
// AlterGhost applies `alter` statement on ghost table
func (this *Applier) AlterGhost() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
2020-07-23 08:38:05 +00:00
this.migrationContext.AlterStatementOptions,
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Altering ghost table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("ALTER statement: %s", query)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Ghost table altered")
return nil
}
// AlterGhost applies `alter` statement on ghost table
func (this *Applier) AlterGhostAutoIncrement() error {
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s AUTO_INCREMENT=%d`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
this.migrationContext.OriginalTableAutoIncrement,
)
this.migrationContext.Log.Infof("Altering ghost table AUTO_INCREMENT value %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
this.migrationContext.Log.Debugf("AUTO_INCREMENT ALTER statement: %s", query)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.Log.Infof("Ghost table AUTO_INCREMENT altered")
return nil
}
// CreateChangelogTable creates the changelog table on the applier host
func (this *Applier) CreateChangelogTable() error {
if err := this.DropChangelogTable(); err != nil {
return err
}
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
id bigint auto_increment,
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
hint varchar(64) charset ascii not null,
2017-09-03 07:27:04 +00:00
value varchar(4096) charset ascii not null,
primary key(id),
unique key hint_uidx(hint)
) auto_increment=256
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Creating changelog table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Changelog table created")
return nil
}
// dropTable drops a given table on the applied host
func (this *Applier) dropTable(tableName string) error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Dropping table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Table dropped")
return nil
}
// DropChangelogTable drops the changelog table on the applier host
func (this *Applier) DropChangelogTable() error {
return this.dropTable(this.migrationContext.GetChangelogTableName())
}
// DropOldTable drops the _Old table on the applier host
func (this *Applier) DropOldTable() error {
return this.dropTable(this.migrationContext.GetOldTableName())
}
// DropGhostTable drops the ghost table on the applier host
func (this *Applier) DropGhostTable() error {
return this.dropTable(this.migrationContext.GetGhostTableName())
}
// WriteChangelog writes a value to the changelog table.
// It returns the hint as given, for convenience
func (this *Applier) WriteChangelog(hint, value string) (string, error) {
explicitId := 0
switch hint {
case "heartbeat":
explicitId = 1
case "state":
explicitId = 2
case "throttle":
explicitId = 3
}
query := fmt.Sprintf(`
2016-05-16 09:09:17 +00:00
insert /* gh-ost */ into %s.%s
(id, hint, value)
values
(NULLIF(?, 0), ?, ?)
on duplicate key update
last_update=NOW(),
value=VALUES(value)
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
_, err := sqlutils.ExecNoPrepare(this.db, query, explicitId, hint, value)
return hint, err
}
func (this *Applier) WriteAndLogChangelog(hint, value string) (string, error) {
this.WriteChangelog(hint, value)
return this.WriteChangelog(fmt.Sprintf("%s at %d", hint, time.Now().UnixNano()), value)
}
func (this *Applier) WriteChangelogState(value string) (string, error) {
return this.WriteAndLogChangelog("state", value)
}
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously
func (this *Applier) InitiateHeartbeat() {
var numSuccessiveFailures int64
injectHeartbeat := func() error {
2017-05-24 05:32:13 +00:00
if atomic.LoadInt64(&this.migrationContext.HibernateUntil) > 0 {
return nil
}
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
numSuccessiveFailures++
if numSuccessiveFailures > this.migrationContext.MaxRetries() {
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
} else {
numSuccessiveFailures = 0
}
return nil
}
injectHeartbeat()
heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range heartbeatTick {
if atomic.LoadInt64(&this.finishedMigrating) > 0 {
2017-08-08 22:31:25 +00:00
return
}
// Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop rather than spam the master in the event something
// goes wrong
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
continue
}
if err := injectHeartbeat(); err != nil {
return
}
}
}
// ExecuteThrottleQuery executes the `--throttle-query` and returns its results.
func (this *Applier) ExecuteThrottleQuery() (int64, error) {
throttleQuery := this.migrationContext.GetThrottleQuery()
if throttleQuery == "" {
return 0, nil
}
var result int64
if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil {
2019-10-07 15:10:36 +00:00
return 0, this.migrationContext.Log.Errore(err)
}
return result, nil
}
// ReadMigrationMinValues returns the minimum values to be iterated on rowcopy
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
2016-10-19 13:22:29 +00:00
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns)
if err != nil {
return err
}
rows, err := this.db.Query(query)
if err != nil {
return err
}
for rows.Next() {
this.migrationContext.MigrationRangeMinValues = sql.NewColumnValues(uniqueKey.Len())
if err = rows.Scan(this.migrationContext.MigrationRangeMinValues.ValuesPointers...); err != nil {
return err
}
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
2020-04-21 16:50:23 +00:00
err = rows.Err()
return err
}
// ReadMigrationMaxValues returns the maximum values to be iterated on rowcopy
func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
2016-10-19 13:22:29 +00:00
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &uniqueKey.Columns)
if err != nil {
return err
}
rows, err := this.db.Query(query)
if err != nil {
return err
}
for rows.Next() {
this.migrationContext.MigrationRangeMaxValues = sql.NewColumnValues(uniqueKey.Len())
if err = rows.Scan(this.migrationContext.MigrationRangeMaxValues.ValuesPointers...); err != nil {
return err
}
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
2020-04-21 16:50:23 +00:00
err = rows.Err()
return err
}
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
func (this *Applier) ReadMigrationRangeValues() error {
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
return err
}
if err := this.ReadMigrationMaxValues(this.migrationContext.UniqueKey); err != nil {
return err
}
return nil
}
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
// no further chunk to work through, i.e. we're past the last chunk and are done with
2017-11-08 00:49:23 +00:00
// iterating the range (and this done with copying row chunks)
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
}
2017-08-21 05:12:41 +00:00
for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
}
query, explodedArgs, err := buildFunc(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
}
2017-08-21 05:12:41 +00:00
rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
}
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
}
hasFurtherRange = true
}
if err = rows.Err(); err != nil {
return hasFurtherRange, err
}
2017-08-21 05:12:41 +00:00
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
}
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
}
// ApplyIterationInsertQuery issues a chunk-INSERT query on the ghost table. It is where
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names(),
this.migrationContext.MappedSharedColumns.Names(),
this.migrationContext.UniqueKey.Name,
2016-10-19 13:22:29 +00:00
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
this.migrationContext.GetIteration() == 0,
this.migrationContext.IsTransactionalTable(),
)
if err != nil {
return chunkSize, rowsAffected, duration, err
}
2016-10-08 09:06:27 +00:00
sqlResult, err := func() (gosql.Result, error) {
tx, err := this.db.Begin()
if err != nil {
return nil, err
}
2018-04-27 06:58:07 +00:00
defer tx.Rollback()
2019-02-14 14:58:49 +00:00
sessionQuery := fmt.Sprintf(`SET SESSION time_zone = '%s'`, this.migrationContext.ApplierTimeZone)
2019-03-24 09:32:42 +00:00
sqlModeAddendum := `,NO_AUTO_VALUE_ON_ZERO`
2019-02-14 14:58:49 +00:00
if !this.migrationContext.SkipStrictMode {
2019-03-24 09:32:42 +00:00
sqlModeAddendum = fmt.Sprintf("%s,STRICT_ALL_TABLES", sqlModeAddendum)
2019-02-14 14:58:49 +00:00
}
2019-03-24 09:32:42 +00:00
sessionQuery = fmt.Sprintf("%s, sql_mode = CONCAT(@@session.sql_mode, ',%s')", sessionQuery, sqlModeAddendum)
if _, err := tx.Exec(sessionQuery); err != nil {
2016-10-08 09:06:27 +00:00
return nil, err
}
result, err := tx.Exec(query, explodedArgs...)
if err != nil {
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err
}
return result, nil
}()
if err != nil {
return chunkSize, rowsAffected, duration, err
}
rowsAffected, _ = sqlResult.RowsAffected()
duration = time.Since(startTime)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf(
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
this.migrationContext.MigrationIterationRangeMinValues,
this.migrationContext.MigrationIterationRangeMaxValues,
this.migrationContext.GetIteration(),
chunkSize)
return chunkSize, rowsAffected, duration, nil
}
// LockOriginalTable places a write lock on the original table
func (this *Applier) LockOriginalTable() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Locking %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Table locked")
return nil
}
// UnlockTables makes tea. No wait, it unlocks tables.
func (this *Applier) UnlockTables() error {
2016-05-16 09:09:17 +00:00
query := `unlock /* gh-ost */ tables`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Unlocking tables")
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables unlocked")
return nil
}
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
// There is a point in time in between where the table does not exist.
func (this *Applier) SwapTablesQuickAndBumpy() error {
2016-05-16 09:09:17 +00:00
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming original table")
this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
2016-05-16 09:09:17 +00:00
query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
this.migrationContext.RenameTablesEndTime = time.Now()
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables renamed")
return nil
}
// RenameTablesRollback renames back both table: original back to ghost,
// _old back to original. This is used by `--test-on-replica`
func (this *Applier) RenameTablesRollback() (renameError error) {
// Restoring tables to original names.
// We prefer the single, atomic operation:
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming back both tables")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil {
return nil
}
// But, if for some reason the above was impossible to do, we rename one by one.
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming back to ghost table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
renameError = err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Renaming back to original table")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
renameError = err
}
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(renameError)
}
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
// We need to keep the SQL thread active so as to complete processing received events,
// and have them written to the binary log, so that we can then read them via streamer.
func (this *Applier) StopSlaveIOThread() error {
2016-05-16 09:09:17 +00:00
query := `stop /* gh-ost */ slave io_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Stopping replication IO thread")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication IO thread stopped")
return nil
}
// StartSlaveIOThread is applicable with --test-on-replica
func (this *Applier) StartSlaveIOThread() error {
query := `start /* gh-ost */ slave io_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Starting replication IO thread")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication IO thread started")
return nil
}
// StartSlaveSQLThread is applicable with --test-on-replica
func (this *Applier) StopSlaveSQLThread() error {
query := `stop /* gh-ost */ slave sql_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Verifying SQL thread is stopped")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("SQL thread stopped")
return nil
}
// StartSlaveSQLThread is applicable with --test-on-replica
func (this *Applier) StartSlaveSQLThread() error {
query := `start /* gh-ost */ slave sql_thread`
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Verifying SQL thread is running")
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("SQL thread started")
return nil
}
// StopReplication is used by `--test-on-replica` and stops replication.
func (this *Applier) StopReplication() error {
if err := this.StopSlaveIOThread(); err != nil {
return err
}
if err := this.StopSlaveSQLThread(); err != nil {
return err
}
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
if err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
return nil
}
// StartReplication is used by `--test-on-replica` on cut-over failure
func (this *Applier) StartReplication() error {
if err := this.StartSlaveIOThread(); err != nil {
return err
}
if err := this.StartSlaveSQLThread(); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Replication started")
return nil
}
// GetSessionLockName returns a name for the special hint session voluntary lock
func (this *Applier) GetSessionLockName(sessionId int64) string {
return fmt.Sprintf("gh-ost.%d.lock", sessionId)
}
// ExpectUsedLock expects the special hint voluntary lock to exist on given session
func (this *Applier) ExpectUsedLock(sessionId int64) error {
var result int64
query := `select is_used_lock(?)`
lockName := this.GetSessionLockName(sessionId)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Checking session lock: %s", lockName)
if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId {
return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName)
}
return nil
}
// ExpectProcess expects a process to show up in `SHOW PROCESSLIST` that has given characteristics
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
found := false
query := `
select id
from information_schema.processlist
where
id != connection_id()
and ? in (0, id)
and state like concat('%', ?, '%')
and info like concat('%', ?, '%')
`
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
found = true
return nil
}, sessionId, stateHint, infoHint)
if err != nil {
return err
}
if !found {
return fmt.Errorf("Cannot find process. Hints: %s, %s", stateHint, infoHint)
}
return nil
}
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Looking for magic cut-over table")
tableName := this.migrationContext.GetOldTableName()
rowMap := this.showTableStatus(tableName)
if rowMap == nil {
// Table does not exist
return nil
}
if rowMap["Comment"].String != atomicCutOverMagicHint {
return fmt.Errorf("Expected magic comment on %s, did not find it", tableName)
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Dropping magic cut-over table")
return this.dropTable(tableName)
}
2016-11-17 16:10:17 +00:00
// CreateAtomicCutOverSentryTable
func (this *Applier) CreateAtomicCutOverSentryTable() error {
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
return err
}
tableName := this.migrationContext.GetOldTableName()
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
id int auto_increment primary key
) engine=%s comment='%s'
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
this.migrationContext.TableEngine,
atomicCutOverMagicHint,
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Creating magic cut-over table %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(tableName),
)
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Magic cut-over table created")
return nil
}
// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
return err
}
defer func() {
sessionIdChan <- -1
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback()
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
tableLocked <- err
return err
}
sessionIdChan <- sessionId
lockResult := 0
query := `select get_lock(?, 0)`
lockName := this.GetSessionLockName(sessionId)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
err := fmt.Errorf("Unable to acquire lock %s", lockName)
tableLocked <- err
return err
}
tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
if err := this.CreateAtomicCutOverSentryTable(); err != nil {
tableLocked <- err
return err
}
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write, %s.%s write`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Locking %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
this.migrationContext.LockTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
tableLocked <- err
return err
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables locked")
tableLocked <- nil // No error.
// From this point on, we are committed to UNLOCK TABLES. No matter what happens,
// the UNLOCK must execute (or, alternatively, this connection dies, which gets the same impact)
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
// and issue RENAME. We wait here until told to proceed.
<-okToUnlockTable
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Will now proceed to drop magic table and unlock tables")
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
// And in fact, we will:
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Dropping magic cut-over table")
query = fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
if _, err := tx.Exec(query); err != nil {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}
// Tables still locked
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
query = `unlock tables`
if _, err := tx.Exec(query); err != nil {
tableUnlocked <- err
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables unlocked")
tableUnlocked <- nil
return nil
}
// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
defer func() {
tx.Rollback()
sessionIdChan <- -1
tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
}()
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil {
tablesRenamed <- err
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
tablesRenamed <- nil
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Infof("Tables renamed")
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
return 0, err
}
return result, nil
}
2017-11-21 07:14:04 +00:00
// updateModifiesUniqueKeyColumns checks whether a UPDATE DML event actually
// modifies values of the migration's unique key (the iterated key). This will call
// for special handling.
func (this *Applier) updateModifiesUniqueKeyColumns(dmlEvent *binlog.BinlogDMLEvent) (modifiedColumn string, isModified bool) {
2017-11-08 09:11:17 +00:00
for _, column := range this.migrationContext.UniqueKey.Columns.Columns() {
tableOrdinal := this.migrationContext.OriginalTableColumns.Ordinals[column.Name]
whereColumnValue := dmlEvent.WhereColumnValues.AbstractValues()[tableOrdinal]
newColumnValue := dmlEvent.NewColumnValues.AbstractValues()[tableOrdinal]
if newColumnValue != whereColumnValue {
return column.Name, true
2017-11-08 09:11:17 +00:00
}
}
return "", false
2017-11-08 09:11:17 +00:00
}
// buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
// event entry on the original table.
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (results [](*dmlBuildResult)) {
switch dmlEvent.DML {
case binlog.DeleteDML:
{
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err))
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResult(query, sharedArgs, 1, err))
}
case binlog.UpdateDML:
{
2017-11-21 07:14:04 +00:00
if _, isModified := this.updateModifiesUniqueKeyColumns(dmlEvent); isModified {
dmlEvent.DML = binlog.DeleteDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
dmlEvent.DML = binlog.InsertDML
results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results
2017-11-08 09:11:17 +00:00
}
2016-08-22 06:49:27 +00:00
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args()
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResult(query, args, 0, err))
}
}
2017-11-20 08:24:28 +00:00
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
}
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
var totalDelta int64
err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}
rollback := func(err error) error {
tx.Rollback()
return err
}
2019-02-14 14:58:49 +00:00
sessionQuery := "SET SESSION time_zone = '+00:00'"
2019-03-24 09:32:42 +00:00
sqlModeAddendum := `,NO_AUTO_VALUE_ON_ZERO`
2019-02-14 14:58:49 +00:00
if !this.migrationContext.SkipStrictMode {
2019-03-24 09:32:42 +00:00
sqlModeAddendum = fmt.Sprintf("%s,STRICT_ALL_TABLES", sqlModeAddendum)
2019-02-14 14:58:49 +00:00
}
2019-03-24 09:32:42 +00:00
sessionQuery = fmt.Sprintf("%s, sql_mode = CONCAT(@@session.sql_mode, ',%s')", sessionQuery, sqlModeAddendum)
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
for _, dmlEvent := range dmlEvents {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return rollback(buildResult.err)
}
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args)
return rollback(err)
}
totalDelta += buildResult.rowsDelta
}
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}()
if err != nil {
2019-10-07 15:10:36 +00:00
return this.migrationContext.Log.Errore(err)
}
// no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
}
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil
}
2017-08-28 22:53:47 +00:00
func (this *Applier) Teardown() {
2019-10-07 15:10:36 +00:00
this.migrationContext.Log.Debugf("Tearing down...")
2017-08-28 22:53:47 +00:00
this.db.Close()
this.singletonDB.Close()
atomic.StoreInt64(&this.finishedMigrating, 1)
2017-08-28 22:53:47 +00:00
}