Merge pull request #90 from github/control-lock-wait-timeout

supporting --cut-over-lock-timeout-seconds
This commit is contained in:
Shlomi Noach 2016-07-08 10:16:40 +02:00 committed by GitHub
commit 6824447861
4 changed files with 27 additions and 11 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
# #
# #
RELEASE_VERSION="0.9.9" RELEASE_VERSION="1.0.1"
buildpath=/tmp/gh-ost buildpath=/tmp/gh-ost
target=gh-ost target=gh-ost

View File

@ -69,7 +69,7 @@ type MigrationContext struct {
maxLoad LoadMap maxLoad LoadMap
criticalLoad LoadMap criticalLoad LoadMap
PostponeCutOverFlagFile string PostponeCutOverFlagFile string
SwapTablesTimeoutSeconds int64 CutOverLockTimeoutSeconds int64
PanicFlagFile string PanicFlagFile string
ServeSocketFile string ServeSocketFile string
@ -149,7 +149,7 @@ func newMigrationContext() *MigrationContext {
InspectorConnectionConfig: mysql.NewConnectionConfig(), InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(), ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1000, MaxLagMillisecondsThrottleThreshold: 1000,
SwapTablesTimeoutSeconds: 3, CutOverLockTimeoutSeconds: 3,
maxLoad: NewLoadMap(), maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(), criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{}, throttleMutex: &sync.Mutex{},
@ -210,6 +210,17 @@ func (this *MigrationContext) HasMigrationRange() bool {
return this.MigrationRangeMinValues != nil && this.MigrationRangeMaxValues != nil return this.MigrationRangeMinValues != nil && this.MigrationRangeMaxValues != nil
} }
func (this *MigrationContext) SetCutOverLockTimeoutSeconds(timeoutSeconds int64) error {
if timeoutSeconds < 1 {
return fmt.Errorf("Minimal timeout is 1sec. Timeout remains at %d", this.CutOverLockTimeoutSeconds)
}
if timeoutSeconds > 10 {
return fmt.Errorf("Maximal timeout is 10sec. Timeout remains at %d", this.CutOverLockTimeoutSeconds)
}
this.CutOverLockTimeoutSeconds = timeoutSeconds
return nil
}
func (this *MigrationContext) SetDefaultNumRetries(retries int64) { func (this *MigrationContext) SetDefaultNumRetries(retries int64) {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()
@ -217,6 +228,7 @@ func (this *MigrationContext) SetDefaultNumRetries(retries int64) {
this.defaultNumRetries = retries this.defaultNumRetries = retries
} }
} }
func (this *MigrationContext) MaxRetries() int64 { func (this *MigrationContext) MaxRetries() int64 {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()

View File

@ -71,6 +71,7 @@ func main() {
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running") flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
flag.Int64Var(&migrationContext.NiceRatio, "nice-ratio", 0, "force being 'nice', imply sleep time per chunk time. Example values: 0 is aggressive. 3: for every ms spend in a rowcopy chunk, spend 3ms sleeping immediately after") flag.Int64Var(&migrationContext.NiceRatio, "nice-ratio", 0, "force being 'nice', imply sleep time per chunk time. Example values: 0 is aggressive. 3: for every ms spend in a rowcopy chunk, spend 3ms sleeping immediately after")
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation") flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
@ -172,6 +173,9 @@ func main() {
migrationContext.SetChunkSize(*chunkSize) migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDefaultNumRetries(*defaultRetries) migrationContext.SetDefaultNumRetries(*defaultRetries)
migrationContext.ApplyCredentials() migrationContext.ApplyCredentials()
if err := migrationContext.SetCutOverLockTimeoutSeconds(*cutOverLockTimeoutSeconds); err != nil {
log.Errore(err)
}
log.Infof("starting gh-ost %+v", AppVersion) log.Infof("starting gh-ost %+v", AppVersion)
acceptSignals(migrationContext) acceptSignals(migrationContext)

View File

@ -632,7 +632,7 @@ func (this *Applier) LockOriginalTableAndWait(sessionIdChan chan int64, tableLoc
return err return err
} }
tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2 tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
@ -690,8 +690,8 @@ func (this *Applier) RenameOriginalTable(sessionIdChan chan int64, originalTable
} }
sessionIdChan <- sessionId sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds) log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds) query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
return err return err
} }
@ -725,8 +725,8 @@ func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRename
} }
sessionIdChan <- sessionId sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds) log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds) query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
return err return err
} }
@ -861,7 +861,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
return err return err
} }
tableLockTimeoutSeconds := this.migrationContext.SwapTablesTimeoutSeconds * 2 tableLockTimeoutSeconds := this.migrationContext.CutOverLockTimeoutSeconds * 2
log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds) log.Infof("Setting LOCK timeout as %d seconds", tableLockTimeoutSeconds)
query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds) query = fmt.Sprintf(`set session lock_wait_timeout:=%d`, tableLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
@ -948,8 +948,8 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
} }
sessionIdChan <- sessionId sessionIdChan <- sessionId
log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.SwapTablesTimeoutSeconds) log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds) query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
return err return err
} }