Merge pull request #379 from github/fix-post-cut-over-writes

mitigating cut-over/write race condition
This commit is contained in:
Shlomi Noach 2017-03-05 18:10:01 +02:00 committed by GitHub
commit 6b981512f9
2 changed files with 16 additions and 3 deletions

View File

@ -348,7 +348,7 @@ func (this *Inspector) validateLogSlaveUpdates() error {
} }
if this.migrationContext.IsTungsten { if this.migrationContext.IsTungsten {
log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) log.Warningf("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }
@ -357,7 +357,7 @@ func (this *Inspector) validateLogSlaveUpdates() error {
} }
if this.migrationContext.InspectorIsAlsoApplier() { if this.migrationContext.InspectorIsAlsoApplier() {
log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) log.Warningf("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil return nil
} }

View File

@ -1000,11 +1000,13 @@ func (this *Migrator) iterateChunks() error {
for { for {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done // Done
// There's another such check down the line
return nil return nil
} }
copyRowsFunc := func() error { copyRowsFunc := func() error {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// Done // Done.
// There's another such check down the line
return nil return nil
} }
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues() hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
@ -1016,6 +1018,17 @@ func (this *Migrator) iterateChunks() error {
} }
// Copy task: // Copy task:
applyCopyRowsFunc := func() error { applyCopyRowsFunc := func() error {
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
// No need for more writes.
// This is the de-facto place where we avoid writing in the event of completed cut-over.
// There could _still_ be a race condition, but that's as close as we can get.
// What about the race condition? Well, there's actually no data integrity issue.
// when rowCopyCompleteFlag==1 that means **guaranteed** all necessary rows have been copied.
// But some are still then collected at the binary log, and these are the ones we're trying to
// not apply here. If the race condition wins over us, then we just attempt to apply onto the
// _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage.
return nil
}
_, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery()
if err != nil { if err != nil {
return terminateRowIteration(err) return terminateRowIteration(err)