adding interactive user commands

This commit is contained in:
Shlomi Noach 2016-06-07 11:59:17 +02:00
parent c9d02b99fc
commit fc00cb2289
7 changed files with 260 additions and 17 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
# #
# #
RELEASE_VERSION="0.8.3" RELEASE_VERSION="0.8.4"
buildpath=/tmp/gh-ost buildpath=/tmp/gh-ost
target=gh-ost target=gh-ost

View File

@ -0,0 +1,59 @@
# Interactive commands
`gh-ost` is designed to be operations friendly. To that effect, it allows the user to control its behavior even while it is running.
### Interactive interfaces
`gh-ost` listens on:
- Unix socket file: either provided via `--serve-socket-file` or determined by `gh-ost`, this interface is always up.
When self-determined, `gh-ost` will advertise the identity of the socket file upon startup and throughout the migration.
- TCP: if `--serve-tcp-port` is provided
Both interfaces may serve at the same time. Both respond to simple text commands, which makes it easy to interact with them from a shell.
### Known commands
- `help`: shows a brief list of available commands
- `status`: returns a status summary of migration progress and configuration
- `throttle`: force the migration to suspend
- `no-throttle`: cancel forced suspension (though other throttling reasons may still apply)
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
### Examples
While migration is running:
```shell
$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
# Migration started at Tue Jun 07 11:45:16 +0200 2016
# chunk-size: 200; max lag: 1500ms; max-load: map[Threads_connected:20]
# Throttle additional flag file: /tmp/gh-ost.throttle
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
# Serving on TCP port: 10001
Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 40s(copy), 41s(total); streamer: mysql-bin.000550:49942; ETA: throttled, flag-file
```
```shell
$ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
# Migration started at Tue Jun 07 11:56:03 +0200 2016
# chunk-size: 250; max lag: 1500ms; max-load: map[Threads_connected:20]
# Throttle additional flag file: /tmp/gh-ost.throttle
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
# Serving on TCP port: 10001
```
```shell
$ echo throttle | nc -U /tmp/gh-ost.test.sample_data_0.sock
$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
# Migration started at Tue Jun 07 11:56:03 +0200 2016
# chunk-size: 250; max lag: 1500ms; max-load: map[Threads_connected:20]
# Throttle additional flag file: /tmp/gh-ost.throttle
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
# Serving on TCP port: 10001
Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 59s(copy), 59s(total); streamer: mysql-bin.000551:68067; ETA: throttled, commanded by user
```

View File

