Merge pull request #4 from github/parsing-mysqlbinlog

further work on mysqlbinlog_reader
This commit is contained in:
Shlomi Noach 2016-03-30 10:25:59 +02:00
commit e3210a9fa2
10 changed files with 509 additions and 80 deletions

View File

@ -1,4 +1,6 @@
#!/bin/bash
#
#
buildpath=/tmp/gh-osc
target=gh-osc

View File

@ -1,5 +1,5 @@
/*
Copyright 2015 Shlomi Noach, courtesy Booking.com
Copyright 2015 Shlomi Noach
*/
package binlog
@ -11,8 +11,10 @@ import (
"strings"
)
// BinlogType identifies the type of the log: relay or binary log
type BinlogType int
// BinaryLog, RelayLog are binlog types
const (
BinaryLog BinlogType = iota
RelayLog

31
go/binlog/binlog_entry.go Normal file
View File

@ -0,0 +1,31 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
// BinlogEntry describes an entry in the binary log
type BinlogEntry struct {
LogPos uint64
EndLogPos uint64
StatementType string // INSERT, UPDATE, DELETE
DatabaseName string
TableName string
PositionalColumns map[uint64]interface{}
}
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
func NewBinlogEntry() *BinlogEntry {
binlogEntry := &BinlogEntry{}
binlogEntry.PositionalColumns = make(map[uint64]interface{})
return binlogEntry
}
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
func (this *BinlogEntry) Duplicate() *BinlogEntry {
binlogEntry := NewBinlogEntry()
binlogEntry.LogPos = this.LogPos
binlogEntry.EndLogPos = this.EndLogPos
return binlogEntry
}

View File

@ -1,17 +1,12 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
type BinlogEntry struct {
LogPos uint64
EndLogPos uint64
StatementType string // INSERT, UPDATE, DELETE
DatabaseName string
TableName string
}
// BinlogReader is a general interface whose implementations can choose their methods of reading
// a binary log file and parsing it into binlog entries
type BinlogReader interface {
ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error)
}

View File

