Merge pull request #103 from github/limited-streamer-retries

capped streamer retries
This commit is contained in:
Shlomi Noach 2016-07-26 09:27:49 +02:00 committed by GitHub
commit 071817a28a
2 changed files with 14 additions and 14 deletions

View File

@ -63,15 +63,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
return err return err
} }
func (this *GoMySQLReader) Reconnect() error {
this.binlogSyncer.Close()
connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4}
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
return err
}
return nil
}
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
this.currentCoordinatesMutex.Lock() this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock() defer this.currentCoordinatesMutex.Unlock()

View File

@ -189,18 +189,27 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
} }
}() }()
// The next should block and execute forever, unless there's a serious error // The next should block and execute forever, unless there's a serious error
var successiveFailures int64
var lastAppliedRowsEventHint mysql.BinlogCoordinates
for { for {
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
log.Infof("StreamEvents encountered unexpected error: %+v", err) log.Infof("StreamEvents encountered unexpected error: %+v", err)
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
time.Sleep(ReconnectStreamerSleepSeconds * time.Second) time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
// Reposition at same binlog file. Single attempt (TODO: make multiple attempts?) // See if there's retry overflow
lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint if this.binlogReader.LastAppliedRowsEventHint.Equals(&lastAppliedRowsEventHint) {
successiveFailures += 1
} else {
successiveFailures = 0
}
if successiveFailures > this.migrationContext.MaxRetries() {
return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", lastAppliedRowsEventHint)
}
// Reposition at same binlog file.
lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint) log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
// if err := this.binlogReader.Reconnect(); err != nil {
// return err
// }
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil { if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
return err return err
} }