Compare commits

...

5 Commits

Author SHA1 Message Date
Shlomi Noach
b674567cfa
Merge branch 'master' into mysql8-rename-table 2019-04-30 15:34:31 +03:00
Shlomi Noach
93d5e20f4b removed legacy code 2019-02-24 08:06:14 +02:00
Shlomi Noach
81df93dab3 checking specifically for 8.0.13 2019-02-19 17:30:35 +02:00
Shlomi Noach
8ce66775ed utils test 2019-02-17 15:53:42 +02:00
Shlomi Noach
157575540e Utilize MySQL 8.0.13 RENAME TABLE with LOCK 2019-02-17 15:53:31 +02:00
5 changed files with 167 additions and 67 deletions

View File

@ -542,6 +542,29 @@ func (this *Applier) UnlockTables() error {
return nil
}
func (this *Applier) RenameTablesMySQL8() error {
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Issuing and expecting this to succeed: %s", query)
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
return err
}
return nil
}
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
@ -977,62 +1000,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
}
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
// original-table binlog event
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
if buildResult.err != nil {
return buildResult.err
}
// TODO The below is in preparation for transactional writes on the ghost tables.
// Such writes would be, for example:
// - prepended with sql_mode setup
// - prepended with time zone setup
// - prepended with SET SQL_LOG_BIN=0
// - prepended with SET FK_CHECKS=0
// etc.
//
// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
// is solved by silently converting unsigned bigints to string values.
//
err := func() error {
tx, err := this.db.Begin()
if err != nil {
return err
}
rollback := func(err error) error {
tx.Rollback()
return err
}
sessionQuery := fmt.Sprintf("SET SESSION time_zone = '+00:00'")
if !this.migrationContext.SkipStrictMode {
sessionQuery += ", sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')"
}
if _, err := tx.Exec(sessionQuery); err != nil {
return rollback(err)
}
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
return rollback(err)
}
if err := tx.Commit(); err != nil {
return err
}
return nil
}()
if err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args)
return log.Errore(err)
}
// no error
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, buildResult.rowsDelta)
}
}
return nil
}
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {

View File

@ -518,19 +518,23 @@ func (this *Migrator) cutOver() (err error) {
}
}
}
if this.migrationContext.CutOverType == base.CutOverAtomic {
var cutOverFunc func() error
if !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
// This is MySQL 8.0 or above. We can utilize a new ALTER TABLE featiure that supports
// RENAME while the table is locked.
cutOverFunc = this.cutOverMySQL8013
} else if this.migrationContext.CutOverType == base.CutOverAtomic {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err := this.atomicCutOver()
this.handleCutOverResult(err)
return err
cutOverFunc = this.atomicCutOver
} else if this.migrationContext.CutOverType == base.CutOverTwoStep {
cutOverFunc = this.cutOverTwoStep
} else {
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
}
if this.migrationContext.CutOverType == base.CutOverTwoStep {
err := this.cutOverTwoStep()
this.handleCutOverResult(err)
return err
}
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
err = cutOverFunc()
this.handleCutOverResult(err)
return err
}
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
@ -573,6 +577,34 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
return nil
}
// cutOverMySQL8013 utilizes a new deveopment starting MySQL 8.0.13 where RENAME TABLE is
// possible where a table is LOCKED under WRITE LOCK.
// This feature was developed specifically at the request of the `gh-ost` maintainers.
func (this *Migrator) cutOverMySQL8013() (err error) {
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
return err
}
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
return err
}
if err := this.retryOperation(this.applier.RenameTablesMySQL8); err != nil {
return err
}
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
// cutOverTwoStep will lock down the original table, execute
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
// There is a point in time where the "original" table does not exist and queries are non-blocked

View File

@ -144,7 +144,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
switch command {
case "help":
{
fmt.Fprintln(writer, `available commands:
fmt.Fprint(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
coordinates # Print the currently inspected coordinates

View File

@ -8,6 +8,7 @@ package mysql
import (
gosql "database/sql"
"fmt"
"strconv"
"strings"
"sync"
"time"
@ -203,3 +204,47 @@ func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnL
}
return sql.NewColumnList(columnNames), sql.NewColumnList(virtualColumnNames), nil
}
func versionTokens(version string, digits int) []int {
v := strings.Split(version, "-")[0]
tokens := strings.Split(v, ".")
intTokens := make([]int, digits)
for i := range tokens {
if i >= digits {
break
}
intTokens[i], _ = strconv.Atoi(tokens[i])
}
return intTokens
}
func isSmallerVersion(version string, otherVersion string, digits int) bool {
v := versionTokens(version, digits)
o := versionTokens(otherVersion, digits)
for i := 0; i < len(v); i++ {
if v[i] < o[i] {
return true
}
if v[i] > o[i] {
return false
}
if i == digits {
break
}
}
return false
}
// IsSmallerMajorVersion tests two versions against another and returns true if
// the former is a smaller "major" varsion than the latter.
// e.g. 5.5.36 is NOT a smaller major version as comapred to 5.5.40, but IS as compared to 5.6.9
func IsSmallerMajorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 2)
}
// IsSmallerMinorVersion tests two versions against another and returns true if
// the former is a smaller "minor" varsion than the latter.
// e.g. 5.5.36 is a smaller major version as comapred to 5.5.40, as well as compared to 5.6.7
func IsSmallerMinorVersion(version string, otherVersion string) bool {
return isSmallerVersion(version, otherVersion, 3)
}

56
go/mysql/utils_test.go Normal file
View File

@ -0,0 +1,56 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package mysql
import (
"reflect"
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestVersionTokens(t *testing.T) {
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.7.24-log", 3), []int{5, 7, 24}))
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("8.0.13", 3), []int{8, 0, 13}))
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5", 2), []int{5, 5}))
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5", 3), []int{5, 5, 0}))
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5-log", 3), []int{5, 5, 0}))
}
func TestIsSmallerMajorVersion(t *testing.T) {
i55 := "5.5"
i5516 := "5.5.16"
i5517 := "5.5.17"
i56 := "5.6"
i8013 := "8.0.13"
test.S(t).ExpectFalse(IsSmallerMajorVersion(i55, i5517))
test.S(t).ExpectFalse(IsSmallerMajorVersion(i5516, i5517))
test.S(t).ExpectFalse(IsSmallerMajorVersion(i56, i5517))
test.S(t).ExpectTrue(IsSmallerMajorVersion(i55, i56))
test.S(t).ExpectTrue(IsSmallerMajorVersion(i56, i8013))
test.S(t).ExpectFalse(IsSmallerMajorVersion(i8013, i56))
}
func TestIsSmallerMinorVersion(t *testing.T) {
i55 := "5.5"
i5516 := "5.5.16"
i5517 := "5.5.17"
i56 := "5.6"
i8013 := "8.0.13"
test.S(t).ExpectTrue(IsSmallerMinorVersion(i55, i5517))
test.S(t).ExpectTrue(IsSmallerMinorVersion(i5516, i5517))
test.S(t).ExpectFalse(IsSmallerMinorVersion(i56, i5517))
test.S(t).ExpectTrue(IsSmallerMinorVersion(i55, i56))
test.S(t).ExpectTrue(IsSmallerMinorVersion(i56, i8013))
test.S(t).ExpectFalse(IsSmallerMinorVersion(i8013, i56))
}