diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 7b57a99..445617a 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -16,6 +16,7 @@ import ( "github.com/outbrain/golib/log" gomysql "github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/replication" + "golang.org/x/net/context" ) type GoMySQLReader struct { @@ -39,7 +40,16 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G } serverId := uint32(binlogReader.MigrationContext.ReplicaServerId) - binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") + + binlogSyncerConfig := &replication.BinlogSyncerConfig{ + ServerID: serverId, + Flavor: "mysql", + Host: connectionConfig.Key.Hostname, + Port: uint16(connectionConfig.Key.Port), + User: connectionConfig.User, + Password: connectionConfig.Password, + } + binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig) return binlogReader, err } @@ -49,10 +59,6 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin if coordinates.IsEmpty() { return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()") } - log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port)) - if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil { - return err - } this.currentCoordinates = coordinates log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates) @@ -126,7 +132,7 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if canStopStreaming() { break } - ev, err := this.binlogStreamer.GetEvent() + ev, err := this.binlogStreamer.GetEvent(context.Background()) if err != nil { return err }