never throttling inside cut-over critical section

This commit is contained in:
Shlomi Noach 2016-11-17 17:22:13 +01:00
parent b00cae11fa
commit 7ab6af8f5f
2 changed files with 19 additions and 16 deletions

View File

@ -156,6 +156,7 @@ type MigrationContext struct {
CleanupImminentFlag int64 CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64 UserCommandedUnpostponeFlag int64
CutOverCompleteFlag int64 CutOverCompleteFlag int64
InCutOverCriticalSectionFlag int64
PanicAbort chan error PanicAbort chan error
OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumnsOnApplier *sql.ColumnList
@ -438,6 +439,16 @@ func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonH
func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) { func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()
// we don't throttle when cutting over. We _do_ throttle:
// - during copy phase
// - just before cut-over
// - in between cut-over retries
// When cutting over, we need to be aggressive. Cut-over holds table locks.
// We need to release those asap.
if atomic.LoadInt64(&this.InCutOverCriticalSectionFlag) > 0 {
return false, "critical section", NoThrottleReasonHint
}
return this.isThrottled, this.throttleReason, this.throttleReasonHint return this.isThrottled, this.throttleReason, this.throttleReasonHint
} }

View File

@ -68,7 +68,6 @@ type Migrator struct {
allEventsUpToLockProcessed chan string allEventsUpToLockProcessed chan string
rowCopyCompleteFlag int64 rowCopyCompleteFlag int64
inCutOverCriticalActionFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but // copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc copyRowsQueue chan tableWriteFunc
@ -409,6 +408,7 @@ func (this *Migrator) cutOver() (err error) {
) )
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0) atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
log.Debugf("checking for cut-over postpone: complete")
if this.migrationContext.TestOnReplica { if this.migrationContext.TestOnReplica {
// With `--test-on-replica` we stop replication thread, and then proceed to use // With `--test-on-replica` we stop replication thread, and then proceed to use
@ -445,7 +445,6 @@ func (this *Migrator) cutOver() (err error) {
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained. // make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) { func (this *Migrator) waitForEventsUpToLock() (err error) {
// timeout := time.NewTimer(time.Minute * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
@ -488,8 +487,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
// There is a point in time where the "original" table does not exist and queries are non-blocked // There is a point in time where the "original" table does not exist and queries are non-blocked
// and failing. // and failing.
func (this *Migrator) cutOverTwoStep() (err error) { func (this *Migrator) cutOverTwoStep() (err error) {
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0) atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil { if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
@ -514,8 +513,8 @@ func (this *Migrator) cutOverTwoStep() (err error) {
// atomicCutOver // atomicCutOver
func (this *Migrator) atomicCutOver() (err error) { func (this *Migrator) atomicCutOver() (err error) {
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1) atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0) defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
okToUnlockTable := make(chan bool, 4) okToUnlockTable := make(chan bool, 4)
defer func() { defer func() {
@ -1022,15 +1021,8 @@ func (this *Migrator) executeWriteFuncs() error {
return nil return nil
} }
for { for {
if atomic.LoadInt64(&this.inCutOverCriticalActionFlag) == 0 {
// we don't throttle when cutting over. We _do_ throttle:
// - during copy phase
// - just before cut-over
// - in between cut-over retries
this.throttler.throttle(nil) this.throttler.throttle(nil)
// When cutting over, we need to be aggressive. Cut-over holds table locks.
// We need to release those asap.
}
// We give higher priority to event processing, then secondary priority to // We give higher priority to event processing, then secondary priority to
// rowcopy // rowcopy
select { select {