diff --git a/go/binlog/binlog_test.go b/go/binlog/binlog_test.go index 1314d1c..55fff25 100644 --- a/go/binlog/binlog_test.go +++ b/go/binlog/binlog_test.go @@ -17,9 +17,10 @@ package binlog import ( + "testing" + "github.com/outbrain/golib/log" test "github.com/outbrain/golib/tests" - "testing" ) func init() { diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index 28c733d..c368631 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -23,17 +23,17 @@ var ( startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$") endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)") statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") + tokenRegxp = regexp.MustCompile("### (WHERE|SET)$") ) type BinlogEntryState string const ( - InvalidState BinlogEntryState = "InvalidState" - SearchForStartPosState = "SearchForStartPosState" - ExpectEndLogPosState = "ExpectEndLogPosState" - SearchForStatementState = "SearchForStatementState" - ExpectTokenState = "ExpectTokenState" - PositionalColumnAssignmentState = "PositionalColumnAssignmentState" + InvalidState BinlogEntryState = "InvalidState" + SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState" + ExpectEndLogPosState = "ExpectEndLogPosState" + ExpectTokenState = "ExpectTokenState" + PositionalColumnAssignmentState = "PositionalColumnAssignmentState" ) // MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` @@ -72,7 +72,8 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop if err != nil { return entries, log.Errore(err) } - chunkEntries, err := parseEntries(entriesBytes) + + chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes))) if err != nil { return entries, log.Errore(err) } @@ -88,7 +89,7 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop return entries, err } -func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { +func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) @@ -106,6 +107,20 @@ func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previou return ExpectEndLogPosState, nextBinlogEntry, nil } + onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { + nextBinlogEntry = binlogEntry + if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + // Current entry is already a true entry, with startpos and with statement + nextBinlogEntry = &BinlogEntry{LogPos: binlogEntry.LogPos, EndLogPos: binlogEntry.EndLogPos} + } + + nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0] + nextBinlogEntry.DatabaseName = submatch[2] + nextBinlogEntry.TableName = submatch[3] + + return ExpectTokenState, nextBinlogEntry, nil + } + line := scanner.Text() if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { return onStartEntry(submatch) @@ -113,53 +128,50 @@ func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previou if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { return onStartEntry(submatch) } - // Haven't found a start entry - return SearchForStartPosState, binlogEntry, nil + if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { + return onStatementEntry(submatch) + } + // Haven't found a match + return SearchForStartPosOrStatementState, binlogEntry, nil } func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { line := scanner.Text() submatch := endLogPosRegexp.FindStringSubmatch(line) - if len(submatch) <= 1 { - return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) + if len(submatch) > 1 { + binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + return SearchForStartPosOrStatementState, nil } - binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - - return SearchForStatementState, nil + return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) } -func searchForStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { +func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { line := scanner.Text() - - if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { - binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] - binlogEntry.DatabaseName = submatch[2] - binlogEntry.TableName = submatch[3] - - return SearchForStartPosState, nil + if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 { + return SearchForStartPosOrStatementState, nil } - return SearchForStatementState, nil + return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos) } -func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { - scanner := bufio.NewScanner(bytes.NewReader(entriesBytes)) +func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) { binlogEntry := &BinlogEntry{} - var state BinlogEntryState = SearchForStartPosState + var state BinlogEntryState = SearchForStartPosOrStatementState var endLogPos uint64 appendBinlogEntry := func() { - entries = append(entries, binlogEntry) if binlogEntry.StatementType != "" { + entries = append(entries, binlogEntry) log.Debugf("entry: %+v", *binlogEntry) + //fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName)) } } for scanner.Scan() { switch state { - case SearchForStartPosState: + case SearchForStartPosOrStatementState: { var nextBinlogEntry *BinlogEntry - state, nextBinlogEntry, err = searchForStartPos(scanner, binlogEntry, endLogPos) + state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos) if nextBinlogEntry != binlogEntry { appendBinlogEntry() binlogEntry = nextBinlogEntry @@ -169,9 +181,9 @@ func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { { state, err = expectEndLogPos(scanner, binlogEntry) } - case SearchForStatementState: + case ExpectTokenState: { - state, err = searchForStatement(scanner, binlogEntry) + state, err = expectToken(scanner, binlogEntry) } default: { diff --git a/go/binlog/mysqlbinlog_reader_test.go b/go/binlog/mysqlbinlog_reader_test.go new file mode 100644 index 0000000..24a6e1a --- /dev/null +++ b/go/binlog/mysqlbinlog_reader_test.go @@ -0,0 +1,61 @@ +/* + Copyright 2014 Outbrain Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package binlog + +import ( + "bufio" + "os" + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestRBRSample0(t *testing.T) { + testFile, err := os.Open("testdata/rbr-sample-0.txt") + test.S(t).ExpectNil(err) + defer testFile.Close() + + scanner := bufio.NewScanner(testFile) + entries, err := parseEntries(scanner) + test.S(t).ExpectNil(err) + + test.S(t).ExpectEquals(len(entries), 17) + test.S(t).ExpectEquals(entries[0].DatabaseName, "test") + test.S(t).ExpectEquals(entries[0].TableName, "samplet") + test.S(t).ExpectEquals(entries[0].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[1].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[2].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[3].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[4].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[5].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE") + test.S(t).ExpectEquals(entries[7].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE") + test.S(t).ExpectEquals(entries[9].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[10].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[11].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[12].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[13].StatementType, "INSERT") + test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE") + test.S(t).ExpectEquals(entries[15].StatementType, "DELETE") + test.S(t).ExpectEquals(entries[16].StatementType, "INSERT") +} diff --git a/go/binlog/testdata/rbr-sample-0.txt b/go/binlog/testdata/rbr-sample-0.txt new file mode 100644 index 0000000..a7c0509 --- /dev/null +++ b/go/binlog/testdata/rbr-sample-0.txt @@ -0,0 +1,244 @@ +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; +/*!40019 SET @@session.max_insert_delayed_threads=0*/; +/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; +DELIMITER /*!*/; +# at 4 +#160324 11:06:59 server id 104 end_log_pos 120 CRC32 0x059174d8 Start: binlog v 4, server v 5.6.28-log created 160324 11:06:59 +# at 120 +#160324 11:09:53 server id 1 end_log_pos 313 CRC32 0x511efdf1 Query thread_id=7940 exec_time=0 error_code=0 +use `test`/*!*/; +SET TIMESTAMP=1458814193/*!*/; +SET @@session.pseudo_thread_id=7940/*!*/; +SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/; +SET @@session.sql_mode=1073741824/*!*/; +SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/; +/*!\C utf8 *//*!*/; +SET @@session.character_set_client=33,@@session.collation_connection=33,@@session.collation_server=8/*!*/; +SET @@session.lc_time_names=0/*!*/; +SET @@session.collation_database=DEFAULT/*!*/; +create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb +/*!*/; +# at 313 +#160324 11:10:13 server id 1 end_log_pos 385 CRC32 0x6b95100a Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814213/*!*/; +BEGIN +/*!*/; +# at 385 +#160324 11:10:13 server id 1 end_log_pos 439 CRC32 0xfa97ad69 Table_map: `test`.`samplet` mapped to number 72 +# at 439 +#160324 11:10:13 server id 1 end_log_pos 485 CRC32 0xae356826 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=1 +### @2=1 +### @3='a' +# at 485 +#160324 11:10:13 server id 1 end_log_pos 516 CRC32 0xf60389e3 Xid = 49802 +COMMIT/*!*/; +# at 516 +#160324 11:10:35 server id 1 end_log_pos 588 CRC32 0x1a8730ad Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814235/*!*/; +BEGIN +/*!*/; +# at 588 +#160324 11:10:35 server id 1 end_log_pos 642 CRC32 0xac564207 Table_map: `test`.`samplet` mapped to number 72 +# at 642 +#160324 11:10:35 server id 1 end_log_pos 713 CRC32 0x3020ee9e Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=2 +### @2=2 +### @3='extended' +### INSERT INTO `test`.`samplet` +### SET +### @1=3 +### @2=3 +### @3='extended' +# at 713 +#160324 11:10:35 server id 1 end_log_pos 744 CRC32 0x341f0c1d Xid = 49848 +COMMIT/*!*/; +# at 744 +#160324 11:10:47 server id 1 end_log_pos 816 CRC32 0x2454c8aa Query thread_id=7940 exec_time=14 error_code=0 +SET TIMESTAMP=1458814247/*!*/; +BEGIN +/*!*/; +# at 816 +#160324 11:10:47 server id 1 end_log_pos 870 CRC32 0x92018566 Table_map: `test`.`samplet` mapped to number 72 +# at 870 +#160324 11:10:47 server id 1 end_log_pos 926 CRC32 0x5b882310 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=4 +### @2=4 +### @3='transaction' +# at 926 +#160324 11:10:54 server id 1 end_log_pos 980 CRC32 0x374b624b Table_map: `test`.`samplet` mapped to number 72 +# at 980 +#160324 11:10:54 server id 1 end_log_pos 1036 CRC32 0xfff6a2b9 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=5 +### @2=5 +### @3='transaction' +# at 1036 +#160324 11:10:59 server id 1 end_log_pos 1090 CRC32 0x37e19690 Table_map: `test`.`samplet` mapped to number 72 +# at 1090 +#160324 11:10:59 server id 1 end_log_pos 1146 CRC32 0x58a01053 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=6 +### @2=6 +### @3='transaction' +# at 1146 +#160324 11:10:59 server id 1 end_log_pos 1177 CRC32 0xdd5de027 Xid = 49894 +COMMIT/*!*/; +# at 1177 +#160324 11:11:16 server id 1 end_log_pos 1249 CRC32 0x5c4a609b Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814276/*!*/; +BEGIN +/*!*/; +# at 1249 +#160324 11:11:16 server id 1 end_log_pos 1303 CRC32 0x9d3c756b Table_map: `test`.`samplet` mapped to number 72 +# at 1303 +#160324 11:11:16 server id 1 end_log_pos 1352 CRC32 0x9b0d2ff4 Update_rows: table id 72 flags: STMT_END_F +### UPDATE `test`.`samplet` +### WHERE +### @1=5 +### SET +### @3='update' +# at 1352 +#160324 11:11:16 server id 1 end_log_pos 1383 CRC32 0x8e051bed Xid = 49931 +COMMIT/*!*/; +# at 1383 +#160324 11:11:44 server id 1 end_log_pos 1455 CRC32 0xe9744e83 Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814304/*!*/; +BEGIN +/*!*/; +# at 1455 +#160324 11:11:44 server id 1 end_log_pos 1509 CRC32 0x34672cb1 Table_map: `test`.`samplet` mapped to number 72 +# at 1509 +#160324 11:11:44 server id 1 end_log_pos 1549 CRC32 0x4383e9ee Delete_rows: table id 72 +# at 1549 +#160324 11:11:44 server id 1 end_log_pos 1612 CRC32 0x899eb398 Update_rows: table id 72 flags: STMT_END_F +### DELETE FROM `test`.`samplet` +### WHERE +### @1=2 +### UPDATE `test`.`samplet` +### WHERE +### @1=4 +### SET +### @1=2 +### @2=4 +### @3='replaced 2,4' +# at 1612 +#160324 11:11:44 server id 1 end_log_pos 1643 CRC32 0x037a8fe1 Xid = 49977 +COMMIT/*!*/; +# at 1643 +#160324 11:11:54 server id 1 end_log_pos 1715 CRC32 0xb02520cd Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814314/*!*/; +BEGIN +/*!*/; +# at 1715 +#160324 11:11:54 server id 1 end_log_pos 1769 CRC32 0xcbcf4323 Table_map: `test`.`samplet` mapped to number 72 +# at 1769 +#160324 11:11:54 server id 1 end_log_pos 1815 CRC32 0x4d52b057 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=7 +### @2=7 +### @3='7' +# at 1815 +#160324 11:11:54 server id 1 end_log_pos 1846 CRC32 0x5289b6a4 Xid = 50001 +COMMIT/*!*/; +# at 1846 +#160324 11:11:59 server id 1 end_log_pos 1918 CRC32 0x1758ab97 Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814319/*!*/; +BEGIN +/*!*/; +# at 1918 +#160324 11:11:59 server id 1 end_log_pos 1972 CRC32 0xa4602796 Table_map: `test`.`samplet` mapped to number 72 +# at 1972 +#160324 11:11:59 server id 1 end_log_pos 2018 CRC32 0x6a6eb0c9 Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=8 +### @2=8 +### @3='8' +# at 2018 +#160324 11:11:59 server id 1 end_log_pos 2049 CRC32 0x6d0fef4d Xid = 50014 +COMMIT/*!*/; +# at 2049 +#160324 11:12:12 server id 1 end_log_pos 2121 CRC32 0x6cd5da13 Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814332/*!*/; +BEGIN +/*!*/; +# at 2121 +#160324 11:12:12 server id 1 end_log_pos 2175 CRC32 0x8339241f Table_map: `test`.`samplet` mapped to number 72 +# at 2175 +#160324 11:12:12 server id 1 end_log_pos 2220 CRC32 0x669385e1 Delete_rows: table id 72 flags: STMT_END_F +### DELETE FROM `test`.`samplet` +### WHERE +### @1=7 +### DELETE FROM `test`.`samplet` +### WHERE +### @1=8 +# at 2220 +#160324 11:12:12 server id 1 end_log_pos 2251 CRC32 0xba81d2b0 Xid = 50038 +COMMIT/*!*/; +# at 2251 +#160324 11:12:20 server id 1 end_log_pos 2323 CRC32 0x4c58be8c Query thread_id=7940 exec_time=0 error_code=0 +SET TIMESTAMP=1458814340/*!*/; +BEGIN +/*!*/; +# at 2323 +#160324 11:12:20 server id 1 end_log_pos 2377 CRC32 0x9eb23ab9 Table_map: `test`.`samplet` mapped to number 72 +# at 2377 +#160324 11:12:20 server id 1 end_log_pos 2423 CRC32 0xac8116ec Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=9 +### @2=9 +### @3='9' +# at 2423 +#160324 11:12:20 server id 1 end_log_pos 2454 CRC32 0x5ce77ad6 Xid = 50051 +COMMIT/*!*/; +# at 2454 +#160324 11:12:50 server id 1 end_log_pos 2526 CRC32 0xed19acbd Query thread_id=7940 exec_time=26 error_code=0 +SET TIMESTAMP=1458814370/*!*/; +BEGIN +/*!*/; +# at 2526 +#160324 11:12:50 server id 1 end_log_pos 2580 CRC32 0x0bf6b98f Table_map: `test`.`samplet` mapped to number 72 +# at 2580 +#160324 11:12:50 server id 1 end_log_pos 2631 CRC32 0x263c4579 Update_rows: table id 72 flags: STMT_END_F +### UPDATE `test`.`samplet` +### WHERE +### @1=9 +### SET +### @3='update 9' +# at 2631 +#160324 11:13:00 server id 1 end_log_pos 2685 CRC32 0x94b24c8b Table_map: `test`.`samplet` mapped to number 72 +# at 2685 +#160324 11:13:00 server id 1 end_log_pos 2725 CRC32 0xca43fe3a Delete_rows: table id 72 flags: STMT_END_F +### DELETE FROM `test`.`samplet` +### WHERE +### @1=3 +# at 2725 +#160324 11:13:14 server id 1 end_log_pos 2779 CRC32 0xc36088a2 Table_map: `test`.`samplet` mapped to number 72 +# at 2779 +#160324 11:13:14 server id 1 end_log_pos 2826 CRC32 0x98fc9dea Write_rows: table id 72 flags: STMT_END_F +### INSERT INTO `test`.`samplet` +### SET +### @1=10 +### @2=10 +### @3='10' +# at 2826 +#160324 11:13:14 server id 1 end_log_pos 2857 CRC32 0x729c371f Xid = 50163 +COMMIT/*!*/; +# at 2857 +#160324 11:13:31 server id 104 end_log_pos 2904 CRC32 0x38531c7d Rotate to mysql-bin.000053 pos: 4 +DELIMITER ; +# End of log file +ROLLBACK /* added by mysqlbinlog */; +/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/; +/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;