2016-04-01 11:36:56 +00:00
|
|
|
/*
|
|
|
|
Copyright 2016 GitHub Inc.
|
2016-05-16 09:09:17 +00:00
|
|
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
2016-04-01 11:36:56 +00:00
|
|
|
*/
|
|
|
|
|
|
|
|
package base
|
|
|
|
|
2016-04-04 13:29:02 +00:00
|
|
|
import (
|
|
|
|
"fmt"
|
2016-07-25 13:46:37 +00:00
|
|
|
"os"
|
|
|
|
"regexp"
|
2016-04-08 08:34:44 +00:00
|
|
|
"strings"
|
2016-04-11 15:27:16 +00:00
|
|
|
"sync"
|
2016-04-08 08:34:44 +00:00
|
|
|
"sync/atomic"
|
2016-04-07 13:57:12 +00:00
|
|
|
"time"
|
2016-04-04 13:29:02 +00:00
|
|
|
|
2016-05-16 09:09:17 +00:00
|
|
|
"github.com/github/gh-ost/go/mysql"
|
|
|
|
"github.com/github/gh-ost/go/sql"
|
2016-05-03 07:28:48 +00:00
|
|
|
|
|
|
|
"gopkg.in/gcfg.v1"
|
2016-08-18 11:58:38 +00:00
|
|
|
gcfgscanner "gopkg.in/gcfg.v1/scanner"
|
2016-04-04 13:29:02 +00:00
|
|
|
)
|
2016-04-01 11:36:56 +00:00
|
|
|
|
2016-04-05 07:14:22 +00:00
|
|
|
// RowsEstimateMethod is the type of row number estimation
type RowsEstimateMethod string

// Known row estimation methods. Every constant carries the RowsEstimateMethod
// type explicitly: in a const block, a constant with its own `= value`
// expression does not inherit the type of the preceding line, so omitting the
// type would silently yield an untyped string constant.
const (
	TableStatusRowsEstimate RowsEstimateMethod = "TableStatusRowsEstimate"
	ExplainRowsEstimate     RowsEstimateMethod = "ExplainRowsEstimate"
	CountRowsEstimate       RowsEstimateMethod = "CountRowsEstimate"
)
|
|
|
|
|
2016-06-06 10:33:05 +00:00
|
|
|
// CutOver enumerates the available cut-over strategies.
type CutOver int

