Merge pull request #70 from github/identify-rename

Supporting column rename
This commit is contained in:
Shlomi Noach 2016-06-17 08:04:03 +02:00 committed by GitHub
commit 2b0f7af84b
10 changed files with 173 additions and 35 deletions

View File

@ -1,7 +1,7 @@
#!/bin/bash #!/bin/bash
# #
# #
RELEASE_VERSION="0.9.2" RELEASE_VERSION="0.9.3"
buildpath=/tmp/gh-ost buildpath=/tmp/gh-ost
target=gh-ost target=gh-ost

View File

@ -36,7 +36,7 @@ const (
) )
const ( const (
maxRetries = 10 maxRetries = 60
) )
// MigrationContext has the general, global state of migration. It is used by // MigrationContext has the general, global state of migration. It is used by
@ -50,6 +50,8 @@ type MigrationContext struct {
AllowedRunningOnMaster bool AllowedRunningOnMaster bool
SwitchToRowBinlogFormat bool SwitchToRowBinlogFormat bool
NullableUniqueKeyAllowed bool NullableUniqueKeyAllowed bool
ApproveRenamedColumns bool
SkipRenamedColumns bool
config ContextConfig config ContextConfig
configMutex *sync.Mutex configMutex *sync.Mutex
@ -108,6 +110,8 @@ type MigrationContext struct {
GhostTableUniqueKeys [](*sql.UniqueKey) GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList SharedColumns *sql.ColumnList
ColumnRenameMap map[string]string
MappedSharedColumns *sql.ColumnList
MigrationRangeMinValues *sql.ColumnValues MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues MigrationRangeMaxValues *sql.ColumnValues
Iteration int64 Iteration int64
@ -149,6 +153,7 @@ func newMigrationContext() *MigrationContext {
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(), ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{}, configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
} }
} }

View File

@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"os" "os"
"regexp" "regexp"
"strings"
"time" "time"
) )
@ -31,3 +32,19 @@ func FileExists(fileName string) bool {
} }
return false return false
} }
func StringContainsAll(s string, substrings ...string) bool {
nonEmptyStringsFound := false
for _, substring := range substrings {
if s == "" {
continue
}
if strings.Contains(s, substring) {
nonEmptyStringsFound = true
} else {
// Immediate failure
return false
}
}
return nonEmptyStringsFound
}

View File

@ -55,6 +55,8 @@ func main() {
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!") flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
flag.BoolVar(&migrationContext.ApproveRenamedColumns, "approve-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag approves that gh-ost's interpretation si correct")
flag.BoolVar(&migrationContext.SkipRenamedColumns, "skip-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag tells gh-ost to skip the renamed columns, i.e. to treat what gh-ost thinks are renamed columns as unrelated columns. NOTE: you may lose column data")
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit") executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust") flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")

View File

