2016-06-16 11:15:56 +02:00

284 lines
5.7 KiB
Go

package replication
import (
"flag"
"fmt"
"os"
"sync"
"testing"
"time"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/mysql"
. "gopkg.in/check.v1"
)
// Use docker mysql to test, mysql is 3306, mariadb is 3316
var testHost = flag.String("host", "127.0.0.1", "MySQL master host")
var testOutputLogs = flag.Bool("out", true, "output binlog event")
func TestBinLogSyncer(t *testing.T) {
TestingT(t)
}
type testSyncerSuite struct {
b *BinlogSyncer
c *client.Conn
wg sync.WaitGroup
flavor string
}
var _ = Suite(&testSyncerSuite{})
func (t *testSyncerSuite) SetUpSuite(c *C) {
}
func (t *testSyncerSuite) TearDownSuite(c *C) {
}
func (t *testSyncerSuite) SetUpTest(c *C) {
}
func (t *testSyncerSuite) TearDownTest(c *C) {
if t.b != nil {
t.b.Close()
t.b = nil
}
if t.c != nil {
t.c.Close()
t.c = nil
}
}
func (t *testSyncerSuite) testExecute(c *C, query string) {
_, err := t.c.Execute(query)
c.Assert(err, IsNil)
}
func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) {
t.wg.Add(1)
go func() {
defer t.wg.Done()
if s == nil {
return
}
for {
e, err := s.GetEventTimeout(2 * time.Second)
if err != nil {
if err != ErrGetEventTimeout {
c.Fatal(err)
}
return
}
if *testOutputLogs {
e.Dump(os.Stdout)
os.Stdout.Sync()
}
}
}()
//use mixed format
t.testExecute(c, "SET SESSION binlog_format = 'MIXED'")
str := `DROP TABLE IF EXISTS test_replication`
t.testExecute(c, str)
str = `CREATE TABLE IF NOT EXISTS test_replication (
id BIGINT(64) UNSIGNED NOT NULL AUTO_INCREMENT,
str VARCHAR(256),
f FLOAT,
d DOUBLE,
de DECIMAL(10,2),
i INT,
bi BIGINT,
e enum ("e1", "e2"),
b BIT(8),
y YEAR,
da DATE,
ts TIMESTAMP,
dt DATETIME,
tm TIME,
t TEXT,
bb BLOB,
se SET('a', 'b', 'c'),
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8`
t.testExecute(c, str)
//use row format
t.testExecute(c, "SET SESSION binlog_format = 'ROW'")
t.testExecute(c, `INSERT INTO test_replication (str, f, i, e, b, y, da, ts, dt, tm, de, t, bb, se)
VALUES ("3", -3.14, 10, "e1", 0b0011, 1985,
"2012-05-07", "2012-05-07 14:01:01", "2012-05-07 14:01:01",
"14:01:01", -45363.64, "abc", "12345", "a,b")`)
id := 100
if t.flavor == mysql.MySQLFlavor {
t.testExecute(c, "SET SESSION binlog_row_image = 'MINIMAL'")
t.testExecute(c, fmt.Sprintf(`INSERT INTO test_replication (id, str, f, i, bb, de) VALUES (%d, "4", -3.14, 100, "abc", -45635.64)`, id))
t.testExecute(c, fmt.Sprintf(`UPDATE test_replication SET f = -12.14, de = 555.34 WHERE id = %d`, id))
t.testExecute(c, fmt.Sprintf(`DELETE FROM test_replication WHERE id = %d`, id))
}
t.wg.Wait()
}
func (t *testSyncerSuite) setupTest(c *C, flavor string) {
var port uint16 = 3306
switch flavor {
case mysql.MariaDBFlavor:
port = 3316
}
t.flavor = flavor
var err error
if t.c != nil {
t.c.Close()
}
t.c, err = client.Connect(fmt.Sprintf("%s:%d", *testHost, port), "root", "", "")
if err != nil {
c.Skip(err.Error())
}
// _, err = t.c.Execute("CREATE DATABASE IF NOT EXISTS test")
// c.Assert(err, IsNil)
_, err = t.c.Execute("USE test")
c.Assert(err, IsNil)
if t.b != nil {
t.b.Close()
}
t.b = NewBinlogSyncer(100, flavor)
err = t.b.RegisterSlave(*testHost, port, "root", "")
c.Assert(err, IsNil)
}
func (t *testSyncerSuite) testPositionSync(c *C) {
//get current master binlog file and position
r, err := t.c.Execute("SHOW MASTER STATUS")
c.Assert(err, IsNil)
binFile, _ := r.GetString(0, 0)
binPos, _ := r.GetInt(0, 1)
s, err := t.b.StartSync(mysql.Position{binFile, uint32(binPos)})
c.Assert(err, IsNil)
t.testSync(c, s)
}
func (t *testSyncerSuite) TestMysqlPositionSync(c *C) {
t.setupTest(c, mysql.MySQLFlavor)
t.testPositionSync(c)
}
func (t *testSyncerSuite) TestMysqlGTIDSync(c *C) {
t.setupTest(c, mysql.MySQLFlavor)
r, err := t.c.Execute("SELECT @@gtid_mode")
c.Assert(err, IsNil)
modeOn, _ := r.GetString(0, 0)
if modeOn != "ON" {
c.Skip("GTID mode is not ON")
}
masterUuid, err := t.b.GetMasterUUID()
c.Assert(err, IsNil)
set, _ := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d-%d", masterUuid.String(), 1, 2))
s, err := t.b.StartSyncGTID(set)
c.Assert(err, IsNil)
t.testSync(c, s)
}
func (t *testSyncerSuite) TestMariadbPositionSync(c *C) {
t.setupTest(c, mysql.MariaDBFlavor)
t.testPositionSync(c)
}
func (t *testSyncerSuite) TestMariadbGTIDSync(c *C) {
t.setupTest(c, mysql.MariaDBFlavor)
// get current master gtid binlog pos
r, err := t.c.Execute("SELECT @@gtid_binlog_pos")
c.Assert(err, IsNil)
str, _ := r.GetString(0, 0)
set, _ := mysql.ParseMariadbGTIDSet(str)
s, err := t.b.StartSyncGTID(set)
c.Assert(err, IsNil)
t.testSync(c, s)
}
func (t *testSyncerSuite) TestMysqlSemiPositionSync(c *C) {
t.setupTest(c, mysql.MySQLFlavor)
err := t.b.EnableSemiSync()
if err != nil {
c.Skip(fmt.Sprintf("mysql doest not support semi synchronous replication %v", err))
}
t.testPositionSync(c)
}
func (t *testSyncerSuite) TestMysqlBinlogCodec(c *C) {
t.setupTest(c, mysql.MySQLFlavor)
t.testExecute(c, "RESET MASTER")
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
t.testSync(c, nil)
t.testExecute(c, "FLUSH LOGS")
t.testSync(c, nil)
}()
os.RemoveAll("./var")
err := t.b.StartBackup("./var", mysql.Position{"", uint32(0)}, 2*time.Second)
c.Check(err, Equals, ErrGetEventTimeout)
p := NewBinlogParser()
f := func(e *BinlogEvent) error {
if *testOutputLogs {
e.Dump(os.Stdout)
os.Stdout.Sync()
}
return nil
}
err = p.ParseFile("./var/mysql.000001", 0, f)
c.Assert(err, IsNil)
err = p.ParseFile("./var/mysql.000002", 0, f)
c.Assert(err, IsNil)
}