Merge pull request #471 from github/range-query-offset
Range query offset
This commit is contained in:
commit
7fa16df6aa
@ -398,35 +398,41 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
|
||||
if this.migrationContext.MigrationIterationRangeMinValues == nil {
|
||||
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
|
||||
}
|
||||
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery(
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.OriginalTableName,
|
||||
&this.migrationContext.UniqueKey.Columns,
|
||||
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
|
||||
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
|
||||
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
||||
this.migrationContext.GetIteration() == 0,
|
||||
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
|
||||
)
|
||||
if err != nil {
|
||||
return hasFurtherRange, err
|
||||
}
|
||||
rows, err := this.db.Query(query, explodedArgs...)
|
||||
if err != nil {
|
||||
return hasFurtherRange, err
|
||||
}
|
||||
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
|
||||
for i := 0; i < 2; i++ {
|
||||
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
|
||||
if i == 1 {
|
||||
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
|
||||
}
|
||||
query, explodedArgs, err := buildFunc(
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.OriginalTableName,
|
||||
&this.migrationContext.UniqueKey.Columns,
|
||||
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
|
||||
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
|
||||
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
||||
this.migrationContext.GetIteration() == 0,
|
||||
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
|
||||
)
|
||||
if err != nil {
|
||||
return hasFurtherRange, err
|
||||
}
|
||||
hasFurtherRange = true
|
||||
rows, err := this.db.Query(query, explodedArgs...)
|
||||
if err != nil {
|
||||
return hasFurtherRange, err
|
||||
}
|
||||
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
|
||||
return hasFurtherRange, err
|
||||
}
|
||||
hasFurtherRange = true
|
||||
}
|
||||
if hasFurtherRange {
|
||||
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
|
||||
return hasFurtherRange, nil
|
||||
}
|
||||
}
|
||||
if !hasFurtherRange {
|
||||
log.Debugf("Iteration complete: no further range to iterate")
|
||||
return hasFurtherRange, nil
|
||||
}
|
||||
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
|
||||
log.Debugf("Iteration complete: no further range to iterate")
|
||||
return hasFurtherRange, nil
|
||||
}
|
||||
|
||||
|
@ -231,7 +231,62 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
|
||||
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
|
||||
}
|
||||
|
||||
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
|
||||
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
|
||||
if uniqueKeyColumns.Len() == 0 {
|
||||
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
|
||||
}
|
||||
databaseName = EscapeName(databaseName)
|
||||
tableName = EscapeName(tableName)
|
||||
|
||||
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
|
||||
if includeRangeStartValues {
|
||||
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
|
||||
}
|
||||
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
|
||||
if err != nil {
|
||||
return "", explodedArgs, err
|
||||
}
|
||||
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
|
||||
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
|
||||
if err != nil {
|
||||
return "", explodedArgs, err
|
||||
}
|
||||
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
|
||||
|
||||
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
|
||||
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
|
||||
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
|
||||
for i, column := range uniqueKeyColumns.Columns() {
|
||||
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
|
||||
if column.Type == EnumColumnType {
|
||||
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
|
||||
uniqueKeyColumnDescending[i] = fmt.Sprintf("concat(%s) desc", uniqueKeyColumnNames[i])
|
||||
} else {
|
||||
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
|
||||
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
|
||||
}
|
||||
}
|
||||
result = fmt.Sprintf(`
|
||||
select /* gh-ost %s.%s %s */
|
||||
%s
|
||||
from
|
||||
%s.%s
|
||||
where %s and %s
|
||||
order by
|
||||
%s
|
||||
limit 1
|
||||
offset %d
|
||||
`, databaseName, tableName, hint,
|
||||
strings.Join(uniqueKeyColumnNames, ", "),
|
||||
databaseName, tableName,
|
||||
rangeStartComparison, rangeEndComparison,
|
||||
strings.Join(uniqueKeyColumnAscending, ", "),
|
||||
(chunkSize - 1),
|
||||
)
|
||||
return result, explodedArgs, nil
|
||||
}
|
||||
|
||||
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
|
||||
if uniqueKeyColumns.Len() == 0 {
|
||||
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
|
||||
}
|
||||
|
@ -283,7 +283,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
|
||||
rangeStartArgs := []interface{}{3, 17}
|
||||
rangeEndArgs := []interface{}{103, 117}
|
||||
|
||||
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
|
||||
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
|
||||
test.S(t).ExpectNil(err)
|
||||
expected := `
|
||||
select /* gh-ost mydb.tbl test */ name, position
|
||||
|
@ -91,6 +91,7 @@ test_single() {
|
||||
--postpone-cut-over-flag-file=/tmp/gh-ost.test.postpone.flag \
|
||||
--test-on-replica \
|
||||
--default-retries=1 \
|
||||
--chunk-size=10 \
|
||||
--verbose \
|
||||
--debug \
|
||||
--stack \
|
||||
|
Loading…
Reference in New Issue
Block a user