@ -64,10 +64,14 @@ type MigrationContext struct {
ThrottleControlReplicaKeys *mysql.InstanceKeyMap ThrottleControlReplicaKeys *mysql.InstanceKeyMap
ThrottleFlagFile string ThrottleFlagFile string
ThrottleAdditionalFlagFile string ThrottleAdditionalFlagFile string
ThrottleCommandedByUser int64
MaxLoad map[string]int64 MaxLoad map[string]int64
PostponeSwapTablesFlagFile string PostponeSwapTablesFlagFile string
SwapTablesTimeoutSeconds int64 SwapTablesTimeoutSeconds int64
ServeSocketFile string
ServeTCPPort int64
Noop bool Noop bool
TestOnReplica bool TestOnReplica bool
OkToDropTable bool OkToDropTable bool
@ -242,6 +246,16 @@ func (this *MigrationContext) TimeSincePointOfInterest() time.Duration {
return time.Now().Sub(this.pointOfInterestTime) return time.Now().Sub(this.pointOfInterestTime)
} }
// SetChunkSize atomically stores the requested chunk size, first clamping it
// to the allowed range of 100-100,000 rows.
func (this *MigrationContext) SetChunkSize(chunkSize int64) {
	const (
		lowestChunkSize  int64 = 100
		highestChunkSize int64 = 100000
	)
	bounded := chunkSize
	switch {
	case bounded < lowestChunkSize:
		bounded = lowestChunkSize
	case bounded > highestChunkSize:
		bounded = highestChunkSize
	}
	atomic.StoreInt64(&this.ChunkSize, bounded)
}
func (this *MigrationContext) SetThrottled(throttle bool, reason string) { func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()

View File

@ -64,13 +64,8 @@ func main() {
cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)") cutOver := flag.String("cut-over", "", "(mandatory) choose cut-over type (two-step, voluntary-lock)")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running") flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
if migrationContext.ChunkSize < 100 {
migrationContext.ChunkSize = 100
}
if migrationContext.ChunkSize > 100000 {
migrationContext.ChunkSize = 100000
}
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation") flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat") flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
@ -78,6 +73,9 @@ func main() {
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
flag.StringVar(&migrationContext.PostponeSwapTablesFlagFile, "postpone-swap-tables-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Swapping would be ready to perform the moment the file is deleted.") flag.StringVar(&migrationContext.PostponeSwapTablesFlagFile, "postpone-swap-tables-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Swapping would be ready to perform the moment the file is deleted.")
flag.StringVar(&migrationContext.ServeSocketFile, "serve-socket-file", "", "Unix socket file to serve on. Default: auto-determined and advertised upon startup")
flag.Int64Var(&migrationContext.ServeTCPPort, "serve-tcp-port", 0, "TCP port to serve on. Default: disabled")
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
quiet := flag.Bool("quiet", false, "quiet") quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose") verbose := flag.Bool("verbose", false, "verbose")
@ -148,6 +146,10 @@ func main() {
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil { if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
log.Fatale(err) log.Fatale(err)
} }
if migrationContext.ServeSocketFile == "" {
migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
}
migrationContext.SetChunkSize(*chunkSize)
migrationContext.ApplyCredentials() migrationContext.ApplyCredentials()
log.Infof("starting gh-ost %+v", AppVersion) log.Infof("starting gh-ost %+v", AppVersion)

View File

@ -373,7 +373,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.UniqueKey.Columns.Names,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(), this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
this.migrationContext.ChunkSize, atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0, this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()), fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
) )

View File

