Refactor global migration context
This commit is contained in:
parent
ed5c8044e9
commit
982b8eede9
@ -212,13 +212,7 @@ type ContextConfig struct {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var context *MigrationContext
|
func NewMigrationContext() *MigrationContext {
|
||||||
|
|
||||||
func init() {
|
|
||||||
context = newMigrationContext()
|
|
||||||
}
|
|
||||||
|
|
||||||
func newMigrationContext() *MigrationContext {
|
|
||||||
return &MigrationContext{
|
return &MigrationContext{
|
||||||
defaultNumRetries: 60,
|
defaultNumRetries: 60,
|
||||||
ChunkSize: 1000,
|
ChunkSize: 1000,
|
||||||
@ -239,11 +233,6 @@ func newMigrationContext() *MigrationContext {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetMigrationContext
|
|
||||||
func GetMigrationContext() *MigrationContext {
|
|
||||||
return context
|
|
||||||
}
|
|
||||||
|
|
||||||
func getSafeTableName(baseName string, suffix string) string {
|
func getSafeTableName(baseName string, suffix string) string {
|
||||||
name := fmt.Sprintf("_%s_%s", baseName, suffix)
|
name := fmt.Sprintf("_%s_%s", baseName, suffix)
|
||||||
if len(name) <= mysql.MaxTableNameLength {
|
if len(name) <= mysql.MaxTableNameLength {
|
||||||
|
@ -19,27 +19,27 @@ func init() {
|
|||||||
|
|
||||||
func TestGetTableNames(t *testing.T) {
|
func TestGetTableNames(t *testing.T) {
|
||||||
{
|
{
|
||||||
context = newMigrationContext()
|
context := NewMigrationContext()
|
||||||
context.OriginalTableName = "some_table"
|
context.OriginalTableName = "some_table"
|
||||||
test.S(t).ExpectEquals(context.GetOldTableName(), "_some_table_del")
|
test.S(t).ExpectEquals(context.GetOldTableName(), "_some_table_del")
|
||||||
test.S(t).ExpectEquals(context.GetGhostTableName(), "_some_table_gho")
|
test.S(t).ExpectEquals(context.GetGhostTableName(), "_some_table_gho")
|
||||||
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_some_table_ghc")
|
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_some_table_ghc")
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
context = newMigrationContext()
|
context := NewMigrationContext()
|
||||||
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890"
|
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890"
|
||||||
test.S(t).ExpectEquals(context.GetOldTableName(), "_a1234567890123456789012345678901234567890123456789012345678_del")
|
test.S(t).ExpectEquals(context.GetOldTableName(), "_a1234567890123456789012345678901234567890123456789012345678_del")
|
||||||
test.S(t).ExpectEquals(context.GetGhostTableName(), "_a1234567890123456789012345678901234567890123456789012345678_gho")
|
test.S(t).ExpectEquals(context.GetGhostTableName(), "_a1234567890123456789012345678901234567890123456789012345678_gho")
|
||||||
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_a1234567890123456789012345678901234567890123456789012345678_ghc")
|
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_a1234567890123456789012345678901234567890123456789012345678_ghc")
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
context = newMigrationContext()
|
context := NewMigrationContext()
|
||||||
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
|
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
|
||||||
oldTableName := context.GetOldTableName()
|
oldTableName := context.GetOldTableName()
|
||||||
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123456789012345678_del")
|
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123456789012345678_del")
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
context = newMigrationContext()
|
context := NewMigrationContext()
|
||||||
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
|
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
|
||||||
context.TimestampOldTable = true
|
context.TimestampOldTable = true
|
||||||
longForm := "Jan 2, 2006 at 3:04pm (MST)"
|
longForm := "Jan 2, 2006 at 3:04pm (MST)"
|
||||||
@ -48,7 +48,7 @@ func TestGetTableNames(t *testing.T) {
|
|||||||
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123_20130203195400_del")
|
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123_20130203195400_del")
|
||||||
}
|
}
|
||||||
{
|
{
|
||||||
context = newMigrationContext()
|
context := NewMigrationContext()
|
||||||
context.OriginalTableName = "foo_bar_baz"
|
context.OriginalTableName = "foo_bar_baz"
|
||||||
context.ForceTmpTableName = "tmp"
|
context.ForceTmpTableName = "tmp"
|
||||||
test.S(t).ExpectEquals(context.GetOldTableName(), "_tmp_del")
|
test.S(t).ExpectEquals(context.GetOldTableName(), "_tmp_del")
|
||||||
|
@ -29,14 +29,14 @@ type GoMySQLReader struct {
|
|||||||
MigrationContext *base.MigrationContext
|
MigrationContext *base.MigrationContext
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
|
func NewGoMySQLReader(migrationContext *base.MigrationContext, connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
|
||||||
binlogReader = &GoMySQLReader{
|
binlogReader = &GoMySQLReader{
|
||||||
connectionConfig: connectionConfig,
|
connectionConfig: connectionConfig,
|
||||||
currentCoordinates: mysql.BinlogCoordinates{},
|
currentCoordinates: mysql.BinlogCoordinates{},
|
||||||
currentCoordinatesMutex: &sync.Mutex{},
|
currentCoordinatesMutex: &sync.Mutex{},
|
||||||
binlogSyncer: nil,
|
binlogSyncer: nil,
|
||||||
binlogStreamer: nil,
|
binlogStreamer: nil,
|
||||||
MigrationContext: base.GetMigrationContext(),
|
MigrationContext: migrationContext,
|
||||||
}
|
}
|
||||||
|
|
||||||
serverId := uint32(binlogReader.MigrationContext.ReplicaServerId)
|
serverId := uint32(binlogReader.MigrationContext.ReplicaServerId)
|
||||||
|
@ -43,7 +43,7 @@ func acceptSignals(migrationContext *base.MigrationContext) {
|
|||||||
|
|
||||||
// main is the application's entry point. It will either spawn a CLI or HTTP interfaces.
|
// main is the application's entry point. It will either spawn a CLI or HTTP interfaces.
|
||||||
func main() {
|
func main() {
|
||||||
migrationContext := base.GetMigrationContext()
|
migrationContext := base.NewMigrationContext()
|
||||||
|
|
||||||
flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
|
flag.StringVar(&migrationContext.InspectorConnectionConfig.Key.Hostname, "host", "127.0.0.1", "MySQL hostname (preferably a replica, not the master)")
|
||||||
flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unabel to determine the master")
|
flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unabel to determine the master")
|
||||||
@ -241,7 +241,7 @@ func main() {
|
|||||||
log.Infof("starting gh-ost %+v", AppVersion)
|
log.Infof("starting gh-ost %+v", AppVersion)
|
||||||
acceptSignals(migrationContext)
|
acceptSignals(migrationContext)
|
||||||
|
|
||||||
migrator := logic.NewMigrator()
|
migrator := logic.NewMigrator(migrationContext)
|
||||||
err := migrator.Migrate()
|
err := migrator.Migrate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
migrator.ExecOnFailureHook()
|
migrator.ExecOnFailureHook()
|
||||||
|
@ -36,10 +36,10 @@ type Applier struct {
|
|||||||
migrationContext *base.MigrationContext
|
migrationContext *base.MigrationContext
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewApplier() *Applier {
|
func NewApplier(migrationContext *base.MigrationContext) *Applier {
|
||||||
return &Applier{
|
return &Applier{
|
||||||
connectionConfig: base.GetMigrationContext().ApplierConnectionConfig,
|
connectionConfig: migrationContext.ApplierConnectionConfig,
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: migrationContext,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -37,9 +37,9 @@ type HooksExecutor struct {
|
|||||||
migrationContext *base.MigrationContext
|
migrationContext *base.MigrationContext
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHooksExecutor() *HooksExecutor {
|
func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor {
|
||||||
return &HooksExecutor{
|
return &HooksExecutor{
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: migrationContext,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,10 +31,10 @@ type Inspector struct {
|
|||||||
migrationContext *base.MigrationContext
|
migrationContext *base.MigrationContext
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewInspector() *Inspector {
|
func NewInspector(migrationContext *base.MigrationContext) *Inspector {
|
||||||
return &Inspector{
|
return &Inspector{
|
||||||
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
|
connectionConfig: migrationContext.InspectorConnectionConfig,
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: migrationContext,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -87,9 +87,9 @@ type Migrator struct {
|
|||||||
handledChangelogStates map[string]bool
|
handledChangelogStates map[string]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMigrator() *Migrator {
|
func NewMigrator(context *base.MigrationContext) *Migrator {
|
||||||
migrator := &Migrator{
|
migrator := &Migrator{
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: context,
|
||||||
parser: sql.NewParser(),
|
parser: sql.NewParser(),
|
||||||
ghostTableMigrated: make(chan bool),
|
ghostTableMigrated: make(chan bool),
|
||||||
firstThrottlingCollected: make(chan bool, 3),
|
firstThrottlingCollected: make(chan bool, 3),
|
||||||
@ -120,7 +120,7 @@ func (this *Migrator) acceptSignals() {
|
|||||||
|
|
||||||
// initiateHooksExecutor
|
// initiateHooksExecutor
|
||||||
func (this *Migrator) initiateHooksExecutor() (err error) {
|
func (this *Migrator) initiateHooksExecutor() (err error) {
|
||||||
this.hooksExecutor = NewHooksExecutor()
|
this.hooksExecutor = NewHooksExecutor(this.migrationContext)
|
||||||
if err := this.hooksExecutor.initHooks(); err != nil {
|
if err := this.hooksExecutor.initHooks(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -655,7 +655,7 @@ func (this *Migrator) initiateServer() (err error) {
|
|||||||
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
|
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
|
||||||
this.printStatus(rule, writer)
|
this.printStatus(rule, writer)
|
||||||
}
|
}
|
||||||
this.server = NewServer(this.hooksExecutor, f)
|
this.server = NewServer(this.migrationContext, this.hooksExecutor, f)
|
||||||
if err := this.server.BindSocketFile(); err != nil {
|
if err := this.server.BindSocketFile(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -675,7 +675,7 @@ func (this *Migrator) initiateServer() (err error) {
|
|||||||
// - heartbeat
|
// - heartbeat
|
||||||
// When `--allow-on-master` is supplied, the inspector is actually the master.
|
// When `--allow-on-master` is supplied, the inspector is actually the master.
|
||||||
func (this *Migrator) initiateInspector() (err error) {
|
func (this *Migrator) initiateInspector() (err error) {
|
||||||
this.inspector = NewInspector()
|
this.inspector = NewInspector(this.migrationContext)
|
||||||
if err := this.inspector.InitDBConnections(); err != nil {
|
if err := this.inspector.InitDBConnections(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -934,7 +934,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
|
|
||||||
// initiateStreaming begins treaming of binary log events and registers listeners for such events
|
// initiateStreaming begins treaming of binary log events and registers listeners for such events
|
||||||
func (this *Migrator) initiateStreaming() error {
|
func (this *Migrator) initiateStreaming() error {
|
||||||
this.eventsStreamer = NewEventsStreamer()
|
this.eventsStreamer = NewEventsStreamer(this.migrationContext)
|
||||||
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -982,7 +982,7 @@ func (this *Migrator) addDMLEventsListener() error {
|
|||||||
|
|
||||||
// initiateThrottler kicks in the throttling collection and the throttling checks.
|
// initiateThrottler kicks in the throttling collection and the throttling checks.
|
||||||
func (this *Migrator) initiateThrottler() error {
|
func (this *Migrator) initiateThrottler() error {
|
||||||
this.throttler = NewThrottler(this.applier, this.inspector)
|
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector)
|
||||||
|
|
||||||
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
||||||
log.Infof("Waiting for first throttle metrics to be collected")
|
log.Infof("Waiting for first throttle metrics to be collected")
|
||||||
@ -996,7 +996,7 @@ func (this *Migrator) initiateThrottler() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) initiateApplier() error {
|
func (this *Migrator) initiateApplier() error {
|
||||||
this.applier = NewApplier()
|
this.applier = NewApplier(this.migrationContext)
|
||||||
if err := this.applier.InitDBConnections(); err != nil {
|
if err := this.applier.InitDBConnections(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -30,9 +30,9 @@ type Server struct {
|
|||||||
printStatus printStatusFunc
|
printStatus printStatusFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
|
func NewServer(migrationContext *base.MigrationContext, hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: migrationContext,
|
||||||
hooksExecutor: hooksExecutor,
|
hooksExecutor: hooksExecutor,
|
||||||
printStatus: printStatus,
|
printStatus: printStatus,
|
||||||
}
|
}
|
||||||
|
@ -45,10 +45,10 @@ type EventsStreamer struct {
|
|||||||
binlogReader *binlog.GoMySQLReader
|
binlogReader *binlog.GoMySQLReader
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEventsStreamer() *EventsStreamer {
|
func NewEventsStreamer(migrationContext *base.MigrationContext) *EventsStreamer {
|
||||||
return &EventsStreamer{
|
return &EventsStreamer{
|
||||||
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
|
connectionConfig: migrationContext.InspectorConnectionConfig,
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: migrationContext,
|
||||||
listeners: [](*BinlogEventListener){},
|
listeners: [](*BinlogEventListener){},
|
||||||
listenersMutex: &sync.Mutex{},
|
listenersMutex: &sync.Mutex{},
|
||||||
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
|
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
|
||||||
@ -122,7 +122,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
|||||||
|
|
||||||
// initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica
|
// initBinlogReader creates and connects the reader: we hook up to a MySQL server as a replica
|
||||||
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
|
func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error {
|
||||||
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig)
|
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext, this.migrationContext.InspectorConnectionConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -47,9 +47,9 @@ type Throttler struct {
|
|||||||
inspector *Inspector
|
inspector *Inspector
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
|
func NewThrottler(migrationContext *base.MigrationContext, applier *Applier, inspector *Inspector) *Throttler {
|
||||||
return &Throttler{
|
return &Throttler{
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: migrationContext,
|
||||||
applier: applier,
|
applier: applier,
|
||||||
inspector: inspector,
|
inspector: inspector,
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user