diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go new file mode 100644 index 0000000..2aac7d3 --- /dev/null +++ b/go/binlog/gomysql_reader.go @@ -0,0 +1,95 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package binlog + +import ( + "fmt" + "os" + "reflect" + "strings" + + "github.com/github/gh-osc/go/mysql" + "github.com/outbrain/golib/log" + gomysql "github.com/siddontang/go-mysql/mysql" + "github.com/siddontang/go-mysql/replication" +) + +var () + +const ( + serverId = 99999 +) + +type GoMySQLReader struct { + connectionConfig *mysql.ConnectionConfig + binlogSyncer *replication.BinlogSyncer +} + +func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { + binlogReader = &GoMySQLReader{ + connectionConfig: connectionConfig, + } + binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") + + // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password + err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Hostname, uint16(connectionConfig.Port), connectionConfig.User, connectionConfig.Password) + if err != nil { + return binlogReader, err + } + + return binlogReader, err +} + +func (this *GoMySQLReader) isDMLEvent(event *replication.BinlogEvent) bool { + eventType := event.Header.EventType.String() + if strings.HasPrefix(eventType, "WriteRows") { + return true + } + if strings.HasPrefix(eventType, "UpdateRows") { + return true + } + if strings.HasPrefix(eventType, "DeleteRows") { + return true + } + return false +} + +// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility +func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { + // Start sync with sepcified binlog file and position + streamer, err := this.binlogSyncer.StartSync(gomysql.Position{logFile, uint32(startPos)}) + if err != nil { + return entries, err + } + + for { + ev, err := streamer.GetEvent() + if err != nil { + return entries, err + } + if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { + if true { + fmt.Println(ev.Header.EventType) + fmt.Println(len(rowsEvent.Rows)) + + for _, rows := range rowsEvent.Rows { + for j, d := range rows { + if _, ok := d.([]byte); ok { + fmt.Print(fmt.Sprintf("yesbin %d:%q, %+v\n", j, d, reflect.TypeOf(d))) + } else { + fmt.Print(fmt.Sprintf("notbin %d:%#v, %+v\n", j, d, reflect.TypeOf(d))) + } + } + fmt.Println("---") + } + } else { + ev.Dump(os.Stdout) + } + } + } + log.Debugf("done") + return entries, err +} diff --git a/go/binlog/testdata/mysql-bin.000066 b/go/binlog/testdata/mysql-bin.000066 new file mode 100644 index 0000000..77b03bd Binary files /dev/null and b/go/binlog/testdata/mysql-bin.000066 differ diff --git a/go/binlog/testdata/mysql-bin.000070 b/go/binlog/testdata/mysql-bin.000070 new file mode 100644 index 0000000..92ac64b Binary files /dev/null and b/go/binlog/testdata/mysql-bin.000070 differ diff --git a/go/binlog/testdata/rbr-sample-1.txt b/go/binlog/testdata/rbr-sample-1.txt new file mode 100644 index 0000000..c75b42f --- /dev/null +++ b/go/binlog/testdata/rbr-sample-1.txt @@ -0,0 +1,28 @@ +/* + these are the statements that were used to execute the RBR log: + + drop table if exists samplet; + create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb; + insert into samplet values (1,1,'a'); + insert into samplet values (2,2,'extended'),(3,3,'extended'); + begin; + insert into samplet values (4,4,'transaction'); + insert into samplet values (5,5,'transaction'); + insert into samplet values (6,6,'transaction'); + commit; + update samplet set name='update' where id=5; + replace into samplet values (2,4,'replaced 2,4'); + insert into samplet values (7,7,'7'); + insert into samplet values (8,8,'8'); + delete from samplet where id >= 7; + insert into samplet values (9,9,'9'); + begin; + update samplet set name='update 9' where id=9; + delete from samplet where license=3; + insert into samplet values (10,10,'10'); + commit; + update samplet set name='update 5,6' where id in (5,6); + begin; + delete from samplet where id=5; + rollback; +*/ diff --git a/go/binlog/testdata/rbr-sample-2.txt b/go/binlog/testdata/rbr-sample-2.txt new file mode 100644 index 0000000..676e46a --- /dev/null +++ b/go/binlog/testdata/rbr-sample-2.txt @@ -0,0 +1,25 @@ +drop table if exists samplet; +create table samplet(id int primary key, license int, name varchar(64), b tinyblob, unique key license_uidx(license)) engine=innodb; +insert into samplet (id, license, name) values (1,1,'a'); +insert into samplet (id, license, name) values (2,2,'extended'),(3,3,'extended'); +begin; +insert into samplet (id, license, name) values (4,4,'transaction'); +insert into samplet (id, license, name) values (5,5,'transaction'); +insert into samplet (id, license, name) values (6,6,'transaction'); +commit; +update samplet set name='update' where id=5; +replace into samplet (id, license, name) values (2,4,'replaced 2,4'); +insert into samplet (id, license, name, b) values (7,7,'7', x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082'); +insert into samplet (id, license, name) values (8,8,'8'); +delete from samplet where id >= 7; +insert into samplet (id, license, name) values (9,9,'9'); +begin; +update samplet set name='update 9', b=x'89504E470D0A1A0A0000000D494844520000001000000010080200000090916836000000017352474200AECE1CE90000000467414D410000B18F0BFC6105000000097048597300000EC300000EC301C76FA8640000001E49444154384F6350DAE843126220493550F1A80662426C349406472801006AC91F1040F796BD0000000049454E44AE426082' where id=9; +update samplet set name='update 9b' where id=9; +delete from samplet where license=3; +insert into samplet (id, license, name) values (10,10,'10'); +commit; +update samplet set name='update 5,6' where id in (5,6); +begin; +delete from samplet where id=5; +rollback; diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 4cb8a10..ce1823a 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -11,15 +11,24 @@ import ( "os" "github.com/github/gh-osc/go/binlog" + "github.com/github/gh-osc/go/mysql" "github.com/outbrain/golib/log" ) // main is the application's entry point. It will either spawn a CLI or HTTP itnerfaces. func main() { - mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") - mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)") + var connectionConfig mysql.ConnectionConfig + + // mysqlBasedir := flag.String("mysql-basedir", "", "the --basedir config for MySQL (auto-detected if not given)") + // mysqlDatadir := flag.String("mysql-datadir", "", "the --datadir config for MySQL (auto-detected if not given)") internalExperiment := flag.Bool("internal-experiment", false, "issue an internal experiment") binlogFile := flag.String("binlog-file", "", "Name of binary log file") + + flag.StringVar(&connectionConfig.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") + flag.IntVar(&connectionConfig.Port, "port", 3306, "MySQL port (preferably a replica, not the master)") + flag.StringVar(&connectionConfig.User, "user", "root", "MySQL user") + flag.StringVar(&connectionConfig.Password, "password", "", "MySQL password") + quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") debug := flag.Bool("debug", false, "debug mode (very verbose)") @@ -51,7 +60,14 @@ func main() { if *internalExperiment { log.Debug("starting experiment") - binlogReader := binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) + var binlogReader binlog.BinlogReader + var err error + + //binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir) + binlogReader, err = binlog.NewGoMySQLReader(&connectionConfig) + if err != nil { + log.Fatale(err) + } binlogReader.ReadEntries(*binlogFile, 0, 0) } } diff --git a/go/mysql/connection.go b/go/mysql/connection.go new file mode 100644 index 0000000..fc4115b --- /dev/null +++ b/go/mysql/connection.go @@ -0,0 +1,14 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-osc/blob/master/LICENSE +*/ + +package mysql + +// ConnectionConfig is the minimal configuration required to connect to a MySQL server +type ConnectionConfig struct { + Hostname string + Port int + User string + Password string +}