Compare commits: master...row-image- (18 commits)
Commits: a19f0b48ae, adac52e482, d7b6a9ff11, a45aa4f665, 5ba2aebf03, 2f6975d39a, beeefa2ed3, fc787dd75f, 3d217765cc, 38d18e0a09, 80054b98db, 88fc345c28, 5e5197b579, 39c9f5da75, 3fcd8d093b, 41013901b1, 004a6fe65c, c4b9e1ed5e
go/logic/applier.go
@@ -24,6 +24,36 @@ const (
 	atomicCutOverMagicHint = "ghost-cut-over-sentry"
 )
 
+type BuiltDMLQuery struct {
+	query         string
+	args          []interface{}
+	dataViaBinlog bool
+	rowsDelta     int64
+	err           error
+}
+
+func NewBuiltDMLQuery(query string, args []interface{}, dataViaBinlog bool, rowsDelta int64, err error) *BuiltDMLQuery {
+	return &BuiltDMLQuery{
+		query:         query,
+		args:          args,
+		dataViaBinlog: dataViaBinlog,
+		rowsDelta:     rowsDelta,
+		err:           err,
+	}
+}
+
+type BuiltDMLQueryPair struct {
+	readQuery  *BuiltDMLQuery
+	writeQuery *BuiltDMLQuery
+}
+
+func NewBuiltDMLQueryPair(readQuery *BuiltDMLQuery, writeQuery *BuiltDMLQuery) *BuiltDMLQueryPair {
+	return &BuiltDMLQueryPair{
+		readQuery:  readQuery,
+		writeQuery: writeQuery,
+	}
+}
+
 // Applier connects and writes the the applier-server, which is the server where migration
 // happens. This is typically the master, but could be a replica when `--test-on-replica` or
 // `--execute-on-replica` are given.
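Not part of this diff: a minimal sketch of the calling convention the new types establish, assuming the BuiltDMLQuery / BuiltDMLQueryPair declarations above. The read side is left nil everywhere in this change, and a builder error now travels inside the struct rather than as a separate return value, so callers have to check writeQuery.err before touching query/args (ApplyDMLEventQueries further down does exactly this). The helper name below is hypothetical.

	// exampleBuiltDMLQueryUsage is a hypothetical helper, not present in this change.
	func exampleBuiltDMLQueryUsage() (string, []interface{}, int64, error) {
		pair := NewBuiltDMLQueryPair(nil,
			NewBuiltDMLQuery("replace into `_t_gho` (`id`) values (?)", []interface{}{3}, true, 1, nil))
		writeQuery := pair.writeQuery
		if writeQuery.err != nil {
			// the builder reported a problem; surface it before using query/args
			return "", nil, 0, writeQuery.err
		}
		// query/args would be handed to tx.Exec(...); rowsDelta feeds the rows-delta estimate
		return writeQuery.query, writeQuery.args, writeQuery.rowsDelta, nil
	}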
@@ -904,79 +934,38 @@ func (this *Applier) ShowStatusVariable(variableName string) (result int64, err
 
 // buildDMLEventQuery creates a query to operate on the ghost table, based on an intercepted binlog
 // event entry on the original table.
-func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
+func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) *BuiltDMLQueryPair {
+	var args []interface{}
 	switch dmlEvent.DML {
 	case binlog.DeleteDML:
 		{
 			query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
-			return query, uniqueKeyArgs, -1, err
+			return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, uniqueKeyArgs, true, -1, err))
 		}
 	case binlog.InsertDML:
 		{
+			// if this.migrationContext.UniqueKey.IsPrimary() {
+			// query, uniqueKeyArgs, err := sql.BuildPKSelectPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.UniqueKey, args, false)
+			// return NewBuiltDMLQuery(query, uniqueKeyArgs, false, 1, err)
+			// query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
+			// return NewBuiltDMLQuery(query, sharedArgs, false, 1, err)
+			// }
 			query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
-			return query, sharedArgs, 1, err
+			return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, sharedArgs, true, 1, err))
 		}
 	case binlog.UpdateDML:
 		{
+			// if this.migrationContext.UniqueKey.IsPrimary() {
+			// query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
+			// return NewBuiltDMLQuery(query, sharedArgs, false, 1, err)
+			// }
 			query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
 			args = append(args, sharedArgs...)
 			args = append(args, uniqueKeyArgs...)
-			return query, args, 0, err
+			return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery(query, args, true, 0, err))
 		}
 	}
-	return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
+	return NewBuiltDMLQueryPair(nil, NewBuiltDMLQuery("", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
 }
-
-// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
-// original-table binlog event
-func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
-	query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
-	if err != nil {
-		return err
-	}
-	// TODO The below is in preparation for transactional writes on the ghost tables.
-	// Such writes would be, for example:
-	// - prepended with sql_mode setup
-	// - prepended with time zone setup
-	// - prepended with SET SQL_LOG_BIN=0
-	// - prepended with SET FK_CHECKS=0
-	// etc.
-	//
-	// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
-	// is solved by silently converting unsigned bigints to string values.
-	//
-	err = func() error {
-		tx, err := this.db.Begin()
-		if err != nil {
-			return err
-		}
-		sessionQuery := `SET
-			SESSION time_zone = '+00:00',
-			sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
-		`
-		if _, err := tx.Exec(sessionQuery); err != nil {
-			return err
-		}
-		if _, err := tx.Exec(query, args...); err != nil {
-			return err
-		}
-		if err := tx.Commit(); err != nil {
-			return err
-		}
-		return nil
-	}()
-
-	if err != nil {
-		err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
-		return log.Errore(err)
-	}
-	// no error
-	atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
-	if this.migrationContext.CountTableRows {
-		atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, rowDelta)
-	}
-	return nil
-}
 
 // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
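Not part of this diff: the hunk above deletes the single-event ApplyDMLEventQuery outright, and this compare view does not show what replaces it. For orientation only, a sketch of the removed body rewritten against the new *BuiltDMLQueryPair return type, keeping the session setup and counters from the deleted lines; this is an illustration, not the author's code, and the function name is invented.

	// applyDMLEventQuerySketch mirrors the removed ApplyDMLEventQuery, adapted to
	// the new return type of buildDMLEventQuery. Illustration only.
	func (this *Applier) applyDMLEventQuerySketch(dmlEvent *binlog.BinlogDMLEvent) error {
		writeQuery := this.buildDMLEventQuery(dmlEvent).writeQuery
		if writeQuery.err != nil {
			return writeQuery.err
		}
		err := func() error {
			tx, err := this.db.Begin()
			if err != nil {
				return err
			}
			sessionQuery := `SET
				SESSION time_zone = '+00:00',
				sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
			`
			if _, err := tx.Exec(sessionQuery); err != nil {
				return err
			}
			if _, err := tx.Exec(writeQuery.query, writeQuery.args...); err != nil {
				return err
			}
			return tx.Commit()
		}()
		if err != nil {
			err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), writeQuery.query, writeQuery.args)
			return log.Errore(err)
		}
		atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
		if this.migrationContext.CountTableRows {
			atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, writeQuery.rowsDelta)
		}
		return nil
	}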
@@ -1003,15 +992,16 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
 		return rollback(err)
 	}
 	for _, dmlEvent := range dmlEvents {
-		query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
-		if err != nil {
+		builtDmlQueries := this.buildDMLEventQuery(dmlEvent)
+		writeQuery := builtDmlQueries.writeQuery
+		if writeQuery.err != nil {
 			return rollback(err)
 		}
-		if _, err := tx.Exec(query, args...); err != nil {
-			err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
+		if _, err := tx.Exec(writeQuery.query, writeQuery.args...); err != nil {
+			err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), writeQuery.query, writeQuery.args)
 			return rollback(err)
 		}
-		totalDelta += rowDelta
+		totalDelta += writeQuery.rowsDelta
 	}
 	if err := tx.Commit(); err != nil {
 		return err
go/sql/builder.go
@@ -450,3 +450,108 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol
 	)
 	return result, sharedArgs, uniqueKeyArgs, nil
 }
+
+func BuildPKInsertQuery(databaseName, originalTableName, ghostTableName string, tableColumns *ColumnList, sharedColumns []string, mappedSharedColumns []string, uniqueKey *UniqueKey, pkValues []string, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
+	if !uniqueKey.IsPrimary() {
+		return "", uniqueKeyArgs, fmt.Errorf("BuildPKInsertQuery only works for PRIMARY KEY")
+	}
+	uniqueKeyColumns := &uniqueKey.Columns
+	if len(sharedColumns) == 0 {
+		return "", uniqueKeyArgs, fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
+	}
+	databaseName = EscapeName(databaseName)
+	originalTableName = EscapeName(originalTableName)
+	ghostTableName = EscapeName(ghostTableName)
+
+	mappedSharedColumns = duplicateNames(mappedSharedColumns)
+	for i := range mappedSharedColumns {
+		mappedSharedColumns[i] = EscapeName(mappedSharedColumns[i])
+	}
+	mappedSharedColumnsListing := strings.Join(mappedSharedColumns, ", ")
+
+	sharedColumns = duplicateNames(sharedColumns)
+	for i := range sharedColumns {
+		sharedColumns[i] = EscapeName(sharedColumns[i])
+	}
+	sharedColumnsListing := strings.Join(sharedColumns, ", ")
+
+	uniqueKeyName := EscapeName(uniqueKey.Name)
+
+	equalsComparison, err := BuildEqualsComparison(uniqueKeyColumns.Names(), pkValues)
+	if err != nil {
+		return "", uniqueKeyArgs, err
+	}
+
+	for _, column := range uniqueKeyColumns.Columns() {
+		tableOrdinal := tableColumns.Ordinals[column.Name]
+		arg := column.convertArg(args[tableOrdinal])
+		uniqueKeyArgs = append(uniqueKeyArgs, arg)
+	}
+
+	transactionalClause := ""
+	if transactionalTable {
+		transactionalClause = "lock in share mode"
+	}
+	result = fmt.Sprintf(`
+		replace /* gh-ost %s.%s */ into %s.%s (%s)
+		(select %s from %s.%s force index (%s)
+			where (%s) %s
+		)
+	`, databaseName, originalTableName, databaseName, ghostTableName, mappedSharedColumnsListing,
+		sharedColumnsListing, databaseName, originalTableName, uniqueKeyName,
+		equalsComparison, transactionalClause)
+	return result, uniqueKeyArgs, nil
+}
+
+func BuildPKInsertPreparedQuery(databaseName, originalTableName, ghostTableName string, tableColumns *ColumnList, sharedColumns []string, mappedSharedColumns []string, uniqueKey *UniqueKey, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
+	pkValues := buildColumnsPreparedValues(&uniqueKey.Columns)
+	return BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, mappedSharedColumns, uniqueKey, pkValues, args, transactionalTable)
+}
+
+func BuildPKSelectQuery(databaseName, originalTableName string, tableColumns *ColumnList, sharedColumns []string, uniqueKey *UniqueKey, pkValues []string, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
+	if !uniqueKey.IsPrimary() {
+		return "", uniqueKeyArgs, fmt.Errorf("BuildPKSelectQuery only works for PRIMARY KEY")
+	}
+	if len(sharedColumns) == 0 {
+		return "", uniqueKeyArgs, fmt.Errorf("Got 0 shared columns in BuildPKSelectQuery")
+	}
+	databaseName = EscapeName(databaseName)
+	originalTableName = EscapeName(originalTableName)
+
+	sharedColumns = duplicateNames(sharedColumns)
+	for i := range sharedColumns {
+		sharedColumns[i] = EscapeName(sharedColumns[i])
+	}
+	sharedColumnsListing := strings.Join(sharedColumns, ", ")
+
+	uniqueKeyName := EscapeName(uniqueKey.Name)
+
+	equalsComparison, err := BuildEqualsComparison(uniqueKey.Columns.Names(), pkValues)
+	if err != nil {
+		return "", uniqueKeyArgs, err
+	}
+
+	for _, column := range uniqueKey.Columns.Columns() {
+		tableOrdinal := tableColumns.Ordinals[column.Name]
+		arg := column.convertArg(args[tableOrdinal])
+		uniqueKeyArgs = append(uniqueKeyArgs, arg)
+	}
+
+	transactionalClause := ""
+	if transactionalTable {
+		transactionalClause = "lock in share mode"
+	}
+
+	result = fmt.Sprintf(`
+		select %s from %s.%s force index (%s)
+			where %s %s
+	`,
+		sharedColumnsListing, databaseName, originalTableName, uniqueKeyName,
+		equalsComparison, transactionalClause)
+	return result, uniqueKeyArgs, nil
+}
+
+func BuildPKSelectPreparedQuery(databaseName, originalTableName string, tableColumns *ColumnList, sharedColumns []string, uniqueKey *UniqueKey, args []interface{}, transactionalTable bool) (result string, uniqueKeyArgs []interface{}, err error) {
+	pkValues := buildColumnsPreparedValues(&uniqueKey.Columns)
+	return BuildPKSelectQuery(databaseName, originalTableName, tableColumns, sharedColumns, uniqueKey, pkValues, args, transactionalTable)
+}
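Not part of this diff: the commented-out IsPrimary() branches in buildDMLEventQuery above suggest how these builders are meant to be wired in. For a table with a true PRIMARY KEY, the applier would copy the current row from the original table into the ghost table by PK (the replace ... (select ...) form built here) rather than replay the binlog row image, which appears to be what the dataViaBinlog flag distinguishes. A sketch under that assumption; the helper name and its placement in the applier are hypothetical.

	// buildPKCopyQuerySketch shows how BuildPKInsertPreparedQuery could back the
	// commented-out primary-key branch of buildDMLEventQuery. Illustration only.
	func (this *Applier) buildPKCopyQuerySketch(dmlEvent *binlog.BinlogDMLEvent) *BuiltDMLQuery {
		query, uniqueKeyArgs, err := sql.BuildPKInsertPreparedQuery(
			dmlEvent.DatabaseName,
			this.migrationContext.OriginalTableName,
			this.migrationContext.GetGhostTableName(),
			this.migrationContext.OriginalTableColumns,
			this.migrationContext.SharedColumns.Names(),
			this.migrationContext.MappedSharedColumns.Names(),
			this.migrationContext.UniqueKey,
			dmlEvent.NewColumnValues.AbstractValues(), // full row image; PK args are picked out by column ordinal
			this.migrationContext.IsTransactionalTable(),
		)
		// dataViaBinlog=false: the row data would come from the original table, not from the binlog event
		return NewBuiltDMLQuery(query, uniqueKeyArgs, false, 1, err)
	}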
go/sql/builder_test.go
@@ -677,3 +677,109 @@ func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) {
 		test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(253)}))
 	}
 }
+
+func TestBuildPKInsertQuery(t *testing.T) {
+	databaseName := "mydb"
+	originalTableName := "tbl"
+	ghostTableName := "ghost"
+	sharedColumns := []string{"id", "name", "position"}
+	tableColumns := NewColumnList(sharedColumns)
+	args := []interface{}{3, "gromit", "dog"}
+	{
+		uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"id"})}
+		pkValues := []string{"@v1"}
+
+		query, explodedArgs, err := BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, pkValues, args, false)
+		test.S(t).ExpectNil(err)
+		expected := `
+			replace /* gh-ost mydb.tbl */ into mydb.ghost (id, name, position)
+			(select id, name, position from mydb.tbl force index (PRIMARY)
+				where (((id = @v1)))
+			)
+		`
+		test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
+		test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3}))
+	}
+	{
+		uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name", "position"})}
+		pkValues := []string{"@v1", "@v2"}
+
+		query, explodedArgs, err := BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, pkValues, args, false)
+		test.S(t).ExpectNil(err)
+		expected := `
+			replace /* gh-ost mydb.tbl */ into mydb.ghost (id, name, position)
+			(select id, name, position from mydb.tbl force index (PRIMARY)
+				where (((name = @v1) and (position = @v2)))
+			)
+		`
+		test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
+		test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{"gromit", "dog"}))
+	}
+	{
+		uniqueKey := UniqueKey{Name: "name_position_uidx", Columns: *NewColumnList([]string{"name", "position"})}
+		pkValues := []string{"@v1", "@v2"}
+
+		_, _, err := BuildPKInsertQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, pkValues, args, false)
+		test.S(t).ExpectNotNil(err)
+	}
+}
+
+func TestBuildPKInsertPreparedQuery(t *testing.T) {
+	databaseName := "mydb"
+	originalTableName := "tbl"
+	ghostTableName := "ghost"
+	sharedColumns := []string{"id", "name", "position"}
+	tableColumns := NewColumnList(sharedColumns)
+	args := []interface{}{3, "gromit", "dog"}
+	{
+		uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name", "position"})}
+
+		query, explodedArgs, err := BuildPKInsertPreparedQuery(databaseName, originalTableName, ghostTableName, tableColumns, sharedColumns, sharedColumns, &uniqueKey, args, false)
+		test.S(t).ExpectNil(err)
+		expected := `
+			replace /* gh-ost mydb.tbl */ into mydb.ghost (id, name, position)
+			(select id, name, position from mydb.tbl force index (PRIMARY)
+				where (((name = ?) and (position = ?)))
+			)
+		`
+		test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
+		test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{"gromit", "dog"}))
+	}
+}
+
+func TestBuildPKSelectPreparedQuery(t *testing.T) {
+	databaseName := "mydb"
+	originalTableName := "tbl"
+	sharedColumns := []string{"id", "name", "position"}
+	tableColumns := NewColumnList(sharedColumns)
+	args := []interface{}{3, "gromit", "dog"}
+	{
+		uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name", "position"})}
+
+		query, uniqueKeyArgs, err := BuildPKSelectPreparedQuery(databaseName, originalTableName, tableColumns, sharedColumns, &uniqueKey, args, false)
+		test.S(t).ExpectNil(err)
+		expected := `
+			select id, name, position from mydb.tbl force index (PRIMARY) where ((name = ?) and (position = ?))
+		`
+		test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
+		test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{"gromit", "dog"}))
+	}
+	{
+		sharedColumns := []string{"id", "name"}
+		uniqueKey := UniqueKey{Name: "PRIMARY", Columns: *NewColumnList([]string{"name"})}
+
+		query, uniqueKeyArgs, err := BuildPKSelectPreparedQuery(databaseName, originalTableName, tableColumns, sharedColumns, &uniqueKey, args, false)
+		test.S(t).ExpectNil(err)
+		expected := `
+			select id, name from mydb.tbl force index (PRIMARY) where ((name = ?))
+		`
+		test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
+		test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{"gromit"}))
+	}
+	{
+		uniqueKey := UniqueKey{Name: "name_uidx", Columns: *NewColumnList([]string{"name"})}
+
+		_, _, err := BuildPKSelectPreparedQuery(databaseName, originalTableName, tableColumns, sharedColumns, &uniqueKey, args, false)
+		test.S(t).ExpectNotNil(err)
+	}
+}
localtests/compound-pk-ts/create.sql (new file)
@@ -0,0 +1,40 @@
+drop table if exists gh_ost_test;
+create table gh_ost_test (
+  id int auto_increment,
+  i int not null,
+  ts0 timestamp(6) default current_timestamp(6),
+  updated tinyint unsigned default 0,
+  primary key(id, ts0)
+) auto_increment=1;
+
+drop event if exists gh_ost_test;
+delimiter ;;
+create event gh_ost_test
+  on schedule every 1 second
+  starts current_timestamp
+  ends current_timestamp + interval 60 second
+  on completion not preserve
+  enable
+  do
+begin
+  insert into gh_ost_test values (null, 11, now(6), 0);
+  update gh_ost_test set updated = 1 where i = 11 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 13, now(6), 0);
+  update gh_ost_test set updated = 1 where i = 13 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 17, now(6), 0);
+  update gh_ost_test set updated = 1 where i = 17 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 19, now(6), 0);
+  update gh_ost_test set updated = 1 where i = 19 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 23, now(6), 0);
+  update gh_ost_test set updated = 1 where i = 23 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 29, now(6), 0);
+  insert into gh_ost_test values (null, 31, now(6), 0);
+  insert into gh_ost_test values (null, 37, now(6), 0);
+  insert into gh_ost_test values (null, 41, now(6), 0);
+  delete from gh_ost_test where i = 31 order by id desc limit 1;
+end ;;
localtests/compound-pk/create.sql (new file)
@@ -0,0 +1,34 @@
+drop table if exists gh_ost_test;
+create table gh_ost_test (
+  id int auto_increment,
+  i int not null,
+  v varchar(128),
+  updated tinyint unsigned default 0,
+  primary key(id, v)
+) auto_increment=1;
+
+drop event if exists gh_ost_test;
+delimiter ;;
+create event gh_ost_test
+  on schedule every 1 second
+  starts current_timestamp
+  ends current_timestamp + interval 60 second
+  on completion not preserve
+  enable
+  do
+begin
+  insert into gh_ost_test values (null, 11, 'eleven', 0);
+  update gh_ost_test set updated = 1 where i = 11 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 13, 'thirteen', 0);
+  update gh_ost_test set updated = 1 where i = 13 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 17, 'seventeen', 0);
+  update gh_ost_test set updated = 1 where i = 17 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 19, 'nineteen', 0);
+  update gh_ost_test set updated = 1 where i = 19 order by id desc limit 1;
+
+  insert into gh_ost_test values (null, 23, 'twenty three', 0);
+  update gh_ost_test set updated = 1 where i = 23 order by id desc limit 1;
+end ;;
localtests/test.sh
@@ -11,6 +11,8 @@ tests_path=$(dirname $0)
 test_logfile=/tmp/gh-ost-test.log
 ghost_binary=/tmp/gh-ost-test
 exec_command_file=/tmp/gh-ost-test.bash
+orig_table_csv_file=/tmp/gh-ost-test.orig.csv
+ghost_table_csv_file=/tmp/gh-ost-test.ghost.csv
 
 test_pattern="${1:-.}"
 
@@ -137,11 +139,15 @@ test_single() {
   ghost_checksum=$(gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${ghost_columns} from _gh_ost_test_gho ${order_by}" -ss | md5sum)
 
   if [ "$orig_checksum" != "$ghost_checksum" ] ; then
+    gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${orig_columns} from gh_ost_test" -ss > ${orig_table_csv_file}
+    gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${ghost_columns} from _gh_ost_test_gho" -ss > ${ghost_table_csv_file}
     echo "ERROR $test_name: checksum mismatch"
     echo "---"
-    gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${orig_columns} from gh_ost_test" -ss
+    cat ${orig_table_csv_file}
     echo "---"
-    gh-ost-test-mysql-replica --default-character-set=utf8mb4 test -e "select ${ghost_columns} from _gh_ost_test_gho" -ss
+    cat ${ghost_table_csv_file}
+    echo "---"
+    echo "diff ${orig_table_csv_file} ${ghost_table_csv_file}"
     return 1
   fi
 }