diff --git a/build.sh b/build.sh index aec6a30..92db66a 100644 --- a/build.sh +++ b/build.sh @@ -1,7 +1,7 @@ #!/bin/bash # # -RELEASE_VERSION="0.8.3" +RELEASE_VERSION="0.8.4" buildpath=/tmp/gh-ost target=gh-ost diff --git a/doc/interactive-commands.md b/doc/interactive-commands.md new file mode 100644 index 0000000..3282f5d --- /dev/null +++ b/doc/interactive-commands.md @@ -0,0 +1,59 @@ +# Interactive commands + +`gh-ost` is designed to be operations friendly. To that effect, it allows the user to control its behavior even while it is running. + +### Interactive interfaces + +`gh-ost` listens on: + +- Unix socket file: either provided via `--serve-socket-file` or determined by `gh-ost`, this interface is always up. + When self-determined, `gh-ost` will advertise the identify of socket file upon start up and throughout the migration. +- TCP: if `--serve-tcp-port` is provided + +Both interfaces may serve at the same time. Both respond to simple text command, which makes it easy to interact via shell. + +### Known commands + +- `help`: shows a brief list of available commands +- `status`: returns a status summary of migration progress and configuration +- `throttle`: force migration suspend +- `no-throttle`: cancel forced suspension (though other throttling reasons may still apply) +- `chunk-size=`: modify the `chunk-size`; applies on next running copy-iteration + +### Examples + +While migration is running: + +```shell +$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock +# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst` +# Migration started at Tue Jun 07 11:45:16 +0200 2016 +# chunk-size: 200; max lag: 1500ms; max-load: map[Threads_connected:20] +# Throttle additional flag file: /tmp/gh-ost.throttle +# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock +# Serving on TCP port: 10001 +Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 40s(copy), 41s(total); streamer: mysql-bin.000550:49942; ETA: throttled, flag-file +``` + +```shell +$ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock +# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst` +# Migration started at Tue Jun 07 11:56:03 +0200 2016 +# chunk-size: 250; max lag: 1500ms; max-load: map[Threads_connected:20] +# Throttle additional flag file: /tmp/gh-ost.throttle +# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock +# Serving on TCP port: 10001 +``` + +```shell +$ echo throttle | nc -U /tmp/gh-ost.test.sample_data_0.sock + +$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock +# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst` +# Migration started at Tue Jun 07 11:56:03 +0200 2016 +# chunk-size: 250; max lag: 1500ms; max-load: map[Threads_connected:20] +# Throttle additional flag file: /tmp/gh-ost.throttle +# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock +# Serving on TCP port: 10001 +Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 59s(copy), 59s(total); streamer: mysql-bin.000551:68067; ETA: throttled, commanded by user +``` diff --git a/go/base/context.go b/go/base/context.go index 1bda8e6..34acdb6 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -64,10 +64,14 @@ type MigrationContext struct { ThrottleControlReplicaKeys *mysql.InstanceKeyMap ThrottleFlagFile string ThrottleAdditionalFlagFile string + ThrottleCommandedByUser int64 MaxLoad map[string]int64 PostponeSwapTablesFlagFile string SwapTablesTimeoutSeconds int64 + ServeSocketFile string + ServeTCPPort int64 + Noop bool TestOnReplica bool OkToDropTable bool @@ -242,6 +246,16 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration { return time.Now().Sub(this.pointOfInterestTime) } +func (this *MigrationContext) SetChunkSize(chunkSize int64) { + if chunkSize < 100 { + chunkSize = 100 + } + if chunkSize > 100000 { + chunkSize = 100000 + } + atomic.StoreInt64(&this.ChunkSize, chunkSize) +} + func (this *MigrationContext) SetThrottled(throttle bool, reason string) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 2c10473..150a4d5 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -64,13 +64,8 @@ func main() { cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)") flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running") - flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") - if migrationContext.ChunkSize < 100 { - migrationContext.ChunkSize = 100 - } - if migrationContext.ChunkSize > 100000 { - migrationContext.ChunkSize = 100000 - } + chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") + flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation") flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") @@ -78,6 +73,9 @@ func main() { flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") flag.StringVar(&migrationContext.PostponeSwapTablesFlagFile, "postpone-swap-tables-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Swapping would be ready to perform the moment the file is deleted.") + flag.StringVar(&migrationContext.ServeSocketFile, "serve-socket-file", "", "Unix socket file to serve on. Default: auto-determined and advertised upon startup") + flag.Int64Var(&migrationContext.ServeTCPPort, "serve-tcp-port", 0, "TCP port to serve on. Default: disabled") + maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'") quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") @@ -148,6 +146,10 @@ func main() { if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil { log.Fatale(err) } + if migrationContext.ServeSocketFile == "" { + migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName) + } + migrationContext.SetChunkSize(*chunkSize) migrationContext.ApplyCredentials() log.Infof("starting gh-ost %+v", AppVersion) diff --git a/go/logic/applier.go b/go/logic/applier.go index 9d1f0dd..dcf2539 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -373,7 +373,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationRangeMaxValues.AbstractValues(), - this.migrationContext.ChunkSize, + atomic.LoadInt64(&this.migrationContext.ChunkSize), this.migrationContext.GetIteration() == 0, fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()), ) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index ead4f35..8e8c78f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -6,10 +6,14 @@ package logic import ( + "bufio" "fmt" + "io" "math" "os" "os/signal" + "strconv" + "strings" "sync/atomic" "syscall" "time" @@ -41,6 +45,7 @@ type Migrator struct { inspector *Inspector applier *Applier eventsStreamer *EventsStreamer + server *Server migrationContext *base.MigrationContext tablesInPlace chan bool @@ -95,6 +100,9 @@ func (this *Migrator) acceptSignals() { func (this *Migrator) shouldThrottle() (result bool, reason string) { // User-based throttle + if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { + return true, "commanded by user" + } if this.migrationContext.ThrottleFlagFile != "" { if base.FileExists(this.migrationContext.ThrottleFlagFile) { // Throttle file defined and exists! @@ -321,6 +329,9 @@ func (this *Migrator) Migrate() (err error) { } } + if err := this.initiateServer(); err != nil { + return err + } if err := this.addDMLEventsListener(); err != nil { return err } @@ -518,6 +529,64 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) { return nil } +func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) { + tokens := strings.Split(command, "=") + command = strings.TrimSpace(tokens[0]) + arg := "" + if len(tokens) > 1 { + arg = strings.TrimSpace(tokens[1]) + } + switch command { + case "help": + { + fmt.Fprintln(writer, `available commands: + status # Print a status message + chunk-size= # Set a new chunk-size + throttle # Force throttling + no-throttle # End forced throttling (other throttling may still apply) + help # This message +`) + } + case "info", "status": + this.printMigrationStatusHint(writer) + this.printStatus(writer) + case "chunk-size": + { + if chunkSize, err := strconv.Atoi(arg); err != nil { + return log.Errore(err) + } else { + this.migrationContext.SetChunkSize(int64(chunkSize)) + this.printMigrationStatusHint(writer) + } + } + case "throttle", "pause", "suspend": + { + atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1) + } + case "no-throttle", "unthrottle", "resume", "continue": + { + atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0) + } + default: + return fmt.Errorf("Unknown command: %s", command) + } + writer.Flush() + return nil +} + +func (this *Migrator) initiateServer() (err error) { + this.server = NewServer(this.onServerCommand) + if err := this.server.BindSocketFile(); err != nil { + return err + } + if err := this.server.BindTCPPort(); err != nil { + return err + } + + go this.server.Serve() + return nil +} + func (this *Migrator) initiateInspector() (err error) { this.inspector = NewInspector() if err := this.inspector.InitDBConnections(); err != nil { @@ -563,34 +632,42 @@ func (this *Migrator) initiateStatus() error { return nil } -func (this *Migrator) printMigrationStatusHint() { - fmt.Println(fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s", +func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { + writers = append(writers, os.Stdout) + w := io.MultiWriter(writers...) + fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), )) - fmt.Println(fmt.Sprintf("# Migration started at %+v", + fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v", this.migrationContext.StartTime.Format(time.RubyDate), )) - fmt.Println(fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v", + fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v", atomic.LoadInt64(&this.migrationContext.ChunkSize), atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold), this.migrationContext.MaxLoad, )) if this.migrationContext.ThrottleFlagFile != "" { - fmt.Println(fmt.Sprintf("# Throttle flag file: %+v", + fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v", this.migrationContext.ThrottleFlagFile, )) } if this.migrationContext.ThrottleAdditionalFlagFile != "" { - fmt.Println(fmt.Sprintf("# Throttle additional flag file: %+v", + fmt.Fprintln(w, fmt.Sprintf("# Throttle additional flag file: %+v", this.migrationContext.ThrottleAdditionalFlagFile, )) } + fmt.Fprintln(w, fmt.Sprintf("# Serving on unix socket: %+v", + this.migrationContext.ServeSocketFile, + )) + if this.migrationContext.ServeTCPPort != 0 { + fmt.Fprintln(w, fmt.Sprintf("# Serving on TCP port: %+v", this.migrationContext.ServeTCPPort)) + } } -func (this *Migrator) printStatus() { +func (this *Migrator) printStatus(writers ...io.Writer) { elapsedTime := this.migrationContext.ElapsedTime() elapsedSeconds := int64(elapsedTime.Seconds()) totalRowsCopied := this.migrationContext.GetTotalRowsCopied() @@ -656,7 +733,9 @@ func (this *Migrator) printStatus() { fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), status, ) - fmt.Println(status) + writers = append(writers, os.Stdout) + w := io.MultiWriter(writers...) + fmt.Fprintln(w, status) } func (this *Migrator) initiateHeartbeatListener() { diff --git a/go/logic/server.go b/go/logic/server.go new file mode 100644 index 0000000..215e374 --- /dev/null +++ b/go/logic/server.go @@ -0,0 +1,89 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package logic + +import ( + "bufio" + "fmt" + "net" + "os" + + "github.com/github/gh-ost/go/base" + "github.com/outbrain/golib/log" +) + +type onCommandFunc func(command string, writer *bufio.Writer) error + +// Server listens for requests on a socket file or via TCP +type Server struct { + migrationContext *base.MigrationContext + unixListener net.Listener + tcpListener net.Listener + onCommand onCommandFunc +} + +func NewServer(onCommand onCommandFunc) *Server { + return &Server{ + migrationContext: base.GetMigrationContext(), + onCommand: onCommand, + } +} + +func (this *Server) BindSocketFile() (err error) { + if this.migrationContext.ServeSocketFile == "" { + return nil + } + if base.FileExists(this.migrationContext.ServeSocketFile) { + os.Remove(this.migrationContext.ServeSocketFile) + } + this.unixListener, err = net.Listen("unix", this.migrationContext.ServeSocketFile) + if err != nil { + return err + } + log.Infof("Listening on unix socket file: %s", this.migrationContext.ServeSocketFile) + return nil +} + +func (this *Server) BindTCPPort() (err error) { + if this.migrationContext.ServeTCPPort == 0 { + return nil + } + this.tcpListener, err = net.Listen("tcp", fmt.Sprintf(":%d", this.migrationContext.ServeTCPPort)) + if err != nil { + return err + } + log.Infof("Listening on tcp port: %d", this.migrationContext.ServeTCPPort) + return nil +} + +func (this *Server) Serve() (err error) { + go func() { + for { + conn, err := this.unixListener.Accept() + if err != nil { + log.Errore(err) + } + go this.handleConnection(conn) + } + }() + go func() { + for { + conn, err := this.tcpListener.Accept() + if err != nil { + log.Errore(err) + } + go this.handleConnection(conn) + } + }() + + return nil +} + +func (this *Server) handleConnection(conn net.Conn) (err error) { + defer conn.Close() + command, _, err := bufio.NewReader(conn).ReadLine() + return this.onCommand(string(command), bufio.NewWriter(conn)) +}