initial support for hooks

This commit is contained in:
Shlomi Noach 2016-08-19 14:52:49 +02:00
parent 36a28637f2
commit cdf393a30e
2 changed files with 130 additions and 2 deletions

87
go/logic/hooks.go Normal file
View File

@ -0,0 +1,87 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"fmt"
"os"
"os/exec"
"github.com/github/gh-ost/go/base"
"github.com/openark/golib/log"
)
type HooksExecutor struct {
migrationContext *base.MigrationContext
}
func NewHooksExecutor() *HooksExecutor {
return &HooksExecutor{
migrationContext: base.GetMigrationContext(),
}
}
func (this *HooksExecutor) detectHooks() error {
return nil
}
func (this *HooksExecutor) applyEnvironmentVairables() []string {
env := os.Environ()
env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName))
env = append(env, fmt.Sprintf("GH_OST_TABLE_NAME=%s", this.migrationContext.OriginalTableName))
env = append(env, fmt.Sprintf("GH_OST_GHOST_TABLE_NAME=%s", this.migrationContext.GetGhostTableName()))
env = append(env, fmt.Sprintf("GH_OST_OLD_TABLE_NAME=%s", this.migrationContext.GetOldTableName()))
env = append(env, fmt.Sprintf("GH_OST_DDL=%s", this.migrationContext.AlterStatement))
env = append(env, fmt.Sprintf("GH_OST_ELAPSED_SECONDS=%f", this.migrationContext.ElapsedTime().Seconds()))
return env
}
// commandRun executes a command with arguments, and set relevant environment variables
func (this *HooksExecutor) commandRun(commandText string, arguments ...string) error {
cmd := exec.Command(commandText, arguments...)
cmd.Env = this.applyEnvironmentVairables()
if err := cmd.Run(); err != nil {
return log.Errore(err)
}
return nil
}
func (this *HooksExecutor) onStartup() error {
return nil
}
func (this *HooksExecutor) onValidated() error {
return nil
}
func (this *HooksExecutor) onAboutToRowCopy() error {
return nil
}
func (this *HooksExecutor) onRowCopyComplete() error {
return nil
}
func (this *HooksExecutor) onBeginPostponed() error {
return nil
}
func (this *HooksExecutor) onAboutToCutOver() error {
return nil
}
func (this *HooksExecutor) onInteractiveCommand(command string) error {
return nil
}
func (this *HooksExecutor) onSuccess() error {
return nil
}
func (this *HooksExecutor) onFailure() error {
return nil
}

View File

@ -56,6 +56,7 @@ type Migrator struct {
applier *Applier
eventsStreamer *EventsStreamer
server *Server
hooksExecutor *HooksExecutor
migrationContext *base.MigrationContext
hostname string
@ -110,6 +111,15 @@ func (this *Migrator) acceptSignals() {
}()
}
// initiateHooksExecutor
func (this *Migrator) initiateHooksExecutor() (err error) {
this.hooksExecutor = NewHooksExecutor()
if err := this.hooksExecutor.detectHooks(); err != nil {
return err
}
return nil
}
// shouldThrottle performs checks to see whether we should currently be throttling.
// It also checks for critical-load and panic aborts.
func (this *Migrator) shouldThrottle() (result bool, reason string) {
@ -277,7 +287,7 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
}
// consumeRowCopyComplete blocks on the rowCopyComplete channel once, and then
// consumers and drops any further incoming events that may be left hanging.
// consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() {
<-this.rowCopyComplete
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
@ -371,6 +381,12 @@ func (this *Migrator) Migrate() (err error) {
go this.listenOnPanicAbort()
if err := this.initiateHooksExecutor(); err != nil {
return err
}
if err := this.hooksExecutor.onStartup(); err != nil {
return err
}
if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
return err
}
@ -397,6 +413,11 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
return err
}
// Validation complete! We're good to execute this migration
if err := this.hooksExecutor.onValidated(); err != nil {
return err
}
if err := this.initiateServer(); err != nil {
return err
}
@ -419,6 +440,9 @@ func (this *Migrator) Migrate() (err error) {
return err
}
go this.initiateThrottler()
if err := this.hooksExecutor.onAboutToRowCopy(); err != nil {
return err
}
go this.executeWriteFuncs()
go this.iterateChunks()
this.migrationContext.MarkRowCopyStartTime()
@ -427,8 +451,14 @@ func (this *Migrator) Migrate() (err error) {
log.Debugf("Operating until row copy is complete")
this.consumeRowCopyComplete()
log.Infof("Row copy complete")
if err := this.hooksExecutor.onRowCopyComplete(); err != nil {
return err
}
this.printStatus(ForcePrintStatusRule)
if err := this.hooksExecutor.onAboutToCutOver(); err != nil {
return err
}
if err := this.cutOver(); err != nil {
return err
}
@ -436,6 +466,9 @@ func (this *Migrator) Migrate() (err error) {
if err := this.finalCleanup(); err != nil {
return nil
}
if err := this.hooksExecutor.onSuccess(); err != nil {
return err
}
log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil
}
@ -463,8 +496,12 @@ func (this *Migrator) cutOver() (err error) {
}
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
// Throttle file defined and exists!
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) == 0 {
if err := this.hooksExecutor.onBeginPostponed(); err != nil {
return true, err
}
}
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 1)
//log.Debugf("Postponing final table swap as flag file exists: %+v", this.migrationContext.PostponeCutOverFlagFile)
return true, nil
}
return false, nil
@ -659,6 +696,10 @@ func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err
throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"
if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
return err
}
switch command {
case "help":
{