Merge branch 'master' into case-insensitive-columns

This commit is contained in:
Shlomi Noach 2017-07-16 06:51:39 +03:00 committed by GitHub
commit 4a3a5a1ce1

View File

@ -79,7 +79,7 @@ type Migrator struct {
firstThrottlingCollected chan bool firstThrottlingCollected chan bool
ghostTableMigrated chan bool ghostTableMigrated chan bool
rowCopyComplete chan bool rowCopyComplete chan error
allEventsUpToLockProcessed chan string allEventsUpToLockProcessed chan string
rowCopyCompleteFlag int64 rowCopyCompleteFlag int64
@ -97,7 +97,7 @@ func NewMigrator() *Migrator {
parser: sql.NewParser(), parser: sql.NewParser(),
ghostTableMigrated: make(chan bool), ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 3), firstThrottlingCollected: make(chan bool, 3),
rowCopyComplete: make(chan bool), rowCopyComplete: make(chan error),
allEventsUpToLockProcessed: make(chan string), allEventsUpToLockProcessed: make(chan string),
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
@ -180,11 +180,16 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then // consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
// consumes and drops any further incoming events that may be left hanging. // consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() { func (this *Migrator) consumeRowCopyComplete() {
<-this.rowCopyComplete if err := <-this.rowCopyComplete; err != nil {
this.migrationContext.PanicAbort <- err
}
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
this.migrationContext.MarkRowCopyEndTime() this.migrationContext.MarkRowCopyEndTime()
go func() { go func() {
for <-this.rowCopyComplete { for err := range this.rowCopyComplete {
if err != nil {
this.migrationContext.PanicAbort <- err
}
} }
}() }()
} }
@ -1024,7 +1029,7 @@ func (this *Migrator) initiateApplier() error {
// a chunk of rows onto the ghost table. // a chunk of rows onto the ghost table.
func (this *Migrator) iterateChunks() error { func (this *Migrator) iterateChunks() error {
terminateRowIteration := func(err error) error { terminateRowIteration := func(err error) error {
this.rowCopyComplete <- true this.rowCopyComplete <- err
return log.Errore(err) return log.Errore(err)
} }
if this.migrationContext.Noop { if this.migrationContext.Noop {
@ -1076,7 +1081,10 @@ func (this *Migrator) iterateChunks() error {
atomic.AddInt64(&this.migrationContext.Iteration, 1) atomic.AddInt64(&this.migrationContext.Iteration, 1)
return nil return nil
} }
return this.retryOperation(applyCopyRowsFunc) if err := this.retryOperation(applyCopyRowsFunc); err != nil {
return terminateRowIteration(err)
}
return nil
} }
// Enqueue copy operation; to be executed by executeWriteFuncs() // Enqueue copy operation; to be executed by executeWriteFuncs()
this.copyRowsQueue <- copyRowsFunc this.copyRowsQueue <- copyRowsFunc