Supporting column rename

- Parsing the `alter` statement to catch `change old_name new_name ...` clauses
- Auto-deducing renamed columns
- When renamed columns are suspected, requiring an explicit `--approve-renamed-columns` or `--skip-renamed-columns`
- Updated tests
This commit is contained in:
Shlomi Noach 2016-06-17 08:03:18 +02:00
parent 5a8688ee2e
commit 836d0fe119
10 changed files with 173 additions and 35 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash
#
#
RELEASE_VERSION="0.9.2"
RELEASE_VERSION="0.9.3"
buildpath=/tmp/gh-ost
target=gh-ost

View File

@ -36,7 +36,7 @@ const (
)
const (
maxRetries = 10
maxRetries = 60
)
// MigrationContext has the general, global state of migration. It is used by
@ -50,6 +50,8 @@ type MigrationContext struct {
AllowedRunningOnMaster bool
SwitchToRowBinlogFormat bool
NullableUniqueKeyAllowed bool
ApproveRenamedColumns bool
SkipRenamedColumns bool
config ContextConfig
configMutex *sync.Mutex
@ -108,6 +110,8 @@ type MigrationContext struct {
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
MappedSharedColumns *sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
@ -149,6 +153,7 @@ func newMigrationContext() *MigrationContext {
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
}
}

View File

@ -9,6 +9,7 @@ import (
"fmt"
"os"
"regexp"
"strings"
"time"
)
@ -31,3 +32,19 @@ func FileExists(fileName string) bool {
}
return false
}
// StringContainsAll reports whether s contains every non-empty string given in
// substrings. Empty substrings are ignored: they impose no requirement and do
// not count as a match. The function returns false when no non-empty substring
// is given, and fails fast on the first non-empty substring missing from s.
func StringContainsAll(s string, substrings ...string) bool {
	nonEmptyStringsFound := false
	for _, substring := range substrings {
		if substring == "" {
			// strings.Contains(s, "") is always true; skip so that an empty
			// argument cannot make the result true on its own.
			continue
		}
		if !strings.Contains(s, substring) {
			// Immediate failure
			return false
		}
		nonEmptyStringsFound = true
	}
	return nonEmptyStringsFound
}

View File

@ -55,6 +55,8 @@ func main() {
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
flag.BoolVar(&migrationContext.ApproveRenamedColumns, "approve-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag approves that gh-ost's interpretation si correct")
flag.BoolVar(&migrationContext.SkipRenamedColumns, "skip-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag tells gh-ost to skip the renamed columns, i.e. to treat what gh-ost thinks are renamed columns as unrelated columns. NOTE: you may lose column data")
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")

View File

@ -408,6 +408,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names,
this.migrationContext.MappedSharedColumns.Names,
this.migrationContext.UniqueKey.Name,
this.migrationContext.UniqueKey.Columns.Names,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
@ -785,12 +786,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query
}
case binlog.InsertDML:
{
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues())
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, 1, err
}
case binlog.UpdateDML:
{
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...)
return query, args, 0, err

View File

