adjusted automaton to accept multi-statement entries; added tests

This commit is contained in:
Shlomi Noach 2016-03-24 14:25:52 +01:00
parent 1768b55b3b
commit 8aa6a9750c
4 changed files with 351 additions and 33 deletions

View File

@ -17,9 +17,10 @@
package binlog package binlog
import ( import (
"testing"
"github.com/outbrain/golib/log" "github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests" test "github.com/outbrain/golib/tests"
"testing"
) )
func init() { func init() {

View File

@ -23,17 +23,17 @@ var (
startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$") startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$")
endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)") endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
tokenRegxp = regexp.MustCompile("### (WHERE|SET)$")
) )
type BinlogEntryState string type BinlogEntryState string
const ( const (
InvalidState BinlogEntryState = "InvalidState" InvalidState BinlogEntryState = "InvalidState"
SearchForStartPosState = "SearchForStartPosState" SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
ExpectEndLogPosState = "ExpectEndLogPosState" ExpectEndLogPosState = "ExpectEndLogPosState"
SearchForStatementState = "SearchForStatementState" ExpectTokenState = "ExpectTokenState"
ExpectTokenState = "ExpectTokenState" PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
) )
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` // MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
@ -72,7 +72,8 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop
if err != nil { if err != nil {
return entries, log.Errore(err) return entries, log.Errore(err)
} }
chunkEntries, err := parseEntries(entriesBytes)
chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)))
if err != nil { if err != nil {
return entries, log.Errore(err) return entries, log.Errore(err)
} }
@ -88,7 +89,7 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop
return entries, err return entries, err
} }
func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) {
onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64)
@ -106,6 +107,20 @@ func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previou
return ExpectEndLogPosState, nextBinlogEntry, nil return ExpectEndLogPosState, nextBinlogEntry, nil
} }
onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
nextBinlogEntry = binlogEntry
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = &BinlogEntry{LogPos: binlogEntry.LogPos, EndLogPos: binlogEntry.EndLogPos}
}
nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
nextBinlogEntry.DatabaseName = submatch[2]
nextBinlogEntry.TableName = submatch[3]
return ExpectTokenState, nextBinlogEntry, nil
}
line := scanner.Text() line := scanner.Text()
if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch) return onStartEntry(submatch)
@ -113,53 +128,50 @@ func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previou
if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch) return onStartEntry(submatch)
} }
// Haven't found a start entry if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
return SearchForStartPosState, binlogEntry, nil return onStatementEntry(submatch)
}
// Haven't found a match
return SearchForStartPosOrStatementState, binlogEntry, nil
} }
func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
line := scanner.Text() line := scanner.Text()
submatch := endLogPosRegexp.FindStringSubmatch(line) submatch := endLogPosRegexp.FindStringSubmatch(line)
if len(submatch) <= 1 { if len(submatch) > 1 {
return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
return SearchForStartPosOrStatementState, nil
} }
binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos)
return SearchForStatementState, nil
} }
func searchForStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
line := scanner.Text() line := scanner.Text()
if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 {
if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { return SearchForStartPosOrStatementState, nil
binlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
binlogEntry.DatabaseName = submatch[2]
binlogEntry.TableName = submatch[3]
return SearchForStartPosState, nil
} }
return SearchForStatementState, nil return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos)
} }
func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) {
scanner := bufio.NewScanner(bytes.NewReader(entriesBytes))
binlogEntry := &BinlogEntry{} binlogEntry := &BinlogEntry{}
var state BinlogEntryState = SearchForStartPosState var state BinlogEntryState = SearchForStartPosOrStatementState
var endLogPos uint64 var endLogPos uint64
appendBinlogEntry := func() { appendBinlogEntry := func() {
entries = append(entries, binlogEntry)
if binlogEntry.StatementType != "" { if binlogEntry.StatementType != "" {
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry) log.Debugf("entry: %+v", *binlogEntry)
//fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName))
} }
} }
for scanner.Scan() { for scanner.Scan() {
switch state { switch state {
case SearchForStartPosState: case SearchForStartPosOrStatementState:
{ {
var nextBinlogEntry *BinlogEntry var nextBinlogEntry *BinlogEntry
state, nextBinlogEntry, err = searchForStartPos(scanner, binlogEntry, endLogPos) state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos)
if nextBinlogEntry != binlogEntry { if nextBinlogEntry != binlogEntry {
appendBinlogEntry() appendBinlogEntry()
binlogEntry = nextBinlogEntry binlogEntry = nextBinlogEntry
@ -169,9 +181,9 @@ func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) {
{ {
state, err = expectEndLogPos(scanner, binlogEntry) state, err = expectEndLogPos(scanner, binlogEntry)
} }
case SearchForStatementState: case ExpectTokenState:
{ {
state, err = searchForStatement(scanner, binlogEntry) state, err = expectToken(scanner, binlogEntry)
} }
default: default:
{ {

View File

@ -0,0 +1,61 @@
/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package binlog
import (
"bufio"
"os"
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestRBRSample0(t *testing.T) {
testFile, err := os.Open("testdata/rbr-sample-0.txt")
test.S(t).ExpectNil(err)
defer testFile.Close()
scanner := bufio.NewScanner(testFile)
entries, err := parseEntries(scanner)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(len(entries), 17)
test.S(t).ExpectEquals(entries[0].DatabaseName, "test")
test.S(t).ExpectEquals(entries[0].TableName, "samplet")
test.S(t).ExpectEquals(entries[0].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[1].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[2].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[3].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[4].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[5].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[7].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[9].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[10].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[11].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[12].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[13].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[15].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[16].StatementType, "INSERT")
}

244
go/binlog/testdata/rbr-sample-0.txt vendored Normal file
View File

@ -0,0 +1,244 @@
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!40019 SET @@session.max_insert_delayed_threads=0*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#160324 11:06:59 server id 104 end_log_pos 120 CRC32 0x059174d8 Start: binlog v 4, server v 5.6.28-log created 160324 11:06:59
# at 120
#160324 11:09:53 server id 1 end_log_pos 313 CRC32 0x511efdf1 Query thread_id=7940 exec_time=0 error_code=0
use `test`/*!*/;
SET TIMESTAMP=1458814193/*!*/;
SET @@session.pseudo_thread_id=7940/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1073741824/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8 *//*!*/;
SET @@session.character_set_client=33,@@session.collation_connection=33,@@session.collation_server=8/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb
/*!*/;
# at 313
#160324 11:10:13 server id 1 end_log_pos 385 CRC32 0x6b95100a Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814213/*!*/;
BEGIN
/*!*/;
# at 385
#160324 11:10:13 server id 1 end_log_pos 439 CRC32 0xfa97ad69 Table_map: `test`.`samplet` mapped to number 72
# at 439
#160324 11:10:13 server id 1 end_log_pos 485 CRC32 0xae356826 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=1
### @2=1
### @3='a'
# at 485
#160324 11:10:13 server id 1 end_log_pos 516 CRC32 0xf60389e3 Xid = 49802
COMMIT/*!*/;
# at 516
#160324 11:10:35 server id 1 end_log_pos 588 CRC32 0x1a8730ad Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814235/*!*/;
BEGIN
/*!*/;
# at 588
#160324 11:10:35 server id 1 end_log_pos 642 CRC32 0xac564207 Table_map: `test`.`samplet` mapped to number 72
# at 642
#160324 11:10:35 server id 1 end_log_pos 713 CRC32 0x3020ee9e Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=2
### @2=2
### @3='extended'
### INSERT INTO `test`.`samplet`
### SET
### @1=3
### @2=3
### @3='extended'
# at 713
#160324 11:10:35 server id 1 end_log_pos 744 CRC32 0x341f0c1d Xid = 49848
COMMIT/*!*/;
# at 744
#160324 11:10:47 server id 1 end_log_pos 816 CRC32 0x2454c8aa Query thread_id=7940 exec_time=14 error_code=0
SET TIMESTAMP=1458814247/*!*/;
BEGIN
/*!*/;
# at 816
#160324 11:10:47 server id 1 end_log_pos 870 CRC32 0x92018566 Table_map: `test`.`samplet` mapped to number 72
# at 870
#160324 11:10:47 server id 1 end_log_pos 926 CRC32 0x5b882310 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=4
### @2=4
### @3='transaction'
# at 926
#160324 11:10:54 server id 1 end_log_pos 980 CRC32 0x374b624b Table_map: `test`.`samplet` mapped to number 72
# at 980
#160324 11:10:54 server id 1 end_log_pos 1036 CRC32 0xfff6a2b9 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=5
### @2=5
### @3='transaction'
# at 1036
#160324 11:10:59 server id 1 end_log_pos 1090 CRC32 0x37e19690 Table_map: `test`.`samplet` mapped to number 72
# at 1090
#160324 11:10:59 server id 1 end_log_pos 1146 CRC32 0x58a01053 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=6
### @2=6
### @3='transaction'
# at 1146
#160324 11:10:59 server id 1 end_log_pos 1177 CRC32 0xdd5de027 Xid = 49894
COMMIT/*!*/;
# at 1177
#160324 11:11:16 server id 1 end_log_pos 1249 CRC32 0x5c4a609b Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814276/*!*/;
BEGIN
/*!*/;
# at 1249
#160324 11:11:16 server id 1 end_log_pos 1303 CRC32 0x9d3c756b Table_map: `test`.`samplet` mapped to number 72
# at 1303
#160324 11:11:16 server id 1 end_log_pos 1352 CRC32 0x9b0d2ff4 Update_rows: table id 72 flags: STMT_END_F
### UPDATE `test`.`samplet`
### WHERE
### @1=5
### SET
### @3='update'
# at 1352
#160324 11:11:16 server id 1 end_log_pos 1383 CRC32 0x8e051bed Xid = 49931
COMMIT/*!*/;
# at 1383
#160324 11:11:44 server id 1 end_log_pos 1455 CRC32 0xe9744e83 Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814304/*!*/;
BEGIN
/*!*/;
# at 1455
#160324 11:11:44 server id 1 end_log_pos 1509 CRC32 0x34672cb1 Table_map: `test`.`samplet` mapped to number 72
# at 1509
#160324 11:11:44 server id 1 end_log_pos 1549 CRC32 0x4383e9ee Delete_rows: table id 72
# at 1549
#160324 11:11:44 server id 1 end_log_pos 1612 CRC32 0x899eb398 Update_rows: table id 72 flags: STMT_END_F
### DELETE FROM `test`.`samplet`
### WHERE
### @1=2
### UPDATE `test`.`samplet`
### WHERE
### @1=4
### SET
### @1=2
### @2=4
### @3='replaced 2,4'
# at 1612
#160324 11:11:44 server id 1 end_log_pos 1643 CRC32 0x037a8fe1 Xid = 49977
COMMIT/*!*/;
# at 1643
#160324 11:11:54 server id 1 end_log_pos 1715 CRC32 0xb02520cd Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814314/*!*/;
BEGIN
/*!*/;
# at 1715
#160324 11:11:54 server id 1 end_log_pos 1769 CRC32 0xcbcf4323 Table_map: `test`.`samplet` mapped to number 72
# at 1769
#160324 11:11:54 server id 1 end_log_pos 1815 CRC32 0x4d52b057 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=7
### @2=7
### @3='7'
# at 1815
#160324 11:11:54 server id 1 end_log_pos 1846 CRC32 0x5289b6a4 Xid = 50001
COMMIT/*!*/;
# at 1846
#160324 11:11:59 server id 1 end_log_pos 1918 CRC32 0x1758ab97 Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814319/*!*/;
BEGIN
/*!*/;
# at 1918
#160324 11:11:59 server id 1 end_log_pos 1972 CRC32 0xa4602796 Table_map: `test`.`samplet` mapped to number 72
# at 1972
#160324 11:11:59 server id 1 end_log_pos 2018 CRC32 0x6a6eb0c9 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=8
### @2=8
### @3='8'
# at 2018
#160324 11:11:59 server id 1 end_log_pos 2049 CRC32 0x6d0fef4d Xid = 50014
COMMIT/*!*/;
# at 2049
#160324 11:12:12 server id 1 end_log_pos 2121 CRC32 0x6cd5da13 Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814332/*!*/;
BEGIN
/*!*/;
# at 2121
#160324 11:12:12 server id 1 end_log_pos 2175 CRC32 0x8339241f Table_map: `test`.`samplet` mapped to number 72
# at 2175
#160324 11:12:12 server id 1 end_log_pos 2220 CRC32 0x669385e1 Delete_rows: table id 72 flags: STMT_END_F
### DELETE FROM `test`.`samplet`
### WHERE
### @1=7
### DELETE FROM `test`.`samplet`
### WHERE
### @1=8
# at 2220
#160324 11:12:12 server id 1 end_log_pos 2251 CRC32 0xba81d2b0 Xid = 50038
COMMIT/*!*/;
# at 2251
#160324 11:12:20 server id 1 end_log_pos 2323 CRC32 0x4c58be8c Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814340/*!*/;
BEGIN
/*!*/;
# at 2323
#160324 11:12:20 server id 1 end_log_pos 2377 CRC32 0x9eb23ab9 Table_map: `test`.`samplet` mapped to number 72
# at 2377
#160324 11:12:20 server id 1 end_log_pos 2423 CRC32 0xac8116ec Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=9
### @2=9
### @3='9'
# at 2423
#160324 11:12:20 server id 1 end_log_pos 2454 CRC32 0x5ce77ad6 Xid = 50051
COMMIT/*!*/;
# at 2454
#160324 11:12:50 server id 1 end_log_pos 2526 CRC32 0xed19acbd Query thread_id=7940 exec_time=26 error_code=0
SET TIMESTAMP=1458814370/*!*/;
BEGIN
/*!*/;
# at 2526
#160324 11:12:50 server id 1 end_log_pos 2580 CRC32 0x0bf6b98f Table_map: `test`.`samplet` mapped to number 72
# at 2580
#160324 11:12:50 server id 1 end_log_pos 2631 CRC32 0x263c4579 Update_rows: table id 72 flags: STMT_END_F
### UPDATE `test`.`samplet`
### WHERE
### @1=9
### SET
### @3='update 9'
# at 2631
#160324 11:13:00 server id 1 end_log_pos 2685 CRC32 0x94b24c8b Table_map: `test`.`samplet` mapped to number 72
# at 2685
#160324 11:13:00 server id 1 end_log_pos 2725 CRC32 0xca43fe3a Delete_rows: table id 72 flags: STMT_END_F
### DELETE FROM `test`.`samplet`
### WHERE
### @1=3
# at 2725
#160324 11:13:14 server id 1 end_log_pos 2779 CRC32 0xc36088a2 Table_map: `test`.`samplet` mapped to number 72
# at 2779
#160324 11:13:14 server id 1 end_log_pos 2826 CRC32 0x98fc9dea Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=10
### @2=10
### @3='10'
# at 2826
#160324 11:13:14 server id 1 end_log_pos 2857 CRC32 0x729c371f Xid = 50163
COMMIT/*!*/;
# at 2857
#160324 11:13:31 server id 104 end_log_pos 2904 CRC32 0x38531c7d Rotate to mysql-bin.000053 pos: 4
DELIMITER ;
# End of log file
ROLLBACK /* added by mysqlbinlog */;
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;