waitForEventsUpToLock timeout

more info on AllEventsUpToLockProcessed, before and after injecting/intercepting
This commit is contained in:
Shlomi Noach 2016-11-17 15:20:44 +01:00
parent 2771232589
commit ee447ad560

View File

@ -78,8 +78,8 @@ func NewMigrator() *Migrator {
parser: sql.NewParser(), parser: sql.NewParser(),
ghostTableMigrated: make(chan bool), ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 1), firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool), rowCopyComplete: make(chan bool, 1),
allEventsUpToLockProcessed: make(chan bool), allEventsUpToLockProcessed: make(chan bool, 1),
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@ -181,6 +181,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return nil return nil
} }
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
log.Infof("Intercepted changelog state %s", changelogState)
switch changelogState { switch changelogState {
case GhostTableMigrated: case GhostTableMigrated:
{ {
@ -206,7 +207,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return fmt.Errorf("Unknown changelog state: %+v", changelogState) return fmt.Errorf("Unknown changelog state: %+v", changelogState)
} }
} }
log.Debugf("Received state %+v", changelogState) log.Infof("Handled changelog state %s", changelogState)
return nil return nil
} }
@ -445,6 +446,8 @@ func (this *Migrator) cutOver() (err error) {
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained. // make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) { func (this *Migrator) waitForEventsUpToLock() (err error) {
timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
waitForEventsUpToLockStartTime := time.Now() waitForEventsUpToLockStartTime := time.Now()
@ -454,7 +457,16 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
} }
log.Infof("Waiting for events up to lock") log.Infof("Waiting for events up to lock")
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
<-this.allEventsUpToLockProcessed select {
case <-timeout.C:
{
return log.Errorf("Timeout while waiting for events up to lock")
}
case <-this.allEventsUpToLockProcessed:
{
log.Infof("Waiting for events up to lock: got allEventsUpToLockProcessed.")
}
}
waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)