package main import ( "flag" "fmt" "os" "os/signal" "strings" "syscall" "time" "github.com/siddontang/go-mysql/canal" "github.com/siddontang/go-mysql/mysql" ) var host = flag.String("host", "127.0.0.1", "MySQL host") var port = flag.Int("port", 3306, "MySQL port") var user = flag.String("user", "root", "MySQL user, must have replication privilege") var password = flag.String("password", "", "MySQL password") var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb") var serverID = flag.Int("server-id", 101, "Unique Server ID") var mysqldump = flag.String("mysqldump", "mysqldump", "mysqldump execution path") var dbs = flag.String("dbs", "test", "dump databases, seperated by comma") var tables = flag.String("tables", "", "dump tables, seperated by comma, will overwrite dbs") var tableDB = flag.String("table_db", "test", "database for dump tables") var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, separated by comma") var startName = flag.String("bin_name", "", "start sync from binlog name") var startPos = flag.Uint("bin_pos", 0, "start sync from binlog position of") var heartbeatPeriod = flag.Duration("heartbeat", 60*time.Second, "master heartbeat period") var readTimeout = flag.Duration("read_timeout", 90*time.Second, "connection read timeout") func main() { flag.Parse() cfg := canal.NewDefaultConfig() cfg.Addr = fmt.Sprintf("%s:%d", *host, *port) cfg.User = *user cfg.Password = *password cfg.Flavor = *flavor cfg.UseDecimal = true cfg.ReadTimeout = *readTimeout cfg.HeartbeatPeriod = *heartbeatPeriod cfg.ServerID = uint32(*serverID) cfg.Dump.ExecutionPath = *mysqldump cfg.Dump.DiscardErr = false c, err := canal.NewCanal(cfg) if err != nil { fmt.Printf("create canal err %v", err) os.Exit(1) } if len(*ignoreTables) == 0 { subs := strings.Split(*ignoreTables, ",") for _, sub := range subs { if seps := strings.Split(sub, "."); len(seps) == 2 { c.AddDumpIgnoreTables(seps[0], seps[1]) } } } if len(*tables) > 0 && len(*tableDB) > 0 { subs := strings.Split(*tables, ",") c.AddDumpTables(*tableDB, subs...) } else if len(*dbs) > 0 { subs := strings.Split(*dbs, ",") c.AddDumpDatabases(subs...) } c.SetEventHandler(&handler{}) startPos := mysql.Position{ Name: *startName, Pos: uint32(*startPos), } go func() { err = c.RunFrom(startPos) if err != nil { fmt.Printf("start canal err %v", err) } }() sc := make(chan os.Signal, 1) signal.Notify(sc, os.Kill, os.Interrupt, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) <-sc c.Close() } type handler struct { canal.DummyEventHandler } func (h *handler) OnRow(e *canal.RowsEvent) error { fmt.Printf("%v\n", e) return nil } func (h *handler) String() string { return "TestHandler" }