Merge pull request #15 from openark/limit-mysql-connetions

All MySQL DBs limited to max 3 concurrent/idle connections
This commit is contained in:
Shlomi Noach 2021-02-22 12:31:18 +02:00 committed by GitHub
commit 2b5cf78b4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 33 additions and 21 deletions

View File

@ -13,6 +13,7 @@ import (
"time" "time"
gosql "database/sql" gosql "database/sql"
"github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/mysql"
) )
@ -62,7 +63,7 @@ func StringContainsAll(s string, substrings ...string) bool {
return nonEmptyStringsFound return nonEmptyStringsFound
} }
func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext) (string, error) { func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, migrationContext *MigrationContext, name string) (string, error) {
versionQuery := `select @@global.version` versionQuery := `select @@global.version`
var port, extraPort int var port, extraPort int
var version string var version string
@ -86,7 +87,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
} }
if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) { if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
migrationContext.Log.Infof("connection validated on %+v", connectionConfig.Key) migrationContext.Log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
return version, nil return version, nil
} else if extraPort == 0 { } else if extraPort == 0 {
return "", fmt.Errorf("Unexpected database port reported: %+v", port) return "", fmt.Errorf("Unexpected database port reported: %+v", port)

View File

@ -57,6 +57,7 @@ type Applier struct {
singletonDB *gosql.DB singletonDB *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
finishedMigrating int64 finishedMigrating int64
name string
} }
func NewApplier(migrationContext *base.MigrationContext) *Applier { func NewApplier(migrationContext *base.MigrationContext) *Applier {
@ -64,6 +65,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
connectionConfig: migrationContext.ApplierConnectionConfig, connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext, migrationContext: migrationContext,
finishedMigrating: 0, finishedMigrating: 0,
name: "applier",
} }
} }
@ -78,11 +80,11 @@ func (this *Applier) InitDBConnections() (err error) {
return err return err
} }
this.singletonDB.SetMaxOpenConns(1) this.singletonDB.SetMaxOpenConns(1)
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext) version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
if err != nil { if err != nil {
return err return err
} }
if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext); err != nil { if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.migrationContext, this.name); err != nil {
return err return err
} }
this.migrationContext.ApplierMySQLVersion = version this.migrationContext.ApplierMySQLVersion = version

View File

@ -29,12 +29,14 @@ type Inspector struct {
db *gosql.DB db *gosql.DB
informationSchemaDb *gosql.DB informationSchemaDb *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
name string
} }
func NewInspector(migrationContext *base.MigrationContext) *Inspector { func NewInspector(migrationContext *base.MigrationContext) *Inspector {
return &Inspector{ return &Inspector{
connectionConfig: migrationContext.InspectorConnectionConfig, connectionConfig: migrationContext.InspectorConnectionConfig,
migrationContext: migrationContext, migrationContext: migrationContext,
name: "inspector",
} }
} }
@ -206,7 +208,7 @@ func (this *Inspector) validateConnection() error {
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html") return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
} }
version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext) version, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name)
this.migrationContext.InspectorMySQLVersion = version this.migrationContext.InspectorMySQLVersion = version
return err return err
} }

View File

@ -42,6 +42,7 @@ type EventsStreamer struct {
listenersMutex *sync.Mutex listenersMutex *sync.Mutex
eventsChannel chan *binlog.BinlogEntry eventsChannel chan *binlog.BinlogEntry
binlogReader *binlog.GoMySQLReader binlogReader *binlog.GoMySQLReader
name string
} }
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer { func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
@ -51,6 +52,7 @@ func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer
listeners: [](*BinlogEventListener){}, listeners: [](*BinlogEventListener){},
listenersMutex: &sync.Mutex{}, listenersMutex: &sync.Mutex{},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
name: "streamer",
} }
} }
@ -106,7 +108,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil { if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
return err return err
} }
if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext); err != nil { if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.migrationContext, this.name); err != nil {
return err return err
} }
if err := this.readCurrentBinlogCoordinates(); err != nil { if err := this.readCurrentBinlogCoordinates(); err != nil {

View File

@ -188,9 +188,12 @@ func (this *Throttler) collectControlReplicasLag() {
dbUri := connectionConfig.GetDBUri("information_schema") dbUri := connectionConfig.GetDBUri("information_schema")
var heartbeatValue string var heartbeatValue string
if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil { db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri)
if err != nil {
return lag, err return lag, err
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil { }
if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
return lag, err return lag, err
} }

View File

@ -18,8 +18,11 @@ import (
"github.com/outbrain/golib/sqlutils" "github.com/outbrain/golib/sqlutils"
) )
const MaxTableNameLength = 64 const (
const MaxReplicationPasswordLength = 32 MaxTableNameLength = 64
MaxReplicationPasswordLength = 32
MaxDBPoolConnections = 3
)
type ReplicationLagResult struct { type ReplicationLagResult struct {
Key InstanceKey Key InstanceKey
@ -39,23 +42,22 @@ func (this *ReplicationLagResult) HasLag() bool {
var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB) var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
var knownDBsMutex = &sync.Mutex{} var knownDBsMutex = &sync.Mutex{}
func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) { func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, err error) {
cacheKey := migrationUuid + ":" + mysql_uri cacheKey := migrationUuid + ":" + mysql_uri
knownDBsMutex.Lock() knownDBsMutex.Lock()
defer func() { defer knownDBsMutex.Unlock()
knownDBsMutex.Unlock()
}()
var exists bool if db, exists = knownDBs[cacheKey]; !exists {
if _, exists = knownDBs[cacheKey]; !exists { db, err = gosql.Open("mysql", mysql_uri)
if db, err := gosql.Open("mysql", mysql_uri); err == nil { if err != nil {
return nil, false, err
}
db.SetMaxOpenConns(MaxDBPoolConnections)
db.SetMaxIdleConns(MaxDBPoolConnections)
knownDBs[cacheKey] = db knownDBs[cacheKey] = db
} else {
return db, exists, err
} }
} return db, exists, nil
return knownDBs[cacheKey], exists, nil
} }
// GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS // GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS