refactored server command into server.go

- added support for cut-over=<tablename>
- refactored more code into context
This commit is contained in:
Shlomi Noach 2016-09-12 12:38:14 +02:00
parent c3e65d45e2
commit 1c6f828091
5 changed files with 185 additions and 165 deletions

View File

@ -2,7 +2,7 @@
#
#
RELEASE_VERSION="1.0.18"
RELEASE_VERSION="1.0.19"
function build {
osname=$1

View File

@ -140,6 +140,8 @@ type MigrationContext struct {
CountingRowsFlag int64
AllEventsUpToLockProcessedInjectedFlag int64
CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64
PanicAbort chan error
OriginalTableColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
@ -192,6 +194,7 @@ func newMigrationContext() *MigrationContext {
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
}
}

View File

@ -6,14 +6,11 @@
package logic
import (
"bufio"
"fmt"
"io"
"math"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
@ -42,7 +39,8 @@ const (
type PrintStatusRule int
const (
HeuristicPrintStatusRule PrintStatusRule = iota
NoPrintStatusRule PrintStatusRule = iota
HeuristicPrintStatusRule = iota
ForcePrintStatusRule = iota
ForcePrintStatusOnlyRule = iota
ForcePrintStatusAndHintRule = iota
@ -63,11 +61,9 @@ type Migrator struct {
tablesInPlace chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
panicAbort chan error
rowCopyCompleteFlag int64
inCutOverCriticalActionFlag int64
userCommandedUnpostponeFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc
@ -84,7 +80,6 @@ func NewMigrator() *Migrator {
firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
panicAbort: make(chan error),
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@ -148,7 +143,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// there's an error. Let's try again.
}
if len(notFatalHint) == 0 {
this.panicAbort <- err
this.migrationContext.PanicAbort <- err
}
return err
}
@ -217,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
// listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort
err := <-this.migrationContext.PanicAbort
log.Fatale(err)
}
@ -385,7 +380,7 @@ func (this *Migrator) cutOver() (err error) {
if this.migrationContext.PostponeCutOverFlagFile == "" {
return false, nil
}
if atomic.LoadInt64(&this.userCommandedUnpostponeFlag) > 0 {
if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
return false, nil
}
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
@ -584,150 +579,12 @@ func (this *Migrator) atomicCutOver() (err error) {
return nil
}
// onServerCommand responds to a user's interactive command
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
defer writer.Flush()
tokens := strings.SplitN(command, "=", 2)
command = strings.TrimSpace(tokens[0])
arg := ""
if len(tokens) > 1 {
arg = strings.TrimSpace(tokens[1])
}
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
return err
}
switch command {
case "help":
{
fmt.Fprintln(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
chunk-size=<newsize> # Set a new chunk-size
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
critical-load=<load> # Set a new set of max-load thresholds
max-lag-millis=<max-lag> # Set a new replication lag threshold
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query (no quotes)
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
unpostpone # Bail out a cut-over postpone; proceed to cut-over
panic # panic and quit without cleanup
help # This message
`)
}
case "sup":
this.printStatus(ForcePrintStatusOnlyRule, writer)
case "info", "status":
this.printStatus(ForcePrintStatusAndHintRule, writer)
case "chunk-size":
{
if chunkSize, err := strconv.Atoi(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetChunkSize(int64(chunkSize))
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
}
case "max-lag-millis":
{
if maxLagMillis, err := strconv.Atoi(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis))
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
}
case "replication-lag-query":
{
this.migrationContext.SetReplicationLagQuery(arg)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "nice-ratio":
{
if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetNiceRatio(niceRatio)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
}
case "max-load":
{
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "critical-load":
{
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "throttle-query":
{
this.migrationContext.SetThrottleQuery(arg)
fmt.Fprintf(writer, throttleHint)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "throttle-control-replicas":
{
if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "throttle", "pause", "suspend":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
fmt.Fprintf(writer, throttleHint)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "no-throttle", "unthrottle", "resume", "continue":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
}
case "unpostpone", "no-postpone", "cut-over":
{
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
atomic.StoreInt64(&this.userCommandedUnpostponeFlag, 1)
fmt.Fprintf(writer, "Unpostponed\n")
} else {
fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n")
}
}
case "panic":
{
err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
fmt.Fprintf(writer, "%s\n", err.Error())
this.panicAbort <- err
}
default:
err = fmt.Errorf("Unknown command: %s", command)
fmt.Fprintf(writer, "%s\n", err.Error())
return err
}
return nil
}
// initiateServer begins listening on unix socket/tcp for incoming interactive commands
func (this *Migrator) initiateServer() (err error) {
this.server = NewServer(this.onServerCommand)
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
this.printStatus(rule, writer)
}
this.server = NewServer(this.hooksExecutor, f)
if err := this.server.BindSocketFile(); err != nil {
return err
}
@ -1005,7 +862,7 @@ func (this *Migrator) initiateStreaming() error {
log.Debugf("Beginning streaming")
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
if err != nil {
this.panicAbort <- err
this.migrationContext.PanicAbort <- err
}
log.Debugf("Done streaming")
}()
@ -1033,7 +890,7 @@ func (this *Migrator) addDMLEventsListener() error {
// initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error {
this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
this.throttler = NewThrottler(this.applier, this.inspector)
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected")

View File

@ -8,27 +8,33 @@ package logic
import (
"bufio"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync/atomic"
"github.com/github/gh-ost/go/base"
"github.com/outbrain/golib/log"
)
type onCommandFunc func(command string, writer *bufio.Writer) error
type printStatusFunc func(PrintStatusRule, io.Writer)
// Server listens for requests on a socket file or via TCP
type Server struct {
migrationContext *base.MigrationContext
unixListener net.Listener
tcpListener net.Listener
onCommand onCommandFunc
hooksExecutor *HooksExecutor
printStatus printStatusFunc
}
func NewServer(onCommand onCommandFunc) *Server {
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
return &Server{
migrationContext: base.GetMigrationContext(),
onCommand: onCommand,
hooksExecutor: hooksExecutor,
printStatus: printStatus,
}
}
@ -94,5 +100,161 @@ func (this *Server) Serve() (err error) {
func (this *Server) handleConnection(conn net.Conn) (err error) {
defer conn.Close()
command, _, err := bufio.NewReader(conn).ReadLine()
return this.onCommand(string(command), bufio.NewWriter(conn))
return this.onServerCommand(string(command), bufio.NewWriter(conn))
}
// onServerCommand responds to a user's interactive command
func (this *Server) onServerCommand(command string, writer *bufio.Writer) (err error) {
defer writer.Flush()
printStatusRule, err := this.applyServerCommand(command, writer)
if err == nil {
this.printStatus(printStatusRule, writer)
} else {
fmt.Fprintf(writer, "%s\n", err.Error())
}
return log.Errore(err)
}
// applyServerCommand parses and executes commands by user
func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (printStatusRule PrintStatusRule, err error) {
printStatusRule = NoPrintStatusRule
tokens := strings.SplitN(command, "=", 2)
command = strings.TrimSpace(tokens[0])
arg := ""
if len(tokens) > 1 {
arg = strings.TrimSpace(tokens[1])
}
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
return NoPrintStatusRule, err
}
switch command {
case "help":
{
fmt.Fprintln(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
chunk-size=<newsize> # Set a new chunk-size
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
critical-load=<load> # Set a new set of max-load thresholds
max-lag-millis=<max-lag> # Set a new replication lag threshold
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query (no quotes)
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
unpostpone # Bail out a cut-over postpone; proceed to cut-over
panic # panic and quit without cleanup
help # This message
`)
}
case "sup":
return ForcePrintStatusOnlyRule, nil
case "info", "status":
return ForcePrintStatusAndHintRule, nil
case "chunk-size":
{
if chunkSize, err := strconv.Atoi(arg); err != nil {
return NoPrintStatusRule, err
} else {
this.migrationContext.SetChunkSize(int64(chunkSize))
return ForcePrintStatusAndHintRule, nil
}
}
case "max-lag-millis":
{
if maxLagMillis, err := strconv.Atoi(arg); err != nil {
return NoPrintStatusRule, err
} else {
this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis))
return ForcePrintStatusAndHintRule, nil
}
}
case "replication-lag-query":
{
this.migrationContext.SetReplicationLagQuery(arg)
return ForcePrintStatusAndHintRule, nil
}
case "nice-ratio":
{
if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
return NoPrintStatusRule, err
} else {
this.migrationContext.SetNiceRatio(niceRatio)
return ForcePrintStatusAndHintRule, nil
}
}
case "max-load":
{
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
return NoPrintStatusRule, err
}
return ForcePrintStatusAndHintRule, nil
}
case "critical-load":
{
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
return NoPrintStatusRule, err
}
return ForcePrintStatusAndHintRule, nil
}
case "throttle-query":
{
this.migrationContext.SetThrottleQuery(arg)
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "throttle-control-replicas":
{
if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
return NoPrintStatusRule, err
}
fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
return ForcePrintStatusAndHintRule, nil
}
case "throttle", "pause", "suspend":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "no-throttle", "unthrottle", "resume", "continue":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
return ForcePrintStatusAndHintRule, nil
}
case "unpostpone", "no-postpone", "cut-over":
{
if arg != "" {
if arg != this.migrationContext.OriginalTableName {
// User exlpicitly provided table name. This is a courtesy protection mechanism
err := fmt.Errorf("User commanded 'unpostpone' on %s, but migrated table is %s; ingoring request.", arg, this.migrationContext.OriginalTableName)
return NoPrintStatusRule, err
}
}
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 1)
fmt.Fprintf(writer, "Unpostponed\n")
return ForcePrintStatusAndHintRule, nil
}
fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n")
return NoPrintStatusRule, nil
}
case "panic":
{
err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
this.migrationContext.PanicAbort <- err
return NoPrintStatusRule, err
}
default:
err = fmt.Errorf("Unknown command: %s", command)
return NoPrintStatusRule, err
}
return NoPrintStatusRule, nil
}

View File

@ -21,15 +21,13 @@ type Throttler struct {
migrationContext *base.MigrationContext
applier *Applier
inspector *Inspector
panicAbort chan error
}
func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
return &Throttler{
migrationContext: base.GetMigrationContext(),
applier: applier,
inspector: inspector,
panicAbort: panicAbort,
}
}
@ -143,7 +141,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) {
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
criticalLoad := this.migrationContext.GetCriticalLoad()
@ -153,7 +151,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
}
if value >= threshold {
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
}