Merge branch 'github:master' into add-rocksdb-as-transactional-engine

This commit is contained in:
wangzihuacool 2022-11-23 11:01:33 +08:00 committed by GitHub
commit 9c0babdc07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 250 additions and 52 deletions

View File

@ -5,7 +5,7 @@ on: [pull_request]
jobs: jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-20.04
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View File

@ -5,7 +5,7 @@ on: [pull_request]
jobs: jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-20.04
strategy: strategy:
matrix: matrix:
version: [mysql-5.7.25,mysql-8.0.16] version: [mysql-5.7.25,mysql-8.0.16]

View File

@ -4,14 +4,19 @@ linters:
disable: disable:
- errcheck - errcheck
enable: enable:
- bodyclose
- containedctx
- contextcheck - contextcheck
- dogsled
- durationcheck - durationcheck
- errname - errname
- errorlint
- execinquery - execinquery
- gofmt - gofmt
- ifshort - ifshort
- misspell - misspell
- nilerr - nilerr
- nilnil
- noctx - noctx
- nolintlint - nolintlint
- nosprintfhostport - nosprintfhostport
@ -19,6 +24,7 @@ linters:
- rowserrcheck - rowserrcheck
- sqlclosecheck - sqlclosecheck
- unconvert - unconvert
- unparam
- unused - unused
- wastedassign - wastedassign
- whitespace - whitespace

View File

