From 1c6f8280917496b4167d539d59a4cd333f91a2af Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 12 Sep 2016 12:38:14 +0200 Subject: [PATCH] refactored server command into server.go - added support for cut-over= - refactored more code into context --- build.sh | 2 +- go/base/context.go | 3 + go/logic/migrator.go | 165 +++------------------------------------- go/logic/server.go | 172 ++++++++++++++++++++++++++++++++++++++++-- go/logic/throttler.go | 8 +- 5 files changed, 185 insertions(+), 165 deletions(-) diff --git a/build.sh b/build.sh index ff48aaa..2c98df8 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ # # -RELEASE_VERSION="1.0.18" +RELEASE_VERSION="1.0.19" function build { osname=$1 diff --git a/go/base/context.go b/go/base/context.go index 9f114fd..d1f2d10 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -140,6 +140,8 @@ type MigrationContext struct { CountingRowsFlag int64 AllEventsUpToLockProcessedInjectedFlag int64 CleanupImminentFlag int64 + UserCommandedUnpostponeFlag int64 + PanicAbort chan error OriginalTableColumns *sql.ColumnList OriginalTableUniqueKeys [](*sql.UniqueKey) @@ -192,6 +194,7 @@ func newMigrationContext() *MigrationContext { configMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{}, ColumnRenameMap: make(map[string]string), + PanicAbort: make(chan error), } } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index d416dcf..9e7a93b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -6,14 +6,11 @@ package logic import ( - "bufio" "fmt" "io" "math" "os" "os/signal" - "strconv" - "strings" "sync/atomic" "syscall" "time" @@ -42,7 +39,8 @@ const ( type PrintStatusRule int const ( - HeuristicPrintStatusRule PrintStatusRule = iota + NoPrintStatusRule PrintStatusRule = iota + HeuristicPrintStatusRule = iota ForcePrintStatusRule = iota ForcePrintStatusOnlyRule = iota ForcePrintStatusAndHintRule = iota @@ -63,11 +61,9 @@ type Migrator struct { tablesInPlace chan bool rowCopyComplete chan bool 
allEventsUpToLockProcessed chan bool - panicAbort chan error rowCopyCompleteFlag int64 inCutOverCriticalActionFlag int64 - userCommandedUnpostponeFlag int64 // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan tableWriteFunc @@ -84,7 +80,6 @@ func NewMigrator() *Migrator { firstThrottlingCollected: make(chan bool, 1), rowCopyComplete: make(chan bool), allEventsUpToLockProcessed: make(chan bool), - panicAbort: make(chan error), copyRowsQueue: make(chan tableWriteFunc), applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), @@ -148,7 +143,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo // there's an error. Let's try again. } if len(notFatalHint) == 0 { - this.panicAbort <- err + this.migrationContext.PanicAbort <- err } return err } @@ -217,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er // listenOnPanicAbort aborts on abort request func (this *Migrator) listenOnPanicAbort() { - err := <-this.panicAbort + err := <-this.migrationContext.PanicAbort log.Fatale(err) } @@ -385,7 +380,7 @@ func (this *Migrator) cutOver() (err error) { if this.migrationContext.PostponeCutOverFlagFile == "" { return false, nil } - if atomic.LoadInt64(&this.userCommandedUnpostponeFlag) > 0 { + if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 { return false, nil } if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) { @@ -584,150 +579,12 @@ func (this *Migrator) atomicCutOver() (err error) { return nil } -// onServerCommand responds to a user's interactive command -func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) { - defer writer.Flush() - - tokens := strings.SplitN(command, "=", 2) - command = strings.TrimSpace(tokens[0]) - arg := "" - if len(tokens) > 1 { - arg = 
strings.TrimSpace(tokens[1]) - } - - throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n" - - if err := this.hooksExecutor.onInteractiveCommand(command); err != nil { - return err - } - - switch command { - case "help": - { - fmt.Fprintln(writer, `available commands: -status # Print a detailed status message -sup # Print a short status message -chunk-size= # Set a new chunk-size -nice-ratio= # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...) -critical-load= # Set a new set of max-load thresholds -max-lag-millis= # Set a new replication lag threshold -replication-lag-query= # Set a new query that determines replication lag (no quotes) -max-load= # Set a new set of max-load thresholds -throttle-query= # Set a new throttle-query (no quotes) -throttle-control-replicas= # Set a new comma delimited list of throttle control replicas -throttle # Force throttling -no-throttle # End forced throttling (other throttling may still apply) -unpostpone # Bail out a cut-over postpone; proceed to cut-over -panic # panic and quit without cleanup -help # This message -`) - } - case "sup": - this.printStatus(ForcePrintStatusOnlyRule, writer) - case "info", "status": - this.printStatus(ForcePrintStatusAndHintRule, writer) - case "chunk-size": - { - if chunkSize, err := strconv.Atoi(arg); err != nil { - fmt.Fprintf(writer, "%s\n", err.Error()) - return log.Errore(err) - } else { - this.migrationContext.SetChunkSize(int64(chunkSize)) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - } - case "max-lag-millis": - { - if maxLagMillis, err := strconv.Atoi(arg); err != nil { - fmt.Fprintf(writer, "%s\n", err.Error()) - return log.Errore(err) - } else { - this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis)) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - } - case 
"replication-lag-query": - { - this.migrationContext.SetReplicationLagQuery(arg) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - case "nice-ratio": - { - if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil { - fmt.Fprintf(writer, "%s\n", err.Error()) - return log.Errore(err) - } else { - this.migrationContext.SetNiceRatio(niceRatio) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - } - case "max-load": - { - if err := this.migrationContext.ReadMaxLoad(arg); err != nil { - fmt.Fprintf(writer, "%s\n", err.Error()) - return log.Errore(err) - } - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - case "critical-load": - { - if err := this.migrationContext.ReadCriticalLoad(arg); err != nil { - fmt.Fprintf(writer, "%s\n", err.Error()) - return log.Errore(err) - } - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - case "throttle-query": - { - this.migrationContext.SetThrottleQuery(arg) - fmt.Fprintf(writer, throttleHint) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - case "throttle-control-replicas": - { - if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil { - fmt.Fprintf(writer, "%s\n", err.Error()) - return log.Errore(err) - } - fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList()) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - case "throttle", "pause", "suspend": - { - atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1) - fmt.Fprintf(writer, throttleHint) - this.printStatus(ForcePrintStatusAndHintRule, writer) - } - case "no-throttle", "unthrottle", "resume", "continue": - { - atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0) - } - case "unpostpone", "no-postpone", "cut-over": - { - if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { - atomic.StoreInt64(&this.userCommandedUnpostponeFlag, 1) - fmt.Fprintf(writer, "Unpostponed\n") - } else { - 
fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n") - } - } - case "panic": - { - err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!") - fmt.Fprintf(writer, "%s\n", err.Error()) - this.panicAbort <- err - } - default: - err = fmt.Errorf("Unknown command: %s", command) - fmt.Fprintf(writer, "%s\n", err.Error()) - return err - } - return nil -} - // initiateServer begins listening on unix socket/tcp for incoming interactive commands func (this *Migrator) initiateServer() (err error) { - this.server = NewServer(this.onServerCommand) + var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) { + this.printStatus(rule, writer) + } + this.server = NewServer(this.hooksExecutor, f) if err := this.server.BindSocketFile(); err != nil { return err } @@ -1005,7 +862,7 @@ func (this *Migrator) initiateStreaming() error { log.Debugf("Beginning streaming") err := this.eventsStreamer.StreamEvents(this.canStopStreaming) if err != nil { - this.panicAbort <- err + this.migrationContext.PanicAbort <- err } log.Debugf("Done streaming") }() @@ -1033,7 +890,7 @@ func (this *Migrator) addDMLEventsListener() error { // initiateThrottler kicks in the throttling collection and the throttling checks. 
func (this *Migrator) initiateThrottler() error { - this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort) + this.throttler = NewThrottler(this.applier, this.inspector) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) log.Infof("Waiting for first throttle metrics to be collected") diff --git a/go/logic/server.go b/go/logic/server.go index 41668b3..64906c4 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -8,27 +8,33 @@ package logic import ( "bufio" "fmt" + "io" "net" "os" + "strconv" + "strings" + "sync/atomic" "github.com/github/gh-ost/go/base" "github.com/outbrain/golib/log" ) -type onCommandFunc func(command string, writer *bufio.Writer) error +type printStatusFunc func(PrintStatusRule, io.Writer) // Server listens for requests on a socket file or via TCP type Server struct { migrationContext *base.MigrationContext unixListener net.Listener tcpListener net.Listener - onCommand onCommandFunc + hooksExecutor *HooksExecutor + printStatus printStatusFunc } -func NewServer(onCommand onCommandFunc) *Server { +func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server { return &Server{ migrationContext: base.GetMigrationContext(), - onCommand: onCommand, + hooksExecutor: hooksExecutor, + printStatus: printStatus, } } @@ -94,5 +100,161 @@ func (this *Server) Serve() (err error) { func (this *Server) handleConnection(conn net.Conn) (err error) { defer conn.Close() command, _, err := bufio.NewReader(conn).ReadLine() - return this.onCommand(string(command), bufio.NewWriter(conn)) + return this.onServerCommand(string(command), bufio.NewWriter(conn)) +} + +// onServerCommand responds to a user's interactive command +func (this *Server) onServerCommand(command string, writer *bufio.Writer) (err error) { + defer writer.Flush() + + printStatusRule, err := this.applyServerCommand(command, writer) + if err == nil { + this.printStatus(printStatusRule, writer) + } else { + fmt.Fprintf(writer, 
"%s\n", err.Error()) + } + return log.Errore(err) +} + +// applyServerCommand parses and executes commands by user +func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (printStatusRule PrintStatusRule, err error) { + printStatusRule = NoPrintStatusRule + + tokens := strings.SplitN(command, "=", 2) + command = strings.TrimSpace(tokens[0]) + arg := "" + if len(tokens) > 1 { + arg = strings.TrimSpace(tokens[1]) + } + + throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n" + + if err := this.hooksExecutor.onInteractiveCommand(command); err != nil { + return NoPrintStatusRule, err + } + + switch command { + case "help": + { + fmt.Fprintln(writer, `available commands: +status # Print a detailed status message +sup # Print a short status message +chunk-size= # Set a new chunk-size +nice-ratio= # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...) 
+critical-load= # Set a new set of max-load thresholds +max-lag-millis= # Set a new replication lag threshold +replication-lag-query= # Set a new query that determines replication lag (no quotes) +max-load= # Set a new set of max-load thresholds +throttle-query= # Set a new throttle-query (no quotes) +throttle-control-replicas= # Set a new comma delimited list of throttle control replicas +throttle # Force throttling +no-throttle # End forced throttling (other throttling may still apply) +unpostpone # Bail out a cut-over postpone; proceed to cut-over +panic # panic and quit without cleanup +help # This message +`) + } + case "sup": + return ForcePrintStatusOnlyRule, nil + case "info", "status": + return ForcePrintStatusAndHintRule, nil + case "chunk-size": + { + if chunkSize, err := strconv.Atoi(arg); err != nil { + return NoPrintStatusRule, err + } else { + this.migrationContext.SetChunkSize(int64(chunkSize)) + return ForcePrintStatusAndHintRule, nil + } + } + case "max-lag-millis": + { + if maxLagMillis, err := strconv.Atoi(arg); err != nil { + return NoPrintStatusRule, err + } else { + this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis)) + return ForcePrintStatusAndHintRule, nil + } + } + case "replication-lag-query": + { + this.migrationContext.SetReplicationLagQuery(arg) + return ForcePrintStatusAndHintRule, nil + } + case "nice-ratio": + { + if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil { + return NoPrintStatusRule, err + } else { + this.migrationContext.SetNiceRatio(niceRatio) + return ForcePrintStatusAndHintRule, nil + } + } + case "max-load": + { + if err := this.migrationContext.ReadMaxLoad(arg); err != nil { + return NoPrintStatusRule, err + } + return ForcePrintStatusAndHintRule, nil + } + case "critical-load": + { + if err := this.migrationContext.ReadCriticalLoad(arg); err != nil { + return NoPrintStatusRule, err + } + return ForcePrintStatusAndHintRule, nil + } + case "throttle-query": + { + 
this.migrationContext.SetThrottleQuery(arg) + fmt.Fprintf(writer, throttleHint) + return ForcePrintStatusAndHintRule, nil + } + case "throttle-control-replicas": + { + if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil { + return NoPrintStatusRule, err + } + fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList()) + return ForcePrintStatusAndHintRule, nil + } + case "throttle", "pause", "suspend": + { + atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1) + fmt.Fprintf(writer, throttleHint) + return ForcePrintStatusAndHintRule, nil + } + case "no-throttle", "unthrottle", "resume", "continue": + { + atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0) + return ForcePrintStatusAndHintRule, nil + } + case "unpostpone", "no-postpone", "cut-over": + { + if arg != "" { + if arg != this.migrationContext.OriginalTableName { + // User explicitly provided table name. This is a courtesy protection mechanism + err := fmt.Errorf("User commanded 'unpostpone' on %s, but migrated table is %s; ingoring request.", arg, this.migrationContext.OriginalTableName) + return NoPrintStatusRule, err + } + } + if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { + atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 1) + fmt.Fprintf(writer, "Unpostponed\n") + return ForcePrintStatusAndHintRule, nil + } + fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n") + return NoPrintStatusRule, nil + } + case "panic": + { + err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup.
PANIC!") + this.migrationContext.PanicAbort <- err + return NoPrintStatusRule, err + } + default: + err = fmt.Errorf("Unknown command: %s", command) + return NoPrintStatusRule, err + } + return NoPrintStatusRule, nil } diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 65e0157..08cf6c3 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -21,15 +21,13 @@ type Throttler struct { migrationContext *base.MigrationContext applier *Applier inspector *Inspector - panicAbort chan error } -func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler { +func NewThrottler(applier *Applier, inspector *Inspector) *Throttler { return &Throttler{ migrationContext: base.GetMigrationContext(), applier: applier, inspector: inspector, - panicAbort: panicAbort, } } @@ -143,7 +141,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // Regardless of throttle, we take opportunity to check for panic-abort if this.migrationContext.PanicFlagFile != "" { if base.FileExists(this.migrationContext.PanicFlagFile) { - this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) + this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) } } criticalLoad := this.migrationContext.GetCriticalLoad() @@ -153,7 +151,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) } if value >= threshold { - this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) } }