.
This commit is contained in:
parent
d8fefb3d6f
commit
f771016bd5
130
go/logic/streamer.go
Normal file
130
go/logic/streamer.go
Normal file
@ -0,0 +1,130 @@
|
||||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
See https://github.com/github/gh-osc/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package logic
|
||||
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"fmt"
|
||||
"github.com/github/gh-osc/go/base"
|
||||
"github.com/github/gh-osc/go/mysql"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
|
||||
type BinlogEventListener struct {
|
||||
async bool
|
||||
databaseName string
|
||||
tableName string
|
||||
onEvent func(event *mysql.BinlogEvent) error
|
||||
}
|
||||
|
||||
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
|
||||
// and interested parties may subscribe for per-table events.
|
||||
type EventsStreamer struct {
|
||||
connectionConfig *mysql.ConnectionConfig
|
||||
db *gosql.DB
|
||||
migrationContext *base.MigrationContext
|
||||
nextBinlogCoordinates *mysql.BinlogCoordinates
|
||||
listeners [](*BinlogEventListener)
|
||||
}
|
||||
|
||||
func NewEventsStreamer() *EventsStreamer {
|
||||
return &EventsStreamer{
|
||||
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
|
||||
migrationContext: base.GetMigrationContext(),
|
||||
listeners: [](*BinlogEventListener){},
|
||||
}
|
||||
}
|
||||
|
||||
func (this *EventsStreamer) AddListener(
|
||||
async bool, databaseName string, tableName string, onEvent func(event *mysql.BinlogEvent) error) (err error) {
|
||||
if databaseName == "" {
|
||||
return fmt.Errorf("Empty database name in AddListener")
|
||||
}
|
||||
if tableName == "" {
|
||||
return fmt.Errorf("Empty table name in AddListener")
|
||||
}
|
||||
listener := &BinlogEventListener{
|
||||
async: async,
|
||||
databaseName: databaseName,
|
||||
tableName: tableName,
|
||||
onEvent: onEvent,
|
||||
}
|
||||
this.listeners = append(this.listeners, listener)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *EventsStreamer) notifyListeners(binlogEvent *mysql.BinlogEvent) {
|
||||
for _, listener := range this.listeners {
|
||||
if listener.databaseName != binlogEvent.DatabaseName {
|
||||
continue
|
||||
}
|
||||
if listener.tableName != binlogEvent.TableName {
|
||||
continue
|
||||
}
|
||||
onEvent := listener.onEvent
|
||||
if listener.async {
|
||||
go func() {
|
||||
onEvent(binlogEvent)
|
||||
}()
|
||||
} else {
|
||||
onEvent(binlogEvent)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.validateConnection(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateConnection issues a simple can-connect to MySQL
|
||||
func (this *EventsStreamer) validateConnection() error {
|
||||
query := `select @@global.port`
|
||||
var port int
|
||||
if err := this.db.QueryRow(query).Scan(&port); err != nil {
|
||||
return err
|
||||
}
|
||||
if port != this.connectionConfig.Key.Port {
|
||||
return fmt.Errorf("Unexpected database port reported: %+v", port)
|
||||
}
|
||||
log.Infof("connection validated on %+v", this.connectionConfig.Key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateGrants verifies the user by which we're executing has necessary grants
|
||||
// to do its thang.
|
||||
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
|
||||
query := `show /* gh-osc readCurrentBinlogCoordinates */ master status`
|
||||
foundMasterStatus := false
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||
this.nextBinlogCoordinates = &mysql.BinlogCoordinates{
|
||||
LogFile: m.GetString("File"),
|
||||
LogPos: m.GetInt64("Position"),
|
||||
}
|
||||
foundMasterStatus = true
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !foundMasterStatus {
|
||||
return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out")
|
||||
}
|
||||
log.Debugf("Streamer binlog coordinates: %+v", *this.nextBinlogCoordinates)
|
||||
return nil
|
||||
}
|
162
go/mysql/binlog.go
Normal file
162
go/mysql/binlog.go
Normal file
@ -0,0 +1,162 @@
|
||||
/*
|
||||
Copyright 2015 Shlomi Noach, courtesy Booking.com
|
||||
See https://github.com/github/gh-osc/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package mysql
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var detachPattern *regexp.Regexp
|
||||
|
||||
func init() {
|
||||
detachPattern, _ = regexp.Compile(`//([^/:]+):([\d]+)`) // e.g. `//binlog.01234:567890`
|
||||
}
|
||||
|
||||
type BinlogType int
|
||||
|
||||
const (
|
||||
BinaryLog BinlogType = iota
|
||||
RelayLog
|
||||
)
|
||||
|
||||
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
|
||||
type BinlogCoordinates struct {
|
||||
LogFile string
|
||||
LogPos int64
|
||||
Type BinlogType
|
||||
}
|
||||
|
||||
// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
|
||||
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
|
||||
tokens := strings.SplitN(logFileLogPos, ":", 2)
|
||||
if len(tokens) != 2 {
|
||||
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
|
||||
}
|
||||
|
||||
if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil {
|
||||
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
|
||||
} else {
|
||||
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// DisplayString returns a user-friendly string representation of these coordinates
|
||||
func (this *BinlogCoordinates) DisplayString() string {
|
||||
return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos)
|
||||
}
|
||||
|
||||
// String returns a user-friendly string representation of these coordinates
|
||||
func (this BinlogCoordinates) String() string {
|
||||
return this.DisplayString()
|
||||
}
|
||||
|
||||
// Equals tests equality of this corrdinate and another one.
|
||||
func (this *BinlogCoordinates) Equals(other *BinlogCoordinates) bool {
|
||||
if other == nil {
|
||||
return false
|
||||
}
|
||||
return this.LogFile == other.LogFile && this.LogPos == other.LogPos && this.Type == other.Type
|
||||
}
|
||||
|
||||
// IsEmpty returns true if the log file is empty, unnamed
|
||||
func (this *BinlogCoordinates) IsEmpty() bool {
|
||||
return this.LogFile == ""
|
||||
}
|
||||
|
||||
// SmallerThan returns true if this coordinate is strictly smaller than the other.
|
||||
func (this *BinlogCoordinates) SmallerThan(other *BinlogCoordinates) bool {
|
||||
if this.LogFile < other.LogFile {
|
||||
return true
|
||||
}
|
||||
if this.LogFile == other.LogFile && this.LogPos < other.LogPos {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one.
|
||||
// We do NOT compare the type so we can not use this.Equals()
|
||||
func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) bool {
|
||||
if this.SmallerThan(other) {
|
||||
return true
|
||||
}
|
||||
return this.LogFile == other.LogFile && this.LogPos == other.LogPos // No Type comparison
|
||||
}
|
||||
|
||||
// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's.
|
||||
func (this *BinlogCoordinates) FileSmallerThan(other *BinlogCoordinates) bool {
|
||||
return this.LogFile < other.LogFile
|
||||
}
|
||||
|
||||
// FileNumberDistance returns the numeric distance between this corrdinate's file number and the other's.
|
||||
// Effectively it means "how many roatets/FLUSHes would make these coordinates's file reach the other's"
|
||||
func (this *BinlogCoordinates) FileNumberDistance(other *BinlogCoordinates) int {
|
||||
thisNumber, _ := this.FileNumber()
|
||||
otherNumber, _ := other.FileNumber()
|
||||
return otherNumber - thisNumber
|
||||
}
|
||||
|
||||
// FileNumber returns the numeric value of the file, and the length in characters representing the number in the filename.
|
||||
// Example: FileNumber() of mysqld.log.000789 is (789, 6)
|
||||
func (this *BinlogCoordinates) FileNumber() (int, int) {
|
||||
tokens := strings.Split(this.LogFile, ".")
|
||||
numPart := tokens[len(tokens)-1]
|
||||
numLen := len(numPart)
|
||||
fileNum, err := strconv.Atoi(numPart)
|
||||
if err != nil {
|
||||
return 0, 0
|
||||
}
|
||||
return fileNum, numLen
|
||||
}
|
||||
|
||||
// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back)
|
||||
func (this *BinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) {
|
||||
result := BinlogCoordinates{LogPos: 0, Type: this.Type}
|
||||
|
||||
fileNum, numLen := this.FileNumber()
|
||||
if fileNum == 0 {
|
||||
return result, errors.New("Log file number is zero, cannot detect previous file")
|
||||
}
|
||||
newNumStr := fmt.Sprintf("%d", (fileNum - offset))
|
||||
newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr
|
||||
|
||||
tokens := strings.Split(this.LogFile, ".")
|
||||
tokens[len(tokens)-1] = newNumStr
|
||||
result.LogFile = strings.Join(tokens, ".")
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog
|
||||
func (this *BinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates, error) {
|
||||
return this.PreviousFileCoordinatesBy(1)
|
||||
}
|
||||
|
||||
// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog
|
||||
func (this *BinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) {
|
||||
result := BinlogCoordinates{LogPos: 0, Type: this.Type}
|
||||
|
||||
fileNum, numLen := this.FileNumber()
|
||||
newNumStr := fmt.Sprintf("%d", (fileNum + 1))
|
||||
newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr
|
||||
|
||||
tokens := strings.Split(this.LogFile, ".")
|
||||
tokens[len(tokens)-1] = newNumStr
|
||||
result.LogFile = strings.Join(tokens, ".")
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's.
|
||||
func (this *BinlogCoordinates) DetachedCoordinates() (isDetached bool, detachedLogFile string, detachedLogPos string) {
|
||||
detachedCoordinatesSubmatch := detachPattern.FindStringSubmatch(this.LogFile)
|
||||
if len(detachedCoordinatesSubmatch) == 0 {
|
||||
return false, "", ""
|
||||
}
|
||||
return true, detachedCoordinatesSubmatch[1], detachedCoordinatesSubmatch[2]
|
||||
}
|
14
go/mysql/binlog_event.go
Normal file
14
go/mysql/binlog_event.go
Normal file
@ -0,0 +1,14 @@
|
||||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
See https://github.com/github/gh-osc/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package mysql
|
||||
|
||||
import ()
|
||||
|
||||
// BinlogEvent is a binary log event entry, with data
|
||||
type BinlogEvent struct {
|
||||
TableName string
|
||||
DatabaseName string
|
||||
}
|
Loading…
Reference in New Issue
Block a user