2016-03-22 14:12:51 +00:00
/ *
2016-03-24 14:11:56 +00:00
Copyright 2016 GitHub Inc .
2016-05-16 09:09:17 +00:00
See https : //github.com/github/gh-ost/blob/master/LICENSE
2016-03-22 14:12:51 +00:00
* /
package binlog
2016-03-23 11:40:17 +00:00
import (
"bufio"
"bytes"
"fmt"
"path"
"regexp"
"strconv"
2016-04-06 16:44:54 +00:00
// "strings"
2016-03-23 11:40:17 +00:00
2016-05-16 09:09:17 +00:00
"github.com/github/gh-ost/go/os"
2016-03-23 11:40:17 +00:00
"github.com/outbrain/golib/log"
)
var (
binlogChunkSizeBytes uint64 = 32 * 1024 * 1024
startEntryRegexp = regexp . MustCompile ( "^# at ([0-9]+)$" )
startEntryUnknownTableRegexp = regexp . MustCompile ( "^### Row event for unknown table .*? at ([0-9]+)$" )
endLogPosRegexp = regexp . MustCompile ( "^#[0-9]{6} .*? end_log_pos ([0-9]+)" )
statementRegxp = regexp . MustCompile ( "### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`" )
2016-03-24 13:25:52 +00:00
tokenRegxp = regexp . MustCompile ( "### (WHERE|SET)$" )
2016-03-24 21:27:39 +00:00
positionalColumnRegexp = regexp . MustCompile ( "### @([0-9]+)=(.+)$" )
2016-03-23 11:40:17 +00:00
)
2016-03-24 14:11:56 +00:00
// BinlogEntryState is a state in the binlog parser automaton / state machine
2016-03-23 14:25:45 +00:00
type BinlogEntryState string
2016-03-24 14:11:56 +00:00
// States of the state machine
2016-03-23 14:25:45 +00:00
const (
2016-03-24 13:25:52 +00:00
InvalidState BinlogEntryState = "InvalidState"
SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
ExpectEndLogPosState = "ExpectEndLogPosState"
ExpectTokenState = "ExpectTokenState"
PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
2016-03-23 14:25:45 +00:00
)
2016-03-23 11:40:17 +00:00
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
// process and textually parsing its output
2016-03-22 14:12:51 +00:00
type MySQLBinlogReader struct {
2016-03-23 11:40:17 +00:00
Basedir string
Datadir string
MySQLBinlogBinary string
}
2016-03-24 14:11:56 +00:00
// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem
2016-03-23 11:40:17 +00:00
func NewMySQLBinlogReader ( basedir string , datadir string ) ( mySQLBinlogReader * MySQLBinlogReader ) {
mySQLBinlogReader = & MySQLBinlogReader {
Basedir : basedir ,
Datadir : datadir ,
}
mySQLBinlogReader . MySQLBinlogBinary = path . Join ( mySQLBinlogReader . Basedir , "bin/mysqlbinlog" )
return mySQLBinlogReader
}
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
func ( this * MySQLBinlogReader ) ReadEntries ( logFile string , startPos uint64 , stopPos uint64 ) ( entries [ ] ( * BinlogEntry ) , err error ) {
if startPos == 0 {
startPos = 4
}
done := false
chunkStartPos := startPos
for ! done {
chunkStopPos := chunkStartPos + binlogChunkSizeBytes
if chunkStopPos > stopPos && stopPos != 0 {
chunkStopPos = stopPos
}
log . Debugf ( "Next chunk range %d - %d" , chunkStartPos , chunkStopPos )
binlogFilePath := path . Join ( this . Datadir , logFile )
command := fmt . Sprintf ( ` %s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s ` , this . MySQLBinlogBinary , chunkStartPos , chunkStopPos , binlogFilePath )
entriesBytes , err := os . RunCommandWithOutput ( command )
if err != nil {
return entries , log . Errore ( err )
}
2016-03-24 13:25:52 +00:00
2016-04-06 16:44:54 +00:00
chunkEntries , err := parseEntries ( bufio . NewScanner ( bytes . NewReader ( entriesBytes ) ) , logFile )
2016-03-23 11:40:17 +00:00
if err != nil {
return entries , log . Errore ( err )
}
if len ( chunkEntries ) == 0 {
done = true
} else {
entries = append ( entries , chunkEntries ... )
lastChunkEntry := chunkEntries [ len ( chunkEntries ) - 1 ]
chunkStartPos = lastChunkEntry . EndLogPos
}
}
return entries , err
2016-03-22 14:12:51 +00:00
}
2016-03-24 14:11:56 +00:00
// automaton step: accept wither beginning of new entry, or beginning of new statement
2016-03-24 13:25:52 +00:00
func searchForStartPosOrStatement ( scanner * bufio . Scanner , binlogEntry * BinlogEntry , previousEndLogPos uint64 ) ( nextState BinlogEntryState , nextBinlogEntry * BinlogEntry , err error ) {
2016-03-23 14:25:45 +00:00
onStartEntry := func ( submatch [ ] string ) ( BinlogEntryState , * BinlogEntry , error ) {
startLogPos , _ := strconv . ParseUint ( submatch [ 1 ] , 10 , 64 )
2016-03-23 11:40:17 +00:00
2016-03-23 14:25:45 +00:00
if previousEndLogPos != 0 && startLogPos != previousEndLogPos {
return InvalidState , binlogEntry , fmt . Errorf ( "Expected startLogPos %+v to equal previous endLogPos %+v" , startLogPos , previousEndLogPos )
}
nextBinlogEntry = binlogEntry
2016-04-07 13:57:12 +00:00
if binlogEntry . Coordinates . LogPos != 0 && binlogEntry . DmlEvent != nil {
2016-03-23 14:25:45 +00:00
// Current entry is already a true entry, with startpos and with statement
2016-04-06 16:44:54 +00:00
nextBinlogEntry = NewBinlogEntry ( binlogEntry . Coordinates . LogFile , startLogPos )
2016-03-23 14:25:45 +00:00
}
return ExpectEndLogPosState , nextBinlogEntry , nil
}
2016-03-23 11:40:17 +00:00
2016-03-24 13:25:52 +00:00
onStatementEntry := func ( submatch [ ] string ) ( BinlogEntryState , * BinlogEntry , error ) {
nextBinlogEntry = binlogEntry
2016-04-07 13:57:12 +00:00
if binlogEntry . Coordinates . LogPos != 0 && binlogEntry . DmlEvent != nil {
2016-03-24 13:25:52 +00:00
// Current entry is already a true entry, with startpos and with statement
2016-03-24 21:27:39 +00:00
nextBinlogEntry = binlogEntry . Duplicate ( )
2016-03-24 13:25:52 +00:00
}
2016-04-07 13:57:12 +00:00
nextBinlogEntry . DmlEvent = NewBinlogDMLEvent ( submatch [ 2 ] , submatch [ 3 ] , ToEventDML ( submatch [ 1 ] ) )
2016-03-24 13:25:52 +00:00
return ExpectTokenState , nextBinlogEntry , nil
}
2016-04-06 16:44:54 +00:00
// Defuncting the following:
// onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
// columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
// if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
2016-04-07 13:57:12 +00:00
// return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.DmlEvent.DML)
2016-04-06 16:44:54 +00:00
// }
// columnValue := submatch[2]
// columnValue = strings.TrimPrefix(columnValue, "'")
// columnValue = strings.TrimSuffix(columnValue, "'")
// binlogEntry.PositionalColumns[columnIndex] = columnValue
//
// return SearchForStartPosOrStatementState, binlogEntry, nil
// }
2016-03-24 21:27:39 +00:00
2016-03-23 14:25:45 +00:00
line := scanner . Text ( )
if submatch := startEntryRegexp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onStartEntry ( submatch )
}
if submatch := startEntryUnknownTableRegexp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onStartEntry ( submatch )
}
2016-03-24 13:25:52 +00:00
if submatch := statementRegxp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return onStatementEntry ( submatch )
}
2016-03-24 21:27:39 +00:00
if submatch := positionalColumnRegexp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
2016-04-06 16:44:54 +00:00
// Defuncting return onPositionalColumn(submatch)
2016-03-24 21:27:39 +00:00
}
2016-03-24 13:25:52 +00:00
// Haven't found a match
return SearchForStartPosOrStatementState , binlogEntry , nil
2016-03-23 14:25:45 +00:00
}
2016-03-23 11:40:17 +00:00
2016-03-24 14:11:56 +00:00
// automaton step: expect an end_log_pos line`
2016-03-23 14:25:45 +00:00
func expectEndLogPos ( scanner * bufio . Scanner , binlogEntry * BinlogEntry ) ( nextState BinlogEntryState , err error ) {
line := scanner . Text ( )
submatch := endLogPosRegexp . FindStringSubmatch ( line )
2016-03-24 13:25:52 +00:00
if len ( submatch ) > 1 {
binlogEntry . EndLogPos , _ = strconv . ParseUint ( submatch [ 1 ] , 10 , 64 )
return SearchForStartPosOrStatementState , nil
2016-03-23 14:25:45 +00:00
}
2016-04-06 16:44:54 +00:00
return InvalidState , fmt . Errorf ( "Expected to find end_log_pos following pos %+v" , binlogEntry . Coordinates . LogPos )
2016-03-23 14:25:45 +00:00
}
2016-03-23 11:40:17 +00:00
2016-03-24 14:11:56 +00:00
// automaton step: a not-strictly-required but good-to-have-around validation that
// we see an expected token following a statement
2016-03-24 13:25:52 +00:00
func expectToken ( scanner * bufio . Scanner , binlogEntry * BinlogEntry ) ( nextState BinlogEntryState , err error ) {
2016-03-23 14:25:45 +00:00
line := scanner . Text ( )
2016-03-24 13:25:52 +00:00
if submatch := tokenRegxp . FindStringSubmatch ( line ) ; len ( submatch ) > 1 {
return SearchForStartPosOrStatementState , nil
2016-03-23 14:25:45 +00:00
}
2016-04-06 16:44:54 +00:00
return InvalidState , fmt . Errorf ( "Expected to find token following pos %+v" , binlogEntry . Coordinates . LogPos )
2016-03-23 14:25:45 +00:00
}
2016-03-24 14:11:56 +00:00
// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
// It issues an automaton / state machine to do its thang.
2016-04-06 16:44:54 +00:00
func parseEntries ( scanner * bufio . Scanner , logFile string ) ( entries [ ] ( * BinlogEntry ) , err error ) {
binlogEntry := NewBinlogEntry ( logFile , 0 )
2016-03-24 13:25:52 +00:00
var state BinlogEntryState = SearchForStartPosOrStatementState
2016-03-23 14:25:45 +00:00
var endLogPos uint64
appendBinlogEntry := func ( ) {
2016-04-06 16:44:54 +00:00
if binlogEntry . Coordinates . LogPos == 0 {
2016-03-24 13:52:49 +00:00
return
2016-03-23 11:40:17 +00:00
}
2016-04-07 13:57:12 +00:00
if binlogEntry . DmlEvent == nil {
2016-03-24 13:52:49 +00:00
return
}
entries = append ( entries , binlogEntry )
log . Debugf ( "entry: %+v" , * binlogEntry )
2016-04-07 13:57:12 +00:00
fmt . Println ( fmt . Sprintf ( "%s `%s`.`%s`" , binlogEntry . DmlEvent . DML , binlogEntry . DmlEvent . DatabaseName , binlogEntry . DmlEvent . TableName ) )
2016-03-23 14:25:45 +00:00
}
for scanner . Scan ( ) {
switch state {
2016-03-24 13:25:52 +00:00
case SearchForStartPosOrStatementState :
2016-03-23 14:25:45 +00:00
{
var nextBinlogEntry * BinlogEntry
2016-03-24 13:25:52 +00:00
state , nextBinlogEntry , err = searchForStartPosOrStatement ( scanner , binlogEntry , endLogPos )
2016-03-23 14:25:45 +00:00
if nextBinlogEntry != binlogEntry {
appendBinlogEntry ( )
binlogEntry = nextBinlogEntry
}
2016-03-23 11:40:17 +00:00
}
2016-03-23 14:25:45 +00:00
case ExpectEndLogPosState :
{
state , err = expectEndLogPos ( scanner , binlogEntry )
2016-03-23 11:40:17 +00:00
}
2016-03-24 13:25:52 +00:00
case ExpectTokenState :
2016-03-23 14:25:45 +00:00
{
2016-03-24 13:25:52 +00:00
state , err = expectToken ( scanner , binlogEntry )
2016-03-23 14:25:45 +00:00
}
default :
{
err = fmt . Errorf ( "Unexpected state %+v" , state )
2016-03-23 11:40:17 +00:00
}
}
2016-03-23 14:25:45 +00:00
if err != nil {
return entries , log . Errore ( err )
}
2016-03-23 11:40:17 +00:00
}
2016-03-24 13:52:49 +00:00
appendBinlogEntry ( )
2016-03-22 14:12:51 +00:00
return entries , err
}