@ -45,6 +45,22 @@ If you happen to _know_ your servers use RBR (Row Based Replication, i.e. `binlo
Skipping this step means `gh-ost` would not need the `SUPER` privilege in order to operate. Skipping this step means `gh-ost` would not need the `SUPER` privilege in order to operate.
You may want to use this on Amazon RDS. You may want to use this on Amazon RDS.
### attempt-instant-ddl
MySQL 8.0 supports "instant DDL" for some operations. If an alter statement can be completed with instant DDL, only a metadata change is required internally. Instant operations include:
- Adding a column
- Dropping a column
- Dropping an index
- Extending a varchar column
- Adding a virtual generated column
It is not reliable to parse the `ALTER` statement to determine if it is instant or not. This is because the table might be in an older row format, or have some other incompatibility that is difficult to identify.
`--attempt-instant-ddl` is disabled by default, but the risks of enabling it are relatively minor: `gh-ost` may need to acquire a metadata lock at the start of the operation. This is not a problem for most scenarios, but it could be a problem for users that start the DDL during a period with long running transactions.
`gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.
### conf ### conf
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format: `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:

View File

@ -29,7 +29,7 @@ CREATE TABLE tbl (
(This is also the definition of the _ghost_ table, except that that table would be called `_tbl_gho`). (This is also the definition of the _ghost_ table, except that that table would be called `_tbl_gho`).
In this migration, the _before_ and _after_ versions contain the same unique not-null key (the PRIMARY KEY). To run this migration, `gh-ost` would iterate through the `tbl` table using the primary key, copy rows from `tbl` to the _ghost_ table `_tbl_gho` in primary key order, while also applying the binlog event writes from `tble` onto `_tbl_gho`. In this migration, the _before_ and _after_ versions contain the same unique not-null key (the PRIMARY KEY). To run this migration, `gh-ost` would iterate through the `tbl` table using the primary key, copy rows from `tbl` to the _ghost_ table `_tbl_gho` in primary key order, while also applying the binlog event writes from `tbl` onto `_tbl_gho`.
The applying of the binlog events is what requires the shared unique key. For example, an `UPDATE` statement to `tbl` translates to a `REPLACE` statement which `gh-ost` applies to `_tbl_gho`. A `REPLACE` statement expects to insert or replace an existing row based on its row's values and the table's unique key constraints. In particular, if inserting that row would result in a unique key violation (e.g., a row with that primary key already exists), it would _replace_ that existing row with the new values. The applying of the binlog events is what requires the shared unique key. For example, an `UPDATE` statement to `tbl` translates to a `REPLACE` statement which `gh-ost` applies to `_tbl_gho`. A `REPLACE` statement expects to insert or replace an existing row based on its row's values and the table's unique key constraints. In particular, if inserting that row would result in a unique key violation (e.g., a row with that primary key already exists), it would _replace_ that existing row with the new values.

View File

@ -101,6 +101,7 @@ type MigrationContext struct {
AliyunRDS bool AliyunRDS bool
GoogleCloudPlatform bool GoogleCloudPlatform bool
AzureMySQL bool AzureMySQL bool
AttemptInstantDDL bool
config ContextConfig config ContextConfig
configMutex *sync.Mutex configMutex *sync.Mutex
@ -862,7 +863,7 @@ func (this *MigrationContext) ReadConfigFile() error {
if cfg.Section("osc").HasKey("chunk_size") { if cfg.Section("osc").HasKey("chunk_size") {
this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64() this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64()
if err != nil { if err != nil {
return fmt.Errorf("Unable to read osc chunk size: %s", err.Error()) return fmt.Errorf("Unable to read osc chunk size: %w", err)
} }
} }
@ -877,7 +878,7 @@ func (this *MigrationContext) ReadConfigFile() error {
if cfg.Section("osc").HasKey("max_lag_millis") { if cfg.Section("osc").HasKey("max_lag_millis") {
this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64() this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64()
if err != nil { if err != nil {
return fmt.Errorf("Unable to read max lag millis: %s", err.Error()) return fmt.Errorf("Unable to read max lag millis: %w", err)
} }
} }

View File

@ -67,6 +67,8 @@ func main() {
flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)") flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first")
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")

View File

@ -135,6 +135,16 @@ func (this *Applier) generateSqlModeQuery() string {
return fmt.Sprintf("sql_mode = %s", sqlModeQuery) return fmt.Sprintf("sql_mode = %s", sqlModeQuery)
} }
// generateInstantDDLQuery returns the SQL for this ALTER operation
// with an INSTANT assertion (requires MySQL 8.0+)
func (this *Applier) generateInstantDDLQuery() string {
return fmt.Sprintf(`ALTER /* gh-ost */ TABLE %s.%s %s, ALGORITHM=INSTANT`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
this.migrationContext.AlterStatementOptions,
)
}
// readTableColumns reads table columns on applier // readTableColumns reads table columns on applier
func (this *Applier) readTableColumns() (err error) { func (this *Applier) readTableColumns() (err error) {
this.migrationContext.Log.Infof("Examining table structure on applier") this.migrationContext.Log.Infof("Examining table structure on applier")
@ -188,6 +198,27 @@ func (this *Applier) ValidateOrDropExistingTables() error {
return nil return nil
} }
// AttemptInstantDDL attempts to use instant DDL (from MySQL 8.0, and earlier in Aurora and some others).
// If successful, the operation is only a meta-data change so a lot of time is saved!
// The risk of attempting to instant DDL when not supported is that a metadata lock may be acquired.
// This is minor, since gh-ost will eventually require a metadata lock anyway, but at the cut-over stage.
// Instant operations include:
// - Adding a column
// - Dropping a column
// - Dropping an index
// - Extending a VARCHAR column
// - Adding a virtual generated column
// It is not reliable to parse the `alter` statement to determine if it is instant or not.
// This is because the table might be in an older row format, or have some other incompatibility
// that is difficult to identify.
func (this *Applier) AttemptInstantDDL() error {
query := this.generateInstantDDLQuery()
this.migrationContext.Log.Infof("INSTANT DDL query is: %s", query)
// We don't need a trx, because for instant DDL the SQL mode doesn't matter.
_, err := this.db.Exec(query)
return err
}
// CreateGhostTable creates the ghost table on the applier host // CreateGhostTable creates the ghost table on the applier host
func (this *Applier) CreateGhostTable() error { func (this *Applier) CreateGhostTable() error {
query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`, query := fmt.Sprintf(`create /* gh-ost */ table %s.%s like %s.%s`,
@ -1134,7 +1165,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
} }
result, err := tx.Exec(buildResult.query, buildResult.args...) result, err := tx.Exec(buildResult.query, buildResult.args...)
if err != nil { if err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err) return rollback(err)
} }

View File