// Cut-over strategies. The second constant relies on implicit repetition of
// `CutOver = iota`, so that it is a typed CutOver (value 1) rather than an
// untyped integer constant, which is what an explicit `= iota` would produce.
const (
	CutOverAtomic CutOver = iota
	CutOverTwoStep
)
|
|
|
|
|
2016-07-25 13:46:37 +00:00
|
|
|
// envVariableRegexp matches text of the form "${...}" and captures whatever
// sits between the braces as its first submatch.
var envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
|
|
|
|
|
2016-08-30 09:32:17 +00:00
|
|
|
// ThrottleCheckResult pairs a throttling decision with the reason for it.
type ThrottleCheckResult struct {
	// ShouldThrottle is the throttling decision.
	ShouldThrottle bool
	// Reason is the textual explanation accompanying the decision.
	Reason string
}
|
|
|
|
|
|
|
|
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
|
|
|
|
return &ThrottleCheckResult{
|
|
|
|
ShouldThrottle: throttle,
|
|
|
|
Reason: reason,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-04-05 07:14:22 +00:00
|
|
|
// MigrationContext has the general, global state of migration. It is used by
|
|
|
|
// all components throughout the migration process.
|
2016-04-01 11:36:56 +00:00
|
|
|
type MigrationContext struct {
|
2016-04-14 11:37:56 +00:00
|
|
|
DatabaseName string
|
|
|
|
OriginalTableName string
|
|
|
|
AlterStatement string
|
|
|
|
|
2016-05-20 10:52:14 +00:00
|
|
|
CountTableRows bool
|
2016-08-24 09:39:44 +00:00
|
|
|
ConcurrentCountTableRows bool
|
2016-05-20 10:52:14 +00:00
|
|
|
AllowedRunningOnMaster bool
|
2016-06-22 08:38:13 +00:00
|
|
|
AllowedMasterMaster bool
|
2016-05-20 10:52:14 +00:00
|
|
|
SwitchToRowBinlogFormat bool
|
2016-08-15 09:05:51 +00:00
|
|
|
AssumeRBR bool
|
2016-05-20 10:52:14 +00:00
|
|
|
NullableUniqueKeyAllowed bool
|
2016-06-17 06:03:18 +00:00
|
|
|
ApproveRenamedColumns bool
|
|
|
|
SkipRenamedColumns bool
|
2016-09-02 11:09:18 +00:00
|
|
|
IsTungsten bool
|
2016-04-14 11:37:56 +00:00
|
|
|
|
2016-05-17 13:35:44 +00:00
|
|
|
config ContextConfig
|
|
|
|
configMutex *sync.Mutex
|
|
|
|
ConfigFile string
|
|
|
|
CliUser string
|
|
|
|
CliPassword string
|
2016-05-03 07:28:48 +00:00
|
|
|
|
2016-08-30 07:41:59 +00:00
|
|
|
HeartbeatIntervalMilliseconds int64
|
2016-06-19 15:55:37 +00:00
|
|
|
defaultNumRetries int64
|
2016-04-08 08:34:44 +00:00
|
|
|
ChunkSize int64
|
2016-07-28 12:37:17 +00:00
|
|
|
niceRatio float64
|
2016-04-07 13:57:12 +00:00
|
|
|
MaxLagMillisecondsThrottleThreshold int64
|
2016-07-26 12:14:25 +00:00
|
|
|
replicationLagQuery string
|
|
|
|
throttleControlReplicaKeys *mysql.InstanceKeyMap
|
2016-04-08 08:34:44 +00:00
|
|
|
ThrottleFlagFile string
|
2016-04-11 15:27:16 +00:00
|
|
|
ThrottleAdditionalFlagFile string
|
2016-07-26 12:14:25 +00:00
|
|
|
throttleQuery string
|
2016-06-07 09:59:17 +00:00
|
|
|
ThrottleCommandedByUser int64
|
2016-06-18 19:12:07 +00:00
|
|
|
maxLoad LoadMap
|
|
|
|
criticalLoad LoadMap
|
2016-10-10 11:21:01 +00:00
|
|
|
CriticalLoadIntervalMilliseconds int64
|
2016-06-07 12:05:25 +00:00
|
|
|
PostponeCutOverFlagFile string
|
2016-07-08 08:14:58 +00:00
|
|
|
CutOverLockTimeoutSeconds int64
|
2016-09-12 17:17:36 +00:00
|
|
|
ForceNamedCutOverCommand bool
|
2016-06-17 09:40:08 +00:00
|
|
|
PanicFlagFile string
|
2016-08-20 06:24:20 +00:00
|
|
|
HooksPath string
|
2016-08-25 11:54:42 +00:00
|
|
|
HooksHintMessage string
|
2016-04-08 12:35:06 +00:00
|
|
|
|
2016-07-22 15:34:18 +00:00
|
|
|
DropServeSocket bool
|
2016-06-07 09:59:17 +00:00
|
|
|
ServeSocketFile string
|
|
|
|
ServeTCPPort int64
|
|
|
|
|
2016-08-19 21:34:08 +00:00
|
|
|
Noop bool
|
|
|
|
TestOnReplica bool
|
|
|
|
MigrateOnReplica bool
|
|
|
|
TestOnReplicaSkipReplicaStop bool
|
|
|
|
OkToDropTable bool
|
|
|
|
InitiallyDropOldTable bool
|
|
|
|
InitiallyDropGhostTable bool
|
|
|
|
CutOverType CutOver
|
2016-04-18 17:57:18 +00:00
|
|
|
|
2016-08-30 10:25:45 +00:00
|
|
|
Hostname string
|
2016-09-02 11:09:18 +00:00
|
|
|
AssumeMasterHostname string
|
2016-08-30 10:25:45 +00:00
|
|
|
TableEngine string
|
|
|
|
RowsEstimate int64
|
|
|
|
RowsDeltaEstimate int64
|
|
|
|
UsedRowsEstimateMethod RowsEstimateMethod
|
|
|
|
HasSuperPrivilege bool
|
|
|
|
OriginalBinlogFormat string
|
|
|
|
OriginalBinlogRowImage string
|
|
|
|
InspectorConnectionConfig *mysql.ConnectionConfig
|
|
|
|
ApplierConnectionConfig *mysql.ConnectionConfig
|
|
|
|
StartTime time.Time
|
|
|
|
RowCopyStartTime time.Time
|
|
|
|
RowCopyEndTime time.Time
|
|
|
|
LockTablesStartTime time.Time
|
|
|
|
RenameTablesStartTime time.Time
|
|
|
|
RenameTablesEndTime time.Time
|
|
|
|
pointOfInterestTime time.Time
|
|
|
|
pointOfInterestTimeMutex *sync.Mutex
|
|
|
|
CurrentLag int64
|
|
|
|
controlReplicasLagResult mysql.ReplicationLagResult
|
|
|
|
TotalRowsCopied int64
|
|
|
|
TotalDMLEventsApplied int64
|
|
|
|
isThrottled bool
|
|
|
|
throttleReason string
|
|
|
|
throttleGeneralCheckResult ThrottleCheckResult
|
|
|
|
throttleMutex *sync.Mutex
|
|
|
|
IsPostponingCutOver int64
|
|
|
|
CountingRowsFlag int64
|
|
|
|
AllEventsUpToLockProcessedInjectedFlag int64
|
|
|
|
CleanupImminentFlag int64
|
2016-09-12 10:38:14 +00:00
|
|
|
UserCommandedUnpostponeFlag int64
|
|
|
|
PanicAbort chan error
|
2016-04-18 17:57:18 +00:00
|
|
|
|
2016-04-11 15:27:16 +00:00
|
|
|
OriginalTableColumns *sql.ColumnList
|
2016-04-08 12:35:06 +00:00
|
|
|
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
2016-04-11 15:27:16 +00:00
|
|
|
GhostTableColumns *sql.ColumnList
|
2016-04-08 12:35:06 +00:00
|
|
|
GhostTableUniqueKeys [](*sql.UniqueKey)
|
|
|
|
UniqueKey *sql.UniqueKey
|
2016-04-11 15:27:16 +00:00
|
|
|
SharedColumns *sql.ColumnList
|
2016-06-17 06:03:18 +00:00
|
|
|
ColumnRenameMap map[string]string
|
|
|
|
MappedSharedColumns *sql.ColumnList
|
2016-04-08 12:35:06 +00:00
|
|
|
MigrationRangeMinValues *sql.ColumnValues
|
|
|
|
MigrationRangeMaxValues *sql.ColumnValues
|
|
|
|
Iteration int64
|
|
|
|
MigrationIterationRangeMinValues *sql.ColumnValues
|
|
|
|
MigrationIterationRangeMaxValues *sql.ColumnValues
|
2016-04-07 13:57:12 +00:00
|
|
|
|
|
|
|
CanStopStreaming func() bool
|
2016-04-01 11:36:56 +00:00
|
|
|
}
|
2016-04-01 14:05:44 +00:00
|
|
|
|
2016-05-17 13:35:44 +00:00
|
|
|
// ContextConfig is the structure the config file is read into (see
// ReadConfigFile). It has two sections, Client and Osc. Field names are
// kept verbatim.
type ContextConfig struct {
	// Client section: connection credentials.
	Client struct {
		User     string
		Password string
	}
	// Osc section: migration tuning values.
	Osc struct {
		Chunk_Size            int64
		Max_Lag_Millis        int64
		Replication_Lag_Query string
		Max_Load              string
	}
}
|
|
|
|
|
2016-04-04 10:27:51 +00:00
|
|
|
var context *MigrationContext
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
context = newMigrationContext()
|
|
|
|
}
|
2016-04-01 14:05:44 +00:00
|
|
|
|
|
|
|
func newMigrationContext() *MigrationContext {
|
2016-04-04 10:27:51 +00:00
|
|
|
return &MigrationContext{
|
2016-06-19 15:55:37 +00:00
|
|
|
defaultNumRetries: 60,
|
2016-04-07 13:57:12 +00:00
|
|
|
ChunkSize: 1000,
|
|
|
|
InspectorConnectionConfig: mysql.NewConnectionConfig(),
|
2016-04-14 11:37:56 +00:00
|
|
|
ApplierConnectionConfig: mysql.NewConnectionConfig(),
|
2016-07-13 07:44:00 +00:00
|
|
|
MaxLagMillisecondsThrottleThreshold: 1500,
|
2016-07-08 08:14:58 +00:00
|
|
|
CutOverLockTimeoutSeconds: 3,
|
2016-06-18 19:12:07 +00:00
|
|
|
maxLoad: NewLoadMap(),
|
|
|
|
criticalLoad: NewLoadMap(),
|
2016-04-23 02:46:34 +00:00
|
|
|
throttleMutex: &sync.Mutex{},
|
2016-07-26 12:14:25 +00:00
|
|
|
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
2016-05-17 13:35:44 +00:00
|
|
|
configMutex: &sync.Mutex{},
|
2016-05-23 12:58:53 +00:00
|
|
|
pointOfInterestTimeMutex: &sync.Mutex{},
|
2016-06-17 06:03:18 +00:00
|
|
|
ColumnRenameMap: make(map[string]string),
|
2016-09-12 10:38:14 +00:00
|
|
|
PanicAbort: make(chan error),
|
2016-04-04 10:27:51 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-04-04 16:19:46 +00:00
|
|
|
// GetMigrationContext
|
2016-04-04 10:27:51 +00:00
|
|
|
func GetMigrationContext() *MigrationContext {
|
|
|
|
return context
|
|
|
|
}
|
|
|
|
|
2016-04-05 07:14:22 +00:00
|
|
|
// GetGhostTableName generates the name of ghost table, based on original table name
|
2016-04-04 13:29:02 +00:00
|
|
|
func (this *MigrationContext) GetGhostTableName() string {
|
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
|
|
|
return fmt.Sprintf("_%s_gho", this.OriginalTableName)
|
2016-04-04 13:29:02 +00:00
|
|
|
}
|
|
|
|
|
2016-04-11 15:27:16 +00:00
|
|
|
// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
|
|
|
|
func (this *MigrationContext) GetOldTableName() string {
|
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
|
|
|
if this.TestOnReplica {
|
|
|
|
return fmt.Sprintf("_%s_ght", this.OriginalTableName)
|
|
|
|
}
|
|
|
|
if this.MigrateOnReplica {
|
|
|
|
return fmt.Sprintf("_%s_ghr", this.OriginalTableName)
|
|
|
|
}
|
|
|
|
return fmt.Sprintf("_%s_del", this.OriginalTableName)
|
2016-04-11 15:27:16 +00:00
|
|
|
}
|
|
|
|
|
2016-04-07 13:57:12 +00:00
|
|
|
// GetChangelogTableName generates the name of changelog table, based on original table name
|
|
|
|
func (this *MigrationContext) GetChangelogTableName() string {
|
Solved cut-over stall; change of table names
- Cutover would stall after `lock tables` wait-timeout due do waiting on a channel that would never be written to. This has been identified, reproduced, fixed, confirmed.
- Change of table names. Heres the story:
- Because were testing this even while `pt-online-schema-change` is being used in production, the `_tbl_old` naming convention makes for a collision.
- "old" table name is now `_tbl_del`, "del" standing for "delete"
- ghost table name is now `_tbl_gho`
- when issuing `--test-on-replica`, we keep the ghost table around, and were also briefly renaming original table to "old". Well this collides with a potentially existing "old" table on master (one that hasnt been dropped yet).
`--test-on-replica` uses `_tbl_ght` (ghost-test)
- similar problem with `--execute-on-replica`, and in this case the table doesnt stick around; calling it `_tbl_ghr` (ghost-replica)
- changelog table is now `_tbl_ghc` (ghost-changelog)
- To clarify, I dont want to go down the path of creating "old" tables with 2 or 3 or 4 or 5 or infinite leading underscored. I think this is very confusing and actually not operations friendly. Its OK that the migration will fail saying "hey, you ALREADY have an old table here, why dont you take care of it first", rather than create _yet_another_ `____tbl_old` table. Were always confused on which table it actually is that gets migrated, which is safe to `drop`, etc.
- just after rowcopy completing, just before cutover, during cutover: marking as point in time _of interest_ so as to increase logging frequency.
2016-06-21 10:56:01 +00:00
|
|
|
return fmt.Sprintf("_%s_ghc", this.OriginalTableName)
|
2016-04-07 13:57:12 +00:00
|
|
|
}
|
|
|
|
|
2016-04-22 20:41:20 +00:00
|
|
|
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
|
|
|
|
// the swap-tables process.
|
|
|
|
func (this *MigrationContext) GetVoluntaryLockName() string {
|
|
|
|
return fmt.Sprintf("%s.%s.lock", this.DatabaseName, this.OriginalTableName)
|
|
|
|
}
|
|
|
|
|
2016-04-05 07:14:22 +00:00
|
|
|
// RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW`
|
2016-04-04 10:27:51 +00:00
|
|
|
func (this *MigrationContext) RequiresBinlogFormatChange() bool {
|
|
|
|
return this.OriginalBinlogFormat != "ROW"
|
2016-04-01 14:05:44 +00:00
|
|
|
}
|
2016-04-04 13:29:02 +00:00
|
|
|
|
2016-04-14 11:37:56 +00:00
|
|
|
// InspectorIsAlsoApplier is `true` when the both inspector and applier are the
|
|
|
|
// same database instance. This would be true when running directly on master or when
|
|
|
|
// testing on replica.
|
|
|
|
func (this *MigrationContext) InspectorIsAlsoApplier() bool {
|
|
|
|
return this.InspectorConnectionConfig.Equals(this.ApplierConnectionConfig)
|
2016-04-04 13:29:02 +00:00
|
|
|
}
|
2016-04-04 16:19:46 +00:00
|
|
|
|
2016-04-05 07:14:22 +00:00
|
|
|
// HasMigrationRange tells us whether there's a range to iterate for copying rows.
|
|
|
|
// It will be `false` if the table is initially empty
|
2016-04-04 16:19:46 +00:00
|
|
|
func (this *MigrationContext) HasMigrationRange() bool {
|
2016-04-05 07:14:22 +00:00
|
|
|
return this.MigrationRangeMinValues != nil && this.MigrationRangeMaxValues != nil
|
2016-04-04 16:19:46 +00:00
|
|
|
}
|
2016-04-07 13:57:12 +00:00
|
|
|
|
2016-07-08 08:14:58 +00:00
|
|
|
func (this *MigrationContext) SetCutOverLockTimeoutSeconds(timeoutSeconds int64) error {
|
|
|
|
if timeoutSeconds < 1 {
|
|
|
|
return fmt.Errorf("Minimal timeout is 1sec. Timeout remains at %d", this.CutOverLockTimeoutSeconds)
|
|
|
|
}
|
|
|
|
if timeoutSeconds > 10 {
|
|
|
|
return fmt.Errorf("Maximal timeout is 10sec. Timeout remains at %d", this.CutOverLockTimeoutSeconds)
|
|
|
|
}
|
|
|
|
this.CutOverLockTimeoutSeconds = timeoutSeconds
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-06-19 15:55:37 +00:00
|
|
|
func (this *MigrationContext) SetDefaultNumRetries(retries int64) {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
if retries > 0 {
|
|
|
|
this.defaultNumRetries = retries
|
|
|
|
}
|
|
|
|
}
|
2016-07-08 08:14:58 +00:00
|
|
|
|
2016-06-19 15:55:37 +00:00
|
|
|
func (this *MigrationContext) MaxRetries() int64 {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
retries := this.defaultNumRetries
|
|
|
|
return retries
|
2016-04-07 13:57:12 +00:00
|
|
|
}
|
2016-04-08 08:34:44 +00:00
|
|
|
|
|
|
|
func (this *MigrationContext) IsTransactionalTable() bool {
|
|
|
|
switch strings.ToLower(this.TableEngine) {
|
|
|
|
case "innodb":
|
|
|
|
{
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
case "tokudb":
|
|
|
|
{
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
|
|
|
// ElapsedTime returns time since very beginning of the process
|
|
|
|
func (this *MigrationContext) ElapsedTime() time.Duration {
|
2016-08-02 12:38:56 +00:00
|
|
|
return time.Since(this.StartTime)
|
2016-04-08 08:34:44 +00:00
|
|
|
}
|
|
|
|
|
2016-07-29 08:40:23 +00:00
|
|
|
// MarkRowCopyStartTime
|
|
|
|
func (this *MigrationContext) MarkRowCopyStartTime() {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
this.RowCopyStartTime = time.Now()
|
|
|
|
}
|
|
|
|
|
2016-04-08 08:34:44 +00:00
|
|
|
// ElapsedRowCopyTime returns time since starting to copy chunks of rows
|
|
|
|
func (this *MigrationContext) ElapsedRowCopyTime() time.Duration {
|
2016-06-19 15:55:37 +00:00
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
2016-07-29 08:40:23 +00:00
|
|
|
if this.RowCopyStartTime.IsZero() {
|
|
|
|
// Row copy hasn't started yet
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
|
2016-06-19 15:55:37 +00:00
|
|
|
if this.RowCopyEndTime.IsZero() {
|
2016-08-02 12:38:56 +00:00
|
|
|
return time.Since(this.RowCopyStartTime)
|
2016-06-19 15:55:37 +00:00
|
|
|
}
|
|
|
|
return this.RowCopyEndTime.Sub(this.RowCopyStartTime)
|
|
|
|
}
|
|
|
|
|
|
|
|
// ElapsedRowCopyTime returns time since starting to copy chunks of rows
|
|
|
|
func (this *MigrationContext) MarkRowCopyEndTime() {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
this.RowCopyEndTime = time.Now()
|
2016-04-08 08:34:44 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// GetTotalRowsCopied returns the accurate number of rows being copied (affected)
|
|
|
|
// This is not exactly the same as the rows being iterated via chunks, but potentially close enough
|
|
|
|
func (this *MigrationContext) GetTotalRowsCopied() int64 {
|
|
|
|
return atomic.LoadInt64(&this.TotalRowsCopied)
|
|
|
|
}
|
2016-04-08 12:35:06 +00:00
|
|
|
|
|
|
|
func (this *MigrationContext) GetIteration() int64 {
|
|
|
|
return atomic.LoadInt64(&this.Iteration)
|
|
|
|
}
|
|
|
|
|
2016-05-23 12:58:53 +00:00
|
|
|
func (this *MigrationContext) MarkPointOfInterest() int64 {
|
|
|
|
this.pointOfInterestTimeMutex.Lock()
|
|
|
|
defer this.pointOfInterestTimeMutex.Unlock()
|
|
|
|
|
|
|
|
this.pointOfInterestTime = time.Now()
|
|
|
|
return atomic.LoadInt64(&this.Iteration)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
|
|
|
|
this.pointOfInterestTimeMutex.Lock()
|
|
|
|
defer this.pointOfInterestTimeMutex.Unlock()
|
|
|
|
|
2016-08-02 12:38:56 +00:00
|
|
|
return time.Since(this.pointOfInterestTime)
|
2016-05-23 12:58:53 +00:00
|
|
|
}
|
|
|
|
|
2016-08-30 07:41:59 +00:00
|
|
|
func (this *MigrationContext) SetHeartbeatIntervalMilliseconds(heartbeatIntervalMilliseconds int64) {
|
|
|
|
if heartbeatIntervalMilliseconds < 100 {
|
|
|
|
heartbeatIntervalMilliseconds = 100
|
|
|
|
}
|
|
|
|
if heartbeatIntervalMilliseconds > 1000 {
|
|
|
|
heartbeatIntervalMilliseconds = 1000
|
|
|
|
}
|
|
|
|
this.HeartbeatIntervalMilliseconds = heartbeatIntervalMilliseconds
|
|
|
|
}
|
|
|
|
|
2016-07-13 07:44:00 +00:00
|
|
|
func (this *MigrationContext) SetMaxLagMillisecondsThrottleThreshold(maxLagMillisecondsThrottleThreshold int64) {
|
2016-08-26 23:44:40 +00:00
|
|
|
if maxLagMillisecondsThrottleThreshold < 100 {
|
|
|
|
maxLagMillisecondsThrottleThreshold = 100
|
2016-07-13 07:44:00 +00:00
|
|
|
}
|
|
|
|
atomic.StoreInt64(&this.MaxLagMillisecondsThrottleThreshold, maxLagMillisecondsThrottleThreshold)
|
|
|
|
}
|
|
|
|
|
2016-06-07 09:59:17 +00:00
|
|
|
func (this *MigrationContext) SetChunkSize(chunkSize int64) {
|
|
|
|
if chunkSize < 100 {
|
|
|
|
chunkSize = 100
|
|
|
|
}
|
|
|
|
if chunkSize > 100000 {
|
|
|
|
chunkSize = 100000
|
|
|
|
}
|
|
|
|
atomic.StoreInt64(&this.ChunkSize, chunkSize)
|
|
|
|
}
|
|
|
|
|
2016-08-30 09:32:17 +00:00
|
|
|
func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
this.throttleGeneralCheckResult = *checkResult
|
|
|
|
return checkResult
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResult {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
result := this.throttleGeneralCheckResult
|
|
|
|
return &result
|
|
|
|
}
|
|
|
|
|
2016-04-11 15:27:16 +00:00
|
|
|
func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
|
|
|
|
this.throttleMutex.Lock()
|
2016-05-23 12:58:53 +00:00
|
|
|
defer this.throttleMutex.Unlock()
|
2016-04-11 15:27:16 +00:00
|
|
|
this.isThrottled = throttle
|
|
|
|
this.throttleReason = reason
|
2016-04-08 12:35:06 +00:00
|
|
|
}
|
|
|
|
|
2016-04-11 15:27:16 +00:00
|
|
|
func (this *MigrationContext) IsThrottled() (bool, string) {
|
|
|
|
this.throttleMutex.Lock()
|
2016-05-23 12:58:53 +00:00
|
|
|
defer this.throttleMutex.Unlock()
|
2016-04-11 15:27:16 +00:00
|
|
|
return this.isThrottled, this.throttleReason
|
2016-04-08 12:35:06 +00:00
|
|
|
}
|
|
|
|
|
2016-07-26 12:14:25 +00:00
|
|
|
func (this *MigrationContext) GetReplicationLagQuery() string {
|
|
|
|
var query string
|
|
|
|
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
query = this.replicationLagQuery
|
|
|
|
return query
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) SetReplicationLagQuery(newQuery string) {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
this.replicationLagQuery = newQuery
|
|
|
|
}
|
|
|
|
|
2016-06-18 19:12:07 +00:00
|
|
|
func (this *MigrationContext) GetThrottleQuery() string {
|
|
|
|
var query string
|
2016-06-09 09:25:01 +00:00
|
|
|
|
2016-06-18 19:12:07 +00:00
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
2016-07-26 12:14:25 +00:00
|
|
|
query = this.throttleQuery
|
2016-06-18 19:12:07 +00:00
|
|
|
return query
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) SetThrottleQuery(newQuery string) {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
2016-07-26 12:14:25 +00:00
|
|
|
this.throttleQuery = newQuery
|
2016-06-18 19:12:07 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) GetMaxLoad() LoadMap {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
return this.maxLoad.Duplicate()
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) GetCriticalLoad() LoadMap {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
return this.criticalLoad.Duplicate()
|
2016-06-09 09:25:01 +00:00
|
|
|
}
|
|
|
|
|
2016-07-28 12:37:17 +00:00
|
|
|
func (this *MigrationContext) GetNiceRatio() float64 {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
return this.niceRatio
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) SetNiceRatio(newRatio float64) {
|
|
|
|
if newRatio < 0.0 {
|
|
|
|
newRatio = 0.0
|
|
|
|
}
|
|
|
|
if newRatio > 100.0 {
|
|
|
|
newRatio = 100.0
|
|
|
|
}
|
|
|
|
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
this.niceRatio = newRatio
|
|
|
|
}
|
|
|
|
|
2016-05-17 13:35:44 +00:00
|
|
|
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
|
|
|
|
// such as: 'Threads_running=100,Threads_connected=500'
|
2016-06-09 09:25:01 +00:00
|
|
|
// It only applies changes in case there's no parsing error.
|
2016-04-08 12:35:06 +00:00
|
|
|
func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error {
|
2016-06-18 19:12:07 +00:00
|
|
|
loadMap, err := ParseLoadMap(maxLoadList)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2016-04-08 12:35:06 +00:00
|
|
|
}
|
2016-06-18 19:12:07 +00:00
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
2016-06-09 09:25:01 +00:00
|
|
|
|
2016-06-18 19:12:07 +00:00
|
|
|
this.maxLoad = loadMap
|
|
|
|
return nil
|
|
|
|
}
|
2016-06-09 09:25:01 +00:00
|
|
|
|
2016-06-18 19:12:07 +00:00
|
|
|
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
|
|
|
|
// such as: 'Threads_running=100,Threads_connected=500'
|
|
|
|
// It only applies changes in case there's no parsing error.
|
|
|
|
func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error {
|
|
|
|
loadMap, err := ParseLoadMap(criticalLoadList)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2016-04-08 12:35:06 +00:00
|
|
|
}
|
2016-06-18 19:12:07 +00:00
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
2016-06-09 09:25:01 +00:00
|
|
|
|
2016-06-18 19:12:07 +00:00
|
|
|
this.criticalLoad = loadMap
|
2016-04-08 12:35:06 +00:00
|
|
|
return nil
|
|
|
|
}
|
2016-05-03 07:28:48 +00:00
|
|
|
|
2016-08-30 07:41:59 +00:00
|
|
|
func (this *MigrationContext) GetControlReplicasLagResult() mysql.ReplicationLagResult {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
lagResult := this.controlReplicasLagResult
|
|
|
|
return lagResult
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) SetControlReplicasLagResult(lagResult *mysql.ReplicationLagResult) {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
this.controlReplicasLagResult = *lagResult
|
|
|
|
}
|
|
|
|
|
2016-06-20 10:09:04 +00:00
|
|
|
func (this *MigrationContext) GetThrottleControlReplicaKeys() *mysql.InstanceKeyMap {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
keys := mysql.NewInstanceKeyMap()
|
2016-07-26 12:14:25 +00:00
|
|
|
keys.AddKeys(this.throttleControlReplicaKeys.GetInstanceKeys())
|
2016-06-20 10:09:04 +00:00
|
|
|
return keys
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) ReadThrottleControlReplicaKeys(throttleControlReplicas string) error {
|
|
|
|
keys := mysql.NewInstanceKeyMap()
|
|
|
|
if err := keys.ReadCommaDelimitedList(throttleControlReplicas); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
2016-07-26 12:14:25 +00:00
|
|
|
this.throttleControlReplicaKeys = keys
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (this *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey) error {
|
|
|
|
this.throttleMutex.Lock()
|
|
|
|
defer this.throttleMutex.Unlock()
|
|
|
|
|
|
|
|
this.throttleControlReplicaKeys.AddKey(key)
|
2016-06-20 10:09:04 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2016-05-17 13:35:44 +00:00
|
|
|
// ApplyCredentials sorts out the credentials between the config file and the CLI flags
|
|
|
|
func (this *MigrationContext) ApplyCredentials() {
|
|
|
|
this.configMutex.Lock()
|
|
|
|
defer this.configMutex.Unlock()
|
|
|
|
|
|
|
|
if this.config.Client.User != "" {
|
|
|
|
this.InspectorConnectionConfig.User = this.config.Client.User
|
|
|
|
}
|
|
|
|
if this.CliUser != "" {
|
|
|
|
// Override
|
|
|
|
this.InspectorConnectionConfig.User = this.CliUser
|
|
|
|
}
|
|
|
|
if this.config.Client.Password != "" {
|
|
|
|
this.InspectorConnectionConfig.Password = this.config.Client.Password
|
|
|
|
}
|
|
|
|
if this.CliPassword != "" {
|
|
|
|
// Override
|
|
|
|
this.InspectorConnectionConfig.Password = this.CliPassword
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ReadConfigFile attempts to read the config file, if it exists
|
2016-05-03 07:28:48 +00:00
|
|
|
func (this *MigrationContext) ReadConfigFile() error {
|
2016-05-17 13:35:44 +00:00
|
|
|
this.configMutex.Lock()
|
|
|
|
defer this.configMutex.Unlock()
|
|
|
|
|
2016-05-03 07:28:48 +00:00
|
|
|
if this.ConfigFile == "" {
|
|
|
|
return nil
|
|
|
|
}
|
2016-07-25 13:46:37 +00:00
|
|
|
gcfg.RelaxedParserMode = true
|
2016-08-18 11:58:38 +00:00
|
|
|
gcfgscanner.RelaxedScannerMode = true
|
2016-05-17 13:35:44 +00:00
|
|
|
if err := gcfg.ReadFileInto(&this.config, this.ConfigFile); err != nil {
|
2016-05-03 07:28:48 +00:00
|
|
|
return err
|
|
|
|
}
|
2016-07-25 13:46:37 +00:00
|
|
|
|
|
|
|
// We accept user & password in the form "${SOME_ENV_VARIABLE}" in which case we pull
|
|
|
|
// the given variable from os env
|
|
|
|
if submatch := envVariableRegexp.FindStringSubmatch(this.config.Client.User); len(submatch) > 1 {
|
|
|
|
this.config.Client.User = os.Getenv(submatch[1])
|
|
|
|
}
|
|
|
|
if submatch := envVariableRegexp.FindStringSubmatch(this.config.Client.Password); len(submatch) > 1 {
|
|
|
|
this.config.Client.Password = os.Getenv(submatch[1])
|
|
|
|
}
|
|
|
|
|
2016-05-03 07:28:48 +00:00
|
|
|
return nil
|
|
|
|
}
|