hook names; added on-stop-replication hook

This commit is contained in:
Shlomi Noach 2016-08-23 11:35:48 +02:00
parent 972728cf40
commit 1c2a77ef95
4 changed files with 55 additions and 20 deletions

View File

@ -92,6 +92,7 @@ type MigrationContext struct {
InitiallyDropGhostTable bool InitiallyDropGhostTable bool
CutOverType CutOver CutOverType CutOver
Hostname string
TableEngine string TableEngine string
RowsEstimate int64 RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod UsedRowsEstimateMethod RowsEstimateMethod

View File

@ -189,6 +189,7 @@ func main() {
migrator := logic.NewMigrator() migrator := logic.NewMigrator()
err := migrator.Migrate() err := migrator.Migrate()
if err != nil { if err != nil {
migrator.ExecOnFailureHook()
log.Fatale(err) log.Fatale(err)
} }
log.Info("Done") log.Info("Done")

View File

@ -15,6 +15,20 @@ import (
"github.com/openark/golib/log" "github.com/openark/golib/log"
) )
const (
onStartup = "gh-ost-on-startup"
onValidated = "gh-ost-on-validated"
onAboutToRowCopy = "gh-ost-on-about-row-copy"
onRowCopyComplete = "gh-ost-on-row-copy-complete"
onBeginPostponed = "gh-ost-on-begin-postponed"
onAboutToCutOver = "gh-ost-on-about-cut-over"
onInteractiveCommand = "gh-ost-on-interactive-command"
onSuccess = "gh-ost-on-success"
onFailure = "gh-ost-on-failure"
onStatus = "gh-ost-on-status"
onStopReplication = "gh-ost-on-stop-replication"
)
type HooksExecutor struct { type HooksExecutor struct {
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
} }
@ -29,7 +43,7 @@ func (this *HooksExecutor) initHooks() error {
return nil return nil
} }
func (this *HooksExecutor) applyEnvironmentVairables() []string { func (this *HooksExecutor) applyEnvironmentVairables(extraVariables ...string) []string {
env := os.Environ() env := os.Environ()
env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName)) env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName))
env = append(env, fmt.Sprintf("GH_OST_TABLE_NAME=%s", this.migrationContext.OriginalTableName)) env = append(env, fmt.Sprintf("GH_OST_TABLE_NAME=%s", this.migrationContext.OriginalTableName))
@ -37,13 +51,20 @@ func (this *HooksExecutor) applyEnvironmentVairables() []string {
env = append(env, fmt.Sprintf("GH_OST_OLD_TABLE_NAME=%s", this.migrationContext.GetOldTableName())) env = append(env, fmt.Sprintf("GH_OST_OLD_TABLE_NAME=%s", this.migrationContext.GetOldTableName()))
env = append(env, fmt.Sprintf("GH_OST_DDL=%s", this.migrationContext.AlterStatement)) env = append(env, fmt.Sprintf("GH_OST_DDL=%s", this.migrationContext.AlterStatement))
env = append(env, fmt.Sprintf("GH_OST_ELAPSED_SECONDS=%f", this.migrationContext.ElapsedTime().Seconds())) env = append(env, fmt.Sprintf("GH_OST_ELAPSED_SECONDS=%f", this.migrationContext.ElapsedTime().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_MIGRATED_HOST=%s", this.migrationContext.ApplierConnectionConfig.ImpliedKey.Hostname))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.InspectorConnectionConfig.ImpliedKey.Hostname))
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
for _, variable := range extraVariables {
env = append(env, variable)
}
return env return env
} }
// executeHook executes a command with arguments, and set relevant environment variables // executeHook executes a command, and sets relevant environment variables
func (this *HooksExecutor) executeHook(hook string, arguments ...string) error { func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
cmd := exec.Command(hook, arguments...) cmd := exec.Command(hook)
cmd.Env = this.applyEnvironmentVairables() cmd.Env = this.applyEnvironmentVairables(extraVariables...)
if err := cmd.Run(); err != nil { if err := cmd.Run(); err != nil {
return log.Errore(err) return log.Errore(err)
@ -60,13 +81,13 @@ func (this *HooksExecutor) detectHooks(baseName string) (hooks []string, err err
return hooks, err return hooks, err
} }
func (this *HooksExecutor) executeHooks(baseName string) error { func (this *HooksExecutor) executeHooks(baseName string, extraVariables ...string) error {
hooks, err := this.detectHooks(baseName) hooks, err := this.detectHooks(baseName)
if err != nil { if err != nil {
return err return err
} }
for _, hook := range hooks { for _, hook := range hooks {
if err := this.executeHook(hook); err != nil { if err := this.executeHook(hook, extraVariables...); err != nil {
return err return err
} }
} }
@ -74,41 +95,47 @@ func (this *HooksExecutor) executeHooks(baseName string) error {
} }
func (this *HooksExecutor) onStartup() error { func (this *HooksExecutor) onStartup() error {
return nil return this.executeHooks(onStartup)
} }
func (this *HooksExecutor) onValidated() error { func (this *HooksExecutor) onValidated() error {
return nil return this.executeHooks(onValidated)
} }
func (this *HooksExecutor) onAboutToRowCopy() error { func (this *HooksExecutor) onAboutToRowCopy() error {
return nil return this.executeHooks(onAboutToRowCopy)
} }
func (this *HooksExecutor) onRowCopyComplete() error { func (this *HooksExecutor) onRowCopyComplete() error {
return nil return this.executeHooks(onRowCopyComplete)
} }
func (this *HooksExecutor) onBeginPostponed() error { func (this *HooksExecutor) onBeginPostponed() error {
return nil return this.executeHooks(onBeginPostponed)
} }
func (this *HooksExecutor) onAboutToCutOver() error { func (this *HooksExecutor) onAboutToCutOver() error {
return nil return this.executeHooks(onAboutToCutOver)
} }
func (this *HooksExecutor) onInteractiveCommand(command string) error { func (this *HooksExecutor) onInteractiveCommand(command string) error {
return nil v := fmt.Sprintf("GH_OST_COMMAND='%s'", command)
return this.executeHooks(onInteractiveCommand, v)
} }
func (this *HooksExecutor) onSuccess() error { func (this *HooksExecutor) onSuccess() error {
return nil return this.executeHooks(onSuccess)
} }
func (this *HooksExecutor) onFailure() error { func (this *HooksExecutor) onFailure() error {
return nil return this.executeHooks(onFailure)
} }
func (this *HooksExecutor) onStatus(statusMessage string) error { func (this *HooksExecutor) onStatus(statusMessage string) error {
return nil v := fmt.Sprintf("GH_OST_STATUS='%s'", statusMessage)
return this.executeHooks(onStatus, v)
}
func (this *HooksExecutor) onStopReplication() error {
return this.executeHooks(onStopReplication)
} }

View File

@ -58,7 +58,6 @@ type Migrator struct {
server *Server server *Server
hooksExecutor *HooksExecutor hooksExecutor *HooksExecutor
migrationContext *base.MigrationContext migrationContext *base.MigrationContext
hostname string
tablesInPlace chan bool tablesInPlace chan bool
rowCopyComplete chan bool rowCopyComplete chan bool
@ -375,7 +374,7 @@ func (this *Migrator) validateStatement() (err error) {
func (this *Migrator) Migrate() (err error) { func (this *Migrator) Migrate() (err error) {
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
this.migrationContext.StartTime = time.Now() this.migrationContext.StartTime = time.Now()
if this.hostname, err = os.Hostname(); err != nil { if this.migrationContext.Hostname, err = os.Hostname(); err != nil {
return err return err
} }
@ -473,6 +472,12 @@ func (this *Migrator) Migrate() (err error) {
return nil return nil
} }
// ExecOnFailureHook executes the onFailure hook, and this method is provided as the only external
// hook access point
func (this *Migrator) ExecOnFailureHook() (err error) {
return this.hooksExecutor.onFailure()
}
// cutOver performs the final step of migration, based on migration // cutOver performs the final step of migration, based on migration
// type (on replica? bumpy? safe?) // type (on replica? bumpy? safe?)
func (this *Migrator) cutOver() (err error) { func (this *Migrator) cutOver() (err error) {
@ -516,6 +521,7 @@ func (this *Migrator) cutOver() (err error) {
// and swap the tables. // and swap the tables.
// The difference is that we will later swap the tables back. // The difference is that we will later swap the tables back.
log.Debugf("testing on replica. Stopping replication IO thread") log.Debugf("testing on replica. Stopping replication IO thread")
this.hooksExecutor.onStopReplication()
if err := this.retryOperation(this.applier.StopReplication); err != nil { if err := this.retryOperation(this.applier.StopReplication); err != nil {
return err return err
} }
@ -909,7 +915,7 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
fmt.Fprintln(w, fmt.Sprintf("# Migrating %+v; inspecting %+v; executing on %+v", fmt.Fprintln(w, fmt.Sprintf("# Migrating %+v; inspecting %+v; executing on %+v",
*this.applier.connectionConfig.ImpliedKey, *this.applier.connectionConfig.ImpliedKey,
*this.inspector.connectionConfig.ImpliedKey, *this.inspector.connectionConfig.ImpliedKey,
this.hostname, this.migrationContext.Hostname,
)) ))
fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v", fmt.Fprintln(w, fmt.Sprintf("# Migration started at %+v",
this.migrationContext.StartTime.Format(time.RubyDate), this.migrationContext.StartTime.Format(time.RubyDate),