diff --git a/go/base/context.go b/go/base/context.go index 06a1c15..23fe6f6 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -14,6 +14,8 @@ import ( "sync/atomic" "time" + "github.com/satori/go.uuid" + "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -71,6 +73,8 @@ func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleRea // MigrationContext has the general, global state of migration. It is used by // all components throughout the migration process. type MigrationContext struct { + Uuid string + DatabaseName string OriginalTableName string AlterStatement string @@ -212,6 +216,7 @@ type ContextConfig struct { func NewMigrationContext() *MigrationContext { return &MigrationContext{ + Uuid: uuid.NewV4().String(), defaultNumRetries: 60, ChunkSize: 1000, InspectorConnectionConfig: mysql.NewConnectionConfig(), diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index cb6d2b7..6016f81 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -158,10 +158,6 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha } func (this *GoMySQLReader) Close() error { - // Historically there was a: - // this.binlogSyncer.Close() - // here. A new go-mysql version closes the binlog syncer connection independently. - // I will go against the sacred rules of comments and just leave this here. - // This is the year 2017. Let's see what year these comments get deleted. + this.binlogSyncer.Close() return nil } diff --git a/go/logic/applier.go b/go/logic/applier.go index 7e9b204..227b59e 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -68,12 +68,13 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { } func (this *Applier) InitDBConnections() (err error) { + applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) - if this.db, err = mysql.GetDB(applierUri); err != nil { + if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil { return err } singletonApplierUri := fmt.Sprintf("%s?timeout=0", applierUri) - if this.singletonDB, err = mysql.GetDB(singletonApplierUri); err != nil { + if this.singletonDB, _, err = mysql.GetDB(this.migrationContext.Uuid, singletonApplierUri); err != nil { return err } this.singletonDB.SetMaxOpenConns(1) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index cdefb80..31c81dc 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -41,12 +41,12 @@ func NewInspector(migrationContext *base.MigrationContext) *Inspector { func (this *Inspector) InitDBConnections() (err error) { inspectorUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) - if this.db, err = mysql.GetDB(inspectorUri); err != nil { + if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, inspectorUri); err != nil { return err } informationSchemaUri := this.connectionConfig.GetDBUri("information_schema") - if this.informationSchemaDb, err = mysql.GetDB(informationSchemaUri); err != nil { + if this.informationSchemaDb, _, err = mysql.GetDB(this.migrationContext.Uuid, informationSchemaUri); err != nil { return err } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 4f11a3d..37e7195 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -104,7 +104,7 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) func (this *EventsStreamer) InitDBConnections() (err error) { EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) - if this.db, err = mysql.GetDB(EventsStreamerUri); err != nil { + if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, EventsStreamerUri); err != nil { return err } if _, err := base.ValidateConnection(this.db, this.connectionConfig); err != nil { diff --git a/go/logic/throttler.go b/go/logic/throttler.go index efc1a4d..624956a 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -41,16 +41,18 @@ const frenoMagicHint = "freno" // Throttler collects metrics related to throttling and makes informed decision // whether throttling should take place. type Throttler struct { - migrationContext *base.MigrationContext - applier *Applier - inspector *Inspector + migrationContext *base.MigrationContext + applier *Applier + inspector *Inspector + finishedMigrating int64 } func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler { return &Throttler{ - migrationContext: migrationContext, - applier: applier, - inspector: inspector, + migrationContext: migrationContext, + applier: applier, + inspector: inspector, + finishedMigrating: 0, } } @@ -159,6 +161,9 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) for range ticker { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return + } go collectFunc() } } @@ -181,7 +186,7 @@ func (this *Throttler) collectControlReplicasLag() { dbUri := connectionConfig.GetDBUri("information_schema") var heartbeatValue string - if db, err := mysql.GetDB(dbUri); err != nil { + if db, _, err := mysql.GetDB(this.migrationContext.Uuid, dbUri); err != nil { return lag, err } else if err = db.QueryRow(replicationLagQuery).Scan(&heartbeatValue); err != nil { return lag, err @@ -233,6 +238,9 @@ func (this *Throttler) collectControlReplicasLag() { shouldReadLagAggressively := false for range aggressiveTicker { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return + } if counter%relaxedFactor == 0 { // we only check if we wish to be aggressive once per second. The parameters for being aggressive // do not typically change at all throughout the migration, but nonetheless we check them. @@ -285,6 +293,10 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- ticker := time.Tick(100 * time.Millisecond) for range ticker { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return + } + if sleep, _ := collectFunc(); sleep { time.Sleep(1 * time.Second) } @@ -393,6 +405,10 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan throttlerMetricsTick := time.Tick(1 * time.Second) for range throttlerMetricsTick { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return + } + this.collectGeneralThrottleMetrics() } }() @@ -419,6 +435,9 @@ func (this *Throttler) initiateThrottlerChecks() error { } throttlerFunction() for range throttlerTick { + if atomic.LoadInt64(&this.finishedMigrating) > 0 { + return nil + } throttlerFunction() } @@ -440,3 +459,8 @@ func (this *Throttler) throttle(onThrottled func()) { time.Sleep(250 * time.Millisecond) } } + +func (this *Throttler) Teardown() { + log.Debugf("Tearing down...") + atomic.StoreInt64(&this.finishedMigrating, 1) +} diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 28e2ec1..532cbb4 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -8,6 +8,7 @@ package mysql import ( gosql "database/sql" "fmt" + "sync" "time" "github.com/github/gh-ost/go/sql" @@ -33,13 +34,27 @@ func (this *ReplicationLagResult) HasLag() bool { return this.Lag > 0 } -func GetDB(mysql_uri string) (*gosql.DB, error) { - db, err := gosql.Open("mysql", mysql_uri) - if err == nil { - return db, nil - } else { - return nil, err +// knownDBs is a DB cache by uri +var knownDBs map[string]*gosql.DB = make(map[string]*gosql.DB) +var knownDBsMutex = &sync.Mutex{} + +func GetDB(migrationUuid string, mysql_uri string) (*gosql.DB, bool, error) { + cacheKey := migrationUuid + ":" + mysql_uri + + knownDBsMutex.Lock() + defer func() { + knownDBsMutex.Unlock() + }() + + var exists bool + if _, exists = knownDBs[cacheKey]; !exists { + if db, err := gosql.Open("mysql", mysql_uri); err == nil { + knownDBs[cacheKey] = db + } else { + return db, exists, err + } } + return knownDBs[cacheKey], exists, nil } // GetReplicationLag returns replication lag for a given connection config; either by explicit query @@ -62,7 +77,10 @@ func GetReplicationLag(informationSchemaDb *gosql.DB, connectionConfig *Connecti func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) { currentUri := connectionConfig.GetDBUri("information_schema") // This function is only called once, okay to not have a cached connection pool - db, err := GetDB(currentUri) + db, err := gosql.Open("mysql", currentUri) + if err != nil { + return nil, err + } defer db.Close() if err != nil {