support for 'coordinates' command

This commit is contained in:
Shlomi Noach 2017-04-28 15:50:51 -07:00
parent de349a79c6
commit 79cee99f57
4 changed files with 32 additions and 0 deletions

View File

@ -17,6 +17,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
- `help`: shows a brief list of available commands
- `status`: returns a detailed status summary of migration progress and configuration
- `sup`: returns a brief status summary of migration progress
- `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
- `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration

View File

@ -188,6 +188,8 @@ type MigrationContext struct {
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
recentBinlogCoordinates mysql.BinlogCoordinates
CanStopStreaming func() bool
}
@ -543,6 +545,19 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) {
this.niceRatio = newRatio
}
func (this *MigrationContext) GetRecentBinlogCoordinates() mysql.BinlogCoordinates {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
return this.recentBinlogCoordinates
}
func (this *MigrationContext) SetRecentBinlogCoordinates(coordinates mysql.BinlogCoordinates) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.recentBinlogCoordinates = coordinates
}
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error.

View File

@ -952,6 +952,13 @@ func (this *Migrator) initiateStreaming() error {
}
log.Debugf("Done streaming")
}()
go func() {
ticker := time.Tick(1 * time.Second)
for range ticker {
this.migrationContext.SetRecentBinlogCoordinates(*this.eventsStreamer.GetCurrentBinlogCoordinates())
}
}()
return nil
}

View File

@ -144,6 +144,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
fmt.Fprintln(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
coordinates # Print the currently inspected coordinates
chunk-size=<newsize> # Set a new chunk-size
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
critical-load=<load> # Set a new set of max-load thresholds
@ -165,6 +166,14 @@ help # This message
return ForcePrintStatusOnlyRule, nil
case "info", "status":
return ForcePrintStatusAndHintRule, nil
case "coordinates":
{
if argIsQuestion || arg == "" {
fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetRecentBinlogCoordinates())
return NoPrintStatusRule, nil
}
return NoPrintStatusRule, fmt.Errorf("coordinates are read-only")
}
case "chunk-size":
{
if argIsQuestion {