Merge pull request #37 from github/postpone-swap-tables-flag-file
postpone-swap-tables-flag-file
This commit is contained in:
commit
fbfe0c71ff
@ -52,6 +52,7 @@ type MigrationContext struct {
|
|||||||
ThrottleFlagFile string
|
ThrottleFlagFile string
|
||||||
ThrottleAdditionalFlagFile string
|
ThrottleAdditionalFlagFile string
|
||||||
MaxLoad map[string]int64
|
MaxLoad map[string]int64
|
||||||
|
PostponeSwapTablesFlagFile string
|
||||||
SwapTablesTimeoutSeconds int64
|
SwapTablesTimeoutSeconds int64
|
||||||
|
|
||||||
Noop bool
|
Noop bool
|
||||||
|
@ -7,6 +7,7 @@ package base
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"regexp"
|
"regexp"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@ -23,3 +24,10 @@ func PrettifyDurationOutput(d time.Duration) string {
|
|||||||
result = prettifyDurationRegexp.ReplaceAllString(result, "")
|
result = prettifyDurationRegexp.ReplaceAllString(result, "")
|
||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func FileExists(fileName string) bool {
|
||||||
|
if _, err := os.Stat(fileName); err == nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
@ -53,6 +53,8 @@ func main() {
|
|||||||
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
||||||
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
||||||
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
|
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
|
||||||
|
flag.StringVar(&migrationContext.PostponeSwapTablesFlagFile, "postpone-swap-tables-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Swapping would be ready to perform the moment the file is deleted.")
|
||||||
|
|
||||||
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
|
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
|
||||||
quiet := flag.Bool("quiet", false, "quiet")
|
quiet := flag.Bool("quiet", false, "quiet")
|
||||||
verbose := flag.Bool("verbose", false, "verbose")
|
verbose := flag.Bool("verbose", false, "verbose")
|
||||||
@ -117,7 +119,7 @@ func main() {
|
|||||||
log.Fatale(err)
|
log.Fatale(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Info("starting gh-ost %+v", AppVersion)
|
log.Infof("starting gh-ost %+v", AppVersion)
|
||||||
|
|
||||||
migrator := logic.NewMigrator()
|
migrator := logic.NewMigrator()
|
||||||
err := migrator.Migrate()
|
err := migrator.Migrate()
|
||||||
|
@ -48,7 +48,8 @@ type Migrator struct {
|
|||||||
voluntaryLockAcquired chan bool
|
voluntaryLockAcquired chan bool
|
||||||
panicAbort chan error
|
panicAbort chan error
|
||||||
|
|
||||||
allEventsUpToLockProcessedFlag int64
|
rowCopyCompleteFlag int64
|
||||||
|
allEventsUpToLockProcessedInjectedFlag int64
|
||||||
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
// copyRowsQueue should not be buffered; if buffered some non-damaging but
|
||||||
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
|
||||||
copyRowsQueue chan tableWriteFunc
|
copyRowsQueue chan tableWriteFunc
|
||||||
@ -66,7 +67,7 @@ func NewMigrator() *Migrator {
|
|||||||
voluntaryLockAcquired: make(chan bool, 1),
|
voluntaryLockAcquired: make(chan bool, 1),
|
||||||
panicAbort: make(chan error),
|
panicAbort: make(chan error),
|
||||||
|
|
||||||
allEventsUpToLockProcessedFlag: 0,
|
allEventsUpToLockProcessedInjectedFlag: 0,
|
||||||
|
|
||||||
copyRowsQueue: make(chan tableWriteFunc),
|
copyRowsQueue: make(chan tableWriteFunc),
|
||||||
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
||||||
@ -93,13 +94,13 @@ func (this *Migrator) acceptSignals() {
|
|||||||
func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||||
// User-based throttle
|
// User-based throttle
|
||||||
if this.migrationContext.ThrottleFlagFile != "" {
|
if this.migrationContext.ThrottleFlagFile != "" {
|
||||||
if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil {
|
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
||||||
// Throttle file defined and exists!
|
// Throttle file defined and exists!
|
||||||
return true, "flag-file"
|
return true, "flag-file"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
||||||
if _, err := os.Stat(this.migrationContext.ThrottleAdditionalFlagFile); err == nil {
|
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
|
||||||
// 2nd Throttle file defined and exists!
|
// 2nd Throttle file defined and exists!
|
||||||
return true, "flag-file"
|
return true, "flag-file"
|
||||||
}
|
}
|
||||||
@ -109,7 +110,7 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
|||||||
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||||
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
|
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
|
||||||
}
|
}
|
||||||
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedFlag) == 0) {
|
if this.migrationContext.TestOnReplica && (atomic.LoadInt64(&this.allEventsUpToLockProcessedInjectedFlag) == 0) {
|
||||||
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
|
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err.Error()
|
return true, err.Error()
|
||||||
@ -172,6 +173,21 @@ func (this *Migrator) throttle(onThrottled func()) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
|
||||||
|
// (or fails with error)
|
||||||
|
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
|
||||||
|
for {
|
||||||
|
shouldSleep, err := operation()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !shouldSleep {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// retryOperation attempts up to `count` attempts at running given function,
|
// retryOperation attempts up to `count` attempts at running given function,
|
||||||
// exiting as soon as it returns with non-error.
|
// exiting as soon as it returns with non-error.
|
||||||
func (this *Migrator) retryOperation(operation func() error) (err error) {
|
func (this *Migrator) retryOperation(operation func() error) (err error) {
|
||||||
@ -205,6 +221,7 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
|
|||||||
// consumers and drops any further incoming events that may be left hanging.
|
// consumers and drops any further incoming events that may be left hanging.
|
||||||
func (this *Migrator) consumeRowCopyComplete() {
|
func (this *Migrator) consumeRowCopyComplete() {
|
||||||
<-this.rowCopyComplete
|
<-this.rowCopyComplete
|
||||||
|
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
|
||||||
go func() {
|
go func() {
|
||||||
for <-this.rowCopyComplete {
|
for <-this.rowCopyComplete {
|
||||||
}
|
}
|
||||||
@ -330,6 +347,20 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
|||||||
log.Debugf("throttling before swapping tables")
|
log.Debugf("throttling before swapping tables")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
this.sleepWhileTrue(
|
||||||
|
func() (bool, error) {
|
||||||
|
if this.migrationContext.PostponeSwapTablesFlagFile == "" {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if base.FileExists(this.migrationContext.PostponeSwapTablesFlagFile) {
|
||||||
|
// Throttle file defined and exists!
|
||||||
|
log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeSwapTablesFlagFile)
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
if this.migrationContext.TestOnReplica {
|
if this.migrationContext.TestOnReplica {
|
||||||
return this.stopWritesAndCompleteMigrationOnReplica()
|
return this.stopWritesAndCompleteMigrationOnReplica()
|
||||||
}
|
}
|
||||||
@ -374,8 +405,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Debugf("Waiting for events up to lock")
|
log.Debugf("Waiting for events up to lock")
|
||||||
|
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
|
||||||
<-this.allEventsUpToLockProcessed
|
<-this.allEventsUpToLockProcessed
|
||||||
atomic.StoreInt64(&this.allEventsUpToLockProcessedFlag, 1)
|
|
||||||
log.Debugf("Done waiting for events up to lock")
|
log.Debugf("Done waiting for events up to lock")
|
||||||
this.printStatus()
|
this.printStatus()
|
||||||
|
|
||||||
@ -687,6 +718,10 @@ func (this *Migrator) iterateChunks() error {
|
|||||||
return terminateRowIteration(nil)
|
return terminateRowIteration(nil)
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
|
if atomic.LoadInt64(&this.rowCopyCompleteFlag) == 1 {
|
||||||
|
// Done
|
||||||
|
return nil
|
||||||
|
}
|
||||||
copyRowsFunc := func() error {
|
copyRowsFunc := func() error {
|
||||||
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
|
hasFurtherRange, err := this.applier.CalculateNextIterationRangeEndValues()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
Loading…
Reference in New Issue
Block a user