resurrection: dump/restore of migration context cross executions

This commit is contained in:
Shlomi Noach 2016-12-18 09:23:51 +02:00
parent 863f50808c
commit 66894d3a52
5 changed files with 66 additions and 15 deletions

View File

@ -6,7 +6,9 @@
package base
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
@ -158,7 +160,6 @@ type MigrationContext struct {
UserCommandedUnpostponeFlag int64
CutOverCompleteFlag int64
InCutOverCriticalSectionFlag int64
PanicAbort chan error
OriginalTableColumnsOnApplier *sql.ColumnList
OriginalTableColumns *sql.ColumnList
@ -174,8 +175,6 @@ type MigrationContext struct {
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
CanStopStreaming func() bool
}
type ContextConfig struct {
@ -212,7 +211,6 @@ func newMigrationContext() *MigrationContext {
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
}
}
@ -221,6 +219,23 @@ func GetMigrationContext() *MigrationContext {
return context
}
// ToJSON exports this config to JSON string
func (this *MigrationContext) ToJSON() (string, error) {
b, err := json.Marshal(this)
return string(b), err
}
// DumpJSON exports this config to JSON string and writes it to file
func (this *MigrationContext) DumpJSON() (fileName string, err error) {
jsonBytes, err := json.Marshal(this)
if err != nil {
return fileName, err
}
fileName = fmt.Sprintf("%s/gh-ost.%s.%d.context.json", "/tmp", this.OriginalTableName, this.ElapsedTime())
err = ioutil.WriteFile(fileName, jsonBytes, 0644)
return fileName, err
}
// GetGhostTableName generates the name of ghost table, based on original table name
func (this *MigrationContext) GetGhostTableName() string {
return fmt.Sprintf("_%s_gho", this.OriginalTableName)

View File

@ -74,6 +74,8 @@ type Migrator struct {
applyEventsQueue chan tableWriteFunc
handledChangelogStates map[string]bool
contextDumpFiles []string
panicAbort chan error
}
func NewMigrator() *Migrator {
@ -88,6 +90,9 @@ func NewMigrator() *Migrator {
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool),
contextDumpFiles: []string{},
panicAbort: make(chan error),
}
return migrator
}
@ -116,6 +121,24 @@ func (this *Migrator) initiateHooksExecutor() (err error) {
return nil
}
// initiateContextDump
func (this *Migrator) initiateContextDump() (err error) {
go func() {
contextDumpTick := time.Tick(1 * time.Minute)
for range contextDumpTick {
if dumpFile, err := this.migrationContext.DumpJSON(); err == nil {
this.contextDumpFiles = append(this.contextDumpFiles, dumpFile)
if len(this.contextDumpFiles) > 2 {
oldDumpFile := this.contextDumpFiles[0]
this.contextDumpFiles = this.contextDumpFiles[1:]
os.Remove(oldDumpFile)
}
}
}
}()
return nil
}
// sleepWhileTrue sleeps indefinitely until the given function returns 'false'
// (or fails with error)
func (this *Migrator) sleepWhileTrue(operation func() (bool, error)) error {
@ -147,7 +170,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// there's an error. Let's try again.
}
if len(notFatalHint) == 0 {
this.migrationContext.PanicAbort <- err
this.panicAbort <- err
}
return err
}
@ -218,7 +241,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
// listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() {
err := <-this.migrationContext.PanicAbort
err := <-this.panicAbort
log.Fatale(err)
}
@ -278,6 +301,9 @@ func (this *Migrator) Migrate() (err error) {
if err := this.initiateHooksExecutor(); err != nil {
return err
}
if err := this.initiateContextDump(); err != nil {
return err
}
if err := this.hooksExecutor.onStartup(); err != nil {
return err
}
@ -608,7 +634,7 @@ func (this *Migrator) initiateServer() (err error) {
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
this.printStatus(rule, writer)
}
this.server = NewServer(this.hooksExecutor, f)
this.server = NewServer(this.hooksExecutor, f, this.panicAbort)
if err := this.server.BindSocketFile(); err != nil {
return err
}
@ -896,7 +922,7 @@ func (this *Migrator) initiateStreaming() error {
log.Debugf("Beginning streaming")
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
if err != nil {
this.migrationContext.PanicAbort <- err
this.panicAbort <- err
}
log.Debugf("Done streaming")
}()
@ -924,7 +950,7 @@ func (this *Migrator) addDMLEventsListener() error {
// initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error {
this.throttler = NewThrottler(this.applier, this.inspector)
this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected")

View File

@ -28,13 +28,15 @@ type Server struct {
tcpListener net.Listener
hooksExecutor *HooksExecutor
printStatus printStatusFunc
panicAbort chan error
}
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc, panicAbort chan error) *Server {
return &Server{
migrationContext: base.GetMigrationContext(),
hooksExecutor: hooksExecutor,
printStatus: printStatus,
panicAbort: panicAbort,
}
}
@ -251,7 +253,7 @@ help # This message
case "panic":
{
err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
this.migrationContext.PanicAbort <- err
this.panicAbort <- err
return NoPrintStatusRule, err
}
default:

View File

@ -21,13 +21,15 @@ type Throttler struct {
migrationContext *base.MigrationContext
applier *Applier
inspector *Inspector
panicAbort chan error
}
func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
return &Throttler{
migrationContext: base.GetMigrationContext(),
applier: applier,
inspector: inspector,
panicAbort: panicAbort,
}
}
@ -155,7 +157,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) {
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
}
}
@ -164,7 +166,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
}
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
@ -172,7 +174,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
<-timer.C
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
this.panicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
}
}()
}

View File

@ -6,6 +6,7 @@
package sql
import (
"encoding/json"
"fmt"
"reflect"
"strconv"
@ -247,6 +248,11 @@ func ToColumnValues(abstractValues []interface{}) *ColumnValues {
return result
}
// MarshalJSON will marshal this object as JSON
func (this *ColumnValues) MarshalJSON() ([]byte, error) {
return json.Marshal(this.abstractValues)
}
func (this *ColumnValues) AbstractValues() []interface{} {
return this.abstractValues
}