Compare commits
5 Commits
master
...
mysql8-ren
Author | SHA1 | Date | |
---|---|---|---|
|
b674567cfa | ||
|
93d5e20f4b | ||
|
81df93dab3 | ||
|
8ce66775ed | ||
|
157575540e |
@ -542,6 +542,29 @@ func (this *Applier) UnlockTables() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Applier) RenameTablesMySQL8() error {
|
||||
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
|
||||
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||
)
|
||||
log.Infof("Issuing and expecting this to succeed: %s", query)
|
||||
if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
|
||||
// - rename original table to _old
|
||||
// - rename ghost table to original
|
||||
@ -977,62 +1000,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result
|
||||
return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)))
|
||||
}
|
||||
|
||||
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
|
||||
// original-table binlog event
|
||||
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
for _, buildResult := range this.buildDMLEventQuery(dmlEvent) {
|
||||
if buildResult.err != nil {
|
||||
return buildResult.err
|
||||
}
|
||||
// TODO The below is in preparation for transactional writes on the ghost tables.
|
||||
// Such writes would be, for example:
|
||||
// - prepended with sql_mode setup
|
||||
// - prepended with time zone setup
|
||||
// - prepended with SET SQL_LOG_BIN=0
|
||||
// - prepended with SET FK_CHECKS=0
|
||||
// etc.
|
||||
//
|
||||
// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
|
||||
// is solved by silently converting unsigned bigints to string values.
|
||||
//
|
||||
|
||||
err := func() error {
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rollback := func(err error) error {
|
||||
tx.Rollback()
|
||||
return err
|
||||
}
|
||||
sessionQuery := fmt.Sprintf("SET SESSION time_zone = '+00:00'")
|
||||
if !this.migrationContext.SkipStrictMode {
|
||||
sessionQuery += ", sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')"
|
||||
}
|
||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
||||
return rollback(err)
|
||||
}
|
||||
if _, err := tx.Exec(buildResult.query, buildResult.args...); err != nil {
|
||||
return rollback(err)
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
if err != nil {
|
||||
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args)
|
||||
return log.Errore(err)
|
||||
}
|
||||
// no error
|
||||
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
||||
if this.migrationContext.CountTableRows {
|
||||
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, buildResult.rowsDelta)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
|
||||
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
|
||||
|
@ -518,19 +518,23 @@ func (this *Migrator) cutOver() (err error) {
|
||||
}
|
||||
}
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverAtomic {
|
||||
var cutOverFunc func() error
|
||||
if !mysql.IsSmallerMinorVersion(this.migrationContext.ApplierMySQLVersion, "8.0.13") {
|
||||
// This is MySQL 8.0 or above. We can utilize a new ALTER TABLE featiure that supports
|
||||
// RENAME while the table is locked.
|
||||
cutOverFunc = this.cutOverMySQL8013
|
||||
} else if this.migrationContext.CutOverType == base.CutOverAtomic {
|
||||
// Atomic solution: we use low timeout and multiple attempts. But for
|
||||
// each failed attempt, we throttle until replication lag is back to normal
|
||||
err := this.atomicCutOver()
|
||||
this.handleCutOverResult(err)
|
||||
return err
|
||||
cutOverFunc = this.atomicCutOver
|
||||
} else if this.migrationContext.CutOverType == base.CutOverTwoStep {
|
||||
cutOverFunc = this.cutOverTwoStep
|
||||
} else {
|
||||
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
|
||||
}
|
||||
if this.migrationContext.CutOverType == base.CutOverTwoStep {
|
||||
err := this.cutOverTwoStep()
|
||||
this.handleCutOverResult(err)
|
||||
return err
|
||||
}
|
||||
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
|
||||
err = cutOverFunc()
|
||||
this.handleCutOverResult(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
|
||||
@ -573,6 +577,34 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// cutOverMySQL8013 utilizes a new deveopment starting MySQL 8.0.13 where RENAME TABLE is
|
||||
// possible where a table is LOCKED under WRITE LOCK.
|
||||
// This feature was developed specifically at the request of the `gh-ost` maintainers.
|
||||
func (this *Migrator) cutOverMySQL8013() (err error) {
|
||||
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
|
||||
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
|
||||
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
|
||||
|
||||
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.retryOperation(this.waitForEventsUpToLock); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.retryOperation(this.applier.RenameTablesMySQL8); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
|
||||
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
|
||||
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
return nil
|
||||
}
|
||||
|
||||
// cutOverTwoStep will lock down the original table, execute
|
||||
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
|
||||
// There is a point in time where the "original" table does not exist and queries are non-blocked
|
||||
|
@ -144,7 +144,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr
|
||||
switch command {
|
||||
case "help":
|
||||
{
|
||||
fmt.Fprintln(writer, `available commands:
|
||||
fmt.Fprint(writer, `available commands:
|
||||
status # Print a detailed status message
|
||||
sup # Print a short status message
|
||||
coordinates # Print the currently inspected coordinates
|
||||
|
@ -8,6 +8,7 @@ package mysql
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -203,3 +204,47 @@ func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnL
|
||||
}
|
||||
return sql.NewColumnList(columnNames), sql.NewColumnList(virtualColumnNames), nil
|
||||
}
|
||||
|
||||
func versionTokens(version string, digits int) []int {
|
||||
v := strings.Split(version, "-")[0]
|
||||
tokens := strings.Split(v, ".")
|
||||
intTokens := make([]int, digits)
|
||||
for i := range tokens {
|
||||
if i >= digits {
|
||||
break
|
||||
}
|
||||
intTokens[i], _ = strconv.Atoi(tokens[i])
|
||||
}
|
||||
return intTokens
|
||||
}
|
||||
|
||||
func isSmallerVersion(version string, otherVersion string, digits int) bool {
|
||||
v := versionTokens(version, digits)
|
||||
o := versionTokens(otherVersion, digits)
|
||||
for i := 0; i < len(v); i++ {
|
||||
if v[i] < o[i] {
|
||||
return true
|
||||
}
|
||||
if v[i] > o[i] {
|
||||
return false
|
||||
}
|
||||
if i == digits {
|
||||
break
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsSmallerMajorVersion tests two versions against another and returns true if
|
||||
// the former is a smaller "major" varsion than the latter.
|
||||
// e.g. 5.5.36 is NOT a smaller major version as comapred to 5.5.40, but IS as compared to 5.6.9
|
||||
func IsSmallerMajorVersion(version string, otherVersion string) bool {
|
||||
return isSmallerVersion(version, otherVersion, 2)
|
||||
}
|
||||
|
||||
// IsSmallerMinorVersion tests two versions against another and returns true if
|
||||
// the former is a smaller "minor" varsion than the latter.
|
||||
// e.g. 5.5.36 is a smaller major version as comapred to 5.5.40, as well as compared to 5.6.7
|
||||
func IsSmallerMinorVersion(version string, otherVersion string) bool {
|
||||
return isSmallerVersion(version, otherVersion, 3)
|
||||
}
|
||||
|
56
go/mysql/utils_test.go
Normal file
56
go/mysql/utils_test.go
Normal file
@ -0,0 +1,56 @@
|
||||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package mysql
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
test "github.com/outbrain/golib/tests"
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.SetLevel(log.ERROR)
|
||||
}
|
||||
|
||||
func TestVersionTokens(t *testing.T) {
|
||||
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.7.24-log", 3), []int{5, 7, 24}))
|
||||
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("8.0.13", 3), []int{8, 0, 13}))
|
||||
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5", 2), []int{5, 5}))
|
||||
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5", 3), []int{5, 5, 0}))
|
||||
test.S(t).ExpectTrue(reflect.DeepEqual(versionTokens("5.5-log", 3), []int{5, 5, 0}))
|
||||
}
|
||||
|
||||
func TestIsSmallerMajorVersion(t *testing.T) {
|
||||
i55 := "5.5"
|
||||
i5516 := "5.5.16"
|
||||
i5517 := "5.5.17"
|
||||
i56 := "5.6"
|
||||
i8013 := "8.0.13"
|
||||
|
||||
test.S(t).ExpectFalse(IsSmallerMajorVersion(i55, i5517))
|
||||
test.S(t).ExpectFalse(IsSmallerMajorVersion(i5516, i5517))
|
||||
test.S(t).ExpectFalse(IsSmallerMajorVersion(i56, i5517))
|
||||
test.S(t).ExpectTrue(IsSmallerMajorVersion(i55, i56))
|
||||
test.S(t).ExpectTrue(IsSmallerMajorVersion(i56, i8013))
|
||||
test.S(t).ExpectFalse(IsSmallerMajorVersion(i8013, i56))
|
||||
}
|
||||
|
||||
func TestIsSmallerMinorVersion(t *testing.T) {
|
||||
i55 := "5.5"
|
||||
i5516 := "5.5.16"
|
||||
i5517 := "5.5.17"
|
||||
i56 := "5.6"
|
||||
i8013 := "8.0.13"
|
||||
|
||||
test.S(t).ExpectTrue(IsSmallerMinorVersion(i55, i5517))
|
||||
test.S(t).ExpectTrue(IsSmallerMinorVersion(i5516, i5517))
|
||||
test.S(t).ExpectFalse(IsSmallerMinorVersion(i56, i5517))
|
||||
test.S(t).ExpectTrue(IsSmallerMinorVersion(i55, i56))
|
||||
test.S(t).ExpectTrue(IsSmallerMinorVersion(i56, i8013))
|
||||
test.S(t).ExpectFalse(IsSmallerMinorVersion(i8013, i56))
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user