diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..fc48ca9 --- /dev/null +++ b/build.sh @@ -0,0 +1,18 @@ +#!/bin/bash + +buildpath=/tmp/gh-osc +target=gh-osc +timestamp=$(date "+%Y%m%d%H%M%S") +mkdir -p ${buildpath} +gobuild="go build -o $buildpath/$target go/cmd/gh-osc/main.go" + +echo "Building OS/X binary" +echo "GO15VENDOREXPERIMENT=1 GOOS=darwin GOARCH=amd64 $gobuild" | bash +(cd $buildpath && tar cfz ./gh-osc-binary-osx-${timestamp}.tar.gz $target) + +echo "Building linux binary" +echo "GO15VENDOREXPERIMENT=1 GOOS=linux GOARCH=amd64 $gobuild" | bash +(cd $buildpath && tar cfz ./gh-osc-binary-linux-${timestamp}.tar.gz $target) + +echo "Binaries found in:" +ls -1 $buildpath/gh-osc-binary*${timestamp}.tar.gz diff --git a/go/binlog/binlog.go b/go/binlog/binlog.go new file mode 100644 index 0000000..6925fa1 --- /dev/null +++ b/go/binlog/binlog.go @@ -0,0 +1,145 @@ +/* + Copyright 2015 Shlomi Noach, courtesy Booking.com +*/ + +package binlog + +import ( + "errors" + "fmt" + "strconv" + "strings" +) + +type BinlogType int + +const ( + BinaryLog BinlogType = iota + RelayLog +) + +// BinlogCoordinates described binary log coordinates in the form of log file & log position. +type BinlogCoordinates struct { + LogFile string + LogPos int64 + Type BinlogType +} + +// ParseBinlogCoordinates will parse an InstanceKey from a string representation such as 127.0.0.1:3306 +func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { + tokens := strings.SplitN(logFileLogPos, ":", 2) + if len(tokens) != 2 { + return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) + } + + if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil { + return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) + } else { + return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil + } +} + +// DisplayString returns a user-friendly string representation of these coordinates +func (this *BinlogCoordinates) DisplayString() string { + return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos) +} + +// String returns a user-friendly string representation of these coordinates +func (this BinlogCoordinates) String() string { + return this.DisplayString() +} + +// Equals tests equality of this corrdinate and another one. +func (this *BinlogCoordinates) Equals(other *BinlogCoordinates) bool { + if other == nil { + return false + } + return this.LogFile == other.LogFile && this.LogPos == other.LogPos && this.Type == other.Type +} + +// IsEmpty returns true if the log file is empty, unnamed +func (this *BinlogCoordinates) IsEmpty() bool { + return this.LogFile == "" +} + +// SmallerThan returns true if this coordinate is strictly smaller than the other. +func (this *BinlogCoordinates) SmallerThan(other *BinlogCoordinates) bool { + if this.LogFile < other.LogFile { + return true + } + if this.LogFile == other.LogFile && this.LogPos < other.LogPos { + return true + } + return false +} + +// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. +// We do NOT compare the type so we can not use this.Equals() +func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) bool { + if this.SmallerThan(other) { + return true + } + return this.LogFile == other.LogFile && this.LogPos == other.LogPos // No Type comparison +} + +// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's. +func (this *BinlogCoordinates) FileSmallerThan(other *BinlogCoordinates) bool { + return this.LogFile < other.LogFile +} + +// FileNumberDistance returns the numeric distance between this corrdinate's file number and the other's. +// Effectively it means "how many roatets/FLUSHes would make these coordinates's file reach the other's" +func (this *BinlogCoordinates) FileNumberDistance(other *BinlogCoordinates) int { + thisNumber, _ := this.FileNumber() + otherNumber, _ := other.FileNumber() + return otherNumber - thisNumber +} + +// FileNumber returns the numeric value of the file, and the length in characters representing the number in the filename. +// Example: FileNumber() of mysqld.log.000789 is (789, 6) +func (this *BinlogCoordinates) FileNumber() (int, int) { + tokens := strings.Split(this.LogFile, ".") + numPart := tokens[len(tokens)-1] + numLen := len(numPart) + fileNum, err := strconv.Atoi(numPart) + if err != nil { + return 0, 0 + } + return fileNum, numLen +} + +// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back) +func (this *BinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) { + result := BinlogCoordinates{LogPos: 0, Type: this.Type} + + fileNum, numLen := this.FileNumber() + if fileNum == 0 { + return result, errors.New("Log file number is zero, cannot detect previous file") + } + newNumStr := fmt.Sprintf("%d", (fileNum - offset)) + newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr + + tokens := strings.Split(this.LogFile, ".") + tokens[len(tokens)-1] = newNumStr + result.LogFile = strings.Join(tokens, ".") + return result, nil +} + +// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog +func (this *BinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates, error) { + return this.PreviousFileCoordinatesBy(1) +} + +// NextFileCoordinates guesses the filename of the next binlog/relaylog +func (this *BinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) { + result := BinlogCoordinates{LogPos: 0, Type: this.Type} + + fileNum, numLen := this.FileNumber() + newNumStr := fmt.Sprintf("%d", (fileNum + 1)) + newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr + + tokens := strings.Split(this.LogFile, ".") + tokens[len(tokens)-1] = newNumStr + result.LogFile = strings.Join(tokens, ".") + return result, nil +} diff --git a/go/binlog/binlog_reader.go b/go/binlog/binlog_reader.go new file mode 100644 index 0000000..a837249 --- /dev/null +++ b/go/binlog/binlog_reader.go @@ -0,0 +1,17 @@ +/* + Copyright 2016 GitHub Inc. +*/ + +package binlog + +type BinlogEntry struct { + LogPos uint64 + EndLogPos uint64 + StatementType string // INSERT, UPDATE, DELETE + DatabaseName string + TableName string +} + +type BinlogReader interface { + ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) +} diff --git a/go/binlog/binlog_test.go b/go/binlog/binlog_test.go new file mode 100644 index 0000000..1314d1c --- /dev/null +++ b/go/binlog/binlog_test.go @@ -0,0 +1,132 @@ +/* + Copyright 2014 Outbrain Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package binlog + +import ( + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" + "testing" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestBinlogCoordinates(t *testing.T) { + c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c2 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c3 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 5000} + c4 := BinlogCoordinates{LogFile: "mysql-bin.00112", LogPos: 104} + + test.S(t).ExpectTrue(c1.Equals(&c2)) + test.S(t).ExpectFalse(c1.Equals(&c3)) + test.S(t).ExpectFalse(c1.Equals(&c4)) + test.S(t).ExpectFalse(c1.SmallerThan(&c2)) + test.S(t).ExpectTrue(c1.SmallerThan(&c3)) + test.S(t).ExpectTrue(c1.SmallerThan(&c4)) + test.S(t).ExpectTrue(c3.SmallerThan(&c4)) + test.S(t).ExpectFalse(c3.SmallerThan(&c2)) + test.S(t).ExpectFalse(c4.SmallerThan(&c2)) + test.S(t).ExpectFalse(c4.SmallerThan(&c3)) + + test.S(t).ExpectTrue(c1.SmallerThanOrEquals(&c2)) + test.S(t).ExpectTrue(c1.SmallerThanOrEquals(&c3)) +} + +func TestBinlogNext(t *testing.T) { + c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + cres, err := c1.NextFileCoordinates() + + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(c1.Type, cres.Type) + test.S(t).ExpectEquals(cres.LogFile, "mysql-bin.00018") + + c2 := BinlogCoordinates{LogFile: "mysql-bin.00099", LogPos: 104} + cres, err = c2.NextFileCoordinates() + + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(c1.Type, cres.Type) + test.S(t).ExpectEquals(cres.LogFile, "mysql-bin.00100") + + c3 := BinlogCoordinates{LogFile: "mysql.00.prod.com.00099", LogPos: 104} + cres, err = c3.NextFileCoordinates() + + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(c1.Type, cres.Type) + test.S(t).ExpectEquals(cres.LogFile, "mysql.00.prod.com.00100") +} + +func TestBinlogPrevious(t *testing.T) { + c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + cres, err := c1.PreviousFileCoordinates() + + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(c1.Type, cres.Type) + test.S(t).ExpectEquals(cres.LogFile, "mysql-bin.00016") + + c2 := BinlogCoordinates{LogFile: "mysql-bin.00100", LogPos: 104} + cres, err = c2.PreviousFileCoordinates() + + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(c1.Type, cres.Type) + test.S(t).ExpectEquals(cres.LogFile, "mysql-bin.00099") + + c3 := BinlogCoordinates{LogFile: "mysql.00.prod.com.00100", LogPos: 104} + cres, err = c3.PreviousFileCoordinates() + + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(c1.Type, cres.Type) + test.S(t).ExpectEquals(cres.LogFile, "mysql.00.prod.com.00099") + + c4 := BinlogCoordinates{LogFile: "mysql.00.prod.com.00000", LogPos: 104} + _, err = c4.PreviousFileCoordinates() + + test.S(t).ExpectNotNil(err) +} + +func TestBinlogCoordinatesAsKey(t *testing.T) { + m := make(map[BinlogCoordinates]bool) + + c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c2 := BinlogCoordinates{LogFile: "mysql-bin.00022", LogPos: 104} + c3 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c4 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 222} + + m[c1] = true + m[c2] = true + m[c3] = true + m[c4] = true + + test.S(t).ExpectEquals(len(m), 3) +} + +func TestBinlogFileNumber(t *testing.T) { + c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + c2 := BinlogCoordinates{LogFile: "mysql-bin.00022", LogPos: 104} + + test.S(t).ExpectEquals(c1.FileNumberDistance(&c1), 0) + test.S(t).ExpectEquals(c1.FileNumberDistance(&c2), 5) + test.S(t).ExpectEquals(c2.FileNumberDistance(&c1), -5) +} + +func TestBinlogFileNumberDistance(t *testing.T) { + c1 := BinlogCoordinates{LogFile: "mysql-bin.00017", LogPos: 104} + fileNum, numLen := c1.FileNumber() + + test.S(t).ExpectEquals(fileNum, 17) + test.S(t).ExpectEquals(numLen, 5) +} diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go new file mode 100644 index 0000000..aa5356a --- /dev/null +++ b/go/binlog/mysqlbinlog_reader.go @@ -0,0 +1,140 @@ +/* + Copyright 2016 GitHub Inc. +*/ + +package binlog + +import ( + "bufio" + "bytes" + "fmt" + "path" + "regexp" + "strconv" + "strings" + + "github.com/github/gh-osc/go/os" + "github.com/outbrain/golib/log" +) + +var ( + binlogChunkSizeBytes uint64 = 32 * 1024 * 1024 + startEntryRegexp = regexp.MustCompile("^# at ([0-9]+)$") + startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$") + endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)") + statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") +) + +// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` +// process and textually parsing its output +type MySQLBinlogReader struct { + Basedir string + Datadir string + MySQLBinlogBinary string +} + +func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) { + mySQLBinlogReader = &MySQLBinlogReader{ + Basedir: basedir, + Datadir: datadir, + } + mySQLBinlogReader.MySQLBinlogBinary = path.Join(mySQLBinlogReader.Basedir, "bin/mysqlbinlog") + return mySQLBinlogReader +} + +// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility +func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { + if startPos == 0 { + startPos = 4 + } + done := false + chunkStartPos := startPos + for !done { + chunkStopPos := chunkStartPos + binlogChunkSizeBytes + if chunkStopPos > stopPos && stopPos != 0 { + chunkStopPos = stopPos + } + log.Debugf("Next chunk range %d - %d", chunkStartPos, chunkStopPos) + binlogFilePath := path.Join(this.Datadir, logFile) + command := fmt.Sprintf(`%s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s`, this.MySQLBinlogBinary, chunkStartPos, chunkStopPos, binlogFilePath) + entriesBytes, err := os.RunCommandWithOutput(command) + if err != nil { + return entries, log.Errore(err) + } + chunkEntries, err := parseEntries(entriesBytes) + if err != nil { + return entries, log.Errore(err) + } + + if len(chunkEntries) == 0 { + done = true + } else { + entries = append(entries, chunkEntries...) + lastChunkEntry := chunkEntries[len(chunkEntries)-1] + chunkStartPos = lastChunkEntry.EndLogPos + } + } + return entries, err +} + +func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) { + scanner := bufio.NewScanner(bytes.NewReader(entriesBytes)) + expectEndLogPos := false + var startLogPos uint64 + var endLogPos uint64 + + binlogEntry := &BinlogEntry{} + + for scanner.Scan() { + line := scanner.Text() + + onStartEntry := func(submatch []string) error { + startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + + if endLogPos != 0 && startLogPos != endLogPos { + return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos) + } + // We are entering a new entry, let's push the previous one + if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { + entries = append(entries, binlogEntry) + log.Debugf("entry: %+v", *binlogEntry) + binlogEntry = &BinlogEntry{} + } + + //log.Debugf(line) + binlogEntry.LogPos = startLogPos + // Next iteration we will read the end_log_pos + expectEndLogPos = true + + return nil + } + if expectEndLogPos { + submatch := endLogPosRegexp.FindStringSubmatch(line) + if len(submatch) <= 1 { + return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos) + } + endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) + + binlogEntry.EndLogPos = endLogPos + expectEndLogPos = false + } else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { + if err := onStartEntry(submatch); err != nil { + return entries, log.Errore(err) + } + } else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { + if err := onStartEntry(submatch); err != nil { + return entries, log.Errore(err) + } + } else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { + binlogEntry.StatementType = strings.Split(submatch[1], " ")[0] + binlogEntry.DatabaseName = submatch[2] + binlogEntry.TableName = submatch[3] + } + + } + if binlogEntry.LogPos != 0 { + entries = append(entries, binlogEntry) + log.Debugf("entry: %+v", *binlogEntry) + } + return entries, err +} diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go new file mode 100644 index 0000000..c5e9d05 --- /dev/null +++ b/go/cmd/gh-osc/main.go @@ -0,0 +1,52 @@ +package main + +import ( + "flag" + "fmt" + "os" + + "github.com/github/gh-osc/go/binlog" + "github.com/outbrain/golib/log" +) + +// main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. +func main() { + mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") + mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)") + internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment") + binlogFile := flag.String("binlog-file", "", "Name of binary log file") + quiet := flag.Bool("quiet", false, "quiet") + verbose := flag.Bool("verbose", false, "verbose") + debug := flag.Bool("debug", false, "debug mode (very verbose)") + stack := flag.Bool("stack", false, "add stack trace upon error") + help := flag.Bool("help", false, "Display usage") + flag.Parse() + + if *help { + fmt.Fprintf(os.Stderr, "Usage of gh-osc:\n") + flag.PrintDefaults() + return + } + + log.SetLevel(log.ERROR) + if *verbose { + log.SetLevel(log.INFO) + } + if *debug { + log.SetLevel(log.DEBUG) + } + if *stack { + log.SetPrintStackTrace(*stack) + } + if *quiet { + // Override!! + log.SetLevel(log.ERROR) + } + log.Info("starting gh-osc") + + if *internalExperiment { + log.Debug("starting experiment") + binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) + binlogReader.ReadEntries(*binlogFile, 0, 0) + } +} diff --git a/go/os/process.go b/go/os/process.go new file mode 100644 index 0000000..ee19b66 --- /dev/null +++ b/go/os/process.go @@ -0,0 +1,67 @@ +/* + Copyright 2014 Outbrain Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package os + +import ( + "github.com/outbrain/golib/log" + "io/ioutil" + "os" + "os/exec" +) + +func execCmd(commandText string, arguments ...string) (*exec.Cmd, string, error) { + commandBytes := []byte(commandText) + tmpFile, err := ioutil.TempFile("", "gh-osc-process-cmd-") + if err != nil { + return nil, "", log.Errore(err) + } + ioutil.WriteFile(tmpFile.Name(), commandBytes, 0644) + log.Debugf("execCmd: %s", commandText) + shellArguments := append([]string{}, tmpFile.Name()) + shellArguments = append(shellArguments, arguments...) + log.Debugf("%+v", shellArguments) + return exec.Command("bash", shellArguments...), tmpFile.Name(), nil + + //return exec.Command(commandText, arguments...) , "", nil +} + +// CommandRun executes a command +func CommandRun(commandText string, arguments ...string) error { + cmd, tmpFileName, err := execCmd(commandText, arguments...) + defer os.Remove(tmpFileName) + if err != nil { + return log.Errore(err) + } + err = cmd.Run() + return log.Errore(err) +} + +// RunCommandWithOutput executes a command and return output bytes +func RunCommandWithOutput(commandText string) ([]byte, error) { + cmd, tmpFileName, err := execCmd(commandText) + defer os.Remove(tmpFileName) + if err != nil { + return nil, log.Errore(err) + } + + outputBytes, err := cmd.Output() + if err != nil { + return nil, log.Errore(err) + } + + return outputBytes, nil +}