- Removed use of master_pos_wait()
. It was unneccessary in the first place and introduced new problems.
- Supporting `--allow-nullable-unique-key` - Tool will bail out if chosen key has nullable columns and the above is not provided - Fixed `OriginalBinlogRowImage` comaprison (lower/upper case issue) - Introduced reasonable streamer reconnect sleep time
This commit is contained in:
parent
9b54d0208f
commit
5375aa4f69
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
RELEASE_VERSION="0.7.13"
|
||||
RELEASE_VERSION="0.7.16"
|
||||
|
||||
buildpath=/tmp/gh-ost
|
||||
target=gh-ost
|
||||
|
@ -39,9 +39,10 @@ type MigrationContext struct {
|
||||
OriginalTableName string
|
||||
AlterStatement string
|
||||
|
||||
CountTableRows bool
|
||||
AllowedRunningOnMaster bool
|
||||
SwitchToRowBinlogFormat bool
|
||||
CountTableRows bool
|
||||
AllowedRunningOnMaster bool
|
||||
SwitchToRowBinlogFormat bool
|
||||
NullableUniqueKeyAllowed bool
|
||||
|
||||
config ContextConfig
|
||||
configMutex *sync.Mutex
|
||||
|
@ -32,6 +32,7 @@ func main() {
|
||||
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
||||
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
|
||||
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
|
||||
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
|
||||
|
||||
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
|
||||
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration tables are not swapped; gh-ost issues `STOP SLAVE` and you can compare the two tables for building trust")
|
||||
|
@ -549,6 +549,17 @@ func (this *Applier) StopSlaveIOThread() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartSlaveSQLThread is applicable with --test-on-replica
|
||||
func (this *Applier) StartSlaveSQLThread() error {
|
||||
query := `start /* gh-ost */ slave sql_thread`
|
||||
log.Infof("Verifying SQL thread is running")
|
||||
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("SQL thread started")
|
||||
return nil
|
||||
}
|
||||
|
||||
// MasterPosWait is applicable with --test-on-replica
|
||||
func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
|
||||
var appliedRows int64
|
||||
@ -565,22 +576,14 @@ func (this *Applier) StopSlaveNicely() error {
|
||||
if err := this.StopSlaveIOThread(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.StartSlaveSQLThread(); err != nil {
|
||||
return err
|
||||
}
|
||||
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
|
||||
log.Infof("Will wait for SQL thread to catch up with IO thread")
|
||||
if err := this.MasterPosWait(readBinlogCoordinates); err != nil {
|
||||
log.Errorf("Error waiting for SQL thread to catch up. Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
|
||||
return err
|
||||
}
|
||||
log.Infof("Replication SQL thread applied all events up to %+v", *readBinlogCoordinates)
|
||||
if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil {
|
||||
return err
|
||||
} else {
|
||||
log.Infof("Self binlog coordinates: %+v", *selfBinlogCoordinates)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -113,9 +113,16 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
|
||||
}
|
||||
this.migrationContext.UniqueKey = sharedUniqueKeys[0]
|
||||
log.Infof("Chosen shared unique key is %s", this.migrationContext.UniqueKey.Name)
|
||||
if this.migrationContext.UniqueKey.HasNullable {
|
||||
if this.migrationContext.NullableUniqueKeyAllowed {
|
||||
log.Warningf("Chosen key (%s) has nullable columns. You have supplied with --allow-nullable-unique-key and so this migration proceeds. As long as there aren't NULL values in this key's column, migration should be fine. NULL values will corrupt migration's data", this.migrationContext.UniqueKey)
|
||||
} else {
|
||||
return fmt.Errorf("Chosen key (%s) has nullable columns. Bailing out. To force this operation to continue, supply --allow-nullable-unique-key flag. Only do so if you are certain there are no actual NULL values in this key. As long as there aren't, migration should be fine. NULL values in columns of this key will corrupt migration's data", this.migrationContext.UniqueKey)
|
||||
}
|
||||
}
|
||||
if !this.migrationContext.UniqueKey.IsPrimary() {
|
||||
if this.migrationContext.OriginalBinlogRowImage != "full" {
|
||||
return fmt.Errorf("binlog_row_image is '%s' and chosen key is %s, which is not the primary key. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again")
|
||||
if this.migrationContext.OriginalBinlogRowImage != "FULL" {
|
||||
return fmt.Errorf("binlog_row_image is '%s' and chosen key is %s, which is not the primary key. This operation cannot proceed. You may `set global binlog_row_image='full'` and try again", this.migrationContext.OriginalBinlogRowImage, this.migrationContext.UniqueKey)
|
||||
}
|
||||
}
|
||||
|
||||
@ -261,6 +268,7 @@ func (this *Inspector) validateBinlogs() error {
|
||||
// Only as of 5.6. We wish to support 5.5 as well
|
||||
this.migrationContext.OriginalBinlogRowImage = ""
|
||||
}
|
||||
this.migrationContext.OriginalBinlogRowImage = strings.ToUpper(this.migrationContext.OriginalBinlogRowImage)
|
||||
|
||||
log.Infof("binary logs validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||
return nil
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/github/gh-ost/go/base"
|
||||
"github.com/github/gh-ost/go/binlog"
|
||||
@ -27,7 +28,8 @@ type BinlogEventListener struct {
|
||||
}
|
||||
|
||||
const (
|
||||
EventsChannelBufferSize = 1
|
||||
EventsChannelBufferSize = 1
|
||||
ReconnectStreamerSleepSeconds = 5
|
||||
)
|
||||
|
||||
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
|
||||
@ -177,6 +179,8 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
||||
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
|
||||
// Reposition at same coordinates. Single attempt (TODO: make multiple attempts?)
|
||||
log.Infof("StreamEvents encountered unexpected error: %+v", err)
|
||||
time.Sleep(ReconnectStreamerSleepSeconds * time.Second)
|
||||
log.Infof("Reconnecting...")
|
||||
err = this.binlogReader.Reconnect()
|
||||
if err != nil {
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user