AllEventsUpToLockProcessed uses unique signature

This commit is contained in:
Shlomi Noach 2016-11-17 15:50:54 +01:00
parent ee447ad560
commit ef874b8551

View File

@ -11,6 +11,7 @@ import (
"math"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
@ -60,7 +61,7 @@ type Migrator struct {
firstThrottlingCollected chan bool
ghostTableMigrated chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
allEventsUpToLockProcessed chan string
rowCopyCompleteFlag int64
inCutOverCriticalActionFlag int64
@ -78,8 +79,8 @@ func NewMigrator() *Migrator {
parser: sql.NewParser(),
ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool, 1),
allEventsUpToLockProcessed: make(chan bool, 1),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan string),
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@ -180,7 +181,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
return nil
}
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
changelogState := ChangelogState(strings.Split(changelogStateString, ":")[0])
log.Infof("Intercepted changelog state %s", changelogState)
switch changelogState {
case GhostTableMigrated:
@ -190,7 +192,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
case AllEventsUpToLockProcessed:
{
applyEventFunc := func() error {
this.allEventsUpToLockProcessed <- true
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
// at this point we know all events up to lock have been read from the streamer,
@ -451,20 +453,28 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
this.migrationContext.MarkPointOfInterest()
waitForEventsUpToLockStartTime := time.Now()
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano())
log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge)
if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil {
return err
}
log.Infof("Waiting for events up to lock")
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
for found := false; !found; {
select {
case <-timeout.C:
{
return log.Errorf("Timeout while waiting for events up to lock")
}
case <-this.allEventsUpToLockProcessed:
case state := <-this.allEventsUpToLockProcessed:
{
log.Infof("Waiting for events up to lock: got allEventsUpToLockProcessed.")
if state == allEventsUpToLockProcessedChallenge {
log.Infof("Waiting for events up to lock: got %s", state)
found = true
} else {
log.Infof("Waiting for events up to lock: skipping %s", state)
}
}
}
}
waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)