Handle onChangelogHeartbeatEvent and update CurrentHeartbeatLag

This commit is contained in:
Cathal Coffey 2021-01-31 18:24:26 +00:00
parent 7207bc146a
commit 8aee288fd7
2 changed files with 13 additions and 0 deletions

View File

@ -178,6 +178,7 @@ type MigrationContext struct {
RenameTablesEndTime time.Time RenameTablesEndTime time.Time
pointOfInterestTime time.Time pointOfInterestTime time.Time
pointOfInterestTimeMutex *sync.Mutex pointOfInterestTimeMutex *sync.Mutex
CurrentHeartbeatLag int64
CurrentLag int64 CurrentLag int64
currentProgress uint64 currentProgress uint64
ThrottleHTTPStatusCode int64 ThrottleHTTPStatusCode int64

View File

@ -213,6 +213,8 @@ func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err err
switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint {
case "state": case "state":
return this.onChangelogStateEvent(dmlEvent) return this.onChangelogStateEvent(dmlEvent)
case "heartbeat":
return this.onChangelogHeartbeatEvent(dmlEvent)
default: default:
return nil return nil
} }
@ -251,6 +253,16 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return nil return nil
} }
func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3)
if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil {
return this.migrationContext.Log.Errore(err)
} else {
atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag))
return nil
}
}
// listenOnPanicAbort aborts on abort request // listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() { func (this *Migrator) listenOnPanicAbort() {
err := <-this.migrationContext.PanicAbort err := <-this.migrationContext.PanicAbort