Add tests, reorganize/improve code.

This commit is contained in:
Morgan Tocker 2022-12-07 09:06:30 -07:00
parent e7734543b1
commit 67fa12fb1b
7 changed files with 134 additions and 23 deletions

View File

@ -61,6 +61,12 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
`gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.
### chunk-size
Chunk size is the number of rows to copy in a single batch for copying data from the original table to the ghost table. The default value is 1000. Increasing the chunk-size can improve performance (via more batching) but also increases the risk of replica delay.
See also: [`dynamic-chunking`](#dynamic-chunking)
### conf
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
@ -122,6 +128,27 @@ Why is this behavior configurable? Different workloads have different characteri
Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.
### dynamic-chunking
Dynamic chunking (default: `OFF`) is a feature that allows `gh-ost` to automatically increase or decrease the `--chunk-size` up to 50x, based on the execution time of previous copy-row operations. The goal is to find the optimal batch size to reach `--dynamic-chunk-size-target-millis` (default: 50).
For example, assume `--chunk-size=1000`, `--dynamic-chunking=true` and `--dynamic-chunk-size-target-millis=50`:
- The actual "target" chunk size used will always be in the range of `[20,50000]` (within 50x the chunk size)
- Approximately every 1 second, `gh-ost` will re-assess if the target chunk size is optimal based on the `p90` of recent executions.
- Increases in target chunk size are scaled up by no more than 50% of the current target size at a time.
- If any copy-row operations exceed 250ms (5x the target), the target chunk size is immediately reduced to 10% of its current value.
Enabling dynamic chunk size can be more reliable than the static `--chunk-size=N`, because tables are not created equally. For a table with a very high number of columns and several indexes, `1000` rows may actually be too large of a chunk size. Similarly, for a table with very few columns and no indexes, the ideal batch size may be 20K+ rows (while still being under the 50ms target).
See also: [`chunk-size`](#chunk-size), [`dynamic-chunk-size-target-millis`](#dynamic-chunk-size-target-millis)
### dynamic-chunk-size-target-millis
The target execution time for each copy-row operation when [`--dynamic-chunking`](#dynamic-chunking) (default: `OFF`) is enabled.
The default value of `50` is a good starting point for most workloads. If you find that read-replicas are intermittently falling behind, you may want to decrease this value. Similarly, if you do not use read-replicas there may be a benefit from increasing this value slightly. The recommended range is `[10,10000]`. Values larger than this have limited added benefit and are not recommended.
### exact-rowcount
A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is?

View File

@ -162,7 +162,7 @@ type MigrationContext struct {
HooksHintToken string
HooksStatusIntervalSec int64
DynamicChunkSize bool
DynamicChunking bool
DynamicChunkSizeTargetMillis int64
targetChunkSizeMutex sync.Mutex
targetChunkFeedback []time.Duration
@ -309,7 +309,7 @@ func NewMigrationContext() *MigrationContext {
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
Log: NewDefaultLogger(),
DynamicChunkSize: false,
DynamicChunking: false,
}
}
@ -583,28 +583,37 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 {
// in GetChunkSize(), however if the input value far exceeds what was expected (>5x)
// we synchronously reduce the chunk size. If it was a one off, it's not an issue
// because the next few samples will always scale the value back up.
func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) {
func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) (outOfRange bool) {
if !this.DynamicChunking {
return false
}
this.targetChunkSizeMutex.Lock()
defer this.targetChunkSizeMutex.Unlock()
if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) {
this.targetChunkFeedback = []time.Duration{}
newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2)
this.targetChunkSize = int64(newTarget)
this.targetChunkSize = this.boundaryCheckTargetChunkSize(newTarget)
return true // don't include in feedback
}
this.targetChunkFeedback = append(this.targetChunkFeedback, d)
return false
}
// calculateNewTargetChunkSize is called by GetChunkSize()
// under a mutex. It's safe to read this.targetchunkFeedback.
func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
// We do all our math as float64 of time in ns
p90 := float64(findP90(this.targetChunkFeedback))
p90 := float64(lazyFindP90(this.targetChunkFeedback))
targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond))
newTargetRows := float64(this.targetChunkSize) * (targetTime / p90)
return this.boundaryCheckTargetChunkSize(newTargetRows)
}
// Apply some final boundary checking:
// We are only allowed to scale up/down 50x from
// the original ("reference") chunk size.
// boundaryCheckTargetChunkSize makes sure the new target is not
// too large/small since we are only allowed to scale up/down 50x from
// the original ("reference") chunk size, and only permitted to increase
// by 50% at a time. This is called under a mutex.
func (this *MigrationContext) boundaryCheckTargetChunkSize(newTargetRows float64) int64 {
referenceSize := float64(atomic.LoadInt64(&this.chunkSize))
if newTargetRows < (referenceSize / MaxDynamicScaleFactor) {
newTargetRows = referenceSize / MaxDynamicScaleFactor
@ -612,14 +621,9 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
if newTargetRows > (referenceSize * MaxDynamicScaleFactor) {
newTargetRows = referenceSize * MaxDynamicScaleFactor
}
// We only ever increase by 50% at a time.
// This ensures a gradual step up if there is some non-linear behavior.
// There's plenty of time to increase more.
if newTargetRows > float64(this.targetChunkSize)*MaxDynamicStepFactor {
newTargetRows = float64(this.targetChunkSize) * MaxDynamicStepFactor
}
// The newTargetRows must be at least 10,
// otherwise we're going too low
if newTargetRows < MinDynamicRowSize {
newTargetRows = MinDynamicRowSize
}
@ -627,8 +631,8 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
}
// GetChunkSize returns the number of rows to copy in a single chunk:
// - If DynamicChunkSize is disabled, it will return this.chunkSize.
// - If DynamicChunkSize is enabled, it will return a dynamic value that
// - If Dynamic Chunking is disabled, it will return this.chunkSize.
// - If Dynamic Chunking is enabled, it will return a value that
// automatically adjusts based on the duration of the last few
// copy-rows tasks.
//
@ -641,10 +645,10 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
// - Fow very narrow rows, it's not enough (leaving performance on the table).
// - For very wide rows (or with many secondary indexes) 1000 might be too high!
//
// Dynamic chunk size addresses this by using row-size as a starting point,
// Dynamic chunking addresses this by using row-size as a starting point,
// *but* the main configurable is based on time (in ms).
func (this *MigrationContext) GetChunkSize() int64 {
if !this.DynamicChunkSize {
if !this.DynamicChunking {
return atomic.LoadInt64(&this.chunkSize)
}
this.targetChunkSizeMutex.Lock()
@ -657,7 +661,6 @@ func (this *MigrationContext) GetChunkSize() int64 {
if len(this.targetChunkFeedback) >= 10 {
this.targetChunkSize = this.calculateNewTargetChunkSize()
this.targetChunkFeedback = []time.Duration{} // reset
fmt.Printf("# Adjusting chunk size based on feedback: %d\n", this.targetChunkSize)
}
return this.targetChunkSize
}

View File

@ -120,3 +120,78 @@ func TestReadConfigFile(t *testing.T) {
}
}
}
func TestDynamicChunker(t *testing.T) {
context := NewMigrationContext()
context.chunkSize = 1000
context.DynamicChunking = true
context.DynamicChunkSizeTargetMillis = 50
// Before feedback it should match the static chunk size
test.S(t).ExpectEquals(context.GetChunkSize(), int64(1000))
// 1s is >5x the target, so it should immediately /10 the target
context.ChunkDurationFeedback(1 * time.Second)
test.S(t).ExpectEquals(context.GetChunkSize(), int64(100))
// Let's provide 10 pieces of feedback, and see the chunk size
// be adjusted based on the p90th value.
context.ChunkDurationFeedback(time.Duration(33 * time.Millisecond)) // 1st
context.ChunkDurationFeedback(time.Duration(33 * time.Millisecond)) // 2nd
context.ChunkDurationFeedback(time.Duration(32 * time.Millisecond)) // 3rd
context.ChunkDurationFeedback(time.Duration(40 * time.Millisecond))
context.ChunkDurationFeedback(time.Duration(61 * time.Millisecond))
context.ChunkDurationFeedback(time.Duration(37 * time.Millisecond))
context.ChunkDurationFeedback(time.Duration(38 * time.Millisecond))
context.ChunkDurationFeedback(time.Duration(35 * time.Millisecond))
context.ChunkDurationFeedback(time.Duration(29 * time.Millisecond))
test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th
context.ChunkDurationFeedback(time.Duration(38 * time.Millisecond)) // 10th
// Because 10 items of feedback have been received,
// the chunk size is recalculated. The p90 is 40ms (below our target)
// so the adjusted chunk size increases 25% to 125
test.S(t).ExpectEquals(context.GetChunkSize(), int64(125))
// Collect some new feedback where the p90 is 500us (much lower than our target)
// We have boundary checking on the value which limits it to 50% greater
// than the previous chunk size.
context.ChunkDurationFeedback(time.Duration(400 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(450 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(470 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(520 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(500 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(490 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(300 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(450 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(460 * time.Microsecond))
context.ChunkDurationFeedback(time.Duration(480 * time.Microsecond))
test.S(t).ExpectEquals(context.GetChunkSize(), int64(187)) // very minor increase
// Test that the chunk size is not allowed to grow larger than 50x
// the original chunk size. Because of the gradual step up, we need to
// provide a lot of feedback first.
for i := 0; i < 1000; i++ {
context.ChunkDurationFeedback(time.Duration(480 * time.Microsecond))
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(50000))
// Similarly, the minimum chunksize is 1000/50=20 rows no matter what the feedback.
// The downscaling rule is /10 for values that immediately exceed 5x the target,
// so it usually scales down before the feedback re-evaluation kicks in.
for i := 0; i < 100; i++ {
context.ChunkDurationFeedback(time.Duration(10 * time.Second))
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(20))
// If we set the chunkSize to 100, then 100/50=2 is the minimum.
// But there is a hard coded minimum of 10 rows for safety.
context.chunkSize = 100
for i := 0; i < 100; i++ {
context.ChunkDurationFeedback(time.Duration(10 * time.Second))
context.GetChunkSize()
}
test.S(t).ExpectEquals(context.GetChunkSize(), int64(10))
}

View File

@ -95,7 +95,10 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig,
}
}
func findP90(a []time.Duration) time.Duration {
// lazyFindP90 finds the second to last value in a slice.
// This is the same as a p90 if there are 10 values, but if
// there were 100 values it would technically be a p99 etc.
func lazyFindP90(a []time.Duration) time.Duration {
sort.Slice(a, func(i, j int) bool {
return a[i] > a[j]
})

View File

@ -42,5 +42,5 @@ func TestFindP90(t *testing.T) {
1 * time.Second,
1 * time.Second,
}
test.S(t).ExpectEquals(findP90(times), 3*time.Second)
test.S(t).ExpectEquals(lazyFindP90(times), 3*time.Second)
}

View File

@ -104,8 +104,8 @@ func main() {
flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
flag.BoolVar(&migrationContext.DynamicChunkSize, "dynamic-chunk-size", true, "let this tool dynamically adjust the chunk size based on a time-target")
flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic chunk-size is enabled")
flag.BoolVar(&migrationContext.DynamicChunking, "dynamic-chunking", false, "automatically adjust the chunk size based on a time-target")
flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic-chunking is enabled")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")

View File

@ -1317,7 +1317,10 @@ func (this *Migrator) executeWriteFuncs() error {
}
// Send feedback to the chunker.
copyRowsDuration := time.Since(copyRowsStartTime)
this.migrationContext.ChunkDurationFeedback(copyRowsDuration)
outOfRange := this.migrationContext.ChunkDurationFeedback(copyRowsDuration)
if outOfRange {
this.migrationContext.Log.Warningf("Chunk duration took: %s, throttling copy-rows", copyRowsDuration)
}
if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond