diff --git a/go/logic/streamer.go b/go/logic/streamer.go new file mode 100644 index 0000000..43d2b7c --- /dev/null +++ b/go/logic/streamer.go @@ -0,0 +1,130 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package logic + +import ( + gosql "database/sql" + "fmt" + "github.com/github/gh-osc/go/base" + "github.com/github/gh-osc/go/mysql" + + "github.com/outbrain/golib/log" + "github.com/outbrain/golib/sqlutils" +) + +type BinlogEventListener struct { + async bool + databaseName string + tableName string + onEvent func(event *mysql.BinlogEvent) error +} + +// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, +// and interested parties may subscribe for per-table events. +type EventsStreamer struct { + connectionConfig *mysql.ConnectionConfig + db *gosql.DB + migrationContext *base.MigrationContext + nextBinlogCoordinates *mysql.BinlogCoordinates + listeners [](*BinlogEventListener) +} + +func NewEventsStreamer() *EventsStreamer { + return &EventsStreamer{ + connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, + migrationContext: base.GetMigrationContext(), + listeners: [](*BinlogEventListener){}, + } +} + +func (this *EventsStreamer) AddListener( + async bool, databaseName string, tableName string, onEvent func(event *mysql.BinlogEvent) error) (err error) { + if databaseName == "" { + return fmt.Errorf("Empty database name in AddListener") + } + if tableName == "" { + return fmt.Errorf("Empty table name in AddListener") + } + listener := &BinlogEventListener{ + async: async, + databaseName: databaseName, + tableName: tableName, + onEvent: onEvent, + } + this.listeners = append(this.listeners, listener) + return nil +} + +func (this *EventsStreamer) notifyListeners(binlogEvent *mysql.BinlogEvent) { + for _, listener := range this.listeners { + if listener.databaseName != binlogEvent.DatabaseName { + continue + } + if listener.tableName != binlogEvent.TableName { + continue + } + onEvent := listener.onEvent + if listener.async { + go func() { + onEvent(binlogEvent) + }() + } else { + onEvent(binlogEvent) + } + } +} + +func (this *EventsStreamer) InitDBConnections() (err error) { + EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) + if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil { + return err + } + if err := this.validateConnection(); err != nil { + return err + } + if err := this.readCurrentBinlogCoordinates(); err != nil { + return err + } + return nil +} + +// validateConnection issues a simple can-connect to MySQL +func (this *EventsStreamer) validateConnection() error { + query := `select @@global.port` + var port int + if err := this.db.QueryRow(query).Scan(&port); err != nil { + return err + } + if port != this.connectionConfig.Key.Port { + return fmt.Errorf("Unexpected database port reported: %+v", port) + } + log.Infof("connection validated on %+v", this.connectionConfig.Key) + return nil +} + +// validateGrants verifies the user by which we're executing has necessary grants +// to do its thang. +func (this *EventsStreamer) readCurrentBinlogCoordinates() error { + query := `show /* gh-osc readCurrentBinlogCoordinates */ master status` + foundMasterStatus := false + err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { + this.nextBinlogCoordinates = &mysql.BinlogCoordinates{ + LogFile: m.GetString("File"), + LogPos: m.GetInt64("Position"), + } + foundMasterStatus = true + + return nil + }) + if err != nil { + return err + } + if !foundMasterStatus { + return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out") + } + log.Debugf("Streamer binlog coordinates: %+v", *this.nextBinlogCoordinates) + return nil +} diff --git a/go/mysql/binlog.go b/go/mysql/binlog.go new file mode 100644 index 0000000..42cf933 --- /dev/null +++ b/go/mysql/binlog.go @@ -0,0 +1,162 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package mysql + +import ( + "errors" + "fmt" + "regexp" + "strconv" + "strings" +) + +var detachPattern *regexp.Regexp + +func init() { + detachPattern, _ = regexp.Compile(`//([^/:]+):([\d]+)`) // e.g. `//binlog.01234:567890` +} + +type BinlogType int + +const ( + BinaryLog BinlogType = iota + RelayLog +) + +// BinlogCoordinates described binary log coordinates in the form of log file & log position. +type BinlogCoordinates struct { + LogFile string + LogPos int64 + Type BinlogType +} + +// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306 +func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { + tokens := strings.SplitN(logFileLogPos, ":", 2) + if len(tokens) != 2 { + return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) + } + + if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil { + return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) + } else { + return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil + } +} + +// DisplayString returns a user-friendly string representation of these coordinates +func (this *BinlogCoordinates) DisplayString() string { + return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos) +} + +// String returns a user-friendly string representation of these coordinates +func (this BinlogCoordinates) String() string { + return this.DisplayString() +} + +// Equals tests equality of this corrdinate and another one. +func (this *BinlogCoordinates) Equals(other *BinlogCoordinates) bool { + if other == nil { + return false + } + return this.LogFile == other.LogFile && this.LogPos == other.LogPos && this.Type == other.Type +} + +// IsEmpty returns true if the log file is empty, unnamed +func (this *BinlogCoordinates) IsEmpty() bool { + return this.LogFile == "" +} + +// SmallerThan returns true if this coordinate is strictly smaller than the other. +func (this *BinlogCoordinates) SmallerThan(other *BinlogCoordinates) bool { + if this.LogFile < other.LogFile { + return true + } + if this.LogFile == other.LogFile && this.LogPos < other.LogPos { + return true + } + return false +} + +// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. +// We do NOT compare the type so we can not use this.Equals() +func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) bool { + if this.SmallerThan(other) { + return true + } + return this.LogFile == other.LogFile && this.LogPos == other.LogPos // No Type comparison +} + +// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's. +func (this *BinlogCoordinates) FileSmallerThan(other *BinlogCoordinates) bool { + return this.LogFile < other.LogFile +} + +// FileNumberDistance returns the numeric distance between this corrdinate's file number and the other's. +// Effectively it means "how many roatets/FLUSHes would make these coordinates's file reach the other's" +func (this *BinlogCoordinates) FileNumberDistance(other *BinlogCoordinates) int { + thisNumber, _ := this.FileNumber() + otherNumber, _ := other.FileNumber() + return otherNumber - thisNumber +} + +// FileNumber returns the numeric value of the file, and the length in characters representing the number in the filename. +// Example: FileNumber() of mysqld.log.000789 is (789, 6) +func (this *BinlogCoordinates) FileNumber() (int, int) { + tokens := strings.Split(this.LogFile, ".") + numPart := tokens[len(tokens)-1] + numLen := len(numPart) + fileNum, err := strconv.Atoi(numPart) + if err != nil { + return 0, 0 + } + return fileNum, numLen +} + +// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back) +func (this *BinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) { + result := BinlogCoordinates{LogPos: 0, Type: this.Type} + + fileNum, numLen := this.FileNumber() + if fileNum == 0 { + return result, errors.New("Log file number is zero, cannot detect previous file") + } + newNumStr := fmt.Sprintf("%d", (fileNum - offset)) + newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr + + tokens := strings.Split(this.LogFile, ".") + tokens[len(tokens)-1] = newNumStr + result.LogFile = strings.Join(tokens, ".") + return result, nil +} + +// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog +func (this *BinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates, error) { + return this.PreviousFileCoordinatesBy(1) +} + +// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog +func (this *BinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) { + result := BinlogCoordinates{LogPos: 0, Type: this.Type} + + fileNum, numLen := this.FileNumber() + newNumStr := fmt.Sprintf("%d", (fileNum + 1)) + newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr + + tokens := strings.Split(this.LogFile, ".") + tokens[len(tokens)-1] = newNumStr + result.LogFile = strings.Join(tokens, ".") + return result, nil +} + +// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's. +func (this *BinlogCoordinates) DetachedCoordinates() (isDetached bool, detachedLogFile string, detachedLogPos string) { + detachedCoordinatesSubmatch := detachPattern.FindStringSubmatch(this.LogFile) + if len(detachedCoordinatesSubmatch) == 0 { + return false, "", "" + } + return true, detachedCoordinatesSubmatch[1], detachedCoordinatesSubmatch[2] +} diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go new file mode 100644 index 0000000..a1a22fb --- /dev/null +++ b/go/mysql/binlog_event.go @@ -0,0 +1,14 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package mysql + +import () + +// BinlogEvent is a binary log event entry, with data +type BinlogEvent struct { + TableName string + DatabaseName string +}