Merge pull request #231 from github/named-cut-over

Named cut over
This commit is contained in:
Shlomi Noach 2016-09-13 08:23:00 -07:00 committed by GitHub
commit eac6a726de
6 changed files with 192 additions and 165 deletions

View File

@ -2,7 +2,7 @@
#
#
RELEASE_VERSION="1.0.18"
RELEASE_VERSION="1.0.20"
function build {
osname=$1

View File

@ -92,6 +92,7 @@ type MigrationContext struct {
criticalLoad LoadMap
PostponeCutOverFlagFile string
CutOverLockTimeoutSeconds int64
ForceNamedCutOverCommand bool
PanicFlagFile string
HooksPath string
HooksHintMessage string
@ -140,6 +141,8 @@ type MigrationContext struct {
CountingRowsFlag int64
AllEventsUpToLockProcessedInjectedFlag int64
CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64
PanicAbort chan error
OriginalTableColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
@ -192,6 +195,7 @@ func newMigrationContext() *MigrationContext {
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
}
}

View File

@ -71,6 +71,7 @@ func main() {
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (default|atomic, two-step)")
flag.BoolVar(&migrationContext.ForceNamedCutOverCommand, "force-named-cut-over", false, "When true, the 'unpostpone|cut-over' interactive command must name the migrated table")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, event after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")

View File

@ -6,14 +6,11 @@
package logic
import (
"bufio"
"fmt"
"io"
"math"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
@ -42,7 +39,8 @@ const (
type PrintStatusRule int
const (
HeuristicPrintStatusRule PrintStatusRule = iota
NoPrintStatusRule PrintStatusRule = iota
HeuristicPrintStatusRule = iota
ForcePrintStatusRule = iota
ForcePrintStatusOnlyRule = iota
ForcePrintStatusAndHintRule = iota
@ -63,11 +61,9 @@ type Migrator struct {
tablesInPlace chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
panicAbort chan error
rowCopyCompleteFlag int64
inCutOverCriticalActionFlag int64
userCommandedUnpostponeFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc
@ -84,7 +80,6 @@ func NewMigrator() *Migrator {
firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
panicAbort: make(chan error),
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@ -148,7 +143,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// there's an error. Let's try again.
}
if len(notFatalHint) == 0 {
this.panicAbort <- err
this.migrationContext.PanicAbort <- err
}
return err
}
@ -217,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
// listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort
err := <-this.migrationContext.PanicAbort
log.Fatale(err)
}
@ -385,7 +380,7 @@ func (this *Migrator) cutOver() (err error) {
if this.migrationContext.PostponeCutOverFlagFile == "" {
return false, nil
}
if atomic.LoadInt64(&this.userCommandedUnpostponeFlag) > 0 {
if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
return false, nil
}
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
@ -584,150 +579,12 @@ func (this *Migrator) atomicCutOver() (err error) {
return nil
}
// onServerCommand responds to a user's interactive command
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
defer writer.Flush()
tokens := strings.SplitN(command, "=", 2)
command = strings.TrimSpace(tokens[0])
arg := ""
if len(tokens) > 1 {
arg = strings.TrimSpace(tokens[1])
}
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
return err
}
switch command {
case "help":
{
fmt.Fprintln(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
chunk-size=<newsize> # Set a new chunk-size
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
critical-load=<load> # Set a new set of max-load thresholds
max-lag-millis=<max-lag> # Set a new replication lag threshold
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query (no quotes)
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
unpostpone # Bail out a cut-over postpone; proceed to cut-over
panic # panic and quit without cleanup
help # This message
`)
}
case "sup":
this.printStatus(ForcePrintStatusOnlyRule, writer)
case "info", "status":
this.printStatus(ForcePrintStatusAndHintRule, writer)
case "chunk-size":
{
if chunkSize, err := strconv.Atoi(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetChunkSize(int64(chunkSize))
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
}
case "max-lag-millis":
{
if maxLagMillis, err := strconv.Atoi(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis))
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
}
case "replication-lag-query":
{
this.migrationContext.SetReplicationLagQuery(arg)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "nice-ratio":
{
if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
} else {
this.migrationContext.SetNiceRatio(niceRatio)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
}
case "max-load":
{
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "critical-load":
{
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "throttle-query":
{
this.migrationContext.SetThrottleQuery(arg)
fmt.Fprintf(writer, throttleHint)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "throttle-control-replicas":
{
if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "throttle", "pause", "suspend":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
fmt.Fprintf(writer, throttleHint)
this.printStatus(ForcePrintStatusAndHintRule, writer)
}
case "no-throttle", "unthrottle", "resume", "continue":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
}
case "unpostpone", "no-postpone", "cut-over":
{
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
atomic.StoreInt64(&this.userCommandedUnpostponeFlag, 1)
fmt.Fprintf(writer, "Unpostponed\n")
} else {
fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n")
}
}
case "panic":
{
err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
fmt.Fprintf(writer, "%s\n", err.Error())
this.panicAbort <- err
}
default:
err = fmt.Errorf("Unknown command: %s", command)
fmt.Fprintf(writer, "%s\n", err.Error())
return err
}
return nil
}
// initiateServer begins listening on unix socket/tcp for incoming interactive commands
func (this *Migrator) initiateServer() (err error) {
this.server = NewServer(this.onServerCommand)
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
this.printStatus(rule, writer)
}
this.server = NewServer(this.hooksExecutor, f)
if err := this.server.BindSocketFile(); err != nil {
return err
}
@ -887,6 +744,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
// By default the status is written to standard output, but other writers can
// be used as well.
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)
elapsedTime := this.migrationContext.ElapsedTime()
@ -1007,7 +867,7 @@ func (this *Migrator) initiateStreaming() error {
log.Debugf("Beginning streaming")
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
if err != nil {
this.panicAbort <- err
this.migrationContext.PanicAbort <- err
}
log.Debugf("Done streaming")
}()
@ -1035,7 +895,7 @@ func (this *Migrator) addDMLEventsListener() error {
// initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error {
this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
this.throttler = NewThrottler(this.applier, this.inspector)
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected")

View File

@ -8,27 +8,33 @@ package logic
import (
"bufio"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync/atomic"
"github.com/github/gh-ost/go/base"
"github.com/outbrain/golib/log"
)
type onCommandFunc func(command string, writer *bufio.Writer) error
type printStatusFunc func(PrintStatusRule, io.Writer)
// Server listens for requests on a socket file or via TCP
type Server struct {
migrationContext *base.MigrationContext
unixListener net.Listener
tcpListener net.Listener
onCommand onCommandFunc
hooksExecutor *HooksExecutor
printStatus printStatusFunc
}
func NewServer(onCommand onCommandFunc) *Server {
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
return &Server{
migrationContext: base.GetMigrationContext(),
onCommand: onCommand,
hooksExecutor: hooksExecutor,
printStatus: printStatus,
}
}
@ -94,5 +100,163 @@ func (this *Server) Serve() (err error) {
func (this *Server) handleConnection(conn net.Conn) (err error) {
defer conn.Close()
command, _, err := bufio.NewReader(conn).ReadLine()
return this.onCommand(string(command), bufio.NewWriter(conn))
return this.onServerCommand(string(command), bufio.NewWriter(conn))
}
// onServerCommand responds to a user's interactive command
func (this *Server) onServerCommand(command string, writer *bufio.Writer) (err error) {
defer writer.Flush()
printStatusRule, err := this.applyServerCommand(command, writer)
if err == nil {
this.printStatus(printStatusRule, writer)
} else {
fmt.Fprintf(writer, "%s\n", err.Error())
}
return log.Errore(err)
}
// applyServerCommand parses and executes commands by user
func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (printStatusRule PrintStatusRule, err error) {
printStatusRule = NoPrintStatusRule
tokens := strings.SplitN(command, "=", 2)
command = strings.TrimSpace(tokens[0])
arg := ""
if len(tokens) > 1 {
arg = strings.TrimSpace(tokens[1])
}
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
return NoPrintStatusRule, err
}
switch command {
case "help":
{
fmt.Fprintln(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
chunk-size=<newsize> # Set a new chunk-size
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
critical-load=<load> # Set a new set of max-load thresholds
max-lag-millis=<max-lag> # Set a new replication lag threshold
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query (no quotes)
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
unpostpone # Bail out a cut-over postpone; proceed to cut-over
panic # panic and quit without cleanup
help # This message
`)
}
case "sup":
return ForcePrintStatusOnlyRule, nil
case "info", "status":
return ForcePrintStatusAndHintRule, nil
case "chunk-size":
{
if chunkSize, err := strconv.Atoi(arg); err != nil {
return NoPrintStatusRule, err
} else {
this.migrationContext.SetChunkSize(int64(chunkSize))
return ForcePrintStatusAndHintRule, nil
}
}
case "max-lag-millis":
{
if maxLagMillis, err := strconv.Atoi(arg); err != nil {
return NoPrintStatusRule, err
} else {
this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis))
return ForcePrintStatusAndHintRule, nil
}
}
case "replication-lag-query":
{
this.migrationContext.SetReplicationLagQuery(arg)
return ForcePrintStatusAndHintRule, nil
}
case "nice-ratio":
{
if niceRatio, err := strconv.ParseFloat(arg, 64); err != nil {
return NoPrintStatusRule, err
} else {
this.migrationContext.SetNiceRatio(niceRatio)
return ForcePrintStatusAndHintRule, nil
}
}
case "max-load":
{
if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
return NoPrintStatusRule, err
}
return ForcePrintStatusAndHintRule, nil
}
case "critical-load":
{
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
return NoPrintStatusRule, err
}
return ForcePrintStatusAndHintRule, nil
}
case "throttle-query":
{
this.migrationContext.SetThrottleQuery(arg)
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "throttle-control-replicas":
{
if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
return NoPrintStatusRule, err
}
fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
return ForcePrintStatusAndHintRule, nil
}
case "throttle", "pause", "suspend":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
fmt.Fprintf(writer, throttleHint)
return ForcePrintStatusAndHintRule, nil
}
case "no-throttle", "unthrottle", "resume", "continue":
{
atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
return ForcePrintStatusAndHintRule, nil
}
case "unpostpone", "no-postpone", "cut-over":
{
if arg == "" && this.migrationContext.ForceNamedCutOverCommand {
err := fmt.Errorf("User commanded 'unpostpone' without specifying table name, but --force-named-cut-over is set")
return NoPrintStatusRule, err
}
if arg != "" && arg != this.migrationContext.OriginalTableName {
// User exlpicitly provided table name. This is a courtesy protection mechanism
err := fmt.Errorf("User commanded 'unpostpone' on %s, but migrated table is %s; ingoring request.", arg, this.migrationContext.OriginalTableName)
return NoPrintStatusRule, err
}
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 1)
fmt.Fprintf(writer, "Unpostponed\n")
return ForcePrintStatusAndHintRule, nil
}
fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n")
return NoPrintStatusRule, nil
}
case "panic":
{
err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
this.migrationContext.PanicAbort <- err
return NoPrintStatusRule, err
}
default:
err = fmt.Errorf("Unknown command: %s", command)
return NoPrintStatusRule, err
}
return NoPrintStatusRule, nil
}

View File

@ -21,15 +21,13 @@ type Throttler struct {
migrationContext *base.MigrationContext
applier *Applier
inspector *Inspector
panicAbort chan error
}
func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
return &Throttler{
migrationContext: base.GetMigrationContext(),
applier: applier,
inspector: inspector,
panicAbort: panicAbort,
}
}
@ -143,7 +141,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) {
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
criticalLoad := this.migrationContext.GetCriticalLoad()
@ -153,7 +151,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
}
if value >= threshold {
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
}