Compare commits

...

3 Commits

Author SHA1 Message Date
Shlomi Noach 6743c388bc
Merge branch 'master' into sanity-uptime 2020-02-05 12:30:39 +02:00
Shlomi Noach f636dc216a
Merge branch 'master' into sanity-uptime 2020-02-05 09:16:34 +02:00
Shlomi Noach 52d382d971 inspector uptime check: validate inspector has not restarted 2019-10-06 11:44:59 +03:00
3 changed files with 24 additions and 0 deletions

View File

@ -181,6 +181,7 @@ type MigrationContext struct {
TotalRowsCopied int64
TotalDMLEventsApplied int64
DMLBatchSize int64
InspectorUptimeSeconds int64
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint

View File

@ -752,6 +752,13 @@ func (this *Inspector) readChangelogState(hint string) (string, error) {
return result, err
}
// readUptime reads MySQL server uptime (in seconds)
func (this *Inspector) readUptime() (uptime int64, err error) {
var dummy string
err = this.db.QueryRow("show global status like 'Uptime'").Scan(&dummy, &uptime)
return uptime, err
}
func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
log.Infof("Recursively searching for replication master")
visitedKeys := mysql.NewInstanceKeyMap()

View File

@ -342,6 +342,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.initiateInspector(); err != nil {
return err
}
go this.initiateInspectorHealthCheck()
if err := this.initiateStreaming(); err != nil {
return err
}
@ -711,6 +712,21 @@ func (this *Migrator) initiateServer() (err error) {
return nil
}
func (this *Migrator) initiateInspectorHealthCheck() {
ticker := time.Tick(10 * time.Second)
for range ticker {
lastUptime := atomic.LoadInt64(&this.migrationContext.InspectorUptimeSeconds)
if uptime, err := this.inspector.readUptime(); err != nil {
log.Errore(err)
} else {
if uptime < lastUptime {
this.migrationContext.PanicAbort <- fmt.Errorf("Inspector Uptime is %+v, less than previously measured uptime %+v. Has the inspector been restarted? Bailing out.", uptime, lastUptime)
}
atomic.StoreInt64(&this.migrationContext.InspectorUptimeSeconds, uptime)
}
}
}
// initiateInspector connects, validates and inspects the "inspector" server.
// The "inspector" server is typically a replica; it is where we issue some
// queries such as: