gh-ost/go/logic/hooks.go

163 lines
5.9 KiB
Go
Raw Normal View History

2016-08-25 12:42:49 +00:00
/*
Copyright 2022 GitHub Inc.
2016-08-19 12:52:49 +00:00
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package logic
import (
"fmt"
2022-10-21 15:47:39 +00:00
"io"
2016-08-19 12:52:49 +00:00
"os"
"os/exec"
2016-08-20 06:24:20 +00:00
"path/filepath"
2016-09-02 08:48:29 +00:00
"sync/atomic"
2016-08-19 12:52:49 +00:00
"github.com/github/gh-ost/go/base"
"github.com/openark/golib/log"
2016-08-19 12:52:49 +00:00
)
const (
onStartup = "gh-ost-on-startup"
onValidated = "gh-ost-on-validated"
onRowCountComplete = "gh-ost-on-rowcount-complete"
2016-08-23 09:40:32 +00:00
onBeforeRowCopy = "gh-ost-on-before-row-copy"
onRowCopyComplete = "gh-ost-on-row-copy-complete"
onBeginPostponed = "gh-ost-on-begin-postponed"
2016-08-23 09:40:32 +00:00
onBeforeCutOver = "gh-ost-on-before-cut-over"
onInteractiveCommand = "gh-ost-on-interactive-command"
onSuccess = "gh-ost-on-success"
onFailure = "gh-ost-on-failure"
onStatus = "gh-ost-on-status"
onStopReplication = "gh-ost-on-stop-replication"
onStartReplication = "gh-ost-on-start-replication"
)
2016-08-19 12:52:49 +00:00
type HooksExecutor struct {
migrationContext *base.MigrationContext
2022-10-21 15:47:39 +00:00
writer io.Writer
2016-08-19 12:52:49 +00:00
}
2017-08-08 20:36:54 +00:00
func NewHooksExecutor(migrationContext *base.MigrationContext) *HooksExecutor {
2016-08-19 12:52:49 +00:00
return &HooksExecutor{
2017-08-08 20:36:54 +00:00
migrationContext: migrationContext,
2022-10-21 15:47:39 +00:00
writer: os.Stderr,
2016-08-19 12:52:49 +00:00
}
}
func (this *HooksExecutor) applyEnvironmentVariables(extraVariables ...string) []string {
2016-08-19 12:52:49 +00:00
env := os.Environ()
env = append(env, fmt.Sprintf("GH_OST_DATABASE_NAME=%s", this.migrationContext.DatabaseName))
env = append(env, fmt.Sprintf("GH_OST_TABLE_NAME=%s", this.migrationContext.OriginalTableName))
env = append(env, fmt.Sprintf("GH_OST_GHOST_TABLE_NAME=%s", this.migrationContext.GetGhostTableName()))
env = append(env, fmt.Sprintf("GH_OST_OLD_TABLE_NAME=%s", this.migrationContext.GetOldTableName()))
env = append(env, fmt.Sprintf("GH_OST_DDL=%s", this.migrationContext.AlterStatement))
env = append(env, fmt.Sprintf("GH_OST_ELAPSED_SECONDS=%f", this.migrationContext.ElapsedTime().Seconds()))
2016-09-02 08:48:29 +00:00
env = append(env, fmt.Sprintf("GH_OST_ELAPSED_COPY_SECONDS=%f", this.migrationContext.ElapsedRowCopyTime().Seconds()))
estimatedRows := atomic.LoadInt64(&this.migrationContext.RowsEstimate) + atomic.LoadInt64(&this.migrationContext.RowsDeltaEstimate)
env = append(env, fmt.Sprintf("GH_OST_ESTIMATED_ROWS=%d", estimatedRows))
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
env = append(env, fmt.Sprintf("GH_OST_COPIED_ROWS=%d", totalRowsCopied))
env = append(env, fmt.Sprintf("GH_OST_MIGRATED_HOST=%s", this.migrationContext.GetApplierHostname()))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_HOST=%s", this.migrationContext.GetInspectorHostname()))
env = append(env, fmt.Sprintf("GH_OST_EXECUTING_HOST=%s", this.migrationContext.Hostname))
env = append(env, fmt.Sprintf("GH_OST_INSPECTED_LAG=%f", this.migrationContext.GetCurrentLagDuration().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_HEARTBEAT_LAG=%f", this.migrationContext.TimeSinceLastHeartbeatOnChangelog().Seconds()))
env = append(env, fmt.Sprintf("GH_OST_PROGRESS=%f", this.migrationContext.GetProgressPct()))
hooks: reporting GH_OST_ETA_SECONDS. ETA as part of migration context (#936) * v1.1.0 * WIP: copying AUTO_INCREMENT value to ghost table Initial commit: towards setting up a test suite Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * greping for 'expect_table_structure' content * Adding simple test for 'expect_table_structure' scenario * adding tests for AUTO_INCREMENT value after row deletes. Should initially fail * clear event beforehand * parsing AUTO_INCREMENT from alter query, reading AUTO_INCREMENT from original table, applying AUTO_INCREMENT value onto ghost table if applicable and user has not specified AUTO_INCREMENT in alter statement * support GetUint64 Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * minor update to test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * adding test for user defined AUTO_INCREMENT statement * Generated column as part of UNIQUE (or PRIMARY) KEY Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * skip analysis of generated column data type in unique key Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * All MySQL DBs limited to max 3 concurrent/idle connections Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * hooks: reporting GH_OST_ETA_SECONDS. ETA stored as part of migration context Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * GH_OST_ETA_NANOSECONDS Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * N/A denoted by negative value Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> * ETAUnknown constant Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
2021-05-31 12:15:51 +00:00
env = append(env, fmt.Sprintf("GH_OST_ETA_SECONDS=%d", this.migrationContext.GetETASeconds()))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT=%s", this.migrationContext.HooksHintMessage))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_OWNER=%s", this.migrationContext.HooksHintOwner))
env = append(env, fmt.Sprintf("GH_OST_HOOKS_HINT_TOKEN=%s", this.migrationContext.HooksHintToken))
env = append(env, fmt.Sprintf("GH_OST_DRY_RUN=%t", this.migrationContext.Noop))
env = append(env, extraVariables...)
2016-08-19 12:52:49 +00:00
return env
}
// executeHook executes a command, and sets relevant environment variables
2022-10-21 15:47:39 +00:00
// combined output & error are printed to the configured writer.
func (this *HooksExecutor) executeHook(hook string, extraVariables ...string) error {
cmd := exec.Command(hook)
cmd.Env = this.applyEnvironmentVariables(extraVariables...)
2016-08-19 12:52:49 +00:00
combinedOutput, err := cmd.CombinedOutput()
2022-10-21 15:47:39 +00:00
fmt.Fprintln(this.writer, string(combinedOutput))
return log.Errore(err)
2016-08-19 12:52:49 +00:00
}
2016-08-20 06:24:20 +00:00
func (this *HooksExecutor) detectHooks(baseName string) (hooks []string, err error) {
if this.migrationContext.HooksPath == "" {
return hooks, err
}
pattern := fmt.Sprintf("%s/%s*", this.migrationContext.HooksPath, baseName)
hooks, err = filepath.Glob(pattern)
return hooks, err
}
func (this *HooksExecutor) executeHooks(baseName string, extraVariables ...string) error {
2016-08-20 06:24:20 +00:00
hooks, err := this.detectHooks(baseName)
if err != nil {
return err
}
for _, hook := range hooks {
2016-08-25 12:42:49 +00:00
log.Infof("executing %+v hook: %+v", baseName, hook)
if err := this.executeHook(hook, extraVariables...); err != nil {
2016-08-20 06:24:20 +00:00
return err
}
}
return nil
}
2016-08-19 12:52:49 +00:00
func (this *HooksExecutor) onStartup() error {
return this.executeHooks(onStartup)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onValidated() error {
return this.executeHooks(onValidated)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onRowCountComplete() error {
return this.executeHooks(onRowCountComplete)
}
2016-08-23 09:40:32 +00:00
func (this *HooksExecutor) onBeforeRowCopy() error {
return this.executeHooks(onBeforeRowCopy)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onRowCopyComplete() error {
return this.executeHooks(onRowCopyComplete)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onBeginPostponed() error {
return this.executeHooks(onBeginPostponed)
2016-08-19 12:52:49 +00:00
}
2016-08-23 09:40:32 +00:00
func (this *HooksExecutor) onBeforeCutOver() error {
return this.executeHooks(onBeforeCutOver)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onInteractiveCommand(command string) error {
v := fmt.Sprintf("GH_OST_COMMAND='%s'", command)
return this.executeHooks(onInteractiveCommand, v)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onSuccess() error {
return this.executeHooks(onSuccess)
2016-08-19 12:52:49 +00:00
}
func (this *HooksExecutor) onFailure() error {
return this.executeHooks(onFailure)
2016-08-19 12:52:49 +00:00
}
2016-08-22 14:24:41 +00:00
2016-08-29 08:44:43 +00:00
func (this *HooksExecutor) onStatus(statusMessage string) error {
v := fmt.Sprintf("GH_OST_STATUS='%s'", statusMessage)
return this.executeHooks(onStatus, v)
}
func (this *HooksExecutor) onStopReplication() error {
return this.executeHooks(onStopReplication)
2016-08-22 14:24:41 +00:00
}
func (this *HooksExecutor) onStartReplication() error {
return this.executeHooks(onStartReplication)
}