2016-03-30 13:43:40 +00:00
|
|
|
/*
|
|
|
|
Copyright 2016 GitHub Inc.
|
2016-05-16 09:09:17 +00:00
|
|
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
2016-03-30 13:43:40 +00:00
|
|
|
*/
|
|
|
|
|
|
|
|
package binlog
|
|
|
|
|
|
|
|
import (
|
|
|
|
"fmt"
|
2016-05-23 09:12:59 +00:00
|
|
|
"sync"
|
2016-03-30 13:43:40 +00:00
|
|
|
|
2016-05-16 09:09:17 +00:00
|
|
|
"github.com/github/gh-ost/go/mysql"
|
|
|
|
"github.com/github/gh-ost/go/sql"
|
2016-04-06 16:44:54 +00:00
|
|
|
|
2016-03-30 13:43:40 +00:00
|
|
|
"github.com/outbrain/golib/log"
|
|
|
|
gomysql "github.com/siddontang/go-mysql/mysql"
|
|
|
|
"github.com/siddontang/go-mysql/replication"
|
|
|
|
)
|
|
|
|
|
|
|
|
// serverId is the server id handed to replication.NewBinlogSyncer when the
// reader's binlog syncer is created.
const serverId = 99999
|
|
|
|
|
|
|
|
type GoMySQLReader struct {
|
2016-05-23 09:12:59 +00:00
|
|
|
connectionConfig *mysql.ConnectionConfig
|
|
|
|
binlogSyncer *replication.BinlogSyncer
|
|
|
|
binlogStreamer *replication.BinlogStreamer
|
|
|
|
currentCoordinates mysql.BinlogCoordinates
|
|
|
|
currentCoordinatesMutex *sync.Mutex
|
|
|
|
LastAppliedRowsEventHint mysql.BinlogCoordinates
|
2016-03-30 13:43:40 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
|
|
|
|
binlogReader = &GoMySQLReader{
|
2016-05-23 09:12:59 +00:00
|
|
|
connectionConfig: connectionConfig,
|
|
|
|
currentCoordinates: mysql.BinlogCoordinates{},
|
|
|
|
currentCoordinatesMutex: &sync.Mutex{},
|
|
|
|
binlogSyncer: nil,
|
|
|
|
binlogStreamer: nil,
|
2016-03-30 13:43:40 +00:00
|
|
|
}
|
|
|
|
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
|
|
|
|
|
|
|
|
return binlogReader, err
|
|
|
|
}
|
|
|
|
|
2016-04-07 13:57:12 +00:00
|
|
|
// ConnectBinlogStreamer
|
|
|
|
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
|
2016-05-19 13:11:36 +00:00
|
|
|
if coordinates.IsEmpty() {
|
|
|
|
return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()")
|
|
|
|
}
|
|
|
|
log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port))
|
|
|
|
if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-04-07 13:57:12 +00:00
|
|
|
this.currentCoordinates = coordinates
|
2016-05-19 13:11:36 +00:00
|
|
|
log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
|
2016-04-07 13:57:12 +00:00
|
|
|
// Start sync with sepcified binlog file and position
|
2016-05-19 13:11:36 +00:00
|
|
|
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)})
|
2016-04-07 13:57:12 +00:00
|
|
|
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2016-05-19 13:11:36 +00:00
|
|
|
func (this *GoMySQLReader) Reconnect() error {
|
|
|
|
this.binlogSyncer.Close()
|
2016-05-23 09:12:59 +00:00
|
|
|
connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4}
|
2016-05-19 13:11:36 +00:00
|
|
|
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
|
2016-05-23 09:12:59 +00:00
|
|
|
this.currentCoordinatesMutex.Lock()
|
|
|
|
defer this.currentCoordinatesMutex.Unlock()
|
|
|
|
returnCoordinates := this.currentCoordinates
|
|
|
|
return &returnCoordinates
|
2016-05-19 13:11:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// StreamEvents
|
|
|
|
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
|
2016-05-23 09:12:59 +00:00
|
|
|
if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
|
|
|
|
log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
|
2016-05-19 13:11:36 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
dml := ToEventDML(ev.Header.EventType.String())
|
|
|
|
if dml == NotDML {
|
|
|
|
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
|
|
|
|
}
|
|
|
|
for i, row := range rowsEvent.Rows {
|
|
|
|
if dml == UpdateDML && i%2 == 1 {
|
|
|
|
// An update has two rows (WHERE+SET)
|
|
|
|
// We do both at the same time
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
|
|
|
|
binlogEntry.DmlEvent = NewBinlogDMLEvent(
|
|
|
|
string(rowsEvent.Table.Schema),
|
|
|
|
string(rowsEvent.Table.Table),
|
|
|
|
dml,
|
|
|
|
)
|
|
|
|
switch dml {
|
|
|
|
case InsertDML:
|
|
|
|
{
|
|
|
|
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
|
|
|
|
}
|
|
|
|
case UpdateDML:
|
|
|
|
{
|
|
|
|
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
|
|
|
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
|
|
|
|
}
|
|
|
|
case DeleteDML:
|
|
|
|
{
|
|
|
|
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// The channel will do the throttling. Whoever is reding from the channel
|
|
|
|
// decides whether action is taken sycnhronously (meaning we wait before
|
|
|
|
// next iteration) or asynchronously (we keep pushing more events)
|
|
|
|
// In reality, reads will be synchronous
|
|
|
|
entriesChannel <- binlogEntry
|
|
|
|
}
|
2016-05-23 09:12:59 +00:00
|
|
|
this.LastAppliedRowsEventHint = this.currentCoordinates
|
2016-05-19 13:11:36 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-04-07 13:57:12 +00:00
|
|
|
// StreamEvents
|
|
|
|
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
|
|
|
for {
|
|
|
|
if canStopStreaming() {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
ev, err := this.binlogStreamer.GetEvent()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2016-05-23 09:12:59 +00:00
|
|
|
// if rand.Intn(1000) == 0 {
|
|
|
|
// this.binlogSyncer.Close()
|
|
|
|
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
|
|
|
|
// return log.Errorf(".............haha got random error")
|
|
|
|
// }
|
|
|
|
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
|
|
|
func() {
|
|
|
|
this.currentCoordinatesMutex.Lock()
|
|
|
|
defer this.currentCoordinatesMutex.Unlock()
|
|
|
|
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
|
|
|
|
}()
|
2016-04-07 13:57:12 +00:00
|
|
|
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
|
2016-05-23 09:12:59 +00:00
|
|
|
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
|
|
|
// ev.Dump(os.Stdout)
|
|
|
|
func() {
|
|
|
|
this.currentCoordinatesMutex.Lock()
|
|
|
|
defer this.currentCoordinatesMutex.Unlock()
|
|
|
|
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
|
|
|
|
}()
|
|
|
|
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
2016-04-07 13:57:12 +00:00
|
|
|
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
|
|
|
|
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
|
2016-05-19 13:11:36 +00:00
|
|
|
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
|
|
|
|
return err
|
2016-04-07 13:57:12 +00:00
|
|
|
}
|
|
|
|
}
|
2016-05-23 09:12:59 +00:00
|
|
|
// log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
|
2016-03-30 13:43:40 +00:00
|
|
|
}
|
2016-04-07 13:57:12 +00:00
|
|
|
log.Debugf("done streaming events")
|
|
|
|
|
|
|
|
return nil
|
2016-03-30 13:43:40 +00:00
|
|
|
}
|