Ensure cleanup happens, even on error

This commit is contained in:
Nikhil Mathew 2017-08-28 15:53:47 -07:00
parent e4bb70df43
commit 0a7e713e9f
6 changed files with 41 additions and 13 deletions

View File

@ -290,12 +290,6 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
return this.WriteAndLogChangelog("state", value)
}
func (this *Applier) FinalCleanup() {
this.db.Close()
this.singletonDB.Close()
this.finishedMigrating = true
}
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously
func (this *Applier) InitiateHeartbeat() {
@ -1044,3 +1038,10 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil
}
func (this *Applier) Teardown() {
log.Debugf("Tearing down...")
this.db.Close()
this.singletonDB.Close()
this.finishedMigrating = true
}

View File

@ -723,3 +723,8 @@ func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err er
)
return replicationLag, err
}
func (this *Inspector) Teardown() {
this.db.Close()
return
}

View File

@ -291,6 +291,11 @@ func (this *Migrator) Migrate() (err error) {
if err := this.validateStatement(); err != nil {
return err
}
// After this point, we'll need to teardown anything that's been started
// so we don't leave things hanging around
defer this.teardown()
if err := this.initiateInspector(); err != nil {
return err
}
@ -1223,10 +1228,26 @@ func (this *Migrator) finalCleanup() error {
}
}
this.finishedMigrating = true
this.applier.FinalCleanup()
this.eventsStreamer.FinalCleanup()
sqlutils.ResetDBCache()
return nil
}
func (this *Migrator) teardown() {
this.finishedMigrating = true
if this.inspector != nil {
log.Infof("Tearing down inspector")
this.inspector.Teardown()
}
if this.applier != nil {
log.Infof("Tearing down applier")
this.applier.Teardown()
}
if this.eventsStreamer != nil {
log.Infof("Tearing down streamer")
this.eventsStreamer.Teardown()
}
sqlutils.ResetDBCache()
}

View File

@ -231,7 +231,7 @@ func (this *EventsStreamer) Close() (err error) {
return err
}
func (this *EventsStreamer) FinalCleanup() {
func (this *EventsStreamer) Teardown() {
this.db.Close()
return
}

View File

@ -187,6 +187,7 @@ func (this *Throttler) collectControlReplicasLag() {
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err
}
lag, err = parseChangelogHeartbeat(heartbeatValue)
return lag, err
}

View File

@ -40,6 +40,7 @@ func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.
if db, _, err = sqlutils.GetDB(dbUri); err != nil {
return replicationLag, err
}
defer db.Close()
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
slaveIORunning := m.GetString("Slave_IO_Running")
@ -52,7 +53,6 @@ func GetReplicationLag(connectionConfig *ConnectionConfig) (replicationLag time.
return nil
})
db.Close()
return replicationLag, err
}