From 8f3d13e0714bace07a7f347fa43dd923cf4c063e Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 30 Mar 2016 15:43:40 +0200 Subject: [PATCH] still experimenting, yes? go-mysql binlog parser looks good --- go/binlog/gomysql_reader.go | 95 ++++++++++++++++++++++++++++ go/binlog/testdata/mysql-bin.000066 | Bin 0 -> 3687 bytes go/binlog/testdata/mysql-bin.000070 | Bin 0 -> 4257 bytes go/binlog/testdata/rbr-sample-1.txt | 28 ++++++++ go/binlog/testdata/rbr-sample-2.txt | 25 ++++++++ go/cmd/gh-osc/main.go | 22 ++++++- go/mysql/connection.go | 14 ++++ 7 files changed, 181 insertions(+), 3 deletions(-) create mode 100644 go/binlog/gomysql_reader.go create mode 100644 go/binlog/testdata/mysql-bin.000066 create mode 100644 go/binlog/testdata/mysql-bin.000070 create mode 100644 go/binlog/testdata/rbr-sample-1.txt create mode 100644 go/binlog/testdata/rbr-sample-2.txt create mode 100644 go/mysql/connection.go diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go new file mode 100644 index 0000000..2aac7d3 --- /dev/null +++ b/go/binlog/gomysql_reader.go @@ -0,0 +1,95 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package binlog + +import ( + "fmt" + "os" + "reflect" + "strings" + + "github.com/github/gh-osc/go/mysql" + "github.com/outbrain/golib/log" + gomysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +var () + +const ( + serverId = 99999 +) + +type GoMySQLReader struct { + connectionConfig *mysql.ConnectionConfig + binlogSyncer *replication.BinlogSyncer +} + +func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { + binlogReader = &GoMySQLReader{ + connectionConfig: connectionConfig, + } + binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") + + // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password + err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Hostname, uint16(connectionConfig.Port), connectionConfig.User, connectionConfig.Password) + if err != nil { + return binlogReader, err + } + + return binlogReader, err +} + +func (this *GoMySQLReader) isDMLEvent(event *replication.BinlogEvent) bool { + eventType := event.Header.EventType.String() + if strings.HasPrefix(eventType, "WriteRows") { + return true + } + if strings.HasPrefix(eventType, "UpdateRows") { + return true + } + if strings.HasPrefix(eventType, "DeleteRows") { + return true + } + return false +} + +// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility +func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { + // Start sync with sepcified binlog file and position + streamer, err := this.binlogSyncer.StartSync(gomysql.Position{logFile, uint32(startPos)}) + if err != nil { + return entries, err + } + + for { + ev, err := streamer.GetEvent() + if err != nil { + return entries, err + } + if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { + if true { + fmt.Println(ev.Header.EventType) + fmt.Println(len(rowsEvent.Rows)) + + for _, rows := range rowsEvent.Rows { + for j, d := range rows { + if _, ok := d.([]byte); ok { + fmt.Print(fmt.Sprintf("yesbin %d:%q, %+v\n", j, d, reflect.TypeOf(d))) + } else { + fmt.Print(fmt.Sprintf("notbin %d:%#v, %+v\n", j, d, reflect.TypeOf(d))) + } + } + fmt.Println("---") + } + } else { + ev.Dump(os.Stdout) + } + } + } + log.Debugf("done") + return entries, err +} diff --git a/go/binlog/testdata/mysql-bin.000066 b/go/binlog/testdata/mysql-bin.000066 new file mode 100644 index 0000000000000000000000000000000000000000..77b03bd026eaaca85d48bfd497c647aacf1da419 GIT binary patch literal 3687 zcmd6qe@t6d6vywQ4=APlb}BgG&7di>{IozJLw>JR9aJF8kY&&xFHkHGXbS^kR8WI+ ze@xvp(-{+|%;_es^T#$5$s8oh66QZ<1c}a!#f(i8H{t{*cAj&2JbM!xmi{Hld9Ux@ z^S$SO&OP_Ohu`Luk}v$VHLjfy;sxsmBZMdUSp`{n#!OdN8*RA5OOi2$s7WH>;V+Sp z?SxNJgH)|nhrvyw(Ih7mF3g}gb|1tH=qayGcV8^#3mb7eqnzZ{)|w0ybgW? zw5y4h^NJoXp_i)qI*ZU)Uhzb=VBRcLZ!=n!lJ zf=6`siEd$IcAo#9XO$V8hX7|WR46mByG0mX@LFvyQIG@coDRV$d4+Dbv(xGx5S|tX z^n%N27bTB~9eP2sc8Wru)opLLx^)GH4833zyiRGr=IXNPg`^X&>Mef-D0=PYKLP#Pj4uD=`9PBg~&*7A<=Rc)-~LgNZ|?XuLGVl*ilc#1LCRMo>h9*w?XUwy5; z0Np(GrV29l?fR?mz_*oeDJR6YCwke6#SztA)8ru}b^?t%+jZChjj86U@s+6?iYe^f0vtL~fdIsP~Xq<7pW7Dq;%)`_> zPKNpT_+upuj0d_yfSLYtwuyl`!b~bCOy&OHmr+M|N+!&BhSAZTg7x7?8vQ~7eTONh z*dldKkXbIJ|1rz3ltL|qz;gQJ$W4Z2C)T8r;GirwUnyQdo$|df%IN6A;Dph*qQ(6W zJdWN9%p~TqGS zek55|uN=uj(f?-$`yOfCjgwcPi*R7gx^4U%b}Qw00J=liTR-jg%rP|IF|!IvbLqKv(@{suaWhOg zITkL;o?KZO`h}p8S0-Oy;K#SAKcHb@@Ny{wX{4j1$dH=XebW69gl`8OrmT&2(@!&a X>KyR&xH94Y%B&ptD=7S|?;-vl51Y$) literal 0 HcmV?d00001 diff --git a/go/binlog/testdata/mysql-bin.000070 b/go/binlog/testdata/mysql-bin.000070 new file mode 100644 index 0000000000000000000000000000000000000000..92ac64b3ae4a6e5c57f06326adf9f6c580f70928 GIT binary patch literal 4257 zcmd5=eN0nV6u*Uvr3htUwn!#CVobnP3J3^`+xQe36)G^InVY=VK4`JLme*IMW}69` zEQ~F%i7sX_n=?^F10geG%XA;Q=@v5lh?@>@G0T>WF*Dsv);KqI&h_!=&DKEr7fsH4 z?LGH*&-wlCxgYS4;F5%I|7tKf2_XttI~XB)lAl$Om0P&l?P;bB^L!*1#uEccB6|2F z5weZwQw#t#7z|PHG8&D`mJ!{7(_cI#Ld+-~v~_^p2P0&BFMUC5lK$`;`it>vOtepN z=+mLyK$hqf(WemlD6g%tarGso8!I^Ldah!NwXVL7YxME0Ubm<;a+XzGvnYu&uZRv# z2yi}8-YLr5D$B{?{%u+YUqgV47%Gws?6L@>a|$oGMNSQz;c{>;N#VS*tCg1n+zv5d z=G-p3DEUNmm^q1W6}g?fYlw4%~lZ91O%6ONON{R+V< zan48sJ^-tyJr}DSi9R~o^dP#J=n_^|w+0ZGinuH=KZA=R`spCLsPPxX{jXu~MEscs z)=PUopG{w6;L59a88Q>l9s!vaWoQLMW@$9UnwyM#CTUC~QwfM?85|{pnW7oQc14sN zq9cfX&}Lg+zI~8kxtk8tsIs)Y;B04Do&$FTEI+>bos(gCpN5%p%0{aGwrMPJs&iq{ z&8kCLVhdWFBJ+}uw<|7>l$<@jhl$t1;KG&&aa>=+#Cw;)(Rhm>7FuY$6&sFy#>5+A zaAD((OnsTo#7n}Fwb`ffVxR#nG+yqfwi}4kiZh4e1a;c>J2QLHO*bxFX#sFYFm2+! z^^Y>NhM4ri(pvq=@Fm2h;o7l+-Dhw}YdVNcXra8c#u&fX0sGa|trPJK&t<3tX;OJw z-Mw`TPafP6@H~6c{RYGH5DhbDdES11@&V#FM2Rq7t7U8X~!QT-?#i;?% z^2P9A=6pJc<%7nEgF|8C5?S=Rc{`jHx#qmdB8wk~(@J3-9nPsrv|*^`D!R2&*az+i zB-*g{%^Dm>+jP$`S%)nJPr>C65l1)O4R$UoyOfkZ*cxzR8 zExZ%a=&{`My0ZYg`D!am$-wC2lkh{|T(YTzkfH9l2Rv3&thZ{bkC24(Xu6A@J_j_W zTFXo73v2AQv7gJ51kRdoyW1Bdl)CeJV=6Orq~)u$#Ns~>o`s&oT2Wm-P}P!GO~_?I|5lR_->wOWKE96Eat4dJ>56o)?~%Atc1bQ%EN@wbcLV) z61$Nh(@1-}RWd`9%a1T*`oJ9lnX3M0Xi{2@++vanTO*IXv$qH1(`5t`M~>qBYa$?- z1z97(S%n`N?L5sWe2!9(RD};*>0)mS#<++S-n#CsG7O_BTngjNDiv1YklT_Qd8(m zh7C6O(fIDlD6>;S&012(9NfNVyJAedYw1y)deGL+!3;RBI^TToABwsWx84b3S-_4^H(tK*(;D!#xmUw~B(k#Mvo>c}>%J5J E0=JxosQ>@~ literal 0 HcmV?d00001 diff --git a/go/binlog/testdata/rbr-sample-1.txt b/go/binlog/testdata/rbr-sample-1.txt new file mode 100644 index 0000000..c75b42f --- /dev/null +++ b/go/binlog/testdata/rbr-sample-1.txt @@ -0,0 +1,28 @@ +/* + these are the statements that were used to execute the RBR log: + + drop table if exists samplet; + create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb; + insert into samplet values (1,1,'a'); + insert into samplet values (2,2,'extended'),(3,3,'extended'); + begin; + insert into samplet values (4,4,'transaction'); + insert into samplet values (5,5,'transaction'); + insert into samplet values (6,6,'transaction'); + commit; + update samplet set name='update' where id=5; + replace into samplet values (2,4,'replaced 2,4'); + insert into samplet values (7,7,'7'); + insert into samplet values (8,8,'8'); + delete from samplet where id >= 7; + insert into samplet values (9,9,'9'); + begin; + update samplet set name='update 9' where id=9; + delete from samplet where license=3; + insert into samplet values (10,10,'10'); + commit; + update samplet set name='update 5,6' where id in (5,6); + begin; + delete from samplet where id=5; + rollback; +*/ diff --git a/go/binlog/testdata/rbr-sample-2.txt b/go/binlog/testdata/rbr-sample-2.txt new file mode 100644 index 0000000..676e46a --- /dev/null +++ b/go/binlog/testdata/rbr-sample-2.txt @@ -0,0 +1,25 @@ +drop table if exists samplet; +create table samplet(id int primary key, license int, name varchar(64), b tinyblob, unique key license_uidx(license)) engine=innodb; +insert into samplet (id, license, name) values (1,1,'a'); +insert into samplet (id, license, name) values (2,2,'extended'),(3,3,'extended'); +begin; +insert into samplet (id, license, name) values (4,4,'transaction'); +insert into samplet (id, license, name) values (5,5,'transaction'); +insert into samplet (id, license, name) values (6,6,'transaction'); +commit; +update samplet set name='update' where id=5; +replace into samplet (id, license, name) values (2,4,'replaced 2,4'); +insert into samplet (id, license, name, b) values (7,7,'7', x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082'); +insert into samplet (id, license, name) values (8,8,'8'); +delete from samplet where id >= 7; +insert into samplet (id, license, name) values (9,9,'9'); +begin; +update samplet set name='update 9', b=x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082' where id=9; +update samplet set name='update 9b' where id=9; +delete from samplet where license=3; +insert into samplet (id, license, name) values (10,10,'10'); +commit; +update samplet set name='update 5,6' where id in (5,6); +begin; +delete from samplet where id=5; +rollback; diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 4cb8a10..ce1823a 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -11,15 +11,24 @@ import ( "os" "github.com/github/gh-osc/go/binlog" + "github.com/github/gh-osc/go/mysql" "github.com/outbrain/golib/log" ) // main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. func main() { - mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") - mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)") + var connectionConfig mysql.ConnectionConfig + + // mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") + // mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)") internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment") binlogFile := flag.String("binlog-file", "", "Name of binary log file") + + flag.StringVar(&connectionConfig.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") + flag.IntVar(&connectionConfig.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") + flag.StringVar(&connectionConfig.User, "user", "root", "MySQL user") + flag.StringVar(&connectionConfig.Password, "password", "", "MySQL password") + quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") debug := flag.Bool("debug", false, "debug mode (very verbose)") @@ -51,7 +60,14 @@ func main() { if *internalExperiment { log.Debug("starting experiment") - binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) + var binlogReader binlog.BinlogReader + var err error + + //binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) + binlogReader, err = binlog.NewGoMySQLReader(&connectionConfig) + if err != nil { + log.Fatale(err) + } binlogReader.ReadEntries(*binlogFile, 0, 0) } } diff --git a/go/mysql/connection.go b/go/mysql/connection.go new file mode 100644 index 0000000..fc4115b --- /dev/null +++ b/go/mysql/connection.go @@ -0,0 +1,14 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package mysql + +// ConnectionConfig is the minimal configuration required to connect to a MySQL server +type ConnectionConfig struct { + Hostname string + Port int + User string + Password string +}