From f11f6f978ffe3a1de46e4e6302c5834340b9a496 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Fri, 24 Feb 2017 14:48:10 -0700 Subject: [PATCH] mitigating cut-over/write race condition --- go/logic/inspect.go | 4 ++-- go/logic/migrator.go | 15 ++++++++++++++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 862be16..1d30cb5 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -348,7 +348,7 @@ func (this *Inspector) validateLogSlaveUpdates() error { } if this.migrationContext.IsTungsten { - log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + log.Warningf("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) return nil } @@ -357,7 +357,7 @@ func (this *Inspector) validateLogSlaveUpdates() error { } if this.migrationContext.InspectorIsAlsoApplier() { - log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + log.Warningf("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) return nil } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index de3a018..94b5e54 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1000,11 +1000,13 @@ func (this *Migrator) iterateChunks() error { for { if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { // Done + // There's another such check down the line return nil } copyRowsFunc := func() error { if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { - // Done + // Done. + // There's another such check down the line return nil } hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues() @@ -1016,6 +1018,17 @@ func (this *Migrator) iterateChunks() error { } // Copy task: applyCopyRowsFunc := func() error { + if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 { + // No need for more writes. + // This is the de-facto place where we avoid writing in the event of completed cut-over. + // There could _still_ be a race condition, but that's as close as we can get. + // What about the race condition? Well, there's actually no data integrity issue. + // when rowCopyCompleteFlag==1 that means **guaranteed** all necessary rows have been copied. + // But some are still then collected at the binary log, and these are the ones we're trying to + // not apply here. If the race condition wins over us, then we just attempt to apply onto the + // _ghost_ table, which no longer exists. So, bothering error messages and all, but no damage. + return nil + } _, rowsAffected, _, err := this.applier.ApplyIterationInsertQuery() if err != nil { return terminateRowIteration(err)