@ -122,7 +122,7 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
}
}
this.migrationContext.SharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns)
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.ColumnRenameMap)
log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty
return nil
@ -145,21 +145,6 @@ func (this *Inspector) validateConnection() error {
// validateGrants verifies the user by which we're executing has necessary grants
// to do its thang.
func (this *Inspector) validateGrants() error {
stringContainsAll := func(s string, substrings ...string) bool {
nonEmptyStringsFound := false
for _, substring := range substrings {
if s == "" {
continue
}
if strings.Contains(s, substring) {
nonEmptyStringsFound = true
} else {
// Immediate failure
return false
}
}
return nonEmptyStringsFound
}
query := `show /* gh-ost */ grants for current_user()`
foundAll := false
foundSuper := false
@ -181,10 +166,10 @@ func (this *Inspector) validateGrants() error {
if strings.Contains(grant, fmt.Sprintf("GRANT ALL PRIVILEGES ON `%s`.*", this.migrationContext.DatabaseName)) {
foundDBAll = true
}
if stringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, ` ON *.*`) {
if base.StringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, ` ON *.*`) {
foundDBAll = true
}
if stringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, fmt.Sprintf(" ON `%s`.*", this.migrationContext.DatabaseName)) {
if base.StringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, fmt.Sprintf(" ON `%s`.*", this.migrationContext.DatabaseName)) {
foundDBAll = true
}
}
@ -500,18 +485,26 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
}
// getSharedColumns returns the intersection of two lists of columns in same order as the first list,
// along with the matching list of ghost table column names after applying columnRenameMap
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList) *sql.ColumnList {
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) {
columnsInGhost := make(map[string]bool)
for _, ghostColumn := range ghostColumns.Names {
columnsInGhost[ghostColumn] = true
}
sharedColumnNames := []string{}
for _, originalColumn := range originalColumns.Names {
if columnsInGhost[originalColumn] {
if columnsInGhost[originalColumn] || columnsInGhost[columnRenameMap[originalColumn]] {
sharedColumnNames = append(sharedColumnNames, originalColumn)
}
}
return sql.NewColumnList(sharedColumnNames)
mappedSharedColumnNames := []string{}
for _, columnName := range sharedColumnNames {
if mapped, ok := columnRenameMap[columnName]; ok {
mappedSharedColumnNames = append(mappedSharedColumnNames, mapped)
} else {
mappedSharedColumnNames = append(mappedSharedColumnNames, columnName)
}
}
return sql.NewColumnList(sharedColumnNames), sql.NewColumnList(mappedSharedColumnNames)
}
func (this *Inspector) readChangelogState() (map[string]string, error) {

View File

@ -42,6 +42,7 @@ const (
// Migrator is the main schema migration flow manager.
type Migrator struct {
parser *sql.Parser
inspector *Inspector
applier *Applier
eventsStreamer *EventsStreamer
@ -67,6 +68,7 @@ type Migrator struct {
func NewMigrator() *Migrator {
migrator := &Migrator{
migrationContext: base.GetMigrationContext(),
parser: sql.NewParser(),
tablesInPlace: make(chan bool),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
@ -293,16 +295,31 @@ func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort
log.Fatale(err)
}
// validateStatement inspects the parsed alter statement for column renames.
// When the parser reports non-trivial renames and --skip-renamed-columns was
// not given, the suspected renames are stored on the migration context; unless
// --approve-renamed-columns was also given, an error listing them is returned.
func (this *Migrator) validateStatement() (err error) {
	// Nothing to validate when there are no renames, or the user asked to ignore them.
	if !this.parser.HasNonTrivialRenames() || this.migrationContext.SkipRenamedColumns {
		return nil
	}
	this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames()
	if this.migrationContext.ApproveRenamedColumns {
		return nil
	}
	return fmt.Errorf("Alter statement has column(s) renamed. gh-ost suspects the following renames: %v; but to proceed you must approve via `--approve-renamed-columns` (or you can skip renamed columns via `--skip-renamed-columns`)", this.parser.GetNonTrivialRenames())
}
func (this *Migrator) Migrate() (err error) {
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
this.migrationContext.StartTime = time.Now()
go this.listenOnPanicAbort()
if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
return err
}
if err := this.validateStatement(); err != nil {
return err
}
if err := this.initiateInspector(); err != nil {
return err
}
if err := this.initiateStreaming(); err != nil {
return err
}

View File

@ -155,20 +155,27 @@ func BuildRangePreparedComparison(columns []string, args []interface{}, comparis
return BuildRangeComparison(columns, values, args, comparisonSign)
}
func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
if len(sharedColumns) == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
}
databaseName = EscapeName(databaseName)
originalTableName = EscapeName(originalTableName)
ghostTableName = EscapeName(ghostTableName)
mappedSharedColumns = duplicateNames(mappedSharedColumns)
for i := range mappedSharedColumns {
mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i])
}
mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ")
sharedColumns = duplicateNames(sharedColumns)
for i := range sharedColumns {
sharedColumns[i] = EscapeName(sharedColumns[i])
}
uniqueKey = EscapeName(uniqueKey)
sharedColumnsListing := strings.Join(sharedColumns, ", ")
uniqueKey = EscapeName(uniqueKey)
var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
minRangeComparisonSign = GreaterThanOrEqualsComparisonSign
@ -192,16 +199,16 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
(select %s from %s.%s force index (%s)
where (%s and %s) %s
)
`, databaseName, originalTableName, databaseName, ghostTableName, sharedColumnsListing,
`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListing, databaseName, originalTableName, uniqueKey,
rangeStartComparison, rangeEndComparison, transactionalClause)
return result, explodedArgs, nil
}
func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
rangeStartValues := buildPreparedValues(len(uniqueKeyColumns))
rangeEndValues := buildPreparedValues(len(uniqueKeyColumns))
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
}
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {

View File

@ -172,7 +172,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
rangeStartArgs := []interface{}{3}
rangeEndArgs := []interface{}{103}
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err)
expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position)
@ -191,7 +191,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err)
expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position)
@ -204,6 +204,52 @@ func TestBuildRangeInsertQuery(t *testing.T) {
}
}
// TestBuildRangeInsertQueryRenameMap checks that BuildRangeInsertQuery uses the
// mapped column names in the insert column list while still selecting the
// original column names from the source table, for both a single-column and a
// two-column unique key.
func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
	databaseName := "mydb"
	originalTableName := "tbl"
	ghostTableName := "ghost"
	sharedColumns := []string{"id", "name", "position"}
	mappedSharedColumns := []string{"id", "name", "location"}

	testCases := []struct {
		uniqueKey        string
		uniqueKeyColumns []string
		rangeStartValues []string
		rangeEndValues   []string
		rangeStartArgs   []interface{}
		rangeEndArgs     []interface{}
		expected         string
		expectedArgs     []interface{}
	}{
		{
			// Single-column unique key.
			uniqueKey:        "PRIMARY",
			uniqueKeyColumns: []string{"id"},
			rangeStartValues: []string{"@v1s"},
			rangeEndValues:   []string{"@v1e"},
			rangeStartArgs:   []interface{}{3},
			rangeEndArgs:     []interface{}{103},
			expected: `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location)
(select id, name, position from mydb.tbl force index (PRIMARY)
where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e))))
)
`,
			expectedArgs: []interface{}{3, 3, 103, 103},
		},
		{
			// Two-column unique key.
			uniqueKey:        "name_position_uidx",
			uniqueKeyColumns: []string{"name", "position"},
			rangeStartValues: []string{"@v1s", "@v2s"},
			rangeEndValues:   []string{"@v1e", "@v2e"},
			rangeStartArgs:   []interface{}{3, 17},
			rangeEndArgs:     []interface{}{103, 117},
			expected: `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location)
(select id, name, position from mydb.tbl force index (name_position_uidx)
where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e))))
)
`,
			expectedArgs: []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117},
		},
	}

	for _, testCase := range testCases {
		query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, testCase.uniqueKey, testCase.uniqueKeyColumns, testCase.rangeStartValues, testCase.rangeEndValues, testCase.rangeStartArgs, testCase.rangeEndArgs, true, false)
		test.S(t).ExpectNil(err)
		test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(testCase.expected))
		test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, testCase.expectedArgs))
	}
}
func TestBuildRangeInsertPreparedQuery(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
@ -215,7 +261,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true)
query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true)
test.S(t).ExpectNil(err)
expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position)

