From 982b8eede98cdeb6b80fbf0f0a290abcb3206626 Mon Sep 17 00:00:00 2001 From: Nikhil Mathew Date: Tue, 8 Aug 2017 13:36:54 -0700 Subject: [PATCH] Refactor global migration context --- go/base/context.go | 13 +------------ go/base/context_test.go | 10 +++++----- go/binlog/gomysql_reader.go | 4 ++-- go/cmd/gh-ost/main.go | 4 ++-- go/logic/applier.go | 6 +++--- go/logic/hooks.go | 4 ++-- go/logic/inspect.go | 6 +++--- go/logic/migrator.go | 16 ++++++++-------- go/logic/server.go | 4 ++-- go/logic/streamer.go | 8 ++++---- go/logic/throttler.go | 4 ++-- 11 files changed, 34 insertions(+), 45 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index fff040e..acfca5a 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -212,13 +212,7 @@ type ContextConfig struct { } } -var context *MigrationContext - -func init() { - context = newMigrationContext() -} - -func newMigrationContext() *MigrationContext { +func NewMigrationContext() *MigrationContext { return &MigrationContext{ defaultNumRetries: 60, ChunkSize: 1000, @@ -239,11 +233,6 @@ func newMigrationContext() *MigrationContext { } } -// GetMigrationContext -func GetMigrationContext() *MigrationContext { - return context -} - func getSafeTableName(baseName string, suffix string) string { name := fmt.Sprintf("_%s_%s", baseName, suffix) if len(name) <= mysql.MaxTableNameLength { diff --git a/go/base/context_test.go b/go/base/context_test.go index b3a98eb..8a9c6a5 100644 --- a/go/base/context_test.go +++ b/go/base/context_test.go @@ -19,27 +19,27 @@ func init() { func TestGetTableNames(t *testing.T) { { - context = newMigrationContext() + context := NewMigrationContext() context.OriginalTableName = "some_table" test.S(t).ExpectEquals(context.GetOldTableName(), "_some_table_del") test.S(t).ExpectEquals(context.GetGhostTableName(), "_some_table_gho") test.S(t).ExpectEquals(context.GetChangelogTableName(), "_some_table_ghc") } { - context = newMigrationContext() + context := NewMigrationContext() context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890" test.S(t).ExpectEquals(context.GetOldTableName(), "_a1234567890123456789012345678901234567890123456789012345678_del") test.S(t).ExpectEquals(context.GetGhostTableName(), "_a1234567890123456789012345678901234567890123456789012345678_gho") test.S(t).ExpectEquals(context.GetChangelogTableName(), "_a1234567890123456789012345678901234567890123456789012345678_ghc") } { - context = newMigrationContext() + context := NewMigrationContext() context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123" oldTableName := context.GetOldTableName() test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123456789012345678_del") } { - context = newMigrationContext() + context := NewMigrationContext() context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123" context.TimestampOldTable = true longForm := "Jan 2, 2006 at 3:04pm (MST)" @@ -48,7 +48,7 @@ func TestGetTableNames(t *testing.T) { test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123_20130203195400_del") } { - context = newMigrationContext() + context := NewMigrationContext() context.OriginalTableName = "foo_bar_baz" context.ForceTmpTableName = "tmp" test.S(t).ExpectEquals(context.GetOldTableName(), "_tmp_del") diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 9feca87..b27de94 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -29,14 +29,14 @@ type GoMySQLReader struct { MigrationContext *base.MigrationContext } -func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { +func NewGoMySQLReader(migrationContext *base.MigrationContext, connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { binlogReader = &GoMySQLReader{ connectionConfig: connectionConfig, currentCoordinates: mysql.BinlogCoordinates{}, currentCoordinatesMutex: &sync.Mutex{}, binlogSyncer: nil, binlogStreamer: nil, - MigrationContext: base.GetMigrationContext(), + MigrationContext: migrationContext, } serverId := uint32(binlogReader.MigrationContext.ReplicaServerId) diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 7aabe81..d55b631 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -43,7 +43,7 @@ func acceptSignals(migrationContext *base.MigrationContext) { // main is the application's entry point. It will either spawn a CLI or HTTP interfaces. func main() { - migrationContext := base.GetMigrationContext() + migrationContext := base.NewMigrationContext() flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)") flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unabel to determine the master") @@ -241,7 +241,7 @@ func main() { log.Infof("starting gh-ost %+v", AppVersion) acceptSignals(migrationContext) - migrator := logic.NewMigrator() + migrator := logic.NewMigrator(migrationContext) err := migrator.Migrate() if err != nil { migrator.ExecOnFailureHook() diff --git a/go/logic/applier.go b/go/logic/applier.go index b167de8..2a59f3f 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -36,10 +36,10 @@ type Applier struct { migrationContext *base.MigrationContext } -func NewApplier() *Applier { +func NewApplier(migrationContext *base.MigrationContext) *Applier { return &Applier{ - connectionConfig: base.GetMigrationContext().ApplierConnectionConfig, - migrationContext: base.GetMigrationContext(), + connectionConfig: migrationContext.ApplierConnectionConfig, + migrationContext: migrationContext, } } diff --git a/go/logic/hooks.go b/go/logic/hooks.go index 58825ee..1fdfd5c 100644 --- a/go/logic/hooks.go +++ b/go/logic/hooks.go @@ -37,9 +37,9 @@ type HooksExecutor struct { migrationContext *base.MigrationContext } -func NewHooksExecutor() *HooksExecutor { +func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor { return &HooksExecutor{ - migrationContext: base.GetMigrationContext(), + migrationContext: migrationContext, } } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 5049193..0537e9e 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -31,10 +31,10 @@ type Inspector struct { migrationContext *base.MigrationContext } -func NewInspector() *Inspector { +func NewInspector(migrationContext *base.MigrationContext) *Inspector { return &Inspector{ - connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, - migrationContext: base.GetMigrationContext(), + connectionConfig: migrationContext.InspectorConnectionConfig, + migrationContext: migrationContext, } } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index ec8d425..8dd4fb7 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -87,9 +87,9 @@ type Migrator struct { handledChangelogStates map[string]bool } -func NewMigrator() *Migrator { +func NewMigrator(context *base.MigrationContext) *Migrator { migrator := &Migrator{ - migrationContext: base.GetMigrationContext(), + migrationContext: context, parser: sql.NewParser(), ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 3), @@ -120,7 +120,7 @@ func (this *Migrator) acceptSignals() { // initiateHooksExecutor func (this *Migrator) initiateHooksExecutor() (err error) { - this.hooksExecutor = NewHooksExecutor() + this.hooksExecutor = NewHooksExecutor(this.migrationContext) if err := this.hooksExecutor.initHooks(); err != nil { return err } @@ -655,7 +655,7 @@ func (this *Migrator) initiateServer() (err error) { var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) { this.printStatus(rule, writer) } - this.server = NewServer(this.hooksExecutor, f) + this.server = NewServer(this.migrationContext, this.hooksExecutor, f) if err := this.server.BindSocketFile(); err != nil { return err } @@ -675,7 +675,7 @@ func (this *Migrator) initiateServer() (err error) { // - heartbeat // When `--allow-on-master` is supplied, the inspector is actually the master. func (this *Migrator) initiateInspector() (err error) { - this.inspector = NewInspector() + this.inspector = NewInspector(this.migrationContext) if err := this.inspector.InitDBConnections(); err != nil { return err } @@ -934,7 +934,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { // initiateStreaming begins treaming of binary log events and registers listeners for such events func (this *Migrator) initiateStreaming() error { - this.eventsStreamer = NewEventsStreamer() + this.eventsStreamer = NewEventsStreamer(this.migrationContext) if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } @@ -982,7 +982,7 @@ func (this *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. func (this *Migrator) initiateThrottler() error { - this.throttler = NewThrottler(this.applier, this.inspector) + this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) log.Infof("Waiting for first throttle metrics to be collected") @@ -996,7 +996,7 @@ func (this *Migrator) initiateThrottler() error { } func (this *Migrator) initiateApplier() error { - this.applier = NewApplier() + this.applier = NewApplier(this.migrationContext) if err := this.applier.InitDBConnections(); err != nil { return err } diff --git a/go/logic/server.go b/go/logic/server.go index 7034cfd..45c237a 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -30,9 +30,9 @@ type Server struct { printStatus printStatusFunc } -func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server { +func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server { return &Server{ - migrationContext: base.GetMigrationContext(), + migrationContext: migrationContext, hooksExecutor: hooksExecutor, printStatus: printStatus, } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index dc5ba60..cacf32c 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -45,10 +45,10 @@ type EventsStreamer struct { binlogReader *binlog.GoMySQLReader } -func NewEventsStreamer() *EventsStreamer { +func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer { return &EventsStreamer{ - connectionConfig: base.GetMigrationContext().InspectorConnectionConfig, - migrationContext: base.GetMigrationContext(), + connectionConfig: migrationContext.InspectorConnectionConfig, + migrationContext: migrationContext, listeners: [](*BinlogEventListener){}, listenersMutex: &sync.Mutex{}, eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize), @@ -122,7 +122,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) { // initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error { - goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig) + goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext, this.migrationContext.InspectorConnectionConfig) if err != nil { return err } diff --git a/go/logic/throttler.go b/go/logic/throttler.go index ae95b70..9843f04 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -47,9 +47,9 @@ type Throttler struct { inspector *Inspector } -func NewThrottler(applier *Applier, inspector *Inspector) *Throttler { +func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler { return &Throttler{ - migrationContext: base.GetMigrationContext(), + migrationContext: migrationContext, applier: applier, inspector: inspector, }