@ -1,25 +1,15 @@
/*
Copyright 2014 Outbrain Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
import (
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
"testing"
)
func init() {

View File

@ -1,5 +1,6 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
@ -23,6 +24,20 @@ var (
startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$")
endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
tokenRegxp = regexp.MustCompile("### (WHERE|SET)$")
positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$")
)
// BinlogEntryState is a state in the binlog parser automaton / state machine
type BinlogEntryState string
// States of the state machine
const (
InvalidState BinlogEntryState = "InvalidState"
SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
ExpectEndLogPosState = "ExpectEndLogPosState"
ExpectTokenState = "ExpectTokenState"
PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
)
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
@ -33,6 +48,7 @@ type MySQLBinlogReader struct {
MySQLBinlogBinary string
}
// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem
func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) {
mySQLBinlogReader = &MySQLBinlogReader{
Basedir: basedir,
@ -61,7 +77,8 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop
if err != nil {
return entries, log.Errore(err)
}
chunkEntries, err := parseEntries(entriesBytes)
chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)))
if err != nil {
return entries, log.Errore(err)
}
@ -77,64 +94,136 @@ func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stop
return entries, err
}
func parseEntries(entriesBytes []byte) (entries [](*BinlogEntry), err error) {
scanner := bufio.NewScanner(bytes.NewReader(entriesBytes))
expectEndLogPos := false
var startLogPos uint64
var endLogPos uint64
// automaton step: accept wither beginning of new entry, or beginning of new statement
func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) {
onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64)
binlogEntry := &BinlogEntry{}
if previousEndLogPos != 0 && startLogPos != previousEndLogPos {
return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos)
}
nextBinlogEntry = binlogEntry
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = NewBinlogEntry()
}
for scanner.Scan() {
nextBinlogEntry.LogPos = startLogPos
return ExpectEndLogPosState, nextBinlogEntry, nil
}
onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
nextBinlogEntry = binlogEntry
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = binlogEntry.Duplicate()
}
nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
nextBinlogEntry.DatabaseName = submatch[2]
nextBinlogEntry.TableName = submatch[3]
return ExpectTokenState, nextBinlogEntry, nil
}
onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.StatementType)
}
columnValue := submatch[2]
columnValue = strings.TrimPrefix(columnValue, "'")
columnValue = strings.TrimSuffix(columnValue, "'")
binlogEntry.PositionalColumns[columnIndex] = columnValue
return SearchForStartPosOrStatementState, binlogEntry, nil
}
line := scanner.Text()
if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch)
}
if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch)
}
if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
return onStatementEntry(submatch)
}
if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onPositionalColumn(submatch)
}
// Haven't found a match
return SearchForStartPosOrStatementState, binlogEntry, nil
}
// automaton step: expect an end_log_pos line`
func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
line := scanner.Text()
onStartEntry := func(submatch []string) error {
startLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
if endLogPos != 0 && startLogPos != endLogPos {
return fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, endLogPos)
}
// We are entering a new entry, let's push the previous one
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry)
binlogEntry = &BinlogEntry{}
}
//log.Debugf(line)
binlogEntry.LogPos = startLogPos
// Next iteration we will read the end_log_pos
expectEndLogPos = true
return nil
}
if expectEndLogPos {
submatch := endLogPosRegexp.FindStringSubmatch(line)
if len(submatch) <= 1 {
return entries, log.Errorf("Expected to find end_log_pos following pos %+v", startLogPos)
if len(submatch) > 1 {
binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
return SearchForStartPosOrStatementState, nil
}
endLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
binlogEntry.EndLogPos = endLogPos
expectEndLogPos = false
} else if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
if err := onStartEntry(submatch); err != nil {
return entries, log.Errore(err)
}
} else if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
if err := onStartEntry(submatch); err != nil {
return entries, log.Errore(err)
}
} else if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
binlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
binlogEntry.DatabaseName = submatch[2]
binlogEntry.TableName = submatch[3]
return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.LogPos)
}
// automaton step: a not-strictly-required but good-to-have-around validation that
// we see an expected token following a statement
func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
line := scanner.Text()
if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 {
return SearchForStartPosOrStatementState, nil
}
return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.LogPos)
}
// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
// It issues an automaton / state machine to do its thang.
func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) {
binlogEntry := NewBinlogEntry()
var state BinlogEntryState = SearchForStartPosOrStatementState
var endLogPos uint64
appendBinlogEntry := func() {
if binlogEntry.LogPos == 0 {
return
}
if binlogEntry.StatementType == "" {
return
}
if binlogEntry.LogPos != 0 {
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry)
fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.StatementType, binlogEntry.DatabaseName, binlogEntry.TableName))
}
for scanner.Scan() {
switch state {
case SearchForStartPosOrStatementState:
{
var nextBinlogEntry *BinlogEntry
state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos)
if nextBinlogEntry != binlogEntry {
appendBinlogEntry()
binlogEntry = nextBinlogEntry
}
}
case ExpectEndLogPosState:
{
state, err = expectEndLogPos(scanner, binlogEntry)
}
case ExpectTokenState:
{
state, err = expectToken(scanner, binlogEntry)
}
default:
{
err = fmt.Errorf("Unexpected state %+v", state)
}
}
if err != nil {
return entries, log.Errore(err)
}
}
appendBinlogEntry()
return entries, err
}

View File

@ -0,0 +1,50 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
import (
"bufio"
"os"
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestRBRSample0(t *testing.T) {
testFile, err := os.Open("testdata/rbr-sample-0.txt")
test.S(t).ExpectNil(err)
defer testFile.Close()
scanner := bufio.NewScanner(testFile)
entries, err := parseEntries(scanner)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(len(entries), 17)
test.S(t).ExpectEquals(entries[0].DatabaseName, "test")
test.S(t).ExpectEquals(entries[0].TableName, "samplet")
test.S(t).ExpectEquals(entries[0].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[1].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[2].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[3].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[4].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[5].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[7].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[9].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[10].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[11].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[12].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[13].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[15].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[16].StatementType, "INSERT")
}

267
go/binlog/testdata/rbr-sample-0.txt vendored Normal file
View File

@ -0,0 +1,267 @@
/*
these are the statements that were used to execute the RBR log:
create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb;
insert into samplet values (1,1,'a');
insert into samplet values (2,2,'extended'),(3,3,'extended');
begin;
insert into samplet values (4,4,'transaction');
insert into samplet values (5,5,'transaction');
insert into samplet values (6,6,'transaction');
commit;
update samplet set name='update' where id=5;
replace into samplet values (2,4,'replaced 2,4');
insert into samplet values (7,7,'7');
insert into samplet values (8,8,'8');
delete from samplet where id >= 7;
insert into samplet values (9,9,'9');
begin;
update samplet set name='update 9' where id=9;
delete from samplet where license=3;
insert into samplet values (10,10,'10');
commit;
*/
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!40019 SET @@session.max_insert_delayed_threads=0*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#160324 11:06:59 server id 104 end_log_pos 120 CRC32 0x059174d8 Start: binlog v 4, server v 5.6.28-log created 160324 11:06:59
# at 120
#160324 11:09:53 server id 1 end_log_pos 313 CRC32 0x511efdf1 Query thread_id=7940 exec_time=0 error_code=0
use `test`/*!*/;
SET TIMESTAMP=1458814193/*!*/;
SET @@session.pseudo_thread_id=7940/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1073741824/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8 *//*!*/;
SET @@session.character_set_client=33,@@session.collation_connection=33,@@session.collation_server=8/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb
/*!*/;
# at 313
#160324 11:10:13 server id 1 end_log_pos 385 CRC32 0x6b95100a Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814213/*!*/;
BEGIN
/*!*/;
# at 385
#160324 11:10:13 server id 1 end_log_pos 439 CRC32 0xfa97ad69 Table_map: `test`.`samplet` mapped to number 72
# at 439
#160324 11:10:13 server id 1 end_log_pos 485 CRC32 0xae356826 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=1
### @2=1
### @3='a'
# at 485
#160324 11:10:13 server id 1 end_log_pos 516 CRC32 0xf60389e3 Xid = 49802
COMMIT/*!*/;
# at 516
#160324 11:10:35 server id 1 end_log_pos 588 CRC32 0x1a8730ad Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814235/*!*/;
BEGIN
/*!*/;
# at 588
#160324 11:10:35 server id 1 end_log_pos 642 CRC32 0xac564207 Table_map: `test`.`samplet` mapped to number 72
# at 642
#160324 11:10:35 server id 1 end_log_pos 713 CRC32 0x3020ee9e Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=2
### @2=2
### @3='extended'
### INSERT INTO `test`.`samplet`
### SET
### @1=3
### @2=3
### @3='extended'
# at 713
#160324 11:10:35 server id 1 end_log_pos 744 CRC32 0x341f0c1d Xid = 49848
COMMIT/*!*/;
# at 744
#160324 11:10:47 server id 1 end_log_pos 816 CRC32 0x2454c8aa Query thread_id=7940 exec_time=14 error_code=0
SET TIMESTAMP=1458814247/*!*/;
BEGIN
/*!*/;
# at 816
#160324 11:10:47 server id 1 end_log_pos 870 CRC32 0x92018566 Table_map: `test`.`samplet` mapped to number 72
# at 870
#160324 11:10:47 server id 1 end_log_pos 926 CRC32 0x5b882310 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=4
### @2=4
### @3='transaction'
# at 926
#160324 11:10:54 server id 1 end_log_pos 980 CRC32 0x374b624b Table_map: `test`.`samplet` mapped to number 72
# at 980
#160324 11:10:54 server id 1 end_log_pos 1036 CRC32 0xfff6a2b9 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=5
### @2=5
### @3='transaction'
# at 1036
#160324 11:10:59 server id 1 end_log_pos 1090 CRC32 0x37e19690 Table_map: `test`.`samplet` mapped to number 72
# at 1090
#160324 11:10:59 server id 1 end_log_pos 1146 CRC32 0x58a01053 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=6
### @2=6
### @3='transaction'
# at 1146
#160324 11:10:59 server id 1 end_log_pos 1177 CRC32 0xdd5de027 Xid = 49894
COMMIT/*!*/;
# at 1177
#160324 11:11:16 server id 1 end_log_pos 1249 CRC32 0x5c4a609b Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814276/*!*/;
BEGIN
/*!*/;
# at 1249
#160324 11:11:16 server id 1 end_log_pos 1303 CRC32 0x9d3c756b Table_map: `test`.`samplet` mapped to number 72
# at 1303
#160324 11:11:16 server id 1 end_log_pos 1352 CRC32 0x9b0d2ff4 Update_rows: table id 72 flags: STMT_END_F
### UPDATE `test`.`samplet`
### WHERE
### @1=5
### SET
### @3='update'
# at 1352
#160324 11:11:16 server id 1 end_log_pos 1383 CRC32 0x8e051bed Xid = 49931
COMMIT/*!*/;
# at 1383
#160324 11:11:44 server id 1 end_log_pos 1455 CRC32 0xe9744e83 Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814304/*!*/;
BEGIN
/*!*/;
# at 1455
#160324 11:11:44 server id 1 end_log_pos 1509 CRC32 0x34672cb1 Table_map: `test`.`samplet` mapped to number 72
# at 1509
#160324 11:11:44 server id 1 end_log_pos 1549 CRC32 0x4383e9ee Delete_rows: table id 72
# at 1549
#160324 11:11:44 server id 1 end_log_pos 1612 CRC32 0x899eb398 Update_rows: table id 72 flags: STMT_END_F
### DELETE FROM `test`.`samplet`
### WHERE
### @1=2
### UPDATE `test`.`samplet`
### WHERE
### @1=4
### SET
### @1=2
### @2=4
### @3='replaced 2,4'
# at 1612
#160324 11:11:44 server id 1 end_log_pos 1643 CRC32 0x037a8fe1 Xid = 49977
COMMIT/*!*/;
# at 1643
#160324 11:11:54 server id 1 end_log_pos 1715 CRC32 0xb02520cd Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814314/*!*/;
BEGIN
/*!*/;
# at 1715
#160324 11:11:54 server id 1 end_log_pos 1769 CRC32 0xcbcf4323 Table_map: `test`.`samplet` mapped to number 72
# at 1769
#160324 11:11:54 server id 1 end_log_pos 1815 CRC32 0x4d52b057 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=7
### @2=7
### @3='7'
# at 1815
#160324 11:11:54 server id 1 end_log_pos 1846 CRC32 0x5289b6a4 Xid = 50001
COMMIT/*!*/;
# at 1846
#160324 11:11:59 server id 1 end_log_pos 1918 CRC32 0x1758ab97 Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814319/*!*/;
BEGIN
/*!*/;
# at 1918
#160324 11:11:59 server id 1 end_log_pos 1972 CRC32 0xa4602796 Table_map: `test`.`samplet` mapped to number 72
# at 1972
#160324 11:11:59 server id 1 end_log_pos 2018 CRC32 0x6a6eb0c9 Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=8
### @2=8
### @3='8'
# at 2018
#160324 11:11:59 server id 1 end_log_pos 2049 CRC32 0x6d0fef4d Xid = 50014
COMMIT/*!*/;
# at 2049
#160324 11:12:12 server id 1 end_log_pos 2121 CRC32 0x6cd5da13 Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814332/*!*/;
BEGIN
/*!*/;
# at 2121
#160324 11:12:12 server id 1 end_log_pos 2175 CRC32 0x8339241f Table_map: `test`.`samplet` mapped to number 72
# at 2175
#160324 11:12:12 server id 1 end_log_pos 2220 CRC32 0x669385e1 Delete_rows: table id 72 flags: STMT_END_F
### DELETE FROM `test`.`samplet`
### WHERE
### @1=7
### DELETE FROM `test`.`samplet`
### WHERE
### @1=8
# at 2220
#160324 11:12:12 server id 1 end_log_pos 2251 CRC32 0xba81d2b0 Xid = 50038
COMMIT/*!*/;
# at 2251
#160324 11:12:20 server id 1 end_log_pos 2323 CRC32 0x4c58be8c Query thread_id=7940 exec_time=0 error_code=0
SET TIMESTAMP=1458814340/*!*/;
BEGIN
/*!*/;
# at 2323
#160324 11:12:20 server id 1 end_log_pos 2377 CRC32 0x9eb23ab9 Table_map: `test`.`samplet` mapped to number 72
# at 2377
#160324 11:12:20 server id 1 end_log_pos 2423 CRC32 0xac8116ec Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=9
### @2=9
### @3='9'
# at 2423
#160324 11:12:20 server id 1 end_log_pos 2454 CRC32 0x5ce77ad6 Xid = 50051
COMMIT/*!*/;
# at 2454
#160324 11:12:50 server id 1 end_log_pos 2526 CRC32 0xed19acbd Query thread_id=7940 exec_time=26 error_code=0
SET TIMESTAMP=1458814370/*!*/;
BEGIN
/*!*/;
# at 2526
#160324 11:12:50 server id 1 end_log_pos 2580 CRC32 0x0bf6b98f Table_map: `test`.`samplet` mapped to number 72
# at 2580
#160324 11:12:50 server id 1 end_log_pos 2631 CRC32 0x263c4579 Update_rows: table id 72 flags: STMT_END_F
### UPDATE `test`.`samplet`
### WHERE
### @1=9
### SET
### @3='update 9'
# at 2631
#160324 11:13:00 server id 1 end_log_pos 2685 CRC32 0x94b24c8b Table_map: `test`.`samplet` mapped to number 72
# at 2685
#160324 11:13:00 server id 1 end_log_pos 2725 CRC32 0xca43fe3a Delete_rows: table id 72 flags: STMT_END_F
### DELETE FROM `test`.`samplet`
### WHERE
### @1=3
# at 2725
#160324 11:13:14 server id 1 end_log_pos 2779 CRC32 0xc36088a2 Table_map: `test`.`samplet` mapped to number 72
# at 2779
#160324 11:13:14 server id 1 end_log_pos 2826 CRC32 0x98fc9dea Write_rows: table id 72 flags: STMT_END_F
### INSERT INTO `test`.`samplet`
### SET
### @1=10
### @2=10
### @3='10'
# at 2826
#160324 11:13:14 server id 1 end_log_pos 2857 CRC32 0x729c371f Xid = 50163
COMMIT/*!*/;
# at 2857
#160324 11:13:31 server id 104 end_log_pos 2904 CRC32 0x38531c7d Rotate to mysql-bin.000053 pos: 4
DELIMITER ;
# End of log file
ROLLBACK /* added by mysqlbinlog */;
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;

View File

@ -1,3 +1,8 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package main
import (

View File

@ -35,8 +35,6 @@ func execCmd(commandText string, arguments ...string) (*exec.Cmd, string, error)
shellArguments = append(shellArguments, arguments...)
log.Debugf("%+v", shellArguments)
return exec.Command("bash", shellArguments...), tmpFile.Name(), nil
//return exec.Command(commandText, arguments...) , "", nil
}
// CommandRun executes a command