Merge 11540b453b
into b7db8c6ca7
This commit is contained in:
commit
da8a6a48fe
|
@ -232,6 +232,8 @@ type MigrationContext struct {
|
|||
|
||||
recentBinlogCoordinates mysql.BinlogCoordinates
|
||||
|
||||
AllowSetupMetadataLockInstruments bool
|
||||
|
||||
Log Logger
|
||||
}
|
||||
|
||||
|
|
|
@ -134,7 +134,7 @@ func main() {
|
|||
flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
|
||||
|
||||
flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
|
||||
|
||||
flag.BoolVar(&migrationContext.AllowSetupMetadataLockInstruments, "allow-setup-metadata-lock-instruments", false, "validate rename session acquiring lock whether is original table before unlock tables in cut-over phase")
|
||||
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
|
||||
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")
|
||||
flag.Int64Var(&migrationContext.CriticalLoadIntervalMilliseconds, "critical-load-interval-millis", 0, "When 0, migration immediately bails out upon meeting critical-load. When non-zero, a second check is done after given interval, and migration only bails out if 2nd check still meets critical load")
|
||||
|
|
|
@ -7,6 +7,7 @@ package logic
|
|||
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
|
@ -416,6 +417,24 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
|
|||
return this.WriteAndLogChangelog("state", value)
|
||||
}
|
||||
|
||||
func (this *Applier) EnableMetadataLockInstrument() (err error) {
|
||||
query := `select /*+ MAX_EXECUTION_TIME(300) */ ENABLED, TIMED from performance_schema.setup_instruments WHERE NAME = 'wait/lock/metadata/sql/mdl'`
|
||||
var enabled, timed string
|
||||
if err = this.db.QueryRow(query).Scan(&enabled, &timed); err != nil {
|
||||
return this.migrationContext.Log.Errorf("query performance_schema.setup_instruments with name wait/lock/metadata/sql/mdl error: %s", err)
|
||||
}
|
||||
if strings.EqualFold(enabled, "YES") && strings.EqualFold(timed, "YES") {
|
||||
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl already has been enabled")
|
||||
return nil
|
||||
}
|
||||
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl state: enabled %s, timed %s", enabled, timed)
|
||||
if _, err = this.db.Exec(`UPDATE performance_schema.setup_instruments SET ENABLED = 'YES', TIMED = 'YES' WHERE NAME = 'wait/lock/metadata/sql/mdl'`); err != nil {
|
||||
return this.migrationContext.Log.Errorf("enable instrument wait/lock/metadata/sql/mdl error: %s", err)
|
||||
}
|
||||
this.migrationContext.Log.Infof("instrument wait/lock/metadata/sql/mdl enabled")
|
||||
return nil
|
||||
}
|
||||
|
||||
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
|
||||
// This is done asynchronously
|
||||
func (this *Applier) InitiateHeartbeat() {
|
||||
|
@ -934,7 +953,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
|
|||
}
|
||||
|
||||
// AtomicCutOverMagicLock
|
||||
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
|
||||
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, okToDropSentryTable <-chan bool, dropSentryTableDone chan<- bool) error {
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
tableLocked <- err
|
||||
|
@ -1003,7 +1022,16 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
|
|||
|
||||
// The cut-over phase will proceed to apply remaining backlog onto ghost table,
|
||||
// and issue RENAME. We wait here until told to proceed.
|
||||
<-okToUnlockTable
|
||||
|
||||
// we should make sure that the whole lock tables duration(include wait channel cost) not higher than rename session timeout.
|
||||
// receive timeout channel, Rename session already has timeout&quit
|
||||
select {
|
||||
case <-okToDropSentryTable:
|
||||
this.migrationContext.Log.Infof("Receive drop magic table channel, drop table %s.%s now", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
|
||||
case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Since(this.migrationContext.LockTablesStartTime)):
|
||||
this.migrationContext.Log.Warningf("Wait drop magic table channel timeout, drop table %s.%s forcefully now. Noteworthy, it is unreasonable, timeout was used for foolproof, but normally it should not be here.", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetOldTableName()))
|
||||
}
|
||||
|
||||
this.migrationContext.Log.Infof("Will now proceed to drop magic table and unlock tables")
|
||||
|
||||
// The magic table is here because we locked it. And we are the only ones allowed to drop it.
|
||||
|
@ -1019,6 +1047,16 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
|
|||
// We DO NOT return here because we must `UNLOCK TABLES`!
|
||||
}
|
||||
|
||||
// Anyway, send dropSentryTableDone channel here
|
||||
dropSentryTableDone <- true
|
||||
|
||||
// receive timeout channel, Rename session already has timeout&quit
|
||||
select {
|
||||
case <-okToUnlockTable:
|
||||
this.migrationContext.Log.Infof("Receive unlock table channel, unlock tables now")
|
||||
case <-time.After(time.Duration(tableLockTimeoutSeconds)*time.Second - time.Since(this.migrationContext.LockTablesStartTime)):
|
||||
this.migrationContext.Log.Warningf("Wait unlock table channel timeout, unlock tables forcefully now. Noteworthy, it is unreasonable, timeout was used for foolproof, but normally it should not be here.")
|
||||
}
|
||||
// Tables still locked
|
||||
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
|
@ -1079,6 +1117,46 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
|
|||
return nil
|
||||
}
|
||||
|
||||
// ValidateGhostTableLocked validate original table has been locked via wait/lock/metadata/sql/mdl instrument.
|
||||
// return error means the ghost table has not been locked by rename session which is not expected. Need kill rename session, unlock table try again later.
|
||||
func (this *Applier) ValidateGhostTableLocked(renameSessionId int64) (err error) {
|
||||
var schema, object, state, info string
|
||||
query := `select /*+ MAX_EXECUTION_TIME(300) */ ifnull(a.object_schema,''),a.object_name,b.PROCESSLIST_STATE,b.PROCESSLIST_INFO from performance_schema.metadata_locks a join performance_schema.threads b on a.owner_thread_id=b.thread_id where b.processlist_id=? and a.lock_status='PENDING';`
|
||||
// Not strictly validate here
|
||||
if err := this.db.QueryRow(query, renameSessionId).Scan(&schema, &object, &state, &info); err != nil {
|
||||
if errors.Is(err, gosql.ErrNoRows) {
|
||||
this.migrationContext.Log.Warningf("query metadata locks returns %s, perhaps instrument wait/lock/metadata/sql/mdl not enabled, enable it via -allow-setup-metadata-lock-instruments", err)
|
||||
}
|
||||
this.migrationContext.Log.Warningf("Grabbing rename session acquire metadata lock error: %s, query:%s", err, query)
|
||||
} else {
|
||||
this.migrationContext.Log.Infof("Grabbing rename session acquire metadata lock is schema:%s, object:%s, state:%s, info: %s", schema, object, state, info)
|
||||
if !strings.EqualFold(strings.ToLower(object), strings.ToLower(this.migrationContext.OriginalTableName)) {
|
||||
return this.migrationContext.Log.Errorf("Expect rename session %d acquiring table metadata lock is %s.%s, but got %s.%s",
|
||||
renameSessionId,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(schema),
|
||||
sql.EscapeName(object))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// KillRenameSession Kill rename session while ghost table not locked.
|
||||
// Check rename session id whether is an expect process before execute kill command.
|
||||
func (this *Applier) KillRenameSession(renameSessionId int64) (err error) {
|
||||
if err := this.ExpectProcess(renameSessionId, "metadata lock", "rename"); err != nil {
|
||||
return err
|
||||
}
|
||||
this.migrationContext.Log.Infof("Starting kill rename session %d", renameSessionId)
|
||||
query := fmt.Sprintf(`kill /* gh-ost */ %d`, renameSessionId)
|
||||
if _, err := this.db.Exec(query); err != nil {
|
||||
return this.migrationContext.Log.Errorf("kill rename session %d error %s, anyway starting release original table now", renameSessionId, err)
|
||||
}
|
||||
this.migrationContext.Log.Infof("Kill rename session %d done", renameSessionId)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
|
||||
query := fmt.Sprintf(`show global status like '%s'`, variableName)
|
||||
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {
|
||||
|
|
|
@ -638,7 +638,10 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
|
||||
|
||||
okToUnlockTable := make(chan bool, 4)
|
||||
okToDropSentryTable := make(chan bool, 4)
|
||||
dropSentryTableDone := make(chan bool, 2)
|
||||
defer func() {
|
||||
okToDropSentryTable <- true
|
||||
okToUnlockTable <- true
|
||||
}()
|
||||
|
||||
|
@ -648,7 +651,7 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||
tableLocked := make(chan error, 2)
|
||||
tableUnlocked := make(chan error, 2)
|
||||
go func() {
|
||||
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
|
||||
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, okToDropSentryTable, dropSentryTableDone); err != nil {
|
||||
this.migrationContext.Log.Errore(err)
|
||||
}
|
||||
}()
|
||||
|
@ -674,6 +677,7 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
|
||||
// Abort! Release the lock
|
||||
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
|
||||
okToDropSentryTable <- true
|
||||
okToUnlockTable <- true
|
||||
}
|
||||
}()
|
||||
|
@ -691,6 +695,7 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||
// Wait for the RENAME to appear in PROCESSLIST
|
||||
if err := this.retryOperation(waitForRename, true); err != nil {
|
||||
// Abort! Release the lock
|
||||
okToDropSentryTable <- true
|
||||
okToUnlockTable <- true
|
||||
return err
|
||||
}
|
||||
|
@ -703,9 +708,20 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||
}
|
||||
this.migrationContext.Log.Infof("Connection holding lock on original table still exists")
|
||||
|
||||
okToDropSentryTable <- true
|
||||
<-dropSentryTableDone
|
||||
|
||||
if err := this.applier.ValidateGhostTableLocked(renameSessionId); err != nil {
|
||||
// Abort! Kill Rename session and release the lock
|
||||
if err := this.applier.KillRenameSession(renameSessionId); err != nil {
|
||||
this.migrationContext.Log.Errore(err)
|
||||
}
|
||||
okToUnlockTable <- true
|
||||
return err
|
||||
}
|
||||
|
||||
// Now that we've found the RENAME blocking, AND the locking connection still alive,
|
||||
// we know it is safe to proceed to release the lock
|
||||
|
||||
okToUnlockTable <- true
|
||||
// BAM! magic table dropped, original table lock is released
|
||||
// -> RENAME released -> queries on original are unblocked.
|
||||
|
@ -1146,6 +1162,13 @@ func (this *Migrator) initiateApplier() error {
|
|||
}
|
||||
}
|
||||
this.applier.WriteChangelogState(string(GhostTableMigrated))
|
||||
|
||||
if this.migrationContext.AllowSetupMetadataLockInstruments {
|
||||
if err := this.applier.EnableMetadataLockInstrument(); err != nil {
|
||||
this.migrationContext.Log.Errorf("Unable to enable metadata lock instrument, see further error details. Bailing out")
|
||||
return err
|
||||
}
|
||||
}
|
||||
go this.applier.InitiateHeartbeat()
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user