force index in iteration range end value queries

This commit is contained in:
lukelewang 2023-01-09 15:27:15 +08:00
parent 7320fda848
commit 77f65ad898
3 changed files with 22 additions and 16 deletions

View File

@ -574,7 +574,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
query, explodedArgs, err := buildFunc(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.UniqueKey,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),

View File

@ -238,32 +238,33 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
}
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKeyColumns.Len() == 0 {
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKey *UniqueKey, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKey.Columns.Len() == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
indexName := EscapeName(uniqueKey.Name)
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(&uniqueKey.Columns, rangeStartArgs, startRangeComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(&uniqueKey.Columns, rangeEndArgs, LessThanOrEqualsComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
uniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names())
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames))
for i, column := range uniqueKeyColumns.Columns() {
for i, column := range uniqueKey.Columns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnType {
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
@ -278,6 +279,7 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
%s
from
%s.%s
force index (%s)
where %s and %s
order by
%s
@ -285,7 +287,7 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
offset %d
`, databaseName, tableName, hint,
strings.Join(uniqueKeyColumnNames, ", "),
databaseName, tableName,
databaseName, tableName, indexName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "),
(chunkSize - 1),
@ -293,32 +295,33 @@ func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string
return result, explodedArgs, nil
}
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKeyColumns.Len() == 0 {
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKey *UniqueKey, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKey.Columns.Len() == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
indexName := EscapeName(uniqueKey.Name)
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(&uniqueKey.Columns, rangeStartArgs, startRangeComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(&uniqueKey.Columns, rangeEndArgs, LessThanOrEqualsComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
uniqueKeyColumnNames := duplicateNames(uniqueKey.Columns.Names())
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames))
for i, column := range uniqueKeyColumns.Columns() {
for i, column := range uniqueKey.Columns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnType {
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
@ -335,6 +338,7 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
%s
from
%s.%s
force index (%s)
where %s and %s
order by
%s
@ -344,7 +348,7 @@ func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName str
%s
limit 1
`, databaseName, tableName, hint, strings.Join(uniqueKeyColumnNames, ", "),
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName,
strings.Join(uniqueKeyColumnNames, ", "), databaseName, tableName, indexName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
strings.Join(uniqueKeyColumnDescending, ", "),

View File

@ -280,10 +280,11 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
var chunkSize int64 = 500
{
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
uniqueKey := UniqueKey{"PRIMARY", *uniqueKeyColumns, false, false}
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, originalTableName, &uniqueKey, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
test.S(t).ExpectNil(err)
expected := `
select /* gh-ost mydb.tbl test */ name, position
@ -292,6 +293,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
name, position
from
mydb.tbl
force index (PRIMARY)
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
order by
name asc, position asc