Merge pull request #73 from github/throttle-critical
Throttling & critical load
commit 9197eedc64

build.sh (2 changes)
@@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.9.4"
RELEASE_VERSION="0.9.5"

buildpath=/tmp/gh-ost
target=gh-ost

@@ -7,7 +7,6 @@ package base

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
@@ -65,9 +64,10 @@ type MigrationContext struct {
	ThrottleControlReplicaKeys *mysql.InstanceKeyMap
	ThrottleFlagFile string
	ThrottleAdditionalFlagFile string
	ThrottleQuery string
	ThrottleCommandedByUser int64
	maxLoad map[string]int64
	maxLoadMutex *sync.Mutex
	maxLoad LoadMap
	criticalLoad LoadMap
	PostponeCutOverFlagFile string
	SwapTablesTimeoutSeconds int64
	PanicFlagFile string
@@ -148,8 +148,8 @@ func newMigrationContext() *MigrationContext {
		ApplierConnectionConfig: mysql.NewConnectionConfig(),
		MaxLagMillisecondsThrottleThreshold: 1000,
		SwapTablesTimeoutSeconds: 3,
		maxLoad: make(map[string]int64),
		maxLoadMutex: &sync.Mutex{},
		maxLoad: NewLoadMap(),
		criticalLoad: NewLoadMap(),
		throttleMutex: &sync.Mutex{},
		ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
		configMutex: &sync.Mutex{},
@@ -278,46 +278,64 @@ func (this *MigrationContext) IsThrottled() (bool, string) {
	return this.isThrottled, this.throttleReason
}

func (this *MigrationContext) GetMaxLoad() map[string]int64 {
	this.maxLoadMutex.Lock()
	defer this.maxLoadMutex.Unlock()
func (this *MigrationContext) GetThrottleQuery() string {
	var query string

	tmpMaxLoadMap := make(map[string]int64)
	for k, v := range this.maxLoad {
		tmpMaxLoadMap[k] = v
	}
	return tmpMaxLoadMap
	this.throttleMutex.Lock()
	defer this.throttleMutex.Unlock()

	query = this.ThrottleQuery
	return query
}

func (this *MigrationContext) SetThrottleQuery(newQuery string) {
	this.throttleMutex.Lock()
	defer this.throttleMutex.Unlock()

	this.ThrottleQuery = newQuery
}

func (this *MigrationContext) GetMaxLoad() LoadMap {
	this.throttleMutex.Lock()
	defer this.throttleMutex.Unlock()

	return this.maxLoad.Duplicate()
}

func (this *MigrationContext) GetCriticalLoad() LoadMap {
	this.throttleMutex.Lock()
	defer this.throttleMutex.Unlock()

	return this.criticalLoad.Duplicate()
}

// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error.
func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error {
	if maxLoadList == "" {
		return nil
	loadMap, err := ParseLoadMap(maxLoadList)
	if err != nil {
		return err
	}
	this.maxLoadMutex.Lock()
	defer this.maxLoadMutex.Unlock()
	this.throttleMutex.Lock()
	defer this.throttleMutex.Unlock()

	tmpMaxLoadMap := make(map[string]int64)
	this.maxLoad = loadMap
	return nil
}

	maxLoadConditions := strings.Split(maxLoadList, ",")
	for _, maxLoadCondition := range maxLoadConditions {
		maxLoadTokens := strings.Split(maxLoadCondition, "=")
		if len(maxLoadTokens) != 2 {
			return fmt.Errorf("Error parsing max-load condition: %s", maxLoadCondition)
		}
		if maxLoadTokens[0] == "" {
			return fmt.Errorf("Error parsing status variable in max-load condition: %s", maxLoadCondition)
		}
		if n, err := strconv.ParseInt(maxLoadTokens[1], 10, 0); err != nil {
			return fmt.Errorf("Error parsing numeric value in max-load condition: %s", maxLoadCondition)
		} else {
			tmpMaxLoadMap[maxLoadTokens[0]] = n
		}
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error.
func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error {
	loadMap, err := ParseLoadMap(criticalLoadList)
	if err != nil {
		return err
	}
	this.throttleMutex.Lock()
	defer this.throttleMutex.Unlock()

	this.maxLoad = tmpMaxLoadMap
	this.criticalLoad = loadMap
	return nil
}

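Aside (not part of the diff): the new accessors use a copy-on-read pattern. GetMaxLoad() and GetCriticalLoad() duplicate the underlying LoadMap while holding throttleMutex, so a caller can iterate the result without racing concurrent ReadMaxLoad()/ReadCriticalLoad() updates. A minimal sketch of a hypothetical caller (checkLoad and currentStatus are illustrative names only, not part of this commit):

func checkLoad(context *MigrationContext, currentStatus map[string]int64) (throttle bool, reason string) {
	// GetMaxLoad() returns a private copy, so ranging over it needs no locking.
	for variableName, threshold := range context.GetMaxLoad() {
		if currentStatus[variableName] >= threshold {
			return true, fmt.Sprintf("%s=%d", variableName, currentStatus[variableName])
		}
	}
	return false, ""
}
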
go/base/load_map.go (new file, 70 lines)
@@ -0,0 +1,70 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package base

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// LoadMap is a mapping of status variable & threshold
// e.g. [Threads_connected: 100, Threads_running: 50]
type LoadMap map[string]int64

func NewLoadMap() LoadMap {
	result := make(map[string]int64)
	return result
}

// NewLoadMap parses a `--*-load` flag (e.g. `--max-load`), which is in multiple
// key-value format, such as:
// 'Threads_running=100,Threads_connected=500'
func ParseLoadMap(loadList string) (LoadMap, error) {
	result := NewLoadMap()
	if loadList == "" {
		return result, nil
	}

	loadConditions := strings.Split(loadList, ",")
	for _, loadCondition := range loadConditions {
		loadTokens := strings.Split(loadCondition, "=")
		if len(loadTokens) != 2 {
			return result, fmt.Errorf("Error parsing load condition: %s", loadCondition)
		}
		if loadTokens[0] == "" {
			return result, fmt.Errorf("Error parsing status variable in load condition: %s", loadCondition)
		}
		if n, err := strconv.ParseInt(loadTokens[1], 10, 0); err != nil {
			return result, fmt.Errorf("Error parsing numeric value in load condition: %s", loadCondition)
		} else {
			result[loadTokens[0]] = n
		}
	}

	return result, nil
}

// Duplicate creates a clone of this map
func (this *LoadMap) Duplicate() LoadMap {
	dup := make(map[string]int64)
	for k, v := range *this {
		dup[k] = v
	}
	return dup
}

// String() returns a string representation of this map
func (this *LoadMap) String() string {
	tokens := []string{}
	for key, val := range *this {
		token := fmt.Sprintf("%s=%d", key, val)
		tokens = append(tokens, token)
	}
	sort.Strings(tokens)
	return strings.Join(tokens, ",")
}

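Aside (not part of the diff): a hypothetical Example function illustrating ParseLoadMap and String() as defined above. Parsing requires a status-variable name and an integer threshold for every entry, and String() renders the entries sorted by name:

func ExampleParseLoadMap() {
	m, err := ParseLoadMap("Threads_running=25,Threads_connected=100")
	if err != nil {
		panic(err)
	}
	fmt.Println(m.String())
	// Output: Threads_connected=100,Threads_running=25
}
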
go/base/load_map_test.go (new file, 58 lines)
@@ -0,0 +1,58 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package base

import (
	"testing"

	"github.com/outbrain/golib/log"
	test "github.com/outbrain/golib/tests"
)

func init() {
	log.SetLevel(log.ERROR)
}

func TestParseLoadMap(t *testing.T) {
	{
		loadList := ""
		m, err := ParseLoadMap(loadList)
		test.S(t).ExpectNil(err)
		test.S(t).ExpectEquals(len(m), 0)
	}
	{
		loadList := "threads_running=20,threads_connected=10"
		m, err := ParseLoadMap(loadList)
		test.S(t).ExpectNil(err)
		test.S(t).ExpectEquals(len(m), 2)
		test.S(t).ExpectEquals(m["threads_running"], int64(20))
		test.S(t).ExpectEquals(m["threads_connected"], int64(10))
	}
	{
		loadList := "threads_running=20=30,threads_connected=10"
		_, err := ParseLoadMap(loadList)
		test.S(t).ExpectNotNil(err)
	}
	{
		loadList := "threads_running=20,threads_connected"
		_, err := ParseLoadMap(loadList)
		test.S(t).ExpectNotNil(err)
	}
}

func TestString(t *testing.T) {
	{
		m, _ := ParseLoadMap("")
		s := m.String()
		test.S(t).ExpectEquals(s, "")
	}
	{
		loadList := "threads_running=20,threads_connected=10"
		m, _ := ParseLoadMap(loadList)
		s := m.String()
		test.S(t).ExpectEquals(s, "threads_connected=10,threads_running=20")
	}
}

@@ -33,10 +33,12 @@ func FileExists(fileName string) bool {
	return false
}

// StringContainsAll returns true if `s` contains all non empty given `substrings`
// The function returns `false` if no non-empty arguments are given.
func StringContainsAll(s string, substrings ...string) bool {
	nonEmptyStringsFound := false
	for _, substring := range substrings {
		if s == "" {
		if substring == "" {
			continue
		}
		if strings.Contains(s, substring) {

go/base/utils_test.go (new file, 29 lines)
@@ -0,0 +1,29 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package base

import (
	"testing"

	"github.com/outbrain/golib/log"
	test "github.com/outbrain/golib/tests"
)

func init() {
	log.SetLevel(log.ERROR)
}

func TestStringContainsAll(t *testing.T) {
	s := `insert,delete,update`

	test.S(t).ExpectFalse(StringContainsAll(s))
	test.S(t).ExpectFalse(StringContainsAll(s, ""))
	test.S(t).ExpectFalse(StringContainsAll(s, "drop"))
	test.S(t).ExpectTrue(StringContainsAll(s, "insert"))
	test.S(t).ExpectFalse(StringContainsAll(s, "insert", "drop"))
	test.S(t).ExpectTrue(StringContainsAll(s, "insert", ""))
	test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete"))
}

@@ -1,226 +0,0 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package binlog

import (
	"bufio"
	"bytes"
	"fmt"
	"path"
	"regexp"
	"strconv"
	// "strings"

	"github.com/github/gh-ost/go/os"
	"github.com/outbrain/golib/log"
)

var (
	binlogChunkSizeBytes uint64 = 32 * 1024 * 1024
	startEntryRegexp = regexp.MustCompile("^# at ([0-9]+)$")
	startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$")
	endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
	statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
	tokenRegxp = regexp.MustCompile("### (WHERE|SET)$")
	positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$")
)

// BinlogEntryState is a state in the binlog parser automaton / state machine
type BinlogEntryState string

// States of the state machine
const (
	InvalidState BinlogEntryState = "InvalidState"
	SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState"
	ExpectEndLogPosState = "ExpectEndLogPosState"
	ExpectTokenState = "ExpectTokenState"
	PositionalColumnAssignmentState = "PositionalColumnAssignmentState"
)

// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog`
// process and textually parsing its output
type MySQLBinlogReader struct {
	Basedir string
	Datadir string
	MySQLBinlogBinary string
}

// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem
func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) {
	mySQLBinlogReader = &MySQLBinlogReader{
		Basedir: basedir,
		Datadir: datadir,
	}
	mySQLBinlogReader.MySQLBinlogBinary = path.Join(mySQLBinlogReader.Basedir, "bin/mysqlbinlog")
	return mySQLBinlogReader
}

// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) {
	if startPos == 0 {
		startPos = 4
	}
	done := false
	chunkStartPos := startPos
	for !done {
		chunkStopPos := chunkStartPos + binlogChunkSizeBytes
		if chunkStopPos > stopPos && stopPos != 0 {
			chunkStopPos = stopPos
		}
		log.Debugf("Next chunk range %d - %d", chunkStartPos, chunkStopPos)
		binlogFilePath := path.Join(this.Datadir, logFile)
		command := fmt.Sprintf(`%s --verbose --base64-output=DECODE-ROWS --start-position=%d --stop-position=%d %s`, this.MySQLBinlogBinary, chunkStartPos, chunkStopPos, binlogFilePath)
		entriesBytes, err := os.RunCommandWithOutput(command)
		if err != nil {
			return entries, log.Errore(err)
		}

		chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)), logFile)
		if err != nil {
			return entries, log.Errore(err)
		}

		if len(chunkEntries) == 0 {
			done = true
		} else {
			entries = append(entries, chunkEntries...)
			lastChunkEntry := chunkEntries[len(chunkEntries)-1]
			chunkStartPos = lastChunkEntry.EndLogPos
		}
	}
	return entries, err
}

// automaton step: accept wither beginning of new entry, or beginning of new statement
func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) {
	onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
		startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64)

		if previousEndLogPos != 0 && startLogPos != previousEndLogPos {
			return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos)
		}
		nextBinlogEntry = binlogEntry
		if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil {
			// Current entry is already a true entry, with startpos and with statement
			nextBinlogEntry = NewBinlogEntry(binlogEntry.Coordinates.LogFile, startLogPos)
		}
		return ExpectEndLogPosState, nextBinlogEntry, nil
	}

	onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
		nextBinlogEntry = binlogEntry
		if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil {
			// Current entry is already a true entry, with startpos and with statement
			nextBinlogEntry = binlogEntry.Duplicate()
		}
		nextBinlogEntry.DmlEvent = NewBinlogDMLEvent(submatch[2], submatch[3], ToEventDML(submatch[1]))

		return ExpectTokenState, nextBinlogEntry, nil
	}

	// Defuncting the following:

	// onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
	// columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
	// if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
	// return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.DmlEvent.DML)
	// }
	// columnValue := submatch[2]
	// columnValue = strings.TrimPrefix(columnValue, "'")
	// columnValue = strings.TrimSuffix(columnValue, "'")
	// binlogEntry.PositionalColumns[columnIndex] = columnValue
	//
	// return SearchForStartPosOrStatementState, binlogEntry, nil
	// }

	line := scanner.Text()
	if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
		return onStartEntry(submatch)
	}
	if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 {
		return onStartEntry(submatch)
	}
	if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
		return onStatementEntry(submatch)
	}
	if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 {
		// Defuncting return onPositionalColumn(submatch)
	}
	// Haven't found a match
	return SearchForStartPosOrStatementState, binlogEntry, nil
}

// automaton step: expect an end_log_pos line`
func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
	line := scanner.Text()

	submatch := endLogPosRegexp.FindStringSubmatch(line)
	if len(submatch) > 1 {
		binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64)
		return SearchForStartPosOrStatementState, nil
	}
	return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.Coordinates.LogPos)
}

// automaton step: a not-strictly-required but good-to-have-around validation that
// we see an expected token following a statement
func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) {
	line := scanner.Text()
	if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 {
		return SearchForStartPosOrStatementState, nil
	}
	return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.Coordinates.LogPos)
}

// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
// It issues an automaton / state machine to do its thang.
func parseEntries(scanner *bufio.Scanner, logFile string) (entries [](*BinlogEntry), err error) {
	binlogEntry := NewBinlogEntry(logFile, 0)
	var state BinlogEntryState = SearchForStartPosOrStatementState
	var endLogPos uint64

	appendBinlogEntry := func() {
		if binlogEntry.Coordinates.LogPos == 0 {
			return
		}
		if binlogEntry.DmlEvent == nil {
			return
		}
		entries = append(entries, binlogEntry)
		log.Debugf("entry: %+v", *binlogEntry)
		fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.DmlEvent.DML, binlogEntry.DmlEvent.DatabaseName, binlogEntry.DmlEvent.TableName))
	}
	for scanner.Scan() {
		switch state {
		case SearchForStartPosOrStatementState:
			{
				var nextBinlogEntry *BinlogEntry
				state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos)
				if nextBinlogEntry != binlogEntry {
					appendBinlogEntry()
					binlogEntry = nextBinlogEntry
				}
			}
		case ExpectEndLogPosState:
			{
				state, err = expectEndLogPos(scanner, binlogEntry)
			}
		case ExpectTokenState:
			{
				state, err = expectToken(scanner, binlogEntry)
			}
		default:
			{
				err = fmt.Errorf("Unexpected state %+v", state)
			}
		}
		if err != nil {
			return entries, log.Errore(err)
		}
	}
	appendBinlogEntry()
	return entries, err
}

@@ -1,50 +0,0 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package binlog

import (
	"bufio"
	"os"
	"testing"

	"github.com/outbrain/golib/log"
	test "github.com/outbrain/golib/tests"
)

func init() {
	log.SetLevel(log.ERROR)
}

func __TestRBRSample0(t *testing.T) {
	testFile, err := os.Open("testdata/rbr-sample-0.txt")
	test.S(t).ExpectNil(err)
	defer testFile.Close()

	scanner := bufio.NewScanner(testFile)
	entries, err := parseEntries(scanner)
	test.S(t).ExpectNil(err)

	test.S(t).ExpectEquals(len(entries), 17)
	test.S(t).ExpectEquals(entries[0].DatabaseName, "test")
	test.S(t).ExpectEquals(entries[0].TableName, "samplet")
	test.S(t).ExpectEquals(entries[0].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[1].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[2].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[3].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[4].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[5].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE")
	test.S(t).ExpectEquals(entries[7].StatementType, "DELETE")
	test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE")
	test.S(t).ExpectEquals(entries[9].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[10].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[11].StatementType, "DELETE")
	test.S(t).ExpectEquals(entries[12].StatementType, "DELETE")
	test.S(t).ExpectEquals(entries[13].StatementType, "INSERT")
	test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE")
	test.S(t).ExpectEquals(entries[15].StatementType, "DELETE")
	test.S(t).ExpectEquals(entries[16].StatementType, "INSERT")
}

@@ -73,6 +73,7 @@ func main() {
	flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
	flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
	throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
	flag.StringVar(&migrationContext.ThrottleQuery, "throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight")
	flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
	flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations")
	flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.")
@@ -81,7 +82,8 @@ func main() {
	flag.StringVar(&migrationContext.ServeSocketFile, "serve-socket-file", "", "Unix socket file to serve on. Default: auto-determined and advertised upon startup")
	flag.Int64Var(&migrationContext.ServeTCPPort, "serve-tcp-port", 0, "TCP port to serve on. Default: disabled")

	maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
	maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
	criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as `--max-load`. When status exceeds threshold, app panics and quits")
	quiet := flag.Bool("quiet", false, "quiet")
	verbose := flag.Bool("verbose", false, "verbose")
	debug := flag.Bool("debug", false, "debug mode (very verbose)")
@@ -156,6 +158,9 @@ func main() {
	if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
		log.Fatale(err)
	}
	if err := migrationContext.ReadCriticalLoad(*criticalLoad); err != nil {
		log.Fatale(err)
	}
	if migrationContext.ServeSocketFile == "" {
		migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
	}

@@ -263,6 +263,19 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) {
	}
}

func (this *Applier) ExecuteThrottleQuery() (int64, error) {
	throttleQuery := this.migrationContext.GetThrottleQuery()

	if throttleQuery == "" {
		return 0, nil
	}
	var result int64
	if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil {
		return 0, log.Errore(err)
	}
	return result, nil
}

// ReadMigrationMinValues
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
	log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)

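Aside (not part of the diff): ExecuteThrottleQuery above defines the throttle-query contract: the query must return a single numeric value, any result greater than zero means "throttle now" (see the shouldThrottle() change below), and an empty query disables the check. A hypothetical example, for illustration only:

// Hypothetical example: throttle during business hours (08:00-17:59).
// Any single-value result > 0 causes throttling; 0 means "do not throttle".
migrationContext.SetThrottleQuery(`select if(hour(now()) between 8 and 17, 1, 0)`)
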
@@ -40,6 +40,14 @@ const (
	heartbeatIntervalMilliseconds = 1000
)

type PrintStatusRule int

const (
	HeuristicPrintStatusRule PrintStatusRule = iota
	ForcePrintStatusRule = iota
	ForcePrintStatusAndHint = iota
)

// Migrator is the main schema migration flow manager.
type Migrator struct {
	parser *sql.Parser
@@ -106,6 +114,17 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
			this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
		}
	}
	criticalLoad := this.migrationContext.GetCriticalLoad()
	for variableName, threshold := range criticalLoad {
		value, err := this.applier.ShowStatusVariable(variableName)
		if err != nil {
			return true, fmt.Sprintf("%s %s", variableName, err)
		}
		if value >= threshold {
			this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
		}
	}

	// Back to throttle considerations

	// User-based throttle
@@ -145,8 +164,13 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
		if err != nil {
			return true, fmt.Sprintf("%s %s", variableName, err)
		}
		if value > threshold {
			return true, fmt.Sprintf("%s=%d", variableName, value)
		if value >= threshold {
			return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)
		}
	}
	if this.migrationContext.GetThrottleQuery() != "" {
		if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
			return true, "throttle-query"
		}
	}

@@ -311,6 +335,7 @@ func (this *Migrator) validateStatement() (err error) {
		if !this.migrationContext.ApproveRenamedColumns {
			return fmt.Errorf("Alter statement has column(s) renamed. gh-ost suspects the following renames: %v; but to proceed you must approve via `--approve-renamed-columns` (or you can skip renamed columns via `--skip-renamed-columns`)", this.parser.GetNonTrivialRenames())
		}
		log.Infof("Alter statement has column(s) renamed. gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames())
	}
	return nil
}
@@ -375,7 +400,7 @@ func (this *Migrator) Migrate() (err error) {
	log.Debugf("Operating until row copy is complete")
	this.consumeRowCopyComplete()
	log.Infof("Row copy complete")
	this.printStatus()
	this.printStatus(ForcePrintStatusRule)

	if err := this.cutOver(); err != nil {
		return err
@@ -459,8 +484,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
	waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime)

	log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
	this.printMigrationStatusHint()
	this.printStatus()
	this.printStatus(ForcePrintStatusAndHint)

	return nil
}
@@ -621,17 +645,18 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
	case "help":
		{
			fmt.Fprintln(writer, `available commands:
status # Print a status message
chunk-size=<newsize> # Set a new chunk-size
max-load=<maxload> # Set a new set of max-load thresholds
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
help # This message
status # Print a status message
chunk-size=<newsize> # Set a new chunk-size
critical-load=<load> # Set a new set of max-load thresholds
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
help # This message
`)
		}
	case "info", "status":
		this.printMigrationStatusHint(writer)
		this.printStatus(writer)
		this.printStatus(ForcePrintStatusAndHint, writer)
	case "chunk-size":
		{
			if chunkSize, err := strconv.Atoi(arg); err != nil {
@@ -639,7 +664,7 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
				return log.Errore(err)
			} else {
				this.migrationContext.SetChunkSize(int64(chunkSize))
				this.printMigrationStatusHint(writer)
				this.printStatus(ForcePrintStatusAndHint, writer)
			}
		}
	case "max-load":
@@ -648,7 +673,20 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
				fmt.Fprintf(writer, "%s\n", err.Error())
				return log.Errore(err)
			}
			this.printMigrationStatusHint(writer)
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "critical-load":
		{
			if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
				fmt.Fprintf(writer, "%s\n", err.Error())
				return log.Errore(err)
			}
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "throttle-query":
		{
			this.migrationContext.SetThrottleQuery(arg)
			this.printStatus(ForcePrintStatusAndHint, writer)
		}
	case "throttle", "pause", "suspend":
		{
@@ -715,17 +753,16 @@ func (this *Migrator) initiateInspector() (err error) {
}

func (this *Migrator) initiateStatus() error {
	this.printStatus()
	this.printStatus(ForcePrintStatusAndHint)
	statusTick := time.Tick(1 * time.Second)
	for range statusTick {
		go this.printStatus()
		go this.printStatus(HeuristicPrintStatusRule)
	}

	return nil
}

func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
	writers = append(writers, os.Stdout)
	w := io.MultiWriter(writers...)
	fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s",
		sql.EscapeName(this.migrationContext.DatabaseName),
@@ -737,10 +774,12 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
		this.migrationContext.StartTime.Format(time.RubyDate),
	))
	maxLoad := this.migrationContext.GetMaxLoad()
	fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v",
	criticalLoad := this.migrationContext.GetCriticalLoad()
	fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %s; critical-load: %s",
		atomic.LoadInt64(&this.migrationContext.ChunkSize),
		atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
		maxLoad,
		maxLoad.String(),
		criticalLoad.String(),
	))
	if this.migrationContext.ThrottleFlagFile != "" {
		fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v",
@@ -752,6 +791,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
			this.migrationContext.ThrottleAdditionalFlagFile,
		))
	}
	if throttleQuery := this.migrationContext.GetThrottleQuery(); throttleQuery != "" {
		fmt.Fprintln(w, fmt.Sprintf("# Throttle query: %+v",
			throttleQuery,
		))
	}
	if this.migrationContext.PostponeCutOverFlagFile != "" {
		fmt.Fprintln(w, fmt.Sprintf("# Postpone cut-over flag file: %+v",
			this.migrationContext.PostponeCutOverFlagFile,
@@ -770,7 +814,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
	}
}

func (this *Migrator) printStatus(writers ...io.Writer) {
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
	writers = append(writers, os.Stdout)

	elapsedTime := this.migrationContext.ElapsedTime()
	elapsedSeconds := int64(elapsedTime.Seconds())
	totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
@@ -782,8 +828,11 @@ func (this *Migrator) printStatus(writers ...io.Writer) {

	// Before status, let's see if we should print a nice reminder for what exactly we're doing here.
	shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
	if rule == ForcePrintStatusAndHint {
		shouldPrintMigrationStatusHint = true
	}
	if shouldPrintMigrationStatusHint {
		this.printMigrationStatusHint()
		this.printMigrationStatusHint(writers...)
	}

	var etaSeconds float64 = math.MaxFloat64
@@ -820,6 +869,9 @@ func (this *Migrator) printStatus(writers ...io.Writer) {
	} else {
		shouldPrintStatus = (elapsedSeconds%30 == 0)
	}
	if rule == ForcePrintStatusRule || rule == ForcePrintStatusAndHint {
		shouldPrintStatus = true
	}
	if !shouldPrintStatus {
		return
	}
@@ -838,7 +890,6 @@ func (this *Migrator) printStatus(writers ...io.Writer) {
		fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()),
		status,
	)
	writers = append(writers, os.Stdout)
	w := io.MultiWriter(writers...)
	fmt.Fprintln(w, status)
}

@@ -3,7 +3,7 @@
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package binlog
package mysql

import (
	"testing"
@@ -11,7 +11,7 @@ import (
)

var (
	renameColumnRegexp = regexp.MustCompile(`(?i)CHANGE\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
	renameColumnRegexp = regexp.MustCompile(`(?i)change\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
)

type Parser struct {
@@ -27,8 +27,12 @@ func NewParser() *Parser {
func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
	allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterStatement, -1)
	for _, submatch := range allStringSubmatch {
		submatch[2], _ = strconv.Unquote(submatch[2])
		submatch[3], _ = strconv.Unquote(submatch[3])
		if unquoted, err := strconv.Unquote(submatch[2]); err == nil {
			submatch[2] = unquoted
		}
		if unquoted, err := strconv.Unquote(submatch[3]); err == nil {
			submatch[3] = unquoted
		}

		this.columnRenameMap[submatch[2]] = submatch[3]
	}

go/sql/parser_test.go (new file, 68 lines)
@@ -0,0 +1,68 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/

package sql

import (
	"testing"

	"github.com/outbrain/golib/log"
	test "github.com/outbrain/golib/tests"
)

func init() {
	log.SetLevel(log.ERROR)
}

func TestParseAlterStatement(t *testing.T) {
	statement := "add column t int, engine=innodb"
	parser := NewParser()
	err := parser.ParseAlterStatement(statement)
	test.S(t).ExpectNil(err)
	test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
}

func TestParseAlterStatementTrivialRename(t *testing.T) {
	statement := "add column t int, change ts ts timestamp, engine=innodb"
	parser := NewParser()
	err := parser.ParseAlterStatement(statement)
	test.S(t).ExpectNil(err)
	test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
	test.S(t).ExpectEquals(len(parser.columnRenameMap), 1)
	test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts")
}

func TestParseAlterStatementTrivialRenames(t *testing.T) {
	statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb"
	parser := NewParser()
	err := parser.ParseAlterStatement(statement)
	test.S(t).ExpectNil(err)
	test.S(t).ExpectFalse(parser.HasNonTrivialRenames())
	test.S(t).ExpectEquals(len(parser.columnRenameMap), 2)
	test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts")
	test.S(t).ExpectEquals(parser.columnRenameMap["f"], "f")
}

func TestParseAlterStatementNonTrivial(t *testing.T) {
	statements := []string{
		`add column b bigint, change f fl float, change i count int, engine=innodb`,
		"add column b bigint, change column `f` fl float, change `i` `count` int, engine=innodb",
		"add column b bigint, change column `f` fl float, change `i` `count` int, change ts ts timestamp, engine=innodb",
		`change
		f fl float,
		CHANGE COLUMN i
		count int, engine=innodb`,
	}

	for _, statement := range statements {
		parser := NewParser()
		err := parser.ParseAlterStatement(statement)
		test.S(t).ExpectNil(err)
		renames := parser.GetNonTrivialRenames()
		test.S(t).ExpectEquals(len(renames), 2)
		test.S(t).ExpectEquals(renames["i"], "count")
		test.S(t).ExpectEquals(renames["f"], "fl")
	}
}