Throttling & critical load

- Added `--throttle-query` param (when returns > 0, throttling applies)
- Added `--critical-load`, similar to `--max-load` but implies panic and quit
- Recoded *-load as `LoadMap`
- More info on *-load throttle/panic
- `printStatus()` now gets printing heuristic. Always shows up on interactive `"status"`
- Fixed `change column` (aka rename) handling with quotes
- Removed legacy `mysqlbinlog` parser code
- Added tests
This commit is contained in:
Shlomi Noach 2016-06-18 21:12:07 +02:00
parent b31ba4e8ce
commit 23cb8ea7e9
14 changed files with 381 additions and 339 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.9.4"
RELEASE_VERSION="0.9.5"
buildpath=/tmp/gh-ost
target=gh-ost

View File

@ -7,7 +7,6 @@ package base
import (
"fmt"
"strconv"
"strings"
"sync"
"sync/atomic"
@ -65,9 +64,10 @@ type MigrationContext struct {
ThrottleControlReplicaKeys *mysql.InstanceKeyMap
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
ThrottleQuery string
ThrottleCommandedByUser int64
maxLoad map[string]int64
maxLoadMutex *sync.Mutex
maxLoad LoadMap
criticalLoad LoadMap
PostponeCutOverFlagFile string
SwapTablesTimeoutSeconds int64
PanicFlagFile string
@ -148,8 +148,8 @@ func newMigrationContext() *MigrationContext {
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1000,
SwapTablesTimeoutSeconds: 3,
maxLoad: make(map[string]int64),
maxLoadMutex: &sync.Mutex{},
maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{},
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
@ -278,46 +278,64 @@ func (this *MigrationContext) IsThrottled() (bool, string) {
return this.isThrottled, this.throttleReason
}
func (this *MigrationContext) GetMaxLoad() map[string]int64 {
this.maxLoadMutex.Lock()
defer this.maxLoadMutex.Unlock()
func (this *MigrationContext) GetThrottleQuery() string {
var query string
tmpMaxLoadMap := make(map[string]int64)
for k, v := range this.maxLoad {
tmpMaxLoadMap[k] = v
}
return tmpMaxLoadMap
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
query = this.ThrottleQuery
return query
}
func (this *MigrationContext) SetThrottleQuery(newQuery string) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.ThrottleQuery = newQuery
}
func (this *MigrationContext) GetMaxLoad() LoadMap {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
return this.maxLoad.Duplicate()
}
func (this *MigrationContext) GetCriticalLoad() LoadMap {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
return this.criticalLoad.Duplicate()
}
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error.
func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error {
if maxLoadList == "" {
return nil
loadMap, err := ParseLoadMap(maxLoadList)
if err != nil {
return err
}
this.maxLoadMutex.Lock()
defer this.maxLoadMutex.Unlock()
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
tmpMaxLoadMap := make(map[string]int64)
this.maxLoad = loadMap
return nil
}
maxLoadConditions := strings.Split(maxLoadList, ",")
for _, maxLoadCondition := range maxLoadConditions {
maxLoadTokens := strings.Split(maxLoadCondition, "=")
if len(maxLoadTokens) != 2 {
return fmt.Errorf("Error parsing max-load condition: %s", maxLoadCondition)
}
if maxLoadTokens[0] == "" {
return fmt.Errorf("Error parsing status variable in max-load condition: %s", maxLoadCondition)
}
if n, err := strconv.ParseInt(maxLoadTokens[1], 10, 0); err != nil {
return fmt.Errorf("Error parsing numeric value in max-load condition: %s", maxLoadCondition)
} else {
tmpMaxLoadMap[maxLoadTokens[0]] = n
}
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error.
func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error {
loadMap, err := ParseLoadMap(criticalLoadList)
if err != nil {
return err
}
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.maxLoad = tmpMaxLoadMap
this.criticalLoad = loadMap
return nil
}

70
go/base/load_map.go Normal file
View File

@ -0,0 +1,70 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package base
import (
"fmt"
"sort"
"strconv"
"strings"
)
// LoadMap is a mapping of status variable & threshold
// e.g. [Threads_connected: 100, Threads_running: 50]
type LoadMap map[string]int64
func NewLoadMap() LoadMap {
result := make(map[string]int64)
return result
}
// NewLoadMap parses a `--*-load` flag (e.g. `--max-load`), which is in multiple
// key-value format, such as:
// 'Threads_running=100,Threads_connected=500'
func ParseLoadMap(loadList string) (LoadMap, error) {
result := NewLoadMap()
if loadList == "" {
return result, nil
}
loadConditions := strings.Split(loadList, ",")
for _, loadCondition := range loadConditions {
loadTokens := strings.Split(loadCondition, "=")
if len(loadTokens) != 2 {
return result, fmt.Errorf("Error parsing load condition: %s", loadCondition)
}
if loadTokens[0] == "" {
return result, fmt.Errorf("Error parsing status variable in load condition: %s", loadCondition)
}
if n, err := strconv.ParseInt(loadTokens[1], 10, 0); err != nil {
return result, fmt.Errorf("Error parsing numeric value in load condition: %s", loadCondition)
} else {
result[loadTokens[0]] = n
}
}
return result, nil
}
// Duplicate creates a clone of this map
func (this *LoadMap) Duplicate() LoadMap {
dup := make(map[string]int64)
for k, v := range *this {
dup[k] = v
}
return dup
}
// String() returns a string representation of this map
func (this *LoadMap) String() string {
tokens := []string{}
for key, val := range *this {
token := fmt.Sprintf("%s=%d", key, val)
tokens = append(tokens, token)
}
sort.Strings(tokens)
return strings.Join(tokens, ",")
}

58
go/base/load_map_test.go Normal file
View File

@ -0,0 +1,58 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package base
import (
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestParseLoadMap(t *testing.T) {
{
loadList := ""
m, err := ParseLoadMap(loadList)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(len(m), 0)
}
{
loadList := "threads_running=20,threads_connected=10"
m, err := ParseLoadMap(loadList)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(len(m), 2)
test.S(t).ExpectEquals(m["threads_running"], int64(20))
test.S(t).ExpectEquals(m["threads_connected"], int64(10))
}
{
loadList := "threads_running=20=30,threads_connected=10"
_, err := ParseLoadMap(loadList)
test.S(t).ExpectNotNil(err)
}
{
loadList := "threads_running=20,threads_connected"
_, err := ParseLoadMap(loadList)
test.S(t).ExpectNotNil(err)
}
}
func TestString(t *testing.T) {
{
m, _ := ParseLoadMap("")
s := m.String()
test.S(t).ExpectEquals(s, "")
}
{
loadList := "threads_running=20,threads_connected=10"
m, _ := ParseLoadMap(loadList)
s := m.String()
test.S(t).ExpectEquals(s, "threads_connected=10,threads_running=20")
}
}

View File

@ -33,10 +33,12 @@ func FileExists(fileName string) bool {
return false
}
// StringContainsAll returns true if `s` contains all non empty given `substrings`
// The function returns `false` if no non-empty arguments are given.
func StringContainsAll(s string, substrings ...string) bool {
nonEmptyStringsFound := false
for _, substring := range substrings {
if s == "" {
if substring == "" {
continue
}
if strings.Contains(s, substring) {

29
go/base/utils_test.go Normal file
View File

@ -0,0 +1,29 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package base
import (
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestStringContainsAll(t *testing.T) {
s := `insert,delete,update`
test.S(t).ExpectFalse(StringContainsAll(s))
test.S(t).ExpectFalse(StringContainsAll(s, ""))
test.S(t).ExpectFalse(StringContainsAll(s, "drop"))
test.S(t).ExpectTrue(StringContainsAll(s, "insert"))
test.S(t).ExpectFalse(StringContainsAll(s, "insert", "drop"))
test.S(t).ExpectTrue(StringContainsAll(s, "insert", ""))
test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete"))
}

View File

@ -1,226 +0,0 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package binlog
import (
"bufio"
"bytes"
"fmt"
"path"
"regexp"
"strconv"
// "strings"
"github.com/github/gh-ost/go/os"
"github.com/outbrain/golib/log"
)
var (
binlogChunkSizeBytes uint64 = 32 * 1024 * 1024
startEntryRegexp = regexp.MustCompile("^# at ([0-9]+)$")
startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$")
endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
tokenRegxp = regexp.MustCompile("### (WHERE|SET)$")
positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$")
)
// BinlogEntryState is a state in the binlog parser automaton / state machine
type BinlogEntryState string
// States of the state machine
const (
InvalidState BinlogEntryState = "InvalidState"
SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
ExpectEndLogPosState = "ExpectEndLogPosState"
ExpectTokenState = "ExpectTokenState"
PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
)
// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
// process and textually parsing its output
type MySQLBinlogReader struct {
Basedir string
Datadir string
MySQLBinlogBinary string
}
// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem
func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) {
mySQLBinlogReader = &MySQLBinlogReader{
Basedir: basedir,
Datadir: datadir,
}
mySQLBinlogReader.MySQLBinlogBinary = path.Join(mySQLBinlogReader.Basedir, "bin/mysqlbinlog")
return mySQLBinlogReader
}
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) {
if startPos == 0 {
startPos = 4
}
done := false
chunkStartPos := startPos
for !done {
chunkStopPos := chunkStartPos + binlogChunkSizeBytes
if chunkStopPos > stopPos && stopPos != 0 {
chunkStopPos = stopPos
}
log.Debugf("Next chunk range %d - %d", chunkStartPos, chunkStopPos)
binlogFilePath := path.Join(this.Datadir, logFile)
command := fmt.Sprintf(`%s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s`, this.MySQLBinlogBinary, chunkStartPos, chunkStopPos, binlogFilePath)
entriesBytes, err := os.RunCommandWithOutput(command)
if err != nil {
return entries, log.Errore(err)
}
chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)), logFile)
if err != nil {
return entries, log.Errore(err)
}
if len(chunkEntries) == 0 {
done = true
} else {
entries = append(entries, chunkEntries...)
lastChunkEntry := chunkEntries[len(chunkEntries)-1]
chunkStartPos = lastChunkEntry.EndLogPos
}
}
return entries, err
}
// automaton step: accept wither beginning of new entry, or beginning of new statement
func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) {
onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64)
if previousEndLogPos != 0 && startLogPos != previousEndLogPos {
return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos)
}
nextBinlogEntry = binlogEntry
if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = NewBinlogEntry(binlogEntry.Coordinates.LogFile, startLogPos)
}
return ExpectEndLogPosState, nextBinlogEntry, nil
}
onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
nextBinlogEntry = binlogEntry
if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = binlogEntry.Duplicate()
}
nextBinlogEntry.DmlEvent = NewBinlogDMLEvent(submatch[2], submatch[3], ToEventDML(submatch[1]))
return ExpectTokenState, nextBinlogEntry, nil
}
// Defuncting the following:
// onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
// columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
// if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
// return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.DmlEvent.DML)
// }
// columnValue := submatch[2]
// columnValue = strings.TrimPrefix(columnValue, "'")
// columnValue = strings.TrimSuffix(columnValue, "'")
// binlogEntry.PositionalColumns[columnIndex] = columnValue
//
// return SearchForStartPosOrStatementState, binlogEntry, nil
// }
line := scanner.Text()
if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch)
}
if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch)
}
if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
return onStatementEntry(submatch)
}
if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 {
// Defuncting return onPositionalColumn(submatch)
}
// Haven't found a match
return SearchForStartPosOrStatementState, binlogEntry, nil
}
// automaton step: expect an end_log_pos line`
func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
line := scanner.Text()
submatch := endLogPosRegexp.FindStringSubmatch(line)
if len(submatch) > 1 {
binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
return SearchForStartPosOrStatementState, nil
}
return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.Coordinates.LogPos)
}
// automaton step: a not-strictly-required but good-to-have-around validation that
// we see an expected token following a statement
func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
line := scanner.Text()
if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 {
return SearchForStartPosOrStatementState, nil
}
return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.Coordinates.LogPos)
}
// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
// It issues an automaton / state machine to do its thang.
func parseEntries(scanner *bufio.Scanner, logFile string) (entries [](*BinlogEntry), err error) {
binlogEntry := NewBinlogEntry(logFile, 0)
var state BinlogEntryState = SearchForStartPosOrStatementState
var endLogPos uint64
appendBinlogEntry := func() {
if binlogEntry.Coordinates.LogPos == 0 {
return
}
if binlogEntry.DmlEvent == nil {
return
}
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry)
fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.DmlEvent.DML, binlogEntry.DmlEvent.DatabaseName, binlogEntry.DmlEvent.TableName))
}
for scanner.Scan() {
switch state {
case SearchForStartPosOrStatementState:
{
var nextBinlogEntry *BinlogEntry
state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos)
if nextBinlogEntry != binlogEntry {
appendBinlogEntry()
binlogEntry = nextBinlogEntry
}
}
case ExpectEndLogPosState:
{
state, err = expectEndLogPos(scanner, binlogEntry)
}
case ExpectTokenState:
{
state, err = expectToken(scanner, binlogEntry)
}
default:
{
err = fmt.Errorf("Unexpected state %+v", state)
}
}
if err != nil {
return entries, log.Errore(err)
}
}
appendBinlogEntry()
return entries, err
}

