Support for GENERATED (aka virtual) columns

This commit is contained in:
Shlomi Noach 2018-05-22 12:36:52 +03:00
parent bba8b257d1
commit db871b42c4
7 changed files with 67 additions and 17 deletions

View File

@ -186,8 +186,10 @@ type MigrationContext struct {
OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
OriginalTableVirtualColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
GhostTableColumns *sql.ColumnList
GhostTableVirtualColumns *sql.ColumnList
GhostTableUniqueKeys [](*sql.UniqueKey)
UniqueKey *sql.UniqueKey
SharedColumns *sql.ColumnList

View File

@ -117,7 +117,7 @@ func (this *Applier) validateAndReadTimeZone() error {
// readTableColumns reads table columns on applier
func (this *Applier) readTableColumns() (err error) {
log.Infof("Examining table structure on applier")
this.migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
this.migrationContext.OriginalTableColumnsOnApplier, _, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
if err != nil {
return err
}

View File

@ -89,24 +89,24 @@ func (this *Inspector) ValidateOriginalTable() (err error) {
return nil
}
func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns *sql.ColumnList, uniqueKeys [](*sql.UniqueKey), err error) {
func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (columns *sql.ColumnList, virtualColumns *sql.ColumnList, uniqueKeys [](*sql.UniqueKey), err error) {
uniqueKeys, err = this.getCandidateUniqueKeys(tableName)
if err != nil {
return columns, uniqueKeys, err
return columns, virtualColumns, uniqueKeys, err
}
if len(uniqueKeys) == 0 {
return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
return columns, virtualColumns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
}
columns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName)
columns, virtualColumns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName)
if err != nil {
return columns, uniqueKeys, err
return columns, virtualColumns, uniqueKeys, err
}
return columns, uniqueKeys, nil
return columns, virtualColumns, uniqueKeys, nil
}
func (this *Inspector) InspectOriginalTable() (err error) {
this.migrationContext.OriginalTableColumns, this.migrationContext.OriginalTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.OriginalTableName)
this.migrationContext.OriginalTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.OriginalTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.OriginalTableName)
if err != nil {
return err
}
@ -122,7 +122,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
return fmt.Errorf("It seems like table structure is not identical between master and replica. This scenario is not supported.")
}
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
if err != nil {
return err
}
@ -166,7 +166,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
}
}
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.ColumnRenameMap)
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.OriginalTableVirtualColumns, this.migrationContext.GhostTableVirtualColumns, this.migrationContext.ColumnRenameMap)
log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty
@ -692,7 +692,7 @@ func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [
}
// getSharedColumns returns the intersection of two lists of columns in same order as the first list
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) {
func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.ColumnList, originalVirtualColumns, ghostVirtualColumns *sql.ColumnList, columnRenameMap map[string]string) (*sql.ColumnList, *sql.ColumnList) {
sharedColumnNames := []string{}
for _, originalColumn := range originalColumns.Names() {
isSharedColumn := false
@ -709,6 +709,16 @@ func (this *Inspector) getSharedColumns(originalColumns, ghostColumns *sql.Colum
isSharedColumn = false
}
}
for _, virtualColumn := range originalVirtualColumns.Names() {
if strings.EqualFold(originalColumn, virtualColumn) {
isSharedColumn = false
}
}
for _, virtualColumn := range ghostVirtualColumns.Names() {
if strings.EqualFold(originalColumn, virtualColumn) {
isSharedColumn = false
}
}
if isSharedColumn {
sharedColumnNames = append(sharedColumnNames, originalColumn)
}

View File

@ -8,6 +8,7 @@ package mysql
import (
gosql "database/sql"
"fmt"
"strings"
"sync"
"time"
@ -178,7 +179,7 @@ func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
}
// GetTableColumns reads column list from given table
func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, error) {
func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, *sql.ColumnList, error) {
query := fmt.Sprintf(`
show columns from %s.%s
`,
@ -186,18 +187,24 @@ func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnL
sql.EscapeName(tableName),
)
columnNames := []string{}
virtualColumnNames := []string{}
err := sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error {
columnNames = append(columnNames, rowMap.GetString("Field"))
columnName := rowMap.GetString("Field")
columnNames = append(columnNames, columnName)
if strings.Contains(rowMap.GetString("Extra"), " GENERATED") {
log.Debugf("%s is a generated column", columnName)
virtualColumnNames = append(virtualColumnNames, columnName)
}
return nil
})
if err != nil {
return nil, err
return nil, nil, err
}
if len(columnNames) == 0 {
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
return nil, nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
}
return sql.NewColumnList(columnNames), nil
return sql.NewColumnList(columnNames), sql.NewColumnList(virtualColumnNames), nil
}

View File

@ -0,0 +1,30 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
a int not null,
b int not null,
sum_ab int as (a + b) virtual not null,
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test (id, a, b) values (null, 2,3);
insert into gh_ost_test (id, a, b) values (null, 2,4);
insert into gh_ost_test (id, a, b) values (null, 2,5);
insert into gh_ost_test (id, a, b) values (null, 2,6);
insert into gh_ost_test (id, a, b) values (null, 2,7);
insert into gh_ost_test (id, a, b) values (null, 2,8);
insert into gh_ost_test (id, a, b) values (null, 2,9);
insert into gh_ost_test (id, a, b) values (null, 2,0);
insert into gh_ost_test (id, a, b) values (null, 2,1);
insert into gh_ost_test (id, a, b) values (null, 2,2);
end ;;

View File

@ -0,0 +1 @@
(5.5|5.6)