diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 88f5423..50ed38f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -78,8 +78,8 @@ func NewMigrator() *Migrator { parser: sql.NewParser(), ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 1), - rowCopyComplete: make(chan bool), - allEventsUpToLockProcessed: make(chan bool), + rowCopyComplete: make(chan bool, 1), + allEventsUpToLockProcessed: make(chan bool, 1), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), @@ -181,6 +181,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return nil } changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3)) + log.Infof("Intercepted changelog state %s", changelogState) switch changelogState { case GhostTableMigrated: { @@ -206,7 +207,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return fmt.Errorf("Unknown changelog state: %+v", changelogState) } } - log.Debugf("Received state %+v", changelogState) + log.Infof("Handled changelog state %s", changelogState) return nil } @@ -445,6 +446,8 @@ func (this *Migrator) cutOver() (err error) { // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { + timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds)) + this.migrationContext.MarkPointOfInterest() waitForEventsUpToLockStartTime := time.Now() @@ -454,7 +457,16 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { } log.Infof("Waiting for events up to lock") atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1) - <-this.allEventsUpToLockProcessed + select { + case <-timeout.C: + { + return log.Errorf("Timeout while waiting for events up to lock") + } + case <-this.allEventsUpToLockProcessed: + { + log.Infof("Waiting for events up to lock: got allEventsUpToLockProcessed.") + } + } waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime) log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)