@ -6,10 +6,14 @@
package logic package logic
import ( import (
"bufio"
"fmt" "fmt"
"io"
"math" "math"
"os" "os"
"os/signal" "os/signal"
"strconv"
"strings"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
"time" "time"
@ -41,6 +45,7 @@ type Migrator struct {
inspector *Inspector inspector *Inspector
applier *Applier applier *Applier
eventsStreamer *EventsStreamer eventsStreamer *EventsStreamer
server *Server
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
tablesInPlace chan bool tablesInPlace chan bool
@ -95,6 +100,9 @@ func (this *Migrator) acceptSignals() {
func (this *Migrator) shouldThrottle() (result bool, reason string) { func (this *Migrator) shouldThrottle() (result bool, reason string) {
// User-based throttle // User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return true, "commanded by user"
}
if this.migrationContext.ThrottleFlagFile != "" { if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) { if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists! // Throttle file defined and exists!
@ -321,6 +329,9 @@ func (this *Migrator) Migrate() (err error) {
} }
} }
if err := this.initiateServer(); err != nil {
return err
}
if err := this.addDMLEventsListener(); err != nil { if err := this.addDMLEventsListener(); err != nil {
return err return err
} }
@ -518,6 +529,64 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
return nil return nil
} }
// onServerCommand executes a single interactive command received by the
// server and writes any response to writer.
//
// The command is either a bare keyword ("status") or a "name=value" pair
// ("chunk-size=250"); surrounding whitespace on both parts is ignored.
// Failures (unknown command, unparsable chunk-size) are reported back to the
// client through writer in addition to being returned, so the caller on the
// other end of the connection is not left with a silently closed socket.
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
	// Flush on every exit path, including the error paths, so that whatever
	// was written actually reaches the client.
	defer writer.Flush()

	tokens := strings.Split(command, "=")
	command = strings.TrimSpace(tokens[0])
	arg := ""
	if len(tokens) > 1 {
		arg = strings.TrimSpace(tokens[1])
	}
	switch command {
	case "help":
		fmt.Fprintln(writer, `available commands:
status # Print a status message
chunk-size=<newsize> # Set a new chunk-size
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
help # This message
`)
	case "info", "status":
		this.printMigrationStatusHint(writer)
		this.printStatus(writer)
	case "chunk-size":
		chunkSize, err := strconv.Atoi(arg)
		if err != nil {
			// Tell the client why the request was rejected.
			fmt.Fprintln(writer, err.Error())
			return log.Errore(err)
		}
		this.migrationContext.SetChunkSize(int64(chunkSize))
		this.printMigrationStatusHint(writer)
	case "throttle", "pause", "suspend":
		atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
	case "no-throttle", "unthrottle", "resume", "continue":
		atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
	default:
		err = fmt.Errorf("Unknown command: %s", command)
		fmt.Fprintln(writer, err.Error())
		return err
	}
	return nil
}
// initiateServer creates the interactive command server, binds its unix
// socket and TCP listeners, and starts serving in the background. A bind
// failure is returned before anything is served.
func (this *Migrator) initiateServer() (err error) {
	this.server = NewServer(this.onServerCommand)

	if err = this.server.BindSocketFile(); err != nil {
		return err
	}
	if err = this.server.BindTCPPort(); err != nil {
		return err
	}

	go this.server.Serve()
	return nil
}
func (this *Migrator) initiateInspector() (err error) { func (this *Migrator) initiateInspector() (err error) {
this.inspector = NewInspector() this.inspector = NewInspector()
if err := this.inspector.InitDBConnections(); err != nil { if err := this.inspector.InitDBConnections(); err != nil {
@ -563,34 +632,42 @@ func (this *Migrator) initiateStatus() error {
return nil return nil
} }
func (this *Migrator) printMigrationStatusHint() { func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
fmt.Println(fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s", writers = append(writers, os.Stdout)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName), sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()), sql.EscapeName(this.migrationContext.GetGhostTableName()),
)) ))
fmt.Println(fmt.Sprintf("# Migration started at %+v", fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v",
this.migrationContext.StartTime.Format(time.RubyDate), this.migrationContext.StartTime.Format(time.RubyDate),
)) ))
fmt.Println(fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v", fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v",
atomic.LoadInt64(&this.migrationContext.ChunkSize), atomic.LoadInt64(&this.migrationContext.ChunkSize),
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold), atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
this.migrationContext.MaxLoad, this.migrationContext.MaxLoad,
)) ))
if this.migrationContext.ThrottleFlagFile != "" { if this.migrationContext.ThrottleFlagFile != "" {
fmt.Println(fmt.Sprintf("# Throttle flag file: %+v", fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v",
this.migrationContext.ThrottleFlagFile, this.migrationContext.ThrottleFlagFile,
)) ))
} }
if this.migrationContext.ThrottleAdditionalFlagFile != "" { if this.migrationContext.ThrottleAdditionalFlagFile != "" {
fmt.Println(fmt.Sprintf("# Throttle additional flag file: %+v", fmt.Fprintln(w, fmt.Sprintf("# Throttle additional flag file: %+v",
this.migrationContext.ThrottleAdditionalFlagFile, this.migrationContext.ThrottleAdditionalFlagFile,
)) ))
} }
fmt.Fprintln(w, fmt.Sprintf("# Serving on unix socket: %+v",
this.migrationContext.ServeSocketFile,
))
if this.migrationContext.ServeTCPPort != 0 {
fmt.Fprintln(w, fmt.Sprintf("# Serving on TCP port: %+v", this.migrationContext.ServeTCPPort))
}
} }
func (this *Migrator) printStatus() { func (this *Migrator) printStatus(writers ...io.Writer) {
elapsedTime := this.migrationContext.ElapsedTime() elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds()) elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied() totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
@ -656,7 +733,9 @@ func (this *Migrator) printStatus() {
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
status, status,
) )
fmt.Println(status) writers = append(writers, os.Stdout)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, status)
} }
func (this *Migrator) initiateHeartbeatListener() { func (this *Migrator) initiateHeartbeatListener() {

89
go/logic/server.go Normal file
View File

@ -0,0 +1,89 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"bufio"
"fmt"
"net"
"os"
"github.com/github/gh-ost/go/base"
"github.com/outbrain/golib/log"
)
// onCommandFunc is the callback a Server invokes for a client connection: it
// receives the command line read from the connection and a buffered writer
// over that same connection.
type onCommandFunc func(command string, writer *bufio.Writer) error
// Server listens for requests on a socket file or via TCP
type Server struct {
// migrationContext supplies ServeSocketFile and ServeTCPPort.
migrationContext *base.MigrationContext
// unixListener is assigned by BindSocketFile.
unixListener net.Listener
// tcpListener is assigned by BindTCPPort; it is left unset when ServeTCPPort is 0.
tcpListener net.Listener
// onCommand is called with the command line read from each connection.
onCommand onCommandFunc
}
// NewServer builds a Server that hands every received command to onCommand.
// The server uses the context returned by base.GetMigrationContext(); no
// listener is opened here.
func NewServer(onCommand onCommandFunc) *Server {
	server := &Server{}
	server.migrationContext = base.GetMigrationContext()
	server.onCommand = onCommand
	return server
}
// BindSocketFile starts listening on the unix socket file named by the
// migration context's ServeSocketFile. It does nothing when no file is
// configured. A file already present at that path is removed first.
func (this *Server) BindSocketFile() (err error) {
	socketFile := this.migrationContext.ServeSocketFile
	if socketFile == "" {
		return nil
	}
	if base.FileExists(socketFile) {
		os.Remove(socketFile)
	}
	if this.unixListener, err = net.Listen("unix", socketFile); err != nil {
		return err
	}
	log.Infof("Listening on unix socket file: %s", socketFile)
	return nil
}
// BindTCPPort starts listening on the TCP port given by the migration
// context's ServeTCPPort. A port of 0 means TCP serving is disabled, in which
// case nothing is bound and nil is returned.
func (this *Server) BindTCPPort() (err error) {
	port := this.migrationContext.ServeTCPPort
	if port == 0 {
		return nil
	}
	address := fmt.Sprintf(":%d", port)
	if this.tcpListener, err = net.Listen("tcp", address); err != nil {
		return err
	}
	log.Infof("Listening on tcp port: %d", port)
	return nil
}
// Serve starts one background accept loop per bound listener and returns
// immediately; the returned error is always nil.
//
// A listener that was never bound is skipped: BindTCPPort leaves tcpListener
// unset when ServeTCPPort is 0 (and BindSocketFile leaves unixListener unset
// when no socket file is configured), and calling Accept on a nil listener
// would panic.
func (this *Server) Serve() (err error) {
	for _, listener := range []net.Listener{this.unixListener, this.tcpListener} {
		if listener == nil {
			continue
		}
		go this.acceptConnections(listener)
	}
	return nil
}

// acceptConnections accepts connections on listener forever, handling each
// one in its own goroutine.
func (this *Server) acceptConnections(listener net.Listener) {
	for {
		conn, err := listener.Accept()
		if err != nil {
			// No connection was produced; log and keep serving rather than
			// handing a nil conn to handleConnection (which would panic on
			// conn.Close()).
			log.Errore(err)
			continue
		}
		go this.handleConnection(conn)
	}
}
// handleConnection reads a single command line from conn, dispatches it to
// the onCommand callback together with a buffered writer over conn, and
// closes the connection when done.
//
// It returns the read error if no line could be read (in which case no
// command is dispatched), otherwise whatever onCommand returns.
func (this *Server) handleConnection(conn net.Conn) (err error) {
	defer conn.Close()
	command, _, err := bufio.NewReader(conn).ReadLine()
	if err != nil {
		// Nothing usable was read; do not dispatch an empty command.
		return err
	}
	return this.onCommand(string(command), bufio.NewWriter(conn))
}