View File

@ -1,50 +0,0 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package binlog
import (
"bufio"
"os"
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func __TestRBRSample0(t *testing.T) {
testFile, err := os.Open("testdata/rbr-sample-0.txt")
test.S(t).ExpectNil(err)
defer testFile.Close()
scanner := bufio.NewScanner(testFile)
entries, err := parseEntries(scanner)
test.S(t).ExpectNil(err)
test.S(t).ExpectEquals(len(entries), 17)
test.S(t).ExpectEquals(entries[0].DatabaseName, "test")
test.S(t).ExpectEquals(entries[0].TableName, "samplet")
test.S(t).ExpectEquals(entries[0].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[1].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[2].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[3].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[4].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[5].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[7].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[9].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[10].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[11].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[12].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[13].StatementType, "INSERT")
test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE")
test.S(t).ExpectEquals(entries[15].StatementType, "DELETE")
test.S(t).ExpectEquals(entries[16].StatementType, "INSERT")
}

View File

@ -73,6 +73,7 @@ func main() {
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
flag.StringVar(&migrationContext.ThrottleQuery, "throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@ -81,7 +82,8 @@ func main() {
flag.StringVar(&migrationContext.ServeSocketFile, "serve-socket-file", "", "Unix socket file to serve on. Default: auto-determined and advertised upon startup")
flag.Int64Var(&migrationContext.ServeTCPPort, "serve-tcp-port", 0, "TCP port to serve on. Default: disabled")
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as `--max-load`. When status exceeds threshold, app panics and quits")
quiet := flag.Bool("quiet", false, "quiet")
verbose := flag.Bool("verbose", false, "verbose")
debug := flag.Bool("debug", false, "debug mode (very verbose)")
@ -156,6 +158,9 @@ func main() {
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
log.Fatale(err)
}
if err := migrationContext.ReadCriticalLoad(*criticalLoad); err != nil {
log.Fatale(err)
}
if migrationContext.ServeSocketFile == "" {
migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
}

View File

@ -263,6 +263,19 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
}
}
func (this *Applier) ExecuteThrottleQuery() (int64, error) {
throttleQuery := this.migrationContext.GetThrottleQuery()
if throttleQuery == "" {
return 0, nil
}
var result int64
if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil {
return 0, log.Errore(err)
}
return result, nil
}
// ReadMigrationMinValues
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)

