This commit is contained in:
debug-LiXiwen 2023-05-27 19:26:03 +00:00 committed by GitHub
commit d3d79d03a5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -1300,36 +1300,43 @@ func (this *Migrator) executeWriteFuncs() error {
return err
}
}
default:
case copyRowsFunc := <-this.copyRowsQueue:
if err := this.drainApplierEventQueue(); err != nil {
return this.migrationContext.Log.Errore(err)
}
{
select {
case copyRowsFunc := <-this.copyRowsQueue:
{
copyRowsStartTime := time.Now()
// Retries are handled within the copyRowsFunc
if err := copyRowsFunc(); err != nil {
return this.migrationContext.Log.Errore(err)
}
if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
copyRowsDuration := time.Since(copyRowsStartTime)
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
time.Sleep(sleepTime)
}
}
default:
{
// Hmmmmm... nothing in the queue; no events, but also no row copy.
// This is possible upon load. Let's just sleep it over.
this.migrationContext.Log.Debugf("Getting nothing in the write queue. Sleeping...")
time.Sleep(time.Second)
}
copyRowsStartTime := time.Now()
// Retries are handled within the copyRowsFunc
if err := copyRowsFunc(); err != nil {
return this.migrationContext.Log.Errore(err)
}
if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
copyRowsDuration := time.Since(copyRowsStartTime)
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(sleepTimeNanosecondFloat64) * time.Nanosecond
time.Sleep(sleepTime)
}
}
}
}
}
func (this *Migrator) drainApplierEventQueue() error {
for {
select {
case eventStruct := <-this.applyEventsQueue:
{
this.throttler.throttle(nil)
if err := this.onApplyEventStruct(eventStruct); err != nil {
return err
}
}
default:
return nil
}
}
}
// finalCleanup takes actions at very end of migration, dropping tables etc.
func (this *Migrator) finalCleanup() error {
atomic.StoreInt64(&this.migrationContext.CleanupImminentFlag, 1)