gh-ost/go/logic/streamer.go
Shlomi Noach 0e7b23e6fe - Creating an populating Changelog table
- Using heartbeat
- Throttling works based on heartbeat
- Refactored binlog_reader stuff. Now streaming events (into golang channel, which makes for nice buffering and throttling)
- Binlog table listeners work
- More Migrator logic; existing logic for waiting on `state` events (e.g. `TablesCreatedState`)
2016-04-07 15:57:12 +02:00

163 lines
4.6 KiB
Go

/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package logic
import (
gosql "database/sql"
"fmt"
"strings"
"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/mysql"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
type BinlogEventListener struct {
async bool
databaseName string
tableName string
onDmlEvent func(event *binlog.BinlogDMLEvent) error
}
const (
EventsChannelBufferSize = 1
)
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
// and interested parties may subscribe for per-table events.
type EventsStreamer struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
migrationContext *base.MigrationContext
nextBinlogCoordinates *mysql.BinlogCoordinates
listeners [](*BinlogEventListener)
eventsChannel chan *binlog.BinlogEntry
binlogReader binlog.BinlogReader
}
func NewEventsStreamer() *EventsStreamer {
return &EventsStreamer{
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(),
listeners: [](*BinlogEventListener){},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
}
}
func (this *EventsStreamer) AddListener(
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
if databaseName == "" {
return fmt.Errorf("Empty database name in AddListener")
}
if tableName == "" {
return fmt.Errorf("Empty table name in AddListener")
}
listener := &BinlogEventListener{
async: async,
databaseName: databaseName,
tableName: tableName,
onDmlEvent: onDmlEvent,
}
this.listeners = append(this.listeners, listener)
return nil
}
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
for _, listener := range this.listeners {
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
continue
}
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
continue
}
onDmlEvent := listener.onDmlEvent
if listener.async {
go func() {
onDmlEvent(binlogEvent)
}()
} else {
onDmlEvent(binlogEvent)
}
}
}
func (this *EventsStreamer) InitDBConnections() (err error) {
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
return err
}
if err := this.validateConnection(); err != nil {
return err
}
if err := this.readCurrentBinlogCoordinates(); err != nil {
return err
}
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig)
if err != nil {
return err
}
if err := goMySQLReader.ConnectBinlogStreamer(*this.nextBinlogCoordinates); err != nil {
return err
}
this.binlogReader = goMySQLReader
return nil
}
// validateConnection issues a simple can-connect to MySQL
func (this *EventsStreamer) validateConnection() error {
query := `select @@global.port`
var port int
if err := this.db.QueryRow(query).Scan(&port); err != nil {
return err
}
if port != this.connectionConfig.Key.Port {
return fmt.Errorf("Unexpected database port reported: %+v", port)
}
log.Infof("connection validated on %+v", this.connectionConfig.Key)
return nil
}
// validateGrants verifies the user by which we're executing has necessary grants
// to do its thang.
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
query := `show /* gh-osc readCurrentBinlogCoordinates */ master status`
foundMasterStatus := false
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
this.nextBinlogCoordinates = &mysql.BinlogCoordinates{
LogFile: m.GetString("File"),
LogPos: m.GetInt64("Position"),
}
foundMasterStatus = true
return nil
})
if err != nil {
return err
}
if !foundMasterStatus {
return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out")
}
log.Debugf("Streamer binlog coordinates: %+v", *this.nextBinlogCoordinates)
return nil
}
// StreamEvents will begin streaming events. It will be blocking, so should be
// executed by a goroutine
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
go func() {
for binlogEntry := range this.eventsChannel {
if binlogEntry.DmlEvent != nil {
this.notifyListeners(binlogEntry.DmlEvent)
}
}
}()
return this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel)
}