816 lines
28 KiB
Go
816 lines
28 KiB
Go
/*
|
|
Copyright 2016 GitHub Inc.
|
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
|
*/
|
|
|
|
package logic
|
|
|
|
import (
|
|
gosql "database/sql"
|
|
"fmt"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/github/gh-ost/go/base"
|
|
"github.com/github/gh-ost/go/binlog"
|
|
"github.com/github/gh-ost/go/mysql"
|
|
"github.com/github/gh-ost/go/sql"
|
|
|
|
"github.com/outbrain/golib/log"
|
|
"github.com/outbrain/golib/sqlutils"
|
|
)
|
|
|
|
// Applier reads data from the read-MySQL-server (typically a replica, but can be the master)
|
|
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
|
|
type Applier struct {
|
|
connectionConfig *mysql.ConnectionConfig
|
|
db *gosql.DB
|
|
singletonDB *gosql.DB
|
|
migrationContext *base.MigrationContext
|
|
}
|
|
|
|
func NewApplier() *Applier {
|
|
return &Applier{
|
|
connectionConfig: base.GetMigrationContext().ApplierConnectionConfig,
|
|
migrationContext: base.GetMigrationContext(),
|
|
}
|
|
}
|
|
|
|
func (this *Applier) InitDBConnections() (err error) {
|
|
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
|
if this.db, _, err = sqlutils.GetDB(applierUri); err != nil {
|
|
return err
|
|
}
|
|
singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
|
|
if this.singletonDB, _, err = sqlutils.GetDB(singletonApplierUri); err != nil {
|
|
return err
|
|
}
|
|
this.singletonDB.SetMaxOpenConns(1)
|
|
if err := this.validateConnection(this.db); err != nil {
|
|
return err
|
|
}
|
|
if err := this.validateConnection(this.singletonDB); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// validateConnection issues a simple can-connect to MySQL
|
|
func (this *Applier) validateConnection(db *gosql.DB) error {
|
|
query := `select @@global.port`
|
|
var port int
|
|
if err := db.QueryRow(query).Scan(&port); err != nil {
|
|
return err
|
|
}
|
|
if port != this.connectionConfig.Key.Port {
|
|
return fmt.Errorf("Unexpected database port reported: %+v", port)
|
|
}
|
|
log.Infof("connection validated on %+v", this.connectionConfig.Key)
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) tableExists(tableName string) (tableFound bool) {
|
|
query := fmt.Sprintf(`show /* gh-ost */ table status from %s like '%s'`, sql.EscapeName(this.migrationContext.DatabaseName), tableName)
|
|
|
|
sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
|
tableFound = true
|
|
return nil
|
|
})
|
|
return tableFound
|
|
}
|
|
|
|
func (this *Applier) ValidateOrDropExistingTables() error {
|
|
if this.migrationContext.InitiallyDropGhostTable {
|
|
if err := this.DropGhostTable(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if this.tableExists(this.migrationContext.GetGhostTableName()) {
|
|
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-ghost-table to force dropping it", sql.EscapeName(this.migrationContext.GetGhostTableName()))
|
|
}
|
|
if this.migrationContext.InitiallyDropOldTable {
|
|
if err := this.DropOldTable(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if this.tableExists(this.migrationContext.GetOldTableName()) {
|
|
return fmt.Errorf("Table %s already exists. Panicking. Use --initially-drop-old-table to force dropping it", sql.EscapeName(this.migrationContext.GetOldTableName()))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateGhostTable creates the ghost table on the applier host
|
|
func (this *Applier) CreateGhostTable() error {
|
|
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Creating ghost table %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
)
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Ghost table created")
|
|
return nil
|
|
}
|
|
|
|
// AlterGhost applies `alter` statement on ghost table
|
|
func (this *Applier) AlterGhost() error {
|
|
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s %s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
this.migrationContext.AlterStatement,
|
|
)
|
|
log.Infof("Altering ghost table %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
)
|
|
log.Debugf("ALTER statement: %s", query)
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Ghost table altered")
|
|
return nil
|
|
}
|
|
|
|
// CreateChangelogTable creates the changelog table on the applier host
|
|
func (this *Applier) CreateChangelogTable() error {
|
|
if err := this.DropChangelogTable(); err != nil {
|
|
return err
|
|
}
|
|
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s (
|
|
id bigint auto_increment,
|
|
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
|
hint varchar(64) charset ascii not null,
|
|
value varchar(255) charset ascii not null,
|
|
primary key(id),
|
|
unique key hint_uidx(hint)
|
|
) auto_increment=256
|
|
`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
|
|
)
|
|
log.Infof("Creating changelog table %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
|
|
)
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Changelog table created")
|
|
return nil
|
|
}
|
|
|
|
// dropTable drops a given table on the applied host
|
|
func (this *Applier) dropTable(tableName string) error {
|
|
query := fmt.Sprintf(`drop /* gh-ost */ table if exists %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(tableName),
|
|
)
|
|
log.Infof("Droppping table %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(tableName),
|
|
)
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Table dropped")
|
|
return nil
|
|
}
|
|
|
|
// DropChangelogTable drops the changelog table on the applier host
|
|
func (this *Applier) DropChangelogTable() error {
|
|
return this.dropTable(this.migrationContext.GetChangelogTableName())
|
|
}
|
|
|
|
// DropOldTable drops the _Old table on the applier host
|
|
func (this *Applier) DropOldTable() error {
|
|
return this.dropTable(this.migrationContext.GetOldTableName())
|
|
}
|
|
|
|
// DropGhostTable drops the ghost table on the applier host
|
|
func (this *Applier) DropGhostTable() error {
|
|
return this.dropTable(this.migrationContext.GetGhostTableName())
|
|
}
|
|
|
|
// WriteChangelog writes a value to the changelog table.
|
|
// It returns the hint as given, for convenience
|
|
func (this *Applier) WriteChangelog(hint, value string) (string, error) {
|
|
explicitId := 0
|
|
switch hint {
|
|
case "heartbeat":
|
|
explicitId = 1
|
|
case "state":
|
|
explicitId = 2
|
|
case "throttle":
|
|
explicitId = 3
|
|
}
|
|
query := fmt.Sprintf(`
|
|
insert /* gh-ost */ into %s.%s
|
|
(id, hint, value)
|
|
values
|
|
(NULLIF(?, 0), ?, ?)
|
|
on duplicate key update
|
|
last_update=NOW(),
|
|
value=VALUES(value)
|
|
`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
|
|
)
|
|
_, err := sqlutils.Exec(this.db, query, explicitId, hint, value)
|
|
return hint, err
|
|
}
|
|
|
|
func (this *Applier) WriteAndLogChangelog(hint, value string) (string, error) {
|
|
this.WriteChangelog(hint, value)
|
|
return this.WriteChangelog(fmt.Sprintf("%s at %d", hint, time.Now().UnixNano()), value)
|
|
}
|
|
|
|
func (this *Applier) WriteChangelogState(value string) (string, error) {
|
|
return this.WriteAndLogChangelog("state", value)
|
|
}
|
|
|
|
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
|
|
// This is done asynchronously
|
|
func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
|
|
numSuccessiveFailures := 0
|
|
injectHeartbeat := func() error {
|
|
if _, err := this.WriteChangelog("heartbeat", time.Now().Format(time.RFC3339Nano)); err != nil {
|
|
numSuccessiveFailures++
|
|
if numSuccessiveFailures > this.migrationContext.MaxRetries() {
|
|
return log.Errore(err)
|
|
}
|
|
} else {
|
|
numSuccessiveFailures = 0
|
|
}
|
|
return nil
|
|
}
|
|
injectHeartbeat()
|
|
|
|
heartbeatTick := time.Tick(time.Duration(heartbeatIntervalMilliseconds) * time.Millisecond)
|
|
for range heartbeatTick {
|
|
// Generally speaking, we would issue a goroutine, but I'd actually rather
|
|
// have this blocked rather than spam the master in the event something
|
|
// goes wrong
|
|
if err := injectHeartbeat(); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// ReadMigrationMinValues
|
|
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
|
|
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
|
|
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rows, err := this.db.Query(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for rows.Next() {
|
|
this.migrationContext.MigrationRangeMinValues = sql.NewColumnValues(uniqueKey.Len())
|
|
if err = rows.Scan(this.migrationContext.MigrationRangeMinValues.ValuesPointers...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
|
|
return err
|
|
}
|
|
|
|
// ReadMigrationMinValues
|
|
func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
|
|
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
|
|
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns.Names)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
rows, err := this.db.Query(query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for rows.Next() {
|
|
this.migrationContext.MigrationRangeMaxValues = sql.NewColumnValues(uniqueKey.Len())
|
|
if err = rows.Scan(this.migrationContext.MigrationRangeMaxValues.ValuesPointers...); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
|
|
return err
|
|
}
|
|
|
|
func (this *Applier) ReadMigrationRangeValues() error {
|
|
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
|
|
return err
|
|
}
|
|
if err := this.ReadMigrationMaxValues(this.migrationContext.UniqueKey); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// __unused_IterationIsComplete lets us know when the copy-iteration phase is complete, i.e.
|
|
// we've exhausted all rows
|
|
func (this *Applier) __unused_IterationIsComplete() (bool, error) {
|
|
if !this.migrationContext.HasMigrationRange() {
|
|
return false, nil
|
|
}
|
|
if this.migrationContext.MigrationIterationRangeMinValues == nil {
|
|
return false, nil
|
|
}
|
|
args := sqlutils.Args()
|
|
compareWithIterationRangeStart, explodedArgs, err := sql.BuildRangePreparedComparison(this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), sql.GreaterThanOrEqualsComparisonSign)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
args = append(args, explodedArgs...)
|
|
compareWithRangeEnd, explodedArgs, err := sql.BuildRangePreparedComparison(this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.MigrationRangeMaxValues.AbstractValues(), sql.LessThanComparisonSign)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
args = append(args, explodedArgs...)
|
|
query := fmt.Sprintf(`
|
|
select /* gh-ost IterationIsComplete */ 1
|
|
from %s.%s
|
|
where (%s) and (%s)
|
|
limit 1
|
|
`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
compareWithIterationRangeStart,
|
|
compareWithRangeEnd,
|
|
)
|
|
|
|
moreRowsFound := false
|
|
err = sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
|
|
moreRowsFound = true
|
|
return nil
|
|
}, args...)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return !moreRowsFound, nil
|
|
}
|
|
|
|
// CalculateNextIterationRangeEndValues reads the next-iteration-range-end unique key values,
|
|
// which will be used for copying the next chunk of rows. Ir returns "false" if there is
|
|
// no further chunk to work through, i.e. we're past the last chunk and are done with
|
|
// itrating the range (and this done with copying row chunks)
|
|
func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange bool, err error) {
|
|
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationIterationRangeMaxValues
|
|
if this.migrationContext.MigrationIterationRangeMinValues == nil {
|
|
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
|
|
}
|
|
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery(
|
|
this.migrationContext.DatabaseName,
|
|
this.migrationContext.OriginalTableName,
|
|
this.migrationContext.UniqueKey.Columns.Names,
|
|
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
|
|
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
|
|
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
|
this.migrationContext.GetIteration() == 0,
|
|
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
|
|
)
|
|
if err != nil {
|
|
return hasFurtherRange, err
|
|
}
|
|
rows, err := this.db.Query(query, explodedArgs...)
|
|
if err != nil {
|
|
return hasFurtherRange, err
|
|
}
|
|
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
|
|
for rows.Next() {
|
|
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
|
|
return hasFurtherRange, err
|
|
}
|
|
hasFurtherRange = true
|
|
}
|
|
if !hasFurtherRange {
|
|
log.Debugf("Iteration complete: no further range to iterate")
|
|
return hasFurtherRange, nil
|
|
}
|
|
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
|
|
return hasFurtherRange, nil
|
|
}
|
|
|
|
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
|
|
startTime := time.Now()
|
|
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
|
|
|
|
query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
|
|
this.migrationContext.DatabaseName,
|
|
this.migrationContext.OriginalTableName,
|
|
this.migrationContext.GetGhostTableName(),
|
|
this.migrationContext.SharedColumns.Names,
|
|
this.migrationContext.UniqueKey.Name,
|
|
this.migrationContext.UniqueKey.Columns.Names,
|
|
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
|
|
this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
|
|
this.migrationContext.GetIteration() == 0,
|
|
this.migrationContext.IsTransactionalTable(),
|
|
)
|
|
if err != nil {
|
|
return chunkSize, rowsAffected, duration, err
|
|
}
|
|
sqlResult, err := sqlutils.Exec(this.db, query, explodedArgs...)
|
|
if err != nil {
|
|
return chunkSize, rowsAffected, duration, err
|
|
}
|
|
rowsAffected, _ = sqlResult.RowsAffected()
|
|
duration = time.Now().Sub(startTime)
|
|
log.Debugf(
|
|
"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
|
|
this.migrationContext.MigrationIterationRangeMinValues,
|
|
this.migrationContext.MigrationIterationRangeMaxValues,
|
|
this.migrationContext.GetIteration(),
|
|
chunkSize)
|
|
return chunkSize, rowsAffected, duration, nil
|
|
}
|
|
|
|
// LockOriginalTable places a write lock on the original table
|
|
func (this *Applier) LockOriginalTable() error {
|
|
query := fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Locking %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
this.migrationContext.LockTablesStartTime = time.Now()
|
|
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Table locked")
|
|
return nil
|
|
}
|
|
|
|
// UnlockTables
|
|
func (this *Applier) UnlockTables() error {
|
|
query := `unlock /* gh-ost */ tables`
|
|
log.Infof("Unlocking tables")
|
|
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Tables unlocked")
|
|
return nil
|
|
}
|
|
|
|
// SwapTablesQuickAndBumpy
|
|
func (this *Applier) SwapTablesQuickAndBumpy() error {
|
|
query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
|
)
|
|
log.Infof("Renaming original table")
|
|
this.migrationContext.RenameTablesStartTime = time.Now()
|
|
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
|
return err
|
|
}
|
|
query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Renaming ghost table")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
this.migrationContext.RenameTablesEndTime = time.Now()
|
|
|
|
log.Infof("Tables renamed")
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) RenameTable(fromName, toName string) (err error) {
|
|
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(fromName),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(toName),
|
|
)
|
|
log.Infof("Renaming %s to %s", fromName, toName)
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
log.Infof("Table renamed")
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) RenameTablesRollback() (renameError error) {
|
|
// Restoring tables to original names.
|
|
// We prefer the single, atomic operation:
|
|
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Renaming back both tables")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil {
|
|
return nil
|
|
}
|
|
// But, if for some reason the above was impossible to do, we rename one by one.
|
|
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
)
|
|
log.Infof("Renaming back to ghost table")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
renameError = err
|
|
}
|
|
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Renaming back to original table")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
renameError = err
|
|
}
|
|
return log.Errore(renameError)
|
|
}
|
|
|
|
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
|
|
// We need to keep the SQL thread active so as to complete processing received events,
|
|
// and have them written to the binary log, so that we can then read them via streamer.
|
|
func (this *Applier) StopSlaveIOThread() error {
|
|
query := `stop /* gh-ost */ slave io_thread`
|
|
log.Infof("Stopping replication")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Replication stopped")
|
|
return nil
|
|
}
|
|
|
|
// StartSlaveSQLThread is applicable with --test-on-replica
|
|
func (this *Applier) StopSlaveSQLThread() error {
|
|
query := `stop /* gh-ost */ slave sql_thread`
|
|
log.Infof("Verifying SQL thread is stopped")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("SQL thread stopped")
|
|
return nil
|
|
}
|
|
|
|
// StartSlaveSQLThread is applicable with --test-on-replica
|
|
func (this *Applier) StartSlaveSQLThread() error {
|
|
query := `start /* gh-ost */ slave sql_thread`
|
|
log.Infof("Verifying SQL thread is running")
|
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
|
return err
|
|
}
|
|
log.Infof("SQL thread started")
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) StopSlaveNicely() error {
|
|
if err := this.StopSlaveIOThread(); err != nil {
|
|
return err
|
|
}
|
|
if err := this.StopSlaveSQLThread(); err != nil {
|
|
return err
|
|
}
|
|
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) GetSessionLockName(sessionId int64) string {
|
|
return fmt.Sprintf("gh-ost.%d.lock", sessionId)
|
|
}
|
|
|
|
// LockOriginalTableAndWait locks the original table, notifies the lock is in
|
|
// place, and awaits further instruction
|
|
func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
|
|
tx, err := this.db.Begin()
|
|
if err != nil {
|
|
tableLocked <- err
|
|
return err
|
|
}
|
|
defer func() {
|
|
tx.Rollback()
|
|
}()
|
|
|
|
var sessionId int64
|
|
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
|
return err
|
|
}
|
|
sessionIdChan <- sessionId
|
|
|
|
query := `select get_lock(?, 0)`
|
|
lockResult := 0
|
|
lockName := this.GetSessionLockName(sessionId)
|
|
log.Infof("Grabbing voluntary lock: %s", lockName)
|
|
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil || lockResult != 1 {
|
|
return fmt.Errorf("Unable to acquire lock %s", lockName)
|
|
}
|
|
|
|
query = fmt.Sprintf(`lock /* gh-ost */ tables %s.%s write`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Locking %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
this.migrationContext.LockTablesStartTime = time.Now()
|
|
if _, err := tx.Exec(query); err != nil {
|
|
tableLocked <- err
|
|
return err
|
|
}
|
|
log.Infof("Table locked")
|
|
tableLocked <- nil // No error.
|
|
|
|
// The cut-over phase will proceed to apply remaining backlon onto ghost table,
|
|
// and issue RENAMEs. We wait here until told to proceed.
|
|
<-okToUnlockTable
|
|
// Release
|
|
query = `unlock tables`
|
|
log.Infof("Releasing lock from %s.%s",
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
if _, err := tx.Exec(query); err != nil {
|
|
tableUnlocked <- err
|
|
return log.Errore(err)
|
|
}
|
|
log.Infof("Table unlocked")
|
|
tableUnlocked <- nil
|
|
return nil
|
|
}
|
|
|
|
// RenameOriginalTable will attempt renaming the original table into _old
|
|
func (this *Applier) RenameOriginalTable(sessionIdChan chan int64, originalTableRenamed chan<- error) error {
|
|
tx, err := this.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
tx.Rollback()
|
|
originalTableRenamed <- nil
|
|
}()
|
|
var sessionId int64
|
|
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
|
return err
|
|
}
|
|
sessionIdChan <- sessionId
|
|
|
|
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
|
|
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
|
|
if _, err := tx.Exec(query); err != nil {
|
|
return err
|
|
}
|
|
|
|
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
|
)
|
|
log.Infof("Issuing and expecting this to block: %s", query)
|
|
if _, err := tx.Exec(query); err != nil {
|
|
return log.Errore(err)
|
|
}
|
|
log.Infof("Original table renamed")
|
|
return nil
|
|
}
|
|
|
|
// RenameGhostTable will attempt renaming the ghost table into original
|
|
func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRenamed chan<- error) error {
|
|
tx, err := this.db.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer func() {
|
|
tx.Rollback()
|
|
}()
|
|
var sessionId int64
|
|
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
|
return err
|
|
}
|
|
sessionIdChan <- sessionId
|
|
|
|
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
|
|
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
|
|
if _, err := tx.Exec(query); err != nil {
|
|
return err
|
|
}
|
|
|
|
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
)
|
|
log.Infof("Issuing and expecting this to block: %s", query)
|
|
if _, err := tx.Exec(query); err != nil {
|
|
ghostTableRenamed <- err
|
|
return log.Errore(err)
|
|
}
|
|
log.Infof("Ghost table renamed")
|
|
ghostTableRenamed <- nil
|
|
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) ExpectUsedLock(sessionId int64) error {
|
|
var result int64
|
|
query := `select is_used_lock(?)`
|
|
lockName := this.GetSessionLockName(sessionId)
|
|
log.Infof("Checking session lock: %s", lockName)
|
|
if err := this.db.QueryRow(query, lockName).Scan(&result); err != nil || result != sessionId {
|
|
return fmt.Errorf("Session lock %s expected to be found but wasn't", lockName)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
|
|
found := false
|
|
query := `
|
|
select id
|
|
from information_schema.processlist
|
|
where
|
|
id != connection_id()
|
|
and ? in (0, id)
|
|
and state like concat('%', ?, '%')
|
|
and info like concat('%', ?, '%')
|
|
`
|
|
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
|
found = true
|
|
return nil
|
|
}, sessionId, stateHint, infoHint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !found {
|
|
return fmt.Errorf("Cannot find process. Hints: %s, %s", stateHint, infoHint)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
|
|
query := fmt.Sprintf(`show global status like '%s'`, variableName)
|
|
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
|
|
return 0, err
|
|
}
|
|
return result, nil
|
|
}
|
|
|
|
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
|
|
switch dmlEvent.DML {
|
|
case binlog.DeleteDML:
|
|
{
|
|
query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
|
|
return query, uniqueKeyArgs, -1, err
|
|
}
|
|
case binlog.InsertDML:
|
|
{
|
|
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues())
|
|
return query, sharedArgs, 1, err
|
|
}
|
|
case binlog.UpdateDML:
|
|
{
|
|
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
|
|
args = append(args, sharedArgs...)
|
|
args = append(args, uniqueKeyArgs...)
|
|
return query, args, 0, err
|
|
}
|
|
}
|
|
return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
|
|
}
|
|
|
|
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
|
query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = sqlutils.Exec(this.db, query, args...)
|
|
if err == nil {
|
|
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
|
}
|
|
if this.migrationContext.CountTableRows {
|
|
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
|
|
}
|
|
return err
|
|
}
|