138 lines
3.1 KiB
Go
138 lines
3.1 KiB
Go
|
package canal
|
||
|
|
||
|
import (
|
||
|
"time"
|
||
|
|
||
|
"golang.org/x/net/context"
|
||
|
|
||
|
"github.com/juju/errors"
|
||
|
"github.com/ngaut/log"
|
||
|
"github.com/siddontang/go-mysql/mysql"
|
||
|
"github.com/siddontang/go-mysql/replication"
|
||
|
)
|
||
|
|
||
|
func (c *Canal) startSyncBinlog() error {
|
||
|
pos := mysql.Position{c.master.Name, c.master.Position}
|
||
|
|
||
|
log.Infof("start sync binlog at %v", pos)
|
||
|
|
||
|
s, err := c.syncer.StartSync(pos)
|
||
|
if err != nil {
|
||
|
return errors.Errorf("start sync replication at %v error %v", pos, err)
|
||
|
}
|
||
|
|
||
|
timeout := time.Second
|
||
|
forceSavePos := false
|
||
|
for {
|
||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||
|
ev, err := s.GetEvent(ctx)
|
||
|
cancel()
|
||
|
|
||
|
if err == context.DeadlineExceeded {
|
||
|
timeout = 2 * timeout
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
|
||
|
timeout = time.Second
|
||
|
|
||
|
//next binlog pos
|
||
|
pos.Pos = ev.Header.LogPos
|
||
|
|
||
|
forceSavePos = false
|
||
|
|
||
|
// We only save position with RotateEvent and XIDEvent.
|
||
|
// For RowsEvent, we can't save the position until meeting XIDEvent
|
||
|
// which tells the whole transaction is over.
|
||
|
// TODO: If we meet any DDL query, we must save too.
|
||
|
switch e := ev.Event.(type) {
|
||
|
case *replication.RotateEvent:
|
||
|
pos.Name = string(e.NextLogName)
|
||
|
pos.Pos = uint32(e.Position)
|
||
|
// r.ev <- pos
|
||
|
forceSavePos = true
|
||
|
log.Infof("rotate binlog to %v", pos)
|
||
|
case *replication.RowsEvent:
|
||
|
// we only focus row based event
|
||
|
if err = c.handleRowsEvent(ev); err != nil {
|
||
|
log.Errorf("handle rows event error %v", err)
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
continue
|
||
|
case *replication.XIDEvent:
|
||
|
// try to save the position later
|
||
|
default:
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
c.master.Update(pos.Name, pos.Pos)
|
||
|
c.master.Save(forceSavePos)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
|
||
|
ev := e.Event.(*replication.RowsEvent)
|
||
|
|
||
|
// Caveat: table may be altered at runtime.
|
||
|
schema := string(ev.Table.Schema)
|
||
|
table := string(ev.Table.Table)
|
||
|
|
||
|
t, err := c.GetTable(schema, table)
|
||
|
if err != nil {
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
var action string
|
||
|
switch e.Header.EventType {
|
||
|
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
|
||
|
action = InsertAction
|
||
|
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
|
||
|
action = DeleteAction
|
||
|
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
|
||
|
action = UpdateAction
|
||
|
default:
|
||
|
return errors.Errorf("%s not supported now", e.Header.EventType)
|
||
|
}
|
||
|
events := newRowsEvent(t, action, ev.Rows)
|
||
|
return c.travelRowsEventHandler(events)
|
||
|
}
|
||
|
|
||
|
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
|
||
|
if timeout <= 0 {
|
||
|
timeout = 60
|
||
|
}
|
||
|
|
||
|
timer := time.NewTimer(time.Duration(timeout) * time.Second)
|
||
|
for {
|
||
|
select {
|
||
|
case <-timer.C:
|
||
|
return errors.Errorf("wait position %v err", pos)
|
||
|
default:
|
||
|
curpos := c.master.Pos()
|
||
|
if curpos.Compare(pos) >= 0 {
|
||
|
return nil
|
||
|
} else {
|
||
|
time.Sleep(100 * time.Millisecond)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (c *Canal) CatchMasterPos(timeout int) error {
|
||
|
rr, err := c.Execute("SHOW MASTER STATUS")
|
||
|
if err != nil {
|
||
|
return errors.Trace(err)
|
||
|
}
|
||
|
|
||
|
name, _ := rr.GetString(0, 0)
|
||
|
pos, _ := rr.GetInt(0, 1)
|
||
|
|
||
|
return c.WaitUntilPos(mysql.Position{name, uint32(pos)}, timeout)
|
||
|
}
|