View File

@ -40,6 +40,14 @@ const (
heartbeatIntervalMilliseconds = 1000
)
type PrintStatusRule int
const (
HeuristicPrintStatusRule PrintStatusRule = iota
ForcePrintStatusRule = iota
ForcePrintStatusAndHint = iota
)
// Migrator is the main schema migration flow manager.
type Migrator struct {
parser *sql.Parser
@ -106,6 +114,17 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
criticalLoad := this.migrationContext.GetCriticalLoad()
for variableName, threshold := range criticalLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return true, fmt.Sprintf("%s %s", variableName, err)
}
if value >= threshold {
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
}
// Back to throttle considerations
// User-based throttle
@ -145,8 +164,13 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
if err != nil {
return true, fmt.Sprintf("%s %s", variableName, err)
}
if value > threshold {
return true, fmt.Sprintf("%s=%d", variableName, value)
if value >= threshold {
return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)
}
}
if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return true, "throttle-query"
}
}
@ -311,6 +335,7 @@ func (this *Migrator) validateStatement() (err error) {
if !this.migrationContext.ApproveRenamedColumns {
return fmt.Errorf("Alter statement has column(s) renamed. gh-ost suspects the following renames: %v; but to proceed you must approve via `--approve-renamed-columns` (or you can skip renamed columns via `--skip-renamed-columns`)", this.parser.GetNonTrivialRenames())
}
log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames())
}
return nil
}
@ -375,7 +400,7 @@ func (this *Migrator) Migrate() (err error) {
log.Debugf("Operating until row copy is complete")
this.consumeRowCopyComplete()
log.Infof("Row copy complete")
this.printStatus()
this.printStatus(ForcePrintStatusRule)
if err := this.cutOver(); err != nil {
return err
@ -459,8 +484,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime)
log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
this.printMigrationStatusHint()
this.printStatus()
this.printStatus(ForcePrintStatusAndHint)
return nil
}
@ -621,17 +645,18 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
case "help":
{
fmt.Fprintln(writer, `available commands:
status # Print a status message
chunk-size=<newsize> # Set a new chunk-size
max-load=<maxload> # Set a new set of max-load thresholds
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
help # This message
status # Print a status message
chunk-size=<newsize> # Set a new chunk-size
critical-load=<load> # Set a new set of max-load thresholds
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
help # This message
`)
}
case "info", "status":
this.printMigrationStatusHint(writer)
this.printStatus(writer)
this.printStatus(ForcePrintStatusAndHint, writer)
case "chunk-size":
{
if chunkSize, err := strconv.Atoi(arg); err != nil {
@ -639,7 +664,7 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
return log.Errore(err)
} else {
this.migrationContext.SetChunkSize(int64(chunkSize))
this.printMigrationStatusHint(writer)
this.printStatus(ForcePrintStatusAndHint, writer)
}
}
case "max-load":
@ -648,7 +673,20 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printMigrationStatusHint(writer)
this.printStatus(ForcePrintStatusAndHint, writer)
}
case "critical-load":
{
if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
fmt.Fprintf(writer, "%s\n", err.Error())
return log.Errore(err)
}
this.printStatus(ForcePrintStatusAndHint, writer)
}
case "throttle-query":
{
this.migrationContext.SetThrottleQuery(arg)
this.printStatus(ForcePrintStatusAndHint, writer)
}
case "throttle", "pause", "suspend":
{
@ -715,17 +753,16 @@ func (this *Migrator) initiateInspector() (err error) {
}
func (this *Migrator) initiateStatus() error {
this.printStatus()
this.printStatus(ForcePrintStatusAndHint)
statusTick := time.Tick(1 * time.Second)
for range statusTick {
go this.printStatus()
go this.printStatus(HeuristicPrintStatusRule)
}
return nil
}
func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
writers = append(writers, os.Stdout)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s",
sql.EscapeName(this.migrationContext.DatabaseName),
@ -737,10 +774,12 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
this.migrationContext.StartTime.Format(time.RubyDate),
))
maxLoad := this.migrationContext.GetMaxLoad()
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v",
criticalLoad := this.migrationContext.GetCriticalLoad()
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %s; critical-load: %s",
atomic.LoadInt64(&this.migrationContext.ChunkSize),
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
maxLoad,
maxLoad.String(),
criticalLoad.String(),
))
if this.migrationContext.ThrottleFlagFile != "" {
fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v",
@ -752,6 +791,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
this.migrationContext.ThrottleAdditionalFlagFile,
))
}
if throttleQuery := this.migrationContext.GetThrottleQuery(); throttleQuery != "" {
fmt.Fprintln(w, fmt.Sprintf("# Throttle query: %+v",
throttleQuery,
))
}
if this.migrationContext.PostponeCutOverFlagFile != "" {
fmt.Fprintln(w, fmt.Sprintf("# Postpone cut-over flag file: %+v",
this.migrationContext.PostponeCutOverFlagFile,
@ -770,7 +814,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
}
}
func (this *Migrator) printStatus(writers ...io.Writer) {
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
writers = append(writers, os.Stdout)
elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds())
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
@ -782,8 +828,11 @@ func (this *Migrator) printStatus(writers ...io.Writer) {
// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
if rule == ForcePrintStatusAndHint {
shouldPrintMigrationStatusHint = true
}
if shouldPrintMigrationStatusHint {
this.printMigrationStatusHint()
this.printMigrationStatusHint(writers...)
}
var etaSeconds float64 = math.MaxFloat64
@ -820,6 +869,9 @@ func (this *Migrator) printStatus(writers ...io.Writer) {
} else {
shouldPrintStatus = (elapsedSeconds%30 == 0)
}
if rule == ForcePrintStatusRule || rule == ForcePrintStatusAndHint {
shouldPrintStatus = true
}
if !shouldPrintStatus {
return
}
@ -838,7 +890,6 @@ func (this *Migrator) printStatus(writers ...io.Writer) {
fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
status,
)
writers = append(writers, os.Stdout)
w := io.MultiWriter(writers...)
fmt.Fprintln(w, status)
}

View File

@ -3,7 +3,7 @@
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package binlog
package mysql
import (
"testing"

View File

@ -11,7 +11,7 @@ import (
)
var (
renameColumnRegexp = regexp.MustCompile(`(?i)CHANGE\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
renameColumnRegexp = regexp.MustCompile(`(?i)change\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
)
type Parser struct {
@ -27,8 +27,12 @@ func NewParser() *Parser {
func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterStatement, -1)
for _, submatch := range allStringSubmatch {
submatch[2], _ = strconv.Unquote(submatch[2])
submatch[3], _ = strconv.Unquote(submatch[3])
if unquoted, err := strconv.Unquote(submatch[2]); err == nil {
submatch[2] = unquoted
}
if unquoted, err := strconv.Unquote(submatch[3]); err == nil {
submatch[3] = unquoted
}
this.columnRenameMap[submatch[2]] = submatch[3]
}

68
go/sql/parser_test.go Normal file
View File

@ -0,0 +1,68 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package sql
import (
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestParseAlterStatement(t *testing.T) {
statement := "add column t int, engine=innodb"
parser := NewParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
}
func TestParseAlterStatementTrivialRename(t *testing.T) {
statement := "add column t int, change ts ts timestamp, engine=innodb"
parser := NewParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
test.S(t).ExpectEquals(len(parser.columnRenameMap), 1)
test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts")
}
func TestParseAlterStatementTrivialRenames(t *testing.T) {
statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb"
parser := NewParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
test.S(t).ExpectEquals(len(parser.columnRenameMap), 2)
test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts")
test.S(t).ExpectEquals(parser.columnRenameMap["f"], "f")
}
func TestParseAlterStatementNonTrivial(t *testing.T) {
statements := []string{
`add column b bigint, change f fl float, change i count int, engine=innodb`,
"add column b bigint, change column `f` fl float, change `i` `count` int, engine=innodb",
"add column b bigint, change column `f` fl float, change `i` `count` int, change ts ts timestamp, engine=innodb",
`change
f fl float,
CHANGE COLUMN i
count int, engine=innodb`,
}
for _, statement := range statements {
parser := NewParser()
err := parser.ParseAlterStatement(statement)
test.S(t).ExpectNil(err)
renames := parser.GetNonTrivialRenames()
test.S(t).ExpectEquals(len(renames), 2)
test.S(t).ExpectEquals(renames["i"], "count")
test.S(t).ExpectEquals(renames["f"], "fl")
}
}