From e6dd2c90d20f99356de74925f97d9a88f350c07c Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 11 Jul 2017 13:39:11 +0300 Subject: [PATCH] responding to errors in rowcopy --- go/logic/migrator.go | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 8720991..f4e10fb 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -79,7 +79,7 @@ type Migrator struct { firstThrottlingCollected chan bool ghostTableMigrated chan bool - rowCopyComplete chan bool + rowCopyComplete chan error allEventsUpToLockProcessed chan string rowCopyCompleteFlag int64 @@ -97,7 +97,7 @@ func NewMigrator() *Migrator { parser: sql.NewParser(), ghostTableMigrated: make(chan bool), firstThrottlingCollected: make(chan bool, 3), - rowCopyComplete: make(chan bool), + rowCopyComplete: make(chan error), allEventsUpToLockProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), @@ -180,11 +180,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err // consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then // consumes and drops any further incoming events that may be left hanging. func (this *Migrator) consumeRowCopyComplete() { - <-this.rowCopyComplete + if err := <-this.rowCopyComplete; err != nil { + this.migrationContext.PanicAbort <- err + } atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) this.migrationContext.MarkRowCopyEndTime() go func() { - for <-this.rowCopyComplete { + for err := range this.rowCopyComplete { + if err != nil { + this.migrationContext.PanicAbort <- err + } } }() } @@ -1024,7 +1029,7 @@ func (this *Migrator) initiateApplier() error { // a chunk of rows onto the ghost table. func (this *Migrator) iterateChunks() error { terminateRowIteration := func(err error) error { - this.rowCopyComplete <- true + this.rowCopyComplete <- err return log.Errore(err) } if this.migrationContext.Noop { @@ -1076,7 +1081,10 @@ func (this *Migrator) iterateChunks() error { atomic.AddInt64(&this.migrationContext.Iteration, 1) return nil } - return this.retryOperation(applyCopyRowsFunc) + if err := this.retryOperation(applyCopyRowsFunc); err != nil { + return terminateRowIteration(err) + } + return nil } // Enqueue copy operation; to be executed by executeWriteFuncs() this.copyRowsQueue <- copyRowsFunc