diff --git a/go/binlog/binlog_dml_event.go b/go/binlog/binlog_dml_event.go new file mode 100644 index 0000000..069b0a5 --- /dev/null +++ b/go/binlog/binlog_dml_event.go @@ -0,0 +1,66 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package binlog + +import ( + "fmt" + "github.com/github/gh-osc/go/sql" + "strings" +) + +type EventDML string + +const ( + NotDML EventDML = "NoDML" + InsertDML = "Insert" + UpdateDML = "Update" + DeleteDML = "Delete" +) + +func ToEventDML(description string) EventDML { + // description can be a statement (`UPDATE my_table ...`) or a RBR event name (`UpdateRowsEventV2`) + description = strings.TrimSpace(strings.Split(description, " ")[0]) + switch strings.ToLower(description) { + case "insert": + return InsertDML + case "update": + return UpdateDML + case "delete": + return DeleteDML + } + if strings.HasPrefix(description, "WriteRows") { + return InsertDML + } + if strings.HasPrefix(description, "UpdateRows") { + return UpdateDML + } + if strings.HasPrefix(description, "DeleteRows") { + return DeleteDML + } + return NotDML +} + +// BinlogDMLEvent is a binary log rows (DML) event entry, with data +type BinlogDMLEvent struct { + DatabaseName string + TableName string + DML EventDML + WhereColumnValues *sql.ColumnValues + NewColumnValues *sql.ColumnValues +} + +func NewBinlogDMLEvent(databaseName, tableName string, dml EventDML) *BinlogDMLEvent { + event := &BinlogDMLEvent{ + DatabaseName: databaseName, + TableName: tableName, + DML: dml, + } + return event +} + +func (this *BinlogDMLEvent) String() string { + return fmt.Sprintf("[%+v on %s:%s]", this.DML, this.DatabaseName, this.TableName) +} diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go index 0610c70..909414b 100644 --- a/go/binlog/binlog_entry.go +++ b/go/binlog/binlog_entry.go @@ -5,27 +5,43 @@ package binlog +import ( + "fmt" + "github.com/github/gh-osc/go/mysql" +) + // BinlogEntry describes an entry in the 
binary log type BinlogEntry struct { - LogPos uint64 - EndLogPos uint64 - StatementType string // INSERT, UPDATE, DELETE - DatabaseName string - TableName string - PositionalColumns map[uint64]interface{} + Coordinates mysql.BinlogCoordinates + EndLogPos uint64 + + dmlEvent *BinlogDMLEvent } // NewBinlogEntry creates an empty, ready to go BinlogEntry object -func NewBinlogEntry() *BinlogEntry { - binlogEntry := &BinlogEntry{} - binlogEntry.PositionalColumns = make(map[uint64]interface{}) +func NewBinlogEntry(logFile string, logPos uint64) *BinlogEntry { + binlogEntry := &BinlogEntry{ + Coordinates: mysql.BinlogCoordinates{LogFile: logFile, LogPos: int64(logPos)}, + } + return binlogEntry +} + +// NewBinlogEntryAt creates a BinlogEntry positioned at the given binlog coordinates +func NewBinlogEntryAt(coordinates mysql.BinlogCoordinates) *BinlogEntry { + binlogEntry := &BinlogEntry{ + Coordinates: coordinates, + } return binlogEntry } // Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned func (this *BinlogEntry) Duplicate() *BinlogEntry { - binlogEntry := NewBinlogEntry() - binlogEntry.LogPos = this.LogPos + binlogEntry := NewBinlogEntry(this.Coordinates.LogFile, uint64(this.Coordinates.LogPos)) binlogEntry.EndLogPos = this.EndLogPos return binlogEntry } + +// String returns a human readable description of this entry: its coordinates and its DML event +func (this *BinlogEntry) String() string { + return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.dmlEvent) +} diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index b07900d..2ebaa71 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -7,11 +7,11 @@ package binlog import ( "fmt" - "os" - "reflect" "strings" "github.com/github/gh-osc/go/mysql" + "github.com/github/gh-osc/go/sql" + "github.com/outbrain/golib/log" gomysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" @@ -24,13 +24,17 @@ const ( ) 
type GoMySQLReader struct { - connectionConfig *mysql.ConnectionConfig - binlogSyncer *replication.BinlogSyncer + connectionConfig *mysql.ConnectionConfig + binlogSyncer *replication.BinlogSyncer + tableMap map[uint64]string + currentCoordinates mysql.BinlogCoordinates } func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { binlogReader = &GoMySQLReader{ - connectionConfig: connectionConfig, + connectionConfig: connectionConfig, + tableMap: make(map[uint64]string), + currentCoordinates: mysql.BinlogCoordinates{}, } binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") @@ -59,6 +63,7 @@ func (this *GoMySQLReader) isDMLEvent(event *replication.BinlogEvent) bool { // ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { + this.currentCoordinates.LogFile = logFile // Start sync with sepcified binlog file and position streamer, err := this.binlogSyncer.StartSync(gomysql.Position{logFile, uint32(startPos)}) if err != nil { @@ -70,28 +75,52 @@ func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos if err != nil { return entries, err } - if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { - if true { - fmt.Println(ev.Header.EventType) - fmt.Println(len(rowsEvent.Rows)) - - for _, rows := range rowsEvent.Rows { - for j, d := range rows { - if _, ok := d.([]byte); ok { - fmt.Print(fmt.Sprintf("%d:%q, %+v\n", j, d, reflect.TypeOf(d))) - } else { - fmt.Print(fmt.Sprintf("%d:%#v, %+v\n", j, d, reflect.TypeOf(d))) - } - } - fmt.Println("---") - } - } else { - ev.Dump(os.Stdout) + this.currentCoordinates.LogPos = int64(ev.Header.LogPos) + log.Infof("at: %+v", this.currentCoordinates) + if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { + this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) + 
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) + } else if tableMapEvent, ok := ev.Event.(*replication.TableMapEvent); ok { + // Actually not being used, since Table is available in RowsEvent. + // Keeping this here in case I'm wrong about this. Sometime in the near + // future I should remove this. + this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table) + } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { + dml := ToEventDML(ev.Header.EventType.String()) + if dml == NotDML { + return entries, fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) + } + for i, row := range rowsEvent.Rows { + if dml == UpdateDML && i%2 == 1 { + // An update has two rows (WHERE+SET) + // We do both at the same time + continue + } + binlogEntry := NewBinlogEntryAt(this.currentCoordinates) + binlogEntry.dmlEvent = NewBinlogDMLEvent( + string(rowsEvent.Table.Schema), + string(rowsEvent.Table.Table), + dml, + ) + switch dml { + case InsertDML: + { + binlogEntry.dmlEvent.NewColumnValues = sql.ToColumnValues(row) + log.Debugf("insert: %+v", binlogEntry.dmlEvent.NewColumnValues) + } + case UpdateDML: + { + binlogEntry.dmlEvent.WhereColumnValues = sql.ToColumnValues(row) + binlogEntry.dmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1]) + log.Debugf("update: %+v where %+v", binlogEntry.dmlEvent.NewColumnValues, binlogEntry.dmlEvent.WhereColumnValues) + } + case DeleteDML: + { + binlogEntry.dmlEvent.WhereColumnValues = sql.ToColumnValues(row) + log.Debugf("delete: %+v", binlogEntry.dmlEvent.WhereColumnValues) + } + } } - // TODO : convert to entries - // need to parse multi-row entries - // insert & delete are just one row per db orw - // update: where-row_>values-row, repeating } } log.Debugf("done") diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index 9ba02ae..569ff97 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -12,7 +12,7 @@ import ( "path" 
"regexp" "strconv" - "strings" + // "strings" "github.com/github/gh-osc/go/os" "github.com/outbrain/golib/log" @@ -78,7 +78,7 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop return entries, log.Errore(err) } - chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes))) + chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)), logFile) if err != nil { return entries, log.Errore(err) } @@ -103,41 +103,38 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos) } nextBinlogEntry = binlogEntry - if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.dmlEvent != nil { // Current entry is already a true entry, with startpos and with statement - nextBinlogEntry = NewBinlogEntry() + nextBinlogEntry = NewBinlogEntry(binlogEntry.Coordinates.LogFile, startLogPos) } - - nextBinlogEntry.LogPos = startLogPos return ExpectEndLogPosState, nextBinlogEntry, nil } onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { nextBinlogEntry = binlogEntry - if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.dmlEvent != nil { // Current entry is already a true entry, with startpos and with statement nextBinlogEntry = binlogEntry.Duplicate() } - - nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0] - nextBinlogEntry.DatabaseName = submatch[2] - nextBinlogEntry.TableName = submatch[3] + nextBinlogEntry.dmlEvent = NewBinlogDMLEvent(submatch[2], submatch[3], ToEventDML(submatch[1])) return ExpectTokenState, nextBinlogEntry, nil } - onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { - columnIndex, _ := strconv.ParseUint(submatch[1], 10, 
64) - if _, found := binlogEntry.PositionalColumns[columnIndex]; found { - return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.StatementType) - } - columnValue := submatch[2] - columnValue = strings.TrimPrefix(columnValue, "'") - columnValue = strings.TrimSuffix(columnValue, "'") - binlogEntry.PositionalColumns[columnIndex] = columnValue + // Defuncting the following: - return SearchForStartPosOrStatementState, binlogEntry, nil - } + // onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { + // columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64) + // if _, found := binlogEntry.PositionalColumns[columnIndex]; found { + // return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.dmlEvent.DML) + // } + // columnValue := submatch[2] + // columnValue = strings.TrimPrefix(columnValue, "'") + // columnValue = strings.TrimSuffix(columnValue, "'") + // binlogEntry.PositionalColumns[columnIndex] = columnValue + // + // return SearchForStartPosOrStatementState, binlogEntry, nil + // } line := scanner.Text() if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { @@ -150,7 +147,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt return onStatementEntry(submatch) } if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 { - return onPositionalColumn(submatch) + // Defuncting return onPositionalColumn(submatch) } // Haven't found a match return SearchForStartPosOrStatementState, binlogEntry, nil @@ -165,7 +162,7 @@ func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextStat binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) return SearchForStartPosOrStatementState, nil } - return InvalidState, fmt.Errorf("Expected to find 
end_log_pos following pos %+v", binlogEntry.LogPos) + return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.Coordinates.LogPos) } // automaton step: a not-strictly-required but good-to-have-around validation that @@ -175,26 +172,26 @@ func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState Bi if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 { return SearchForStartPosOrStatementState, nil } - return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos) + return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.Coordinates.LogPos) } // parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS` // It issues an automaton / state machine to do its thang. -func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) { - binlogEntry := NewBinlogEntry() +func parseEntries(scanner *bufio.Scanner, logFile string) (entries [](*BinlogEntry), err error) { + binlogEntry := NewBinlogEntry(logFile, 0) var state BinlogEntryState = SearchForStartPosOrStatementState var endLogPos uint64 appendBinlogEntry := func() { - if binlogEntry.LogPos == 0 { + if binlogEntry.Coordinates.LogPos == 0 { return } - if binlogEntry.StatementType == "" { + if binlogEntry.dmlEvent == nil { return } entries = append(entries, binlogEntry) log.Debugf("entry: %+v", *binlogEntry) - fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName)) + fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.dmlEvent.DML, binlogEntry.dmlEvent.DatabaseName, binlogEntry.dmlEvent.TableName)) } for scanner.Scan() { switch state { diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 43d2b7c..9640ece 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -9,6 +9,7 @@ import ( gosql "database/sql" "fmt" "github.com/github/gh-osc/go/base" + 
"github.com/github/gh-osc/go/binlog" "github.com/github/gh-osc/go/mysql" "github.com/outbrain/golib/log" @@ -19,7 +20,7 @@ type BinlogEventListener struct { async bool databaseName string tableName string - onEvent func(event *mysql.BinlogEvent) error + onDmlEvent func(event *binlog.BinlogDMLEvent) error } // EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, @@ -41,7 +42,7 @@ func NewEventsStreamer() *EventsStreamer { } func (this *EventsStreamer) AddListener( - async bool, databaseName string, tableName string, onEvent func(event *mysql.BinlogEvent) error) (err error) { + async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) { if databaseName == "" { return fmt.Errorf("Empty database name in AddListener") } @@ -52,13 +53,13 @@ func (this *EventsStreamer) AddListener( async: async, databaseName: databaseName, tableName: tableName, - onEvent: onEvent, + onDmlEvent: onDmlEvent, } this.listeners = append(this.listeners, listener) return nil } -func (this *EventsStreamer) notifyListeners(binlogEvent *mysql.BinlogEvent) { +func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) { for _, listener := range this.listeners { if listener.databaseName != binlogEvent.DatabaseName { continue @@ -66,13 +67,13 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *mysql.BinlogEvent) { if listener.tableName != binlogEvent.TableName { continue } - onEvent := listener.onEvent + onDmlEvent := listener.onDmlEvent if listener.async { go func() { - onEvent(binlogEvent) + onDmlEvent(binlogEvent) }() } else { - onEvent(binlogEvent) + onDmlEvent(binlogEvent) } } } diff --git a/go/mysql/binlog_event.go b/go/mysql/binlog_event.go deleted file mode 100644 index a1a22fb..0000000 --- a/go/mysql/binlog_event.go +++ /dev/null @@ -1,14 +0,0 @@ -/* - Copyright 2016 GitHub Inc. 
- See https://github.com/github/gh-osc/blob/master/LICENSE -*/ - -package mysql - -import () - -// BinlogEvent is a binary log event entry, with data -type BinlogEvent struct { - TableName string - DatabaseName string -} diff --git a/go/sql/types.go b/go/sql/types.go index 69a68ee..567d75b 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -35,7 +35,7 @@ type UniqueKey struct { HasNullable bool } -// IsPrimary cehcks if this unique key is primary +// IsPrimary checks if this unique key is primary func (this *UniqueKey) IsPrimary() bool { return this.Name == "PRIMARY" } @@ -61,6 +61,18 @@ func NewColumnValues(length int) *ColumnValues { return result } +func ToColumnValues(abstractValues []interface{}) *ColumnValues { + result := &ColumnValues{ + abstractValues: abstractValues, + ValuesPointers: make([]interface{}, len(abstractValues)), + } + for i := 0; i < len(abstractValues); i++ { + result.ValuesPointers[i] = &result.abstractValues[i] + } + + return result +} + func (this *ColumnValues) AbstractValues() []interface{} { return this.abstractValues }