Merge pull request #21 from github/lossless-swap-tables

Lossless, atomic swap tables
This commit is contained in:
Shlomi Noach 2016-05-01 21:32:46 +03:00
commit 128658579a
4 changed files with 303 additions and 46 deletions

View File

@ -46,10 +46,12 @@ type MigrationContext struct {
ThrottleFlagFile string
ThrottleAdditionalFlagFile string
MaxLoad map[string]int64
SwapTablesTimeoutSeconds int64
Noop bool
TestOnReplica bool
OkToDropTable bool
Noop bool
TestOnReplica bool
OkToDropTable bool
QuickAndBumpySwapTables bool
TableEngine string
RowsEstimate int64
@ -97,8 +99,9 @@ func newMigrationContext() *MigrationContext {
InspectorConnectionConfig: mysql.NewConnectionConfig(),
ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1000,
MaxLoad: make(map[string]int64),
throttleMutex: &sync.Mutex{},
SwapTablesTimeoutSeconds: 3,
MaxLoad: make(map[string]int64),
throttleMutex: &sync.Mutex{},
}
}
@ -122,6 +125,12 @@ func (this *MigrationContext) GetChangelogTableName() string {
return fmt.Sprintf("_%s_OSC", this.OriginalTableName)
}
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
// the swap-tables process.
func (this *MigrationContext) GetVoluntaryLockName() string {
return fmt.Sprintf("%s.%s.lock", this.DatabaseName, this.OriginalTableName)
}
// RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW`
func (this *MigrationContext) RequiresBinlogFormatChange() bool {
return this.OriginalBinlogFormat != "ROW"

View File

@ -33,6 +33,7 @@ func main() {
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-osc issues `STOP SLAVE` and you can compare the two tables for building trust")
flag.BoolVar(&migrationContext.OkToDropTable, "ok-to-drop-table", false, "Shall the tool drop the old table at end of operation. DROPping tables can be a long locking operation, which is why I'm not doing it by default. I'm an online tool, yes?")
flag.BoolVar(&migrationContext.QuickAndBumpySwapTables, "quick-and-bumpy-swap-tables", false, "Shall the tool issue a faster swapping of tables at end of operation, at the cost of causing a brief period of time when the table does not exist? This will cause queries on table to fail with error (as opposed to being locked for a longer duration of a swap)")
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.Int64Var(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
@ -87,6 +88,9 @@ func main() {
if migrationContext.AllowedRunningOnMaster && migrationContext.TestOnReplica {
log.Fatalf("--allow-on-master and --test-on-replica are mutually exclusive")
}
if migrationContext.QuickAndBumpySwapTables && migrationContext.TestOnReplica {
log.Fatalf("--quick-and-bumpy-swap-tables and --test-on-replica are mutually exclusive (the former implies migrating on master)")
}
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
log.Fatale(err)
}

View File

@ -427,22 +427,8 @@ func (this *Applier) UnlockTables() error {
return nil
}
// LockTables
func (this *Applier) SwapTables() error {
// query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetOldTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.GetGhostTableName()),
// sql.EscapeName(this.migrationContext.DatabaseName),
// sql.EscapeName(this.migrationContext.OriginalTableName),
// )
// log.Infof("Renaming tables")
// if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
// return err
// }
// SwapTablesQuickAndBumpy
func (this *Applier) SwapTablesQuickAndBumpy() error {
query := fmt.Sprintf(`alter /* gh-osc */ table %s.%s rename %s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
@ -468,7 +454,51 @@ func (this *Applier) SwapTables() error {
return nil
}
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread
// SwapTablesAtomic issues a single two-table RENAME statement to swap ghost table
// into original's place
func (this *Applier) SwapTablesAtomic(sessionIdChan chan int64) error {
tx, err := this.db.Begin()
if err != nil {
return err
}
log.Infof("Setting timeout for RENAME for %d seconds", this.migrationContext.SwapTablesTimeoutSeconds)
query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.SwapTablesTimeoutSeconds)
if _, err := tx.Exec(query); err != nil {
return err
}
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
query = fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetOldTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
log.Infof("Renaming tables")
this.migrationContext.RenameTablesStartTime = time.Now()
if _, err := tx.Exec(query); err != nil {
return err
}
this.migrationContext.RenameTablesEndTime = time.Now()
tx.Commit()
log.Infof("Tables renamed")
return nil
}
// StopSlaveIOThread is applicable with --test-on-replica; it stops the IO thread, duh.
// We need to keep the SQL thread active so as to complete processing received events,
// and have them written to the binary log, so that we can then read them via streamer
func (this *Applier) StopSlaveIOThread() error {
query := `stop /* gh-osc */ slave io_thread`
log.Infof("Stopping replication")
@ -479,6 +509,116 @@ func (this *Applier) StopSlaveIOThread() error {
return nil
}
// GrabVoluntaryLock gets a named lock (`GET_LOCK`) and listens
// on a okToRelease in order to release it
func (this *Applier) GrabVoluntaryLock(lockGrabbed chan<- error, okToRelease <-chan bool) error {
lockName := this.migrationContext.GetVoluntaryLockName()
tx, err := this.db.Begin()
if err != nil {
lockGrabbed <- err
return err
}
// Grab
query := `select get_lock(?, 0)`
lockResult := 0
log.Infof("Grabbing voluntary lock: %s", lockName)
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
lockGrabbed <- err
return err
}
if lockResult != 1 {
err := fmt.Errorf("Lock was not acquired")
lockGrabbed <- err
return err
}
log.Infof("Voluntary lock grabbed")
lockGrabbed <- nil // No error.
// Listeners on the above will proceed to submit the "all queries till lock have been found"
// We will wait here till we're told to. This will happen once all DML events up till lock
// have been appleid on the ghost table
<-okToRelease
// Release
query = `select ifnull(release_lock(?),0)`
log.Infof("Releasing voluntary lock")
if err := tx.QueryRow(query, lockName).Scan(&lockResult); err != nil {
return log.Errore(err)
}
if lockResult != 1 {
// Generally speaking we should never get this.
return log.Errorf("release_lock result was %+v", lockResult)
}
tx.Rollback()
log.Infof("Voluntary lock released")
return nil
}
// IssueBlockingQueryOnVoluntaryLock will SELECT on the original table using a
// conditional on a known to be occupied lock. This query is expected to block,
// and will further block the followup RENAME statement
func (this *Applier) IssueBlockingQueryOnVoluntaryLock(sessionIdChan chan int64) error {
lockName := this.migrationContext.GetVoluntaryLockName()
tx, err := this.db.Begin()
if err != nil {
return err
}
var sessionId int64
if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
return err
}
sessionIdChan <- sessionId
// Grab
query := fmt.Sprintf(`
select /* gh-osc blocking-query-%s */
release_lock(?)
from %s.%s
where
get_lock(?, 86400) >= 0
limit 1
`,
lockName,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
)
dummyResult := 0
log.Infof("Issuing blocking query")
this.migrationContext.LockTablesStartTime = time.Now()
tx.QueryRow(query, lockName, lockName).Scan(&dummyResult)
tx.Rollback()
log.Infof("Blocking query released")
return nil
}
func (this *Applier) ExpectProcess(sessionId int64, stateHint, infoHint string) error {
found := false
query := `
select id
from information_schema.processlist
where
id != connection_id()
and ? in (0, id)
and state like concat('%', ?, '%')
and info like concat('%', ?, '%')
`
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
found = true
return nil
}, sessionId, stateHint, infoHint)
if err != nil {
return err
}
if !found {
return fmt.Errorf("Cannot find process. Hints: %s, %s", stateHint, infoHint)
}
return nil
}
func (this *Applier) ShowStatusVariable(variableName string) (result int64, err error) {
query := fmt.Sprintf(`show global status like '%s'`, variableName)
if err := this.db.QueryRow(query).Scan(&variableName, &result); err != nil {

View File

@ -44,6 +44,7 @@ type Migrator struct {
tablesInPlace chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
voluntaryLockAcquired chan bool
panicAbort chan error
// copyRowsQueue should not be buffered; if buffered some non-damaging but
@ -60,6 +61,7 @@ func NewMigrator() *Migrator {
tablesInPlace: make(chan bool),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
voluntaryLockAcquired: make(chan bool, 1),
panicAbort: make(chan error),
copyRowsQueue: make(chan tableWriteFunc),
@ -175,6 +177,16 @@ func (this *Migrator) retryOperation(operation func() error) (err error) {
return err
}
// executeAndThrottleOnError executes a given function. If it errors, it
// throttles.
func (this *Migrator) executeAndThrottleOnError(operation func() error) (err error) {
if err := operation(); err != nil {
this.throttle(nil)
return err
}
return nil
}
func (this *Migrator) canStopStreaming() bool {
return false
}
@ -288,7 +300,9 @@ func (this *Migrator) Migrate() (err error) {
log.Debugf("Row copy complete")
this.printStatus()
this.stopWritesAndCompleteMigration()
if err := this.stopWritesAndCompleteMigration(); err != nil {
return err
}
return nil
}
@ -299,48 +313,138 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
return nil
}
this.throttle(func() {
log.Debugf("throttling before LOCK TABLES")
log.Debugf("throttling before swapping tables")
})
if this.migrationContext.TestOnReplica {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
return err
}
} else {
if err := this.retryOperation(this.applier.LockTables); err != nil {
return err
}
return this.stopWritesAndCompleteMigrationOnReplica()
}
// Running on master
if this.migrationContext.QuickAndBumpySwapTables {
return this.stopWritesAndCompleteMigrationOnMasterQuickAndBumpy()
}
// Lock-based solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
if err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.stopWritesAndCompleteMigrationOnMasterViaLock)
}); err != nil {
return err
}
return
}
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
if err := this.retryOperation(this.applier.LockTables); err != nil {
return err
}
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
if this.migrationContext.TestOnReplica {
log.Info("Table duplicated with new schema. Am not touching the original table. You may now compare the two tables to gain trust into this tool's operation")
} else {
if err := this.retryOperation(this.applier.SwapTables); err != nil {
return err
if err := this.retryOperation(this.applier.SwapTablesQuickAndBumpy); err != nil {
return err
}
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err
}
if this.migrationContext.OkToDropTable {
dropTableFunc := func() error {
return this.applier.dropTable(this.migrationContext.GetOldTableName())
}
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
if err := this.retryOperation(dropTableFunc); err != nil {
return err
}
if this.migrationContext.OkToDropTable {
dropTableFunc := func() error {
return this.applier.dropTable(this.migrationContext.GetOldTableName())
}
if err := this.retryOperation(dropTableFunc); err != nil {
return err
}
}
}
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
log.Debugf("Lock & rename duration: %s (rename only: %s). During this time, queries on %s were locked or failing", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
lockGrabbed := make(chan error, 1)
okToReleaseLock := make(chan bool, 1)
swapResult := make(chan error, 1)
go func() {
if err := this.applier.GrabVoluntaryLock(lockGrabbed, okToReleaseLock); err != nil {
log.Errore(err)
}
}()
if err := <-lockGrabbed; err != nil {
return log.Errore(err)
}
blockingQuerySessionIdChan := make(chan int64, 1)
go func() {
this.applier.IssueBlockingQueryOnVoluntaryLock(blockingQuerySessionIdChan)
}()
blockingQuerySessionId := <-blockingQuerySessionIdChan
log.Infof("Intentional blocking query connection id is %+v", blockingQuerySessionId)
if err := this.retryOperation(
func() error {
return this.applier.ExpectProcess(blockingQuerySessionId, "User lock", this.migrationContext.GetVoluntaryLockName())
}); err != nil {
return err
}
log.Infof("Found blocking query to be executing")
swapSessionIdChan := make(chan int64, 1)
go func() {
swapResult <- this.applier.SwapTablesAtomic(swapSessionIdChan)
}()
swapSessionId := <-swapSessionIdChan
log.Infof("RENAME connection id is %+v", swapSessionId)
if err := this.retryOperation(
func() error {
return this.applier.ExpectProcess(swapSessionId, "metadata lock", "rename")
}); err != nil {
return err
}
log.Infof("Found RENAME to be executing")
// OK, at this time we know any newly incoming DML on original table is blocked.
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
okToReleaseLock <- true
// BAM: voluntary lock is released, blocking query is released, rename is released.
// We now check RENAME result. We have lock_wait_timeout. We put it on purpose, to avoid
// locking the tables for too long. If lock time exceeds said timeout, the RENAME fails
// and returns a non-nil error, in which case tables have not been swapped, and we are
// not really done. We are, however, good to go for more retries.
if err := <-swapResult; err != nil {
// Bummer. We shall rest a while and try again
return err
}
// ooh nice! We're actually truly and thankfully done
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
log.Debugf("Lock & rename duration: %s. Of this, rename time was %s. During rename time, queries on %s were blocked", lockAndRenameDuration, renameDuration, sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
return err
}
this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed))
log.Debugf("Waiting for events up to lock")
<-this.allEventsUpToLockProcessed
log.Debugf("Done waiting for events up to lock")
log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
return nil
}
func (this *Migrator) initiateInspector() (err error) {
this.inspector = NewInspector()
if err := this.inspector.InitDBConnections(); err != nil {