something that works! True resurrection applied

This commit is contained in:
Shlomi Noach 2016-12-21 17:55:40 +02:00
parent bad30a8871
commit 5f25f741ad
5 changed files with 82 additions and 16 deletions

View File

@ -198,10 +198,10 @@ type ContextConfig struct {
var context *MigrationContext
func init() {
context = newMigrationContext()
context = NewMigrationContext()
}
func newMigrationContext() *MigrationContext {
func NewMigrationContext() *MigrationContext {
return &MigrationContext{
defaultNumRetries: 60,
ChunkSize: 1000,
@ -250,6 +250,37 @@ func (this *MigrationContext) ToJSON() (string, error) {
return string(jsonBytes), nil
}
// LoadJSON treats given json as context-dump, and attempts to load this context's data.
func (this *MigrationContext) LoadJSON(jsonString string) error {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
// Some stuff that is in context but is more of a config that may be overriden by --resurrect kind of execution:
// Push
hooksPath := this.HooksPath
jsonBytes := []byte(jsonString)
err := json.Unmarshal(jsonBytes, this)
if this.MigrationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMinValues"]); err != nil {
return err
}
if this.MigrationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMaxValues"]); err != nil {
return err
}
if this.MigrationIterationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMinValues"]); err != nil {
return err
}
if this.MigrationIterationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMaxValues"]); err != nil {
return err
}
// Pop
this.HooksPath = hooksPath
return err
}
// GetGhostTableName generates the name of ghost table, based on original table name
func (this *MigrationContext) GetGhostTableName() string {
return fmt.Sprintf("_%s_gho", this.OriginalTableName)

View File

@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s
}
// readChangelogState reads changelog hints
func (this *Inspector) readChangelogState() (map[string]string, error) {
func (this *Inspector) readChangelogState(hint string) (string, error) {
query := fmt.Sprintf(`
select hint, value from %s.%s where id <= 255
select hint, value from %s.%s where hint = ? and id <= 255
`,
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
)
result := make(map[string]string)
result := ""
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
result[m.GetString("hint")] = m.GetString("value")
result = m.GetString("value")
return nil
})
}, hint)
return result, err
}

View File

@ -272,6 +272,38 @@ func (this *Migrator) countTableRows() (err error) {
return countRowsFunc()
}
func (this *Migrator) resurrect() error {
encodedContext, err := this.inspector.readChangelogState("context")
if err != nil {
return err
}
if encodedContext == "" {
return fmt.Errorf("No resurrect info found")
}
log.Infof("Proceeding to resurrection")
// Dry run: loading migration context to a temporary location just to confirm there's no errors:
loadedContext := base.NewMigrationContext()
if err := loadedContext.LoadJSON(encodedContext); err != nil {
return err
}
// Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?)
if this.migrationContext.DatabaseName != loadedContext.DatabaseName {
return fmt.Errorf("Resurrection: given --database not identical to resurrected one. Bailing out")
}
if this.migrationContext.OriginalTableName != loadedContext.OriginalTableName {
return fmt.Errorf("Resurrection: given --table not identical to resurrected one. Bailing out")
}
if this.migrationContext.AlterStatement != loadedContext.AlterStatement {
return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out")
}
// Happy. Let's go live and load the context for real.
if err := this.migrationContext.LoadJSON(encodedContext); err != nil {
return err
}
return nil
}
// Migrate executes the complete migration logic. This is *the* major gh-ost function.
func (this *Migrator) Migrate() (err error) {
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
@ -340,6 +372,11 @@ func (this *Migrator) Migrate() (err error) {
if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
return err
}
if this.migrationContext.Resurrect {
if err := this.resurrect(); err != nil {
return err
}
}
go this.executeWriteFuncs()
go this.iterateChunks()
this.migrationContext.MarkRowCopyStartTime()

View File

@ -83,11 +83,9 @@ func (this *Throttler) collectHeartbeat() {
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
return nil
}
changelogState, err := this.inspector.readChangelogState()
if err != nil {
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
return log.Errore(err)
}
if heartbeatValue, ok := changelogState["heartbeat"]; ok {
} else {
this.parseChangelogHeartbeat(heartbeatValue)
}
return nil

View File

@ -9,7 +9,7 @@ import (
"bytes"
"encoding/base64"
"encoding/gob"
"encoding/json"
// "encoding/json"
"fmt"
"reflect"
"strconv"
@ -277,10 +277,10 @@ func (this *ColumnValues) ToBase64() (b64 string, err error) {
return base64.StdEncoding.EncodeToString(buff.Bytes()), nil
}
// MarshalJSON will marshal this object as JSON
func (this *ColumnValues) MarshalJSON() ([]byte, error) {
return json.Marshal(this.abstractValues)
}
// // MarshalJSON will marshal this object as JSON
// func (this *ColumnValues) MarshalJSON() ([]byte, error) {
// return json.Marshal(this.abstractValues)
// }
func (this *ColumnValues) AbstractValues() []interface{} {
return this.abstractValues