@ -170,3 +170,16 @@ func TestApplierBuildDMLEventQuery(t *testing.T) {
test.S(t).ExpectEquals(res[0].args[3], 42) test.S(t).ExpectEquals(res[0].args[3], 42)
}) })
} }
func TestApplierInstantDDL(t *testing.T) {
migrationContext := base.NewMigrationContext()
migrationContext.DatabaseName = "test"
migrationContext.OriginalTableName = "mytable"
migrationContext.AlterStatementOptions = "ADD INDEX (foo)"
applier := NewApplier(migrationContext)
t.Run("instantDDLstmt", func(t *testing.T) {
stmt := applier.generateInstantDDLQuery()
test.S(t).ExpectEquals(stmt, "ALTER /* gh-ost */ TABLE `test`.`mytable` ADD INDEX (foo), ALGORITHM=INSTANT")
})
}

View File

@ -7,6 +7,7 @@ package logic
import ( import (
"fmt" "fmt"
"io"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
@ -34,18 +35,16 @@ const (
type HooksExecutor struct { type HooksExecutor struct {
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
writer io.Writer
} }
func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor { func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor {
return &HooksExecutor{ return &HooksExecutor{
migrationContext: migrationContext, migrationContext: migrationContext,
writer: os.Stderr,
} }
} }
func (this *HooksExecutor) initHooks() error {
return nil
}
func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) []string { func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) []string {
env := os.Environ() env := os.Environ()
env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName)) env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName))
@ -76,13 +75,13 @@ func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) [
} }
// executeHook executes a command, and sets relevant environment variables // executeHook executes a command, and sets relevant environment variables
// combined output & error are printed to gh-ost's standard error. // combined output & error are printed to the configured writer.
func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error { func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
cmd := exec.Command(hook) cmd := exec.Command(hook)
cmd.Env = this.applyEnvironmentVariables(extraVariables...) cmd.Env = this.applyEnvironmentVariables(extraVariables...)
combinedOutput, err := cmd.CombinedOutput() combinedOutput, err := cmd.CombinedOutput()
fmt.Fprintln(os.Stderr, string(combinedOutput)) fmt.Fprintln(this.writer, string(combinedOutput))
return log.Errore(err) return log.Errore(err)
} }

113
go/logic/hooks_test.go Normal file
View File

