WIP: decoupling general throttling from throttle logic
This commit is contained in:
parent
75b2542f26
commit
23357d0643
@ -41,6 +41,18 @@ var (
|
||||
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
|
||||
)
|
||||
|
||||
type ThrottleCheckResult struct {
|
||||
ShouldThrottle bool
|
||||
Reason string
|
||||
}
|
||||
|
||||
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
|
||||
return &ThrottleCheckResult{
|
||||
ShouldThrottle: throttle,
|
||||
Reason: reason,
|
||||
}
|
||||
}
|
||||
|
||||
// MigrationContext has the general, global state of migration. It is used by
|
||||
// all components throughout the migration process.
|
||||
type MigrationContext struct {
|
||||
@ -96,33 +108,34 @@ type MigrationContext struct {
|
||||
InitiallyDropGhostTable bool
|
||||
CutOverType CutOver
|
||||
|
||||
Hostname string
|
||||
TableEngine string
|
||||
RowsEstimate int64
|
||||
RowsDeltaEstimate int64
|
||||
UsedRowsEstimateMethod RowsEstimateMethod
|
||||
HasSuperPrivilege bool
|
||||
OriginalBinlogFormat string
|
||||
OriginalBinlogRowImage string
|
||||
InspectorConnectionConfig *mysql.ConnectionConfig
|
||||
ApplierConnectionConfig *mysql.ConnectionConfig
|
||||
StartTime time.Time
|
||||
RowCopyStartTime time.Time
|
||||
RowCopyEndTime time.Time
|
||||
LockTablesStartTime time.Time
|
||||
RenameTablesStartTime time.Time
|
||||
RenameTablesEndTime time.Time
|
||||
pointOfInterestTime time.Time
|
||||
pointOfInterestTimeMutex *sync.Mutex
|
||||
CurrentLag int64
|
||||
controlReplicasLagResult mysql.ReplicationLagResult
|
||||
TotalRowsCopied int64
|
||||
TotalDMLEventsApplied int64
|
||||
isThrottled bool
|
||||
throttleReason string
|
||||
throttleMutex *sync.Mutex
|
||||
IsPostponingCutOver int64
|
||||
CountingRowsFlag int64
|
||||
Hostname string
|
||||
TableEngine string
|
||||
RowsEstimate int64
|
||||
RowsDeltaEstimate int64
|
||||
UsedRowsEstimateMethod RowsEstimateMethod
|
||||
HasSuperPrivilege bool
|
||||
OriginalBinlogFormat string
|
||||
OriginalBinlogRowImage string
|
||||
InspectorConnectionConfig *mysql.ConnectionConfig
|
||||
ApplierConnectionConfig *mysql.ConnectionConfig
|
||||
StartTime time.Time
|
||||
RowCopyStartTime time.Time
|
||||
RowCopyEndTime time.Time
|
||||
LockTablesStartTime time.Time
|
||||
RenameTablesStartTime time.Time
|
||||
RenameTablesEndTime time.Time
|
||||
pointOfInterestTime time.Time
|
||||
pointOfInterestTimeMutex *sync.Mutex
|
||||
CurrentLag int64
|
||||
controlReplicasLagResult mysql.ReplicationLagResult
|
||||
TotalRowsCopied int64
|
||||
TotalDMLEventsApplied int64
|
||||
isThrottled bool
|
||||
throttleReason string
|
||||
throttleGeneralCheckResult ThrottleCheckResult
|
||||
throttleMutex *sync.Mutex
|
||||
IsPostponingCutOver int64
|
||||
CountingRowsFlag int64
|
||||
|
||||
OriginalTableColumns *sql.ColumnList
|
||||
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
||||
@ -355,6 +368,20 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
|
||||
atomic.StoreInt64(&this.ChunkSize, chunkSize)
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
this.throttleGeneralCheckResult = *checkResult
|
||||
return checkResult
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResult {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
result := this.throttleGeneralCheckResult
|
||||
return &result
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
@ -55,9 +55,11 @@ type Migrator struct {
|
||||
applier *Applier
|
||||
eventsStreamer *EventsStreamer
|
||||
server *Server
|
||||
throttler *Throttler
|
||||
hooksExecutor *HooksExecutor
|
||||
migrationContext *base.MigrationContext
|
||||
|
||||
firstThrottlingCollected chan bool
|
||||
tablesInPlace chan bool
|
||||
rowCopyComplete chan bool
|
||||
allEventsUpToLockProcessed chan bool
|
||||
@ -81,6 +83,7 @@ func NewMigrator() *Migrator {
|
||||
migrationContext: base.GetMigrationContext(),
|
||||
parser: sql.NewParser(),
|
||||
tablesInPlace: make(chan bool),
|
||||
firstThrottlingCollected: make(chan bool, 1),
|
||||
rowCopyComplete: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan bool),
|
||||
panicAbort: make(chan error),
|
||||
@ -119,42 +122,12 @@ func (this *Migrator) initiateHooksExecutor() (err error) {
|
||||
}
|
||||
|
||||
// shouldThrottle performs checks to see whether we should currently be throttling.
|
||||
// It also checks for critical-load and panic aborts.
|
||||
// It merely observes the metrics collected by other components, it does not issue
|
||||
// its own metric collection.
|
||||
func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
// Regardless of throttle, we take opportunity to check for panic-abort
|
||||
if this.migrationContext.PanicFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.PanicFlagFile) {
|
||||
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
|
||||
}
|
||||
}
|
||||
criticalLoad := this.migrationContext.GetCriticalLoad()
|
||||
for variableName, threshold := range criticalLoad {
|
||||
value, err := this.applier.ShowStatusVariable(variableName)
|
||||
if err != nil {
|
||||
return true, fmt.Sprintf("%s %s", variableName, err)
|
||||
}
|
||||
if value >= threshold {
|
||||
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
||||
}
|
||||
}
|
||||
|
||||
// Back to throttle considerations
|
||||
|
||||
// User-based throttle
|
||||
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
|
||||
return true, "commanded by user"
|
||||
}
|
||||
if this.migrationContext.ThrottleFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
||||
// Throttle file defined and exists!
|
||||
return true, "flag-file"
|
||||
}
|
||||
}
|
||||
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
|
||||
// 2nd Throttle file defined and exists!
|
||||
return true, "flag-file"
|
||||
}
|
||||
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
|
||||
if generalCheckResult.ShouldThrottle {
|
||||
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
|
||||
}
|
||||
// Replication lag throttle
|
||||
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
|
||||
@ -175,29 +148,93 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
|
||||
}
|
||||
}
|
||||
// Got here? No metrics indicates we need throttling.
|
||||
return false, ""
|
||||
}
|
||||
|
||||
// readGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
|
||||
func (this *Migrator) readGeneralThrottleMetrics() error {
|
||||
|
||||
setThrottle := func(throttle bool, reason string) error {
|
||||
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Regardless of throttle, we take opportunity to check for panic-abort
|
||||
if this.migrationContext.PanicFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.PanicFlagFile) {
|
||||
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
|
||||
}
|
||||
}
|
||||
criticalLoad := this.migrationContext.GetCriticalLoad()
|
||||
for variableName, threshold := range criticalLoad {
|
||||
value, err := this.applier.ShowStatusVariable(variableName)
|
||||
if err != nil {
|
||||
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
|
||||
}
|
||||
if value >= threshold {
|
||||
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
||||
}
|
||||
}
|
||||
|
||||
// Back to throttle considerations
|
||||
|
||||
// User-based throttle
|
||||
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
|
||||
return setThrottle(true, "commanded by user")
|
||||
}
|
||||
if this.migrationContext.ThrottleFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
||||
// Throttle file defined and exists!
|
||||
return setThrottle(true, "flag-file")
|
||||
}
|
||||
}
|
||||
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
|
||||
// 2nd Throttle file defined and exists!
|
||||
return setThrottle(true, "flag-file")
|
||||
}
|
||||
}
|
||||
|
||||
maxLoad := this.migrationContext.GetMaxLoad()
|
||||
for variableName, threshold := range maxLoad {
|
||||
value, err := this.applier.ShowStatusVariable(variableName)
|
||||
if err != nil {
|
||||
return true, fmt.Sprintf("%s %s", variableName, err)
|
||||
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
|
||||
}
|
||||
if value >= threshold {
|
||||
return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)
|
||||
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
|
||||
}
|
||||
}
|
||||
if this.migrationContext.GetThrottleQuery() != "" {
|
||||
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
|
||||
return true, "throttle-query"
|
||||
return setThrottle(true, "throttle-query")
|
||||
}
|
||||
}
|
||||
|
||||
return false, ""
|
||||
return setThrottle(false, "")
|
||||
}
|
||||
|
||||
// initiateThrottlerMetrics initiates the various processes that collect measurements
|
||||
// that may affect throttling. There are several components, all running independently,
|
||||
// that collect such metrics.
|
||||
func (this *Migrator) initiateThrottlerMetrics() {
|
||||
go this.initiateHeartbeatReader()
|
||||
go this.initiateControlReplicasReader()
|
||||
|
||||
go func() {
|
||||
throttlerMetricsTick := time.Tick(1 * time.Second)
|
||||
this.readGeneralThrottleMetrics()
|
||||
this.firstThrottlingCollected <- true
|
||||
for range throttlerMetricsTick {
|
||||
this.readGeneralThrottleMetrics()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// initiateThrottler initiates the throttle ticker and sets the basic behavior of throttling.
|
||||
func (this *Migrator) initiateThrottler() error {
|
||||
throttlerTick := time.Tick(1 * time.Second)
|
||||
throttlerTick := time.Tick(100 * time.Millisecond)
|
||||
|
||||
throttlerFunction := func() {
|
||||
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
|
||||
@ -453,16 +490,15 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.countTableRows(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.addDMLEventsListener(); err != nil {
|
||||
return err
|
||||
}
|
||||
go this.initiateHeartbeatReader()
|
||||
go this.initiateControlReplicasReader()
|
||||
|
||||
if err := this.applier.ReadMigrationRangeValues(); err != nil {
|
||||
return err
|
||||
}
|
||||
go this.initiateThrottlerMetrics()
|
||||
log.Infof("Waiting for first throttle metrics to be collected")
|
||||
<-this.firstThrottlingCollected
|
||||
go this.initiateThrottler()
|
||||
if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
|
||||
return err
|
||||
@ -1206,6 +1242,11 @@ func (this *Migrator) addDMLEventsListener() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *Migrator) initiateThrottler() error {
|
||||
this.throttler = NewThrottler(this.panicAbort)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Migrator) initiateApplier() error {
|
||||
this.applier = NewApplier()
|
||||
if err := this.applier.InitDBConnections(); err != nil {
|
||||
|
Loading…
x
Reference in New Issue
Block a user