Merge branch 'master' into nm-refactor-migration-context

Shlomi Noach 2017-09-10 08:08:39 +03:00 committed by GitHub
commit a2847015d6
31 changed files with 225 additions and 48 deletions

View File

@@ -28,7 +28,9 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. Th
- MySQL 5.7 generated columns are not supported. They may be supported in the future.
- MySQL 5.7 `JSON` columns are not supported. They are likely to be supported shortly.
- MySQL 5.7 `POINT` column type is not supported.
- MySQL 5.7 `JSON` columns are supported but not as part of `PRIMARY KEY`
- The two _before_ & _after_ tables must share a `PRIMARY KEY` or other `UNIQUE KEY`. This key will be used by `gh-ost` to iterate through the table rows when copying. [Read more](shared-key.md)
- The migration key must not include columns with NULL values. This means either:
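To make the shared-key requirement concrete, here is a minimal sketch of the chunked row-copy pattern such a key enables. The table (`mydb.tbl`), ghost table (`mydb._tbl_gho`), column names, and chunk size are all hypothetical; this illustrates the idea, not gh-ost's actual iterator:

```go
package main

import (
	"database/sql"

	_ "github.com/go-sql-driver/mysql"
)

// copyInChunks walks mydb.tbl via an integer unique key `id`, copying rows
// into the ghost table in bounded chunks; this is the iteration pattern the
// shared-key requirement enables.
func copyInChunks(db *sql.DB, chunkSize int64) error {
	var lastID int64 // cursor over the unique key
	for {
		// Upper bound of the next chunk: the greatest key among the next
		// chunkSize rows past the cursor.
		var maxID sql.NullInt64
		err := db.QueryRow(`
			select max(id) from (
				select id from mydb.tbl where id > ? order by id asc limit ?
			) sel`, lastID, chunkSize).Scan(&maxID)
		if err != nil {
			return err
		}
		if !maxID.Valid {
			return nil // no further range to iterate
		}
		// Copy rows in (lastID, maxID] into the ghost table.
		if _, err := db.Exec(`
			insert ignore into mydb._tbl_gho (id, payload)
			select id, payload from mydb.tbl where id > ? and id <= ?`,
			lastID, maxID.Int64); err != nil {
			return err
		}
		lastID = maxID.Int64
	}
}

func main() {
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/mydb")
	if err != nil {
		panic(err)
	}
	defer db.Close()
	if err := copyInChunks(db, 1000); err != nil {
		panic(err)
	}
}
```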

View File

@@ -63,7 +63,7 @@ func main() {
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup")
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
flag.BoolVar(&migrationContext.ApproveRenamedColumns, "approve-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag approves that gh-ost's interpretation si correct")
flag.BoolVar(&migrationContext.ApproveRenamedColumns, "approve-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag approves that gh-ost's interpretation is correct")
flag.BoolVar(&migrationContext.SkipRenamedColumns, "skip-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag tells gh-ost to skip the renamed columns, i.e. to treat what gh-ost thinks are renamed columns as unrelated columns. NOTE: you may lose column data")
flag.BoolVar(&migrationContext.IsTungsten, "tungsten", false, "explicitly let gh-ost know that you are running on a tungsten-replication based topology (you are likely to also provide --assume-master-host)")
flag.BoolVar(&migrationContext.DiscardForeignKeys, "discard-foreign-keys", false, "DANGER! This flag will migrate a table that has foreign keys and will NOT create foreign keys on the ghost table, thus your altered table will have NO foreign keys. This is useful for intentional dropping of foreign keys")

View File

@@ -202,7 +202,7 @@ func (this *Applier) CreateChangelogTable() error {
id bigint auto_increment,
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
hint varchar(64) charset ascii not null,
value varchar(255) charset ascii not null,
value varchar(4096) charset ascii not null,
primary key(id),
unique key hint_uidx(hint)
) auto_increment=256
@@ -403,35 +403,41 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
if this.migrationContext.MigrationIterationRangeMinValues == nil {
this.migrationContext.MigrationIterationRangeMinValues = this.migrationContext.MigrationRangeMinValues
}
query, explodedArgs, err := sql.BuildUniqueKeyRangeEndPreparedQuery(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
}
rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
}
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
for i := 0; i < 2; i++ {
buildFunc := sql.BuildUniqueKeyRangeEndPreparedQueryViaOffset
if i == 1 {
buildFunc = sql.BuildUniqueKeyRangeEndPreparedQueryViaTemptable
}
query, explodedArgs, err := buildFunc(
this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName,
&this.migrationContext.UniqueKey.Columns,
this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
atomic.LoadInt64(&this.migrationContext.ChunkSize),
this.migrationContext.GetIteration() == 0,
fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
)
if err != nil {
return hasFurtherRange, err
}
hasFurtherRange = true
rows, err := this.db.Query(query, explodedArgs...)
if err != nil {
return hasFurtherRange, err
}
iterationRangeMaxValues := sql.NewColumnValues(this.migrationContext.UniqueKey.Len())
for rows.Next() {
if err = rows.Scan(iterationRangeMaxValues.ValuesPointers...); err != nil {
return hasFurtherRange, err
}
hasFurtherRange = true
}
if hasFurtherRange {
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
return hasFurtherRange, nil
}
}
if !hasFurtherRange {
log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
}
this.migrationContext.MigrationIterationRangeMaxValues = iterationRangeMaxValues
log.Debugf("Iteration complete: no further range to iterate")
return hasFurtherRange, nil
}
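The rewritten loop above tries two query builders in order: the OFFSET-based variant first and, if it yields no row (as happens when fewer than chunk-size rows remain in the range), the temptable-based variant. A minimal sketch of that control flow, with simplified, hypothetical signatures in place of the actual gh-ost types:

```go
package sketch

import "database/sql"

// builderFunc stands in for the two query builders above; it returns a
// range query plus its arguments.
type builderFunc func() (query string, args []interface{}, err error)

// nextRangeEnd tries each builder in order and scans the first row any of
// them yields into dest. It reports whether a further range exists.
func nextRangeEnd(db *sql.DB, builders []builderFunc, dest []interface{}) (bool, error) {
	for _, build := range builders {
		query, args, err := build()
		if err != nil {
			return false, err
		}
		rows, err := db.Query(query, args...)
		if err != nil {
			return false, err
		}
		hasRow := false
		for rows.Next() {
			if err := rows.Scan(dest...); err != nil {
				rows.Close()
				return false, err
			}
			hasRow = true
		}
		rows.Close()
		if hasRow {
			return true, nil // this builder located the chunk's end values
		}
		// The OFFSET variant yields no row when fewer than chunk-size rows
		// remain; fall through and let the temptable variant try.
	}
	return false, nil // no further range to iterate
}
```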

View File

@@ -121,10 +121,33 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
if err != nil {
return err
}
if len(sharedUniqueKeys) == 0 {
for i, sharedUniqueKey := range sharedUniqueKeys {
this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &sharedUniqueKey.Columns)
uniqueKeyIsValid := true
for _, column := range sharedUniqueKey.Columns.Columns() {
switch column.Type {
case sql.FloatColumnType:
{
log.Warning("Will not use %+v as shared key due to FLOAT data type", sharedUniqueKey.Name)
uniqueKeyIsValid = false
}
case sql.JSONColumnType:
{
// Noteworthy that at this time MySQL does not allow JSON indexing anyhow, but this code
// will remain in place to potentially handle the future case where JSON is supported in indexes.
log.Warning("Will not use %+v as shared key due to JSON data type", sharedUniqueKey.Name)
uniqueKeyIsValid = false
}
}
}
if uniqueKeyIsValid {
this.migrationContext.UniqueKey = sharedUniqueKeys[i]
break
}
}
if this.migrationContext.UniqueKey == nil {
return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
}
this.migrationContext.UniqueKey = sharedUniqueKeys[0]
log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
if this.migrationContext.UniqueKey.HasNullable {
if this.migrationContext.NullableUniqueKeyAllowed {
@@ -169,6 +192,9 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
// validateConnection issues a simple can-connect to MySQL
func (this *Inspector) validateConnection() error {
if len(this.connectionConfig.Password) > mysql.MaxReplicationPasswordLength {
return fmt.Errorf("MySQL replication length limited to 32 characters. See https://dev.mysql.com/doc/refman/5.7/en/assigning-passwords.html")
}
query := `select @@global.port, @@global.version`
var port int
if err := this.db.QueryRow(query).Scan(&port, &this.migrationContext.InspectorMySQLVersion); err != nil {
@@ -545,6 +571,16 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL
columnsList.GetColumn(columnName).Type = sql.DateTimeColumnType
}
}
if strings.Contains(columnType, "json") {
for _, columnsList := range columnsLists {
columnsList.GetColumn(columnName).Type = sql.JSONColumnType
}
}
if strings.Contains(columnType, "float") {
for _, columnsList := range columnsLists {
columnsList.GetColumn(columnName).Type = sql.FloatColumnType
}
}
if strings.HasPrefix(columnType, "enum") {
for _, columnsList := range columnsLists {
columnsList.GetColumn(columnName).Type = sql.EnumColumnType
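The new `json` and `float` branches follow the same shape as the surrounding `datetime` and `enum` checks: substring-match the `information_schema` COLUMN_TYPE string and stamp the matching internal type onto every tracked column list. A self-contained sketch of that mapping, using a hypothetical helper name and a reduced enum:

```go
package sketch

import "strings"

// ColumnType is a reduced version of the internal enum this commit extends
// with JSON and FLOAT entries.
type ColumnType int

const (
	UnknownColumnType ColumnType = iota
	DateTimeColumnType
	EnumColumnType
	JSONColumnType
	FloatColumnType
)

// classifyColumnType (hypothetical name) maps an information_schema
// COLUMN_TYPE string such as "float(10,2)", "json" or "enum('a','b')" onto
// the internal enum, echoing the substring checks in applyColumnTypes.
func classifyColumnType(columnType string) ColumnType {
	switch {
	case strings.Contains(columnType, "datetime"):
		return DateTimeColumnType
	case strings.Contains(columnType, "json"):
		return JSONColumnType
	case strings.Contains(columnType, "float"):
		return FloatColumnType
	case strings.HasPrefix(columnType, "enum"):
		return EnumColumnType
	default:
		return UnknownColumnType
	}
}
```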

View File

@@ -56,5 +56,5 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
// Wrap IPv6 literals in square brackets
hostname = fmt.Sprintf("[%s]", hostname)
}
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4,utf8,latin1", this.User, this.Password, hostname, this.Key.Port, databaseName)
return fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1", this.User, this.Password, hostname, this.Key.Port, databaseName)
}
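For context on the added DSN parameters: in go-sql-driver/mysql, `interpolateParams=true` makes the driver expand `?` placeholders client-side and send a single text-protocol query instead of a server-side prepare/execute pair, while unrecognized parameters such as `autocommit=true` are passed through as session system variables on connect. A minimal sketch with hypothetical credentials and host:

```go
package main

import (
	"database/sql"
	"fmt"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	// interpolateParams=true: placeholders are expanded client-side, one
	// round trip per query. autocommit=true: set as a session variable.
	dsn := "user:pass@tcp(127.0.0.1:3306)/mydb?interpolateParams=true&autocommit=true&charset=utf8mb4,utf8,latin1"
	db, err := sql.Open("mysql", dsn)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	var n int
	if err := db.QueryRow("select ?", 42).Scan(&n); err != nil {
		panic(err)
	}
	fmt.Println(n) // 42, interpolated client-side
}
```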

View File

@@ -17,6 +17,7 @@ import (
)
const MaxTableNameLength = 64
const MaxReplicationPasswordLength = 32
type ReplicationLagResult struct {
Key InstanceKey

View File

@@ -38,6 +38,8 @@ func buildColumnsPreparedValues(columns *ColumnList) []string {
var token string
if column.timezoneConversion != nil {
token = fmt.Sprintf("convert_tz(?, '%s', '%s')", column.timezoneConversion.ToTimezone, "+00:00")
} else if column.Type == JSONColumnType {
token = "convert(? using utf8mb4)"
} else {
token = "?"
}
@@ -106,6 +108,8 @@ func BuildSetPreparedClause(columns *ColumnList) (result string, err error) {
var setToken string
if column.timezoneConversion != nil {
setToken = fmt.Sprintf("%s=convert_tz(?, '%s', '%s')", EscapeName(column.Name), column.timezoneConversion.ToTimezone, "+00:00")
} else if column.Type == JSONColumnType {
setToken = fmt.Sprintf("%s=convert(? using utf8mb4)", EscapeName(column.Name))
} else {
setToken = fmt.Sprintf("%s=?", EscapeName(column.Name))
}
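Both hunks wrap the JSON placeholder the same way, presumably because the applier binds JSON values as plain strings and the `convert(? using utf8mb4)` cast gives MySQL a character set under which to parse them into a `json` column. A reduced sketch of the token generation with simplified column metadata (not the actual ColumnList type):

```go
package sketch

import (
	"fmt"
	"strings"
)

// Column is simplified stand-in metadata; the real code consults a
// ColumnList with per-column types.
type Column struct {
	Name   string
	IsJSON bool
}

// buildSetClause mirrors the pattern above: a JSON column's placeholder is
// wrapped in a charset conversion before assignment; every other column
// gets a plain `?`.
func buildSetClause(columns []Column) string {
	tokens := make([]string, 0, len(columns))
	for _, col := range columns {
		if col.IsJSON {
			tokens = append(tokens, fmt.Sprintf("`%s`=convert(? using utf8mb4)", col.Name))
		} else {
			tokens = append(tokens, fmt.Sprintf("`%s`=?", col.Name))
		}
	}
	return strings.Join(tokens, ", ")
}
```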
@@ -231,7 +235,62 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, mappedSharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues, rangeStartArgs, rangeEndArgs, includeRangeStartValues, transactionalTable)
}
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
func BuildUniqueKeyRangeEndPreparedQueryViaOffset(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKeyColumns.Len() == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
var startRangeComparisonSign ValueComparisonSign = GreaterThanComparisonSign
if includeRangeStartValues {
startRangeComparisonSign = GreaterThanOrEqualsComparisonSign
}
rangeStartComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeStartArgs, startRangeComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
rangeEndComparison, rangeExplodedArgs, err := BuildRangePreparedComparison(uniqueKeyColumns, rangeEndArgs, LessThanOrEqualsComparisonSign)
if err != nil {
return "", explodedArgs, err
}
explodedArgs = append(explodedArgs, rangeExplodedArgs...)
uniqueKeyColumnNames := duplicateNames(uniqueKeyColumns.Names())
uniqueKeyColumnAscending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
uniqueKeyColumnDescending := make([]string, len(uniqueKeyColumnNames), len(uniqueKeyColumnNames))
for i, column := range uniqueKeyColumns.Columns() {
uniqueKeyColumnNames[i] = EscapeName(uniqueKeyColumnNames[i])
if column.Type == EnumColumnType {
uniqueKeyColumnAscending[i] = fmt.Sprintf("concat(%s) asc", uniqueKeyColumnNames[i])
uniqueKeyColumnDescending[i] = fmt.Sprintf("concat(%s) desc", uniqueKeyColumnNames[i])
} else {
uniqueKeyColumnAscending[i] = fmt.Sprintf("%s asc", uniqueKeyColumnNames[i])
uniqueKeyColumnDescending[i] = fmt.Sprintf("%s desc", uniqueKeyColumnNames[i])
}
}
result = fmt.Sprintf(`
select /* gh-ost %s.%s %s */
%s
from
%s.%s
where %s and %s
order by
%s
limit 1
offset %d
`, databaseName, tableName, hint,
strings.Join(uniqueKeyColumnNames, ", "),
databaseName, tableName,
rangeStartComparison, rangeEndComparison,
strings.Join(uniqueKeyColumnAscending, ", "),
(chunkSize - 1),
)
return result, explodedArgs, nil
}
func BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, tableName string, uniqueKeyColumns *ColumnList, rangeStartArgs, rangeEndArgs []interface{}, chunkSize int64, includeRangeStartValues bool, hint string) (result string, explodedArgs []interface{}, err error) {
if uniqueKeyColumns.Len() == 0 {
return "", explodedArgs, fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
}

View File

@@ -283,7 +283,7 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
rangeStartArgs := []interface{}{3, 17}
rangeEndArgs := []interface{}{103, 117}
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
query, explodedArgs, err := BuildUniqueKeyRangeEndPreparedQueryViaTemptable(databaseName, originalTableName, uniqueKeyColumns, rangeStartArgs, rangeEndArgs, chunkSize, false, "test")
test.S(t).ExpectNil(err)
expected := `
select /* gh-ost mydb.tbl test */ name, position

View File

@@ -20,6 +20,8 @@ const (
DateTimeColumnType = iota
EnumColumnType = iota
MediumIntColumnType = iota
JSONColumnType = iota
FloatColumnType = iota
)
const maxMediumintUnsigned int32 = 16777215

View File

@@ -3,9 +3,9 @@ create table gh_ost_test (
id int unsigned auto_increment,
i int not null,
ts0 timestamp default current_timestamp,
ts1 timestamp,
ts1 timestamp null,
dt2 datetime,
t datetime,
t datetime default current_timestamp,
updated tinyint unsigned default 0,
primary key(id, t),
key i_idx(i)

View File

@@ -1 +1 @@
--alter="change column t t timestamp not null"
--alter="change column t t timestamp default current_timestamp"

View File

@@ -3,9 +3,9 @@ create table gh_ost_test (
id int unsigned auto_increment,
i int not null,
ts0 timestamp default current_timestamp,
ts1 timestamp,
ts1 timestamp null,
dt2 datetime,
t datetime,
t datetime null,
updated tinyint unsigned default 0,
primary key(id),
key i_idx(i)

View File

@@ -1 +1 @@
--alter="change column t t timestamp not null"
--alter="change column t t timestamp null"

View File

@@ -2,7 +2,7 @@ drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
e enum('red', 'green', 'blue', 'orange') null default null collate 'utf8_bin',
e enum('red', 'green', 'blue', 'orange') not null default 'red' collate 'utf8_bin',
primary key(id, e)
) auto_increment=1;

View File

@@ -0,0 +1,11 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
f float,
i int not null,
ts timestamp default current_timestamp,
dt datetime,
key i_idx(i),
unique key f_uidx(f)
) auto_increment=1;
drop event if exists gh_ost_test;

View File

@@ -0,0 +1 @@
No shared unique key can be found

View File

@@ -0,0 +1 @@
--alter="add column v varchar(32)"

View File

@@ -0,0 +1,7 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
ts timestamp,
primary key(id)
) auto_increment=1;

View File

@@ -0,0 +1 @@
MySQL replication password length limited to 32 characters

View File

@@ -0,0 +1 @@
--password="0123456789abcdefghij0123456789abcdefghijxx"

View File

@@ -0,0 +1,21 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
j json,
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, '"sometext"');
insert into gh_ost_test values (null, '{"key":"val"}');
insert into gh_ost_test values (null, '{"is-it": true, "count": 3, "elements": []}');
end ;;

View File

@@ -0,0 +1,27 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
updated tinyint not null default 0,
j json,
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test (id, i, j) values (null, 11, '"sometext"');
insert into gh_ost_test (id, i, j) values (null, 13, '{"key":"val"}');
insert into gh_ost_test (id, i, j) values (null, 17, '{"is-it": true, "count": 3, "elements": []}');
update gh_ost_test set j = '{"updated": 11}', updated = 1 where i = 11 and updated = 0;
update gh_ost_test set j = json_set(j, '$.count', 13, '$.id', id), updated = 1 where i = 13 and updated = 0;
delete from gh_ost_test where i = 17;
end ;;

View File

@@ -91,6 +91,7 @@ test_single() {
--postpone-cut-over-flag-file=/tmp/gh-ost.test.postpone.flag \
--test-on-replica \
--default-retries=1 \
--chunk-size=10 \
--verbose \
--debug \
--stack \

View File

@@ -3,7 +3,7 @@ create table gh_ost_test (
id int auto_increment,
i int not null,
ts0 timestamp default current_timestamp,
ts1 timestamp,
ts1 timestamp default current_timestamp,
dt2 datetime,
t datetime,
updated tinyint unsigned default 0,

View File

@@ -3,8 +3,8 @@ create table gh_ost_test (
id int auto_increment,
i int not null,
ts0 timestamp default current_timestamp,
ts1 timestamp,
ts2 timestamp,
ts1 timestamp default current_timestamp,
ts2 timestamp default current_timestamp,
updated tinyint unsigned default 0,
primary key(id),
key i_idx(i)

View File

@@ -3,7 +3,7 @@ create table gh_ost_test (
id int auto_increment,
i int not null,
ts0 timestamp default current_timestamp,
ts1 timestamp,
ts1 timestamp default current_timestamp,
dt2 datetime,
t datetime,
updated tinyint unsigned default 0,

View File

@@ -1 +1 @@
--alter="change column t t timestamp not null"
--alter="change column t t timestamp not null default current_timestamp"

View File

@@ -3,8 +3,8 @@ create table gh_ost_test (
id int auto_increment,
i int not null,
ts0 timestamp default current_timestamp,
ts1 timestamp,
ts2 timestamp,
ts1 timestamp default current_timestamp,
ts2 timestamp default current_timestamp,
updated tinyint unsigned default 0,
primary key(id),
key i_idx(i)