From bc6f6109c724fcfacc6e351178ef3da1d2dbe265 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 13 Aug 2016 08:00:43 +0200 Subject: [PATCH] initial support for GTID on --test-on-replica - streamer: binlog coordinates listeners --- go/binlog/gomysql_reader.go | 14 ++------ go/logic/migrator.go | 6 ++-- go/logic/streamer.go | 68 ++++++++++++++++++++++--------------- 3 files changed, 46 insertions(+), 42 deletions(-) diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 8fe9b41..90b7097 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -119,7 +119,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven } // StreamEvents -func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { +func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry, binlogCoordinatesChannel chan<- *mysql.BinlogCoordinates) error { for { if canStopStreaming() { break @@ -128,33 +128,25 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - // if rand.Intn(1000) == 0 { - // this.binlogSyncer.Close() - // log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint) - // return log.Errorf(".............haha got random error") - // } - // log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO func() { this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() this.currentCoordinates.LogPos = int64(ev.Header.LogPos) }() if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { - // log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO - // ev.Dump(os.Stdout) func() { this.currentCoordinatesMutex.Lock() defer this.currentCoordinatesMutex.Unlock() this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) }() - // log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { return err } } - // log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO + eventCoordinates := this.GetCurrentBinlogCoordinates() + binlogCoordinatesChannel <- eventCoordinates } log.Debugf("done streaming events") diff --git a/go/logic/migrator.go b/go/logic/migrator.go index cc418a9..32edc00 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1048,8 +1048,7 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } - this.eventsStreamer.AddListener( - false, + this.eventsStreamer.addDmlListener( this.migrationContext.DatabaseName, this.migrationContext.GetChangelogTableName(), func(dmlEvent *binlog.BinlogDMLEvent) error { @@ -1071,8 +1070,7 @@ func (this *Migrator) initiateStreaming() error { // addDMLEventsListener begins listening for binlog events on the original table, // and creates & enqueues a write task per such event. func (this *Migrator) addDMLEventsListener() error { - err := this.eventsStreamer.AddListener( - false, + err := this.eventsStreamer.addDmlListener( this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEvent *binlog.BinlogDMLEvent) error { diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 7d78d9e..3dd9b6e 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -21,12 +21,15 @@ import ( ) type BinlogEventListener struct { - async bool databaseName string tableName string onDmlEvent func(event *binlog.BinlogDMLEvent) error } +type BinlogCoordinatesListener struct { + onBinlogCoordinates func(binlogCoordinates *mysql.BinlogCoordinates) error +} + const ( EventsChannelBufferSize = 1 ReconnectStreamerSleepSeconds = 5 @@ -39,25 +42,25 @@ type EventsStreamer struct { db *gosql.DB migrationContext *base.MigrationContext initialBinlogCoordinates *mysql.BinlogCoordinates - listeners [](*BinlogEventListener) + dmlListeners [](*BinlogEventListener) + coordinatesListeners [](*BinlogCoordinatesListener) listenersMutex *sync.Mutex - eventsChannel chan *binlog.BinlogEntry binlogReader *binlog.GoMySQLReader } func NewEventsStreamer() *EventsStreamer { return &EventsStreamer{ - connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, - migrationContext: base.GetMigrationContext(), - listeners: [](*BinlogEventListener){}, - listenersMutex: &sync.Mutex{}, - eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), + connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, + migrationContext: base.GetMigrationContext(), + dmlListeners: [](*BinlogEventListener){}, + coordinatesListeners: [](*BinlogCoordinatesListener){}, + listenersMutex: &sync.Mutex{}, } } -// AddListener registers a new listener for binlog events, on a per-table basis -func (this *EventsStreamer) AddListener( - async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { +// addDmlListener registers a new listener for binlog events, on a per-table basis +func (this *EventsStreamer) addDmlListener( + databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() @@ -69,36 +72,39 @@ func (this *EventsStreamer) AddListener( return fmt.Errorf("Empty table name in AddListener") } listener := &BinlogEventListener{ - async: async, databaseName: databaseName, tableName: tableName, onDmlEvent: onDmlEvent, } - this.listeners = append(this.listeners, listener) + this.dmlListeners = append(this.dmlListeners, listener) return nil } -// notifyListeners will notify relevant listeners with given DML event. Only +// notifyDmlListeners will notify relevant listeners with given DML event. Only // listeners registered for changes on the table on which the DML operates are notified. -func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { +func (this *EventsStreamer) notifyDmlListeners(binlogEvent *binlog.BinlogDMLEvent) { this.listenersMutex.Lock() defer this.listenersMutex.Unlock() - for _, listener := range this.listeners { - listener := listener + for _, listener := range this.dmlListeners { if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) { continue } if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) { continue } - if listener.async { - go func() { - listener.onDmlEvent(binlogEvent) - }() - } else { - listener.onDmlEvent(binlogEvent) - } + listener.onDmlEvent(binlogEvent) + } +} + +// notifyBinlogCoordinatesListeners will notify all coordinates listeners, and these can decide what +// to do with the information +func (this *EventsStreamer) notifyBinlogCoordinatesListeners(binlogCoordinates *mysql.BinlogCoordinates) { + this.listenersMutex.Lock() + defer this.listenersMutex.Unlock() + + for _, listener := range this.coordinatesListeners { + listener.onBinlogCoordinates(binlogCoordinates) } } @@ -181,18 +187,26 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error { // StreamEvents will begin streaming events. It will be blocking, so should be // executed by a goroutine func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { + eventsChannel := make(chan *binlog.BinlogEntry, EventsChannelBufferSize) go func() { - for binlogEntry := range this.eventsChannel { + for binlogEntry := range eventsChannel { if binlogEntry.DmlEvent != nil { - this.notifyListeners(binlogEntry.DmlEvent) + this.notifyDmlListeners(binlogEntry.DmlEvent) } } }() + binlogCoordinatesChannel := make(chan *mysql.BinlogCoordinates, EventsChannelBufferSize) + go func() { + for binlogCoordinates := range binlogCoordinatesChannel { + this.notifyBinlogCoordinatesListeners(binlogCoordinates) + } + }() + // The next should block and execute forever, unless there's a serious error var successiveFailures int64 var lastAppliedRowsEventHint mysql.BinlogCoordinates for { - if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { + if err := this.binlogReader.StreamEvents(canStopStreaming, eventsChannel, binlogCoordinatesChannel); err != nil { log.Infof("StreamEvents encountered unexpected error: %+v", err) this.migrationContext.MarkPointOfInterest() time.Sleep(ReconnectStreamerSleepSeconds * time.Second)