From ccb76542350adf4b1d7917660f44246ce78fe7bf Mon Sep 17 00:00:00 2001
From: Akshay Chhajed <akshaychhajed@users.noreply.github.com>
Date: Sun, 29 Oct 2017 19:53:32 +0530
Subject: [PATCH 1/4] Improved connection type logging

---
 go/base/utils.go     | 5 +++--
 go/logic/applier.go  | 6 ++++--
 go/logic/inspect.go  | 4 +++-
 go/logic/streamer.go | 4 +++-
 4 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/go/base/utils.go b/go/base/utils.go
index 727bc57..9c47407 100644
--- a/go/base/utils.go
+++ b/go/base/utils.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	gosql "database/sql"
+
 	"github.com/github/gh-ost/go/mysql"
 	"github.com/outbrain/golib/log"
 )
@@ -64,7 +65,7 @@ func StringContainsAll(s string, substrings ...string) bool {
 	return nonEmptyStringsFound
 }
 
-func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig) (string, error) {
+func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, name string) (string, error) {
 	query := `select @@global.port, @@global.version`
 	var port, extraPort int
 	var version string
@@ -77,7 +78,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig)
 	}
 
 	if connectionConfig.Key.Port == port || (extraPort > 0 && connectionConfig.Key.Port == extraPort) {
-		log.Infof("connection validated on %+v", connectionConfig.Key)
+		log.Infof("%s connection validated on %+v", name, connectionConfig.Key)
 		return version, nil
 	} else if extraPort == 0 {
 		return "", fmt.Errorf("Unexpected database port reported: %+v", port)
diff --git a/go/logic/applier.go b/go/logic/applier.go
index 419800d..9e645f4 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -34,12 +34,14 @@ type Applier struct {
 	db               *gosql.DB
 	singletonDB      *gosql.DB
 	migrationContext *base.MigrationContext
+	name             string
 }
 
 func NewApplier() *Applier {
 	return &Applier{
 		connectionConfig: base.GetMigrationContext().ApplierConnectionConfig,
 		migrationContext: base.GetMigrationContext(),
+		name:             "applier",
 	}
 }
 
@@ -53,11 +55,11 @@ func (this *Applier) InitDBConnections() (err error) {
 		return err
 	}
 	this.singletonDB.SetMaxOpenConns(1)
-	version, err := base.ValidateConnection(this.db, this.connectionConfig)
+	version, err := base.ValidateConnection(this.db, this.connectionConfig, this.name)
 	if err != nil {
 		return err
 	}
-	if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig); err != nil {
+	if _, err := base.ValidateConnection(this.singletonDB, this.connectionConfig, this.name); err != nil {
 		return err
 	}
 	this.migrationContext.ApplierMySQLVersion = version
diff --git a/go/logic/inspect.go b/go/logic/inspect.go
index 6efacf9..1c18642 100644
--- a/go/logic/inspect.go
+++ b/go/logic/inspect.go
@@ -29,12 +29,14 @@ type Inspector struct {
 	connectionConfig *mysql.ConnectionConfig
 	db               *gosql.DB
 	migrationContext *base.MigrationContext
+	name             string
 }
 
 func NewInspector() *Inspector {
 	return &Inspector{
 		connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
 		migrationContext: base.GetMigrationContext(),
+		name:             "inspector",
 	}
 }
 
@@ -196,7 +198,7 @@ func (this *Inspector) validateConnection() error {
 		return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
 	}
 
-	version, err := base.ValidateConnection(this.db, this.connectionConfig)
+	version, err := base.ValidateConnection(this.db, this.connectionConfig, this.name)
 	this.migrationContext.InspectorMySQLVersion = version
 	return err
 }
diff --git a/go/logic/streamer.go b/go/logic/streamer.go
index 14ac6ab..275af55 100644
--- a/go/logic/streamer.go
+++ b/go/logic/streamer.go
@@ -43,6 +43,7 @@ type EventsStreamer struct {
 	listenersMutex           *sync.Mutex
 	eventsChannel            chan *binlog.BinlogEntry
 	binlogReader             *binlog.GoMySQLReader
+	name                     string
 }
 
 func NewEventsStreamer() *EventsStreamer {
@@ -52,6 +53,7 @@ func NewEventsStreamer() *EventsStreamer {
 		listeners:        [](*BinlogEventListener){},
 		listenersMutex:   &sync.Mutex{},
 		eventsChannel:    make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
+		name:             "streamer",
 	}
 }
 
@@ -107,7 +109,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
 	if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
 		return err
 	}
