Merge pull request #453 from github/dynamic-dml-batch-size

Dynamic DML batch size; adapting buffer size to match
Shlomi Noach 2017-07-20 16:47:41 +03:00 committed by GitHub
commit 2f4e226b67
4 changed files with 24 additions and 12 deletions

@@ -19,6 +19,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
 - `sup`: returns a brief status summary of migration progress
 - `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
 - `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
+- `dml-batch-size=<newsize>`: modify the `dml-batch-size`; applies on next applying of binary log events
 - `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
 - `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
   - The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`'
@@ -52,7 +53,7 @@ While migration is running:
 $ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
 # Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
 # Migration started at Tue Jun 07 11:45:16 +0200 2016
-# chunk-size: 200; max lag: 1500ms; max-load: map[Threads_connected:20]
+# chunk-size: 200; max lag: 1500ms; dml-batch-size: 10; max-load: map[Threads_connected:20]
 # Throttle additional flag file: /tmp/gh-ost.throttle
 # Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
 # Serving on TCP port: 10001
@@ -63,7 +64,7 @@ Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 40s(copy), 41s(total); s
 $ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
 # Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
 # Migration started at Tue Jun 07 11:56:03 +0200 2016
-# chunk-size: 250; max lag: 1500ms; max-load: map[Threads_connected:20]
+# chunk-size: 250; max lag: 1500ms; dml-batch-size: 10; max-load: map[Threads_connected:20]
 # Throttle additional flag file: /tmp/gh-ost.throttle
 # Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
 # Serving on TCP port: 10001
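
The new `dml-batch-size` command can be driven the same way as the `chunk-size` example above. Below is a minimal client sketch in Go rather than `nc`; the socket path comes from the documentation example and the value `100` is purely illustrative:

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

func main() {
	// Socket path taken from the documentation example above; adjust per migration.
	conn, err := net.Dial("unix", "/tmp/gh-ost.test.sample_data_0.sock")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// Illustrative value: per the docs, the new size applies on the next
	// application of binary log events.
	fmt.Fprintln(conn, "dml-batch-size=100")

	// gh-ost answers on the same connection with a status hint; print it.
	scanner := bufio.NewScanner(conn)
	for scanner.Scan() {
		fmt.Println(scanner.Text())
	}
}
```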

@@ -47,7 +47,7 @@ const (
 const (
 	HTTPStatusOK = 200
-	maxBatchSize = 1000
+	MaxEventsBatchSize = 1000
 )
 
 var (

@@ -451,8 +451,8 @@ func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
 	if batchSize < 1 {
 		batchSize = 1
 	}
-	if batchSize > maxBatchSize {
-		batchSize = maxBatchSize
+	if batchSize > MaxEventsBatchSize {
+		batchSize = MaxEventsBatchSize
 	}
 	atomic.StoreInt64(&this.DMLBatchSize, batchSize)
 }
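
The setter above illustrates how gh-ost handles runtime-tunable values: clamp the requested number, then publish it via `sync/atomic` so readers on the hot path need no lock. A self-contained sketch of that pattern follows; the type and field names are illustrative, not the actual `MigrationContext`:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// Upper bound mirroring base.MaxEventsBatchSize in the diff above.
const maxEventsBatchSize = 1000

// tunables stands in for a context holding values changeable mid-migration.
type tunables struct {
	dmlBatchSize int64
}

// SetDMLBatchSize clamps the requested size to [1, maxEventsBatchSize] and
// stores it atomically so concurrent readers never observe a torn value.
func (t *tunables) SetDMLBatchSize(batchSize int64) {
	if batchSize < 1 {
		batchSize = 1
	}
	if batchSize > maxEventsBatchSize {
		batchSize = maxEventsBatchSize
	}
	atomic.StoreInt64(&t.dmlBatchSize, batchSize)
}

// DMLBatchSize is what an applier would call before building each batch.
func (t *tunables) DMLBatchSize() int64 {
	return atomic.LoadInt64(&t.dmlBatchSize)
}

func main() {
	t := &tunables{}
	t.SetDMLBatchSize(5000)
	fmt.Println(t.DMLBatchSize()) // 1000: clamped to the maximum
	t.SetDMLBatchSize(0)
	fmt.Println(t.DMLBatchSize()) // 1: clamped to the minimum
}
```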

@@ -52,10 +52,6 @@ func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct
 	return result
 }
 
-const (
-	applyEventsQueueBuffer = 100
-)
-
 type PrintStatusRule int
 
 const (

@@ -101,7 +97,7 @@ func NewMigrator() *Migrator {
 		allEventsUpToLockProcessed: make(chan string),
 		copyRowsQueue: make(chan tableWriteFunc),
-		applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
+		applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
 		handledChangelogStates: make(map[string]bool),
 	}
 	return migrator
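
Sizing `applyEventsQueue` to `base.MaxEventsBatchSize` instead of the old fixed buffer of 100 means the channel can always hold the largest batch a consumer may request, since `SetDMLBatchSize` caps `dml-batch-size` at that same constant. The sketch below shows the general relationship with a hypothetical `drainBatch` helper; the real batching logic in `migrator.go` differs in its details:

```go
package main

import "fmt"

const maxEventsBatchSize = 1000

// drainBatch returns up to batchSize events: the one already received by the
// caller plus whatever is immediately available in the buffered queue, without
// ever blocking. This is roughly how a batched applier consumes an event channel.
func drainBatch(first int, queue <-chan int, batchSize int64) []int {
	batch := []int{first}
	for int64(len(batch)) < batchSize {
		select {
		case ev := <-queue:
			batch = append(batch, ev)
		default:
			return batch // queue momentarily empty; apply what we have
		}
	}
	return batch
}

func main() {
	// Buffer matches the largest batch ever requested, mirroring
	// applyEventsQueue being sized to base.MaxEventsBatchSize.
	queue := make(chan int, maxEventsBatchSize)
	for i := 0; i < 25; i++ {
		queue <- i
	}
	first := <-queue
	batch := drainBatch(first, queue, 10)
	fmt.Println("applied a batch of", len(batch), "events")
}
```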
@@ -767,9 +763,10 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
 	))
 	maxLoad := this.migrationContext.GetMaxLoad()
 	criticalLoad := this.migrationContext.GetCriticalLoad()
-	fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; max-load: %s; critical-load: %s; nice-ratio: %f",
+	fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f",
 		atomic.LoadInt64(&this.migrationContext.ChunkSize),
 		atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
+		atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
 		maxLoad.String(),
 		criticalLoad.String(),
 		this.migrationContext.GetNiceRatio(),

@@ -146,6 +146,7 @@ status # Print a detailed status message
 sup # Print a short status message
 coordinates # Print the currently inspected coordinates
 chunk-size=<newsize> # Set a new chunk-size
+dml-batch-size=<newsize> # Set a new dml-batch-size
 nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
 critical-load=<load> # Set a new set of max-load thresholds
 max-lag-millis=<max-lag> # Set a new replication lag threshold
@@ -187,6 +188,19 @@ help # This message
 				return ForcePrintStatusAndHintRule, nil
 			}
 		}
+	case "dml-batch-size":
+		{
+			if argIsQuestion {
+				fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
+				return NoPrintStatusRule, nil
+			}
+			if dmlBatchSize, err := strconv.Atoi(arg); err != nil {
+				return NoPrintStatusRule, err
+			} else {
+				this.migrationContext.SetDMLBatchSize(int64(dmlBatchSize))
+				return ForcePrintStatusAndHintRule, nil
+			}
+		}
 	case "max-lag-millis":
 		{
 			if argIsQuestion {
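
Judging by the `argIsQuestion` branch above, sending `dml-batch-size=?` reports the current value instead of setting a new one. A speculative client sketch against the TCP interface shown in the documentation output (assuming the migration is serving on port 10001):

```go
package main

import (
	"bufio"
	"fmt"
	"net"
)

func main() {
	// Assumes the TCP interface is enabled on port 10001, as in the status
	// output quoted in the documentation hunk above.
	conn, err := net.Dial("tcp", "127.0.0.1:10001")
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	// "?" as the argument triggers the handler's argIsQuestion branch, which
	// prints the current dml-batch-size rather than changing it.
	fmt.Fprintln(conn, "dml-batch-size=?")

	if line, err := bufio.NewReader(conn).ReadString('\n'); err == nil {
		fmt.Print("current dml-batch-size: ", line)
	}
}
```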