Added BuildPKSelectPreparedQuery() and tests; this is first part in the decoupleing of insert...select

This commit is contained in:
Shlomi Noach 2017-01-04 14:55:08 +02:00
parent 3fcd8d093b
commit 39c9f5da75
2 changed files with 85 additions and 0 deletions

View File

@ -507,3 +507,51 @@ func BuildPKInsertPreparedQuery(databaseName, originalTableName, ghostTableName
pkValues := buildColumnsPreparedValues(&uniqueKey.Columns)
return BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, mappedSharedColumns, uniqueKey, pkValues, args, transactionalTable)
}
func BuildPKSelectQuery(databaseName, originalTableName string, tableColumns *ColumnList, sharedColumns []string, uniqueKey *UniqueKey, pkValues []string, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
if !uniqueKey.IsPrimary() {
return "", uniqueKeyArgs, fmt.Errorf("BuildPKSelectQuery only works for PRIMARY KEY")
}
if len(sharedColumns) == 0 {
return "", uniqueKeyArgs, fmt.Errorf("Got 0 shared columns in BuildPKSelectQuery")
}
databaseName = EscapeName(databaseName)
originalTableName = EscapeName(originalTableName)
sharedColumns = duplicateNames(sharedColumns)
for i := range sharedColumns {
sharedColumns[i] = EscapeName(sharedColumns[i])
}
sharedColumnsListing := strings.Join(sharedColumns, ", ")
uniqueKeyName := EscapeName(uniqueKey.Name)
equalsComparison, err := BuildEqualsComparison(uniqueKey.Columns.Names(), pkValues)
if err != nil {
return "", uniqueKeyArgs, err
}
for _, column := range uniqueKey.Columns.Columns() {
tableOrdinal := tableColumns.Ordinals[column.Name]
arg := column.convertArg(args[tableOrdinal])
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}
transactionalClause := ""
if transactionalTable {
transactionalClause = "lock in share mode"
}
result = fmt.Sprintf(`
select %s from %s.%s force index (%s)
where %s %s
`,
sharedColumnsListing, databaseName, originalTableName, uniqueKeyName,
equalsComparison, transactionalClause)
return result, uniqueKeyArgs, nil
}
func BuildPKSelectPreparedQuery(databaseName, originalTableName string, tableColumns *ColumnList, sharedColumns []string, uniqueKey *UniqueKey, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
pkValues := buildColumnsPreparedValues(&uniqueKey.Columns)
return BuildPKSelectQuery(databaseName, originalTableName, tableColumns, sharedColumns, uniqueKey, pkValues, args, transactionalTable)
}

View File

@ -746,3 +746,40 @@ func TestBuildPKInsertPreparedQuery(t *testing.T) {
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{"gromit", "dog"}))
}
}
func TestBuildPKSelectPreparedQuery(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
sharedColumns := []string{"id", "name", "position"}
tableColumns := NewColumnList(sharedColumns)
args := []interface{}{3, "gromit", "dog"}
{
uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name", "position"})}
query, uniqueKeyArgs, err := BuildPKSelectPreparedQuery(databaseName, originalTableName, tableColumns, sharedColumns, &uniqueKey, args, false)
test.S(t).ExpectNil(err)
expected := `
select id, name, position from mydb.tbl force index (PRIMARY) where ((name = ?) and (position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{"gromit", "dog"}))
}
{
sharedColumns := []string{"id", "name"}
uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name"})}
query, uniqueKeyArgs, err := BuildPKSelectPreparedQuery(databaseName, originalTableName, tableColumns, sharedColumns, &uniqueKey, args, false)
test.S(t).ExpectNil(err)
expected := `
select id, name from mydb.tbl force index (PRIMARY) where ((name = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{"gromit"}))
}
{
uniqueKey := UniqueKey{Name: "name_uidx", Columns: *NewColumnList([]string{"name"})}
_, _, err := BuildPKSelectPreparedQuery(databaseName, originalTableName, tableColumns, sharedColumns, &uniqueKey, args, false)
test.S(t).ExpectNotNil(err)
}
}