// Review notes for this patch: a unified diff touching four files under go/.
// The patch text below is unchanged; these notes only describe it.
//
// go/binlog/binlog_reader.go
//   - BinlogEntry gains the fields LogPos, EndLogPos, StatementType,
//     DatabaseName and TableName.
//   - BinlogReader.ReadEntries switches its positions from uint to uint64 and
//     renames endPos to stopPos.
//
// go/binlog/mysqlbinlog_reader.go
//   - NewMySQLBinlogReader stores basedir and datadir and sets
//     MySQLBinlogBinary to path.Join(basedir, "bin/mysqlbinlog").
//   - ReadEntries treats startPos 0 as position 4, then runs mysqlbinlog
//     (--verbose --base64-output=DECODE-ROWS) over successive ranges of at most
//     binlogChunkSizeBytes (32 * 1024 * 1024), capping the range at stopPos
//     when stopPos is non-zero. Each next range starts at the EndLogPos of the
//     last entry parsed, and the loop ends only when a range parses to zero
//     entries. On failure it returns the entries gathered so far plus the error.
//   - parseEntries scans the output line by line. A "# at N" line, or a
//     "### Row event for unknown table ... at N" line, opens an entry; the very
//     next line must match end_log_pos or an error is returned; a
//     "### INSERT INTO|UPDATE|DELETE FROM `db`.`table`" line sets
//     StatementType (first word only), DatabaseName and TableName. A start
//     position that differs from the previous non-zero end_log_pos is an error.
//   - NOTE(review): both strconv.ParseUint errors are discarded. The regexps
//     only admit digits, so the case being ignored is a value out of range for
//     uint64, which would leave the position at the value ParseUint returns.
//   - NOTE(review): scanner.Err() is never checked, so a scan that stops early
//     (bufio.Scanner gives up on an over-long line) looks like end of output.
//   - NOTE(review): inside the loop an entry is kept only when StatementType is
//     set, but the trailing entry is appended whenever LogPos != 0. Confirm the
//     asymmetry is intended (ReadEntries uses the last entry's EndLogPos to
//     advance).
//   - NOTE(review): ReadEntries has no guard that chunkStartPos actually
//     advances between iterations; termination relies on an empty chunk.
//   - NOTE(review): the command line is built with fmt.Sprintf and executed by
//     bash, so the binary path and binlog path are subject to shell
//     interpretation. Confirm the paths are trusted or quote them.
//   - The identifier statementRegxp is spelled without the second "e".
//
// go/cmd/gh-osc/main.go
//   - Adds the flags mysql-basedir, mysql-datadir, internal-experiment and
//     binlog-file. With internal-experiment set, main builds a
//     MySQLBinlogReader and calls ReadEntries(*binlogFile, 0, 0).
//   - NOTE(review): both return values of that call are discarded, so a
//     failure is only visible through whatever the callee logs.
//   - NOTE(review): the flag help says "auto-detected if not given"; no
//     auto-detection is visible in this patch.
//   - The existing comment on main contains the typo "itnerfaces".
//
// go/os/process.go (new file)
//   - execCmd writes commandText to a temp file named "gh-osc-process-cmd-*"
//     and returns exec.Command("bash", <temp file>, arguments...) together
//     with the temp file name.
//   - CommandRun runs that command; RunCommandWithOutput returns cmd.Output().
//     Both remove the temp file through a deferred os.Remove.
//   - NOTE(review): the ioutil.WriteFile result is ignored, and the *os.File
//     returned by ioutil.TempFile is never closed in execCmd.
//   - NOTE(review): defer os.Remove(tmpFileName) is registered before the err
//     check; when execCmd fails tmpFileName is "" and os.Remove("") is called.
//   - NOTE(review): CommandRun returns log.Errore(err) on the success path as
//     well; presumably Errore(nil) yields nil, verify against golib/log.
//
diff --git a/go/binlog/binlog_reader.go b/go/binlog/binlog_reader.go index ed5591b..a837249 100644 --- a/go/binlog/binlog_reader.go +++ b/go/binlog/binlog_reader.go @@ -5,8 +5,13 @@ package binlog type BinlogEntry struct { + LogPos uint64 + EndLogPos uint64 + StatementType string // INSERT, UPDATE, DELETE + DatabaseName string + TableName string } type BinlogReader interface { - ReadEntries(logFile string, startPos uint, endPos uint) (entries [](*BinlogEntry), err error) + ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) } diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go index 6b23bc5..aa5356a 100644 --- a/go/binlog/mysqlbinlog_reader.go +++ b/go/binlog/mysqlbinlog_reader.go @@ -4,9 +4,137 @@ package binlog +import ( + "bufio" + "bytes" + "fmt" + "path" + "regexp" + "strconv" + "strings" + + "github.com/github/gh-osc/go/os" + "github.com/outbrain/golib/log" +) + +var ( + binlogChunkSizeBytes uint64 = 32 * 1024 * 1024 + startEntryRegexp = regexp.MustCompile("^# at ([0-9]+)$") + startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$") + endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? 
end_log_pos ([0-9]+)") + statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") +) + +// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` +// process and textually parsing its output type MySQLBinlogReader struct { + Basedir string + Datadir string + MySQLBinlogBinary string } -func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint, endPos uint) (entries [](*BinlogEntry), err error) { +func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) { + mySQLBinlogReader = &MySQLBinlogReader{ + Basedir: basedir, + Datadir: datadir, + } + mySQLBinlogReader.MySQLBinlogBinary = path.Join(mySQLBinlogReader.Basedir, "bin/mysqlbinlog") + return mySQLBinlogReader +} + +// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility +func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { + if startPos == 0 { + startPos = 4 + } + done := false + chunkStartPos := startPos + for !done { + chunkStopPos := chunkStartPos + binlogChunkSizeBytes + if chunkStopPos > stopPos && stopPos != 0 { + chunkStopPos = stopPos + } + log.Debugf("Next chunk range %d - %d", chunkStartPos, chunkStopPos) + binlogFilePath := path.Join(this.Datadir, logFile) + command := fmt.Sprintf(`%s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s`, this.MySQLBinlogBinary, chunkStartPos, chunkStopPos, binlogFilePath) + entriesBytes, err := os.RunCommandWithOutput(command) + if err != nil { + return entries, log.Errore(err) + } + chunkEntries, err := parseEntries(entriesBytes) + if err != nil { + return entries, log.Errore(err) + } + + if len(chunkEntries) == 0 { + done = true + } else { + entries = append(entries, chunkEntries...) 
+ lastChunkEntry := chunkEntries[len(chunkEntries)-1] + chunkStartPos = lastChunkEntry.EndLogPos + } + } + return entries, err +} + +func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { + scanner := bufio.NewScanner(bytes.NewReader(entriesBytes)) + expectEndLogPos := false + var startLogPos uint64 + var endLogPos uint64 + + binlogEntry := &BinlogEntry{} + + for scanner.Scan() { + line := scanner.Text() + + onStartEntry := func(submatch []string) error { + startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + + if endLogPos != 0 && startLogPos != endLogPos { + return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos) + } + // We are entering a new entry, let's push the previous one + if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + entries = append(entries, binlogEntry) + log.Debugf("entry: %+v", *binlogEntry) + binlogEntry = &BinlogEntry{} + } + + //log.Debugf(line) + binlogEntry.LogPos = startLogPos + // Next iteration we will read the end_log_pos + expectEndLogPos = true + + return nil + } + if expectEndLogPos { + submatch := endLogPosRegexp.FindStringSubmatch(line) + if len(submatch) <= 1 { + return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos) + } + endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + + binlogEntry.EndLogPos = endLogPos + expectEndLogPos = false + } else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { + if err := onStartEntry(submatch); err != nil { + return entries, log.Errore(err) + } + } else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { + if err := onStartEntry(submatch); err != nil { + return entries, log.Errore(err) + } + } else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { + binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] + binlogEntry.DatabaseName = submatch[2] + binlogEntry.TableName = 
submatch[3] + } + + } + if binlogEntry.LogPos != 0 { + entries = append(entries, binlogEntry) + log.Debugf("entry: %+v", *binlogEntry) + } return entries, err } diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 18cdd26..c5e9d05 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -5,11 +5,16 @@ import ( "fmt" "os" + "github.com/github/gh-osc/go/binlog" "github.com/outbrain/golib/log" ) // main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. func main() { + mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") + mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)") + internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment") + binlogFile := flag.String("binlog-file", "", "Name of binary log file") quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") debug := flag.Bool("debug", false, "debug mode (very verbose)") @@ -38,4 +43,10 @@ func main() { log.SetLevel(log.ERROR) } log.Info("starting gh-osc") + + if *internalExperiment { + log.Debug("starting experiment") + binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) + binlogReader.ReadEntries(*binlogFile, 0, 0) + } } diff --git a/go/os/process.go b/go/os/process.go new file mode 100644 index 0000000..ee19b66 --- /dev/null +++ b/go/os/process.go @@ -0,0 +1,67 @@ +/* + Copyright 2014 Outbrain Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +*/ + +package os + +import ( + "github.com/outbrain/golib/log" + "io/ioutil" + "os" + "os/exec" +) + +func execCmd(commandText string, arguments ...string) (*exec.Cmd, string, error) { + commandBytes := []byte(commandText) + tmpFile, err := ioutil.TempFile("", "gh-osc-process-cmd-") + if err != nil { + return nil, "", log.Errore(err) + } + ioutil.WriteFile(tmpFile.Name(), commandBytes, 0644) + log.Debugf("execCmd: %s", commandText) + shellArguments := append([]string{}, tmpFile.Name()) + shellArguments = append(shellArguments, arguments...) + log.Debugf("%+v", shellArguments) + return exec.Command("bash", shellArguments...), tmpFile.Name(), nil + + //return exec.Command(commandText, arguments...) , "", nil +} + +// CommandRun executes a command +func CommandRun(commandText string, arguments ...string) error { + cmd, tmpFileName, err := execCmd(commandText, arguments...) + defer os.Remove(tmpFileName) + if err != nil { + return log.Errore(err) + } + err = cmd.Run() + return log.Errore(err) +} + +// RunCommandWithOutput executes a command and return output bytes +func RunCommandWithOutput(commandText string) ([]byte, error) { + cmd, tmpFileName, err := execCmd(commandText) + defer os.Remove(tmpFileName) + if err != nil { + return nil, log.Errore(err) + } + + outputBytes, err := cmd.Output() + if err != nil { + return nil, log.Errore(err) + } + + return outputBytes, nil +}