Compare commits

...

1 Commits

Author SHA1 Message Date
Shlomi Noach
bc6f6109c7 initial support for GTID on --test-on-replica
- streamer: binlog coordinates listeners
2016-08-13 08:00:43 +02:00
3 changed files with 46 additions and 42 deletions

View File

@ -119,7 +119,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
} }
// StreamEvents // StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry, binlogCoordinatesChannel chan<- *mysql.BinlogCoordinates) error {
for { for {
if canStopStreaming() { if canStopStreaming() {
break break
@ -128,33 +128,25 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if err != nil { if err != nil {
return err return err
} }
// if rand.Intn(1000) == 0 {
// this.binlogSyncer.Close()
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
// return log.Errorf(".............haha got random error")
// }
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
func() { func() {
this.currentCoordinatesMutex.Lock() this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock() defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos) this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
}() }()
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
// ev.Dump(os.Stdout)
func() { func() {
this.currentCoordinatesMutex.Lock() this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock() defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
}() }()
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
return err return err
} }
} }
// log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO eventCoordinates := this.GetCurrentBinlogCoordinates()
binlogCoordinatesChannel <- eventCoordinates
} }
log.Debugf("done streaming events") log.Debugf("done streaming events")

View File

@ -1048,8 +1048,7 @@ func (this *Migrator) initiateStreaming() error {
if err := this.eventsStreamer.InitDBConnections(); err != nil { if err := this.eventsStreamer.InitDBConnections(); err != nil {
return err return err
} }
this.eventsStreamer.AddListener( this.eventsStreamer.addDmlListener(
false,
this.migrationContext.DatabaseName, this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(), this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error { func(dmlEvent *binlog.BinlogDMLEvent) error {
@ -1071,8 +1070,7 @@ func (this *Migrator) initiateStreaming() error {
// addDMLEventsListener begins listening for binlog events on the original table, // addDMLEventsListener begins listening for binlog events on the original table,
// and creates & enqueues a write task per such event. // and creates & enqueues a write task per such event.
func (this *Migrator) addDMLEventsListener() error { func (this *Migrator) addDMLEventsListener() error {
err := this.eventsStreamer.AddListener( err := this.eventsStreamer.addDmlListener(
false,
this.migrationContext.DatabaseName, this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableName,
func(dmlEvent *binlog.BinlogDMLEvent) error { func(dmlEvent *binlog.BinlogDMLEvent) error {

View File

@ -21,12 +21,15 @@ import (
) )
type BinlogEventListener struct { type BinlogEventListener struct {
async bool
databaseName string databaseName string
tableName string tableName string
onDmlEvent func(event *binlog.BinlogDMLEvent) error onDmlEvent func(event *binlog.BinlogDMLEvent) error
} }
type BinlogCoordinatesListener struct {
onBinlogCoordinates func(binlogCoordinates *mysql.BinlogCoordinates) error
}
const ( const (
EventsChannelBufferSize = 1 EventsChannelBufferSize = 1
ReconnectStreamerSleepSeconds = 5 ReconnectStreamerSleepSeconds = 5
@ -39,9 +42,9 @@ type EventsStreamer struct {
db *gosql.DB db *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
initialBinlogCoordinates *mysql.BinlogCoordinates initialBinlogCoordinates *mysql.BinlogCoordinates
listeners [](*BinlogEventListener) dmlListeners [](*BinlogEventListener)
coordinatesListeners [](*BinlogCoordinatesListener)
listenersMutex *sync.Mutex listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry
binlogReader *binlog.GoMySQLReader binlogReader *binlog.GoMySQLReader
} }
@ -49,15 +52,15 @@ func NewEventsStreamer() *EventsStreamer {
return &EventsStreamer{ return &EventsStreamer{
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(), migrationContext: base.GetMigrationContext(),
listeners: [](*BinlogEventListener){}, dmlListeners: [](*BinlogEventListener){},
coordinatesListeners: [](*BinlogCoordinatesListener){},
listenersMutex: &sync.Mutex{}, listenersMutex: &sync.Mutex{},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
} }
} }
// AddListener registers a new listener for binlog events, on a per-table basis // addDmlListener registers a new listener for binlog events, on a per-table basis
func (this *EventsStreamer) AddListener( func (this *EventsStreamer) addDmlListener(
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
this.listenersMutex.Lock() this.listenersMutex.Lock()
defer this.listenersMutex.Unlock() defer this.listenersMutex.Unlock()
@ -69,37 +72,40 @@ func (this *EventsStreamer) AddListener(
return fmt.Errorf("Empty table name in AddListener") return fmt.Errorf("Empty table name in AddListener")
} }
listener := &BinlogEventListener{ listener := &BinlogEventListener{
async: async,
databaseName: databaseName, databaseName: databaseName,
tableName: tableName, tableName: tableName,
onDmlEvent: onDmlEvent, onDmlEvent: onDmlEvent,
} }
this.listeners = append(this.listeners, listener) this.dmlListeners = append(this.dmlListeners, listener)
return nil return nil
} }
// notifyListeners will notify relevant listeners with given DML event. Only // notifyDmlListeners will notify relevant listeners with given DML event. Only
// listeners registered for changes on the table on which the DML operates are notified. // listeners registered for changes on the table on which the DML operates are notified.
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { func (this *EventsStreamer) notifyDmlListeners(binlogEvent *binlog.BinlogDMLEvent) {
this.listenersMutex.Lock() this.listenersMutex.Lock()
defer this.listenersMutex.Unlock() defer this.listenersMutex.Unlock()
for _, listener := range this.listeners { for _, listener := range this.dmlListeners {
listener := listener
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) { if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
continue continue
} }
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) { if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
continue continue
} }
if listener.async {
go func() {
listener.onDmlEvent(binlogEvent)
}()
} else {
listener.onDmlEvent(binlogEvent) listener.onDmlEvent(binlogEvent)
} }
} }
// notifyBinlogCoordinatesListeners will notify all coordinates listeners, and these can decide what
// to do with the information
func (this *EventsStreamer) notifyBinlogCoordinatesListeners(binlogCoordinates *mysql.BinlogCoordinates) {
this.listenersMutex.Lock()
defer this.listenersMutex.Unlock()
for _, listener := range this.coordinatesListeners {
listener.onBinlogCoordinates(binlogCoordinates)
}
} }
func (this *EventsStreamer) InitDBConnections() (err error) { func (this *EventsStreamer) InitDBConnections() (err error) {
@ -181,18 +187,26 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
// StreamEvents will begin streaming events. It will be blocking, so should be // StreamEvents will begin streaming events. It will be blocking, so should be
// executed by a goroutine // executed by a goroutine
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
eventsChannel := make(chan *binlog.BinlogEntry, EventsChannelBufferSize)
go func() { go func() {
for binlogEntry := range this.eventsChannel { for binlogEntry := range eventsChannel {
if binlogEntry.DmlEvent != nil { if binlogEntry.DmlEvent != nil {
this.notifyListeners(binlogEntry.DmlEvent) this.notifyDmlListeners(binlogEntry.DmlEvent)
} }
} }
}() }()
binlogCoordinatesChannel := make(chan *mysql.BinlogCoordinates, EventsChannelBufferSize)
go func() {
for binlogCoordinates := range binlogCoordinatesChannel {
this.notifyBinlogCoordinatesListeners(binlogCoordinates)
}
}()
// The next should block and execute forever, unless there's a serious error // The next should block and execute forever, unless there's a serious error
var successiveFailures int64 var successiveFailures int64
var lastAppliedRowsEventHint mysql.BinlogCoordinates var lastAppliedRowsEventHint mysql.BinlogCoordinates
for { for {
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { if err := this.binlogReader.StreamEvents(canStopStreaming, eventsChannel, binlogCoordinatesChannel); err != nil {
log.Infof("StreamEvents encountered unexpected error: %+v", err) log.Infof("StreamEvents encountered unexpected error: %+v", err)
this.migrationContext.MarkPointOfInterest() this.migrationContext.MarkPointOfInterest()
time.Sleep(ReconnectStreamerSleepSeconds * time.Second) time.Sleep(ReconnectStreamerSleepSeconds * time.Second)