gh-ost/go/logic/streamer.go

224 lines
6.7 KiB
Go
Raw Normal View History

2016-04-06 11:05:21 +00:00
/*
Copyright 2016 GitHub Inc.
2016-05-16 09:09:17 +00:00
See https://github.com/github/gh-ost/blob/master/LICENSE
2016-04-06 11:05:21 +00:00
*/
package logic
import (
gosql "database/sql"
"fmt"
"strings"
"sync"
"time"
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/binlog"
"github.com/github/gh-ost/go/mysql"
2016-04-06 11:05:21 +00:00
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
type BinlogEventListener struct {
async bool
databaseName string
tableName string
onDmlEvent func(event *binlog.BinlogDMLEvent) error
2016-04-06 11:05:21 +00:00
}
const (
EventsChannelBufferSize = 1
ReconnectStreamerSleepSeconds = 5
)
2016-04-06 11:05:21 +00:00
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
// and interested parties may subscribe for per-table events.
type EventsStreamer struct {
connectionConfig *mysql.ConnectionConfig
db *gosql.DB
migrationContext *base.MigrationContext
initialBinlogCoordinates *mysql.BinlogCoordinates
listeners [](*BinlogEventListener)
listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry
binlogReader *binlog.GoMySQLReader
2016-04-06 11:05:21 +00:00
}
2017-08-08 20:36:54 +00:00
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
2016-04-06 11:05:21 +00:00
return &EventsStreamer{
2017-08-08 20:36:54 +00:00
connectionConfig: migrationContext.InspectorConnectionConfig,
migrationContext: migrationContext,
2016-04-06 11:05:21 +00:00
listeners: [](*BinlogEventListener){},
listenersMutex: &sync.Mutex{},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
2016-04-06 11:05:21 +00:00
}
}
// AddListener registers a new listener for binlog events, on a per-table basis
2016-04-06 11:05:21 +00:00
func (this *EventsStreamer) AddListener(
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
this.listenersMutex.Lock()
defer this.listenersMutex.Unlock()
2016-04-06 11:05:21 +00:00
if databaseName == "" {
return fmt.Errorf("Empty database name in AddListener")
}
if tableName == "" {
return fmt.Errorf("Empty table name in AddListener")
}
listener := &BinlogEventListener{
async: async,
databaseName: databaseName,
tableName: tableName,
onDmlEvent: onDmlEvent,
2016-04-06 11:05:21 +00:00
}
this.listeners = append(this.listeners, listener)
return nil
}
// notifyListeners will notify relevant listeners with given DML event. Only
// listeners registered for changes on the table on which the DML operates are notified.
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
this.listenersMutex.Lock()
defer this.listenersMutex.Unlock()
2016-04-06 11:05:21 +00:00
for _, listener := range this.listeners {
listener := listener
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
2016-04-06 11:05:21 +00:00
continue
}
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
2016-04-06 11:05:21 +00:00
continue
}
if listener.async {
go func() {
listener.onDmlEvent(binlogEvent)
2016-04-06 11:05:21 +00:00
}()
} else {
listener.onDmlEvent(binlogEvent)
2016-04-06 11:05:21 +00:00
}
}
}
func (this *EventsStreamer) InitDBConnections() (err error) {
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
2016-04-06 11:05:21 +00:00
return err
}
Allow gh-ost to modify the server using extra port Both Percona and Maria allow MySQL to be configured to listen on an extra port when their thread pool is enable. * https://www.percona.com/doc/percona-server/5.7/performance/threadpool.html * https://mariadb.com/kb/en/the-mariadb-library/thread-pool-in-mariadb-51-53/ This is valuable because if the table has a lot of traffic (read or write load), gh-ost can end up starving the thread pool as incomming connections are immediately blocked. By using gh-ost on the extra port, MySQL locking will still behave the same, but MySQL will keep a dedicated thread for each gh-ost connection. When doing this, it's important to inspect the extra-max-connections variable. Both Percona and Maria default to 1, so gh-ost may easily exceed with its threads. An example local run using this ``` $ mysql -S /tmp/mysql_sandbox20393.sock -e "select @@global.port, @@global.extra_port" +---------------+---------------------+ | @@global.port | @@global.extra_port | +---------------+---------------------+ | 20393 | 30393 | +---------------+---------------------+ ./bin/gh-ost \ --initially-drop-ghost-table \ --initially-drop-old-table \ --assume-rbr \ --port="20395" \ --assume-master-host="127.0.0.1:30393" \ --max-load=Threads_running=25 \ --critical-load=Threads_running=1000 \ --chunk-size=1000 \ --max-lag-millis=1500 \ --user="gh-ost" \ --password="gh-ost" \ --database="test" \ --table="mytable" \ --verbose \ --alter="ADD mynewcol decimal(11,2) DEFAULT 0.0 NOT NULL" \ --exact-rowcount \ --concurrent-rowcount \ --default-retries=120 \ --panic-flag-file=/tmp/ghost.panic.flag \ --postpone-cut-over-flag-file=/tmp/ghost.postpone.flag \ --execute ```
2017-09-06 18:25:35 +00:00
if _, err := base.ValidateConnection(this.db, this.connectionConfig); err != nil {
2016-04-06 11:05:21 +00:00
return err
}
if err := this.readCurrentBinlogCoordinates(); err != nil {
return err
}
if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
return err
}
return nil
}
// initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
2017-08-08 20:41:46 +00:00
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext)
if err != nil {
return err
}
if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil {
return err
}
this.binlogReader = goMySQLReader
2016-04-06 11:05:21 +00:00
return nil
}
func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
return this.binlogReader.GetCurrentBinlogCoordinates()
}
func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates {
return &mysql.BinlogCoordinates{LogFile: this.GetCurrentBinlogCoordinates().LogFile, LogPos: 4}
}
// readCurrentBinlogCoordinates reads master status from hooked server
2016-04-06 11:05:21 +00:00
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
2016-05-16 09:09:17 +00:00
query := `show /* gh-ost readCurrentBinlogCoordinates */ master status`
2016-04-06 11:05:21 +00:00
foundMasterStatus := false
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
this.initialBinlogCoordinates = &mysql.BinlogCoordinates{
2016-04-06 11:05:21 +00:00
LogFile: m.GetString("File"),
LogPos: m.GetInt64("Position"),
}
foundMasterStatus = true
return nil
})
if err != nil {
return err
}
if !foundMasterStatus {
return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out")
}
log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates)
2016-04-06 11:05:21 +00:00
return nil
}
// StreamEvents will begin streaming events. It will be blocking, so should be
// executed by a goroutine
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
go func() {
for binlogEntry := range this.eventsChannel {
if binlogEntry.DmlEvent != nil {
this.notifyListeners(binlogEntry.DmlEvent)
}
}
}()
// The next should block and execute forever, unless there's a serious error
2016-07-25 13:17:30 +00:00
var successiveFailures int64
var lastAppliedRowsEventHint mysql.BinlogCoordinates
for {
2017-08-28 21:05:15 +00:00
if canStopStreaming() {
return nil
}
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
2017-08-28 21:05:15 +00:00
if canStopStreaming() {
return nil
}
log.Infof("StreamEvents encountered unexpected error: %+v", err)
this.migrationContext.MarkPointOfInterest()
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
2016-07-25 13:17:30 +00:00
// See if there's retry overflow
if this.binlogReader.LastAppliedRowsEventHint.Equals(&lastAppliedRowsEventHint) {
successiveFailures += 1
} else {
successiveFailures = 0
}
if successiveFailures > this.migrationContext.MaxRetries() {
2016-10-13 11:10:16 +00:00
return fmt.Errorf("%d successive failures in streamer reconnect at coordinates %+v", successiveFailures, this.GetReconnectBinlogCoordinates())
2016-07-25 13:17:30 +00:00
}
// Reposition at same binlog file.
lastAppliedRowsEventHint = this.binlogReader.LastAppliedRowsEventHint
log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint)
if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil {
return err
}
this.binlogReader.LastAppliedRowsEventHint = lastAppliedRowsEventHint
}
}
}
2016-10-27 11:52:37 +00:00
func (this *EventsStreamer) Close() (err error) {
err = this.binlogReader.Close()
log.Infof("Closed streamer connection. err=%+v", err)
return err
}
2017-08-28 21:05:15 +00:00
2017-08-28 22:53:47 +00:00
func (this *EventsStreamer) Teardown() {
2017-08-28 21:05:15 +00:00
this.db.Close()
return
}