responding to errors in rowcopy
This commit is contained in:
parent
787419ac2e
commit
e6dd2c90d2
@ -79,7 +79,7 @@ type Migrator struct {
|
||||
|
||||
firstThrottlingCollected chan bool
|
||||
ghostTableMigrated chan bool
|
||||
rowCopyComplete chan bool
|
||||
rowCopyComplete chan error
|
||||
allEventsUpToLockProcessed chan string
|
||||
|
||||
rowCopyCompleteFlag int64
|
||||
@ -97,7 +97,7 @@ func NewMigrator() *Migrator {
|
||||
parser: sql.NewParser(),
|
||||
ghostTableMigrated: make(chan bool),
|
||||
firstThrottlingCollected: make(chan bool, 3),
|
||||
rowCopyComplete: make(chan bool),
|
||||
rowCopyComplete: make(chan error),
|
||||
allEventsUpToLockProcessed: make(chan string),
|
||||
|
||||
copyRowsQueue: make(chan tableWriteFunc),
|
||||
@ -180,11 +180,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
|
||||
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
|
||||
// consumes and drops any further incoming events that may be left hanging.
|
||||
func (this *Migrator) consumeRowCopyComplete() {
|
||||
<-this.rowCopyComplete
|
||||
if err := <-this.rowCopyComplete; err != nil {
|
||||
this.migrationContext.PanicAbort <- err
|
||||
}
|
||||
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
|
||||
this.migrationContext.MarkRowCopyEndTime()
|
||||
go func() {
|
||||
for <-this.rowCopyComplete {
|
||||
for err := range this.rowCopyComplete {
|
||||
if err != nil {
|
||||
this.migrationContext.PanicAbort <- err
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -1024,7 +1029,7 @@ func (this *Migrator) initiateApplier() error {
|
||||
// a chunk of rows onto the ghost table.
|
||||
func (this *Migrator) iterateChunks() error {
|
||||
terminateRowIteration := func(err error) error {
|
||||
this.rowCopyComplete <- true
|
||||
this.rowCopyComplete <- err
|
||||
return log.Errore(err)
|
||||
}
|
||||
if this.migrationContext.Noop {
|
||||
@ -1076,7 +1081,10 @@ func (this *Migrator) iterateChunks() error {
|
||||
atomic.AddInt64(&this.migrationContext.Iteration, 1)
|
||||
return nil
|
||||
}
|
||||
return this.retryOperation(applyCopyRowsFunc)
|
||||
if err := this.retryOperation(applyCopyRowsFunc); err != nil {
|
||||
return terminateRowIteration(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// Enqueue copy operation; to be executed by executeWriteFuncs()
|
||||
this.copyRowsQueue <- copyRowsFunc
|
||||
|
Loading…
Reference in New Issue
Block a user