Pass in a migrationContext UUID for a migration specific connections cache
This commit is contained in:
parent
fac1ba7026
commit
ec6ceffbcb
@ -14,6 +14,8 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/satori/go.uuid"
|
||||
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
"github.com/github/gh-ost/go/sql"
|
||||
|
||||
@ -71,6 +73,8 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea
|
||||
// MigrationContext has the general, global state of migration. It is used by
|
||||
// all components throughout the migration process.
|
||||
type MigrationContext struct {
|
||||
Uuid string
|
||||
|
||||
DatabaseName string
|
||||
OriginalTableName string
|
||||
AlterStatement string
|
||||
@ -212,6 +216,7 @@ type ContextConfig struct {
|
||||
|
||||
func NewMigrationContext() *MigrationContext {
|
||||
return &MigrationContext{
|
||||
Uuid: uuid.NewV4().String(),
|
||||
defaultNumRetries: 60,
|
||||
ChunkSize: 1000,
|
||||
InspectorConnectionConfig: mysql.NewConnectionConfig(),
|
||||
|
@ -68,12 +68,13 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier {
|
||||
}
|
||||
|
||||
func (this *Applier) InitDBConnections() (err error) {
|
||||
|
||||
applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, err = mysql.GetDB(applierUri); err != nil {
|
||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil {
|
||||
return err
|
||||
}
|
||||
singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri)
|
||||
if this.singletonDB, err = mysql.GetDB(singletonApplierUri); err != nil {
|
||||
if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil {
|
||||
return err
|
||||
}
|
||||
this.singletonDB.SetMaxOpenConns(1)
|
||||
|
@ -41,12 +41,12 @@ func NewInspector(migrationContext *base.MigrationContext) *Inspector {
|
||||
|
||||
func (this *Inspector) InitDBConnections() (err error) {
|
||||
inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, err = mysql.GetDB(inspectorUri); err != nil {
|
||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, inspectorUri); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
informationSchemaUri := this.connectionConfig.GetDBUri("information_schema")
|
||||
if this.informationSchemaDb, err = mysql.GetDB(informationSchemaUri); err != nil {
|
||||
if this.informationSchemaDb, _, err = mysql.GetDB(this.migrationContext.Uuid, informationSchemaUri); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -104,7 +104,7 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent)
|
||||
|
||||
func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, err = mysql.GetDB(EventsStreamerUri); err != nil {
|
||||
if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := base.ValidateConnection(this.db, this.connectionConfig); err != nil {
|
||||
|
@ -186,7 +186,7 @@ func (this *Throttler) collectControlReplicasLag() {
|
||||
dbUri := connectionConfig.GetDBUri("information_schema")
|
||||
|
||||
var heartbeatValue string
|
||||
if db, err := mysql.GetDB(dbUri); err != nil {
|
||||
if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil {
|
||||
return lag, err
|
||||
} else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil {
|
||||
return lag, err
|
||||
@ -461,6 +461,6 @@ func (this *Throttler) throttle(onThrottled func()) {
|
||||
}
|
||||
|
||||
func (this *Throttler) Teardown() {
|
||||
this.migrationContext.Log.Debugf("Tearing down...")
|
||||
log.Debugf("Tearing down...")
|
||||
atomic.StoreInt64(&this.finishedMigrating, 1)
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ package mysql
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/github/gh-ost/go/sql"
|
||||
@ -33,13 +34,27 @@ func (this *ReplicationLagResult) HasLag() bool {
|
||||
return this.Lag > 0
|
||||
}
|
||||
|
||||
func GetDB(mysql_uri string) (*gosql.DB, error) {
|
||||
db, err := gosql.Open("mysql", mysql_uri)
|
||||
if err == nil {
|
||||
return db, nil
|
||||
} else {
|
||||
return nil, err
|
||||
// knownDBs is a DB cache by uri
|
||||
var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB)
|
||||
var knownDBsMutex = &sync.Mutex{}
|
||||
|
||||
func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) {
|
||||
cacheKey := migrationUuid + ":" + mysql_uri
|
||||
|
||||
knownDBsMutex.Lock()
|
||||
defer func() {
|
||||
knownDBsMutex.Unlock()
|
||||
}()
|
||||
|
||||
var exists bool
|
||||
if _, exists = knownDBs[cacheKey]; !exists {
|
||||
if db, err := gosql.Open("mysql", mysql_uri); err == nil {
|
||||
knownDBs[cacheKey] = db
|
||||
} else {
|
||||
return db, exists, err
|
||||
}
|
||||
}
|
||||
return knownDBs[cacheKey], exists, nil
|
||||
}
|
||||
|
||||
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
|
||||
@ -62,7 +77,10 @@ func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *Connecti
|
||||
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
|
||||
currentUri := connectionConfig.GetDBUri("information_schema")
|
||||
// This function is only called once, okay to not have a cached connection pool
|
||||
db, err := GetDB(currentUri)
|
||||
db, err := gosql.Open("mysql", currentUri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer db.Close()
|
||||
|
||||
if err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user