@ -0,0 +1,113 @@
/*
Copyright 2022 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"bufio"
"bytes"
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
"github.com/openark/golib/tests"
"github.com/github/gh-ost/go/base"
)
func TestHooksExecutorExecuteHooks(t *testing.T) {
migrationContext := base.NewMigrationContext()
migrationContext.AlterStatement = "ENGINE=InnoDB"
migrationContext.DatabaseName = "test"
migrationContext.Hostname = "test.example.com"
migrationContext.OriginalTableName = "tablename"
migrationContext.RowsDeltaEstimate = 1
migrationContext.RowsEstimate = 122
migrationContext.TotalRowsCopied = 123456
migrationContext.SetETADuration(time.Minute)
migrationContext.SetProgressPct(50)
hooksExecutor := NewHooksExecutor(migrationContext)
writeTmpHookFunc := func(testName, hookName, script string) (path string, err error) {
if path, err = os.MkdirTemp("", testName); err != nil {
return path, err
}
err = os.WriteFile(filepath.Join(path, hookName), []byte(script), 0777)
return path, err
}
t.Run("does-not-exist", func(t *testing.T) {
migrationContext.HooksPath = "/does/not/exist"
tests.S(t).ExpectNil(hooksExecutor.executeHooks("test-hook"))
})
t.Run("failed", func(t *testing.T) {
var err error
if migrationContext.HooksPath, err = writeTmpHookFunc(
"TestHooksExecutorExecuteHooks-failed",
"failed-hook",
"#!/bin/sh\nexit 1",
); err != nil {
panic(err)
}
defer os.RemoveAll(migrationContext.HooksPath)
tests.S(t).ExpectNotNil(hooksExecutor.executeHooks("failed-hook"))
})
t.Run("success", func(t *testing.T) {
var err error
if migrationContext.HooksPath, err = writeTmpHookFunc(
"TestHooksExecutorExecuteHooks-success",
"success-hook",
"#!/bin/sh\nenv",
); err != nil {
panic(err)
}
defer os.RemoveAll(migrationContext.HooksPath)
var buf bytes.Buffer
hooksExecutor.writer = &buf
tests.S(t).ExpectNil(hooksExecutor.executeHooks("success-hook", "TEST="+t.Name()))
scanner := bufio.NewScanner(&buf)
for scanner.Scan() {
split := strings.SplitN(scanner.Text(), "=", 2)
switch split[0] {
case "GH_OST_COPIED_ROWS":
copiedRows, _ := strconv.ParseInt(split[1], 10, 64)
tests.S(t).ExpectEquals(copiedRows, migrationContext.TotalRowsCopied)
case "GH_OST_DATABASE_NAME":
tests.S(t).ExpectEquals(split[1], migrationContext.DatabaseName)
case "GH_OST_DDL":
tests.S(t).ExpectEquals(split[1], migrationContext.AlterStatement)
case "GH_OST_DRY_RUN":
tests.S(t).ExpectEquals(split[1], "false")
case "GH_OST_ESTIMATED_ROWS":
estimatedRows, _ := strconv.ParseInt(split[1], 10, 64)
tests.S(t).ExpectEquals(estimatedRows, int64(123))
case "GH_OST_ETA_SECONDS":
etaSeconds, _ := strconv.ParseInt(split[1], 10, 64)
tests.S(t).ExpectEquals(etaSeconds, int64(60))
case "GH_OST_EXECUTING_HOST":
tests.S(t).ExpectEquals(split[1], migrationContext.Hostname)
case "GH_OST_GHOST_TABLE_NAME":
tests.S(t).ExpectEquals(split[1], fmt.Sprintf("_%s_gho", migrationContext.OriginalTableName))
case "GH_OST_OLD_TABLE_NAME":
tests.S(t).ExpectEquals(split[1], fmt.Sprintf("_%s_del", migrationContext.OriginalTableName))
case "GH_OST_PROGRESS":
progress, _ := strconv.ParseFloat(split[1], 64)
tests.S(t).ExpectEquals(progress, 50.0)
case "GH_OST_TABLE_NAME":
tests.S(t).ExpectEquals(split[1], migrationContext.OriginalTableName)
case "TEST":
tests.S(t).ExpectEquals(split[1], t.Name())
}
}
})
}

View File

@ -8,6 +8,7 @@ package logic
import ( import (
"context" "context"
gosql "database/sql" gosql "database/sql"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"strings" "strings"
@ -554,13 +555,11 @@ func (this *Inspector) CountTableRows(ctx context.Context) error {
query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
var rowsEstimate int64 var rowsEstimate int64
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
switch err { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
case context.Canceled, context.DeadlineExceeded:
this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return mysql.Kill(this.db, connectionID) return mysql.Kill(this.db, connectionID)
default:
return err
} }
return err
} }
// row count query finished. nil out the cancel func, so the main migration thread // row count query finished. nil out the cancel func, so the main migration thread

View File

@ -98,6 +98,7 @@ type Migrator struct {
func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator { func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
migrator := &Migrator{ migrator := &Migrator{
appVersion: appVersion, appVersion: appVersion,
hooksExecutor: NewHooksExecutor(context),
migrationContext: context, migrationContext: context,
parser: sql.NewAlterTableParser(), parser: sql.NewAlterTableParser(),
ghostTableMigrated: make(chan bool), ghostTableMigrated: make(chan bool),
@ -113,15 +114,6 @@ func NewMigrator(context *base.MigrationContext, appVersion string) *Migrator {
return migrator return migrator
} }
// initiateHooksExecutor
func (this *Migrator) initiateHooksExecutor() (err error) {
this.hooksExecutor = NewHooksExecutor(this.migrationContext)
if err := this.hooksExecutor.initHooks(); err != nil {
return err
}
return nil
}
// sleepWhileTrue sleeps indefinitely until the given function returns 'false' // sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error) // (or fails with error)
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error { func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
@ -342,9 +334,6 @@ func (this *Migrator) Migrate() (err error) {
go this.listenOnPanicAbort() go this.listenOnPanicAbort()
if err := this.initiateHooksExecutor(); err != nil {
return err
}
if err := this.hooksExecutor.onStartup(); err != nil { if err := this.hooksExecutor.onStartup(); err != nil {
return err return err
} }
@ -371,6 +360,17 @@ func (this *Migrator) Migrate() (err error) {
if err := this.createFlagFiles(); err != nil { if err := this.createFlagFiles(); err != nil {
return err return err
} }
// In MySQL 8.0 (and possibly earlier) some DDL statements can be applied instantly.
// Attempt to do this if AttemptInstantDDL is set.
if this.migrationContext.AttemptInstantDDL {
this.migrationContext.Log.Infof("Attempting to execute alter with ALGORITHM=INSTANT")
if err := this.applier.AttemptInstantDDL(); err == nil {
this.migrationContext.Log.Infof("Success! table %s.%s migrated instantly", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
} else {
this.migrationContext.Log.Infof("ALGORITHM=INSTANT not supported for this operation, proceeding with original algorithm: %s", err)
}
}
initialLag, _ := this.inspector.getReplicationLag() initialLag, _ := this.inspector.getReplicationLag()
this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag) this.migrationContext.Log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag)
@ -402,9 +402,9 @@ func (this *Migrator) Migrate() (err error) {
if err := this.applier.ReadMigrationRangeValues(); err != nil { if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err return err
} }
if err := this.initiateThrottler(); err != nil {
return err this.initiateThrottler()
}
if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
return err return err
} }
@ -1047,6 +1047,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
) )
w := io.MultiWriter(writers...) w := io.MultiWriter(writers...)
fmt.Fprintln(w, status) fmt.Fprintln(w, status)
this.migrationContext.Log.Infof(status)
hooksStatusIntervalSec := this.migrationContext.HooksStatusIntervalSec hooksStatusIntervalSec := this.migrationContext.HooksStatusIntervalSec
if hooksStatusIntervalSec > 0 && elapsedSeconds%hooksStatusIntervalSec == 0 { if hooksStatusIntervalSec > 0 && elapsedSeconds%hooksStatusIntervalSec == 0 {
@ -1107,7 +1108,7 @@ func (this *Migrator) addDMLEventsListener() error {
} }
// initiateThrottler kicks in the throttling collection and the throttling checks. // initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error { func (this *Migrator) initiateThrottler() {
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion) this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion)
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
@ -1117,8 +1118,6 @@ func (this *Migrator) initiateThrottler() error {
<-this.firstThrottlingCollected // other, general metrics <-this.firstThrottlingCollected // other, general metrics
this.migrationContext.Log.Infof("First throttle metrics collected") this.migrationContext.Log.Infof("First throttle metrics collected")
go this.throttler.initiateThrottlerChecks() go this.throttler.initiateThrottlerChecks()
return nil
} }
func (this *Migrator) initiateApplier() error { func (this *Migrator) initiateApplier() error {

View File

@ -308,6 +308,8 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
if err != nil { if err != nil {
return false, err return false, err
} }
defer resp.Body.Close()
atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode)) atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
return false, nil return false, nil
} }

View File

@ -62,7 +62,7 @@ func NewParserFromAlterStatement(alterStatement string) *AlterTableParser {
return parser return parser
} }
func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string) {
terminatingQuote := rune(0) terminatingQuote := rune(0)
f := func(c rune) bool { f := func(c rune) bool {
switch { switch {
@ -86,7 +86,7 @@ func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tok
for i := range tokens { for i := range tokens {
tokens[i] = strings.TrimSpace(tokens[i]) tokens[i] = strings.TrimSpace(tokens[i])
} }
return tokens, nil return tokens
} }
func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) {
@ -95,7 +95,7 @@ func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement st
return strippedStatement return strippedStatement
} }
func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) { func (this *AlterTableParser) parseAlterToken(alterToken string) {
{ {
// rename // rename
allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1) allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1)
@ -131,7 +131,6 @@ func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) {
this.isAutoIncrementDefined = true this.isAutoIncrementDefined = true
} }
} }
return nil
} }
func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) { func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) {
@ -151,8 +150,7 @@ func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err er
break break
} }
} }
alterTokens, _ := this.tokenizeAlterStatement(this.alterStatementOptions) for _, alterToken := range this.tokenizeAlterStatement(this.alterStatementOptions) {
for _, alterToken := range alterTokens {
alterToken = this.sanitizeQuotesFromAlterStatement(alterToken) alterToken = this.sanitizeQuotesFromAlterStatement(alterToken)
this.parseAlterToken(alterToken) this.parseAlterToken(alterToken)
this.alterTokens = append(this.alterTokens, alterToken) this.alterTokens = append(this.alterTokens, alterToken)

View File

@ -99,37 +99,37 @@ func TestTokenizeAlterStatement(t *testing.T) {
parser := NewAlterTableParser() parser := NewAlterTableParser()
{ {
alterStatement := "add column t int" alterStatement := "add column t int"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int"}))
} }
{ {
alterStatement := "add column t int, change column i int" alterStatement := "add column t int, change column i int"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int"}))
} }
{ {
alterStatement := "add column t int, change column i int 'some comment'" alterStatement := "add column t int, change column i int 'some comment'"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment'"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment'"}))
} }
{ {
alterStatement := "add column t int, change column i int 'some comment, with comma'" alterStatement := "add column t int, change column i int 'some comment, with comma'"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment, with comma'"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment, with comma'"}))
} }
{ {
alterStatement := "add column t int, add column d decimal(10,2)" alterStatement := "add column t int, add column d decimal(10,2)"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column d decimal(10,2)"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column d decimal(10,2)"}))
} }
{ {
alterStatement := "add column t int, add column e enum('a','b','c')" alterStatement := "add column t int, add column e enum('a','b','c')"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column e enum('a','b','c')"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column e enum('a','b','c')"}))
} }
{ {
alterStatement := "add column t int(11), add column e enum('a','b','c')" alterStatement := "add column t int(11), add column e enum('a','b','c')"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int(11)", "add column e enum('a','b','c')"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int(11)", "add column e enum('a','b','c')"}))
} }
} }

View File

@ -0,0 +1,13 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
color varchar(32),
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
insert into gh_ost_test values (null, 11, 'red');
insert into gh_ost_test values (null, 13, 'green');
insert into gh_ost_test values (null, 17, 'blue');

View File

@ -0,0 +1 @@
--attempt-instant-ddl

View File

@ -7,9 +7,6 @@ create table gh_ost_test (
primary key(id) primary key(id)
) auto_increment=1; ) auto_increment=1;
insert into gh_ost_test values (null, 'átesting');
insert into gh_ost_test values (null, 'Hello world, Καλημέρα κόσμε, コンニチハ', 'átesting0', 'initial'); insert into gh_ost_test values (null, 'Hello world, Καλημέρα κόσμε, コンニチハ', 'átesting0', 'initial');
drop event if exists gh_ost_test; drop event if exists gh_ost_test;

View File

@ -117,6 +117,14 @@ test_single() {
fi fi
gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql gh-ost-test-mysql-master --default-character-set=utf8mb4 test < $tests_path/$test_name/create.sql
test_create_result=$?
if [ $test_create_result -ne 0 ] ; then
echo
echo "ERROR $test_name create failure. cat $tests_path/$test_name/create.sql:"
cat $tests_path/$test_name/create.sql
return 1
fi
extra_args="" extra_args=""
if [ -f $tests_path/$test_name/extra_args ] ; then if [ -f $tests_path/$test_name/extra_args ] ; then
@ -255,7 +263,7 @@ build_binary() {
test_all() { test_all() {
build_binary build_binary
find $tests_path ! -path . -type d -mindepth 1 -maxdepth 1 | cut -d "/" -f 3 | egrep "$test_pattern" | while read test_name ; do find $tests_path ! -path . -type d -mindepth 1 -maxdepth 1 | cut -d "/" -f 3 | egrep "$test_pattern" | sort | while read test_name ; do
test_single "$test_name" test_single "$test_name"
if [ $? -ne 0 ] ; then if [ $? -ne 0 ] ; then
create_statement=$(gh-ost-test-mysql-replica test -t -e "show create table _gh_ost_test_gho \G") create_statement=$(gh-ost-test-mysql-replica test -t -e "show create table _gh_ost_test_gho \G")

View File

@ -14,4 +14,4 @@ script/build
cd .gopath/src/github.com/github/gh-ost cd .gopath/src/github.com/github/gh-ost
echo "Running unit tests" echo "Running unit tests"
go test ./go/... go test -v -covermode=atomic ./go/...