/*
   Copyright 2016 GitHub Inc.
	 See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
import (
"bufio"
"bytes"
"fmt"
"path"
"regexp"
"strconv"
"strings"
"github.com/github/gh-osc/go/os"
"github.com/outbrain/golib/log"
)
var (
binlogChunkSizeBytes uint64 = 32 * 1024 * 1024
startEntryRegexp = regexp . MustCompile ( "^# at ([0-9]+)$" )
startEntryUnknownTableRegexp = regexp . MustCompile ( "^### Row event for unknown table .*? at ([0-9]+)$" )
endLogPosRegexp = regexp . MustCompile ( "^#[0-9]{6} .*? end_log_pos ([0-9]+)" )
statementRegxp = regexp . MustCompile ( "### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`" )
2016-03-24 14:25:52 +01:00
tokenRegxp = regexp . MustCompile ( "### (WHERE|SET)$" )
2016-03-24 22:27:39 +01:00
positionalColumnRegexp = regexp . MustCompile ( "### @([0-9]+)=(.+)$" )
2016-03-23 12:40:17 +01:00
)
// BinlogEntryState is a state in the binlog parser automaton / state machine.
// It is a string so that a state prints as its own name in error messages.
type BinlogEntryState string
2016-03-24 15:11:56 +01:00
// States of the state machine
2016-03-23 15:25:45 +01:00
const (
2016-03-24 14:25:52 +01:00
InvalidState BinlogEntryState = "InvalidState"
SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
ExpectEndLogPosState = "ExpectEndLogPosState"
ExpectTokenState = "ExpectTokenState"
PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
2016-03-23 15:25:45 +01:00
)
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
// process and textually parsing its output
type MySQLBinlogReader struct {
	// Basedir is the directory under which the mysqlbinlog binary is looked
	// up (as "bin/mysqlbinlog") by NewMySQLBinlogReader.
	Basedir string
	// Datadir is the directory that binlog file names are resolved against.
	Datadir string
	// MySQLBinlogBinary is the path of the mysqlbinlog executable to invoke.
	MySQLBinlogBinary string
}
2016-03-24 15:11:56 +01:00
// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem
2016-03-23 12:40:17 +01:00
func NewMySQLBinlogReader ( basedir string , datadir string ) ( mySQLBinlogReader * MySQLBinlogReader ) {
mySQLBinlogReader = & MySQLBinlogReader {
Basedir : basedir ,
Datadir : datadir ,
}
mySQLBinlogReader . MySQLBinlogBinary = path . Join ( mySQLBinlogReader . Basedir , "bin/mysqlbinlog" )
return mySQLBinlogReader
}
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
func ( this * MySQLBinlogReader ) ReadEntries ( logFile string , startPos uint64 , stopPos uint64 ) ( entries [ ] ( * BinlogEntry ) , err error ) {
if startPos == 0 {
startPos = 4
}
done := false
chunkStartPos := startPos
for ! done {
chunkStopPos := chunkStartPos + binlogChunkSizeBytes
if chunkStopPos > stopPos && stopPos != 0 {
chunkStopPos = stopPos
}
log . Debugf ( "Next chunk range %d - %d" , chunkStartPos , chunkStopPos )
binlogFilePath := path . Join ( this . Datadir , logFile )
command := fmt . Sprintf ( ` %s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s ` , this . MySQLBinlogBinary , chunkStartPos , chunkStopPos , binlogFilePath )
entriesBytes , err := os . RunCommandWithOutput ( command )
if err != nil {
return entries , log . Errore ( err )
}
2016-03-24 14:25:52 +01:00
chunkEntries , err := parseEntries ( bufio . NewScanner ( bytes . NewReader ( entriesBytes ) ) )
2016-03-23 12:40:17 +01:00
if err != nil {
return entries , log . Errore ( err )
}
if len ( chunkEntries ) == 0 {
done = true
} else {
entries = append ( entries , chunkEntries ... )
lastChunkEntry := chunkEntries [ len ( chunkEntries ) - 1 ]
chunkStartPos = lastChunkEntry . EndLogPos
}
}
return entries , err
2016-03-22 15:12:51 +01:00
}
2016-03-24 15:11:56 +01:00
// automaton step: accept wither beginning of new entry, or beginning of new statement
2016-03-24 14:25:52 +01:00
func searchForStartPosOrStatement ( scanner * bufio . Scanner , binlogEntry * BinlogEntry , previousEndLogPos uint64 ) ( nextState BinlogEntryState , nextBinlogEntry * BinlogEntry , err error ) {
2016-03-23 15:25:45 +01:00
onStartEntry := func ( submatch [ ] string ) ( BinlogEntryState , * BinlogEntry , error ) {
startLogPos , _ := strconv . ParseUint ( submatch [ 1 ] , 10 , 64 )
2016-03-23 12:40:17 +01:00
2016-03-23 15:25:45 +01:00
if previousEndLogPos != 0 && startLogPos != previousEndLogPos {
return InvalidState , binlogEntry , fmt . Errorf ( "Expected startLogPos %+v to equal previous endLogPos %+v" , startLogPos , previousEndLogPos )
}
nextBinlogEntry = binlogEntry
if binlogEntry . LogPos != 0 && binlogEntry . StatementType != "" {
// Current entry is already a true entry, with startpos and with statement
2016-03-24 22:27:39 +01:00
nextBinlogEntry = NewBinlogEntry ( )
2016-03-23 15:25:45 +01:00
}
nextBinlogEntry . LogPos = startLogPos
return ExpectEndLogPosState , nextBinlogEntry , nil
}
2016-03-23 12:40:17 +01:00
2016-03-24 14:25:52 +01:00
onStatementEntry := func ( submatch [ ] string ) ( BinlogEntryState , * BinlogEntry , error ) {
nextBinlogEntry = binlogEntry
if binlogEntry . LogPos != 0 && binlogEntry . StatementType != "" {
// Current entry is already a true entry, with startpos and with statement
2016-03-24 22:27:39 +01:00
nextBinlogEntry = binlogEntry . Duplicate ( )
2016-03-24 14:25:52 +01:00
}
nextBinlogEntry . StatementType = strings . Split ( submatch [ 1 ] , " " ) [ 0 ]
nextBinlogEntry . DatabaseName = submatch [ 2 ]
nextBinlogEntry . TableName = submatch [ 3 ]
return ExpectTokenState , nextBinlogEntry , nil
}
2016-03-24 22:27:39 +01:00
onPositionalColumn := func ( submatch [ ] string ) ( BinlogEntryState , * BinlogEntry , error ) {
columnIndex , _ := strconv . ParseUint ( submatch [ 1 ] , 10 , 64 )
if _ , found := binlogEntry . PositionalColumns [ columnIndex ] ; found {
return InvalidState , binlogEntry , fmt . Errorf ( "Positional column %+v found more than once in %+v, statement=%+v" , columnIndex , binlogEntry . LogPos , binlogEntry . StatementType )
}
columnValue := submatch [ 2 ]
columnValue = strings . TrimPrefix ( columnValue , "'" )
columnValue = strings . TrimSuffix ( columnValue , "'" )
binlogEntry . PositionalColumns [ columnIndex ] = columnValue
return SearchForStartPosOrStatementState , binlogEntry , nil
}
2016-03-23 15:25:45 +01:00
line := scanner . Text ( )
if submatch := startEntryRegexp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onStartEntry ( submatch )
}
if submatch := startEntryUnknownTableRegexp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onStartEntry ( submatch )
}
2016-03-24 14:25:52 +01:00
if submatch := statementRegxp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onStatementEntry ( submatch )
}
2016-03-24 22:27:39 +01:00
if submatch := positionalColumnRegexp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onPositionalColumn ( submatch )
}
2016-03-24 14:25:52 +01:00
// Haven't found a match
return SearchForStartPosOrStatementState , binlogEntry , nil
2016-03-23 15:25:45 +01:00
}
2016-03-23 12:40:17 +01:00
2016-03-24 15:11:56 +01:00
// automaton step: expect an end_log_pos line`
2016-03-23 15:25:45 +01:00
func expectEndLogPos ( scanner * bufio . Scanner , binlogEntry * BinlogEntry ) ( nextState BinlogEntryState , err error ) {
line := scanner . Text ( )
submatch := endLogPosRegexp . FindStringSubmatch ( line )
2016-03-24 14:25:52 +01:00
if len ( submatch ) > 1 {
binlogEntry . EndLogPos , _ = strconv . ParseUint ( submatch [ 1 ] , 10 , 64 )
return SearchForStartPosOrStatementState , nil
2016-03-23 15:25:45 +01:00
}
2016-03-24 14:25:52 +01:00
return InvalidState , fmt . Errorf ( "Expected to find end_log_pos following pos %+v" , binlogEntry . LogPos )
2016-03-23 15:25:45 +01:00
}
2016-03-23 12:40:17 +01:00
2016-03-24 15:11:56 +01:00
// automaton step: a not-strictly-required but good-to-have-around validation that
// we see an expected token following a statement
2016-03-24 14:25:52 +01:00
func expectToken ( scanner * bufio . Scanner , binlogEntry * BinlogEntry ) ( nextState BinlogEntryState , err error ) {
2016-03-23 15:25:45 +01:00
line := scanner . Text ( )
2016-03-24 14:25:52 +01:00
if submatch := tokenRegxp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return SearchForStartPosOrStatementState , nil
2016-03-23 15:25:45 +01:00
}
2016-03-24 14:25:52 +01:00
return InvalidState , fmt . Errorf ( "Expected to find token following pos %+v" , binlogEntry . LogPos )
2016-03-23 15:25:45 +01:00
}
2016-03-24 15:11:56 +01:00
// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
// It issues an automaton / state machine to do its thang.
2016-03-24 14:25:52 +01:00
func parseEntries ( scanner * bufio . Scanner ) ( entries [ ] ( * BinlogEntry ) , err error ) {
2016-03-24 22:27:39 +01:00
binlogEntry := NewBinlogEntry ( )
2016-03-24 14:25:52 +01:00
var state BinlogEntryState = SearchForStartPosOrStatementState
2016-03-23 15:25:45 +01:00
var endLogPos uint64
appendBinlogEntry := func ( ) {
2016-03-24 14:52:49 +01:00
if binlogEntry . LogPos == 0 {
return
2016-03-23 12:40:17 +01:00
}
2016-03-24 14:52:49 +01:00
if binlogEntry . StatementType == "" {
return
}
entries = append ( entries , binlogEntry )
log . Debugf ( "entry: %+v" , * binlogEntry )
fmt . Println ( fmt . Sprintf ( "%s `%s`.`%s`" , binlogEntry . StatementType , binlogEntry . DatabaseName , binlogEntry . TableName ) )
2016-03-23 15:25:45 +01:00
}
for scanner . Scan ( ) {
switch state {
2016-03-24 14:25:52 +01:00
case SearchForStartPosOrStatementState :
2016-03-23 15:25:45 +01:00
{
var nextBinlogEntry * BinlogEntry
2016-03-24 14:25:52 +01:00
state , nextBinlogEntry , err = searchForStartPosOrStatement ( scanner , binlogEntry , endLogPos )
2016-03-23 15:25:45 +01:00
if nextBinlogEntry != binlogEntry {
appendBinlogEntry ( )
binlogEntry = nextBinlogEntry
}
2016-03-23 12:40:17 +01:00
}
2016-03-23 15:25:45 +01:00
case ExpectEndLogPosState :
{
state , err = expectEndLogPos ( scanner , binlogEntry )
2016-03-23 12:40:17 +01:00
}
2016-03-24 14:25:52 +01:00
case ExpectTokenState :
2016-03-23 15:25:45 +01:00
{
2016-03-24 14:25:52 +01:00
state , err = expectToken ( scanner , binlogEntry )
2016-03-23 15:25:45 +01:00
}
default :
{
err = fmt . Errorf ( "Unexpected state %+v" , state )
2016-03-23 12:40:17 +01:00
}
}
2016-03-23 15:25:45 +01:00
if err != nil {
return entries , log . Errore ( err )
}
2016-03-23 12:40:17 +01:00
}
2016-03-24 14:52:49 +01:00
appendBinlogEntry ( )
2016-03-22 15:12:51 +01:00
return entries , err
}