- --cut-over
no longer mandatory; default to safe
- Removed `CutOverVoluntaryLock` and associated code - Removed `CutOverUdfWait` - `RenameTablesRollback()` first attempts an atomic swap
This commit is contained in:
parent
cb1c61ac47
commit
97adbf1ff8
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
|||||||
#!/bin/bash
|
#!/bin/bash
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
RELEASE_VERSION="0.8.9"
|
RELEASE_VERSION="0.9.0"
|
||||||
|
|
||||||
buildpath=/tmp/gh-ost
|
buildpath=/tmp/gh-ost
|
||||||
target=gh-ost
|
target=gh-ost
|
||||||
|
@ -31,9 +31,8 @@ const (
|
|||||||
type CutOver int
|
type CutOver int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
CutOverTwoStep CutOver = iota
|
CutOverSafe CutOver = iota
|
||||||
CutOverVoluntaryLock = iota
|
CutOverTwoStep = iota
|
||||||
CutOverUdfWait = iota
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -128,12 +128,10 @@ func main() {
|
|||||||
log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
|
log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
|
||||||
}
|
}
|
||||||
switch *cutOver {
|
switch *cutOver {
|
||||||
|
case "safe", "default", "":
|
||||||
|
migrationContext.CutOverType = base.CutOverSafe
|
||||||
case "two-step":
|
case "two-step":
|
||||||
migrationContext.CutOverType = base.CutOverTwoStep
|
migrationContext.CutOverType = base.CutOverTwoStep
|
||||||
case "voluntary-lock":
|
|
||||||
migrationContext.CutOverType = base.CutOverVoluntaryLock
|
|
||||||
case "":
|
|
||||||
log.Fatalf("--cut-over must be specified")
|
|
||||||
default:
|
default:
|
||||||
log.Fatalf("Unknown cut-over: %s", *cutOver)
|
log.Fatalf("Unknown cut-over: %s", *cutOver)
|
||||||
}
|
}
|
||||||
|
@ -504,51 +504,25 @@ func (this *Applier) RenameTable(fromName, toName string) (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
|
func (this *Applier) RenameTablesRollback() (renameError error) {
|
||||||
// into original's place
|
// Restoring tables to original names.
|
||||||
func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error {
|
// We prefer the single, atomic operation:
|
||||||
|
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||||
tx, err := this.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Infof("Setting timeout for RENAME for %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
|
|
||||||
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
|
|
||||||
if _, err := tx.Exec(query); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
var sessionId int64
|
|
||||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sessionIdChan <- sessionId
|
|
||||||
|
|
||||||
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
||||||
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
sql.EscapeName(this.migrationContext.GetGhostTableName()),
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
|
sql.EscapeName(this.migrationContext.GetOldTableName()),
|
||||||
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||||
)
|
)
|
||||||
log.Infof("Renaming tables")
|
log.Infof("Renaming back both tables")
|
||||||
|
if _, err := sqlutils.ExecNoPrepare(this.db, query); err == nil {
|
||||||
this.migrationContext.RenameTablesStartTime = time.Now()
|
|
||||||
if _, err := tx.Exec(query); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
this.migrationContext.RenameTablesEndTime = time.Now()
|
|
||||||
tx.Commit()
|
|
||||||
log.Infof("Tables renamed")
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// But, if for some reason the above was impossible to do, we rename one by one.
|
||||||
func (this *Applier) RenameTablesRollback() (renameError error) {
|
query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
||||||
|
|
||||||
query := fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s`,
|
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
@ -573,7 +547,7 @@ func (this *Applier) RenameTablesRollback() (renameError error) {
|
|||||||
|
|
||||||
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
|
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
|
||||||
// We need to keep the SQL thread active so as to complete processing received events,
|
// We need to keep the SQL thread active so as to complete processing received events,
|
||||||
// and have them written to the binary log, so that we can then read them via streamer
|
// and have them written to the binary log, so that we can then read them via streamer.
|
||||||
func (this *Applier) StopSlaveIOThread() error {
|
func (this *Applier) StopSlaveIOThread() error {
|
||||||
query := `stop /* gh-ost */ slave io_thread`
|
query := `stop /* gh-ost */ slave io_thread`
|
||||||
log.Infof("Stopping replication")
|
log.Infof("Stopping replication")
|
||||||
@ -595,18 +569,6 @@ func (this *Applier) StartSlaveSQLThread() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MasterPosWait is applicable with --test-on-replica
|
|
||||||
func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
|
|
||||||
var appliedRows int64
|
|
||||||
if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 3).Scan(&appliedRows); err != nil {
|
|
||||||
return log.Errore(err)
|
|
||||||
}
|
|
||||||
if appliedRows < 0 {
|
|
||||||
return fmt.Errorf("Timeout waiting on master_pos_wait()")
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *Applier) StopSlaveNicely() error {
|
func (this *Applier) StopSlaveNicely() error {
|
||||||
if err := this.StopSlaveIOThread(); err != nil {
|
if err := this.StopSlaveIOThread(); err != nil {
|
||||||
return err
|
return err
|
||||||
@ -760,92 +722,6 @@ func (this *Applier) RenameGhostTable(sessionIdChan chan int64, ghostTableRename
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
|
|
||||||
// on a okToRelease in order to release it
|
|
||||||
func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
|
|
||||||
lockName := this.migrationContext.GetVoluntaryLockName()
|
|
||||||
|
|
||||||
tx, err := this.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
lockGrabbed <- err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// Grab
|
|
||||||
query := `select get_lock(?, 0)`
|
|
||||||
lockResult := 0
|
|
||||||
log.Infof("Grabbing voluntary lock: %s", lockName)
|
|
||||||
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
|
|
||||||
lockGrabbed <- err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if lockResult != 1 {
|
|
||||||
err := fmt.Errorf("Lock was not acquired")
|
|
||||||
lockGrabbed <- err
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
log.Infof("Voluntary lock grabbed")
|
|
||||||
lockGrabbed <- nil // No error.
|
|
||||||
|
|
||||||
// Listeners on the above will proceed to submit the "all queries till lock have been found"
|
|
||||||
// We will wait here till we're told to. This will happen once all DML events up till lock
|
|
||||||
// have been appleid on the ghost table
|
|
||||||
<-okToRelease
|
|
||||||
// Release
|
|
||||||
query = `select ifnull(release_lock(?),0)`
|
|
||||||
log.Infof("Releasing voluntary lock")
|
|
||||||
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
|
|
||||||
return log.Errore(err)
|
|
||||||
}
|
|
||||||
if lockResult != 1 {
|
|
||||||
// Generally speaking we should never get this.
|
|
||||||
return log.Errorf("release_lock result was %+v", lockResult)
|
|
||||||
}
|
|
||||||
tx.Rollback()
|
|
||||||
|
|
||||||
log.Infof("Voluntary lock released")
|
|
||||||
return nil
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
// IssueBlockingQueryOnVoluntaryLock will SELECT on the original table using a
|
|
||||||
// conditional on a known to be occupied lock. This query is expected to block,
|
|
||||||
// and will further block the followup RENAME statement
|
|
||||||
func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64) error {
|
|
||||||
lockName := this.migrationContext.GetVoluntaryLockName()
|
|
||||||
|
|
||||||
tx, err := this.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
var sessionId int64
|
|
||||||
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
sessionIdChan <- sessionId
|
|
||||||
|
|
||||||
// Grab
|
|
||||||
query := fmt.Sprintf(`
|
|
||||||
select /* gh-ost blocking-query-%s */
|
|
||||||
release_lock(?)
|
|
||||||
from %s.%s
|
|
||||||
where
|
|
||||||
get_lock(?, 86400) >= 0
|
|
||||||
limit 1
|
|
||||||
`,
|
|
||||||
lockName,
|
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
|
||||||
sql.EscapeName(this.migrationContext.OriginalTableName),
|
|
||||||
)
|
|
||||||
|
|
||||||
dummyResult := 0
|
|
||||||
log.Infof("Issuing blocking query")
|
|
||||||
this.migrationContext.LockTablesStartTime = time.Now()
|
|
||||||
tx.QueryRow(query, lockName, lockName).Scan(&dummyResult)
|
|
||||||
tx.Rollback()
|
|
||||||
log.Infof("Blocking query released")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *Applier) ExpectUsedLock(sessionId int64) error {
|
func (this *Applier) ExpectUsedLock(sessionId int64) error {
|
||||||
var result int64
|
var result int64
|
||||||
query := `select is_used_lock(?)`
|
query := `select is_used_lock(?)`
|
||||||
|
Loading…
Reference in New Issue
Block a user