capped streamer retries

This commit is contained in:
Shlomi Noach 2016-07-25 15:17:30 +02:00
parent a74c1c0fd6
commit 1d77425fbe
2 changed files with 14 additions and 14 deletions

View File

@ -63,15 +63,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
return err
}
func (this *GoMySQLReader) Reconnect() error {
this.binlogSyncer.Close()
connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4}
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
return err
}
return nil
}
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()

View File

@ -189,18 +189,27 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
}
}()
// The next should block and execute forever, unless there's a serious error
var successiveFailures int64
var lastAppliedRowsEventHint mysql.BinlogCoordinates
for {
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
log.Infof("StreamEvents encountered unexpected error: %+v", err)
this.migrationContext.MarkPointOfInterest()
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
// Reposition at same binlog file. Single attempt (TODO: make multiple attempts?)
lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint
// See if there's retry overflow
if this.binlogReader.LastAppliedRowsEventHint.Equals(&lastAppliedRowsEventHint) {
successiveFailures += 1
} else {
successiveFailures = 0
}
if successiveFailures > this.migrationContext.MaxRetries() {
return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", lastAppliedRowsEventHint)
}
// Reposition at same binlog file.
lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
// if err := this.binlogReader.Reconnect(); err != nil {
// return err
// }
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
return err
}