Compare commits
32 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
856d0d487a | ||
|
0b6d834a2b | ||
|
7bdfd1bff5 | ||
|
24f5c6da62 | ||
|
e9e9d6d9da | ||
|
e4874c84bd | ||
|
90f61f812d | ||
|
738270aabe | ||
|
8952e24aba | ||
|
874cf24512 | ||
|
af74e8c6cd | ||
|
0e8e5de7aa | ||
|
7dfb740519 | ||
|
fa399e0608 | ||
|
45b63f6500 | ||
|
6128076485 | ||
|
e50361ab61 | ||
|
1080b11d81 | ||
|
89ca346919 | ||
|
5f25f741ad | ||
|
bad30a8871 | ||
|
47d8306c0f | ||
|
171cad2a98 | ||
|
c72851e1f6 | ||
|
4c6f42f2f1 | ||
|
6f81d62a31 | ||
|
3223a9389e | ||
|
5e0f38cc6f | ||
|
6999b4e8bf | ||
|
776c8d3b8b | ||
|
75b6f9edf2 | ||
|
66894d3a52 |
@ -30,6 +30,7 @@ In addition, it offers many [operational perks](doc/perks.md) that make it safer
|
||||
- Auditing: you may query `gh-ost` for status. `gh-ost` listens on unix socket or TCP.
|
||||
- Control over cut-over phase: `gh-ost` can be instructed to postpone what is probably the most critical step: the swap of tables, until such time that you're comfortably available. No need to worry about ETA being outside office hours.
|
||||
- External [hooks](doc/hooks.md) can couple `gh-ost` with your particular environment.
|
||||
- [Resurrection](doc/resurrect.md) can resume a failed migration, proceeding from last known good position.
|
||||
|
||||
Please refer to the [docs](doc) for more information. No, really, read the [docs](doc).
|
||||
|
||||
@ -76,19 +77,17 @@ But then a rare genetic mutation happened, and the `c` transformed into `t`. And
|
||||
|
||||
## Community
|
||||
|
||||
`gh-ost` is released at a stable state, but with mileage to go. We are [open to pull requests](https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md). Please first discuss your intentions via [Issues](https://github.com/github/gh-ost/issues).
|
||||
`gh-ost` is released at a stable state, and still with mileage to go. We are [open to pull requests](https://github.com/github/gh-ost/blob/master/.github/CONTRIBUTING.md). Please first discuss your intentions via [Issues](https://github.com/github/gh-ost/issues).
|
||||
|
||||
We develop `gh-ost` at GitHub and for the community. We may have different priorities than others. From time to time we may suggest a contribution that is not on our immediate roadmap but which may appeal to others.
|
||||
|
||||
## Download/binaries/source
|
||||
|
||||
`gh-ost` is now GA and stable.
|
||||
|
||||
`gh-ost` is available in binary format for Linux and Mac OS/X
|
||||
`gh-ost` is GA and stable, available in binary format for Linux and Mac OS/X
|
||||
|
||||
[Download latest release here](https://github.com/github/gh-ost/releases/latest)
|
||||
|
||||
`gh-ost` is a Go project; it is built with Go 1.5 with "experimental vendor". Soon to migrate to Go 1.6. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own.
|
||||
`gh-ost` is a Go project; it is built with Go 1.7. See and use [build file](https://github.com/github/gh-ost/blob/master/build.sh) for compiling it on your own.
|
||||
|
||||
Generally speaking, `master` branch is stable, but only [releases](https://github.com/github/gh-ost/releases) are to be used in production.
|
||||
|
||||
|
@ -1 +1 @@
|
||||
1.0.32
|
||||
1.1.0
|
||||
|
@ -111,6 +111,14 @@ See also: [Sub-second replication lag throttling](subsecond-lag.md)
|
||||
|
||||
Typically `gh-ost` is used to migrate tables on a master. If you wish to only perform the migration in full on a replica, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but other issue no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag.
|
||||
|
||||
### resurrect
|
||||
|
||||
It is possible to resurrect/resume a failed migration. Such a migration would be a valid execution, which bailed out throughout the migration process. A migration would bail out on meeting with `--critical-load`, or perhaps a user `kill -9`'d it.
|
||||
|
||||
Use `--resurrect` with exact same other flags (same `--database, --table, --alter`) to resume a failed migration.
|
||||
|
||||
Read more on [resurrection docs](resurrect.md)
|
||||
|
||||
### skip-foreign-key-checks
|
||||
|
||||
By default `gh-ost` verifies no foreign keys exist on the migrated table. On servers with large number of tables this check can take a long time. If you're absolutely certain no foreign keys exist (table does not referenece other table nor is referenced by other tables) and wish to save the check time, provide with `--skip-foreign-key-checks`.
|
||||
|
@ -38,6 +38,7 @@ The full list of supported hooks is best found in code: [hooks.go](https://githu
|
||||
|
||||
- `gh-ost-on-startup`
|
||||
- `gh-ost-on-validated`
|
||||
- `gh-ost-on-resurrecting`
|
||||
- `gh-ost-on-rowcount-complete`
|
||||
- `gh-ost-on-before-row-copy`
|
||||
- `gh-ost-on-status`
|
||||
|
42
doc/resurrect.md
Normal file
42
doc/resurrect.md
Normal file
@ -0,0 +1,42 @@
|
||||
# Resurrection
|
||||
|
||||
`gh-ost` supports resurrection of a failed migration, continuing the migration from last known good position, potentially saving hours of clock-time.
|
||||
|
||||
A migration may fail as follows:
|
||||
|
||||
- On meeting with `--critical-load`
|
||||
- On successively meeting with a specific error (e.g. recurring locks)
|
||||
- Being `kill -9`'d by a user
|
||||
- MySQL crash
|
||||
- Server crash
|
||||
- Robots taking over the world and other reasons.
|
||||
|
||||
### --resurrect
|
||||
|
||||
One may resurrect such a migration by running the exact same command, adding the `--resurrect` flag.
|
||||
|
||||
The terms for resurrection are:
|
||||
|
||||
- Exact same database/table/alter
|
||||
- Previous migration ran for at least one minute
|
||||
- Previous migration began looking at row-copy and event handling (by `1` minute of execution you may expect this to be the case)
|
||||
|
||||
### How does it work?
|
||||
|
||||
`gh-ost` dumps its migration status (context) once per minute, onto the _changelog table_. The changelog table is used for internal bookkeeping, and manages heartbeat and internal message passing.
|
||||
|
||||
When `--resurrect` is provided,`gh-ost` attempts to find such status dump in the changelog table. Most interestingly this status included:
|
||||
|
||||
- Last handled binlog event coordinates (any event up to that point has been applied to _ghost_ table)
|
||||
- Last copied chunk range
|
||||
- Other useful information
|
||||
|
||||
Resurrection reconnects the streamer at last handled binlog coordinates, and skips rowcopy to proceed from last copied chunk range.
|
||||
|
||||
Noteworthy is that it is not important to resume from _exact same_ coordinates and chunk as last applied; the context dump only runs once per minute, and resurrection may re-apply a minute's worth of binary logs, and re-iterate a minute's work of copied chunks.
|
||||
|
||||
Row-based replication has the property of being idempotent for DML events. There is no damage in reapplying contiguous binlog events starting at some point in the past.
|
||||
|
||||
Chunk-reiteration likewise poses no integrity concern and there is no harm in re-copying same range of rows.
|
||||
|
||||
The only concern is to never skip binlog events, and never skip a row range. By virtue of only dumping events and ranges that have been applied, and by virtue of only processing binlog events and chunks moving forward, `gh-ost` keeps integrity intact.
|
@ -6,7 +6,9 @@
|
||||
package base
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
@ -81,14 +83,15 @@ type MigrationContext struct {
|
||||
SkipRenamedColumns bool
|
||||
IsTungsten bool
|
||||
DiscardForeignKeys bool
|
||||
Resurrect bool
|
||||
|
||||
config ContextConfig
|
||||
configMutex *sync.Mutex
|
||||
ConfigFile string
|
||||
CliUser string
|
||||
CliPassword string
|
||||
cliPassword string
|
||||
CliMasterUser string
|
||||
CliMasterPassword string
|
||||
cliMasterPassword string
|
||||
|
||||
HeartbeatIntervalMilliseconds int64
|
||||
defaultNumRetries int64
|
||||
@ -161,7 +164,7 @@ type MigrationContext struct {
|
||||
UserCommandedUnpostponeFlag int64
|
||||
CutOverCompleteFlag int64
|
||||
InCutOverCriticalSectionFlag int64
|
||||
PanicAbort chan error
|
||||
IsResurrected int64
|
||||
|
||||
OriginalTableColumnsOnApplier *sql.ColumnList
|
||||
OriginalTableColumns *sql.ColumnList
|
||||
@ -177,8 +180,8 @@ type MigrationContext struct {
|
||||
Iteration int64
|
||||
MigrationIterationRangeMinValues *sql.ColumnValues
|
||||
MigrationIterationRangeMaxValues *sql.ColumnValues
|
||||
|
||||
CanStopStreaming func() bool
|
||||
EncodedRangeValues map[string]string
|
||||
AppliedBinlogCoordinates mysql.BinlogCoordinates
|
||||
}
|
||||
|
||||
type ContextConfig struct {
|
||||
@ -197,10 +200,10 @@ type ContextConfig struct {
|
||||
var context *MigrationContext
|
||||
|
||||
func init() {
|
||||
context = newMigrationContext()
|
||||
context = NewMigrationContext()
|
||||
}
|
||||
|
||||
func newMigrationContext() *MigrationContext {
|
||||
func NewMigrationContext() *MigrationContext {
|
||||
return &MigrationContext{
|
||||
defaultNumRetries: 60,
|
||||
ChunkSize: 1000,
|
||||
@ -214,8 +217,9 @@ func newMigrationContext() *MigrationContext {
|
||||
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
||||
configMutex: &sync.Mutex{},
|
||||
pointOfInterestTimeMutex: &sync.Mutex{},
|
||||
AppliedBinlogCoordinates: mysql.BinlogCoordinates{},
|
||||
ColumnRenameMap: make(map[string]string),
|
||||
PanicAbort: make(chan error),
|
||||
EncodedRangeValues: make(map[string]string),
|
||||
}
|
||||
}
|
||||
|
||||
@ -224,6 +228,78 @@ func GetMigrationContext() *MigrationContext {
|
||||
return context
|
||||
}
|
||||
|
||||
// DumpJSON exports this config to JSON string and writes it to file
|
||||
func (this *MigrationContext) ToJSON() (string, error) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
if this.MigrationRangeMinValues != nil {
|
||||
this.EncodedRangeValues["MigrationRangeMinValues"], _ = this.MigrationRangeMinValues.ToBase64()
|
||||
}
|
||||
if this.MigrationRangeMaxValues != nil {
|
||||
this.EncodedRangeValues["MigrationRangeMaxValues"], _ = this.MigrationRangeMaxValues.ToBase64()
|
||||
}
|
||||
if this.MigrationIterationRangeMinValues != nil {
|
||||
this.EncodedRangeValues["MigrationIterationRangeMinValues"], _ = this.MigrationIterationRangeMinValues.ToBase64()
|
||||
}
|
||||
if this.MigrationIterationRangeMaxValues != nil {
|
||||
this.EncodedRangeValues["MigrationIterationRangeMaxValues"], _ = this.MigrationIterationRangeMaxValues.ToBase64()
|
||||
}
|
||||
jsonBytes, err := json.Marshal(this)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(jsonBytes), nil
|
||||
}
|
||||
|
||||
// LoadJSON treats given json as context-dump, and attempts to load this context's data.
|
||||
func (this *MigrationContext) LoadJSON(jsonString string) error {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
jsonBytes := []byte(jsonString)
|
||||
|
||||
if err := json.Unmarshal(jsonBytes, this); err != nil && err != io.EOF {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
if this.MigrationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMinValues"]); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.MigrationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationRangeMaxValues"]); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.MigrationIterationRangeMinValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMinValues"]); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.MigrationIterationRangeMaxValues, err = sql.NewColumnValuesFromBase64(this.EncodedRangeValues["MigrationIterationRangeMaxValues"]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ApplyResurrectedContext loads resurrection-related infor from given context
|
||||
func (this *MigrationContext) ApplyResurrectedContext(other *MigrationContext) {
|
||||
// this.MigrationRangeMinValues = other.MigrationRangeMinValues
|
||||
// this.MigrationRangeMaxValues = other.MigrationRangeMaxValues
|
||||
if other.MigrationIterationRangeMinValues != nil {
|
||||
this.MigrationIterationRangeMinValues = other.MigrationIterationRangeMinValues
|
||||
}
|
||||
if other.MigrationIterationRangeMaxValues != nil {
|
||||
this.MigrationIterationRangeMaxValues = other.MigrationIterationRangeMaxValues
|
||||
}
|
||||
|
||||
this.RowsEstimate = other.RowsEstimate
|
||||
this.RowsDeltaEstimate = other.RowsDeltaEstimate
|
||||
this.TotalRowsCopied = other.TotalRowsCopied
|
||||
this.TotalDMLEventsApplied = other.TotalDMLEventsApplied
|
||||
|
||||
this.Iteration = other.Iteration
|
||||
this.AppliedBinlogCoordinates = other.AppliedBinlogCoordinates
|
||||
}
|
||||
|
||||
// GetGhostTableName generates the name of ghost table, based on original table name
|
||||
func (this *MigrationContext) GetGhostTableName() string {
|
||||
return fmt.Sprintf("_%s_gho", this.OriginalTableName)
|
||||
@ -232,10 +308,10 @@ func (this *MigrationContext) GetGhostTableName() string {
|
||||
// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
|
||||
func (this *MigrationContext) GetOldTableName() string {
|
||||
if this.TestOnReplica {
|
||||
return fmt.Sprintf("_%s_ght", this.OriginalTableName)
|
||||
return fmt.Sprintf("_%s_delr", this.OriginalTableName)
|
||||
}
|
||||
if this.MigrateOnReplica {
|
||||
return fmt.Sprintf("_%s_ghr", this.OriginalTableName)
|
||||
return fmt.Sprintf("_%s_delr", this.OriginalTableName)
|
||||
}
|
||||
return fmt.Sprintf("_%s_del", this.OriginalTableName)
|
||||
}
|
||||
@ -524,6 +600,13 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) {
|
||||
this.niceRatio = newRatio
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetAppliedBinlogCoordinates(binlogCoordinates *mysql.BinlogCoordinates) {
|
||||
this.throttleMutex.Lock()
|
||||
defer this.throttleMutex.Unlock()
|
||||
|
||||
this.AppliedBinlogCoordinates = *binlogCoordinates
|
||||
}
|
||||
|
||||
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
|
||||
// such as: 'Threads_running=100,Threads_connected=500'
|
||||
// It only applies changes in case there's no parsing error.
|
||||
@ -598,6 +681,18 @@ func (this *MigrationContext) AddThrottleControlReplicaKey(key mysql.InstanceKey
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetCliPassword(password string) {
|
||||
this.cliPassword = password
|
||||
}
|
||||
|
||||
func (this *MigrationContext) SetCliMasterPassword(password string) {
|
||||
this.cliMasterPassword = password
|
||||
}
|
||||
|
||||
func (this *MigrationContext) GetCliMasterPassword() string {
|
||||
return this.cliMasterPassword
|
||||
}
|
||||
|
||||
// ApplyCredentials sorts out the credentials between the config file and the CLI flags
|
||||
func (this *MigrationContext) ApplyCredentials() {
|
||||
this.configMutex.Lock()
|
||||
@ -613,9 +708,9 @@ func (this *MigrationContext) ApplyCredentials() {
|
||||
if this.config.Client.Password != "" {
|
||||
this.InspectorConnectionConfig.Password = this.config.Client.Password
|
||||
}
|
||||
if this.CliPassword != "" {
|
||||
if this.cliPassword != "" {
|
||||
// Override
|
||||
this.InspectorConnectionConfig.Password = this.CliPassword
|
||||
this.InspectorConnectionConfig.Password = this.cliPassword
|
||||
}
|
||||
}
|
||||
|
||||
|
55
go/base/context_test.go
Normal file
55
go/base/context_test.go
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package base
|
||||
|
||||
import (
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
test "github.com/outbrain/golib/tests"
|
||||
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
"github.com/github/gh-ost/go/sql"
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.SetLevel(log.ERROR)
|
||||
}
|
||||
|
||||
func TestContextToJSON(t *testing.T) {
|
||||
context := NewMigrationContext()
|
||||
jsonString, err := context.ToJSON()
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectNotEquals(jsonString, "")
|
||||
}
|
||||
|
||||
func TestContextLoadJSON(t *testing.T) {
|
||||
var jsonString string
|
||||
var err error
|
||||
{
|
||||
context := NewMigrationContext()
|
||||
context.AppliedBinlogCoordinates = mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789}
|
||||
|
||||
abstractValues := []interface{}{31, "2016-12-24 17:04:32"}
|
||||
context.MigrationRangeMinValues = sql.ToColumnValues(abstractValues)
|
||||
|
||||
jsonString, err = context.ToJSON()
|
||||
test.S(t).ExpectNil(err)
|
||||
test.S(t).ExpectNotEquals(jsonString, "")
|
||||
}
|
||||
{
|
||||
context := NewMigrationContext()
|
||||
err = context.LoadJSON(jsonString)
|
||||
test.S(t).ExpectEqualsAny(err, nil, io.EOF)
|
||||
test.S(t).ExpectEquals(context.AppliedBinlogCoordinates, mysql.BinlogCoordinates{LogFile: "mysql-bin.012345", LogPos: 6789})
|
||||
|
||||
abstractValues := context.MigrationRangeMinValues.AbstractValues()
|
||||
test.S(t).ExpectEquals(len(abstractValues), 2)
|
||||
test.S(t).ExpectEquals(abstractValues[0], 31)
|
||||
test.S(t).ExpectEquals(abstractValues[1], "2016-12-24 17:04:32")
|
||||
}
|
||||
}
|
@ -49,15 +49,16 @@ func main() {
|
||||
flag.StringVar(&migrationContext.AssumeMasterHostname, "assume-master-host", "", "(optional) explicitly tell gh-ost the identity of the master. Format: some.host.com[:port] This is useful in master-master setups where you wish to pick an explicit master, or in a tungsten-replicator where gh-ost is unabel to determine the master")
|
||||
flag.IntVar(&migrationContext.InspectorConnectionConfig.Key.Port, "port", 3306, "MySQL port (preferably a replica, not the master)")
|
||||
flag.StringVar(&migrationContext.CliUser, "user", "", "MySQL user")
|
||||
flag.StringVar(&migrationContext.CliPassword, "password", "", "MySQL password")
|
||||
cliPassword := flag.String("password", "", "MySQL password")
|
||||
flag.StringVar(&migrationContext.CliMasterUser, "master-user", "", "MySQL user on master, if different from that on replica. Requires --assume-master-host")
|
||||
flag.StringVar(&migrationContext.CliMasterPassword, "master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host")
|
||||
cliMasterPassword := flag.String("master-password", "", "MySQL password on master, if different from that on replica. Requires --assume-master-host")
|
||||
flag.StringVar(&migrationContext.ConfigFile, "conf", "", "Config file")
|
||||
askPass := flag.Bool("ask-pass", false, "prompt for MySQL password")
|
||||
|
||||
flag.StringVar(&migrationContext.DatabaseName, "database", "", "database name (mandatory)")
|
||||
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
|
||||
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
||||
|
||||
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
|
||||
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
|
||||
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
|
||||
@ -68,6 +69,7 @@ func main() {
|
||||
flag.BoolVar(&migrationContext.IsTungsten, "tungsten", false, "explicitly let gh-ost know that you are running on a tungsten-replication based topology (you are likely to also provide --assume-master-host)")
|
||||
flag.BoolVar(&migrationContext.DiscardForeignKeys, "discard-foreign-keys", false, "DANGER! This flag will migrate a table that has foreign keys and will NOT create foreign keys on the ghost table, thus your altered table will have NO foreign keys. This is useful for intentional dropping of foreign keys")
|
||||
flag.BoolVar(&migrationContext.SkipForeignKeyChecks, "skip-foreign-key-checks", false, "set to 'true' when you know for certain there are no foreign keys on your table, and wish to skip the time it takes for gh-ost to verify that")
|
||||
flag.BoolVar(&migrationContext.Resurrect, "resurrect", false, "resume previously crashed migration")
|
||||
|
||||
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
|
||||
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")
|
||||
@ -182,9 +184,17 @@ func main() {
|
||||
if migrationContext.CliMasterUser != "" && migrationContext.AssumeMasterHostname == "" {
|
||||
log.Fatalf("--master-user requires --assume-master-host")
|
||||
}
|
||||
if migrationContext.CliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
|
||||
if *cliMasterPassword != "" && migrationContext.AssumeMasterHostname == "" {
|
||||
log.Fatalf("--master-password requires --assume-master-host")
|
||||
}
|
||||
if migrationContext.Resurrect && migrationContext.InitiallyDropGhostTable {
|
||||
migrationContext.InitiallyDropGhostTable = false
|
||||
log.Warningf("--resurrect given, implicitly disabling --initially-drop-ghost-table")
|
||||
}
|
||||
if migrationContext.Resurrect && migrationContext.InitiallyDropOldTable {
|
||||
migrationContext.InitiallyDropOldTable = false
|
||||
log.Warningf("--resurrect given, implicitly disabling --initially-drop-old-table")
|
||||
}
|
||||
|
||||
switch *cutOver {
|
||||
case "atomic", "default", "":
|
||||
@ -209,13 +219,15 @@ func main() {
|
||||
if migrationContext.ServeSocketFile == "" {
|
||||
migrationContext.ServeSocketFile = fmt.Sprintf("/tmp/gh-ost.%s.%s.sock", migrationContext.DatabaseName, migrationContext.OriginalTableName)
|
||||
}
|
||||
migrationContext.SetCliPassword(*cliPassword)
|
||||
migrationContext.SetCliMasterPassword(*cliMasterPassword)
|
||||
if *askPass {
|
||||
fmt.Println("Password:")
|
||||
bytePassword, err := terminal.ReadPassword(int(syscall.Stdin))
|
||||
if err != nil {
|
||||
log.Fatale(err)
|
||||
}
|
||||
migrationContext.CliPassword = string(bytePassword)
|
||||
migrationContext.SetCliPassword(string(bytePassword))
|
||||
}
|
||||
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
|
||||
migrationContext.SetNiceRatio(*niceRatio)
|
||||
|
@ -125,7 +125,20 @@ func (this *Applier) tableExists(tableName string) (tableFound bool) {
|
||||
return (m != nil)
|
||||
}
|
||||
|
||||
// ValidateOrDropExistingTables verifies ghost and changelog tables do not exist,
|
||||
// ValidateTablesForResurrection verifies ghost and changelog exist given resurrection request
|
||||
func (this *Applier) ValidateTablesForResurrection() error {
|
||||
ghostTableExists := this.tableExists(this.migrationContext.GetGhostTableName())
|
||||
if !ghostTableExists {
|
||||
return fmt.Errorf("--resurrect requested, but ghost table %s doesn't exist. Panicking.", this.migrationContext.GetGhostTableName())
|
||||
}
|
||||
changelogTableExists := this.tableExists(this.migrationContext.GetChangelogTableName())
|
||||
if !changelogTableExists {
|
||||
return fmt.Errorf("--resurrect requested, but changelog table %s doesn't exist. Panicking.", this.migrationContext.GetChangelogTableName())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateOrDropExistingTables verifies ghost and old tables do not exist,
|
||||
// or attempts to drop them if instructed to.
|
||||
func (this *Applier) ValidateOrDropExistingTables() error {
|
||||
if this.migrationContext.InitiallyDropGhostTable {
|
||||
@ -195,7 +208,7 @@ func (this *Applier) CreateChangelogTable() error {
|
||||
id bigint auto_increment,
|
||||
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
|
||||
hint varchar(64) charset ascii not null,
|
||||
value varchar(255) charset ascii not null,
|
||||
value text charset ascii not null,
|
||||
primary key(id),
|
||||
unique key hint_uidx(hint)
|
||||
) auto_increment=256
|
||||
@ -220,7 +233,7 @@ func (this *Applier) dropTable(tableName string) error {
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
log.Infof("Droppping table %s.%s",
|
||||
log.Infof("Dropping table %s.%s",
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(tableName),
|
||||
)
|
||||
@ -257,6 +270,8 @@ func (this *Applier) WriteChangelog(hint, value string) (string, error) {
|
||||
explicitId = 2
|
||||
case "throttle":
|
||||
explicitId = 3
|
||||
case "context":
|
||||
explicitId = 4
|
||||
}
|
||||
query := fmt.Sprintf(`
|
||||
insert /* gh-ost */ into %s.%s
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
const (
|
||||
onStartup = "gh-ost-on-startup"
|
||||
onValidated = "gh-ost-on-validated"
|
||||
onResurrecting = "gh-ost-on-resurrecting"
|
||||
onRowCountComplete = "gh-ost-on-rowcount-complete"
|
||||
onBeforeRowCopy = "gh-ost-on-before-row-copy"
|
||||
onRowCopyComplete = "gh-ost-on-row-copy-complete"
|
||||
@ -112,6 +113,10 @@ func (this *HooksExecutor) onValidated() error {
|
||||
return this.executeHooks(onValidated)
|
||||
}
|
||||
|
||||
func (this *HooksExecutor) onResurrecting() error {
|
||||
return this.executeHooks(onResurrecting)
|
||||
}
|
||||
|
||||
func (this *HooksExecutor) onRowCountComplete() error {
|
||||
return this.executeHooks(onRowCountComplete)
|
||||
}
|
||||
|
@ -124,7 +124,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
|
||||
return fmt.Errorf("No shared unique key can be found after ALTER! Bailing out")
|
||||
}
|
||||
this.migrationContext.UniqueKey = sharedUniqueKeys[0]
|
||||
log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
|
||||
log.Infof("Chosen shared unique key is %+v", this.migrationContext.UniqueKey)
|
||||
if this.migrationContext.UniqueKey.HasNullable {
|
||||
if this.migrationContext.NullableUniqueKeyAllowed {
|
||||
log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey)
|
||||
@ -680,18 +680,18 @@ func (this *Inspector) showCreateTable(tableName string) (createTableStatement s
|
||||
}
|
||||
|
||||
// readChangelogState reads changelog hints
|
||||
func (this *Inspector) readChangelogState() (map[string]string, error) {
|
||||
func (this *Inspector) readChangelogState(hint string) (string, error) {
|
||||
query := fmt.Sprintf(`
|
||||
select hint, value from %s.%s where id <= 255
|
||||
select hint, value from %s.%s where hint = ? and id <= 255
|
||||
`,
|
||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
|
||||
)
|
||||
result := make(map[string]string)
|
||||
result := ""
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||
result[m.GetString("hint")] = m.GetString("value")
|
||||
result = m.GetString("value")
|
||||
return nil
|
||||
})
|
||||
}, hint)
|
||||
return result, err
|
||||
}
|
||||
|
||||
|
@ -31,6 +31,8 @@ const (
|
||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||
)
|
||||
|
||||
const contextDumpInterval time.Duration = 1 * time.Minute
|
||||
|
||||
func ReadChangelogState(s string) ChangelogState {
|
||||
return ChangelogState(strings.Split(s, ":")[0])
|
||||
}
|
||||
@ -53,14 +55,15 @@ const (
|
||||
|
||||
// Migrator is the main schema migration flow manager.
|
||||
type Migrator struct {
|
||||
parser *sql.Parser
|
||||
inspector *Inspector
|
||||
applier *Applier
|
||||
eventsStreamer *EventsStreamer
|
||||
server *Server
|
||||
throttler *Throttler
|
||||
hooksExecutor *HooksExecutor
|
||||
migrationContext *base.MigrationContext
|
||||
parser *sql.Parser
|
||||
inspector *Inspector
|
||||
applier *Applier
|
||||
eventsStreamer *EventsStreamer
|
||||
server *Server
|
||||
throttler *Throttler
|
||||
hooksExecutor *HooksExecutor
|
||||
migrationContext *base.MigrationContext
|
||||
resurrectedContext *base.MigrationContext
|
||||
|
||||
firstThrottlingCollected chan bool
|
||||
ghostTableMigrated chan bool
|
||||
@ -73,7 +76,7 @@ type Migrator struct {
|
||||
copyRowsQueue chan tableWriteFunc
|
||||
applyEventsQueue chan tableWriteFunc
|
||||
|
||||
handledChangelogStates map[string]bool
|
||||
panicAbort chan error
|
||||
}
|
||||
|
||||
func NewMigrator() *Migrator {
|
||||
@ -85,9 +88,10 @@ func NewMigrator() *Migrator {
|
||||
rowCopyComplete: make(chan bool),
|
||||
allEventsUpToLockProcessed: make(chan string),
|
||||
|
||||
copyRowsQueue: make(chan tableWriteFunc),
|
||||
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
||||
handledChangelogStates: make(map[string]bool),
|
||||
copyRowsQueue: make(chan tableWriteFunc),
|
||||
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
|
||||
|
||||
panicAbort: make(chan error),
|
||||
}
|
||||
return migrator
|
||||
}
|
||||
@ -147,7 +151,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
|
||||
// there's an error. Let's try again.
|
||||
}
|
||||
if len(notFatalHint) == 0 {
|
||||
this.migrationContext.PanicAbort <- err
|
||||
this.panicAbort <- err
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -218,7 +222,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
|
||||
// listenOnPanicAbort aborts on abort request
|
||||
func (this *Migrator) listenOnPanicAbort() {
|
||||
err := <-this.migrationContext.PanicAbort
|
||||
err := <-this.panicAbort
|
||||
log.Fatale(err)
|
||||
}
|
||||
|
||||
@ -265,6 +269,70 @@ func (this *Migrator) countTableRows() (err error) {
|
||||
return countRowsFunc()
|
||||
}
|
||||
|
||||
func (this *Migrator) readResurrectedContext() error {
|
||||
encodedContext, err := this.inspector.readChangelogState("context")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if encodedContext == "" {
|
||||
return fmt.Errorf("No resurrect info found")
|
||||
}
|
||||
|
||||
// Loading migration context to a temporary location:
|
||||
this.resurrectedContext = base.NewMigrationContext()
|
||||
if err := this.resurrectedContext.LoadJSON(encodedContext); err != nil && err != io.EOF {
|
||||
return err
|
||||
}
|
||||
// Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?)
|
||||
if this.migrationContext.DatabaseName != this.resurrectedContext.DatabaseName {
|
||||
return fmt.Errorf("Resurrection: given --database not identical to resurrected one. Bailing out")
|
||||
}
|
||||
if this.migrationContext.OriginalTableName != this.resurrectedContext.OriginalTableName {
|
||||
return fmt.Errorf("Resurrection: given --table not identical to resurrected one. Bailing out")
|
||||
}
|
||||
if this.migrationContext.AlterStatement != this.resurrectedContext.AlterStatement {
|
||||
return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out")
|
||||
}
|
||||
if this.resurrectedContext.AppliedBinlogCoordinates.IsEmpty() {
|
||||
return fmt.Errorf("Resurrection: no applied binlog coordinates. Seems like the migration you're trying to resurrect crashed before applying a single binlog event. There's not enough info for resurrection, and not much point to it. Just run your migration again.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Migrator) applyResurrectedContext() error {
|
||||
this.migrationContext.ApplyResurrectedContext(this.resurrectedContext)
|
||||
atomic.StoreInt64(&this.migrationContext.IsResurrected, 1)
|
||||
|
||||
if err := this.hooksExecutor.onResurrecting(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Applied migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
|
||||
log.Infof("Applied migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
|
||||
log.Infof("Applied migration iteration range min values: [%s]", this.migrationContext.MigrationIterationRangeMinValues)
|
||||
log.Infof("Applied migration iteration range max values: [%s]", this.migrationContext.MigrationIterationRangeMaxValues)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Migrator) dumpResurrectContext() error {
|
||||
if this.migrationContext.Resurrect && atomic.LoadInt64(&this.migrationContext.IsResurrected) == 0 {
|
||||
// we're in the process of resurrecting; don't dump context, because it would overwrite
|
||||
// the very context we want to resurrect by!
|
||||
return nil
|
||||
}
|
||||
|
||||
// we dump the context. Note that this operation works sequentially to any row copy or
|
||||
// event handling. There is no concurrency issue here.
|
||||
if jsonString, err := this.migrationContext.ToJSON(); err != nil {
|
||||
return log.Errore(err)
|
||||
} else {
|
||||
this.applier.WriteChangelog("context", jsonString)
|
||||
log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Migrate executes the complete migration logic. This is *the* major gh-ost function.
|
||||
func (this *Migrator) Migrate() (err error) {
|
||||
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
@ -290,6 +358,11 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.initiateInspector(); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.Resurrect {
|
||||
if err := this.readResurrectedContext(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := this.initiateStreaming(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -333,6 +406,12 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.Resurrect {
|
||||
if err := this.applyResurrectedContext(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
this.dumpResurrectContext()
|
||||
go this.executeWriteFuncs()
|
||||
go this.iterateChunks()
|
||||
this.migrationContext.MarkRowCopyStartTime()
|
||||
@ -608,7 +687,7 @@ func (this *Migrator) initiateServer() (err error) {
|
||||
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
|
||||
this.printStatus(rule, writer)
|
||||
}
|
||||
this.server = NewServer(this.hooksExecutor, f)
|
||||
this.server = NewServer(this.hooksExecutor, f, this.panicAbort)
|
||||
if err := this.server.BindSocketFile(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -656,8 +735,8 @@ func (this *Migrator) initiateInspector() (err error) {
|
||||
if this.migrationContext.CliMasterUser != "" {
|
||||
this.migrationContext.ApplierConnectionConfig.User = this.migrationContext.CliMasterUser
|
||||
}
|
||||
if this.migrationContext.CliMasterPassword != "" {
|
||||
this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.CliMasterPassword
|
||||
if this.migrationContext.GetCliMasterPassword() != "" {
|
||||
this.migrationContext.ApplierConnectionConfig.Password = this.migrationContext.GetCliMasterPassword()
|
||||
}
|
||||
log.Infof("Master forced to be %+v", *this.migrationContext.ApplierConnectionConfig.ImpliedKey)
|
||||
}
|
||||
@ -886,15 +965,19 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
// initiateStreaming begins treaming of binary log events and registers listeners for such events
|
||||
func (this *Migrator) initiateStreaming() error {
|
||||
this.eventsStreamer = NewEventsStreamer()
|
||||
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
||||
if err := this.eventsStreamer.InitDBConnections(this.resurrectedContext); err != nil {
|
||||
return err
|
||||
}
|
||||
this.eventsStreamer.AddListener(
|
||||
false,
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.GetChangelogTableName(),
|
||||
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
return this.onChangelogStateEvent(dmlEvent)
|
||||
func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error {
|
||||
err := this.onChangelogStateEvent(dmlEvent)
|
||||
if err == nil {
|
||||
this.migrationContext.SetAppliedBinlogCoordinates(coordinates)
|
||||
}
|
||||
return err
|
||||
},
|
||||
)
|
||||
|
||||
@ -902,7 +985,7 @@ func (this *Migrator) initiateStreaming() error {
|
||||
log.Debugf("Beginning streaming")
|
||||
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
|
||||
if err != nil {
|
||||
this.migrationContext.PanicAbort <- err
|
||||
this.panicAbort <- err
|
||||
}
|
||||
log.Debugf("Done streaming")
|
||||
}()
|
||||
@ -916,10 +999,14 @@ func (this *Migrator) addDMLEventsListener() error {
|
||||
false,
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.OriginalTableName,
|
||||
func(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error {
|
||||
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
|
||||
applyEventFunc := func() error {
|
||||
return this.applier.ApplyDMLEventQuery(dmlEvent)
|
||||
err := this.applier.ApplyDMLEventQuery(dmlEvent)
|
||||
if err == nil {
|
||||
this.migrationContext.SetAppliedBinlogCoordinates(coordinates)
|
||||
}
|
||||
return err
|
||||
}
|
||||
this.applyEventsQueue <- applyEventFunc
|
||||
return nil
|
||||
@ -930,7 +1017,7 @@ func (this *Migrator) addDMLEventsListener() error {
|
||||
|
||||
// initiateThrottler kicks in the throttling collection and the throttling checks.
|
||||
func (this *Migrator) initiateThrottler() error {
|
||||
this.throttler = NewThrottler(this.applier, this.inspector)
|
||||
this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
|
||||
|
||||
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
|
||||
log.Infof("Waiting for first throttle metrics to be collected")
|
||||
@ -945,21 +1032,27 @@ func (this *Migrator) initiateApplier() error {
|
||||
if err := this.applier.InitDBConnections(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.applier.ValidateOrDropExistingTables(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.applier.CreateChangelogTable(); err != nil {
|
||||
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
|
||||
return err
|
||||
}
|
||||
if err := this.applier.CreateGhostTable(); err != nil {
|
||||
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.applier.AlterGhost(); err != nil {
|
||||
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
||||
return err
|
||||
if this.migrationContext.Resurrect {
|
||||
if err := this.applier.ValidateTablesForResurrection(); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Normal operation, no resurrection
|
||||
if err := this.applier.ValidateOrDropExistingTables(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.applier.CreateChangelogTable(); err != nil {
|
||||
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
|
||||
return err
|
||||
}
|
||||
if err := this.applier.CreateGhostTable(); err != nil {
|
||||
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
||||
return err
|
||||
}
|
||||
if err := this.applier.AlterGhost(); err != nil {
|
||||
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
this.applier.WriteChangelogState(string(GhostTableMigrated))
|
||||
@ -1026,12 +1119,16 @@ func (this *Migrator) executeWriteFuncs() error {
|
||||
log.Debugf("Noop operation; not really executing write funcs")
|
||||
return nil
|
||||
}
|
||||
contextDumpTick := time.Tick(contextDumpInterval)
|
||||
for {
|
||||
this.throttler.throttle(nil)
|
||||
|
||||
// We give higher priority to event processing, then secondary priority to
|
||||
// rowcopy
|
||||
// We give higher priority to event processing, then secondary priority to rowcopy
|
||||
select {
|
||||
case <-contextDumpTick:
|
||||
{
|
||||
this.dumpResurrectContext()
|
||||
}
|
||||
case applyEventFunc := <-this.applyEventsQueue:
|
||||
{
|
||||
if err := this.retryOperation(applyEventFunc); err != nil {
|
||||
|
@ -28,13 +28,15 @@ type Server struct {
|
||||
tcpListener net.Listener
|
||||
hooksExecutor *HooksExecutor
|
||||
printStatus printStatusFunc
|
||||
panicAbort chan error
|
||||
}
|
||||
|
||||
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc) *Server {
|
||||
func NewServer(hooksExecutor *HooksExecutor, printStatus printStatusFunc, panicAbort chan error) *Server {
|
||||
return &Server{
|
||||
migrationContext: base.GetMigrationContext(),
|
||||
hooksExecutor: hooksExecutor,
|
||||
printStatus: printStatus,
|
||||
panicAbort: panicAbort,
|
||||
}
|
||||
}
|
||||
|
||||
@ -251,7 +253,7 @@ help # This message
|
||||
case "panic":
|
||||
{
|
||||
err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
|
||||
this.migrationContext.PanicAbort <- err
|
||||
this.panicAbort <- err
|
||||
return NoPrintStatusRule, err
|
||||
}
|
||||
default:
|
||||
|
@ -20,11 +20,13 @@ import (
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
|
||||
type BinlogEventListenerFunc func(event *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error
|
||||
|
||||
type BinlogEventListener struct {
|
||||
async bool
|
||||
databaseName string
|
||||
tableName string
|
||||
onDmlEvent func(event *binlog.BinlogDMLEvent) error
|
||||
onDmlEvent BinlogEventListenerFunc
|
||||
}
|
||||
|
||||
const (
|
||||
@ -57,7 +59,7 @@ func NewEventsStreamer() *EventsStreamer {
|
||||
|
||||
// AddListener registers a new listener for binlog events, on a per-table basis
|
||||
func (this *EventsStreamer) AddListener(
|
||||
async bool, databaseName string, tableName string, onDmlEvent func(event *binlog.BinlogDMLEvent) error) (err error) {
|
||||
async bool, databaseName string, tableName string, onDmlEvent BinlogEventListenerFunc) (err error) {
|
||||
|
||||
this.listenersMutex.Lock()
|
||||
defer this.listenersMutex.Unlock()
|
||||
@ -80,10 +82,11 @@ func (this *EventsStreamer) AddListener(
|
||||
|
||||
// notifyListeners will notify relevant listeners with given DML event. Only
|
||||
// listeners registered for changes on the table on which the DML operates are notified.
|
||||
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
|
||||
func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) {
|
||||
this.listenersMutex.Lock()
|
||||
defer this.listenersMutex.Unlock()
|
||||
|
||||
binlogEvent := binlogEntry.DmlEvent
|
||||
for _, listener := range this.listeners {
|
||||
listener := listener
|
||||
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
|
||||
@ -94,15 +97,15 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent)
|
||||
}
|
||||
if listener.async {
|
||||
go func() {
|
||||
listener.onDmlEvent(binlogEvent)
|
||||
listener.onDmlEvent(binlogEvent, &binlogEntry.Coordinates)
|
||||
}()
|
||||
} else {
|
||||
listener.onDmlEvent(binlogEvent)
|
||||
listener.onDmlEvent(binlogEvent, &binlogEntry.Coordinates)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
func (this *EventsStreamer) InitDBConnections(resurrectedContext *base.MigrationContext) (err error) {
|
||||
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
|
||||
return err
|
||||
@ -110,8 +113,15 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
if err := this.validateConnection(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
return err
|
||||
if this.migrationContext.Resurrect {
|
||||
// Rewinding to beginning of logfile:
|
||||
resurrectedContext.AppliedBinlogCoordinates.LogPos = 4
|
||||
log.Infof("Resurrection: initiating streamer at resurrected coordinates %+v", resurrectedContext.AppliedBinlogCoordinates)
|
||||
this.initialBinlogCoordinates = &resurrectedContext.AppliedBinlogCoordinates
|
||||
} else {
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
|
||||
return err
|
||||
@ -184,7 +194,7 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
||||
go func() {
|
||||
for binlogEntry := range this.eventsChannel {
|
||||
if binlogEntry.DmlEvent != nil {
|
||||
this.notifyListeners(binlogEntry.DmlEvent)
|
||||
this.notifyListeners(binlogEntry)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -21,13 +21,15 @@ type Throttler struct {
|
||||
migrationContext *base.MigrationContext
|
||||
applier *Applier
|
||||
inspector *Inspector
|
||||
panicAbort chan error
|
||||
}
|
||||
|
||||
func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
|
||||
func NewThrottler(applier *Applier, inspector *Inspector, panicAbort chan error) *Throttler {
|
||||
return &Throttler{
|
||||
migrationContext: base.GetMigrationContext(),
|
||||
applier: applier,
|
||||
inspector: inspector,
|
||||
panicAbort: panicAbort,
|
||||
}
|
||||
}
|
||||
|
||||
@ -81,11 +83,9 @@ func (this *Throttler) collectHeartbeat() {
|
||||
if atomic.LoadInt64(&this.migrationContext.CleanupImminentFlag) > 0 {
|
||||
return nil
|
||||
}
|
||||
changelogState, err := this.inspector.readChangelogState()
|
||||
if err != nil {
|
||||
if heartbeatValue, err := this.inspector.readChangelogState("heartbeat"); err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
if heartbeatValue, ok := changelogState["heartbeat"]; ok {
|
||||
} else {
|
||||
this.parseChangelogHeartbeat(heartbeatValue)
|
||||
}
|
||||
return nil
|
||||
@ -155,7 +155,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
// Regardless of throttle, we take opportunity to check for panic-abort
|
||||
if this.migrationContext.PanicFlagFile != "" {
|
||||
if base.FileExists(this.migrationContext.PanicFlagFile) {
|
||||
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
|
||||
this.panicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
|
||||
}
|
||||
}
|
||||
|
||||
@ -164,7 +164,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
|
||||
}
|
||||
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
|
||||
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
||||
this.panicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
||||
}
|
||||
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
|
||||
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
|
||||
@ -172,7 +172,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
|
||||
<-timer.C
|
||||
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
|
||||
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
|
||||
this.panicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -6,6 +6,9 @@
|
||||
package sql
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"strconv"
|
||||
@ -215,7 +218,7 @@ func (this *UniqueKey) String() string {
|
||||
if this.IsAutoIncrement {
|
||||
description = fmt.Sprintf("%s (auto_increment)", description)
|
||||
}
|
||||
return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names(), this.HasNullable)
|
||||
return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.String(), this.HasNullable)
|
||||
}
|
||||
|
||||
type ColumnValues struct {
|
||||
@ -247,6 +250,32 @@ func ToColumnValues(abstractValues []interface{}) *ColumnValues {
|
||||
return result
|
||||
}
|
||||
|
||||
func NewColumnValuesFromBase64(b64 string) (columnValues *ColumnValues, err error) {
|
||||
var abstractValues []interface{}
|
||||
|
||||
b, err := base64.StdEncoding.DecodeString(b64)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
buff := bytes.Buffer{}
|
||||
buff.Write(b)
|
||||
decoder := gob.NewDecoder(&buff)
|
||||
err = decoder.Decode(&abstractValues)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ToColumnValues(abstractValues), nil
|
||||
}
|
||||
|
||||
func (this *ColumnValues) ToBase64() (b64 string, err error) {
|
||||
buff := bytes.Buffer{}
|
||||
encoder := gob.NewEncoder(&buff)
|
||||
if err = encoder.Encode(this.abstractValues); err != nil {
|
||||
return b64, err
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(buff.Bytes()), nil
|
||||
}
|
||||
|
||||
func (this *ColumnValues) AbstractValues() []interface{} {
|
||||
return this.abstractValues
|
||||
}
|
||||
|
5
resources/hooks-sample/gh-ost-on-resurrecting-hook
Normal file
5
resources/hooks-sample/gh-ost-on-resurrecting-hook
Normal file
@ -0,0 +1,5 @@
|
||||
#!/bin/bash
|
||||
|
||||
# Sample hook file for gh-ost-on-resurrecting
|
||||
|
||||
echo "$(date) gh-ost-on-resurrecting: beginning resurrection on $GH_OST_DATABASE_NAME.$GH_OST_TABLE_NAME" >> /tmp/gh-ost.log
|
Loading…
Reference in New Issue
Block a user