Merge pull request #315 from github/wait-for-events-up-to-lock-fix
Fixing hanging cut-over waiting on `AllEventsUpToLockProcessed`
This commit is contained in:
commit
915be21055
@ -156,6 +156,7 @@ type MigrationContext struct {
|
||||
CleanupImminentFlag int64
|
||||
UserCommandedUnpostponeFlag int64
|
||||
CutOverCompleteFlag int64
|
||||
InCutOverCriticalSectionFlag int64
|
||||
PanicAbort chan error
|
||||
|
||||
OriginalTableColumnsOnApplier *sql.ColumnList
|
||||
@ -438,6 +439,16 @@ func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonH
|
||||
func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
// we don't throttle when cutting over. We _do_ throttle:
|
||||
// - during copy phase
|
||||
// - just before cut-over
|
||||
// - in between cut-over retries
|
||||
// When cutting over, we need to be aggressive. Cut-over holds table locks.
|
||||
// We need to release those asap.
|
||||
if atomic.LoadInt64(&this.InCutOverCriticalSectionFlag) > 0 {
|
||||
return false, "critical section", NoThrottleReasonHint
|
||||
}
|
||||
return this.isThrottled, this.throttleReason, this.throttleReasonHint
|
||||
}
|
||||
|
||||
|
@ -694,8 +694,7 @@ func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
|
||||
return this.dropTable(tableName)
|
||||
}
|
||||
|
||||
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
|
||||
// happens to be a cut-over magic table; if so, it drops it.
|
||||
// CreateAtomicCutOverSentryTable
|
||||
func (this *Applier) CreateAtomicCutOverSentryTable() error {
|
||||
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
|
||||
return err
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"math"
|
||||
"os"
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
@ -30,6 +31,10 @@ const (
|
||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||
)
|
||||
|
||||
func ReadChangelogState(s string) ChangelogState {
|
||||
return ChangelogState(strings.Split(s, ":")[0])
|
||||
}
|
||||
|
||||
type tableWriteFunc func() error
|
||||
|
||||
const (
|
||||
@ -60,10 +65,9 @@ type Migrator struct {
|
||||
firstThrottlingCollected chan bool
|
||||
ghostTableMigrated chan bool
|
||||
rowCopyComplete chan bool
|
||||
allEventsUpToLockProcessed chan bool
|
||||
allEventsUpToLockProcessed chan string
|
||||
|
||||
rowCopyCompleteFlag int64
|
||||
inCutOverCriticalActionFlag int64
|
||||
rowCopyCompleteFlag int64
|
||||
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
||||
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
||||
copyRowsQueue chan tableWriteFunc
|
||||
@ -79,7 +83,7 @@ func NewMigrator() *Migrator {
|
||||
ghostTableMigrated: make(chan bool),
|
||||
firstThrottlingCollected: make(chan bool, 1),
|
||||
rowCopyComplete: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan string),
|
||||
|
||||
copyRowsQueue: make(chan tableWriteFunc),
|
||||
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
||||
@ -180,7 +184,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
|
||||
return nil
|
||||
}
|
||||
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
|
||||
changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
|
||||
changelogState := ReadChangelogState(changelogStateString)
|
||||
log.Infof("Intercepted changelog state %s", changelogState)
|
||||
switch changelogState {
|
||||
case GhostTableMigrated:
|
||||
{
|
||||
@ -189,7 +195,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
case AllEventsUpToLockProcessed:
|
||||
{
|
||||
applyEventFunc := func() error {
|
||||
this.allEventsUpToLockProcessed <- true
|
||||
this.allEventsUpToLockProcessed <- changelogStateString
|
||||
return nil
|
||||
}
|
||||
// at this point we know all events up to lock have been read from the streamer,
|
||||
@ -206,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
|
||||
}
|
||||
}
|
||||
log.Debugf("Received state %+v", changelogState)
|
||||
log.Infof("Handled changelog state %s", changelogState)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -343,7 +349,7 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.cutOver(); err != nil {
|
||||
if err := this.retryOperation(this.cutOver); err != nil {
|
||||
return err
|
||||
}
|
||||
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
|
||||
@ -377,16 +383,18 @@ func (this *Migrator) cutOver() (err error) {
|
||||
})
|
||||
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
log.Debugf("checking for cut-over postpone")
|
||||
this.sleepWhileTrue(
|
||||
func() (bool, error) {
|
||||
if this.migrationContext.PostponeCutOverFlagFile == "" {
|
||||
return false, nil
|
||||
}
|
||||
if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
|
||||
atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 0)
|
||||
return false, nil
|
||||
}
|
||||
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
|
||||
// Throttle file defined and exists!
|
||||
// Postpone file defined and exists!
|
||||
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) == 0 {
|
||||
if err := this.hooksExecutor.onBeginPostponed(); err != nil {
|
||||
return true, err
|
||||
@ -400,6 +408,7 @@ func (this *Migrator) cutOver() (err error) {
|
||||
)
|
||||
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
log.Debugf("checking for cut-over postpone: complete")
|
||||
|
||||
if this.migrationContext.TestOnReplica {
|
||||
// With `--test-on-replica` we stop replication thread, and then proceed to use
|
||||
@ -424,20 +433,11 @@ func (this *Migrator) cutOver() (err error) {
|
||||
if this.migrationContext.CutOverType == base.CutOverAtomic {
|
||||
// Atomic solution: we use low timeout and multiple attempts. But for
|
||||
// each failed attempt, we throttle until replication lag is back to normal
|
||||
err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.atomicCutOver)
|
||||
},
|
||||
)
|
||||
err := this.atomicCutOver()
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverTwoStep {
|
||||
err := this.retryOperation(
|
||||
func() error {
|
||||
return this.executeAndThrottleOnError(this.cutOverTwoStep)
|
||||
},
|
||||
)
|
||||
return err
|
||||
return this.cutOverTwoStep()
|
||||
}
|
||||
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
|
||||
}
|
||||
@ -445,16 +445,35 @@ func (this *Migrator) cutOver() (err error) {
|
||||
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
|
||||
// make sure the queue is drained.
|
||||
func (this *Migrator) waitForEventsUpToLock() (err error) {
|
||||
timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
|
||||
|
||||
this.migrationContext.MarkPointOfInterest()
|
||||
waitForEventsUpToLockStartTime := time.Now()
|
||||
|
||||
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
|
||||
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
|
||||
allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano())
|
||||
log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge)
|
||||
if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Waiting for events up to lock")
|
||||
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
|
||||
<-this.allEventsUpToLockProcessed
|
||||
for found := false; !found; {
|
||||
select {
|
||||
case <-timeout.C:
|
||||
{
|
||||
return log.Errorf("Timeout while waiting for events up to lock")
|
||||
}
|
||||
case state := <-this.allEventsUpToLockProcessed:
|
||||
{
|
||||
if state == allEventsUpToLockProcessedChallenge {
|
||||
log.Infof("Waiting for events up to lock: got %s", state)
|
||||
found = true
|
||||
} else {
|
||||
log.Infof("Waiting for events up to lock: skipping %s", state)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
|
||||
|
||||
log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
|
||||
@ -468,8 +487,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
|
||||
// There is a point in time where the "original" table does not exist and queries are non-blocked
|
||||
// and failing.
|
||||
func (this *Migrator) cutOverTwoStep() (err error) {
|
||||
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
|
||||
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
|
||||
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
|
||||
|
||||
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
|
||||
@ -494,10 +513,12 @@ func (this *Migrator) cutOverTwoStep() (err error) {
|
||||
|
||||
// atomicCutOver
|
||||
func (this *Migrator) atomicCutOver() (err error) {
|
||||
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
|
||||
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
|
||||
|
||||
okToUnlockTable := make(chan bool, 4)
|
||||
defer func() {
|
||||
okToUnlockTable <- true
|
||||
this.applier.DropAtomicCutOverSentryTableIfExists()
|
||||
}()
|
||||
|
||||
@ -505,7 +526,6 @@ func (this *Migrator) atomicCutOver() (err error) {
|
||||
|
||||
lockOriginalSessionIdChan := make(chan int64, 2)
|
||||
tableLocked := make(chan error, 2)
|
||||
okToUnlockTable := make(chan bool, 3)
|
||||
tableUnlocked := make(chan error, 2)
|
||||
go func() {
|
||||
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
|
||||
@ -519,7 +539,9 @@ func (this *Migrator) atomicCutOver() (err error) {
|
||||
log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
|
||||
// At this point we know the original table is locked.
|
||||
// We know any newly incoming DML on original table is blocked.
|
||||
this.waitForEventsUpToLock()
|
||||
if err := this.waitForEventsUpToLock(); err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
|
||||
// Step 2
|
||||
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
|
||||
@ -999,15 +1021,8 @@ func (this *Migrator) executeWriteFuncs() error {
|
||||
return nil
|
||||
}
|
||||
for {
|
||||
if atomic.LoadInt64(&this.inCutOverCriticalActionFlag) == 0 {
|
||||
// we don't throttle when cutting over. We _do_ throttle:
|
||||
// - during copy phase
|
||||
// - just before cut-over
|
||||
// - in between cut-over retries
|
||||
this.throttler.throttle(nil)
|
||||
// When cutting over, we need to be aggressive. Cut-over holds table locks.
|
||||
// We need to release those asap.
|
||||
}
|
||||
this.throttler.throttle(nil)
|
||||
|
||||
// We give higher priority to event processing, then secondary priority to
|
||||
// rowcopy
|
||||
select {
|
||||
|
Loading…
Reference in New Issue
Block a user