Dynamic DML batch size; apadting buffer size to match
This commit is contained in:
parent
94a325c6b4
commit
6da0b8af3d
@ -47,7 +47,7 @@ const (
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
HTTPStatusOK = 200
|
HTTPStatusOK = 200
|
||||||
maxBatchSize = 1000
|
MaxEventsBatchSize = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -442,8 +442,8 @@ func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
|
|||||||
if batchSize < 1 {
|
if batchSize < 1 {
|
||||||
batchSize = 1
|
batchSize = 1
|
||||||
}
|
}
|
||||||
if batchSize > maxBatchSize {
|
if batchSize > MaxEventsBatchSize {
|
||||||
batchSize = maxBatchSize
|
batchSize = MaxEventsBatchSize
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
|
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
|
||||||
}
|
}
|
||||||
|
@ -52,10 +52,6 @@ func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
applyEventsQueueBuffer = 100
|
|
||||||
)
|
|
||||||
|
|
||||||
type PrintStatusRule int
|
type PrintStatusRule int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -101,7 +97,7 @@ func NewMigrator() *Migrator {
|
|||||||
allEventsUpToLockProcessed: make(chan string),
|
allEventsUpToLockProcessed: make(chan string),
|
||||||
|
|
||||||
copyRowsQueue: make(chan tableWriteFunc),
|
copyRowsQueue: make(chan tableWriteFunc),
|
||||||
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
|
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
|
||||||
handledChangelogStates: make(map[string]bool),
|
handledChangelogStates: make(map[string]bool),
|
||||||
}
|
}
|
||||||
return migrator
|
return migrator
|
||||||
@ -767,9 +763,10 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
|||||||
))
|
))
|
||||||
maxLoad := this.migrationContext.GetMaxLoad()
|
maxLoad := this.migrationContext.GetMaxLoad()
|
||||||
criticalLoad := this.migrationContext.GetCriticalLoad()
|
criticalLoad := this.migrationContext.GetCriticalLoad()
|
||||||
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; max-load: %s; critical-load: %s; nice-ratio: %f",
|
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f",
|
||||||
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
||||||
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
|
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
|
||||||
|
atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
|
||||||
maxLoad.String(),
|
maxLoad.String(),
|
||||||
criticalLoad.String(),
|
criticalLoad.String(),
|
||||||
this.migrationContext.GetNiceRatio(),
|
this.migrationContext.GetNiceRatio(),
|
||||||
|
@ -187,6 +187,19 @@ help # This message
|
|||||||
return ForcePrintStatusAndHintRule, nil
|
return ForcePrintStatusAndHintRule, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case "dml-batch-size":
|
||||||
|
{
|
||||||
|
if argIsQuestion {
|
||||||
|
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
|
||||||
|
return NoPrintStatusRule, nil
|
||||||
|
}
|
||||||
|
if dmlBatchSize, err := strconv.Atoi(arg); err != nil {
|
||||||
|
return NoPrintStatusRule, err
|
||||||
|
} else {
|
||||||
|
this.migrationContext.SetDMLBatchSize(int64(dmlBatchSize))
|
||||||
|
return ForcePrintStatusAndHintRule, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
case "max-lag-millis":
|
case "max-lag-millis":
|
||||||
{
|
{
|
||||||
if argIsQuestion {
|
if argIsQuestion {
|
||||||
|
Loading…
Reference in New Issue
Block a user