Improved connection type logging

This commit is contained in:
Akshay Chhajed 2017-10-29 19:53:32 +05:30
parent ca2340ae0a
commit ccb7654235
4 changed files with 13 additions and 6 deletions

View File

@ -13,6 +13,7 @@ import (
"time" "time"
gosql "database/sql" gosql "database/sql"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
"github.com/outbrain/golib/log" "github.com/outbrain/golib/log"
) )
@ -64,7 +65,7 @@ func StringContainsAll(s string, substrings ...string) bool {
return nonEmptyStringsFound return nonEmptyStringsFound
} }
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig) (string, error) { func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, name string) (string, error) {
query := `select @@global.port, @@global.version` query := `select @@global.port, @@global.version`
var port, extraPort int var port, extraPort int
var version string var version string
@ -77,7 +78,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig)
} }
if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) { if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
log.Infof("connection validated on %+v", connectionConfig.Key) log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
return version, nil return version, nil
} else if extraPort == 0 { } else if extraPort == 0 {
return "", fmt.Errorf("Unexpected database port reported: %+v", port) return "", fmt.Errorf("Unexpected database port reported: %+v", port)

View File

@ -34,12 +34,14 @@ type Applier struct {
db *gosql.DB db *gosql.DB
singletonDB *gosql.DB singletonDB *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
name string
} }
func NewApplier() *Applier { func NewApplier() *Applier {
return &Applier{ return &Applier{
connectionConfig: base.GetMigrationContext().ApplierConnectionConfig, connectionConfig: base.GetMigrationContext().ApplierConnectionConfig,
migrationContext: base.GetMigrationContext(), migrationContext: base.GetMigrationContext(),
name: "applier",
} }
} }
@ -53,11 +55,11 @@ func (this *Applier) InitDBConnections() (err error) {
return err return err
} }
this.singletonDB.SetMaxOpenConns(1) this.singletonDB.SetMaxOpenConns(1)
version, err := base.ValidateConnection(this.db, this.connectionConfig) version, err := base.ValidateConnection(this.db, this.connectionConfig, this.name)
if err != nil { if err != nil {
return err return err
} }
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig); err != nil { if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.name); err != nil {
return err return err
} }
this.migrationContext.ApplierMySQLVersion = version this.migrationContext.ApplierMySQLVersion = version

View File

@ -29,12 +29,14 @@ type Inspector struct {
connectionConfig *mysql.ConnectionConfig connectionConfig *mysql.ConnectionConfig
db *gosql.DB db *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
name string
} }
func NewInspector() *Inspector { func NewInspector() *Inspector {
return &Inspector{ return &Inspector{
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(), migrationContext: base.GetMigrationContext(),
name: "inspector",
} }
} }
@ -196,7 +198,7 @@ func (this *Inspector) validateConnection() error {
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html") return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
} }
version, err := base.ValidateConnection(this.db, this.connectionConfig) version, err := base.ValidateConnection(this.db, this.connectionConfig, this.name)
this.migrationContext.InspectorMySQLVersion = version this.migrationContext.InspectorMySQLVersion = version
return err return err
} }

View File

@ -43,6 +43,7 @@ type EventsStreamer struct {
listenersMutex *sync.Mutex listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry eventsChannel chan *binlog.BinlogEntry
binlogReader *binlog.GoMySQLReader binlogReader *binlog.GoMySQLReader
name string
} }
func NewEventsStreamer() *EventsStreamer { func NewEventsStreamer() *EventsStreamer {
@ -52,6 +53,7 @@ func NewEventsStreamer() *EventsStreamer {
listeners: [](*BinlogEventListener){}, listeners: [](*BinlogEventListener){},
listenersMutex: &sync.Mutex{}, listenersMutex: &sync.Mutex{},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
name: "streamer",
} }
} }
@ -107,7 +109,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil { if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
return err return err
} }
if _, err := base.ValidateConnection(this.db, this.connectionConfig); err != nil { if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.name); err != nil {
return err return err
} }
if err := this.readCurrentBinlogCoordinates(); err != nil { if err := this.readCurrentBinlogCoordinates(); err != nil {