compute sharedVirtualColumns and take it into account while sanity-testing chosen uniqueKeyColumns

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
This commit is contained in:
Shlomi Noach 2021-07-08 15:30:11 +03:00
parent 4610fa771f
commit bb06e817fe
5 changed files with 46 additions and 6 deletions

View File

@ -213,6 +213,7 @@ type MigrationContext struct {
GhostTableUniqueKeys [](*sql.UniqueKey) GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList SharedColumns *sql.ColumnList
SharedVirtualColumns *sql.ColumnList
ColumnRenameMap map[string]string ColumnRenameMap map[string]string
DroppedColumnsMap map[string]bool DroppedColumnsMap map[string]bool
MappedSharedColumns *sql.ColumnList MappedSharedColumns *sql.ColumnList

View File

@ -1001,7 +1001,7 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result
results = append(results, this.buildDMLEventQuery(dmlEvent)...) results = append(results, this.buildDMLEventQuery(dmlEvent)...)
return results return results
} }
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, this.migrationContext.SharedVirtualColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
args := sqlutils.Args() args := sqlutils.Args()
args = append(args, sharedArgs...) args = append(args, sharedArgs...)
args = append(args, uniqueKeyArgs...) args = append(args, uniqueKeyArgs...)

View File

@ -171,7 +171,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
} }
} }
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap) this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, this.migrationContext.SharedVirtualColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap)
this.migrationContext.Log.Infof("Shared columns are %s", this.migrationContext.SharedColumns) this.migrationContext.Log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty // By fact that a non-empty unique key exists we also know the shared columns are non-empty
@ -720,8 +720,9 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
} }
// getSharedColumns returns the intersection of two lists of columns in same order as the first list // getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, originalVirtualColumns, ghostVirtualColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) { func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, originalVirtualColumns, ghostVirtualColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList, *sql.ColumnList) {
sharedColumnNames := []string{} sharedColumnNames := []string{}
sharedVirtualColumnNames := []string{}
for _, originalColumn := range originalColumns.Names() { for _, originalColumn := range originalColumns.Names() {
isSharedColumn := false isSharedColumn := false
for _, ghostColumn := range ghostColumns.Names() { for _, ghostColumn := range ghostColumns.Names() {
@ -754,6 +755,29 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
sharedColumnNames = append(sharedColumnNames, originalColumn) sharedColumnNames = append(sharedColumnNames, originalColumn)
} }
} }
// Virtual columns
for _, originalColumn := range originalVirtualColumns.Names() {
isSharedColumn := false
for _, ghostColumn := range ghostVirtualColumns.Names() {
if strings.EqualFold(originalColumn, ghostColumn) {
isSharedColumn = true
break
}
if strings.EqualFold(columnRenameMap[originalColumn], ghostColumn) {
isSharedColumn = true
break
}
}
for droppedColumn := range this.migrationContext.DroppedColumnsMap {
if strings.EqualFold(originalColumn, droppedColumn) {
isSharedColumn = false
break
}
}
if isSharedColumn {
sharedVirtualColumnNames = append(sharedVirtualColumnNames, originalColumn)
}
}
mappedSharedColumnNames := []string{} mappedSharedColumnNames := []string{}
for _, columnName := range sharedColumnNames { for _, columnName := range sharedColumnNames {
if mapped, ok := columnRenameMap[columnName]; ok { if mapped, ok := columnRenameMap[columnName]; ok {
@ -762,7 +786,7 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
mappedSharedColumnNames = append(mappedSharedColumnNames, columnName) mappedSharedColumnNames = append(mappedSharedColumnNames, columnName)
} }
} }
return sql.NewColumnList(sharedColumnNames), sql.NewColumnList(mappedSharedColumnNames) return sql.NewColumnList(sharedColumnNames), sql.NewColumnList(mappedSharedColumnNames), sql.NewColumnList(sharedVirtualColumnNames)
} }
// showCreateTable returns the `show create table` statement for given table // showCreateTable returns the `show create table` statement for given table

View File

@ -461,7 +461,9 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol
return result, sharedArgs, nil return result, sharedArgs, nil
} }
func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) { func BuildDMLUpdateQuery(databaseName, tableName string,
tableColumns, sharedColumns, mappedSharedColumns, sharedVirtualColumns, uniqueKeyColumns *ColumnList,
valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) {
if len(valueArgs) != tableColumns.Len() { if len(valueArgs) != tableColumns.Len() {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("value args count differs from table column count in BuildDMLUpdateQuery") return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("value args count differs from table column count in BuildDMLUpdateQuery")
} }
@ -471,7 +473,7 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
if !sharedColumns.IsSubsetOf(tableColumns) { if !sharedColumns.IsSubsetOf(tableColumns) {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLUpdateQuery") return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLUpdateQuery")
} }
if !uniqueKeyColumns.IsSubsetOf(sharedColumns) { if !uniqueKeyColumns.IsSubsetOf(ConcatenateColumnList(sharedColumns, sharedVirtualColumns)) {
return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("unique key columns is not a subset of shared columns in BuildDMLUpdateQuery") return result, sharedArgs, uniqueKeyArgs, fmt.Errorf("unique key columns is not a subset of shared columns in BuildDMLUpdateQuery")
} }
if sharedColumns.Len() == 0 { if sharedColumns.Len() == 0 {

View File

@ -234,6 +234,19 @@ func (this *ColumnList) IsSubsetOf(other *ColumnList) bool {
return true return true
} }
// ConcatenateColumnList concatenates two column lists. It does not check for duplicates.
func ConcatenateColumnList(columns1, columns2 *ColumnList) *ColumnList {
result := &ColumnList{}
for _, col := range columns1.Columns() {
result.columns = append(result.columns, col)
}
for _, col := range columns2.Columns() {
result.columns = append(result.columns, col)
}
result.Ordinals = NewColumnsMap(result.columns)
return result
}
func (this *ColumnList) Len() int { func (this *ColumnList) Len() int {
return len(this.columns) return len(this.columns)
} }