Use switch statements for readability, simplify .NewGoMySQLReader() (#1135)

* Use `switch` statements for readability

* Simplify initBinlogReader()
This commit is contained in:
Tim Vaillancourt 2022-07-06 23:45:26 +02:00 committed by GitHub
parent 6bf32f2015
commit 0b066c16a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 40 deletions

View File

@ -28,31 +28,24 @@ type GoMySQLReader struct {
LastAppliedRowsEventHint mysql.BinlogCoordinates LastAppliedRowsEventHint mysql.BinlogCoordinates
} }
func NewGoMySQLReader(migrationContext *base.MigrationContext) (binlogReader *GoMySQLReader, err error) { func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
binlogReader = &GoMySQLReader{ connectionConfig := migrationContext.InspectorConnectionConfig
return &GoMySQLReader{
migrationContext: migrationContext, migrationContext: migrationContext,
connectionConfig: migrationContext.InspectorConnectionConfig, connectionConfig: connectionConfig,
currentCoordinates: mysql.BinlogCoordinates{}, currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{}, currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: nil, binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
binlogStreamer: nil, ServerID: uint32(migrationContext.ReplicaServerId),
} Flavor: gomysql.MySQLFlavor,
Host: connectionConfig.Key.Hostname,
serverId := uint32(migrationContext.ReplicaServerId) Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
binlogSyncerConfig := replication.BinlogSyncerConfig{ Password: connectionConfig.Password,
ServerID: serverId, TLSConfig: connectionConfig.TLSConfig(),
Flavor: "mysql",
Host: binlogReader.connectionConfig.Key.Hostname,
Port: uint16(binlogReader.connectionConfig.Key.Port),
User: binlogReader.connectionConfig.User,
Password: binlogReader.connectionConfig.Password,
TLSConfig: binlogReader.connectionConfig.TLSConfig(),
UseDecimal: true, UseDecimal: true,
}),
} }
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)
return binlogReader, err
} }
// ConnectBinlogStreamer // ConnectBinlogStreamer
@ -145,15 +138,17 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
defer this.currentCoordinatesMutex.Unlock() defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos) this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
}() }()
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
switch binlogEvent := ev.Event.(type) {
case *replication.RotateEvent:
func() { func() {
this.currentCoordinatesMutex.Lock() this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock() defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) this.currentCoordinates.LogFile = string(binlogEvent.NextLogName)
}() }()
this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), rotateEvent.NextLogName) this.migrationContext.Log.Infof("rotate to next log from %s:%d to %s", this.currentCoordinates.LogFile, int64(ev.Header.LogPos), binlogEvent.NextLogName)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { case *replication.RowsEvent:
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { if err := this.handleRowsEvent(ev, binlogEvent, entriesChannel); err != nil {
return err return err
} }
} }

View File

@ -537,19 +537,19 @@ func (this *Migrator) cutOver() (err error) {
} }
} }
} }
if this.migrationContext.CutOverType == base.CutOverAtomic {
switch this.migrationContext.CutOverType {
case base.CutOverAtomic:
// Atomic solution: we use low timeout and multiple attempts. But for // Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal // each failed attempt, we throttle until replication lag is back to normal
err := this.atomicCutOver() err = this.atomicCutOver()
this.handleCutOverResult(err) case base.CutOverTwoStep:
return err err = this.cutOverTwoStep()
} default:
if this.migrationContext.CutOverType == base.CutOverTwoStep {
err := this.cutOverTwoStep()
this.handleCutOverResult(err)
return err
}
return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType) return this.migrationContext.Log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
}
this.handleCutOverResult(err)
return err
} }
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,

View File

@ -123,10 +123,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
// initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica // initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error { func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext) goMySQLReader := binlog.NewGoMySQLReader(this.migrationContext)
if err != nil {
return err
}
if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil { if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil {
return err return err
} }