Compare commits


32 Commits

Author SHA1 Message Date
Shlomi Noach
856d0d487a Merge branch 'master' into resurrect 2016-12-30 07:59:21 +02:00
Shlomi Noach
0b6d834a2b Merge branch 'master' into resurrect 2016-12-29 13:12:03 +02:00
Shlomi Noach
7bdfd1bff5 not applying range if nil 2016-12-29 10:26:56 +02:00
Shlomi Noach
24f5c6da62 ght/ghr suffix -> delr suffix 2016-12-29 10:23:48 +02:00
Shlomi Noach
e9e9d6d9da allowing EOF result for loadJSON 2016-12-28 23:17:01 +02:00
Shlomi Noach
e4874c84bd making sure to dump context before row-copy, so we always have some initial resurrection context 2016-12-28 23:06:18 +02:00
Shlomi Noach
90f61f812d resurrected execution does not apply migration range from terminated context 2016-12-28 14:23:50 +02:00
Shlomi Noach
738270aabe more verbose on resurrection 2016-12-28 13:35:56 +02:00
Shlomi Noach
8952e24aba rewinding resurrecting at beginning of known logfile; more verbose 2016-12-28 07:42:14 +02:00
Shlomi Noach
874cf24512 typo 2016-12-25 11:46:53 +02:00
Shlomi Noach
af74e8c6cd Resurrection documentation 2016-12-25 11:46:14 +02:00
Shlomi Noach
0e8e5de7aa added on-resurrecting hook 2016-12-25 08:53:24 +02:00
Shlomi Noach
7dfb740519 format 2016-12-24 17:44:39 +02:00
Shlomi Noach
fa399e0608 added context test, JSON export/import 2016-12-24 17:10:37 +02:00
Shlomi Noach
45b63f6500 applying IsResurrected flag 2016-12-24 10:07:59 +02:00
Shlomi Noach
6128076485 some cleanup 2016-12-24 10:01:03 +02:00
Shlomi Noach
e50361ab61 at resurrection, pointing streamer back at last known applied coordinates 2016-12-24 09:53:55 +02:00
Shlomi Noach
1080b11d81 binlog event listeners accept coordinates.
StreamerBinlogCoordinates -> AppliedBinlogCoordinates
updating AppliedBinlogCoordinates when truly applied; no longer asking streamer for coordinates (because streamer's events can be queued, but not handled, a crash implies we need to look at the last _handled_ event, not the last _streamed_ event)
2016-12-23 15:24:31 +02:00
Shlomi Noach
89ca346919 instead of loading the entire context, only updating particular fields from the resurrected context 2016-12-21 21:10:04 +02:00
Shlomi Noach
5f25f741ad something that works! True resurrection applied 2016-12-21 17:55:40 +02:00
Shlomi Noach
bad30a8871 sanity checks on --resurrection; skipping some normal-mode operations 2016-12-21 09:42:40 +02:00
Shlomi Noach
47d8306c0f comment typo 2016-12-21 09:23:57 +02:00
Shlomi Noach
171cad2a98 sanity checks for resurrection 2016-12-21 09:23:00 +02:00
Shlomi Noach
c72851e1f6 initial support for --resurrect flag 2016-12-20 22:33:44 +02:00
Shlomi Noach
4c6f42f2f1 passwords not exported in MigrationContext 2016-12-20 22:14:17 +02:00
Shlomi Noach
6f81d62a31 storing and updating streamer binlog coordinates 2016-12-20 16:47:06 +02:00
Shlomi Noach
3223a9389e context dump serialized with table writes; avoiding sync problems 2016-12-20 16:38:58 +02:00
Shlomi Noach
5e0f38cc6f Merge branch 'resurrect' of github.com:github/gh-ost into resurrect 2016-12-20 16:27:14 +02:00
Shlomi Noach
6999b4e8bf exporting to changelog table, not to file 2016-12-20 16:27:05 +02:00
Shlomi Noach
776c8d3b8b Merge branch 'master' into resurrect 2016-12-20 16:26:29 +02:00
Shlomi Noach
75b6f9edf2 encoding range values as base64 2016-12-20 15:48:42 +02:00
Shlomi Noach
66894d3a52 resurrection: dump/restore of migration context cross executions 2016-12-18 09:23:51 +02:00
17 changed files with 467 additions and 92 deletions

View File

@@ -30,6 +30,7 @@ In addition, it offers many [operational perks](doc/perks.md) that make it safer
 - Auditing: you may query `gh-ost` for status. `gh-ost` listens on unix socket or TCP.
 - Control over cut-over phase: `gh-ost` can be instructed to postpone what is probably the most critical step: the swap of tables, until such time that you're comfortably available. No need to worry about ETA being outside office hours.
 - External [hooks](doc/hooks.md) can couple `gh-ost` with your particular environment.
+- [Resurrection](doc/resurrect.md) can resume a failed migration, proceeding from last known good position.
 Please refer to the [docs](doc) for more information. No, really, read the [docs](doc).
@@ -76,19 +77,17 @@ But then a rare genetic mutation happened, and the `c` transformed into `t`. And
 ## Community
-`gh-ost` is released at a stable state, but with mileage to go. We are [open to pull requests](https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md). Please first discuss your intentions via [Issues](https://github.com/github/gh-ost/issues).
+`gh-ost` is released at a stable state, and still with mileage to go. We are [open to pull requests](https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md). Please first discuss your intentions via [Issues](https://github.com/github/gh-ost/issues).
 We develop `gh-ost` at GitHub and for the community. We may have different priorities than others. From time to time we may suggest a contribution that is not on our immediate roadmap but which may appeal to others.
 ## Download/binaries/source
-`gh-ost` is now GA and stable.
-`gh-ost` is available in binary format for Linux and Mac OS/X
+`gh-ost` is GA and stable, available in binary format for Linux and Mac OS/X
 [Download latest release here](https://github.com/github/gh-ost/releases/latest)
-`gh-ost` is a Go project; it is built with Go 1.5 with "experimental vendor". Soon to migrate to Go 1.6. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own.
+`gh-ost` is a Go project; it is built with Go 1.7. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own.
 Generally speaking, `master` branch is stable, but only [releases](https://github.com/github/gh-ost/releases) are to be used in production.

View File

@@ -1 +1 @@
-1.0.32
+1.1.0

View File

@@ -111,6 +111,14 @@ See also: [Sub-second replication lag throttling](subsecond-lag.md)
 Typically `gh-ost` is used to migrate tables on a master. If you wish to only perform the migration in full on a replica, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but other issue no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag.
+### resurrect
+It is possible to resurrect/resume a failed migration. Such a migration would be a valid execution which bailed out partway through the migration process, e.g. on meeting with `--critical-load`, or because a user `kill -9`'d it.
+Use `--resurrect` with the exact same other flags (same `--database`, `--table`, `--alter`) to resume a failed migration.
+Read more in the [resurrection docs](resurrect.md).
 ### skip-foreign-key-checks
 By default `gh-ost` verifies no foreign keys exist on the migrated table. On servers with large number of tables this check can take a long time. If you're absolutely certain no foreign keys exist (table does not referenece other table nor is referenced by other tables) and wish to save the check time, provide with `--skip-foreign-key-checks`.
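For illustration, this is roughly what a resurrected run looks like on the command line — a sketch only; the host, credentials, schema/table names and ALTER statement below are placeholders, not part of this changeset:

gh-ost --host=replica.example.com --user=gh-ost --ask-pass \
  --database=mydb --table=mytable \
  --alter="ADD COLUMN shard_id int not null default 0" \
  --critical-load="Threads_running=200" \
  --execute
# ... the run above bails out mid-migration (critical-load met, kill -9, crash) ...
gh-ost --host=replica.example.com --user=gh-ost --ask-pass \
  --database=mydb --table=mytable \
  --alter="ADD COLUMN shard_id int not null default 0" \
  --critical-load="Threads_running=200" \
  --resurrect \
  --execute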

View File

@@ -38,6 +38,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
 - `gh-ost-on-startup`
 - `gh-ost-on-validated`
+- `gh-ost-on-resurrecting`
 - `gh-ost-on-rowcount-complete`
 - `gh-ost-on-before-row-copy`
 - `gh-ost-on-status`
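For completeness, wiring the new hook up is the same as for any other hook: hooks are plain executables placed under the directory given by `--hooks-path`, named for the hook they serve. A sketch, with a placeholder path and script name:

mkdir -p /etc/gh-ost/hooks
cp my-resurrecting-notifier.sh /etc/gh-ost/hooks/gh-ost-on-resurrecting   # hypothetical script
chmod +x /etc/gh-ost/hooks/gh-ost-on-resurrecting
gh-ost --hooks-path=/etc/gh-ost/hooks ... --resurrect --execute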

doc/resurrect.md (new file, 42 lines)
View File

@@ -0,0 +1,42 @@
# Resurrection
`gh-ost` supports resurrection of a failed migration, continuing the migration from last known good position, potentially saving hours of clock-time.
A migration may fail as follows:
- On meeting with `--critical-load`
- On successively meeting with a specific error (e.g. recurring locks)
- Being `kill -9`'d by a user
- MySQL crash
- Server crash
- Robots taking over the world and other reasons.
### --resurrect
One may resurrect such a migration by running the exact same command, adding the `--resurrect` flag.
The terms for resurrection are:
- Exact same database/table/alter
- Previous migration ran for at least one minute
- Previous migration began looking at row-copy and event handling (by `1` minute of execution you may expect this to be the case)
### How does it work?
`gh-ost` dumps its migration status (context) once per minute, onto the _changelog table_. The changelog table is used for internal bookkeeping, and manages heartbeat and internal message passing.
When `--resurrect` is provided, `gh-ost` attempts to find such a status dump in the changelog table. Most interestingly, this status includes:
- Last handled binlog event coordinates (any event up to that point has been applied to _ghost_ table)
- Last copied chunk range
- Other useful information
Resurrection reconnects the streamer at last handled binlog coordinates, and skips rowcopy to proceed from last copied chunk range.
Noteworthy is that it is not important to resume from the _exact same_ coordinates and chunk as last applied; the context dump only runs once per minute, so resurrection may re-apply a minute's worth of binary logs and re-iterate a minute's worth of copied chunks.
Row-based replication has the property of being idempotent for DML events. There is no damage in reapplying contiguous binlog events starting at some point in the past.
Chunk-reiteration likewise poses no integrity concern and there is no harm in re-copying same range of rows.
The only concern is to never skip binlog events, and never skip a row range. By virtue of only dumping events and ranges that have been applied, and by virtue of only processing binlog events and chunks moving forward, `gh-ost` keeps integrity intact.
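To make "dumps its migration status onto the changelog table" concrete: the dump is just another row in that table, keyed by the `context` hint (see the `WriteChangelog` change in this diff). A sketch of peeking at it while a migration runs — the `_mytable_ghc` name assumes gh-ost's usual changelog-table naming for a table called `mytable`, and the schema name is a placeholder:

mysql mydb -e "select last_update, value from _mytable_ghc where hint = 'context' \G"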

View File

@@ -6,7 +6,9 @@
 package base
 import (
+"encoding/json"
 "fmt"
+"io"
 "os"
 "regexp"
 "strings"
@@ -81,14 +83,15 @@ type MigrationContext struct {
 SkipRenamedColumns bool
 IsTungsten bool
 DiscardForeignKeys bool
+Resurrect bool
 config ContextConfig
 configMutex *sync.Mutex
 ConfigFile string
 CliUser string
-CliPassword string
+cliPassword string
 CliMasterUser string
-CliMasterPassword string
+cliMasterPassword string
 HeartbeatIntervalMilliseconds int64
 defaultNumRetries int64
@@ -161,7 +164,7 @@ type MigrationContext struct {
 UserCommandedUnpostponeFlag int64
 CutOverCompleteFlag int64
 InCutOverCriticalSectionFlag int64
-PanicAbort chan error
+IsResurrected int64
 OriginalTableColumnsOnApplier *sql.ColumnList
 OriginalTableColumns *sql.ColumnList
@@ -177,8 +180,8 @@ type MigrationContext struct {
 Iteration int64
 MigrationIterationRangeMinValues *sql.ColumnValues
 MigrationIterationRangeMaxValues *sql.ColumnValues
+EncodedRangeValues map[string]string
-CanStopStreaming func() bool
+AppliedBinlogCoordinates mysql.BinlogCoordinates
 }
 type ContextConfig struct {
@@ -197,10 +200,10 @@
 var context *MigrationContext
 func init() {
-context = newMigrationContext()
+context = NewMigrationContext()
 }
-func newMigrationContext() *MigrationContext {
+func NewMigrationContext() *MigrationContext {
 return &MigrationContext{
 defaultNumRetries: 60,
 ChunkSize: 1000,
@@ -214,8 +217,9 @@ func newMigrationContext() *MigrationContext {
 throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
 configMutex: &sync.Mutex{},
 pointOfInterestTimeMutex: &sync.Mutex{},
+AppliedBinlogCoordinates: mysql.BinlogCoordinates{},
 ColumnRenameMap: make(map[string]string),
-PanicAbort: make(chan error),
+EncodedRangeValues: make(map[string]string),
 }
 }
@@ -224,6 +228,78 @@ func GetMigrationContext() *MigrationContext {
 return context
 }
// ToJSON exports this context as a JSON string
func (this *MigrationContext) ToJSON() (string, error) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
if this.MigrationRangeMinValues != nil {
this.EncodedRangeValues["MigrationRangeMinValues"], _ = this.MigrationRangeMinValues.ToBase64()
}
if this.MigrationRangeMaxValues != nil {
this.EncodedRangeValues["MigrationRangeMaxValues"], _ = this.MigrationRangeMaxValues.ToBase64()
}
if this.MigrationIterationRangeMinValues != nil {
this.EncodedRangeValues["MigrationIterationRangeMinValues"], _ = this.MigrationIterationRangeMinValues.ToBase64()
}
if this.MigrationIterationRangeMaxValues != nil {
this.EncodedRangeValues["MigrationIterationRangeMaxValues"], _ = this.MigrationIterationRangeMaxValues.ToBase64()
}
jsonBytes, err := json.Marshal(this)
if err != nil {
return "", err
}
return string(jsonBytes), nil
}
// LoadJSON treats given json as context-dump, and attempts to load this context's data.
func (this *MigrationContext) LoadJSON(jsonString string) error {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
jsonBytes := []byte(jsonString)
if err := json.Unmarshal(jsonBytes, this); err != nil && err != io.EOF {
return err
}
var err error
if this.MigrationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMinValues"]); err != nil {
return err
}
if this.MigrationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMaxValues"]); err != nil {
return err
}
if this.MigrationIterationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMinValues"]); err != nil {
return err
}
if this.MigrationIterationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMaxValues"]); err != nil {
return err
}
return nil
}
// ApplyResurrectedContext loads resurrection-related info from the given context
func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) {
// this.MigrationRangeMinValues = other.MigrationRangeMinValues
// this.MigrationRangeMaxValues = other.MigrationRangeMaxValues
if other.MigrationIterationRangeMinValues != nil {
this.MigrationIterationRangeMinValues = other.MigrationIterationRangeMinValues
}
if other.MigrationIterationRangeMaxValues != nil {
this.MigrationIterationRangeMaxValues = other.MigrationIterationRangeMaxValues
}
this.RowsEstimate = other.RowsEstimate
this.RowsDeltaEstimate = other.RowsDeltaEstimate
this.TotalRowsCopied = other.TotalRowsCopied
this.TotalDMLEventsApplied = other.TotalDMLEventsApplied
this.Iteration = other.Iteration
this.AppliedBinlogCoordinates = other.AppliedBinlogCoordinates
}
 // GetGhostTableName generates the name of ghost table, based on original table name
 func (this *MigrationContext) GetGhostTableName() string {
 return fmt.Sprintf("_%s_gho", this.OriginalTableName)
@@ -232,10 +308,10 @@ func (this *MigrationContext) GetGhostTableName() string {
 // GetOldTableName generates the name of the "old" table, into which the original table is renamed.
 func (this *MigrationContext) GetOldTableName() string {
 if this.TestOnReplica {
-return fmt.Sprintf("_%s_ght", this.OriginalTableName)
+return fmt.Sprintf("_%s_delr", this.OriginalTableName)
 }
 if this.MigrateOnReplica {
-return fmt.Sprintf("_%s_ghr", this.OriginalTableName)
+return fmt.Sprintf("_%s_delr", this.OriginalTableName)
 }
 return fmt.Sprintf("_%s_del", this.OriginalTableName)
 }
@@ -524,6 +600,13 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) {
 this.niceRatio = newRatio
 }
func (this *MigrationContext) SetAppliedBinlogCoordinates(binlogCoordinates *mysql.BinlogCoordinates) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.AppliedBinlogCoordinates = *binlogCoordinates
}
 // ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
 // such as: 'Threads_running=100,Threads_connected=500'
 // It only applies changes in case there's no parsing error.
@@ -598,6 +681,18 @@ func (this *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey
 return nil
 }
func (this *MigrationContext) SetCliPassword(password string) {
this.cliPassword = password
}
func (this *MigrationContext) SetCliMasterPassword(password string) {
this.cliMasterPassword = password
}
func (this *MigrationContext) GetCliMasterPassword() string {
return this.cliMasterPassword
}
 // ApplyCredentials sorts out the credentials between the config file and the CLI flags
 func (this *MigrationContext) ApplyCredentials() {
 this.configMutex.Lock()
@@ -613,9 +708,9 @@ func (this *MigrationContext) ApplyCredentials() {
 if this.config.Client.Password != "" {
 this.InspectorConnectionConfig.Password = this.config.Client.Password
 }
-if this.CliPassword != "" {
+if this.cliPassword != "" {
 // Override
-this.InspectorConnectionConfig.Password = this.CliPassword
+this.InspectorConnectionConfig.Password = this.cliPassword
 }
 }

go/base/context_test.go (new file, 55 lines)
View File

@@ -0,0 +1,55 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-ost/blob/master/LICENSE
*/
package base
import (
"io"
"testing"
"github.com/outbrain/golib/log"
test "github.com/outbrain/golib/tests"
"github.com/github/gh-ost/go/mysql"
"github.com/github/gh-ost/go/sql"
)
func init() {
log.SetLevel(log.ERROR)
}
func TestContextToJSON(t *testing.T) {
context := NewMigrationContext()
jsonString, err := context.ToJSON()
test.S(t).ExpectNil(err)
test.S(t).ExpectNotEquals(jsonString, "")
}
func TestContextLoadJSON(t *testing.T) {
var jsonString string
var err error
{
context := NewMigrationContext()
context.AppliedBinlogCoordinates = mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789}
abstractValues := []interface{}{31, "2016-12-24 17:04:32"}
context.MigrationRangeMinValues = sql.ToColumnValues(abstractValues)
jsonString, err = context.ToJSON()
test.S(t).ExpectNil(err)
test.S(t).ExpectNotEquals(jsonString, "")
}
{
context := NewMigrationContext()
err = context.LoadJSON(jsonString)
test.S(t).ExpectEqualsAny(err, nil, io.EOF)
test.S(t).ExpectEquals(context.AppliedBinlogCoordinates, mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789})
abstractValues := context.MigrationRangeMinValues.AbstractValues()
test.S(t).ExpectEquals(len(abstractValues), 2)
test.S(t).ExpectEquals(abstractValues[0], 31)
test.S(t).ExpectEquals(abstractValues[1], "2016-12-24 17:04:32")
}
}

View File

@@ -49,15 +49,16 @@ func main() {
 flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unabel to determine the master")
 flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
 flag.StringVar(&migrationContext.CliUser, "user", "", "MySQL user")
-flag.StringVar(&migrationContext.CliPassword, "password", "", "MySQL password")
+cliPassword := flag.String("password", "", "MySQL password")
 flag.StringVar(&migrationContext.CliMasterUser, "master-user", "", "MySQL user on master, if different from that on replica. Requires --assume-master-host")
-flag.StringVar(&migrationContext.CliMasterPassword, "master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host")
+cliMasterPassword := flag.String("master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host")
 flag.StringVar(&migrationContext.ConfigFile, "conf", "", "Config file")
 askPass := flag.Bool("ask-pass", false, "prompt for MySQL password")
 flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
 flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
 flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
 flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
 flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
 flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
@@ -68,6 +69,7 @@ func main() {
 flag.BoolVar(&migrationContext.IsTungsten, "tungsten", false, "explicitly let gh-ost know that you are running on a tungsten-replication based topology (you are likely to also provide --assume-master-host)")
 flag.BoolVar(&migrationContext.DiscardForeignKeys, "discard-foreign-keys", false, "DANGER! This flag will migrate a table that has foreign keys and will NOT create foreign keys on the ghost table, thus your altered table will have NO foreign keys. This is useful for intentional dropping of foreign keys")
 flag.BoolVar(&migrationContext.SkipForeignKeyChecks, "skip-foreign-key-checks", false, "set to 'true' when you know for certain there are no foreign keys on your table, and wish to skip the time it takes for gh-ost to verify that")
+flag.BoolVar(&migrationContext.Resurrect, "resurrect", false, "resume previously crashed migration")
 executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
 flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")
@@ -182,9 +184,17 @@ func main() {
 if migrationContext.CliMasterUser != "" && migrationContext.AssumeMasterHostname == "" {
 log.Fatalf("--master-user requires --assume-master-host")
 }
-if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
+if *cliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
 log.Fatalf("--master-password requires --assume-master-host")
 }
+if migrationContext.Resurrect && migrationContext.InitiallyDropGhostTable {
+migrationContext.InitiallyDropGhostTable = false
+log.Warningf("--resurrect given, implicitly disabling --initially-drop-ghost-table")
+}
+if migrationContext.Resurrect && migrationContext.InitiallyDropOldTable {
+migrationContext.InitiallyDropOldTable = false
+log.Warningf("--resurrect given, implicitly disabling --initially-drop-old-table")
+}
 switch *cutOver {
 case "atomic", "default", "":
@@ -209,13 +219,15 @@ func main() {
 if migrationContext.ServeSocketFile == "" {
 migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
 }
+migrationContext.SetCliPassword(*cliPassword)
+migrationContext.SetCliMasterPassword(*cliMasterPassword)
 if *askPass {
 fmt.Println("Password:")
 bytePassword, err := terminal.ReadPassword(int(syscall.Stdin))
 if err != nil {
 log.Fatale(err)
 }
-migrationContext.CliPassword = string(bytePassword)
+migrationContext.SetCliPassword(string(bytePassword))
 }
 migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
 migrationContext.SetNiceRatio(*niceRatio)

View File

@@ -125,7 +125,20 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) {
 return (m != nil)
 }
-// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
+// ValidateTablesForResurrection verifies ghost and changelog exist given resurrection request
+func (this *Applier) ValidateTablesForResurrection() error {
+ghostTableExists := this.tableExists(this.migrationContext.GetGhostTableName())
+if !ghostTableExists {
+return fmt.Errorf("--resurrect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName())
+}
+changelogTableExists := this.tableExists(this.migrationContext.GetChangelogTableName())
+if !changelogTableExists {
+return fmt.Errorf("--resurrect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName())
+}
+return nil
+}
+
+// ValidateOrDropExistingTables verifies ghost and old tables do not exist,
 // or attempts to drop them if instructed to.
 func (this *Applier) ValidateOrDropExistingTables() error {
 if this.migrationContext.InitiallyDropGhostTable {
@@ -195,7 +208,7 @@ func (this *Applier) CreateChangelogTable() error {
 id bigint auto_increment,
 last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
 hint varchar(64) charset ascii not null,
-value varchar(255) charset ascii not null,
+value text charset ascii not null,
 primary key(id),
 unique key hint_uidx(hint)
 ) auto_increment=256
@@ -220,7 +233,7 @@ func (this *Applier) dropTable(tableName string) error {
 sql.EscapeName(this.migrationContext.DatabaseName),
 sql.EscapeName(tableName),
 )
-log.Infof("Droppping table %s.%s",
+log.Infof("Dropping table %s.%s",
 sql.EscapeName(this.migrationContext.DatabaseName),
 sql.EscapeName(tableName),
 )
@@ -257,6 +270,8 @@ func (this *Applier) WriteChangelog(hint, value string) (string, error) {
 explicitId = 2
 case "throttle":
 explicitId = 3
+case "context":
+explicitId = 4
 }
 query := fmt.Sprintf(`
 insert /* gh-ost */ into %s.%s

View File

@@ -20,6 +20,7 @@ import (
 const (
 onStartup = "gh-ost-on-startup"
 onValidated = "gh-ost-on-validated"
+onResurrecting = "gh-ost-on-resurrecting"
 onRowCountComplete = "gh-ost-on-rowcount-complete"
 onBeforeRowCopy = "gh-ost-on-before-row-copy"
 onRowCopyComplete = "gh-ost-on-row-copy-complete"
@@ -112,6 +113,10 @@ func (this *HooksExecutor) onValidated() error {
 return this.executeHooks(onValidated)
 }
+func (this *HooksExecutor) onResurrecting() error {
+return this.executeHooks(onResurrecting)
+}
+
 func (this *HooksExecutor) onRowCountComplete() error {
 return this.executeHooks(onRowCountComplete)
 }

View File

@@ -124,7 +124,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
 return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
 }
 this.migrationContext.UniqueKey = sharedUniqueKeys[0]
-log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
+log.Infof("Chosen shared unique key is %+v", this.migrationContext.UniqueKey)
 if this.migrationContext.UniqueKey.HasNullable {
 if this.migrationContext.NullableUniqueKeyAllowed {
 log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey)
@@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s
 }
 // readChangelogState reads changelog hints
-func (this *Inspector) readChangelogState() (map[string]string, error) {
+func (this *Inspector) readChangelogState(hint string) (string, error) {
 query := fmt.Sprintf(`
-select hint, value from %s.%s where id <= 255
+select hint, value from %s.%s where hint = ? and id <= 255
 `,
 sql.EscapeName(this.migrationContext.DatabaseName),
 sql.EscapeName(this.migrationContext.GetChangelogTableName()),
 )
-result := make(map[string]string)
+result := ""
 err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
-result[m.GetString("hint")] = m.GetString("value")
+result = m.GetString("value")
 return nil
-})
+}, hint)
 return result, err
 }

View File

@ -31,6 +31,8 @@ const (
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
) )
const contextDumpInterval time.Duration = 1 * time.Minute
func ReadChangelogState(s string) ChangelogState { func ReadChangelogState(s string) ChangelogState {
return ChangelogState(strings.Split(s, ":")[0]) return ChangelogState(strings.Split(s, ":")[0])
} }
@@ -53,14 +55,15 @@
 // Migrator is the main schema migration flow manager.
 type Migrator struct {
 parser *sql.Parser
 inspector *Inspector
 applier *Applier
 eventsStreamer *EventsStreamer
 server *Server
 throttler *Throttler
 hooksExecutor *HooksExecutor
 migrationContext *base.MigrationContext
+resurrectedContext *base.MigrationContext
 firstThrottlingCollected chan bool
 ghostTableMigrated chan bool
@@ -73,7 +76,7 @@ type Migrator struct {
 copyRowsQueue chan tableWriteFunc
 applyEventsQueue chan tableWriteFunc
-handledChangelogStates map[string]bool
+panicAbort chan error
 }
 func NewMigrator() *Migrator {
@@ -85,9 +88,10 @@ func NewMigrator() *Migrator {
 rowCopyComplete: make(chan bool),
 allEventsUpToLockProcessed: make(chan string),
 copyRowsQueue: make(chan tableWriteFunc),
 applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
-handledChangelogStates: make(map[string]bool),
+panicAbort: make(chan error),
 }
 return migrator
 }
@ -147,7 +151,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// there's an error. Let's try again. // there's an error. Let's try again.
} }
if len(notFatalHint) == 0 { if len(notFatalHint) == 0 {
this.migrationContext.PanicAbort <- err this.panicAbort <- err
} }
return err return err
} }
@ -218,7 +222,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
// listenOnPanicAbort aborts on abort request // listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() { func (this *Migrator) listenOnPanicAbort() {
err := <-this.migrationContext.PanicAbort err := <-this.panicAbort
log.Fatale(err) log.Fatale(err)
} }
@ -265,6 +269,70 @@ func (this *Migrator) countTableRows() (err error) {
return countRowsFunc() return countRowsFunc()
} }
func (this *Migrator) readResurrectedContext() error {
encodedContext, err := this.inspector.readChangelogState("context")
if err != nil {
return err
}
if encodedContext == "" {
return fmt.Errorf("No resurrect info found")
}
// Loading migration context to a temporary location:
this.resurrectedContext = base.NewMigrationContext()
if err := this.resurrectedContext.LoadJSON(encodedContext); err != nil && err != io.EOF {
return err
}
// Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?)
if this.migrationContext.DatabaseName != this.resurrectedContext.DatabaseName {
return fmt.Errorf("Resurrection: given --database not identical to resurrected one. Bailing out")
}
if this.migrationContext.OriginalTableName != this.resurrectedContext.OriginalTableName {
return fmt.Errorf("Resurrection: given --table not identical to resurrected one. Bailing out")
}
if this.migrationContext.AlterStatement != this.resurrectedContext.AlterStatement {
return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out")
}
if this.resurrectedContext.AppliedBinlogCoordinates.IsEmpty() {
return fmt.Errorf("Resurrection: no applied binlog coordinates. Seems like the migration you're trying to resurrect crashed before applying a single binlog event. There's not enough info for resurrection, and not much point to it. Just run your migration again.")
}
return nil
}
func (this *Migrator) applyResurrectedContext() error {
this.migrationContext.ApplyResurrectedContext(this.resurrectedContext)
atomic.StoreInt64(&this.migrationContext.IsResurrected, 1)
if err := this.hooksExecutor.onResurrecting(); err != nil {
return err
}
log.Infof("Applied migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
log.Infof("Applied migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
log.Infof("Applied migration iteration range min values: [%s]", this.migrationContext.MigrationIterationRangeMinValues)
log.Infof("Applied migration iteration range max values: [%s]", this.migrationContext.MigrationIterationRangeMaxValues)
return nil
}
func (this *Migrator) dumpResurrectContext() error {
if this.migrationContext.Resurrect && atomic.LoadInt64(&this.migrationContext.IsResurrected) == 0 {
// we're in the process of resurrecting; don't dump context, because it would overwrite
// the very context we want to resurrect by!
return nil
}
// we dump the context. Note that this operation works sequentially to any row copy or
// event handling. There is no concurrency issue here.
if jsonString, err := this.migrationContext.ToJSON(); err != nil {
return log.Errore(err)
} else {
this.applier.WriteChangelog("context", jsonString)
log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates)
}
return nil
}
 // Migrate executes the complete migration logic. This is *the* major gh-ost function.
 func (this *Migrator) Migrate() (err error) {
 log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
@@ -290,6 +358,11 @@
 if err := this.initiateInspector(); err != nil {
 return err
 }
+if this.migrationContext.Resurrect {
+if err := this.readResurrectedContext(); err != nil {
+return err
+}
+}
 if err := this.initiateStreaming(); err != nil {
 return err
 }
@@ -333,6 +406,12 @@
 if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
 return err
 }
+if this.migrationContext.Resurrect {
+if err := this.applyResurrectedContext(); err != nil {
+return err
+}
+}
+this.dumpResurrectContext()
 go this.executeWriteFuncs()
 go this.iterateChunks()
 this.migrationContext.MarkRowCopyStartTime()
@@ -608,7 +687,7 @@ func (this *Migrator) initiateServer() (err error) {
 var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
 this.printStatus(rule, writer)
 }
-this.server = NewServer(this.hooksExecutor, f)
+this.server = NewServer(this.hooksExecutor, f, this.panicAbort)
 if err := this.server.BindSocketFile(); err != nil {
 return err
 }
@@ -656,8 +735,8 @@ func (this *Migrator) initiateInspector() (err error) {
 if this.migrationContext.CliMasterUser != "" {
 this.migrationContext.ApplierConnectionConfig.User = this.migrationContext.CliMasterUser
 }
-if this.migrationContext.CliMasterPassword != "" {
+if this.migrationContext.GetCliMasterPassword() != "" {
-this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.CliMasterPassword
+this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.GetCliMasterPassword()
 }
 log.Infof("Master forced to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey)
 }
@@ -886,15 +965,19 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
 // initiateStreaming begins treaming of binary log events and registers listeners for such events
 func (this *Migrator) initiateStreaming() error {
 this.eventsStreamer = NewEventsStreamer()
-if err := this.eventsStreamer.InitDBConnections(); err != nil {
+if err := this.eventsStreamer.InitDBConnections(this.resurrectedContext); err != nil {
 return err
 }
 this.eventsStreamer.AddListener(
 false,
 this.migrationContext.DatabaseName,
 this.migrationContext.GetChangelogTableName(),
-func(dmlEvent *binlog.BinlogDMLEvent) error {
+func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error {
-return this.onChangelogStateEvent(dmlEvent)
+err := this.onChangelogStateEvent(dmlEvent)
+if err == nil {
+this.migrationContext.SetAppliedBinlogCoordinates(coordinates)
+}
+return err
 },
 )
@@ -902,7 +985,7 @@ func (this *Migrator) initiateStreaming() error {
 log.Debugf("Beginning streaming")
 err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
 if err != nil {
-this.migrationContext.PanicAbort <- err
+this.panicAbort <- err
 }
 log.Debugf("Done streaming")
 }()
@@ -916,10 +999,14 @@ func (this *Migrator) addDMLEventsListener() error {
 false,
 this.migrationContext.DatabaseName,
 this.migrationContext.OriginalTableName,
-func(dmlEvent *binlog.BinlogDMLEvent) error {
+func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error {
 // Create a task to apply the DML event; this will be execute by executeWriteFuncs()
 applyEventFunc := func() error {
-return this.applier.ApplyDMLEventQuery(dmlEvent)
+err := this.applier.ApplyDMLEventQuery(dmlEvent)
+if err == nil {
+this.migrationContext.SetAppliedBinlogCoordinates(coordinates)
+}
+return err
 }
 this.applyEventsQueue <- applyEventFunc
 return nil
@@ -930,7 +1017,7 @@
 // initiateThrottler kicks in the throttling collection and the throttling checks.
 func (this *Migrator) initiateThrottler() error {
-this.throttler = NewThrottler(this.applier, this.inspector)
+this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
 go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
 log.Infof("Waiting for first throttle metrics to be collected")
@@ -945,21 +1032,27 @@ func (this *Migrator) initiateApplier() error {
 if err := this.applier.InitDBConnections(); err != nil {
 return err
 }
-if err := this.applier.ValidateOrDropExistingTables(); err != nil {
-return err
-}
-if err := this.applier.CreateChangelogTable(); err != nil {
-log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
-return err
-}
-if err := this.applier.CreateGhostTable(); err != nil {
-log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
-return err
-}
-
-if err := this.applier.AlterGhost(); err != nil {
-log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
-return err
-}
+if this.migrationContext.Resurrect {
+if err := this.applier.ValidateTablesForResurrection(); err != nil {
+return err
+}
+} else {
+// Normal operation, no resurrection
+if err := this.applier.ValidateOrDropExistingTables(); err != nil {
+return err
+}
+if err := this.applier.CreateChangelogTable(); err != nil {
+log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
+return err
+}
+if err := this.applier.CreateGhostTable(); err != nil {
+log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
+return err
+}
+
+if err := this.applier.AlterGhost(); err != nil {
+log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
+return err
+}
+}
 this.applier.WriteChangelogState(string(GhostTableMigrated))
@@ -1026,12 +1119,16 @@
 log.Debugf("Noop operation; not really executing write funcs")
 return nil
 }
+contextDumpTick := time.Tick(contextDumpInterval)
 for {
 this.throttler.throttle(nil)
-// We give higher priority to event processing, then secondary priority to
-// rowcopy
+// We give higher priority to event processing, then secondary priority to rowcopy
 select {
+case <-contextDumpTick:
+{
+this.dumpResurrectContext()
+}
 case applyEventFunc := <-this.applyEventsQueue:
 {
 if err := this.retryOperation(applyEventFunc); err != nil {

View File

@@ -28,13 +28,15 @@ type Server struct {
 tcpListener net.Listener
 hooksExecutor *HooksExecutor
 printStatus printStatusFunc
+panicAbort chan error
 }
-func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
+func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc, panicAbort chan error) *Server {
 return &Server{
 migrationContext: base.GetMigrationContext(),
 hooksExecutor: hooksExecutor,
 printStatus: printStatus,
+panicAbort: panicAbort,
 }
 }
@@ -251,7 +253,7 @@ help # This message
 case "panic":
 {
 err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
-this.migrationContext.PanicAbort <- err
+this.panicAbort <- err
 return NoPrintStatusRule, err
 }
 default:

View File

@@ -20,11 +20,13 @@ import (
 "github.com/outbrain/golib/sqlutils"
 )
+type BinlogEventListenerFunc func(event *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error
 type BinlogEventListener struct {
 async bool
 databaseName string
 tableName string
-onDmlEvent func(event *binlog.BinlogDMLEvent) error
+onDmlEvent BinlogEventListenerFunc
 }
 const (
@@ -57,7 +59,7 @@ func NewEventsStreamer() *EventsStreamer {
 // AddListener registers a new listener for binlog events, on a per-table basis
 func (this *EventsStreamer) AddListener(
-async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
+async bool, databaseName string, tableName string, onDmlEvent BinlogEventListenerFunc) (err error) {
 this.listenersMutex.Lock()
 defer this.listenersMutex.Unlock()
@@ -80,10 +82,11 @@ func (this *EventsStreamer) AddListener(
 // notifyListeners will notify relevant listeners with given DML event. Only
 // listeners registered for changes on the table on which the DML operates are notified.
-func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
+func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) {
 this.listenersMutex.Lock()
 defer this.listenersMutex.Unlock()
+binlogEvent := binlogEntry.DmlEvent
 for _, listener := range this.listeners {
 listener := listener
 if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
@@ -94,15 +97,15 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent)
 }
 if listener.async {
 go func() {
-listener.onDmlEvent(binlogEvent)
+listener.onDmlEvent(binlogEvent, &binlogEntry.Coordinates)
 }()
 } else {
-listener.onDmlEvent(binlogEvent)
+listener.onDmlEvent(binlogEvent, &binlogEntry.Coordinates)
 }
 }
 }
-func (this *EventsStreamer) InitDBConnections() (err error) {
+func (this *EventsStreamer) InitDBConnections(resurrectedContext *base.MigrationContext) (err error) {
 EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
 if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
 return err
@@ -110,8 +113,15 @@
 if err := this.validateConnection(); err != nil {
 return err
 }
-if err := this.readCurrentBinlogCoordinates(); err != nil {
-return err
+if this.migrationContext.Resurrect {
+// Rewinding to beginning of logfile:
+resurrectedContext.AppliedBinlogCoordinates.LogPos = 4
+log.Infof("Resurrection: initiating streamer at resurrected coordinates %+v", resurrectedContext.AppliedBinlogCoordinates)
+this.initialBinlogCoordinates = &resurrectedContext.AppliedBinlogCoordinates
+} else {
+if err := this.readCurrentBinlogCoordinates(); err != nil {
+return err
+}
 }
 if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
 return err
@@ -184,7 +194,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
 go func() {
 for binlogEntry := range this.eventsChannel {
 if binlogEntry.DmlEvent != nil {
-this.notifyListeners(binlogEntry.DmlEvent)
+this.notifyListeners(binlogEntry)
 }
 }
 }()

View File

@@ -21,13 +21,15 @@ type Throttler struct {
 migrationContext *base.MigrationContext
 applier *Applier
 inspector *Inspector
+panicAbort chan error
 }
-func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
+func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
 return &Throttler{
 migrationContext: base.GetMigrationContext(),
 applier: applier,
 inspector: inspector,
+panicAbort: panicAbort,
 }
 }
@@ -81,11 +83,9 @@ func (this *Throttler) collectHeartbeat() {
 if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
 return nil
 }
-changelogState, err := this.inspector.readChangelogState()
-if err != nil {
+if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
 return log.Errore(err)
-}
-if heartbeatValue, ok := changelogState["heartbeat"]; ok {
+} else {
 this.parseChangelogHeartbeat(heartbeatValue)
 }
 return nil
@@ -155,7 +155,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
 // Regardless of throttle, we take opportunity to check for panic-abort
 if this.migrationContext.PanicFlagFile != "" {
 if base.FileExists(this.migrationContext.PanicFlagFile) {
-this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
+this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
 }
 }
@@ -164,7 +164,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
 return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
 }
 if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
-this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
+this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
 }
 if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
 log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
@@ -172,7 +172,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
 timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
 <-timer.C
 if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
-this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
+this.panicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
 }
 }()
 }

View File

@@ -6,6 +6,9 @@
 package sql
 import (
+"bytes"
+"encoding/base64"
+"encoding/gob"
 "fmt"
 "reflect"
 "strconv"
@@ -215,7 +218,7 @@ func (this *UniqueKey) String() string {
 if this.IsAutoIncrement {
 description = fmt.Sprintf("%s (auto_increment)", description)
 }
-return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names(), this.HasNullable)
+return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.String(), this.HasNullable)
 }
 type ColumnValues struct {
@@ -247,6 +250,32 @@ func ToColumnValues(abstractValues []interface{}) *ColumnValues {
 return result
 }
func NewColumnValuesFromBase64(b64 string) (columnValues *ColumnValues, err error) {
var abstractValues []interface{}
b, err := base64.StdEncoding.DecodeString(b64)
if err != nil {
return nil, err
}
buff := bytes.Buffer{}
buff.Write(b)
decoder := gob.NewDecoder(&buff)
err = decoder.Decode(&abstractValues)
if err != nil {
return nil, err
}
return ToColumnValues(abstractValues), nil
}
func (this *ColumnValues) ToBase64() (b64 string, err error) {
buff := bytes.Buffer{}
encoder := gob.NewEncoder(&buff)
if err = encoder.Encode(this.abstractValues); err != nil {
return b64, err
}
return base64.StdEncoding.EncodeToString(buff.Bytes()), nil
}
func (this *ColumnValues) AbstractValues() []interface{} { func (this *ColumnValues) AbstractValues() []interface{} {
return this.abstractValues return this.abstractValues
} }

View File

@@ -0,0 +1,5 @@
#!/bin/bash
# Sample hook file for gh-ost-on-resurrecting
echo "$(date) gh-ost-on-resurrecting: beginning resurrection on $GH_OST_DATABASE_NAME.$GH_OST_TABLE_NAME" >> /tmp/gh-ost.log