@ -408,6 +408,7 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableName,
this.migrationContext.GetGhostTableName(), this.migrationContext.GetGhostTableName(),
this.migrationContext.SharedColumns.Names, this.migrationContext.SharedColumns.Names,
this.migrationContext.MappedSharedColumns.Names,
this.migrationContext.UniqueKey.Name, this.migrationContext.UniqueKey.Name,
this.migrationContext.UniqueKey.Columns.Names, this.migrationContext.UniqueKey.Columns.Names,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(), this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
@ -785,12 +786,12 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query
} }
case binlog.InsertDML: case binlog.InsertDML:
{ {
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, dmlEvent.NewColumnValues.AbstractValues()) query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
return query, sharedArgs, 1, err return query, sharedArgs, 1, err
} }
case binlog.UpdateDML: case binlog.UpdateDML:
{ {
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args = append(args, sharedArgs...) args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...) args = append(args, uniqueKeyArgs...)
return query, args, 0, err return query, args, 0, err

View File

@ -122,7 +122,7 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
} }
} }
this.migrationContext.SharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns) this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.ColumnRenameMap)
log.Infof("Shared columns are %s", this.migrationContext.SharedColumns) log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty // By fact that a non-empty unique key exists we also know the shared columns are non-empty
return nil return nil
@ -145,21 +145,6 @@ func (this *Inspector) validateConnection() error {
// validateGrants verifies the user by which we're executing has necessary grants // validateGrants verifies the user by which we're executing has necessary grants
// to do its thang. // to do its thang.
func (this *Inspector) validateGrants() error { func (this *Inspector) validateGrants() error {
stringContainsAll := func(s string, substrings ...string) bool {
nonEmptyStringsFound := false
for _, substring := range substrings {
if s == "" {
continue
}
if strings.Contains(s, substring) {
nonEmptyStringsFound = true
} else {
// Immediate failure
return false
}
}
return nonEmptyStringsFound
}
query := `show /* gh-ost */ grants for current_user()` query := `show /* gh-ost */ grants for current_user()`
foundAll := false foundAll := false
foundSuper := false foundSuper := false
@ -181,10 +166,10 @@ func (this *Inspector) validateGrants() error {
if strings.Contains(grant, fmt.Sprintf("GRANT ALL PRIVILEGES ON `%s`.*", this.migrationContext.DatabaseName)) { if strings.Contains(grant, fmt.Sprintf("GRANT ALL PRIVILEGES ON `%s`.*", this.migrationContext.DatabaseName)) {
foundDBAll = true foundDBAll = true
} }
if stringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, ` ON *.*`) { if base.StringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, ` ON *.*`) {
foundDBAll = true foundDBAll = true
} }
if stringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, fmt.Sprintf(" ON `%s`.*", this.migrationContext.DatabaseName)) { if base.StringContainsAll(grant, `ALTER`, `CREATE`, `DELETE`, `DROP`, `INDEX`, `INSERT`, `LOCK TABLES`, `SELECT`, `TRIGGER`, `UPDATE`, fmt.Sprintf(" ON `%s`.*", this.migrationContext.DatabaseName)) {
foundDBAll = true foundDBAll = true
} }
} }
@ -500,18 +485,26 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
} }
// getSharedColumns returns the intersection of two lists of columns in same order as the first list // getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList) *sql.ColumnList { func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) {
columnsInGhost := make(map[string]bool) columnsInGhost := make(map[string]bool)
for _, ghostColumn := range ghostColumns.Names { for _, ghostColumn := range ghostColumns.Names {
columnsInGhost[ghostColumn] = true columnsInGhost[ghostColumn] = true
} }
sharedColumnNames := []string{} sharedColumnNames := []string{}
for _, originalColumn := range originalColumns.Names { for _, originalColumn := range originalColumns.Names {
if columnsInGhost[originalColumn] { if columnsInGhost[originalColumn] || columnsInGhost[columnRenameMap[originalColumn]] {
sharedColumnNames = append(sharedColumnNames, originalColumn) sharedColumnNames = append(sharedColumnNames, originalColumn)
} }
} }
return sql.NewColumnList(sharedColumnNames) mappedSharedColumnNames := []string{}
for _, columnName := range sharedColumnNames {
if mapped, ok := columnRenameMap[columnName]; ok {
mappedSharedColumnNames = append(mappedSharedColumnNames, mapped)
} else {
mappedSharedColumnNames = append(mappedSharedColumnNames, columnName)
}
}
return sql.NewColumnList(sharedColumnNames), sql.NewColumnList(mappedSharedColumnNames)
} }
func (this *Inspector) readChangelogState() (map[string]string, error) { func (this *Inspector) readChangelogState() (map[string]string, error) {

View File

@ -42,6 +42,7 @@ const (
// Migrator is the main schema migration flow manager. // Migrator is the main schema migration flow manager.
type Migrator struct { type Migrator struct {
parser *sql.Parser
inspector *Inspector inspector *Inspector
applier *Applier applier *Applier
eventsStreamer *EventsStreamer eventsStreamer *EventsStreamer
@ -67,6 +68,7 @@ type Migrator struct {
func NewMigrator() *Migrator { func NewMigrator() *Migrator {
migrator := &Migrator{ migrator := &Migrator{
migrationContext: base.GetMigrationContext(), migrationContext: base.GetMigrationContext(),
parser: sql.NewParser(),
tablesInPlace: make(chan bool), tablesInPlace: make(chan bool),
rowCopyComplete: make(chan bool), rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool), allEventsUpToLockProcessed: make(chan bool),
@ -293,16 +295,31 @@ func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort err := <-this.panicAbort
log.Fatale(err) log.Fatale(err)
} }
func (this *Migrator) validateStatement() (err error) {
if this.parser.HasNonTrivialRenames() && !this.migrationContext.SkipRenamedColumns {
this.migrationContext.ColumnRenameMap = this.parser.GetNonTrivialRenames()
if !this.migrationContext.ApproveRenamedColumns {
return fmt.Errorf("Alter statement has column(s) renamed. gh-ost suspects the following renames: %v; but to proceed you must approve via `--approve-renamed-columns` (or you can skip renamed columns via `--skip-renamed-columns`)", this.parser.GetNonTrivialRenames())
}
}
return nil
}
func (this *Migrator) Migrate() (err error) { func (this *Migrator) Migrate() (err error) {
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
this.migrationContext.StartTime = time.Now() this.migrationContext.StartTime = time.Now()
go this.listenOnPanicAbort() go this.listenOnPanicAbort()
if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
return err
}
if err := this.validateStatement(); err != nil {
return err
}
if err := this.initiateInspector(); err != nil { if err := this.initiateInspector(); err != nil {
return err return err
} }
if err := this.initiateStreaming(); err != nil { if err := this.initiateStreaming(); err != nil {
return err return err
} }

View File

@ -155,20 +155,27 @@ func BuildRangePreparedComparison(columns []string, args []interface{}, comparis
return BuildRangeComparison(columns, values, args, comparisonSign) return BuildRangeComparison(columns, values, args, comparisonSign)
} }
func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
if len(sharedColumns) == 0 { if len(sharedColumns) == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
} }
databaseName = EscapeName(databaseName) databaseName = EscapeName(databaseName)
originalTableName = EscapeName(originalTableName) originalTableName = EscapeName(originalTableName)
ghostTableName = EscapeName(ghostTableName) ghostTableName = EscapeName(ghostTableName)
mappedSharedColumns = duplicateNames(mappedSharedColumns)
for i := range mappedSharedColumns {
mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i])
}
mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ")
sharedColumns = duplicateNames(sharedColumns) sharedColumns = duplicateNames(sharedColumns)
for i := range sharedColumns { for i := range sharedColumns {
sharedColumns[i] = EscapeName(sharedColumns[i]) sharedColumns[i] = EscapeName(sharedColumns[i])
} }
uniqueKey = EscapeName(uniqueKey)
sharedColumnsListing := strings.Join(sharedColumns, ", ") sharedColumnsListing := strings.Join(sharedColumns, ", ")
uniqueKey = EscapeName(uniqueKey)
var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign var minRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues { if includeRangeStartValues {
minRangeComparisonSign = GreaterThanOrEqualsComparisonSign minRangeComparisonSign = GreaterThanOrEqualsComparisonSign
@ -192,16 +199,16 @@ func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName strin
(select %s from %s.%s force index (%s) (select %s from %s.%s force index (%s)
where (%s and %s) %s where (%s and %s) %s
) )
`, databaseName, originalTableName, databaseName, ghostTableName, sharedColumnsListing, `, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
sharedColumnsListing, databaseName, originalTableName, uniqueKey, sharedColumnsListing, databaseName, originalTableName, uniqueKey,
rangeStartComparison, rangeEndComparison, transactionalClause) rangeStartComparison, rangeEndComparison, transactionalClause)
return result, explodedArgs, nil return result, explodedArgs, nil
} }
func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) { func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, mappedSharedColumns []string, uniqueKey string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
rangeStartValues := buildPreparedValues(len(uniqueKeyColumns)) rangeStartValues := buildPreparedValues(len(uniqueKeyColumns))
rangeEndValues := buildPreparedValues(len(uniqueKeyColumns)) rangeEndValues := buildPreparedValues(len(uniqueKeyColumns))
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable) return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
} }
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) { func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {

View File

@ -172,7 +172,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
rangeStartArgs := []interface{}{3} rangeStartArgs := []interface{}{3}
rangeEndArgs := []interface{}{103} rangeEndArgs := []interface{}{103}
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err) test.S(t).ExpectNil(err)
expected := ` expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position)
@ -191,7 +191,7 @@ func TestBuildRangeInsertQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17} rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117} rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false) query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err) test.S(t).ExpectNil(err)
expected := ` expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position)
@ -204,6 +204,52 @@ func TestBuildRangeInsertQuery(t *testing.T) {
} }
} }
func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
ghostTableName := "ghost"
sharedColumns := []string{"id", "name", "position"}
mappedSharedColumns := []string{"id", "name", "location"}
{
uniqueKey := "PRIMARY"
uniqueKeyColumns := []string{"id"}
rangeStartValues := []string{"@v1s"}
rangeEndValues := []string{"@v1e"}
rangeStartArgs := []interface{}{3}
rangeEndArgs := []interface{}{103}
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err)
expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location)
(select id, name, position from mydb.tbl force index (PRIMARY)
where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e))))
)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103}))
}
{
uniqueKey := "name_position_uidx"
uniqueKeyColumns := []string{"name", "position"}
rangeStartValues := []string{"@v1s", "@v2s"}
rangeEndValues := []string{"@v1e", "@v2e"}
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err)
expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, location)
(select id, name, position from mydb.tbl force index (name_position_uidx)
where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e))))
)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}))
}
}
func TestBuildRangeInsertPreparedQuery(t *testing.T) { func TestBuildRangeInsertPreparedQuery(t *testing.T) {
databaseName := "mydb" databaseName := "mydb"
originalTableName := "tbl" originalTableName := "tbl"
@ -215,7 +261,7 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17} rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117} rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true) query, explodedArgs, err := BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableName, sharedColumns, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, true, true)
test.S(t).ExpectNil(err) test.S(t).ExpectNil(err)
expected := ` expected := `
insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position) insert /* gh-ost mydb.tbl */ ignore into mydb.ghost (id, name, position)

50
go/sql/parser.go Normal file
View File

@ -0,0 +1,50 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package sql
import (
"regexp"
"strconv"
)
var (
renameColumnRegexp = regexp.MustCompile(`(?i)CHANGE\s+(column\s+|)([\S]+)\s+([\S]+)\s+`)
)
type Parser struct {
columnRenameMap map[string]string
}
func NewParser() *Parser {
return &Parser{
columnRenameMap: make(map[string]string),
}
}
func (this *Parser) ParseAlterStatement(alterStatement string) (err error) {
allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterStatement, -1)
for _, submatch := range allStringSubmatch {
submatch[2], _ = strconv.Unquote(submatch[2])
submatch[3], _ = strconv.Unquote(submatch[3])
this.columnRenameMap[submatch[2]] = submatch[3]
}
return nil
}
func (this *Parser) GetNonTrivialRenames() map[string]string {
result := make(map[string]string)
for column, renamed := range this.columnRenameMap {
if column != renamed {
result[column] = renamed
}
}
return result
}
func (this *Parser) HasNonTrivialRenames() bool {
return len(this.GetNonTrivialRenames()) > 0
}