From 1768b55b3b36f4ae794800ff3f65711488e563ca Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 23 Mar 2016 15:25:45 +0100 Subject: [PATCH] mysqlbinlog_reader is now a simple state machine --- go/binlog/mysqlbinlog_reader.go | 143 +++++++++++++++++++++----------- 1 file changed, 96 insertions(+), 47 deletions(-) diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index aa5356a..28c733d 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -25,6 +25,17 @@ var ( statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") ) +type BinlogEntryState string + +const ( + InvalidState BinlogEntryState = "InvalidState" + SearchForStartPosState = "SearchForStartPosState" + ExpectEndLogPosState = "ExpectEndLogPosState" + SearchForStatementState = "SearchForStatementState" + ExpectTokenState = "ExpectTokenState" + PositionalColumnAssignmentState = "PositionalColumnAssignmentState" +) + // MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` // process and textually parsing its output type MySQLBinlogReader struct { @@ -77,64 +88,102 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop return entries, err } +func searchForStartPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { + + onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { + startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) + + if previousEndLogPos != 0 && startLogPos != previousEndLogPos { + return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos) + } + nextBinlogEntry = binlogEntry + if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + // Current entry is already a true entry, with startpos and with statement + nextBinlogEntry = &BinlogEntry{} + } + + nextBinlogEntry.LogPos = startLogPos + return ExpectEndLogPosState, nextBinlogEntry, nil + } + + line := scanner.Text() + if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { + return onStartEntry(submatch) + } + if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { + return onStartEntry(submatch) + } + // Haven't found a start entry + return SearchForStartPosState, binlogEntry, nil +} + +func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { + line := scanner.Text() + + submatch := endLogPosRegexp.FindStringSubmatch(line) + if len(submatch) <= 1 { + return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos) + } + binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + + return SearchForStatementState, nil +} + +func searchForStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { + line := scanner.Text() + + if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { + binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] + binlogEntry.DatabaseName = submatch[2] + binlogEntry.TableName = submatch[3] + + return SearchForStartPosState, nil + } + return SearchForStatementState, nil +} + func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { scanner := bufio.NewScanner(bytes.NewReader(entriesBytes)) - expectEndLogPos := false - var startLogPos uint64 + binlogEntry := &BinlogEntry{} + var state BinlogEntryState = SearchForStartPosState var endLogPos uint64 - binlogEntry := &BinlogEntry{} - + appendBinlogEntry := func() { + entries = append(entries, binlogEntry) + if binlogEntry.StatementType != "" { + log.Debugf("entry: %+v", *binlogEntry) + } + } for scanner.Scan() { - line := scanner.Text() - - onStartEntry := func(submatch []string) error { - startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - - if endLogPos != 0 && startLogPos != endLogPos { - return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos) + switch state { + case SearchForStartPosState: + { + var nextBinlogEntry *BinlogEntry + state, nextBinlogEntry, err = searchForStartPos(scanner, binlogEntry, endLogPos) + if nextBinlogEntry != binlogEntry { + appendBinlogEntry() + binlogEntry = nextBinlogEntry + } } - // We are entering a new entry, let's push the previous one - if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { - entries = append(entries, binlogEntry) - log.Debugf("entry: %+v", *binlogEntry) - binlogEntry = &BinlogEntry{} + case ExpectEndLogPosState: + { + state, err = expectEndLogPos(scanner, binlogEntry) + } + case SearchForStatementState: + { + state, err = searchForStatement(scanner, binlogEntry) + } + default: + { + err = fmt.Errorf("Unexpected state %+v", state) } - - //log.Debugf(line) - binlogEntry.LogPos = startLogPos - // Next iteration we will read the end_log_pos - expectEndLogPos = true - - return nil } - if expectEndLogPos { - submatch := endLogPosRegexp.FindStringSubmatch(line) - if len(submatch) <= 1 { - return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos) - } - endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - - binlogEntry.EndLogPos = endLogPos - expectEndLogPos = false - } else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { - if err := onStartEntry(submatch); err != nil { - return entries, log.Errore(err) - } - } else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { - if err := onStartEntry(submatch); err != nil { - return entries, log.Errore(err) - } - } else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { - binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] - binlogEntry.DatabaseName = submatch[2] - binlogEntry.TableName = submatch[3] + if err != nil { + return entries, log.Errore(err) } - } if binlogEntry.LogPos != 0 { - entries = append(entries, binlogEntry) - log.Debugf("entry: %+v", *binlogEntry) + appendBinlogEntry() } return entries, err }