inspector uptime check: validate inspector has not restarted

This commit is contained in:
Shlomi Noach 2019-10-06 11:44:59 +03:00
parent dcc3e90f29
commit 52d382d971
3 changed files with 24 additions and 0 deletions

View File

@ -179,6 +179,7 @@ type MigrationContext struct {
TotalRowsCopied int64 TotalRowsCopied int64
TotalDMLEventsApplied int64 TotalDMLEventsApplied int64
DMLBatchSize int64 DMLBatchSize int64
InspectorUptimeSeconds int64
isThrottled bool isThrottled bool
throttleReason string throttleReason string
throttleReasonHint ThrottleReasonHint throttleReasonHint ThrottleReasonHint

View File

@ -752,6 +752,13 @@ func (this *Inspector) readChangelogState(hint string) (string, error) {
return result, err return result, err
} }
// readUptime reads MySQL server uptime (in seconds)
func (this *Inspector) readUptime() (uptime int64, err error) {
var dummy string
err = this.db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &uptime)
return uptime, err
}
func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) { func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
log.Infof("Recursively searching for replication master") log.Infof("Recursively searching for replication master")
visitedKeys := mysql.NewInstanceKeyMap() visitedKeys := mysql.NewInstanceKeyMap()

View File

@ -342,6 +342,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.initiateInspector(); err != nil { if err := this.initiateInspector(); err != nil {
return err return err
} }
go this.initiateInspectorHealthCheck()
if err := this.initiateStreaming(); err != nil { if err := this.initiateStreaming(); err != nil {
return err return err
} }
@ -711,6 +712,21 @@ func (this *Migrator) initiateServer() (err error) {
return nil return nil
} }
func (this *Migrator) initiateInspectorHealthCheck() {
ticker := time.Tick(10 * time.Second)
for range ticker {
lastUptime := atomic.LoadInt64(&this.migrationContext.InspectorUptimeSeconds)
if uptime, err := this.inspector.readUptime(); err != nil {
log.Errore(err)
} else {
if uptime < lastUptime {
this.migrationContext.PanicAbort <- fmt.Errorf("Inspector Uptime is %+v, less than previously measured uptime %+v. Has the inspector been restarted? Bailing out.", uptime, lastUptime)
}
atomic.StoreInt64(&this.migrationContext.InspectorUptimeSeconds, uptime)
}
}
}
// initiateInspector connects, validates and inspects the "inspector" server. // initiateInspector connects, validates and inspects the "inspector" server.
// The "inspector" server is typically a replica; it is where we issue some // The "inspector" server is typically a replica; it is where we issue some
// queries such as: // queries such as: