Add managed rowcopy

This commit is contained in:
everpcpc 2018-03-23 15:21:42 +08:00
parent d2aca21a01
commit 74b35d8de7
No known key found for this signature in database
GPG Key ID: B068E3C65BC46F6B
5 changed files with 248 additions and 5 deletions

View File

@ -137,6 +137,7 @@ type MigrationContext struct {
TimestampOldTable bool // Should old table name include a timestamp
CutOverType CutOver
ReplicaServerId uint
ManagedRowCopy bool
Hostname string
AssumeMasterHostname string

View File

@ -79,6 +79,7 @@ func main() {
flag.BoolVar(&migrationContext.TimestampOldTable, "timestamp-old-table", false, "Use a timestamp in old table name. This makes old table names unique and non conflicting cross migrations")
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (default|atomic, two-step)")
flag.BoolVar(&migrationContext.ForceNamedCutOverCommand, "force-named-cut-over", false, "When true, the 'unpostpone|cut-over' interactive command must name the migrated table")
flag.BoolVar(&migrationContext.ManagedRowCopy, "managed-rowcopy", false, "Copy row data by first reading rows into app, then applying them (default: rowcopy local to applied server via INSERT INTO ... SELECT)")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, event after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")

View File

@ -8,6 +8,7 @@ package logic
import (
gosql "database/sql"
"fmt"
"strings"
"sync/atomic"
"time"
@ -457,6 +458,129 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
startTime := time.Now()
chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
sessionQuery := fmt.Sprintf(`SET
SESSION time_zone = '%s',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
`, this.migrationContext.ApplierTimeZone)
if this.migrationContext.ManagedRowCopy {
	// Managed rowcopy: read the chunk's rows into this process with a SELECT,
	// then write them to the ghost table with one multi-row INSERT IGNORE,
	// instead of a server-local INSERT INTO ... SELECT.
	selectQuery, selectExplodedArgs, err := sql.BuildRangeSelectPreparedQuery(
		this.migrationContext.DatabaseName,
		this.migrationContext.OriginalTableName,
		this.migrationContext.SharedColumns.Names(),
		this.migrationContext.UniqueKey.Name,
		&this.migrationContext.UniqueKey.Columns,
		this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
		this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
		this.migrationContext.GetIteration() == 0,
		this.migrationContext.IsTransactionalTable(),
	)
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}
	applyQuery, err := sql.BuildRangeApplyPreparedQuery(
		this.migrationContext.DatabaseName,
		this.migrationContext.GetGhostTableName(),
		this.migrationContext.MappedSharedColumns.Names(),
	)
	// This error used to be overwritten unchecked by the read closure below.
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}

	// One "(?,?,...)," tuple per copied row; the trailing comma of the last
	// tuple is trimmed once the row count is known.
	columnCount := this.migrationContext.SharedColumns.Len()
	placeholder := make([]string, columnCount)
	for i := range placeholder {
		placeholder[i] = "?"
	}
	applyPreparedQuery := fmt.Sprintf(` (%s),`, strings.Join(placeholder, ","))

	applyExplodedArgs := []interface{}{}
	rowCount := 0

	// NOTE(review): the read transaction ends (rollback) before the write
	// transaction begins, so any `lock in share mode` taken by the SELECT is
	// no longer held while the rows are being inserted -- confirm this is
	// acceptable with respect to concurrent DML on the original table.
	err = func() error {
		tx, err := this.db.Begin()
		if err != nil {
			return err
		}
		defer tx.Rollback()
		if _, err := tx.Exec(sessionQuery); err != nil {
			return err
		}
		values := make([]gosql.RawBytes, columnCount)
		scanArgs := make([]interface{}, columnCount)
		for i := range values {
			scanArgs[i] = &values[i]
		}
		rows, err := tx.Query(selectQuery, selectExplodedArgs...)
		if err != nil {
			return err
		}
		// Release the result set on every exit path (runs before the rollback).
		defer rows.Close()
		for rows.Next() {
			if err := rows.Scan(scanArgs...); err != nil {
				return err
			}
			rowCount++
			for _, value := range values {
				if value == nil {
					applyExplodedArgs = append(applyExplodedArgs, nil)
				} else {
					// RawBytes is only valid until the next Scan/Next; copy it.
					applyExplodedArgs = append(applyExplodedArgs, string(value))
				}
			}
		}
		// rows.Next() returning false may mean failure rather than end of data;
		// without this check a partially read chunk would be applied as complete.
		return rows.Err()
	}()
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}
	log.Debugf(
		"Issued SELECT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
		this.migrationContext.MigrationIterationRangeMinValues,
		this.migrationContext.MigrationIterationRangeMaxValues,
		this.migrationContext.GetIteration(),
		chunkSize)

	if rowCount == 0 {
		// Nothing to copy in this range. An INSERT without any VALUES tuple is
		// a syntax error, so report zero affected rows instead of executing it.
		duration = time.Since(startTime)
		return chunkSize, rowsAffected, duration, nil
	}

	// Append all tuples in one step and drop the trailing comma.
	// NOTE(review): this statement carries rowCount*columnCount placeholders;
	// MySQL limits placeholders per prepared statement (65535) -- confirm the
	// chunk size times the column count stays below that.
	applyQuery += strings.TrimSuffix(strings.Repeat(applyPreparedQuery, rowCount), ",")

	sqlResult, err := func() (gosql.Result, error) {
		tx, err := this.db.Begin()
		if err != nil {
			return nil, err
		}
		defer tx.Rollback()
		if _, err := tx.Exec(sessionQuery); err != nil {
			return nil, err
		}
		stmt, err := tx.Prepare(applyQuery)
		if err != nil {
			return nil, err
		}
		// No-op after the explicit Close below; covers the Exec failure path.
		defer stmt.Close()
		result, err := stmt.Exec(applyExplodedArgs...)
		if err != nil {
			return nil, err
		}
		if err := stmt.Close(); err != nil {
			return nil, err
		}
		if err := tx.Commit(); err != nil {
			return nil, err
		}
		return result, nil
	}()
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}
	rowsAffected, _ = sqlResult.RowsAffected()
	duration = time.Since(startTime)
	log.Debugf(
		"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
		this.migrationContext.MigrationIterationRangeMinValues,
		this.migrationContext.MigrationIterationRangeMaxValues,
		this.migrationContext.GetIteration(),
		chunkSize)
	return chunkSize, rowsAffected, duration, nil
}
query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
this.migrationContext.DatabaseName,
@ -480,10 +604,6 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
if err != nil {
return nil, err
}
sessionQuery := fmt.Sprintf(`SET
SESSION time_zone = '%s',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
`, this.migrationContext.ApplierTimeZone)
if _, err := tx.Exec(sessionQuery); err != nil {
return nil, err
}
@ -990,7 +1110,7 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
// - prepended with SET FK_CHECKS=0
// etc.
//
// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
// a known problem: https://github.com/golang/go/issues/9373 -- bigint unsigned values, not supported in database/sql
// is solved by silently converting unsigned bigints to string values.
//

View File

@ -235,6 +235,75 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
}
// BuildRangeSelectQuery builds the SELECT statement that reads the shared
// columns of one unique-key range from the original table, forcing the given
// unique key as the index.
//
// The lower bound is inclusive only when includeRangeStartValues is true; the
// upper bound is always inclusive. When transactionalTable is true the
// statement ends with "lock in share mode".
//
// It returns the query text and the arguments for the lower-bound comparison
// followed by those for the upper-bound comparison. An error is returned when
// sharedColumns is empty or when either range comparison cannot be built.
func BuildRangeSelectQuery(databaseName, originalTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartValues, rangeEndValues []string, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
	if len(sharedColumns) == 0 {
		return "", explodedArgs, fmt.Errorf("Got 0 shared columns in BuildRangeSelectQuery")
	}

	escapedDatabase := EscapeName(databaseName)
	escapedTable := EscapeName(originalTableName)
	// Escape a copy so the caller's slice is left untouched.
	escapedColumns := duplicateNames(sharedColumns)
	for idx, column := range escapedColumns {
		escapedColumns[idx] = EscapeName(column)
	}
	escapedIndex := EscapeName(uniqueKey)

	var lowerBoundSign ValueComparisonSign
	if includeRangeStartValues {
		lowerBoundSign = GreaterThanOrEqualsComparisonSign
	} else {
		lowerBoundSign = GreaterThanComparisonSign
	}

	lowerBound, lowerBoundArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeStartValues, rangeStartArgs, lowerBoundSign)
	if err != nil {
		return "", explodedArgs, err
	}
	explodedArgs = append(explodedArgs, lowerBoundArgs...)

	upperBound, upperBoundArgs, err := BuildRangeComparison(uniqueKeyColumns.Names(), rangeEndValues, rangeEndArgs, LessThanOrEqualsComparisonSign)
	if err != nil {
		return "", explodedArgs, err
	}
	explodedArgs = append(explodedArgs, upperBoundArgs...)

	lockClause := ""
	if transactionalTable {
		lockClause = "lock in share mode"
	}

	result = fmt.Sprintf(`
select /* gh-ost %s.%s */ %s from %s.%s force index (%s) where (%s and %s) %s
`,
		escapedDatabase, escapedTable, strings.Join(escapedColumns, ", "),
		escapedDatabase, escapedTable, escapedIndex,
		lowerBound, upperBound, lockClause,
	)
	return result, explodedArgs, nil
}
// BuildRangeSelectPreparedQuery is the prepared-statement form of
// BuildRangeSelectQuery: the range start and end values are generated from the
// unique key columns via buildColumnsPreparedValues, and the remaining
// arguments are passed through unchanged.
func BuildRangeSelectPreparedQuery(databaseName, originalTableName string, sharedColumns []string, uniqueKey string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, includeRangeStartValues bool, transactionalTable bool) (result string, explodedArgs []interface{}, err error) {
	return BuildRangeSelectQuery(
		databaseName,
		originalTableName,
		sharedColumns,
		uniqueKey,
		uniqueKeyColumns,
		buildColumnsPreparedValues(uniqueKeyColumns), // range start values
		buildColumnsPreparedValues(uniqueKeyColumns), // range end values
		rangeStartArgs,
		rangeEndArgs,
		includeRangeStartValues,
		transactionalTable,
	)
}
// BuildRangeApplyPreparedQuery builds the head of an INSERT IGNORE statement
// targeting the ghost table: everything up to and including the "values"
// keyword, listing the escaped mapped shared columns. The caller appends the
// value tuples.
//
// An error is returned when mappedSharedColumns is empty.
func BuildRangeApplyPreparedQuery(databaseName, ghostTableName string, mappedSharedColumns []string) (result string, err error) {
	if len(mappedSharedColumns) == 0 {
		return "", fmt.Errorf("Got 0 mapped shared columns in BuildRangeApplyQuery")
	}

	escapedDatabase := EscapeName(databaseName)
	escapedGhostTable := EscapeName(ghostTableName)
	// Escape a copy so the caller's slice is left untouched.
	escapedColumns := duplicateNames(mappedSharedColumns)
	for idx, column := range escapedColumns {
		escapedColumns[idx] = EscapeName(column)
	}

	return fmt.Sprintf(`
insert /* gh-ost %s.%s */ ignore into %s.%s (%s) values
`, escapedDatabase, escapedGhostTable, escapedDatabase, escapedGhostTable, strings.Join(escapedColumns, ", ")), nil
}
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKeyColumns.Len() == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")

View File

@ -204,6 +204,58 @@ func TestBuildRangeInsertQuery(t *testing.T) {
}
}
func TestBuildRangeSelectQuery(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"
sharedColumns := []string{"id", "name", "position"}
{
uniqueKey := "PRIMARY"
uniqueKeyColumns := NewColumnList([]string{"id"})
rangeStartValues := []string{"@v1s"}
rangeEndValues := []string{"@v1e"}
rangeStartArgs := []interface{}{3}
rangeEndArgs := []interface{}{103}
query, explodedArgs, err := BuildRangeSelectQuery(databaseName, originalTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err)
expected := `
select /* gh-ost mydb.tbl */ id, name, position from mydb.tbl force index (PRIMARY)
where (((id > @v1s) or ((id = @v1s))) and ((id < @v1e) or ((id = @v1e))))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 103, 103}))
}
{
uniqueKey := "name_position_uidx"
uniqueKeyColumns := NewColumnList([]string{"name", "position"})
rangeStartValues := []string{"@v1s", "@v2s"}
rangeEndValues := []string{"@v1e", "@v2e"}
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildRangeSelectQuery(databaseName, originalTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, true, false)
test.S(t).ExpectNil(err)
expected := `
select /* gh-ost mydb.tbl */ id, name, position from mydb.tbl force index (name_position_uidx)
where (((name > @v1s) or (((name = @v1s)) AND (position > @v2s)) or ((name = @v1s) and (position = @v2s))) and ((name < @v1e) or (((name = @v1e)) AND (position < @v2e)) or ((name = @v1e) and (position = @v2e))))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(explodedArgs, []interface{}{3, 3, 17, 3, 17, 103, 103, 117, 103, 117}))
}
}
// TestBuildApplyQuery checks the INSERT IGNORE statement head that
// BuildRangeApplyPreparedQuery produces for a three-column ghost table.
func TestBuildApplyQuery(t *testing.T) {
	const expected = `insert /* gh-ost mydb.ghost */ ignore into mydb.ghost (id, name, position) values`

	query, err := BuildRangeApplyPreparedQuery("mydb", "ghost", []string{"id", "name", "position"})
	test.S(t).ExpectNil(err)
	test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
}
func TestBuildRangeInsertQueryRenameMap(t *testing.T) {
databaseName := "mydb"
originalTableName := "tbl"