Merge pull request #44 from github/reconnect-streamer

Handling gh-ost replication timeouts
This commit is contained in:
Shlomi Noach 2016-05-24 08:48:25 +02:00
commit 6d9a8baa68
4 changed files with 65 additions and 38 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
# #
# #
RELEASE_VERSION="0.7.16" RELEASE_VERSION="0.7.17"
buildpath=/tmp/gh-ost buildpath=/tmp/gh-ost
target=gh-ost target=gh-ost

View File

@ -5,14 +5,11 @@
package binlog package binlog
import ( import ()
"github.com/github/gh-ost/go/mysql"
)
// BinlogReader is a general interface whose implementations can choose their methods of reading // BinlogReader is a general interface whose implementations can choose their methods of reading
// a binary log file and parsing it into binlog entries // a binary log file and parsing it into binlog entries
type BinlogReader interface { type BinlogReader interface {
StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error
GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates
Reconnect() error Reconnect() error
} }

View File

@ -7,6 +7,7 @@ package binlog
import ( import (
"fmt" "fmt"
"sync"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql" "github.com/github/gh-ost/go/sql"
@ -26,16 +27,17 @@ type GoMySQLReader struct {
connectionConfig *mysql.ConnectionConfig connectionConfig *mysql.ConnectionConfig
binlogSyncer *replication.BinlogSyncer binlogSyncer *replication.BinlogSyncer
binlogStreamer *replication.BinlogStreamer binlogStreamer *replication.BinlogStreamer
tableMap map[uint64]string
currentCoordinates mysql.BinlogCoordinates currentCoordinates mysql.BinlogCoordinates
lastHandledCoordinates mysql.BinlogCoordinates currentCoordinatesMutex *sync.Mutex
LastAppliedRowsEventHint mysql.BinlogCoordinates
} }
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
binlogReader = &GoMySQLReader{ binlogReader = &GoMySQLReader{
connectionConfig: connectionConfig, connectionConfig: connectionConfig,
tableMap: make(map[uint64]string),
currentCoordinates: mysql.BinlogCoordinates{}, currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: nil,
binlogStreamer: nil, binlogStreamer: nil,
} }
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
@ -63,11 +65,7 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin
func (this *GoMySQLReader) Reconnect() error { func (this *GoMySQLReader) Reconnect() error {
this.binlogSyncer.Close() this.binlogSyncer.Close()
connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4}
connectCoordinates := &this.lastHandledCoordinates
if connectCoordinates.IsEmpty() {
connectCoordinates = &this.currentCoordinates
}
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil { if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
return err return err
} }
@ -75,13 +73,16 @@ func (this *GoMySQLReader) Reconnect() error {
} }
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
return &this.currentCoordinates this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
returnCoordinates := this.currentCoordinates
return &returnCoordinates
} }
// StreamEvents // StreamEvents
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
if this.currentCoordinates.SmallerThanOrEquals(&this.lastHandledCoordinates) && !this.lastHandledCoordinates.IsEmpty() { if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
log.Infof("Skipping handled query at %+v", this.currentCoordinates) log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
return nil return nil
} }
@ -122,6 +123,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
// In reality, reads will be synchronous // In reality, reads will be synchronous
entriesChannel <- binlogEntry entriesChannel <- binlogEntry
} }
this.LastAppliedRowsEventHint = this.currentCoordinates
return nil return nil
} }
@ -135,21 +137,33 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if err != nil { if err != nil {
return err return err
} }
// if rand.Intn(1000) == 0 {
// this.binlogSyncer.Close()
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
// return log.Errorf(".............haha got random error")
// }
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos) this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
}()
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
// ev.Dump(os.Stdout)
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
}()
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
} else if tableMapEvent, ok := ev.Event.(*replication.TableMapEvent); ok {
// Actually not being used, since Table is available in RowsEvent.
// Keeping this here in case I'm wrong about this. Sometime in the near
// future I should remove this.
this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
return err return err
} }
} }
this.lastHandledCoordinates = this.currentCoordinates // log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
} }
log.Debugf("done streaming events") log.Debugf("done streaming events")

View File

@ -42,7 +42,7 @@ type EventsStreamer struct {
listeners [](*BinlogEventListener) listeners [](*BinlogEventListener)
listenersMutex *sync.Mutex listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry eventsChannel chan *binlog.BinlogEntry
binlogReader binlog.BinlogReader binlogReader *binlog.GoMySQLReader
} }
func NewEventsStreamer() *EventsStreamer { func NewEventsStreamer() *EventsStreamer {
@ -110,15 +110,22 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
if err := this.readCurrentBinlogCoordinates(); err != nil { if err := this.readCurrentBinlogCoordinates(); err != nil {
return err return err
} }
if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
return err
}
return nil
}
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig) goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig)
if err != nil { if err != nil {
return err return err
} }
if err := goMySQLReader.ConnectBinlogStreamer(*this.initialBinlogCoordinates); err != nil { if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil {
return err return err
} }
this.binlogReader = goMySQLReader this.binlogReader = goMySQLReader
return nil return nil
} }
@ -140,6 +147,10 @@ func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinat
return this.binlogReader.GetCurrentBinlogCoordinates() return this.binlogReader.GetCurrentBinlogCoordinates()
} }
func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates {
return &mysql.BinlogCoordinates{LogFile: this.GetCurrentBinlogCoordinates().LogFile, LogPos: 4}
}
// validateGrants verifies the user by which we're executing has necessary grants // validateGrants verifies the user by which we're executing has necessary grants
// to do its thang. // to do its thang.
func (this *EventsStreamer) readCurrentBinlogCoordinates() error { func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
@ -177,14 +188,19 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
// The next should block and execute forever, unless there's a serious error // The next should block and execute forever, unless there's a serious error
for { for {
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
// Reposition at same coordinates. Single attempt (TODO: make multiple attempts?)
log.Infof("StreamEvents encountered unexpected error: %+v", err) log.Infof("StreamEvents encountered unexpected error: %+v", err)
time.Sleep(ReconnectStreamerSleepSeconds * time.Second) time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
log.Infof("Reconnecting...")
err = this.binlogReader.Reconnect() // Reposition at same binlog file. Single attempt (TODO: make multiple attempts?)
if err != nil { lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
// if err := this.binlogReader.Reconnect(); err != nil {
// return err
// }
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
return err return err
} }
this.binlogReader.LastAppliedRowsEventHint = lastAppliedRowsEventHint
} }
} }
} }