This commit is contained in:
Morgan Tocker 2023-02-10 10:51:07 +09:00 committed by GitHub
commit 6364669345
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 256 additions and 8 deletions

View File

@ -61,6 +61,12 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
`gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.
### chunk-size
Chunk size is the number of rows to copy in a single batch for copying data from the original table to the ghost table. The default value is 1000. Increasing the chunk-size can improve performance (via more batching) but also increases the risk of replica delay.
See also: [`dynamic-chunking`](#dynamic-chunking)
### conf
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
@ -122,6 +128,27 @@ Why is this behavior configurable? Different workloads have different characteri
Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.
### dynamic-chunking
Dynamic chunking (default: `OFF`) is a feature that allows `gh-ost` to automatically increase or decrease the `--chunk-size` up to 50x, based on the execution time of previous copy-row operations. The goal is to find the optimal batch size to reach `--dynamic-chunk-size-target-millis` (default: 50).
For example, assume `--chunk-size=1000`, `--dynamic-chunking=true` and `--dynamic-chunk-size-target-millis=50`:
- The actual "target" chunk size used will always be in the range of `[20,50000]` (within 50x the chunk size)
- Approximately every 1 second, `gh-ost` will re-assess if the target chunk size is optimal based on the `p90` of recent executions.
- Increases in target chunk size are scaled up by no more than 50% of the current target size at a time.
- If any copy-row operations exceed 250ms (5x the target), the target chunk size is immediately reduced to 10% of its current value.
Enabling dynamic chunk size can be more reliable than the static `--chunk-size=N`, because tables are not created equally. For a table with a very high number of columns and several indexes, `1000` rows may actually be too large of a chunk size. Similarly, for a table with very few columns and no indexes, the ideal batch size may be 20K+ rows (while still being under the 50ms target).
See also: [`chunk-size`](#chunk-size), [`dynamic-chunk-size-target-millis`](#dynamic-chunk-size-target-millis)
### dynamic-chunk-size-target-millis
The target execution time for each copy-row operation when [`--dynamic-chunking`](#dynamic-chunking) (default: `OFF`) is enabled.
The default value of `50` is a good starting point for most workloads. If you find that read-replicas are intermittently falling behind, you may want to decrease this value. Similarly, if you do not use read-replicas there may be a benefit from increasing this value slightly. The recommended range is `[10,10000]`. Values larger than this have limited added benefit and are not recommended.
### exact-rowcount
A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is?

View File

@ -52,6 +52,22 @@ const (
HTTPStatusOK = 200
MaxEventsBatchSize = 1000
ETAUnknown = math.MinInt64
// MaxDynamicScaleFactor is the maximum factor dynamic scaling can change the chunkSize from
// the setting chunkSize. For example, if the factor is 10, and chunkSize is 1000, then the
// values will be in the range of 100 to 10000.
MaxDynamicScaleFactor = 50
// MaxDynamicStepFactor is the maximum amount each recalculation of the dynamic chunkSize can
// increase by. For example, if the newTarget is 5000 but the current target is 1000, the newTarget
// will be capped back down to 1500. Over time the number 5000 will be reached, but not straight away.
MaxDynamicStepFactor = 1.5
// MinDynamicChunkSize is the minimum chunkSize that can be used when dynamic chunkSize is enabled.
// This helps prevent a scenario where the chunk size is too small (it can never be less than 1).
MinDynamicRowSize = 10
// DynamicPanicFactor is the factor by which the feedback process takes immediate action when
// the chunkSize appears to be too large. For example, if the PanicFactor is 5, and the target *time*
// is 50ms, an actual time 250ms+ will cause the dynamic chunk size to immediately be reduced.
DynamicPanicFactor = 5
)
var (
@ -118,7 +134,7 @@ type MigrationContext struct {
HeartbeatIntervalMilliseconds int64
defaultNumRetries int64
ChunkSize int64
chunkSize int64
niceRatio float64
MaxLagMillisecondsThrottleThreshold int64
throttleControlReplicaKeys *mysql.InstanceKeyMap
@ -146,6 +162,12 @@ type MigrationContext struct {
HooksHintToken string
HooksStatusIntervalSec int64
DynamicChunking bool
DynamicChunkSizeTargetMillis int64
targetChunkSizeMutex sync.Mutex
targetChunkFeedback []time.Duration
targetChunkSize int64
DropServeSocket bool
ServeSocketFile string
ServeTCPPort int64
@ -269,7 +291,7 @@ func NewMigrationContext() *MigrationContext {
return &MigrationContext{
Uuid: uuid.NewV4().String(),
defaultNumRetries: 60,
ChunkSize: 1000,
chunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500,
@ -287,6 +309,7 @@ func NewMigrationContext() *MigrationContext {
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
DynamicChunking: false,
}
}
@ -554,6 +577,94 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 {
return atomic.LoadInt64(&this.TotalRowsCopied)
}
// ChunkDurationFeedback collects samples from copy-rows tasks, and feeds them
// back into a moving p90 that is used to return a more precise value
// in GetChunkSize() calls. Usually we wait for multiple samples and then recalculate
// in GetChunkSize(), however if the input value far exceeds what was expected (>5x)
// we synchronously reduce the chunk size. If it was a one off, it's not an issue
// because the next few samples will always scale the value back up.
func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) (outOfRange bool) {
if !this.DynamicChunking {
return false
}
this.targetChunkSizeMutex.Lock()
defer this.targetChunkSizeMutex.Unlock()
if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) {
this.targetChunkFeedback = []time.Duration{}
newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2)
this.targetChunkSize = this.boundaryCheckTargetChunkSize(newTarget)
return true // don't include in feedback
}
this.targetChunkFeedback = append(this.targetChunkFeedback, d)
return false
}
// calculateNewTargetChunkSize is called by GetChunkSize()
// under a mutex. It's safe to read this.targetchunkFeedback.
func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
// We do all our math as float64 of time in ns
p90 := float64(lazyFindP90(this.targetChunkFeedback))
targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond))
newTargetRows := float64(this.targetChunkSize) * (targetTime / p90)
return this.boundaryCheckTargetChunkSize(newTargetRows)
}
// boundaryCheckTargetChunkSize makes sure the new target is not
// too large/small since we are only allowed to scale up/down 50x from
// the original ("reference") chunk size, and only permitted to increase
// by 50% at a time. This is called under a mutex.
func (this *MigrationContext) boundaryCheckTargetChunkSize(newTargetRows float64) int64 {
referenceSize := float64(atomic.LoadInt64(&this.chunkSize))
if newTargetRows < (referenceSize / MaxDynamicScaleFactor) {
newTargetRows = referenceSize / MaxDynamicScaleFactor
}
if newTargetRows > (referenceSize * MaxDynamicScaleFactor) {
newTargetRows = referenceSize * MaxDynamicScaleFactor
}
if newTargetRows > float64(this.targetChunkSize)*MaxDynamicStepFactor {
newTargetRows = float64(this.targetChunkSize) * MaxDynamicStepFactor
}
if newTargetRows < MinDynamicRowSize {
newTargetRows = MinDynamicRowSize
}
return int64(newTargetRows)
}
// GetChunkSize returns the number of rows to copy in a single chunk:
// - If Dynamic Chunking is disabled, it will return this.chunkSize.
// - If Dynamic Chunking is enabled, it will return a value that
// automatically adjusts based on the duration of the last few
// copy-rows tasks.
//
// Historically gh-ost has used a static chunk size (i.e. 1000 rows)
// which can be adjusted while gh-ost is running.
// An ideal chunk size is large enough that it can batch operations,
// but small enough that it doesn't cause spikes in replica lag.
//
// The problem with basing the configurable on row-size is two fold:
// - Fow very narrow rows, it's not enough (leaving performance on the table).
// - For very wide rows (or with many secondary indexes) 1000 might be too high!
//
// Dynamic chunking addresses this by using row-size as a starting point,
// *but* the main configurable is based on time (in ms).
func (this *MigrationContext) GetChunkSize() int64 {
if !this.DynamicChunking {
return atomic.LoadInt64(&this.chunkSize)
}
this.targetChunkSizeMutex.Lock()
defer this.targetChunkSizeMutex.Unlock()
if this.targetChunkSize == 0 {
this.targetChunkSize = atomic.LoadInt64(&this.chunkSize)
}
// We need 10 samples to make a decision because we
// calculate it from the p90 (i.e. 2nd to highest value).
if len(this.targetChunkFeedback) >= 10 {
this.targetChunkSize = this.calculateNewTargetChunkSize()
this.targetChunkFeedback = []time.Duration{} // reset
}
return this.targetChunkSize
}
func (this *MigrationContext) GetIteration() int64 {
return atomic.LoadInt64(&this.Iteration)
}
@ -611,7 +722,7 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
if chunkSize > 100000 {
chunkSize = 100000
}
atomic.StoreInt64(&this.ChunkSize, chunkSize)
atomic.StoreInt64(&this.chunkSize, chunkSize)
}
func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {

View File

@ -120,3 +120,78 @@ func TestReadConfigFile(t *testing.T) {
}
}
}
func TestDynamicChunker(t *testing.T) {
context := NewMigrationContext()
context.chunkSize = 1000
context.DynamicChunking = true
context.DynamicChunkSizeTargetMillis = 50
// Before feedback it should match the static chunk size
test.S(t).ExpectEquals(context.GetChunkSize(), int64(1000))
// 1s is >5x the target, so it should immediately /10 the target
context.ChunkDurationFeedback(1 * time.Second)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(100))
// Let's provide 10 pieces of feedback, and see the chunk size
// be adjusted based on the p90th value.
context.ChunkDurationFeedback(33 * time.Millisecond) // 1st
context.ChunkDurationFeedback(33 * time.Millisecond) // 2nd
context.ChunkDurationFeedback(32 * time.Millisecond) // 3rd
context.ChunkDurationFeedback(40 * time.Millisecond)
context.ChunkDurationFeedback(61 * time.Millisecond)
context.ChunkDurationFeedback(37 * time.Millisecond)
context.ChunkDurationFeedback(38 * time.Millisecond)
context.ChunkDurationFeedback(35 * time.Millisecond)
context.ChunkDurationFeedback(29 * time.Millisecond)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th
context.ChunkDurationFeedback(38 * time.Millisecond) // 10th
// Because 10 items of feedback have been received,
// the chunk size is recalculated. The p90 is 40ms (below our target)
// so the adjusted chunk size increases 25% to 125
test.S(t).ExpectEquals(context.GetChunkSize(), int64(125))
// Collect some new feedback where the p90 is 500us (much lower than our target)
// We have boundary checking on the value which limits it to 50% greater
// than the previous chunk size.
context.ChunkDurationFeedback(400 * time.Microsecond)
context.ChunkDurationFeedback(450 * time.Microsecond)
context.ChunkDurationFeedback(470 * time.Microsecond)
context.ChunkDurationFeedback(520 * time.Microsecond)
context.ChunkDurationFeedback(500 * time.Microsecond)
context.ChunkDurationFeedback(490 * time.Microsecond)
context.ChunkDurationFeedback(300 * time.Microsecond)
context.ChunkDurationFeedback(450 * time.Microsecond)
context.ChunkDurationFeedback(460 * time.Microsecond)
context.ChunkDurationFeedback(480 * time.Microsecond)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(187)) // very minor increase
// Test that the chunk size is not allowed to grow larger than 50x
// the original chunk size. Because of the gradual step up, we need to
// provide a lot of feedback first.
for i := 0; i < 1000; i++ {
context.ChunkDurationFeedback(480 * time.Microsecond)
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(50000))
// Similarly, the minimum chunksize is 1000/50=20 rows no matter what the feedback.
// The downscaling rule is /10 for values that immediately exceed 5x the target,
// so it usually scales down before the feedback re-evaluation kicks in.
for i := 0; i < 100; i++ {
context.ChunkDurationFeedback(10 * time.Second)
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(20))
// If we set the chunkSize to 100, then 100/50=2 is the minimum.
// But there is a hard coded minimum of 10 rows for safety.
context.chunkSize = 100
for i := 0; i < 100; i++ {
context.ChunkDurationFeedback(10 * time.Second)
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(10))
}

View File

@ -9,6 +9,7 @@ import (
"fmt"
"os"
"regexp"
"sort"
"strings"
"time"
@ -93,3 +94,13 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
}
}
// lazyFindP90 finds the second to last value in a slice.
// This is the same as a p90 if there are 10 values, but if
// there were 100 values it would technically be a p99 etc.
func lazyFindP90(a []time.Duration) time.Duration {
sort.Slice(a, func(i, j int) bool {
return a[i] > a[j]
})
return a[len(a)/10]
}

View File

@ -7,6 +7,7 @@ package base
import (
"testing"
"time"
"github.com/openark/golib/log"
test "github.com/openark/golib/tests"
@ -27,3 +28,19 @@ func TestStringContainsAll(t *testing.T) {
test.S(t).ExpectTrue(StringContainsAll(s, "insert", ""))
test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete"))
}
func TestFindP90(t *testing.T) {
times := []time.Duration{
1 * time.Second,
2 * time.Second,
1 * time.Second,
3 * time.Second,
10 * time.Second,
1 * time.Second,
1 * time.Second,
1 * time.Second,
1 * time.Second,
1 * time.Second,
}
test.S(t).ExpectEquals(lazyFindP90(times), 3*time.Second)
}

View File

@ -104,6 +104,8 @@ func main() {
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
flag.BoolVar(&migrationContext.DynamicChunking, "dynamic-chunking", false, "automatically adjust the chunk size based on a time-target")
flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic-chunking is enabled")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")

View File

@ -576,7 +576,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetChunkSize(),
this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
@ -613,7 +613,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
// data actually gets copied from original table.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
chunkSize = this.migrationContext.GetChunkSize()
query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,

View File

@ -839,7 +839,7 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
maxLoad := this.migrationContext.GetMaxLoad()
criticalLoad := this.migrationContext.GetCriticalLoad()
fmt.Fprintf(w, "# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f\n",
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetChunkSize(),
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
maxLoad.String(),
@ -1310,8 +1310,13 @@ func (this *Migrator) executeWriteFuncs() error {
if err := copyRowsFunc(); err != nil {
return this.migrationContext.Log.Errore(err)
}
// Send feedback to the chunker.
copyRowsDuration := time.Since(copyRowsStartTime)
outOfRange := this.migrationContext.ChunkDurationFeedback(copyRowsDuration)
if outOfRange {
this.migrationContext.Log.Warningf("Chunk duration took: %s, throttling copy-rows", copyRowsDuration)
}
if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
copyRowsDuration := time.Since(copyRowsStartTime)
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
time.Sleep(sleepTime)

View File

@ -196,7 +196,7 @@ help # This message
case "chunk-size":
{
if argIsQuestion {
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize))
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetChunkSize())
return NoPrintStatusRule, nil
}
if chunkSize, err := strconv.Atoi(arg); err != nil {