2017-02-12 13:13:54 +02:00

308 lines
6.2 KiB
Go

package canal
import (
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sync"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/client"
"github.com/siddontang/go-mysql/dump"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/siddontang/go/sync2"
)
var errCanalClosed = errors.New("canal was closed")
// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
// MySQL must open row format for binlog
type Canal struct {
m sync.Mutex
cfg *Config
master *masterInfo
dumper *dump.Dumper
dumpDoneCh chan struct{}
syncer *replication.BinlogSyncer
rsLock sync.Mutex
rsHandlers []RowsEventHandler
connLock sync.Mutex
conn *client.Conn
wg sync.WaitGroup
tableLock sync.Mutex
tables map[string]*schema.Table
quit chan struct{}
closed sync2.AtomicBool
}
func NewCanal(cfg *Config) (*Canal, error) {
c := new(Canal)
c.cfg = cfg
c.closed.Set(false)
c.quit = make(chan struct{})
os.MkdirAll(cfg.DataDir, 0755)
c.dumpDoneCh = make(chan struct{})
c.rsHandlers = make([]RowsEventHandler, 0, 4)
c.tables = make(map[string]*schema.Table)
var err error
if c.master, err = loadMasterInfo(c.masterInfoPath()); err != nil {
return nil, errors.Trace(err)
} else if len(c.master.Addr) != 0 && c.master.Addr != c.cfg.Addr {
log.Infof("MySQL addr %s in old master.info, but new %s, reset", c.master.Addr, c.cfg.Addr)
// may use another MySQL, reset
c.master = &masterInfo{}
}
c.master.Addr = c.cfg.Addr
if err := c.prepareDumper(); err != nil {
return nil, errors.Trace(err)
}
if err = c.prepareSyncer(); err != nil {
return nil, errors.Trace(err)
}
if err := c.checkBinlogRowFormat(); err != nil {
return nil, errors.Trace(err)
}
return c, nil
}
func (c *Canal) prepareDumper() error {
var err error
dumpPath := c.cfg.Dump.ExecutionPath
if len(dumpPath) == 0 {
// ignore mysqldump, use binlog only
return nil
}
if c.dumper, err = dump.NewDumper(dumpPath,
c.cfg.Addr, c.cfg.User, c.cfg.Password); err != nil {
return errors.Trace(err)
}
if c.dumper == nil {
//no mysqldump, use binlog only
return nil
}
dbs := c.cfg.Dump.Databases
tables := c.cfg.Dump.Tables
tableDB := c.cfg.Dump.TableDB
if len(tables) == 0 {
c.dumper.AddDatabases(dbs...)
} else {
c.dumper.AddTables(tableDB, tables...)
}
for _, ignoreTable := range c.cfg.Dump.IgnoreTables {
if seps := strings.Split(ignoreTable, ","); len(seps) == 2 {
c.dumper.AddIgnoreTables(seps[0], seps[1])
}
}
if c.cfg.Dump.DiscardErr {
c.dumper.SetErrOut(ioutil.Discard)
} else {
c.dumper.SetErrOut(os.Stderr)
}
return nil
}
func (c *Canal) Start() error {
c.wg.Add(1)
go c.run()
return nil
}
func (c *Canal) run() error {
defer c.wg.Done()
if err := c.tryDump(); err != nil {
log.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err)
}
close(c.dumpDoneCh)
if err := c.startSyncBinlog(); err != nil {
if !c.isClosed() {
log.Errorf("canal start sync binlog err: %v", err)
}
return errors.Trace(err)
}
return nil
}
func (c *Canal) isClosed() bool {
return c.closed.Get()
}
func (c *Canal) Close() {
log.Infof("close canal")
c.m.Lock()
defer c.m.Unlock()
if c.isClosed() {
return
}
c.closed.Set(true)
close(c.quit)
c.connLock.Lock()
c.conn.Close()
c.conn = nil
c.connLock.Unlock()
if c.syncer != nil {
c.syncer.Close()
c.syncer = nil
}
c.master.Close()
c.wg.Wait()
}
func (c *Canal) WaitDumpDone() <-chan struct{} {
return c.dumpDoneCh
}
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock()
t, ok := c.tables[key]
c.tableLock.Unlock()
if ok {
return t, nil
}
t, err := schema.NewTable(c, db, table)
if err != nil {
return nil, errors.Trace(err)
}
c.tableLock.Lock()
c.tables[key] = t
c.tableLock.Unlock()
return t, nil
}
// Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
func (c *Canal) CheckBinlogRowImage(image string) error {
// need to check MySQL binlog row image? full, minimal or noblob?
// now only log
if c.cfg.Flavor == mysql.MySQLFlavor {
if res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_row_image"`); err != nil {
return errors.Trace(err)
} else {
// MySQL has binlog row image from 5.6, so older will return empty
rowImage, _ := res.GetString(0, 1)
if rowImage != "" && !strings.EqualFold(rowImage, image) {
return errors.Errorf("MySQL uses %s binlog row image, but we want %s", rowImage, image)
}
}
}
return nil
}
func (c *Canal) checkBinlogRowFormat() error {
res, err := c.Execute(`SHOW GLOBAL VARIABLES LIKE "binlog_format";`)
if err != nil {
return errors.Trace(err)
} else if f, _ := res.GetString(0, 1); f != "ROW" {
return errors.Errorf("binlog must ROW format, but %s now", f)
}
return nil
}
func (c *Canal) prepareSyncer() error {
seps := strings.Split(c.cfg.Addr, ":")
if len(seps) != 2 {
return errors.Errorf("invalid mysql addr format %s, must host:port", c.cfg.Addr)
}
port, err := strconv.ParseUint(seps[1], 10, 16)
if err != nil {
return errors.Trace(err)
}
cfg := replication.BinlogSyncerConfig{
ServerID: c.cfg.ServerID,
Flavor: c.cfg.Flavor,
Host: seps[0],
Port: uint16(port),
User: c.cfg.User,
Password: c.cfg.Password,
}
c.syncer = replication.NewBinlogSyncer(&cfg)
return nil
}
func (c *Canal) masterInfoPath() string {
return path.Join(c.cfg.DataDir, "master.info")
}
// Execute a SQL
func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
c.connLock.Lock()
defer c.connLock.Unlock()
retryNum := 3
for i := 0; i < retryNum; i++ {
if c.conn == nil {
c.conn, err = client.Connect(c.cfg.Addr, c.cfg.User, c.cfg.Password, "")
if err != nil {
return nil, errors.Trace(err)
}
}
rr, err = c.conn.Execute(cmd, args...)
if err != nil && !mysql.ErrorEqual(err, mysql.ErrBadConn) {
return
} else if mysql.ErrorEqual(err, mysql.ErrBadConn) {
c.conn.Close()
c.conn = nil
continue
} else {
return
}
}
return
}
func (c *Canal) SyncedPosition() mysql.Position {
return c.master.Pos()
}