initial seems-to-be-working parser for binary logs, which reads log pos, end log pos, statement type, schema and table. Performs some validations along the way. Reads in chunks of up to 32M per chunk

This commit is contained in:
Shlomi Noach 2016-03-23 12:40:17 +01:00
parent 1576119576
commit e36bb51b69
4 changed files with 213 additions and 2 deletions

View File

@ -5,8 +5,13 @@
package binlog
type BinlogEntry struct {
LogPos uint64
EndLogPos uint64
StatementType string // INSERT, UPDATE, DELETE
DatabaseName string
TableName string
}
type BinlogReader interface {
ReadEntries(logFile string, startPos uint, endPos uint) (entries [](*BinlogEntry), err error)
ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error)
}

View File

@ -4,9 +4,137 @@
package binlog
import (
"bufio"
"bytes"
"fmt"
"path"
"regexp"
"strconv"
"strings"
"github.com/github/gh-osc/go/os"
"github.com/outbrain/golib/log"
)
var (
binlogChunkSizeBytes uint64 = 32 * 1024 * 1024
startEntryRegexp = regexp.MustCompile("^# at ([0-9]+)$")
startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$")
endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
)
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
// process and textually parsing its output
type MySQLBinlogReader struct {
Basedir string
Datadir string
MySQLBinlogBinary string
}
func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint, endPos uint) (entries [](*BinlogEntry), err error) {
func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) {
mySQLBinlogReader = &MySQLBinlogReader{
Basedir: basedir,
Datadir: datadir,
}
mySQLBinlogReader.MySQLBinlogBinary = path.Join(mySQLBinlogReader.Basedir, "bin/mysqlbinlog")
return mySQLBinlogReader
}
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) {
if startPos == 0 {
startPos = 4
}
done := false
chunkStartPos := startPos
for !done {
chunkStopPos := chunkStartPos + binlogChunkSizeBytes
if chunkStopPos > stopPos && stopPos != 0 {
chunkStopPos = stopPos
}
log.Debugf("Next chunk range %d - %d", chunkStartPos, chunkStopPos)
binlogFilePath := path.Join(this.Datadir, logFile)
command := fmt.Sprintf(`%s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s`, this.MySQLBinlogBinary, chunkStartPos, chunkStopPos, binlogFilePath)
entriesBytes, err := os.RunCommandWithOutput(command)
if err != nil {
return entries, log.Errore(err)
}
chunkEntries, err := parseEntries(entriesBytes)
if err != nil {
return entries, log.Errore(err)
}
if len(chunkEntries) == 0 {
done = true
} else {
entries = append(entries, chunkEntries...)
lastChunkEntry := chunkEntries[len(chunkEntries)-1]
chunkStartPos = lastChunkEntry.EndLogPos
}
}
return entries, err
}
func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) {
scanner := bufio.NewScanner(bytes.NewReader(entriesBytes))
expectEndLogPos := false
var startLogPos uint64
var endLogPos uint64
binlogEntry := &BinlogEntry{}
for scanner.Scan() {
line := scanner.Text()
onStartEntry := func(submatch []string) error {
startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
if endLogPos != 0 && startLogPos != endLogPos {
return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos)
}
// We are entering a new entry, let's push the previous one
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry)
binlogEntry = &BinlogEntry{}
}
//log.Debugf(line)
binlogEntry.LogPos = startLogPos
// Next iteration we will read the end_log_pos
expectEndLogPos = true
return nil
}
if expectEndLogPos {
submatch := endLogPosRegexp.FindStringSubmatch(line)
if len(submatch) <= 1 {
return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos)
}
endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
binlogEntry.EndLogPos = endLogPos
expectEndLogPos = false
} else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
if err := onStartEntry(submatch); err != nil {
return entries, log.Errore(err)
}
} else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
if err := onStartEntry(submatch); err != nil {
return entries, log.Errore(err)
}
} else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
binlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
binlogEntry.DatabaseName = submatch[2]
binlogEntry.TableName = submatch[3]
}
}
if binlogEntry.LogPos != 0 {
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry)
}
return entries, err
}

View File

@ -5,11 +5,16 @@ import (
"fmt"
"os"
"github.com/github/gh-osc/go/binlog"
"github.com/outbrain/golib/log"
)
// main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces.
func main() {
mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)")
internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment")
binlogFile := flag.String("binlog-file", "", "Name of binary log file")
quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose")
debug := flag.Bool("debug", false, "debug mode (very verbose)")
@ -38,4 +43,10 @@ func main() {
log.SetLevel(log.ERROR)
}
log.Info("starting gh-osc")
if *internalExperiment {
log.Debug("starting experiment")
binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
binlogReader.ReadEntries(*binlogFile, 0, 0)
}
}

67
go/os/process.go Normal file
View File

@ -0,0 +1,67 @@
/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package os
import (
"github.com/outbrain/golib/log"
"io/ioutil"
"os"
"os/exec"
)
func execCmd(commandText string, arguments ...string) (*exec.Cmd, string, error) {
commandBytes := []byte(commandText)
tmpFile, err := ioutil.TempFile("", "gh-osc-process-cmd-")
if err != nil {
return nil, "", log.Errore(err)
}
ioutil.WriteFile(tmpFile.Name(), commandBytes, 0644)
log.Debugf("execCmd: %s", commandText)
shellArguments := append([]string{}, tmpFile.Name())
shellArguments = append(shellArguments, arguments...)
log.Debugf("%+v", shellArguments)
return exec.Command("bash", shellArguments...), tmpFile.Name(), nil
//return exec.Command(commandText, arguments...) , "", nil
}
// CommandRun executes a command
func CommandRun(commandText string, arguments ...string) error {
cmd, tmpFileName, err := execCmd(commandText, arguments...)
defer os.Remove(tmpFileName)
if err != nil {
return log.Errore(err)
}
err = cmd.Run()
return log.Errore(err)
}
// RunCommandWithOutput executes a command and return output bytes
func RunCommandWithOutput(commandText string) ([]byte, error) {
cmd, tmpFileName, err := execCmd(commandText)
defer os.Remove(tmpFileName)
if err != nil {
return nil, log.Errore(err)
}
outputBytes, err := cmd.Output()
if err != nil {
return nil, log.Errore(err)
}
return outputBytes, nil
}