Compare commits
1 Commits
master
...
safe-gtid-
Author | SHA1 | Date | |
---|---|---|---|
|
bc6f6109c7 |
@ -119,7 +119,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StreamEvents
|
// StreamEvents
|
||||||
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry, binlogCoordinatesChannel chan<- *mysql.BinlogCoordinates) error {
|
||||||
for {
|
for {
|
||||||
if canStopStreaming() {
|
if canStopStreaming() {
|
||||||
break
|
break
|
||||||
@ -128,33 +128,25 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// if rand.Intn(1000) == 0 {
|
|
||||||
// this.binlogSyncer.Close()
|
|
||||||
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
|
|
||||||
// return log.Errorf(".............haha got random error")
|
|
||||||
// }
|
|
||||||
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
|
||||||
func() {
|
func() {
|
||||||
this.currentCoordinatesMutex.Lock()
|
this.currentCoordinatesMutex.Lock()
|
||||||
defer this.currentCoordinatesMutex.Unlock()
|
defer this.currentCoordinatesMutex.Unlock()
|
||||||
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
|
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
|
||||||
}()
|
}()
|
||||||
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
|
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
|
||||||
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
|
||||||
// ev.Dump(os.Stdout)
|
|
||||||
func() {
|
func() {
|
||||||
this.currentCoordinatesMutex.Lock()
|
this.currentCoordinatesMutex.Lock()
|
||||||
defer this.currentCoordinatesMutex.Unlock()
|
defer this.currentCoordinatesMutex.Unlock()
|
||||||
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
|
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
|
||||||
}()
|
}()
|
||||||
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
|
||||||
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
|
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
|
||||||
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
|
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
|
||||||
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
|
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
eventCoordinates := this.GetCurrentBinlogCoordinates()
|
||||||
|
binlogCoordinatesChannel <- eventCoordinates
|
||||||
}
|
}
|
||||||
log.Debugf("done streaming events")
|
log.Debugf("done streaming events")
|
||||||
|
|
||||||
|
@ -1048,8 +1048,7 @@ func (this *Migrator) initiateStreaming() error {
|
|||||||
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
this.eventsStreamer.AddListener(
|
this.eventsStreamer.addDmlListener(
|
||||||
false,
|
|
||||||
this.migrationContext.DatabaseName,
|
this.migrationContext.DatabaseName,
|
||||||
this.migrationContext.GetChangelogTableName(),
|
this.migrationContext.GetChangelogTableName(),
|
||||||
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||||
@ -1071,8 +1070,7 @@ func (this *Migrator) initiateStreaming() error {
|
|||||||
// addDMLEventsListener begins listening for binlog events on the original table,
|
// addDMLEventsListener begins listening for binlog events on the original table,
|
||||||
// and creates & enqueues a write task per such event.
|
// and creates & enqueues a write task per such event.
|
||||||
func (this *Migrator) addDMLEventsListener() error {
|
func (this *Migrator) addDMLEventsListener() error {
|
||||||
err := this.eventsStreamer.AddListener(
|
err := this.eventsStreamer.addDmlListener(
|
||||||
false,
|
|
||||||
this.migrationContext.DatabaseName,
|
this.migrationContext.DatabaseName,
|
||||||
this.migrationContext.OriginalTableName,
|
this.migrationContext.OriginalTableName,
|
||||||
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||||
|
@ -21,12 +21,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type BinlogEventListener struct {
|
type BinlogEventListener struct {
|
||||||
async bool
|
|
||||||
databaseName string
|
databaseName string
|
||||||
tableName string
|
tableName string
|
||||||
onDmlEvent func(event *binlog.BinlogDMLEvent) error
|
onDmlEvent func(event *binlog.BinlogDMLEvent) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type BinlogCoordinatesListener struct {
|
||||||
|
onBinlogCoordinates func(binlogCoordinates *mysql.BinlogCoordinates) error
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EventsChannelBufferSize = 1
|
EventsChannelBufferSize = 1
|
||||||
ReconnectStreamerSleepSeconds = 5
|
ReconnectStreamerSleepSeconds = 5
|
||||||
@ -39,25 +42,25 @@ type EventsStreamer struct {
|
|||||||
db *gosql.DB
|
db *gosql.DB
|
||||||
migrationContext *base.MigrationContext
|
migrationContext *base.MigrationContext
|
||||||
initialBinlogCoordinates *mysql.BinlogCoordinates
|
initialBinlogCoordinates *mysql.BinlogCoordinates
|
||||||
listeners [](*BinlogEventListener)
|
dmlListeners [](*BinlogEventListener)
|
||||||
|
coordinatesListeners [](*BinlogCoordinatesListener)
|
||||||
listenersMutex *sync.Mutex
|
listenersMutex *sync.Mutex
|
||||||
eventsChannel chan *binlog.BinlogEntry
|
|
||||||
binlogReader *binlog.GoMySQLReader
|
binlogReader *binlog.GoMySQLReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEventsStreamer() *EventsStreamer {
|
func NewEventsStreamer() *EventsStreamer {
|
||||||
return &EventsStreamer{
|
return &EventsStreamer{
|
||||||
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
|
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: base.GetMigrationContext(),
|
||||||
listeners: [](*BinlogEventListener){},
|
dmlListeners: [](*BinlogEventListener){},
|
||||||
listenersMutex: &sync.Mutex{},
|
coordinatesListeners: [](*BinlogCoordinatesListener){},
|
||||||
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
|
listenersMutex: &sync.Mutex{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddListener registers a new listener for binlog events, on a per-table basis
|
// addDmlListener registers a new listener for binlog events, on a per-table basis
|
||||||
func (this *EventsStreamer) AddListener(
|
func (this *EventsStreamer) addDmlListener(
|
||||||
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
|
databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
|
||||||
|
|
||||||
this.listenersMutex.Lock()
|
this.listenersMutex.Lock()
|
||||||
defer this.listenersMutex.Unlock()
|
defer this.listenersMutex.Unlock()
|
||||||
@ -69,36 +72,39 @@ func (this *EventsStreamer) AddListener(
|
|||||||
return fmt.Errorf("Empty table name in AddListener")
|
return fmt.Errorf("Empty table name in AddListener")
|
||||||
}
|
}
|
||||||
listener := &BinlogEventListener{
|
listener := &BinlogEventListener{
|
||||||
async: async,
|
|
||||||
databaseName: databaseName,
|
databaseName: databaseName,
|
||||||
tableName: tableName,
|
tableName: tableName,
|
||||||
onDmlEvent: onDmlEvent,
|
onDmlEvent: onDmlEvent,
|
||||||
}
|
}
|
||||||
this.listeners = append(this.listeners, listener)
|
this.dmlListeners = append(this.dmlListeners, listener)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// notifyListeners will notify relevant listeners with given DML event. Only
|
// notifyDmlListeners will notify relevant listeners with given DML event. Only
|
||||||
// listeners registered for changes on the table on which the DML operates are notified.
|
// listeners registered for changes on the table on which the DML operates are notified.
|
||||||
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
|
func (this *EventsStreamer) notifyDmlListeners(binlogEvent *binlog.BinlogDMLEvent) {
|
||||||
this.listenersMutex.Lock()
|
this.listenersMutex.Lock()
|
||||||
defer this.listenersMutex.Unlock()
|
defer this.listenersMutex.Unlock()
|
||||||
|
|
||||||
for _, listener := range this.listeners {
|
for _, listener := range this.dmlListeners {
|
||||||
listener := listener
|
|
||||||
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
|
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
|
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if listener.async {
|
listener.onDmlEvent(binlogEvent)
|
||||||
go func() {
|
}
|
||||||
listener.onDmlEvent(binlogEvent)
|
}
|
||||||
}()
|
|
||||||
} else {
|
// notifyBinlogCoordinatesListeners will notify all coordinates listeners, and these can decide what
|
||||||
listener.onDmlEvent(binlogEvent)
|
// to do with the information
|
||||||
}
|
func (this *EventsStreamer) notifyBinlogCoordinatesListeners(binlogCoordinates *mysql.BinlogCoordinates) {
|
||||||
|
this.listenersMutex.Lock()
|
||||||
|
defer this.listenersMutex.Unlock()
|
||||||
|
|
||||||
|
for _, listener := range this.coordinatesListeners {
|
||||||
|
listener.onBinlogCoordinates(binlogCoordinates)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,18 +187,26 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
|
|||||||
// StreamEvents will begin streaming events. It will be blocking, so should be
|
// StreamEvents will begin streaming events. It will be blocking, so should be
|
||||||
// executed by a goroutine
|
// executed by a goroutine
|
||||||
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
||||||
|
eventsChannel := make(chan *binlog.BinlogEntry, EventsChannelBufferSize)
|
||||||
go func() {
|
go func() {
|
||||||
for binlogEntry := range this.eventsChannel {
|
for binlogEntry := range eventsChannel {
|
||||||
if binlogEntry.DmlEvent != nil {
|
if binlogEntry.DmlEvent != nil {
|
||||||
this.notifyListeners(binlogEntry.DmlEvent)
|
this.notifyDmlListeners(binlogEntry.DmlEvent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
binlogCoordinatesChannel := make(chan *mysql.BinlogCoordinates, EventsChannelBufferSize)
|
||||||
|
go func() {
|
||||||
|
for binlogCoordinates := range binlogCoordinatesChannel {
|
||||||
|
this.notifyBinlogCoordinatesListeners(binlogCoordinates)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// The next should block and execute forever, unless there's a serious error
|
// The next should block and execute forever, unless there's a serious error
|
||||||
var successiveFailures int64
|
var successiveFailures int64
|
||||||
var lastAppliedRowsEventHint mysql.BinlogCoordinates
|
var lastAppliedRowsEventHint mysql.BinlogCoordinates
|
||||||
for {
|
for {
|
||||||
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
|
if err := this.binlogReader.StreamEvents(canStopStreaming, eventsChannel, binlogCoordinatesChannel); err != nil {
|
||||||
log.Infof("StreamEvents encountered unexpected error: %+v", err)
|
log.Infof("StreamEvents encountered unexpected error: %+v", err)
|
||||||
this.migrationContext.MarkPointOfInterest()
|
this.migrationContext.MarkPointOfInterest()
|
||||||
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
|
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
|
||||||
|
Loading…
Reference in New Issue
Block a user