CR Revisions: Make concurrent operations safe

This commit is contained in:
Nikhil Mathew 2017-09-22 12:19:43 -07:00
parent a2847015d6
commit 5788ab5347
3 changed files with 12 additions and 16 deletions

View File

@ -34,14 +34,14 @@ type Applier struct {
db *gosql.DB db *gosql.DB
singletonDB *gosql.DB singletonDB *gosql.DB
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
finishedMigrating bool finishedMigrating int64
} }
func NewApplier(migrationContext *base.MigrationContext) *Applier { func NewApplier(migrationContext *base.MigrationContext) *Applier {
return &Applier{ return &Applier{
connectionConfig: migrationContext.ApplierConnectionConfig, connectionConfig: migrationContext.ApplierConnectionConfig,
migrationContext: migrationContext, migrationContext: migrationContext,
finishedMigrating: false, finishedMigrating: 0,
} }
} }
@ -312,7 +312,7 @@ func (this *Applier) InitiateHeartbeat() {
heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond)
for range heartbeatTick { for range heartbeatTick {
if this.finishedMigrating { if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return return
} }
// Generally speaking, we would issue a goroutine, but I'd actually rather // Generally speaking, we would issue a goroutine, but I'd actually rather
@ -1049,5 +1049,5 @@ func (this *Applier) Teardown() {
log.Debugf("Tearing down...") log.Debugf("Tearing down...")
this.db.Close() this.db.Close()
this.singletonDB.Close() this.singletonDB.Close()
this.finishedMigrating = true atomic.StoreInt64(&this.finishedMigrating, 1)
} }

View File

@ -85,7 +85,7 @@ type Migrator struct {
handledChangelogStates map[string]bool handledChangelogStates map[string]bool
finishedMigrating bool finishedMigrating int64
} }
func NewMigrator(context *base.MigrationContext) *Migrator { func NewMigrator(context *base.MigrationContext) *Migrator {
@ -100,7 +100,7 @@ func NewMigrator(context *base.MigrationContext) *Migrator {
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize), applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
handledChangelogStates: make(map[string]bool), handledChangelogStates: make(map[string]bool),
finishedMigrating: false, finishedMigrating: 0,
} }
return migrator return migrator
} }
@ -727,7 +727,7 @@ func (this *Migrator) initiateStatus() error {
this.printStatus(ForcePrintStatusAndHintRule) this.printStatus(ForcePrintStatusAndHintRule)
statusTick := time.Tick(1 * time.Second) statusTick := time.Tick(1 * time.Second)
for range statusTick { for range statusTick {
if this.finishedMigrating { if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return nil return nil
} }
go this.printStatus(HeuristicPrintStatusRule) go this.printStatus(HeuristicPrintStatusRule)
@ -954,7 +954,7 @@ func (this *Migrator) initiateStreaming() error {
go func() { go func() {
ticker := time.Tick(1 * time.Second) ticker := time.Tick(1 * time.Second)
for range ticker { for range ticker {
if this.finishedMigrating { if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return return
} }
this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates()) this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates())
@ -1147,7 +1147,7 @@ func (this *Migrator) executeWriteFuncs() error {
return nil return nil
} }
for { for {
if this.finishedMigrating { if atomic.LoadInt64(&this.finishedMigrating) > 0 {
return nil return nil
} }
@ -1232,7 +1232,7 @@ func (this *Migrator) finalCleanup() error {
} }
func (this *Migrator) teardown() { func (this *Migrator) teardown() {
this.finishedMigrating = true atomic.StoreInt64(&this.finishedMigrating, 1)
if this.inspector != nil { if this.inspector != nil {
log.Infof("Tearing down inspector") log.Infof("Tearing down inspector")

View File

@ -129,9 +129,7 @@ var knownDBsMutex = &sync.Mutex{}
// bool result indicates whether the DB was returned from cache; err // bool result indicates whether the DB was returned from cache; err
func GetDB(mysql_uri string) (*sql.DB, bool, error) { func GetDB(mysql_uri string) (*sql.DB, bool, error) {
knownDBsMutex.Lock() knownDBsMutex.Lock()
defer func() { defer knownDBsMutex.Unlock()
knownDBsMutex.Unlock()
}()
var exists bool var exists bool
if _, exists = knownDBs[mysql_uri]; !exists { if _, exists = knownDBs[mysql_uri]; !exists {
@ -148,9 +146,7 @@ func GetDB(mysql_uri string) (*sql.DB, bool, error) {
// and new connections are needed to access the DB // and new connections are needed to access the DB
func ResetDBCache() { func ResetDBCache() {
knownDBsMutex.Lock() knownDBsMutex.Lock()
defer func() { defer knownDBsMutex.Unlock()
knownDBsMutex.Unlock()
}()
knownDBs = make(map[string]*sql.DB) knownDBs = make(map[string]*sql.DB)
} }