From 1768b55b3b36f4ae794800ff3f65711488e563ca Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 23 Mar 2016 15:25:45 +0100 Subject: [PATCH 1/5] mysqlbinlog_reader is now a simple state machine --- go/binlog/mysqlbinlog_reader.go | 143 +++++++++++++++++++++----------- 1 file changed, 96 insertions(+), 47 deletions(-) diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index aa5356a..28c733d 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -25,6 +25,17 @@ var ( statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") ) +type BinlogEntryState string + +const ( + InvalidState BinlogEntryState = "InvalidState" + SearchForStartPosState = "SearchForStartPosState" + ExpectEndLogPosState = "ExpectEndLogPosState" + SearchForStatementState = "SearchForStatementState" + ExpectTokenState = "ExpectTokenState" + PositionalColumnAssignmentState = "PositionalColumnAssignmentState" +) + // MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` // process and textually parsing its output type MySQLBinlogReader struct { @@ -77,64 +88,102 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop return entries, err } +func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { + + onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { + startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) + + if previousEndLogPos != 0 && startLogPos != previousEndLogPos { + return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos) + } + nextBinlogEntry = binlogEntry + if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + // Current entry is already a true entry, with startpos and with statement + nextBinlogEntry = &BinlogEntry{} + } + + nextBinlogEntry.LogPos = startLogPos + return ExpectEndLogPosState, nextBinlogEntry, nil + } + + line := scanner.Text() + if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { + return onStartEntry(submatch) + } + if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { + return onStartEntry(submatch) + } + // Haven't found a start entry + return SearchForStartPosState, binlogEntry, nil +} + +func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { + line := scanner.Text() + + submatch := endLogPosRegexp.FindStringSubmatch(line) + if len(submatch) <= 1 { + return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) + } + binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + + return SearchForStatementState, nil +} + +func searchForStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { + line := scanner.Text() + + if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { + binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] + binlogEntry.DatabaseName = submatch[2] + binlogEntry.TableName = submatch[3] + + return SearchForStartPosState, nil + } + return SearchForStatementState, nil +} + func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { scanner := bufio.NewScanner(bytes.NewReader(entriesBytes)) - expectEndLogPos := false - var startLogPos uint64 + binlogEntry := &BinlogEntry{} + var state BinlogEntryState = SearchForStartPosState var endLogPos uint64 - binlogEntry := &BinlogEntry{} - + appendBinlogEntry := func() { + entries = append(entries, binlogEntry) + if binlogEntry.StatementType != "" { + log.Debugf("entry: %+v", *binlogEntry) + } + } for scanner.Scan() { - line := scanner.Text() - - onStartEntry := func(submatch []string) error { - startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - - if endLogPos != 0 && startLogPos != endLogPos { - return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos) + switch state { + case SearchForStartPosState: + { + var nextBinlogEntry *BinlogEntry + state, nextBinlogEntry, err = searchForStartPos(scanner, binlogEntry, endLogPos) + if nextBinlogEntry != binlogEntry { + appendBinlogEntry() + binlogEntry = nextBinlogEntry + } } - // We are entering a new entry, let's push the previous one - if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { - entries = append(entries, binlogEntry) - log.Debugf("entry: %+v", *binlogEntry) - binlogEntry = &BinlogEntry{} + case ExpectEndLogPosState: + { + state, err = expectEndLogPos(scanner, binlogEntry) + } + case SearchForStatementState: + { + state, err = searchForStatement(scanner, binlogEntry) + } + default: + { + err = fmt.Errorf("Unexpected state %+v", state) } - - //log.Debugf(line) - binlogEntry.LogPos = startLogPos - // Next iteration we will read the end_log_pos - expectEndLogPos = true - - return nil } - if expectEndLogPos { - submatch := endLogPosRegexp.FindStringSubmatch(line) - if len(submatch) <= 1 { - return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos) - } - endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - - binlogEntry.EndLogPos = endLogPos - expectEndLogPos = false - } else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { - if err := onStartEntry(submatch); err != nil { - return entries, log.Errore(err) - } - } else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { - if err := onStartEntry(submatch); err != nil { - return entries, log.Errore(err) - } - } else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { - binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] - binlogEntry.DatabaseName = submatch[2] - binlogEntry.TableName = submatch[3] + if err != nil { + return entries, log.Errore(err) } - } if binlogEntry.LogPos != 0 { - entries = append(entries, binlogEntry) - log.Debugf("entry: %+v", *binlogEntry) + appendBinlogEntry() } return entries, err } From 8aa6a9750cdfc944ed471c40c8be6d7f146e2352 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 24 Mar 2016 14:25:52 +0100 Subject: [PATCH 2/5] adjusted automaton to accept multi-statement entries; added tests --- go/binlog/binlog_test.go | 3 +- go/binlog/mysqlbinlog_reader.go | 76 +++++---- go/binlog/mysqlbinlog_reader_test.go | 61 +++++++ go/binlog/testdata/rbr-sample-0.txt | 244 +++++++++++++++++++++++++++ 4 files changed, 351 insertions(+), 33 deletions(-) create mode 100644 go/binlog/mysqlbinlog_reader_test.go create mode 100644 go/binlog/testdata/rbr-sample-0.txt diff --git a/go/binlog/binlog_test.go b/go/binlog/binlog_test.go index 1314d1c..55fff25 100644 --- a/go/binlog/binlog_test.go +++ b/go/binlog/binlog_test.go @@ -17,9 +17,10 @@ package binlog import ( + "testing" + "github.com/outbrain/golib/log" test "github.com/outbrain/golib/tests" - "testing" ) func init() { diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index 28c733d..c368631 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -23,17 +23,17 @@ var ( startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$") endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)") statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") + tokenRegxp = regexp.MustCompile("### (WHERE|SET)$") ) type BinlogEntryState string const ( - InvalidState BinlogEntryState = "InvalidState" - SearchForStartPosState = "SearchForStartPosState" - ExpectEndLogPosState = "ExpectEndLogPosState" - SearchForStatementState = "SearchForStatementState" - ExpectTokenState = "ExpectTokenState" - PositionalColumnAssignmentState = "PositionalColumnAssignmentState" + InvalidState BinlogEntryState = "InvalidState" + SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState" + ExpectEndLogPosState = "ExpectEndLogPosState" + ExpectTokenState = "ExpectTokenState" + PositionalColumnAssignmentState = "PositionalColumnAssignmentState" ) // MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` @@ -72,7 +72,8 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop if err != nil { return entries, log.Errore(err) } - chunkEntries, err := parseEntries(entriesBytes) + + chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes))) if err != nil { return entries, log.Errore(err) } @@ -88,7 +89,7 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop return entries, err } -func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { +func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) @@ -106,6 +107,20 @@ func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previou return ExpectEndLogPosState, nextBinlogEntry, nil } + onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { + nextBinlogEntry = binlogEntry + if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + // Current entry is already a true entry, with startpos and with statement + nextBinlogEntry = &BinlogEntry{LogPos: binlogEntry.LogPos, EndLogPos: binlogEntry.EndLogPos} + } + + nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0] + nextBinlogEntry.DatabaseName = submatch[2] + nextBinlogEntry.TableName = submatch[3] + + return ExpectTokenState, nextBinlogEntry, nil + } + line := scanner.Text() if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { return onStartEntry(submatch) @@ -113,53 +128,50 @@ func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previou if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { return onStartEntry(submatch) } - // Haven't found a start entry - return SearchForStartPosState, binlogEntry, nil + if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { + return onStatementEntry(submatch) + } + // Haven't found a match + return SearchForStartPosOrStatementState, binlogEntry, nil } func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { line := scanner.Text() submatch := endLogPosRegexp.FindStringSubmatch(line) - if len(submatch) <= 1 { - return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) + if len(submatch) > 1 { + binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + return SearchForStartPosOrStatementState, nil } - binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - - return SearchForStatementState, nil + return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) } -func searchForStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { +func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { line := scanner.Text() - - if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { - binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] - binlogEntry.DatabaseName = submatch[2] - binlogEntry.TableName = submatch[3] - - return SearchForStartPosState, nil + if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 { + return SearchForStartPosOrStatementState, nil } - return SearchForStatementState, nil + return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos) } -func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { - scanner := bufio.NewScanner(bytes.NewReader(entriesBytes)) +func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) { binlogEntry := &BinlogEntry{} - var state BinlogEntryState = SearchForStartPosState + var state BinlogEntryState = SearchForStartPosOrStatementState var endLogPos uint64 appendBinlogEntry := func() { - entries = append(entries, binlogEntry) if binlogEntry.StatementType != "" { + entries = append(entries, binlogEntry) log.Debugf("entry: %+v", *binlogEntry) + //fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName)) } } for scanner.Scan() { switch state { - case SearchForStartPosState: + case SearchForStartPosOrStatementState: { var nextBinlogEntry *BinlogEntry - state, nextBinlogEntry, err = searchForStartPos(scanner, binlogEntry, endLogPos) + state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos) if nextBinlogEntry != binlogEntry { appendBinlogEntry() binlogEntry = nextBinlogEntry @@ -169,9 +181,9 @@ func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { { state, err = expectEndLogPos(scanner, binlogEntry) } - case SearchForStatementState: + case ExpectTokenState: { - state, err = searchForStatement(scanner, binlogEntry) + state, err = expectToken(scanner, binlogEntry) } default: { diff --git a/go/binlog/mysqlbinlog_reader_test.go b/go/binlog/mysqlbinlog_reader_test.go new file mode 100644 index 0000000..24a6e1a --- /dev/null +++ b/go/binlog/mysqlbinlog_reader_test.go @@ -0,0 +1,61 @@ +/* + Copyright 2014 Outbrain Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package binlog + +import ( + "bufio" + "os" + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestRBRSample0(t *testing.T) { + testFile, err := os.Open("testdata/rbr-sample-0.txt") + test.S(t).ExpectNil(err) + defer testFile.Close() + + scanner := bufio.NewScanner(testFile) + entries, err := parseEntries(scanner) + test.S(t).ExpectNil(err) + + test.S(t).ExpectEquals(len(entries), 17) + test.S(t).ExpectEquals(entries[0].DatabaseName, "test") + test.S(t).ExpectEquals(entries[0].TableName, "samplet") + test.S(t).ExpectEquals(entries[0].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[1].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[2].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[3].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[4].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[5].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE") + test.S(t).ExpectEquals(entries[7].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE") + test.S(t).ExpectEquals(entries[9].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[10].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[11].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[12].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[13].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE") + test.S(t).ExpectEquals(entries[15].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[16].StatementType, "INSERT") +} diff --git a/go/binlog/testdata/rbr-sample-0.txt b/go/binlog/testdata/rbr-sample-0.txt new file mode 100644 index 0000000..a7c0509 --- /dev/null +++ b/go/binlog/testdata/rbr-sample-0.txt @@ -0,0 +1,244 @@ +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; +/*!40019 SET @@session.max_insert_delayed_threads=0*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at 4 +#160324 11:06:59 server id 104 end_log_pos 120 CRC32 0x059174d8 Start: binlog v 4, server v 5.6.28-log created 160324 11:06:59 +# at 120 +#160324 11:09:53 server id 1 end_log_pos 313 CRC32 0x511efdf1 Query thread_id=7940 exec_time=0 error_code=0 +use `test`/*!*/; +SET TIMESTAMP=1458814193/*!*/; +SET @@session.pseudo_thread_id=7940/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/; +SET @@session.sql_mode=1073741824/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C utf8 *//*!*/; +SET @@session.character_set_client=33,@@session.collation_connection=33,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb +/*!*/; +# at 313 +#160324 11:10:13 server id 1 end_log_pos 385 CRC32 0x6b95100a Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814213/*!*/; +BEGIN +/*!*/; +# at 385 +#160324 11:10:13 server id 1 end_log_pos 439 CRC32 0xfa97ad69 Table_map: `test`.`samplet` mapped to number 72 +# at 439 +#160324 11:10:13 server id 1 end_log_pos 485 CRC32 0xae356826 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=1 +### @2=1 +### @3='a' +# at 485 +#160324 11:10:13 server id 1 end_log_pos 516 CRC32 0xf60389e3 Xid = 49802 +COMMIT/*!*/; +# at 516 +#160324 11:10:35 server id 1 end_log_pos 588 CRC32 0x1a8730ad Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814235/*!*/; +BEGIN +/*!*/; +# at 588 +#160324 11:10:35 server id 1 end_log_pos 642 CRC32 0xac564207 Table_map: `test`.`samplet` mapped to number 72 +# at 642 +#160324 11:10:35 server id 1 end_log_pos 713 CRC32 0x3020ee9e Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=2 +### @2=2 +### @3='extended' +### INSERT INTO `test`.`samplet` +### SET +### @1=3 +### @2=3 +### @3='extended' +# at 713 +#160324 11:10:35 server id 1 end_log_pos 744 CRC32 0x341f0c1d Xid = 49848 +COMMIT/*!*/; +# at 744 +#160324 11:10:47 server id 1 end_log_pos 816 CRC32 0x2454c8aa Query thread_id=7940 exec_time=14 error_code=0 +SET TIMESTAMP=1458814247/*!*/; +BEGIN +/*!*/; +# at 816 +#160324 11:10:47 server id 1 end_log_pos 870 CRC32 0x92018566 Table_map: `test`.`samplet` mapped to number 72 +# at 870 +#160324 11:10:47 server id 1 end_log_pos 926 CRC32 0x5b882310 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=4 +### @2=4 +### @3='transaction' +# at 926 +#160324 11:10:54 server id 1 end_log_pos 980 CRC32 0x374b624b Table_map: `test`.`samplet` mapped to number 72 +# at 980 +#160324 11:10:54 server id 1 end_log_pos 1036 CRC32 0xfff6a2b9 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=5 +### @2=5 +### @3='transaction' +# at 1036 +#160324 11:10:59 server id 1 end_log_pos 1090 CRC32 0x37e19690 Table_map: `test`.`samplet` mapped to number 72 +# at 1090 +#160324 11:10:59 server id 1 end_log_pos 1146 CRC32 0x58a01053 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=6 +### @2=6 +### @3='transaction' +# at 1146 +#160324 11:10:59 server id 1 end_log_pos 1177 CRC32 0xdd5de027 Xid = 49894 +COMMIT/*!*/; +# at 1177 +#160324 11:11:16 server id 1 end_log_pos 1249 CRC32 0x5c4a609b Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814276/*!*/; +BEGIN +/*!*/; +# at 1249 +#160324 11:11:16 server id 1 end_log_pos 1303 CRC32 0x9d3c756b Table_map: `test`.`samplet` mapped to number 72 +# at 1303 +#160324 11:11:16 server id 1 end_log_pos 1352 CRC32 0x9b0d2ff4 Update_rows: table id 72 flags: STMT_END_F +### UPDATE `test`.`samplet` +### WHERE +### @1=5 +### SET +### @3='update' +# at 1352 +#160324 11:11:16 server id 1 end_log_pos 1383 CRC32 0x8e051bed Xid = 49931 +COMMIT/*!*/; +# at 1383 +#160324 11:11:44 server id 1 end_log_pos 1455 CRC32 0xe9744e83 Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814304/*!*/; +BEGIN +/*!*/; +# at 1455 +#160324 11:11:44 server id 1 end_log_pos 1509 CRC32 0x34672cb1 Table_map: `test`.`samplet` mapped to number 72 +# at 1509 +#160324 11:11:44 server id 1 end_log_pos 1549 CRC32 0x4383e9ee Delete_rows: table id 72 +# at 1549 +#160324 11:11:44 server id 1 end_log_pos 1612 CRC32 0x899eb398 Update_rows: table id 72 flags: STMT_END_F +### DELETE FROM `test`.`samplet` +### WHERE +### @1=2 +### UPDATE `test`.`samplet` +### WHERE +### @1=4 +### SET +### @1=2 +### @2=4 +### @3='replaced 2,4' +# at 1612 +#160324 11:11:44 server id 1 end_log_pos 1643 CRC32 0x037a8fe1 Xid = 49977 +COMMIT/*!*/; +# at 1643 +#160324 11:11:54 server id 1 end_log_pos 1715 CRC32 0xb02520cd Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814314/*!*/; +BEGIN +/*!*/; +# at 1715 +#160324 11:11:54 server id 1 end_log_pos 1769 CRC32 0xcbcf4323 Table_map: `test`.`samplet` mapped to number 72 +# at 1769 +#160324 11:11:54 server id 1 end_log_pos 1815 CRC32 0x4d52b057 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=7 +### @2=7 +### @3='7' +# at 1815 +#160324 11:11:54 server id 1 end_log_pos 1846 CRC32 0x5289b6a4 Xid = 50001 +COMMIT/*!*/; +# at 1846 +#160324 11:11:59 server id 1 end_log_pos 1918 CRC32 0x1758ab97 Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814319/*!*/; +BEGIN +/*!*/; +# at 1918 +#160324 11:11:59 server id 1 end_log_pos 1972 CRC32 0xa4602796 Table_map: `test`.`samplet` mapped to number 72 +# at 1972 +#160324 11:11:59 server id 1 end_log_pos 2018 CRC32 0x6a6eb0c9 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=8 +### @2=8 +### @3='8' +# at 2018 +#160324 11:11:59 server id 1 end_log_pos 2049 CRC32 0x6d0fef4d Xid = 50014 +COMMIT/*!*/; +# at 2049 +#160324 11:12:12 server id 1 end_log_pos 2121 CRC32 0x6cd5da13 Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814332/*!*/; +BEGIN +/*!*/; +# at 2121 +#160324 11:12:12 server id 1 end_log_pos 2175 CRC32 0x8339241f Table_map: `test`.`samplet` mapped to number 72 +# at 2175 +#160324 11:12:12 server id 1 end_log_pos 2220 CRC32 0x669385e1 Delete_rows: table id 72 flags: STMT_END_F +### DELETE FROM `test`.`samplet` +### WHERE +### @1=7 +### DELETE FROM `test`.`samplet` +### WHERE +### @1=8 +# at 2220 +#160324 11:12:12 server id 1 end_log_pos 2251 CRC32 0xba81d2b0 Xid = 50038 +COMMIT/*!*/; +# at 2251 +#160324 11:12:20 server id 1 end_log_pos 2323 CRC32 0x4c58be8c Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814340/*!*/; +BEGIN +/*!*/; +# at 2323 +#160324 11:12:20 server id 1 end_log_pos 2377 CRC32 0x9eb23ab9 Table_map: `test`.`samplet` mapped to number 72 +# at 2377 +#160324 11:12:20 server id 1 end_log_pos 2423 CRC32 0xac8116ec Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=9 +### @2=9 +### @3='9' +# at 2423 +#160324 11:12:20 server id 1 end_log_pos 2454 CRC32 0x5ce77ad6 Xid = 50051 +COMMIT/*!*/; +# at 2454 +#160324 11:12:50 server id 1 end_log_pos 2526 CRC32 0xed19acbd Query thread_id=7940 exec_time=26 error_code=0 +SET TIMESTAMP=1458814370/*!*/; +BEGIN +/*!*/; +# at 2526 +#160324 11:12:50 server id 1 end_log_pos 2580 CRC32 0x0bf6b98f Table_map: `test`.`samplet` mapped to number 72 +# at 2580 +#160324 11:12:50 server id 1 end_log_pos 2631 CRC32 0x263c4579 Update_rows: table id 72 flags: STMT_END_F +### UPDATE `test`.`samplet` +### WHERE +### @1=9 +### SET +### @3='update 9' +# at 2631 +#160324 11:13:00 server id 1 end_log_pos 2685 CRC32 0x94b24c8b Table_map: `test`.`samplet` mapped to number 72 +# at 2685 +#160324 11:13:00 server id 1 end_log_pos 2725 CRC32 0xca43fe3a Delete_rows: table id 72 flags: STMT_END_F +### DELETE FROM `test`.`samplet` +### WHERE +### @1=3 +# at 2725 +#160324 11:13:14 server id 1 end_log_pos 2779 CRC32 0xc36088a2 Table_map: `test`.`samplet` mapped to number 72 +# at 2779 +#160324 11:13:14 server id 1 end_log_pos 2826 CRC32 0x98fc9dea Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=10 +### @2=10 +### @3='10' +# at 2826 +#160324 11:13:14 server id 1 end_log_pos 2857 CRC32 0x729c371f Xid = 50163 +COMMIT/*!*/; +# at 2857 +#160324 11:13:31 server id 104 end_log_pos 2904 CRC32 0x38531c7d Rotate to mysql-bin.000053 pos: 4 +DELIMITER ; +# End of log file +ROLLBACK /* added by mysqlbinlog */; +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/; From fb04eb232fec74686765fd5124ba2c230509d338 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 24 Mar 2016 14:52:49 +0100 Subject: [PATCH 3/5] simplified appendBinlogEntry() --- go/binlog/mysqlbinlog_reader.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index c368631..5f33284 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -160,11 +160,15 @@ func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) var endLogPos uint64 appendBinlogEntry := func() { - if binlogEntry.StatementType != "" { - entries = append(entries, binlogEntry) - log.Debugf("entry: %+v", *binlogEntry) - //fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName)) + if binlogEntry.LogPos == 0 { + return } + if binlogEntry.StatementType == "" { + return + } + entries = append(entries, binlogEntry) + log.Debugf("entry: %+v", *binlogEntry) + fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName)) } for scanner.Scan() { switch state { @@ -194,8 +198,6 @@ func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) return entries, log.Errore(err) } } - if binlogEntry.LogPos != 0 { - appendBinlogEntry() - } + appendBinlogEntry() return entries, err } From 96a8fd50c3271b30e1a70cc84ce40f916727febc Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 24 Mar 2016 15:11:56 +0100 Subject: [PATCH 4/5] minor refactoring; fixed license comments --- build.sh | 2 ++ go/binlog/binlog.go | 4 +++- go/binlog/binlog_entry.go | 14 ++++++++++++++ go/binlog/binlog_reader.go | 13 ++++--------- go/binlog/binlog_test.go | 15 ++------------- go/binlog/mysqlbinlog_reader.go | 13 +++++++++++-- go/binlog/mysqlbinlog_reader_test.go | 15 ++------------- go/cmd/gh-osc/main.go | 5 +++++ go/os/process.go | 2 -- 9 files changed, 43 insertions(+), 40 deletions(-) create mode 100644 go/binlog/binlog_entry.go diff --git a/build.sh b/build.sh index fc48ca9..2041f2e 100644 --- a/build.sh +++ b/build.sh @@ -1,4 +1,6 @@ #!/bin/bash +# +# buildpath=/tmp/gh-osc target=gh-osc diff --git a/go/binlog/binlog.go b/go/binlog/binlog.go index 6925fa1..8f7c67e 100644 --- a/go/binlog/binlog.go +++ b/go/binlog/binlog.go @@ -1,5 +1,5 @@ /* - Copyright 2015 Shlomi Noach, courtesy Booking.com + Copyright 2015 Shlomi Noach */ package binlog @@ -11,8 +11,10 @@ import ( "strings" ) +// BinlogType identifies the type of the log: relay or binary log type BinlogType int +// BinaryLog, RelayLog are binlog types const ( BinaryLog BinlogType = iota RelayLog diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go new file mode 100644 index 0000000..bbfbecc --- /dev/null +++ b/go/binlog/binlog_entry.go @@ -0,0 +1,14 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package binlog + +type BinlogEntry struct { + LogPos uint64 + EndLogPos uint64 + StatementType string // INSERT, UPDATE, DELETE + DatabaseName string + TableName string +} diff --git a/go/binlog/binlog_reader.go b/go/binlog/binlog_reader.go index a837249..3392460 100644 --- a/go/binlog/binlog_reader.go +++ b/go/binlog/binlog_reader.go @@ -1,17 +1,12 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE */ package binlog -type BinlogEntry struct { - LogPos uint64 - EndLogPos uint64 - StatementType string // INSERT, UPDATE, DELETE - DatabaseName string - TableName string -} - +// BinlogReader is a general interface whose implementations can choose their methods of reading +// a binary log file and parsing it into binlog entries type BinlogReader interface { ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) } diff --git a/go/binlog/binlog_test.go b/go/binlog/binlog_test.go index 55fff25..75cca90 100644 --- a/go/binlog/binlog_test.go +++ b/go/binlog/binlog_test.go @@ -1,17 +1,6 @@ /* - Copyright 2014 Outbrain Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE */ package binlog diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index 5f33284..44a60b8 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -1,5 +1,6 @@ /* - Copyright 2016 GitHub Inc. + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE */ package binlog @@ -26,8 +27,10 @@ var ( tokenRegxp = regexp.MustCompile("### (WHERE|SET)$") ) +// BinlogEntryState is a state in the binlog parser automaton / state machine type BinlogEntryState string +// States of the state machine const ( InvalidState BinlogEntryState = "InvalidState" SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState" @@ -44,6 +47,7 @@ type MySQLBinlogReader struct { MySQLBinlogBinary string } +// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) { mySQLBinlogReader = &MySQLBinlogReader{ Basedir: basedir, @@ -89,8 +93,8 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop return entries, err } +// automaton step: accept wither beginning of new entry, or beginning of new statement func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { - onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) @@ -135,6 +139,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt return SearchForStartPosOrStatementState, binlogEntry, nil } +// automaton step: expect an end_log_pos line` func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { line := scanner.Text() @@ -146,6 +151,8 @@ func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextStat return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) } +// automaton step: a not-strictly-required but good-to-have-around validation that +// we see an expected token following a statement func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { line := scanner.Text() if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 { @@ -154,6 +161,8 @@ func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState Bi return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos) } +// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS` +// It issues an automaton / state machine to do its thang. func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) { binlogEntry := &BinlogEntry{} var state BinlogEntryState = SearchForStartPosOrStatementState diff --git a/go/binlog/mysqlbinlog_reader_test.go b/go/binlog/mysqlbinlog_reader_test.go index 24a6e1a..0f75e82 100644 --- a/go/binlog/mysqlbinlog_reader_test.go +++ b/go/binlog/mysqlbinlog_reader_test.go @@ -1,17 +1,6 @@ /* - Copyright 2014 Outbrain Inc. - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE */ package binlog diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index c5e9d05..4cb8a10 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -1,3 +1,8 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + package main import ( diff --git a/go/os/process.go b/go/os/process.go index ee19b66..edcb62c 100644 --- a/go/os/process.go +++ b/go/os/process.go @@ -35,8 +35,6 @@ func execCmd(commandText string, arguments ...string) (*exec.Cmd, string, error) shellArguments = append(shellArguments, arguments...) log.Debugf("%+v", shellArguments) return exec.Command("bash", shellArguments...), tmpFile.Name(), nil - - //return exec.Command(commandText, arguments...) , "", nil } // CommandRun executes a command From 30a472f74141fefeba7c6e591ffaf93ba02debbc Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 24 Mar 2016 22:27:39 +0100 Subject: [PATCH 5/5] begin work on positional columns. Incomplete --- go/binlog/binlog_entry.go | 27 ++++++++++++++++++++++----- go/binlog/mysqlbinlog_reader.go | 23 ++++++++++++++++++++--- go/binlog/testdata/rbr-sample-0.txt | 23 +++++++++++++++++++++++ 3 files changed, 65 insertions(+), 8 deletions(-) diff --git a/go/binlog/binlog_entry.go b/go/binlog/binlog_entry.go index bbfbecc..0610c70 100644 --- a/go/binlog/binlog_entry.go +++ b/go/binlog/binlog_entry.go @@ -5,10 +5,27 @@ package binlog +// BinlogEntry describes an entry in the binary log type BinlogEntry struct { - LogPos uint64 - EndLogPos uint64 - StatementType string // INSERT, UPDATE, DELETE - DatabaseName string - TableName string + LogPos uint64 + EndLogPos uint64 + StatementType string // INSERT, UPDATE, DELETE + DatabaseName string + TableName string + PositionalColumns map[uint64]interface{} +} + +// NewBinlogEntry creates an empty, ready to go BinlogEntry object +func NewBinlogEntry() *BinlogEntry { + binlogEntry := &BinlogEntry{} + binlogEntry.PositionalColumns = make(map[uint64]interface{}) + return binlogEntry +} + +// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned +func (this *BinlogEntry) Duplicate() *BinlogEntry { + binlogEntry := NewBinlogEntry() + binlogEntry.LogPos = this.LogPos + binlogEntry.EndLogPos = this.EndLogPos + return binlogEntry } diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index 44a60b8..9ba02ae 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -25,6 +25,7 @@ var ( endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)") statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") tokenRegxp = regexp.MustCompile("### (WHERE|SET)$") + positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$") ) // BinlogEntryState is a state in the binlog parser automaton / state machine @@ -104,7 +105,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt nextBinlogEntry = binlogEntry if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { // Current entry is already a true entry, with startpos and with statement - nextBinlogEntry = &BinlogEntry{} + nextBinlogEntry = NewBinlogEntry() } nextBinlogEntry.LogPos = startLogPos @@ -115,7 +116,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt nextBinlogEntry = binlogEntry if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { // Current entry is already a true entry, with startpos and with statement - nextBinlogEntry = &BinlogEntry{LogPos: binlogEntry.LogPos, EndLogPos: binlogEntry.EndLogPos} + nextBinlogEntry = binlogEntry.Duplicate() } nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0] @@ -125,6 +126,19 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt return ExpectTokenState, nextBinlogEntry, nil } + onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { + columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64) + if _, found := binlogEntry.PositionalColumns[columnIndex]; found { + return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.StatementType) + } + columnValue := submatch[2] + columnValue = strings.TrimPrefix(columnValue, "'") + columnValue = strings.TrimSuffix(columnValue, "'") + binlogEntry.PositionalColumns[columnIndex] = columnValue + + return SearchForStartPosOrStatementState, binlogEntry, nil + } + line := scanner.Text() if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { return onStartEntry(submatch) @@ -135,6 +149,9 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { return onStatementEntry(submatch) } + if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 { + return onPositionalColumn(submatch) + } // Haven't found a match return SearchForStartPosOrStatementState, binlogEntry, nil } @@ -164,7 +181,7 @@ func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState Bi // parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS` // It issues an automaton / state machine to do its thang. func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) { - binlogEntry := &BinlogEntry{} + binlogEntry := NewBinlogEntry() var state BinlogEntryState = SearchForStartPosOrStatementState var endLogPos uint64 diff --git a/go/binlog/testdata/rbr-sample-0.txt b/go/binlog/testdata/rbr-sample-0.txt index a7c0509..1ea750b 100644 --- a/go/binlog/testdata/rbr-sample-0.txt +++ b/go/binlog/testdata/rbr-sample-0.txt @@ -1,3 +1,26 @@ +/* + these are the statements that were used to execute the RBR log: + + create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb; + insert into samplet values (1,1,'a'); + insert into samplet values (2,2,'extended'),(3,3,'extended'); + begin; + insert into samplet values (4,4,'transaction'); + insert into samplet values (5,5,'transaction'); + insert into samplet values (6,6,'transaction'); + commit; + update samplet set name='update' where id=5; + replace into samplet values (2,4,'replaced 2,4'); + insert into samplet values (7,7,'7'); + insert into samplet values (8,8,'8'); + delete from samplet where id >= 7; + insert into samplet values (9,9,'9'); + begin; + update samplet set name='update 9' where id=9; + delete from samplet where license=3; + insert into samplet values (10,10,'10'); + commit; +*/ /*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; /*!40019 SET @@session.max_insert_delayed_threads=0*/; /*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;