Stop infinite goroutines

This commit is contained in:
Nikhil Mathew 2017-08-08 15:31:25 -07:00
parent 7412f42371
commit a1473dfb58
2 changed files with 31 additions and 6 deletions

View File

@ -30,16 +30,18 @@ const (
// Applier is the one to actually write row data and apply binlog events onto the ghost table. // Applier is the one to actually write row data and apply binlog events onto the ghost table.
// It is where the ghost & changelog tables get created. It is where the cut-over phase happens. // It is where the ghost & changelog tables get created. It is where the cut-over phase happens.
type Applier struct { type Applier struct {
connectionConfig *mysql.ConnectionConfig connectionConfig *mysql.ConnectionConfig
db *gosql.DB db *gosql.DB
singletonDB *gosql.DB singletonDB *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
finishedMigrating bool
} }
func NewApplier(migrationContext *base.MigrationContext) *Applier { func NewApplier(migrationContext *base.MigrationContext) *Applier {
return &Applier{ return &Applier{
connectionConfig: migrationContext.ApplierConnectionConfig, connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext, migrationContext: migrationContext,
finishedMigrating: false,
} }
} }
@ -288,6 +290,10 @@ func (this *Applier) WriteChangelogState(value string) (string, error) {
return this.WriteAndLogChangelog("state", value) return this.WriteAndLogChangelog("state", value)
} }
func (this *Applier) FinalCleanup() {
this.finishedMigrating = true
}
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table. // InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously // This is done asynchronously
func (this *Applier) InitiateHeartbeat() { func (this *Applier) InitiateHeartbeat() {
@ -310,6 +316,9 @@ func (this *Applier) InitiateHeartbeat() {
heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range heartbeatTick { for range heartbeatTick {
if this.finishedMigrating {
return
}
// Generally speaking, we would issue a goroutine, but I'd actually rather // Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop rather than spam the master in the event something // have this block the loop rather than spam the master in the event something
// goes wrong // goes wrong

View File

@ -83,6 +83,8 @@ type Migrator struct {
applyEventsQueue chan *applyEventStruct applyEventsQueue chan *applyEventStruct
handledChangelogStates map[string]bool handledChangelogStates map[string]bool
finishedMigrating bool
} }
func NewMigrator(context *base.MigrationContext) *Migrator { func NewMigrator(context *base.MigrationContext) *Migrator {
@ -97,6 +99,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator {
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
handledChangelogStates: make(map[string]bool), handledChangelogStates: make(map[string]bool),
finishedMigrating: false,
} }
return migrator return migrator
} }
@ -718,6 +721,9 @@ func (this *Migrator) initiateStatus() error {
this.printStatus(ForcePrintStatusAndHintRule) this.printStatus(ForcePrintStatusAndHintRule)
statusTick := time.Tick(1 * time.Second) statusTick := time.Tick(1 * time.Second)
for range statusTick { for range statusTick {
if this.finishedMigrating {
return nil
}
go this.printStatus(HeuristicPrintStatusRule) go this.printStatus(HeuristicPrintStatusRule)
} }
@ -942,6 +948,9 @@ func (this *Migrator) initiateStreaming() error {
go func() { go func() {
ticker := time.Tick(1 * time.Second) ticker := time.Tick(1 * time.Second)
for range ticker { for range ticker {
if this.finishedMigrating {
return
}
this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates()) this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates())
} }
}() }()
@ -1132,6 +1141,10 @@ func (this *Migrator) executeWriteFuncs() error {
return nil return nil
} }
for { for {
if this.finishedMigrating {
return nil
}
this.throttler.throttle(nil) this.throttler.throttle(nil)
// We give higher priority to event processing, then secondary priority to // We give higher priority to event processing, then secondary priority to
@ -1209,5 +1222,8 @@ func (this *Migrator) finalCleanup() error {
} }
} }
this.finishedMigrating = true
this.applier.FinalCleanup()
return nil return nil
} }