From 1d77425fbe1cb1ed6b59b2c6044ac164cb5b80a1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 25 Jul 2016 15:17:30 +0200 Subject: [PATCH] capped streamer retries --- go/binlog/gomysql_reader.go | 9 --------- go/logic/streamer.go | 19 ++++++++++++++----- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 3cae0e6..8fe9b41 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -63,15 +63,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin return err } -func (this *GoMySQLReader) Reconnect() error { - this.binlogSyncer.Close() - connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4} - if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil { - return err - } - return nil -} - func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 68de75a..7d78d9e 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -189,18 +189,27 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { } }() // The next should block and execute forever, unless there's a serious error + var successiveFailures int64 + var lastAppliedRowsEventHint mysql.BinlogCoordinates for { if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { log.Infof("StreamEvents encountered unexpected error: %+v", err) this.migrationContext.MarkPointOfInterest() time.Sleep(ReconnectStreamerSleepSeconds * time.Second) - // Reposition at same binlog file. Single attempt (TODO: make multiple attempts?) - lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint + // See if there's retry overflow + if this.binlogReader.LastAppliedRowsEventHint.Equals(&lastAppliedRowsEventHint) { + successiveFailures += 1 + } else { + successiveFailures = 0 + } + if successiveFailures > this.migrationContext.MaxRetries() { + return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", lastAppliedRowsEventHint) + } + + // Reposition at same binlog file. + lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint) - // if err := this.binlogReader.Reconnect(); err != nil { - // return err - // } if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil { return err }