diff --git a/go/base/context.go b/go/base/context.go index 3211067..3026dc5 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -178,6 +178,7 @@ type MigrationContext struct { RenameTablesEndTime time.Time pointOfInterestTime time.Time pointOfInterestTimeMutex *sync.Mutex + CurrentHeartbeatLag int64 CurrentLag int64 currentProgress uint64 ThrottleHTTPStatusCode int64 diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 908a42f..83e4228 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -213,6 +213,8 @@ func (this *Migrator) onChangelogEvent(dmlEvent *binlog.BinlogDMLEvent) (err err switch hint := dmlEvent.NewColumnValues.StringColumn(2); hint { case "state": return this.onChangelogStateEvent(dmlEvent) + case "heartbeat": + return this.onChangelogHeartbeatEvent(dmlEvent) default: return nil } @@ -251,6 +253,16 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er return nil } +func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) { + changelogHeartbeatString := dmlEvent.NewColumnValues.StringColumn(3) + if lag, err := parseChangelogHeartbeat(changelogHeartbeatString); err != nil { + return this.migrationContext.Log.Errore(err) + } else { + atomic.StoreInt64(&this.migrationContext.CurrentHeartbeatLag, int64(lag)) + return nil + } +} + // listenOnPanicAbort aborts on abort request func (this *Migrator) listenOnPanicAbort() { err := <-this.migrationContext.PanicAbort