added BuildUniqueKeyRangeEndPreparedQuery()

This commit is contained in:
Shlomi Noach 2016-04-01 16:05:17 +02:00
parent f0e37cde1c
commit b461c320cc
2 changed files with 91 additions and 0 deletions

View File

@ -113,10 +113,26 @@ func BuildRangeComparison(columns []string, values []string, comparisonSign Valu
return result, nil return result, nil
} }
func BuildRangePreparedComparison(columns []string, comparisonSign ValueComparisonSign) (result string, err error) {
values := make([]string, len(columns), len(columns))
for i := range columns {
values[i] = "?"
}
return BuildRangeComparison(columns, values, comparisonSign)
}
func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string) (string, error) { func BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns, rangeStartValues, rangeEndValues []string) (string, error) {
if len(sharedColumns) == 0 { if len(sharedColumns) == 0 {
return "", fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") return "", fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
} }
databaseName = EscapeName(databaseName)
originalTableName = EscapeName(originalTableName)
ghostTableName = EscapeName(ghostTableName)
for i := range sharedColumns {
sharedColumns[i] = EscapeName(sharedColumns[i])
}
uniqueKey = EscapeName(uniqueKey)
sharedColumnsListing := strings.Join(sharedColumns, ", ") sharedColumnsListing := strings.Join(sharedColumns, ", ")
rangeStartComparison, err := BuildRangeComparison(uniqueKeyColumns, rangeStartValues, GreaterThanOrEqualsComparisonSign) rangeStartComparison, err := BuildRangeComparison(uniqueKeyColumns, rangeStartValues, GreaterThanOrEqualsComparisonSign)
if err != nil { if err != nil {
@ -146,3 +162,49 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
} }
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues) return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues)
} }
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName string, uniqueKeyColumns []string, chunkSize int) (string, error) {
if len(uniqueKeyColumns) == 0 {
return "", fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
}
databaseName = EscapeName(databaseName)
originalTableName = EscapeName(originalTableName)
rangeStartComparison, err := BuildRangePreparedComparison(uniqueKeyColumns, GreaterThanComparisonSign)
if err != nil {
return "", err
}
rangeEndComparison, err := BuildRangePreparedComparison(uniqueKeyColumns, LessThanOrEqualsComparisonSign)
if err != nil {
return "", err
}
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumns), len(uniqueKeyColumns))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumns), len(uniqueKeyColumns))
for i := range uniqueKeyColumns {
uniqueKeyColumns[i] = EscapeName(uniqueKeyColumns[i])
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumns[i])
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumns[i])
}
query := fmt.Sprintf(`
select /* gh-osc %s.%s */ %s
from (
select
%s
from
%s.%s
where %s and %s
order by
%s
limit %d
) select_osc_chunk
order by
%s
limit 1
`, databaseName, originalTableName, strings.Join(uniqueKeyColumns, ", "),
strings.Join(uniqueKeyColumns, ", "), databaseName, originalTableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
strings.Join(uniqueKeyColumnDescending, ", "),
)
return query, nil
}

View File

@ -176,3 +176,32 @@ func TestBuildRangeInsertPreparedQuery(t *testing.T) {
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
} }
} }
func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
chunkSize := 500
{
uniqueKeyColumns := []string{"name", "position"}
query, err := BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName, uniqueKeyColumns, chunkSize)
test.S(t).ExpectNil(err)
expected := `
select /* gh-osc mydb.tbl */ name, position
from (
select
name, position
from
mydb.tbl
where ((name > ?) or (((name = ?)) AND (position > ?))) and ((name < ?) or (((name = ?)) AND (position < ?)) or ((name = ?) and (position = ?)))
order by
name asc, position asc
limit 500
) select_osc_chunk
order by
name desc, position desc
limit 1
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
}
}