50
go/sql/parser.go Normal file
View File

@ -0,0 +1,50 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package sql
import (
"regexp"
"strconv"
)
var (
	// renameColumnRegexp matches `CHANGE [COLUMN] <old_name> <new_name> ` clauses,
	// case-insensitively. The leading \b keeps identifiers that merely end in
	// "change" (e.g. a column named `exchange`) from being read as a CHANGE clause.
	// Submatch 2 is the old column name, submatch 3 the new one.
	renameColumnRegexp = regexp.MustCompile(`(?i)\bCHANGE\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
)

// Parser extracts column renames from an alter statement.
type Parser struct {
	// columnRenameMap maps old column name -> new column name for every
	// CHANGE clause seen, including trivial ones where both names are equal.
	columnRenameMap map[string]string
}

// NewParser returns a Parser with an empty rename map.
func NewParser() *Parser {
	return &Parser{
		columnRenameMap: make(map[string]string),
	}
}

// ParseAlterStatement scans alterStatement for CHANGE clauses and records each
// old -> new column name pair. Quoted names (e.g. backticked) are unquoted;
// names that are not quoted are recorded as written. It currently always
// returns nil.
func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
	allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterStatement, -1)
	for _, submatch := range allStringSubmatch {
		column, renamed := submatch[2], submatch[3]
		// strconv.Unquote returns "" and an error for unquoted input; only
		// take its result on success so plain identifiers are preserved.
		if unquoted, unquoteErr := strconv.Unquote(column); unquoteErr == nil {
			column = unquoted
		}
		if unquoted, unquoteErr := strconv.Unquote(renamed); unquoteErr == nil {
			renamed = unquoted
		}
		this.columnRenameMap[column] = renamed
	}
	return nil
}

// GetNonTrivialRenames returns a fresh map of the recorded renames whose new
// name differs from the old name.
func (this *Parser) GetNonTrivialRenames() map[string]string {
	result := make(map[string]string)
	for column, renamed := range this.columnRenameMap {
		if column != renamed {
			result[column] = renamed
		}
	}
	return result
}

// HasNonTrivialRenames reports whether at least one recorded rename actually
// changes the column name.
func (this *Parser) HasNonTrivialRenames() bool {
	return len(this.GetNonTrivialRenames()) > 0
}