cleanup locking semantics
This commit is contained in:
parent
7e89c85541
commit
e7734543b1
|
@ -165,7 +165,7 @@ type MigrationContext struct {
|
|||
DynamicChunkSize bool
|
||||
DynamicChunkSizeTargetMillis int64
|
||||
targetChunkSizeMutex sync.Mutex
|
||||
targetchunkFeedback []time.Duration
|
||||
targetChunkFeedback []time.Duration
|
||||
targetChunkSize int64
|
||||
|
||||
DropServeSocket bool
|
||||
|
@ -577,37 +577,40 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 {
|
|||
return atomic.LoadInt64(&this.TotalRowsCopied)
|
||||
}
|
||||
|
||||
// ChunkDurationFeedback collects samples from copy-rows tasks, and feeds them
|
||||
// back into a moving p90 that is used to return a more precise value
|
||||
// in GetChunkSize() calls. Usually we wait for multiple samples and then recalculate
|
||||
// in GetChunkSize(), however if the input value far exceeds what was expected (>5x)
|
||||
// we synchronously reduce the chunk size. If it was a one off, it's not an issue
|
||||
// because the next few samples will always scale the value back up.
|
||||
func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) {
|
||||
this.targetChunkSizeMutex.Lock()
|
||||
defer this.targetChunkSizeMutex.Unlock()
|
||||
|
||||
if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) {
|
||||
// We're 5x our target size. Something went seriously wrong,
|
||||
// let's pump the brakes immediately to get back into range.
|
||||
fmt.Printf("## error: chunk was too slow! Dividing by 10\n")
|
||||
this.targetchunkFeedback = []time.Duration{}
|
||||
this.targetChunkFeedback = []time.Duration{}
|
||||
newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2)
|
||||
this.targetChunkSize = int64(newTarget)
|
||||
}
|
||||
this.targetchunkFeedback = append(this.targetchunkFeedback, d)
|
||||
this.targetChunkFeedback = append(this.targetChunkFeedback, d)
|
||||
}
|
||||
|
||||
// calculateNewTargetChunkSize is called by GetChunkSize()
|
||||
// under a mutex. It's safe to read this.targetchunkFeedback.
|
||||
func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
|
||||
this.targetChunkSizeMutex.Lock()
|
||||
defer this.targetChunkSizeMutex.Unlock()
|
||||
|
||||
// We do all our math as float64 of time in ns
|
||||
p90 := float64(findP90(this.targetchunkFeedback))
|
||||
p90 := float64(findP90(this.targetChunkFeedback))
|
||||
targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond))
|
||||
newTargetRows := float64(this.targetChunkSize) * (targetTime / p90)
|
||||
|
||||
// Apply some final boundary checking:
|
||||
// We are only allowed to scale up/down 50x from the original chunk size.
|
||||
if newTargetRows < float64(this.chunkSize)/MaxDynamicScaleFactor {
|
||||
newTargetRows = float64(this.chunkSize) / MaxDynamicScaleFactor
|
||||
// We are only allowed to scale up/down 50x from
|
||||
// the original ("reference") chunk size.
|
||||
referenceSize := float64(atomic.LoadInt64(&this.chunkSize))
|
||||
if newTargetRows < (referenceSize / MaxDynamicScaleFactor) {
|
||||
newTargetRows = referenceSize / MaxDynamicScaleFactor
|
||||
}
|
||||
if newTargetRows > float64(this.chunkSize)*MaxDynamicScaleFactor {
|
||||
newTargetRows = float64(this.chunkSize) * MaxDynamicScaleFactor
|
||||
if newTargetRows > (referenceSize * MaxDynamicScaleFactor) {
|
||||
newTargetRows = referenceSize * MaxDynamicScaleFactor
|
||||
}
|
||||
// We only ever increase by 50% at a time.
|
||||
// This ensures a gradual step up if there is some non-linear behavior.
|
||||
|
@ -620,33 +623,40 @@ func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
|
|||
if newTargetRows < MinDynamicRowSize {
|
||||
newTargetRows = MinDynamicRowSize
|
||||
}
|
||||
this.targetchunkFeedback = []time.Duration{} // reset
|
||||
return int64(newTargetRows)
|
||||
}
|
||||
|
||||
// GetChunkSize returns the number of rows to copy in a single chunk:
|
||||
// - If DynamicChunkSize is disabled, it will return this.chunkSize.
|
||||
// - If DynamicChunkSize is enabled, it will return a dynamic value that
|
||||
// automatically adjusts based on the duration of the last few
|
||||
// copy-rows tasks.
|
||||
//
|
||||
// Historically gh-ost has used a static chunk size (i.e. 1000 rows)
|
||||
// which can be adjusted while gh-ost is running.
|
||||
// An ideal chunk size is large enough that it can batch operations,
|
||||
// but small enough that it doesn't cause spikes in replica lag.
|
||||
//
|
||||
// The problem with basing the configurable on row-size is two fold:
|
||||
// - Fow very narrow rows, it's not enough (leaving performance on the table).
|
||||
// - For very wide rows (or with many secondary indexes) 1000 might be too high!
|
||||
//
|
||||
// Dynamic chunk size addresses this by using row-size as a starting point,
|
||||
// *but* the main configurable is based on time (in ms).
|
||||
func (this *MigrationContext) GetChunkSize() int64 {
|
||||
if !this.DynamicChunkSize {
|
||||
return atomic.LoadInt64(&this.chunkSize)
|
||||
}
|
||||
// Historically gh-ost has used a static chunk size (i.e. 1000 rows)
|
||||
// which is can be adjusted while gh-ost is running.
|
||||
// An ideal chunk size is large enough that it can batch operations,
|
||||
// but small enough that it doesn't cause spikes in replica lag.
|
||||
//
|
||||
// The problem with basing the configurable on row-size is two fold:
|
||||
// - Fow very narrow rows, it's not enough (leaving performance on the table).
|
||||
// - For very wide rows (or with many secondary indexes) 1000 might be too high!
|
||||
//
|
||||
// Dynamic chunk size addresses this by using row-size as a starting point,
|
||||
// and then increases or decreases the number based on feedback from previous
|
||||
// copy tasks.
|
||||
this.targetChunkSizeMutex.Lock()
|
||||
defer this.targetChunkSizeMutex.Unlock()
|
||||
if this.targetChunkSize == 0 {
|
||||
this.targetChunkSize = atomic.LoadInt64(&this.chunkSize)
|
||||
}
|
||||
// We need 10 samples to make a decision because we
|
||||
// calculate it from the p90 (i.e. 2nd to highest value).
|
||||
if len(this.targetchunkFeedback) >= 10 {
|
||||
if len(this.targetChunkFeedback) >= 10 {
|
||||
this.targetChunkSize = this.calculateNewTargetChunkSize()
|
||||
this.targetChunkFeedback = []time.Duration{} // reset
|
||||
fmt.Printf("# Adjusting chunk size based on feedback: %d\n", this.targetChunkSize)
|
||||
}
|
||||
return this.targetChunkSize
|
||||
|
|
Loading…
Reference in New Issue
Block a user