-	if _, err := base.ValidateConnection(this.db, this.connectionConfig); err != nil {
+	if _, err := base.ValidateConnection(this.db, this.connectionConfig, this.name); err != nil {
 		return err
 	}
 	if err := this.readCurrentBinlogCoordinates(); err != nil {

From 72ccd0b0d01f0d5782769d245732974e14cda360 Mon Sep 17 00:00:00 2001
From: Tim Vaillancourt <timvaillancourt@github.com>
Date: Mon, 8 Feb 2021 13:36:59 +0100
Subject: [PATCH 2/4] Fix whitespace after merge-conflict fix

---
 go/logic/inspect.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/logic/inspect.go b/go/logic/inspect.go
index 2c1846b..63abc26 100644
--- a/go/logic/inspect.go
+++ b/go/logic/inspect.go
@@ -29,14 +29,14 @@ type Inspector struct {
 	db                  *gosql.DB
 	informationSchemaDb *gosql.DB
 	migrationContext    *base.MigrationContext
-  name                string
+	name                string
 }
 
 func NewInspector(migrationContext *base.MigrationContext) *Inspector {
 	return &Inspector{
 		connectionConfig: migrationContext.InspectorConnectionConfig,
 		migrationContext: migrationContext,
-    name:             "inspector",
+		name:             "inspector",
 	}
 }
 

From 048d5838db9ca19ac02e10a6f86ee5fab6b27a6e Mon Sep 17 00:00:00 2001
From: Tim Vaillancourt <timvaillancourt@github.com>
Date: Mon, 8 Feb 2021 13:37:39 +0100
Subject: [PATCH 3/4] Fix whitespace after merge-conflict fix

---
 go/logic/applier.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/go/logic/applier.go b/go/logic/applier.go
index fb4bc8d..52628ec 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -57,7 +57,7 @@ type Applier struct {
 	singletonDB       *gosql.DB
 	migrationContext  *base.MigrationContext
 	finishedMigrating int64
-  name              string
+	name              string
 }
 
 func NewApplier(migrationContext *base.MigrationContext) *Applier {
@@ -65,7 +65,7 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
 		connectionConfig:  migrationContext.ApplierConnectionConfig,
 		migrationContext:  migrationContext,
 		finishedMigrating: 0,
-    name:              "applier",
+		name:              "applier",
 	}
 }
 

From 710c9ddda575c74b668f211241ea5eb2ec11bcfc Mon Sep 17 00:00:00 2001
From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Date: Thu, 18 Feb 2021 10:44:47 +0200
Subject: [PATCH 4/4] All MySQL DBs limited to max 3 concurrent/idle
 connections

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
---
 go/logic/throttler.go |  7 +++++--
 go/mysql/utils.go     | 28 +++++++++++++++-------------
 2 files changed, 20 insertions(+), 15 deletions(-)

diff --git a/go/logic/throttler.go b/go/logic/throttler.go
index d234ea6..abe8669 100644
--- a/go/logic/throttler.go
+++ b/go/logic/throttler.go
@@ -188,9 +188,12 @@ func (this *Throttler) collectControlReplicasLag() {
 		dbUri := connectionConfig.GetDBUri("information_schema")
 
 		var heartbeatValue string
-		if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil {
+		db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri)
+		if err != nil {
 			return lag, err
-		} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
+		}
+
+		if err := db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
 			return lag, err
 		}
 
diff --git a/go/mysql/utils.go b/go/mysql/utils.go
index 17bb5fc..43a228e 100644
--- a/go/mysql/utils.go
+++ b/go/mysql/utils.go
@@ -18,8 +18,11 @@ import (
 	"github.com/outbrain/golib/sqlutils"
 )
 
-const MaxTableNameLength = 64
-const MaxReplicationPasswordLength = 32
+const (
+	MaxTableNameLength           = 64
+	MaxReplicationPasswordLength = 32
+	MaxDBPoolConnections         = 3
+)
 
 type ReplicationLagResult struct {
 	Key InstanceKey
@@ -39,23 +42,22 @@ func (this *ReplicationLagResult) HasLag() bool {
 var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
 var knownDBsMutex = &sync.Mutex{}
 
-func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
+func GetDB(migrationUuid string, mysql_uri string) (db *gosql.DB, exists bool, err error) {
 	cacheKey := migrationUuid + ":" + mysql_uri
 
 	knownDBsMutex.Lock()
-	defer func() {
-		knownDBsMutex.Unlock()
-	}()
+	defer knownDBsMutex.Unlock()
 
-	var exists bool
-	if _, exists = knownDBs[cacheKey]; !exists {
-		if db, err := gosql.Open("mysql", mysql_uri); err == nil {
-			knownDBs[cacheKey] = db
-		} else {
-			return db, exists, err
+	if db, exists = knownDBs[cacheKey]; !exists {
+		db, err = gosql.Open("mysql", mysql_uri)
+		if err != nil {
+			return nil, false, err
 		}
+		db.SetMaxOpenConns(MaxDBPoolConnections)
+		db.SetMaxIdleConns(MaxDBPoolConnections)
+		knownDBs[cacheKey] = db
 	}
-	return knownDBs[cacheKey], exists, nil
+	return db, exists, nil
 }
 
 // GetReplicationLagFromSlaveStatus returns replication lag for a given db; via SHOW SLAVE STATUS