still experimenting, yes? go-mysql binlog parser looks good

This commit is contained in:
Shlomi Noach 2016-03-30 15:43:40 +02:00
parent e3210a9fa2
commit 8f3d13e071
7 changed files with 181 additions and 3 deletions

View File

@ -0,0 +1,95 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package binlog
import (
"fmt"
"os"
"reflect"
"strings"
"github.com/github/gh-osc/go/mysql"
"github.com/outbrain/golib/log"
gomysql "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
)
var ()
const (
serverId = 99999
)
type GoMySQLReader struct {
connectionConfig *mysql.ConnectionConfig
binlogSyncer *replication.BinlogSyncer
}
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
binlogReader = &GoMySQLReader{
connectionConfig: connectionConfig,
}
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
// Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Hostname, uint16(connectionConfig.Port), connectionConfig.User, connectionConfig.Password)
if err != nil {
return binlogReader, err
}
return binlogReader, err
}
func (this *GoMySQLReader) isDMLEvent(event *replication.BinlogEvent) bool {
eventType := event.Header.EventType.String()
if strings.HasPrefix(eventType, "WriteRows") {
return true
}
if strings.HasPrefix(eventType, "UpdateRows") {
return true
}
if strings.HasPrefix(eventType, "DeleteRows") {
return true
}
return false
}
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) {
// Start sync with sepcified binlog file and position
streamer, err := this.binlogSyncer.StartSync(gomysql.Position{logFile, uint32(startPos)})
if err != nil {
return entries, err
}
for {
ev, err := streamer.GetEvent()
if err != nil {
return entries, err
}
if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if true {
fmt.Println(ev.Header.EventType)
fmt.Println(len(rowsEvent.Rows))
for _, rows := range rowsEvent.Rows {
for j, d := range rows {
if _, ok := d.([]byte); ok {
fmt.Print(fmt.Sprintf("yesbin %d:%q, %+v\n", j, d, reflect.TypeOf(d)))
} else {
fmt.Print(fmt.Sprintf("notbin %d:%#v, %+v\n", j, d, reflect.TypeOf(d)))
}
}
fmt.Println("---")
}
} else {
ev.Dump(os.Stdout)
}
}
}
log.Debugf("done")
return entries, err
}

BIN
go/binlog/testdata/mysql-bin.000066 vendored Normal file

Binary file not shown.

BIN
go/binlog/testdata/mysql-bin.000070 vendored Normal file

Binary file not shown.

28
go/binlog/testdata/rbr-sample-1.txt vendored Normal file
View File

@ -0,0 +1,28 @@
/*
these are the statements that were used to execute the RBR log:
drop table if exists samplet;
create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb;
insert into samplet values (1,1,'a');
insert into samplet values (2,2,'extended'),(3,3,'extended');
begin;
insert into samplet values (4,4,'transaction');
insert into samplet values (5,5,'transaction');
insert into samplet values (6,6,'transaction');
commit;
update samplet set name='update' where id=5;
replace into samplet values (2,4,'replaced 2,4');
insert into samplet values (7,7,'7');
insert into samplet values (8,8,'8');
delete from samplet where id >= 7;
insert into samplet values (9,9,'9');
begin;
update samplet set name='update 9' where id=9;
delete from samplet where license=3;
insert into samplet values (10,10,'10');
commit;
update samplet set name='update 5,6' where id in (5,6);
begin;
delete from samplet where id=5;
rollback;
*/

25
go/binlog/testdata/rbr-sample-2.txt vendored Normal file
View File

@ -0,0 +1,25 @@
drop table if exists samplet;
create table samplet(id int primary key, license int, name varchar(64), b tinyblob, unique key license_uidx(license)) engine=innodb;
insert into samplet (id, license, name) values (1,1,'a');
insert into samplet (id, license, name) values (2,2,'extended'),(3,3,'extended');
begin;
insert into samplet (id, license, name) values (4,4,'transaction');
insert into samplet (id, license, name) values (5,5,'transaction');
insert into samplet (id, license, name) values (6,6,'transaction');
commit;
update samplet set name='update' where id=5;
replace into samplet (id, license, name) values (2,4,'replaced 2,4');
insert into samplet (id, license, name, b) values (7,7,'7', x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082');
insert into samplet (id, license, name) values (8,8,'8');
delete from samplet where id >= 7;
insert into samplet (id, license, name) values (9,9,'9');
begin;
update samplet set name='update 9', b=x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082' where id=9;
update samplet set name='update 9b' where id=9;
delete from samplet where license=3;
insert into samplet (id, license, name) values (10,10,'10');
commit;
update samplet set name='update 5,6' where id in (5,6);
begin;
delete from samplet where id=5;
rollback;

View File

@ -11,15 +11,24 @@ import (
"os"
"github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/mysql"
"github.com/outbrain/golib/log"
)
// main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces.
func main() {
mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)")
var connectionConfig mysql.ConnectionConfig
// mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)")
// mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)")
internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment")
binlogFile := flag.String("binlog-file", "", "Name of binary log file")
flag.StringVar(&connectionConfig.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
flag.IntVar(&connectionConfig.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
flag.StringVar(&connectionConfig.User, "user", "root", "MySQL user")
flag.StringVar(&connectionConfig.Password, "password", "", "MySQL password")
quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose")
debug := flag.Bool("debug", false, "debug mode (very verbose)")
@ -51,7 +60,14 @@ func main() {
if *internalExperiment {
log.Debug("starting experiment")
binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
var binlogReader binlog.BinlogReader
var err error
//binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
binlogReader, err = binlog.NewGoMySQLReader(&connectionConfig)
if err != nil {
log.Fatale(err)
}
binlogReader.ReadEntries(*binlogFile, 0, 0)
}
}

14
go/mysql/connection.go Normal file
View File

@ -0,0 +1,14 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package mysql
// ConnectionConfig is the minimal configuration required to connect to a MySQL server
type ConnectionConfig struct {
Hostname string
Port int
User string
Password string
}