add migrationcontext to gomysql_reader
This commit is contained in:
parent
f4676bf463
commit
3ca5f89fa5
@ -13,13 +13,13 @@ import (
|
|||||||
"github.com/github/gh-ost/go/mysql"
|
"github.com/github/gh-ost/go/mysql"
|
||||||
"github.com/github/gh-ost/go/sql"
|
"github.com/github/gh-ost/go/sql"
|
||||||
|
|
||||||
"github.com/outbrain/golib/log"
|
|
||||||
gomysql "github.com/siddontang/go-mysql/mysql"
|
gomysql "github.com/siddontang/go-mysql/mysql"
|
||||||
"github.com/siddontang/go-mysql/replication"
|
"github.com/siddontang/go-mysql/replication"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
type GoMySQLReader struct {
|
type GoMySQLReader struct {
|
||||||
|
migrationContext *base.MigrationContext
|
||||||
connectionConfig *mysql.ConnectionConfig
|
connectionConfig *mysql.ConnectionConfig
|
||||||
binlogSyncer *replication.BinlogSyncer
|
binlogSyncer *replication.BinlogSyncer
|
||||||
binlogStreamer *replication.BinlogStreamer
|
binlogStreamer *replication.BinlogStreamer
|
||||||
@ -30,6 +30,7 @@ type GoMySQLReader struct {
|
|||||||
|
|
||||||
func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) {
|
func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) {
|
||||||
binlogReader = &GoMySQLReader{
|
binlogReader = &GoMySQLReader{
|
||||||
|
migrationContext: migrationContext,
|
||||||
connectionConfig: migrationContext.InspectorConnectionConfig,
|
connectionConfig: migrationContext.InspectorConnectionConfig,
|
||||||
currentCoordinates: mysql.BinlogCoordinates{},
|
currentCoordinates: mysql.BinlogCoordinates{},
|
||||||
currentCoordinatesMutex: &sync.Mutex{},
|
currentCoordinatesMutex: &sync.Mutex{},
|
||||||
@ -57,11 +58,11 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *Go
|
|||||||
// ConnectBinlogStreamer
|
// ConnectBinlogStreamer
|
||||||
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
|
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
|
||||||
if coordinates.IsEmpty() {
|
if coordinates.IsEmpty() {
|
||||||
return log.Errorf("Empty coordinates at ConnectBinlogStreamer()")
|
return this.migrationContext.Log.Errorf("Empty coordinates at ConnectBinlogStreamer()")
|
||||||
}
|
}
|
||||||
|
|
||||||
this.currentCoordinates = coordinates
|
this.currentCoordinates = coordinates
|
||||||
log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
|
this.migrationContext.Log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
|
||||||
// Start sync with specified binlog file and position
|
// Start sync with specified binlog file and position
|
||||||
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)})
|
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)})
|
||||||
|
|
||||||
@ -78,7 +79,7 @@ func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinate
|
|||||||
// StreamEvents
|
// StreamEvents
|
||||||
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
|
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
|
||||||
if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
|
if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) {
|
||||||
log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
|
this.migrationContext.Log.Debugf("Skipping handled query at %+v", this.currentCoordinates)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,14 +148,14 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
|
|||||||
defer this.currentCoordinatesMutex.Unlock()
|
defer this.currentCoordinatesMutex.Unlock()
|
||||||
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
|
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
|
||||||
}()
|
}()
|
||||||
log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), rotateEvent.NextLogName)
|
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), rotateEvent.NextLogName)
|
||||||
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
|
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
|
||||||
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
|
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
log.Debugf("done streaming events")
|
this.migrationContext.Log.Debugf("done streaming events")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user