From 23cb8ea7e9f1e70ed25ac53406ab9e2623415c9d Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sat, 18 Jun 2016 21:12:07 +0200 Subject: [PATCH] Throttling & critical load - Added `--throttle-query` param (when returns > 0, throttling applies) - Added `--critical-load`, similar to `--max-load` but implies panic and quit - Recoded *-load as `LoadMap` - More info on *-load throttle/panic - `printStatus()` now gets printing heuristic. Always shows up on interactive `"status"` - Fixed `change column` (aka rename) handling with quotes - Removed legacy `mysqlbinlog` parser code - Added tests --- build.sh | 2 +- go/base/context.go | 84 ++++++---- go/base/load_map.go | 70 +++++++++ go/base/load_map_test.go | 58 +++++++ go/base/utils.go | 4 +- go/base/utils_test.go | 29 ++++ go/binlog/mysqlbinlog_reader.go | 226 --------------------------- go/binlog/mysqlbinlog_reader_test.go | 50 ------ go/cmd/gh-ost/main.go | 7 +- go/logic/applier.go | 13 ++ go/logic/migrator.go | 97 +++++++++--- go/{binlog => mysql}/binlog_test.go | 2 +- go/sql/parser.go | 10 +- go/sql/parser_test.go | 68 ++++++++ 14 files changed, 381 insertions(+), 339 deletions(-) create mode 100644 go/base/load_map.go create mode 100644 go/base/load_map_test.go create mode 100644 go/base/utils_test.go delete mode 100644 go/binlog/mysqlbinlog_reader.go delete mode 100644 go/binlog/mysqlbinlog_reader_test.go rename go/{binlog => mysql}/binlog_test.go (99%) create mode 100644 go/sql/parser_test.go diff --git a/build.sh b/build.sh index ef26c15..44b1605 100644 --- a/build.sh +++ b/build.sh @@ -1,7 +1,7 @@ #!/bin/bash # # -RELEASE_VERSION="0.9.4" +RELEASE_VERSION="0.9.5" buildpath=/tmp/gh-ost target=gh-ost diff --git a/go/base/context.go b/go/base/context.go index f8ac7d4..d16dc08 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -7,7 +7,6 @@ package base import ( "fmt" - "strconv" "strings" "sync" "sync/atomic" @@ -65,9 +64,10 @@ type MigrationContext struct { ThrottleControlReplicaKeys *mysql.InstanceKeyMap 
ThrottleFlagFile string ThrottleAdditionalFlagFile string + ThrottleQuery string ThrottleCommandedByUser int64 - maxLoad map[string]int64 - maxLoadMutex *sync.Mutex + maxLoad LoadMap + criticalLoad LoadMap PostponeCutOverFlagFile string SwapTablesTimeoutSeconds int64 PanicFlagFile string @@ -148,8 +148,8 @@ func newMigrationContext() *MigrationContext { ApplierConnectionConfig: mysql.NewConnectionConfig(), MaxLagMillisecondsThrottleThreshold: 1000, SwapTablesTimeoutSeconds: 3, - maxLoad: make(map[string]int64), - maxLoadMutex: &sync.Mutex{}, + maxLoad: NewLoadMap(), + criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(), configMutex: &sync.Mutex{}, @@ -278,46 +278,64 @@ func (this *MigrationContext) IsThrottled() (bool, string) { return this.isThrottled, this.throttleReason } -func (this *MigrationContext) GetMaxLoad() map[string]int64 { - this.maxLoadMutex.Lock() - defer this.maxLoadMutex.Unlock() +func (this *MigrationContext) GetThrottleQuery() string { + var query string - tmpMaxLoadMap := make(map[string]int64) - for k, v := range this.maxLoad { - tmpMaxLoadMap[k] = v - } - return tmpMaxLoadMap + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + query = this.ThrottleQuery + return query +} + +func (this *MigrationContext) SetThrottleQuery(newQuery string) { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + this.ThrottleQuery = newQuery +} + +func (this *MigrationContext) GetMaxLoad() LoadMap { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + return this.maxLoad.Duplicate() +} + +func (this *MigrationContext) GetCriticalLoad() LoadMap { + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() + + return this.criticalLoad.Duplicate() } // ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format, // such as: 'Threads_running=100,Threads_connected=500' // It only applies changes in case there's no parsing error. 
func (this *MigrationContext) ReadMaxLoad(maxLoadList string) error { - if maxLoadList == "" { - return nil + loadMap, err := ParseLoadMap(maxLoadList) + if err != nil { + return err } - this.maxLoadMutex.Lock() - defer this.maxLoadMutex.Unlock() + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() - tmpMaxLoadMap := make(map[string]int64) + this.maxLoad = loadMap + return nil +} - maxLoadConditions := strings.Split(maxLoadList, ",") - for _, maxLoadCondition := range maxLoadConditions { - maxLoadTokens := strings.Split(maxLoadCondition, "=") - if len(maxLoadTokens) != 2 { - return fmt.Errorf("Error parsing max-load condition: %s", maxLoadCondition) - } - if maxLoadTokens[0] == "" { - return fmt.Errorf("Error parsing status variable in max-load condition: %s", maxLoadCondition) - } - if n, err := strconv.ParseInt(maxLoadTokens[1], 10, 0); err != nil { - return fmt.Errorf("Error parsing numeric value in max-load condition: %s", maxLoadCondition) - } else { - tmpMaxLoadMap[maxLoadTokens[0]] = n - } +// ReadCriticalLoad parses the `--critical-load` flag, which is in multiple key-value format, +// such as: 'Threads_running=100,Threads_connected=500' +// It only applies changes in case there's no parsing error. +func (this *MigrationContext) ReadCriticalLoad(criticalLoadList string) error { + loadMap, err := ParseLoadMap(criticalLoadList) + if err != nil { + return err } + this.throttleMutex.Lock() + defer this.throttleMutex.Unlock() - this.maxLoad = tmpMaxLoadMap + this.criticalLoad = loadMap return nil } diff --git a/go/base/load_map.go b/go/base/load_map.go new file mode 100644 index 0000000..2a07b7f --- /dev/null +++ b/go/base/load_map.go @@ -0,0 +1,70 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "fmt" + "sort" + "strconv" + "strings" +) + +// LoadMap is a mapping of status variable & threshold +// e.g. 
[Threads_connected: 100, Threads_running: 50] +type LoadMap map[string]int64 + +func NewLoadMap() LoadMap { + result := make(map[string]int64) + return result +} + +// ParseLoadMap parses a `--*-load` flag (e.g. `--max-load`), which is in multiple +// key-value format, such as: +// 'Threads_running=100,Threads_connected=500' +func ParseLoadMap(loadList string) (LoadMap, error) { + result := NewLoadMap() + if loadList == "" { + return result, nil + } + + loadConditions := strings.Split(loadList, ",") + for _, loadCondition := range loadConditions { + loadTokens := strings.Split(loadCondition, "=") + if len(loadTokens) != 2 { + return result, fmt.Errorf("Error parsing load condition: %s", loadCondition) + } + if loadTokens[0] == "" { + return result, fmt.Errorf("Error parsing status variable in load condition: %s", loadCondition) + } + if n, err := strconv.ParseInt(loadTokens[1], 10, 0); err != nil { + return result, fmt.Errorf("Error parsing numeric value in load condition: %s", loadCondition) + } else { + result[loadTokens[0]] = n + } + } + + return result, nil +} + +// Duplicate creates a clone of this map +func (this *LoadMap) Duplicate() LoadMap { + dup := make(map[string]int64) + for k, v := range *this { + dup[k] = v + } + return dup +} + +// String() returns a string representation of this map +func (this *LoadMap) String() string { + tokens := []string{} + for key, val := range *this { + token := fmt.Sprintf("%s=%d", key, val) + tokens = append(tokens, token) + } + sort.Strings(tokens) + return strings.Join(tokens, ",") +} diff --git a/go/base/load_map_test.go b/go/base/load_map_test.go new file mode 100644 index 0000000..3a58e78 --- /dev/null +++ b/go/base/load_map_test.go @@ -0,0 +1,58 @@ +/* + Copyright 2016 GitHub Inc. 
+ See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestParseLoadMap(t *testing.T) { + { + loadList := "" + m, err := ParseLoadMap(loadList) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(len(m), 0) + } + { + loadList := "threads_running=20,threads_connected=10" + m, err := ParseLoadMap(loadList) + test.S(t).ExpectNil(err) + test.S(t).ExpectEquals(len(m), 2) + test.S(t).ExpectEquals(m["threads_running"], int64(20)) + test.S(t).ExpectEquals(m["threads_connected"], int64(10)) + } + { + loadList := "threads_running=20=30,threads_connected=10" + _, err := ParseLoadMap(loadList) + test.S(t).ExpectNotNil(err) + } + { + loadList := "threads_running=20,threads_connected" + _, err := ParseLoadMap(loadList) + test.S(t).ExpectNotNil(err) + } +} + +func TestString(t *testing.T) { + { + m, _ := ParseLoadMap("") + s := m.String() + test.S(t).ExpectEquals(s, "") + } + { + loadList := "threads_running=20,threads_connected=10" + m, _ := ParseLoadMap(loadList) + s := m.String() + test.S(t).ExpectEquals(s, "threads_connected=10,threads_running=20") + } +} diff --git a/go/base/utils.go b/go/base/utils.go index 1d3b7cd..2721742 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -33,10 +33,12 @@ func FileExists(fileName string) bool { return false } +// StringContainsAll returns true if `s` contains all non empty given `substrings` +// The function returns `false` if no non-empty arguments are given. 
func StringContainsAll(s string, substrings ...string) bool { nonEmptyStringsFound := false for _, substring := range substrings { - if s == "" { + if substring == "" { continue } if strings.Contains(s, substring) { diff --git a/go/base/utils_test.go b/go/base/utils_test.go new file mode 100644 index 0000000..d11cc21 --- /dev/null +++ b/go/base/utils_test.go @@ -0,0 +1,29 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package base + +import ( + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestStringContainsAll(t *testing.T) { + s := `insert,delete,update` + + test.S(t).ExpectFalse(StringContainsAll(s)) + test.S(t).ExpectFalse(StringContainsAll(s, "")) + test.S(t).ExpectFalse(StringContainsAll(s, "drop")) + test.S(t).ExpectTrue(StringContainsAll(s, "insert")) + test.S(t).ExpectFalse(StringContainsAll(s, "insert", "drop")) + test.S(t).ExpectTrue(StringContainsAll(s, "insert", "")) + test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete")) +} diff --git a/go/binlog/mysqlbinlog_reader.go b/go/binlog/mysqlbinlog_reader.go deleted file mode 100644 index c9c64ce..0000000 --- a/go/binlog/mysqlbinlog_reader.go +++ /dev/null @@ -1,226 +0,0 @@ -/* - Copyright 2016 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package binlog - -import ( - "bufio" - "bytes" - "fmt" - "path" - "regexp" - "strconv" - // "strings" - - "github.com/github/gh-ost/go/os" - "github.com/outbrain/golib/log" -) - -var ( - binlogChunkSizeBytes uint64 = 32 * 1024 * 1024 - startEntryRegexp = regexp.MustCompile("^# at ([0-9]+)$") - startEntryUnknownTableRegexp = regexp.MustCompile("^### Row event for unknown table .*? at ([0-9]+)$") - endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? 
end_log_pos ([0-9]+)") - statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") - tokenRegxp = regexp.MustCompile("### (WHERE|SET)$") - positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$") -) - -// BinlogEntryState is a state in the binlog parser automaton / state machine -type BinlogEntryState string - -// States of the state machine -const ( - InvalidState BinlogEntryState = "InvalidState" - SearchForStartPosOrStatementState = "SearchForStartPosOrStatementState" - ExpectEndLogPosState = "ExpectEndLogPosState" - ExpectTokenState = "ExpectTokenState" - PositionalColumnAssignmentState = "PositionalColumnAssignmentState" -) - -// MySQLBinlogReader reads binary log entries by executing the `mysqlbinlog` -// process and textually parsing its output -type MySQLBinlogReader struct { - Basedir string - Datadir string - MySQLBinlogBinary string -} - -// NewMySQLBinlogReader creates a new reader that directly parses binlog files from the filesystem -func NewMySQLBinlogReader(basedir string, datadir string) (mySQLBinlogReader *MySQLBinlogReader) { - mySQLBinlogReader = &MySQLBinlogReader{ - Basedir: basedir, - Datadir: datadir, - } - mySQLBinlogReader.MySQLBinlogBinary = path.Join(mySQLBinlogReader.Basedir, "bin/mysqlbinlog") - return mySQLBinlogReader -} - -// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility -func (this *MySQLBinlogReader) ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error) { - if startPos == 0 { - startPos = 4 - } - done := false - chunkStartPos := startPos - for !done { - chunkStopPos := chunkStartPos + binlogChunkSizeBytes - if chunkStopPos > stopPos && stopPos != 0 { - chunkStopPos = stopPos - } - log.Debugf("Next chunk range %d - %d", chunkStartPos, chunkStopPos) - binlogFilePath := path.Join(this.Datadir, logFile) - command := fmt.Sprintf(`%s --verbose --base64-output=DECODE-ROWS --start-position=%d 
--stop-position=%d %s`, this.MySQLBinlogBinary, chunkStartPos, chunkStopPos, binlogFilePath) - entriesBytes, err := os.RunCommandWithOutput(command) - if err != nil { - return entries, log.Errore(err) - } - - chunkEntries, err := parseEntries(bufio.NewScanner(bytes.NewReader(entriesBytes)), logFile) - if err != nil { - return entries, log.Errore(err) - } - - if len(chunkEntries) == 0 { - done = true - } else { - entries = append(entries, chunkEntries...) - lastChunkEntry := chunkEntries[len(chunkEntries)-1] - chunkStartPos = lastChunkEntry.EndLogPos - } - } - return entries, err -} - -// automaton step: accept wither beginning of new entry, or beginning of new statement -func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEntry, previousEndLogPos uint64) (nextState BinlogEntryState, nextBinlogEntry *BinlogEntry, err error) { - onStartEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { - startLogPos, _ := strconv.ParseUint(submatch[1], 10, 64) - - if previousEndLogPos != 0 && startLogPos != previousEndLogPos { - return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos) - } - nextBinlogEntry = binlogEntry - if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil { - // Current entry is already a true entry, with startpos and with statement - nextBinlogEntry = NewBinlogEntry(binlogEntry.Coordinates.LogFile, startLogPos) - } - return ExpectEndLogPosState, nextBinlogEntry, nil - } - - onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { - nextBinlogEntry = binlogEntry - if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil { - // Current entry is already a true entry, with startpos and with statement - nextBinlogEntry = binlogEntry.Duplicate() - } - nextBinlogEntry.DmlEvent = NewBinlogDMLEvent(submatch[2], submatch[3], ToEventDML(submatch[1])) - - return ExpectTokenState, 
nextBinlogEntry, nil - } - - // Defuncting the following: - - // onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) { - // columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64) - // if _, found := binlogEntry.PositionalColumns[columnIndex]; found { - // return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.DmlEvent.DML) - // } - // columnValue := submatch[2] - // columnValue = strings.TrimPrefix(columnValue, "'") - // columnValue = strings.TrimSuffix(columnValue, "'") - // binlogEntry.PositionalColumns[columnIndex] = columnValue - // - // return SearchForStartPosOrStatementState, binlogEntry, nil - // } - - line := scanner.Text() - if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { - return onStartEntry(submatch) - } - if submatch := startEntryUnknownTableRegexp.FindStringSubmatch(line); len(submatch) > 1 { - return onStartEntry(submatch) - } - if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { - return onStatementEntry(submatch) - } - if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 { - // Defuncting return onPositionalColumn(submatch) - } - // Haven't found a match - return SearchForStartPosOrStatementState, binlogEntry, nil -} - -// automaton step: expect an end_log_pos line` -func expectEndLogPos(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { - line := scanner.Text() - - submatch := endLogPosRegexp.FindStringSubmatch(line) - if len(submatch) > 1 { - binlogEntry.EndLogPos, _ = strconv.ParseUint(submatch[1], 10, 64) - return SearchForStartPosOrStatementState, nil - } - return InvalidState, fmt.Errorf("Expected to find end_log_pos following pos %+v", binlogEntry.Coordinates.LogPos) -} - -// automaton step: a not-strictly-required but good-to-have-around validation that -// we see an expected token 
following a statement -func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState BinlogEntryState, err error) { - line := scanner.Text() - if submatch := tokenRegxp.FindStringSubmatch(line); len(submatch) > 1 { - return SearchForStartPosOrStatementState, nil - } - return InvalidState, fmt.Errorf("Expected to find token following pos %+v", binlogEntry.Coordinates.LogPos) -} - -// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS` -// It issues an automaton / state machine to do its thang. -func parseEntries(scanner *bufio.Scanner, logFile string) (entries [](*BinlogEntry), err error) { - binlogEntry := NewBinlogEntry(logFile, 0) - var state BinlogEntryState = SearchForStartPosOrStatementState - var endLogPos uint64 - - appendBinlogEntry := func() { - if binlogEntry.Coordinates.LogPos == 0 { - return - } - if binlogEntry.DmlEvent == nil { - return - } - entries = append(entries, binlogEntry) - log.Debugf("entry: %+v", *binlogEntry) - fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.DmlEvent.DML, binlogEntry.DmlEvent.DatabaseName, binlogEntry.DmlEvent.TableName)) - } - for scanner.Scan() { - switch state { - case SearchForStartPosOrStatementState: - { - var nextBinlogEntry *BinlogEntry - state, nextBinlogEntry, err = searchForStartPosOrStatement(scanner, binlogEntry, endLogPos) - if nextBinlogEntry != binlogEntry { - appendBinlogEntry() - binlogEntry = nextBinlogEntry - } - } - case ExpectEndLogPosState: - { - state, err = expectEndLogPos(scanner, binlogEntry) - } - case ExpectTokenState: - { - state, err = expectToken(scanner, binlogEntry) - } - default: - { - err = fmt.Errorf("Unexpected state %+v", state) - } - } - if err != nil { - return entries, log.Errore(err) - } - } - appendBinlogEntry() - return entries, err -} diff --git a/go/binlog/mysqlbinlog_reader_test.go b/go/binlog/mysqlbinlog_reader_test.go deleted file mode 100644 index c2e641e..0000000 --- a/go/binlog/mysqlbinlog_reader_test.go +++ /dev/null 
@@ -1,50 +0,0 @@ -/* - Copyright 2016 GitHub Inc. - See https://github.com/github/gh-ost/blob/master/LICENSE -*/ - -package binlog - -import ( - "bufio" - "os" - "testing" - - "github.com/outbrain/golib/log" - test "github.com/outbrain/golib/tests" -) - -func init() { - log.SetLevel(log.ERROR) -} - -func __TestRBRSample0(t *testing.T) { - testFile, err := os.Open("testdata/rbr-sample-0.txt") - test.S(t).ExpectNil(err) - defer testFile.Close() - - scanner := bufio.NewScanner(testFile) - entries, err := parseEntries(scanner) - test.S(t).ExpectNil(err) - - test.S(t).ExpectEquals(len(entries), 17) - test.S(t).ExpectEquals(entries[0].DatabaseName, "test") - test.S(t).ExpectEquals(entries[0].TableName, "samplet") - test.S(t).ExpectEquals(entries[0].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[1].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[2].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[3].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[4].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[5].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[6].StatementType, "UPDATE") - test.S(t).ExpectEquals(entries[7].StatementType, "DELETE") - test.S(t).ExpectEquals(entries[8].StatementType, "UPDATE") - test.S(t).ExpectEquals(entries[9].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[10].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[11].StatementType, "DELETE") - test.S(t).ExpectEquals(entries[12].StatementType, "DELETE") - test.S(t).ExpectEquals(entries[13].StatementType, "INSERT") - test.S(t).ExpectEquals(entries[14].StatementType, "UPDATE") - test.S(t).ExpectEquals(entries[15].StatementType, "DELETE") - test.S(t).ExpectEquals(entries[16].StatementType, "INSERT") -} diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 484bad8..e0bb6cd 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -73,6 +73,7 @@ func main() { 
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation") flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-ost issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat") throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307") + flag.StringVar(&migrationContext.ThrottleQuery, "throttle-query", "", "when given, issued (every second) to check if operation should throttle. Expecting to return zero for no-throttle, >0 for throttle. Query is issued on the migrated server. Make sure this query is lightweight") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-ost.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-ost operations") flag.StringVar(&migrationContext.PostponeCutOverFlagFile, "postpone-cut-over-flag-file", "", "while this file exists, migration will postpone the final stage of swapping tables, and will keep on syncing the ghost table. Cut-over/swapping would be ready to perform the moment the file is deleted.") @@ -81,7 +82,8 @@ func main() { flag.StringVar(&migrationContext.ServeSocketFile, "serve-socket-file", "", "Unix socket file to serve on. Default: auto-determined and advertised upon startup") flag.Int64Var(&migrationContext.ServeTCPPort, "serve-tcp-port", 0, "TCP port to serve on. 
Default: disabled") - maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'") + maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes") + criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as `--max-load`. When status exceeds threshold, app panics and quits") quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") debug := flag.Bool("debug", false, "debug mode (very verbose)") @@ -156,6 +158,9 @@ func main() { if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil { log.Fatale(err) } + if err := migrationContext.ReadCriticalLoad(*criticalLoad); err != nil { + log.Fatale(err) + } if migrationContext.ServeSocketFile == "" { migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName) } diff --git a/go/logic/applier.go b/go/logic/applier.go index 4c2bdae..808cfec 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -263,6 +263,19 @@ func (this *Applier) InitiateHeartbeat(heartbeatIntervalMilliseconds int64) { } } +func (this *Applier) ExecuteThrottleQuery() (int64, error) { + throttleQuery := this.migrationContext.GetThrottleQuery() + + if throttleQuery == "" { + return 0, nil + } + var result int64 + if err := this.db.QueryRow(throttleQuery).Scan(&result); err != nil { + return 0, log.Errore(err) + } + return result, nil +} + // ReadMigrationMinValues func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index e2c3fc5..6257689 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -40,6 +40,14 @@ const ( 
heartbeatIntervalMilliseconds = 1000 ) +type PrintStatusRule int + +const ( + HeuristicPrintStatusRule PrintStatusRule = iota + ForcePrintStatusRule = iota + ForcePrintStatusAndHint = iota +) + // Migrator is the main schema migration flow manager. type Migrator struct { parser *sql.Parser @@ -106,6 +114,17 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) { this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) } } + criticalLoad := this.migrationContext.GetCriticalLoad() + for variableName, threshold := range criticalLoad { + value, err := this.applier.ShowStatusVariable(variableName) + if err != nil { + return true, fmt.Sprintf("%s %s", variableName, err) + } + if value >= threshold { + this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) + } + } + // Back to throttle considerations // User-based throttle @@ -145,8 +164,13 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) { if err != nil { return true, fmt.Sprintf("%s %s", variableName, err) } - if value > threshold { - return true, fmt.Sprintf("%s=%d", variableName, value) + if value >= threshold { + return true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold) + } + } + if this.migrationContext.GetThrottleQuery() != "" { + if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 { + return true, "throttle-query" } } @@ -311,6 +335,7 @@ func (this *Migrator) validateStatement() (err error) { if !this.migrationContext.ApproveRenamedColumns { return fmt.Errorf("Alter statement has column(s) renamed. gh-ost suspects the following renames: %v; but to proceed you must approve via `--approve-renamed-columns` (or you can skip renamed columns via `--skip-renamed-columns`)", this.parser.GetNonTrivialRenames()) } + log.Infof("Alter statement has column(s) renamed. 
gh-ost finds the following renames: %v; --approve-renamed-columns is given and so migration proceeds.", this.parser.GetNonTrivialRenames()) } return nil } @@ -375,7 +400,7 @@ func (this *Migrator) Migrate() (err error) { log.Debugf("Operating until row copy is complete") this.consumeRowCopyComplete() log.Infof("Row copy complete") - this.printStatus() + this.printStatus(ForcePrintStatusRule) if err := this.cutOver(); err != nil { return err @@ -459,8 +484,7 @@ func (this *Migrator) waitForEventsUpToLock() (err error) { waitForEventsUpToLockDuration := time.Now().Sub(waitForEventsUpToLockStartTime) log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration) - this.printMigrationStatusHint() - this.printStatus() + this.printStatus(ForcePrintStatusAndHint) return nil } @@ -621,17 +645,18 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err case "help": { fmt.Fprintln(writer, `available commands: - status # Print a status message - chunk-size= # Set a new chunk-size - max-load= # Set a new set of max-load thresholds - throttle # Force throttling - no-throttle # End forced throttling (other throttling may still apply) - help # This message +status # Print a status message +chunk-size= # Set a new chunk-size +critical-load= # Set a new set of max-load thresholds +max-load= # Set a new set of max-load thresholds +throttle-query= # Set a new throttle-query +throttle # Force throttling +no-throttle # End forced throttling (other throttling may still apply) +help # This message `) } case "info", "status": - this.printMigrationStatusHint(writer) - this.printStatus(writer) + this.printStatus(ForcePrintStatusAndHint, writer) case "chunk-size": { if chunkSize, err := strconv.Atoi(arg); err != nil { @@ -639,7 +664,7 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err return log.Errore(err) } else { this.migrationContext.SetChunkSize(int64(chunkSize)) - 
this.printMigrationStatusHint(writer) + this.printStatus(ForcePrintStatusAndHint, writer) } } case "max-load": @@ -648,7 +673,20 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err fmt.Fprintf(writer, "%s\n", err.Error()) return log.Errore(err) } - this.printMigrationStatusHint(writer) + this.printStatus(ForcePrintStatusAndHint, writer) + } + case "critical-load": + { + if err := this.migrationContext.ReadCriticalLoad(arg); err != nil { + fmt.Fprintf(writer, "%s\n", err.Error()) + return log.Errore(err) + } + this.printStatus(ForcePrintStatusAndHint, writer) + } + case "throttle-query": + { + this.migrationContext.SetThrottleQuery(arg) + this.printStatus(ForcePrintStatusAndHint, writer) } case "throttle", "pause", "suspend": { @@ -715,17 +753,16 @@ func (this *Migrator) initiateInspector() (err error) { } func (this *Migrator) initiateStatus() error { - this.printStatus() + this.printStatus(ForcePrintStatusAndHint) statusTick := time.Tick(1 * time.Second) for range statusTick { - go this.printStatus() + go this.printStatus(HeuristicPrintStatusRule) } return nil } func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { - writers = append(writers, os.Stdout) w := io.MultiWriter(writers...) 
fmt.Fprintln(w, fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), @@ -737,10 +774,12 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { this.migrationContext.StartTime.Format(time.RubyDate), )) maxLoad := this.migrationContext.GetMaxLoad() - fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %+v", + criticalLoad := this.migrationContext.GetCriticalLoad() + fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max lag: %+vms; max-load: %s; critical-load: %s", atomic.LoadInt64(&this.migrationContext.ChunkSize), atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold), - maxLoad, + maxLoad.String(), + criticalLoad.String(), )) if this.migrationContext.ThrottleFlagFile != "" { fmt.Fprintln(w, fmt.Sprintf("# Throttle flag file: %+v", @@ -752,6 +791,11 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { this.migrationContext.ThrottleAdditionalFlagFile, )) } + if throttleQuery := this.migrationContext.GetThrottleQuery(); throttleQuery != "" { + fmt.Fprintln(w, fmt.Sprintf("# Throttle query: %+v", + throttleQuery, + )) + } if this.migrationContext.PostponeCutOverFlagFile != "" { fmt.Fprintln(w, fmt.Sprintf("# Postpone cut-over flag file: %+v", this.migrationContext.PostponeCutOverFlagFile, @@ -770,7 +814,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) { } } -func (this *Migrator) printStatus(writers ...io.Writer) { +func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { + writers = append(writers, os.Stdout) + elapsedTime := this.migrationContext.ElapsedTime() elapsedSeconds := int64(elapsedTime.Seconds()) totalRowsCopied := this.migrationContext.GetTotalRowsCopied() @@ -782,8 +828,11 @@ func (this *Migrator) printStatus(writers ...io.Writer) { // Before status, let's see if we should print a nice reminder for what exactly we're doing here. 
shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0) + if rule == ForcePrintStatusAndHint { + shouldPrintMigrationStatusHint = true + } if shouldPrintMigrationStatusHint { - this.printMigrationStatusHint() + this.printMigrationStatusHint(writers...) } var etaSeconds float64 = math.MaxFloat64 @@ -820,6 +869,9 @@ func (this *Migrator) printStatus(writers ...io.Writer) { } else { shouldPrintStatus = (elapsedSeconds%30 == 0) } + if rule == ForcePrintStatusRule || rule == ForcePrintStatusAndHint { + shouldPrintStatus = true + } if !shouldPrintStatus { return } @@ -838,7 +890,6 @@ func (this *Migrator) printStatus(writers ...io.Writer) { fmt.Sprintf("copy iteration %d at %d", this.migrationContext.GetIteration(), time.Now().Unix()), status, ) - writers = append(writers, os.Stdout) w := io.MultiWriter(writers...) fmt.Fprintln(w, status) } diff --git a/go/binlog/binlog_test.go b/go/mysql/binlog_test.go similarity index 99% rename from go/binlog/binlog_test.go rename to go/mysql/binlog_test.go index 4abf88f..b878c58 100644 --- a/go/binlog/binlog_test.go +++ b/go/mysql/binlog_test.go @@ -3,7 +3,7 @@ See https://github.com/github/gh-ost/blob/master/LICENSE */ -package binlog +package mysql import ( "testing" diff --git a/go/sql/parser.go b/go/sql/parser.go index fc472e8..144f265 100644 --- a/go/sql/parser.go +++ b/go/sql/parser.go @@ -11,7 +11,7 @@ import ( ) var ( - renameColumnRegexp = regexp.MustCompile(`(?i)CHANGE\s+(column\s+|)([\S]+)\s+([\S]+)\s+`) + renameColumnRegexp = regexp.MustCompile(`(?i)change\s+(column\s+|)([\S]+)\s+([\S]+)\s+`) ) type Parser struct { @@ -27,8 +27,12 @@ func NewParser() *Parser { func (this *Parser) ParseAlterStatement(alterStatement string) (err error) { allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterStatement, -1) for _, submatch := range allStringSubmatch { - submatch[2], _ = strconv.Unquote(submatch[2]) - submatch[3], _ = strconv.Unquote(submatch[3]) + if unquoted, err := strconv.Unquote(submatch[2]); err == nil { 
+ submatch[2] = unquoted + } + if unquoted, err := strconv.Unquote(submatch[3]); err == nil { + submatch[3] = unquoted + } this.columnRenameMap[submatch[2]] = submatch[3] } diff --git a/go/sql/parser_test.go b/go/sql/parser_test.go new file mode 100644 index 0000000..2107963 --- /dev/null +++ b/go/sql/parser_test.go @@ -0,0 +1,68 @@ +/* + Copyright 2016 GitHub Inc. + See https://github.com/github/gh-ost/blob/master/LICENSE +*/ + +package sql + +import ( + "testing" + + "github.com/outbrain/golib/log" + test "github.com/outbrain/golib/tests" +) + +func init() { + log.SetLevel(log.ERROR) +} + +func TestParseAlterStatement(t *testing.T) { + statement := "add column t int, engine=innodb" + parser := NewParser() + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) +} + +func TestParseAlterStatementTrivialRename(t *testing.T) { + statement := "add column t int, change ts ts timestamp, engine=innodb" + parser := NewParser() + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) + test.S(t).ExpectEquals(len(parser.columnRenameMap), 1) + test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts") +} + +func TestParseAlterStatementTrivialRenames(t *testing.T) { + statement := "add column t int, change ts ts timestamp, CHANGE f `f` float, engine=innodb" + parser := NewParser() + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(parser.HasNonTrivialRenames()) + test.S(t).ExpectEquals(len(parser.columnRenameMap), 2) + test.S(t).ExpectEquals(parser.columnRenameMap["ts"], "ts") + test.S(t).ExpectEquals(parser.columnRenameMap["f"], "f") +} + +func TestParseAlterStatementNonTrivial(t *testing.T) { + statements := []string{ + `add column b bigint, change f fl float, change i count int, engine=innodb`, + "add column b bigint, change column `f` fl float, change `i` `count` int, engine=innodb", 
+ "add column b bigint, change column `f` fl float, change `i` `count` int, change ts ts timestamp, engine=innodb", + `change + f fl float, + CHANGE COLUMN i + count int, engine=innodb`, + } + + for _, statement := range statements { + parser := NewParser() + err := parser.ParseAlterStatement(statement) + test.S(t).ExpectNil(err) + renames := parser.GetNonTrivialRenames() + test.S(t).ExpectEquals(len(renames), 2) + test.S(t).ExpectEquals(renames["i"], "count") + test.S(t).ExpectEquals(renames["f"], "fl") + } +}