Merge branch 'master' into myisam-gtid

This commit is contained in:
Shlomi Noach 2016-12-01 09:43:38 +01:00 committed by GitHub
commit 9b068ec222
26 changed files with 449 additions and 82 deletions

.gitignore vendored Normal file

@@ -0,0 +1,4 @@
/.gopath/
/bin/
/libexec/
/.vendor/


@@ -60,6 +60,7 @@ Also see:
- [requirements and limitations](doc/requirements-and-limitations.md)
- [what if?](doc/what-if.md)
- [the fine print](doc/the-fine-print.md)
- [Questions](https://github.com/github/gh-ost/issues?q=label%3Aquestion)
## What's in a name?

RELEASE_VERSION Normal file

@@ -0,0 +1 @@
1.0.28


@@ -2,7 +2,7 @@
#
#
RELEASE_VERSION="1.0.26"
RELEASE_VERSION=$(cat RELEASE_VERSION)
function build {
osname=$1


@@ -63,6 +63,8 @@ Optional. Default is `safe`. See more discussion in [cut-over](cut-over.md)
At this time (10-2016) `gh-ost` does not support foreign keys on migrated tables (it bails out when it notices a FK on the migrated table). However, it is able to support _dropping_ of foreign keys via this flag. If you're trying to get rid of foreign keys in your environment, this is a useful flag.
See also: [`skip-foreign-key-checks`](#skip-foreign-key-checks)
### exact-rowcount
A `gh-ost` execution needs to copy whatever rows you have in your existing table onto the ghost table. This can be, and often is, a large number. What exactly is that number?
@@ -71,7 +73,8 @@ A `gh-ost` execution needs to copy whatever rows you have in your existing table
`gh-ost` also supports the `--exact-rowcount` flag. When this flag is given, two things happen:
- An initial, authoritative `select count(*) from your_table`.
This query may take a long time to complete, but is performed before we begin the massive operations.
When `--concurrent-rowcount` is also specified, this runs in paralell to row copy.
When `--concurrent-rowcount` is also specified, this runs in parallel to row copy.
Note: `--concurrent-rowcount` now defaults to `true`.
- A continuous update to the estimate as we make progress applying events.
We heuristically update the number of rows based on the queries we process from the binlogs.
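As a minimal sketch of that heuristic (illustrative code only, not gh-ost's actual implementation), the estimate is nudged per replayed DML event:

package main

import "sync/atomic"

// rowsEstimate is adjusted as DML events are applied from the binlog.
var rowsEstimate int64

func adjustEstimate(dml string) {
	switch dml {
	case "insert":
		atomic.AddInt64(&rowsEstimate, 1) // one more row now exists
	case "delete":
		atomic.AddInt64(&rowsEstimate, -1) // one less row
	}
	// an "update" does not change the row count
}

func main() {
	adjustEstimate("insert")
	adjustEstimate("delete")
}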
@@ -108,6 +111,10 @@ See also: [Sub-second replication lag throttling](subsecond-lag.md)
Typically `gh-ost` is used to migrate tables on a master. If you wish to perform the migration in full on a replica only, connect `gh-ost` to said replica and pass `--migrate-on-replica`. `gh-ost` will briefly connect to the master but will otherwise issue no changes on the master. Migration will be fully executed on the replica, while making sure to maintain a small replication lag.
### skip-foreign-key-checks
By default `gh-ost` verifies no foreign keys exist on the migrated table. On servers with a large number of tables this check can take a long time. If you're absolutely certain no foreign keys exist (the table neither references other tables nor is referenced by other tables) and wish to save the check time, pass `--skip-foreign-key-checks`.
### skip-renamed-columns
See `approve-renamed-columns`


@@ -4,7 +4,7 @@
- You will need to have one server serving Row Based Replication (RBR) format binary logs. Right now `FULL` row image is supported. `MINIMAL` is to be supported in the near future. `gh-ost` prefers to work with replicas. You may [still have your master configured with Statement Based Replication](migrating-with-sbr.md) (SBR).
- If you are using a replica, the table must have an identical schema between the master and replica.
- `gh-ost` requires an account with these privileges:
@@ -28,6 +28,8 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. Th
- MySQL 5.7 generated columns are not supported. They may be supported in the future.
- MySQL 5.7 `JSON` columns are not supported. They are likely to be supported shortly.
- The two _before_ & _after_ tables must share some `UNIQUE KEY`. Such a key is used by `gh-ost` to iterate over the table.
- As an example, if your table has a single `UNIQUE KEY` and no `PRIMARY KEY`, and you wish to replace it with a `PRIMARY KEY`, you will need two migrations: one to add the `PRIMARY KEY` (this migration will use the existing `UNIQUE KEY`), another to drop the now redundant `UNIQUE KEY` (this migration will use the `PRIMARY KEY`).
@@ -43,4 +45,7 @@ The `SUPER` privilege is required for `STOP SLAVE`, `START SLAVE` operations. Th
- Multisource is not supported when migrating via replica. It _should_ work (but was never tested) when connecting directly to master (`--allow-on-master`)
- Master-master setup is only supported in active-passive setup. Active-active (where table is being written to on both masters concurrently) is unsupported. It may be supported in the future.
- If you have an `enum` field as part of your migration key (typically the `PRIMARY KEY`), migration performance will be degraded and potentially bad. [Read more](https://github.com/github/gh-ost/pull/277#issuecomment-254811520)
- Migrating a `FEDERATED` table is unsupported and is irrelevant to the problem `gh-ost` tackles.


@@ -47,7 +47,7 @@ Initial setup is a no-concurrency operation
- Applying `alter` on ghost table
- Comparing structure of original & ghost table. Looking for shared columns, shared unique keys, validating foreign keys. Choosing shared unique key, the key by which we chunk the table and process it.
- Setting up the binlog listener; begin listening on changelog events
- Injecting a "good to go" ebtry onto the changelog table (to be intercepted via binary logs)
- Injecting a "good to go" entry onto the changelog table (to be intercepted via binary logs)
- Begin listening on binlog events for original table DMLs
- Reading original table's chosen key min/max values


@@ -37,6 +37,13 @@ const (
CutOverTwoStep = iota
)
type ThrottleReasonHint string
const (
NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint"
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
)
var (
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
)
@@ -44,12 +51,14 @@ var (
type ThrottleCheckResult struct {
ShouldThrottle bool
Reason string
ReasonHint ThrottleReasonHint
}
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleReasonHint) *ThrottleCheckResult {
return &ThrottleCheckResult{
ShouldThrottle: throttle,
Reason: reason,
ReasonHint: reasonHint,
}
}
@@ -66,6 +75,7 @@ type MigrationContext struct {
AllowedMasterMaster bool
SwitchToRowBinlogFormat bool
AssumeRBR bool
SkipForeignKeyChecks bool
NullableUniqueKeyAllowed bool
ApproveRenamedColumns bool
SkipRenamedColumns bool
@@ -138,6 +148,7 @@ type MigrationContext struct {
TotalDMLEventsApplied int64
isThrottled bool
throttleReason string
throttleReasonHint ThrottleReasonHint
throttleGeneralCheckResult ThrottleCheckResult
throttleMutex *sync.Mutex
IsPostponingCutOver int64
@@ -145,6 +156,8 @@ type MigrationContext struct {
AllEventsUpToLockProcessedInjectedFlag int64
CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64
CutOverCompleteFlag int64
InCutOverCriticalSectionFlag int64
PanicAbort chan error
OriginalTableColumnsOnApplier *sql.ColumnList
@@ -416,17 +429,28 @@ func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResu
return &result
}
func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonHint ThrottleReasonHint) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.isThrottled = throttle
this.throttleReason = reason
this.throttleReasonHint = reasonHint
}
func (this *MigrationContext) IsThrottled() (bool, string) {
func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
return this.isThrottled, this.throttleReason
// we don't throttle when cutting over. We _do_ throttle:
// - during copy phase
// - just before cut-over
// - in between cut-over retries
// When cutting over, we need to be aggressive. Cut-over holds table locks.
// We need to release those asap.
if atomic.LoadInt64(&this.InCutOverCriticalSectionFlag) > 0 {
return false, "critical section", NoThrottleReasonHint
}
return this.isThrottled, this.throttleReason, this.throttleReasonHint
}
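For illustration, a hedged usage sketch of the new three-value API (base.GetMigrationContext() is assumed here to be the singleton accessor used by main.go of this era; verify against go/base):

package main

import (
	"fmt"
	"sync/atomic"

	"github.com/github/gh-ost/go/base"
)

func main() {
	migrationContext := base.GetMigrationContext() // assumed accessor
	migrationContext.SetThrottled(true, "lag=5s", base.NoThrottleReasonHint)
	atomic.StoreInt64(&migrationContext.InCutOverCriticalSectionFlag, 1)

	throttled, reason, _ := migrationContext.IsThrottled()
	fmt.Println(throttled, reason) // false "critical section": locks held during cut-over must be released asap
}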
func (this *MigrationContext) GetReplicationLagQuery() string {


@@ -5,8 +5,6 @@
package binlog
import ()
// BinlogReader is a general interface whose implementations can choose their methods of reading
// a binary log file and parsing it into binlog entries
type BinlogReader interface {


@@ -17,8 +17,6 @@ import (
"github.com/siddontang/go-mysql/replication"
)
var ()
const (
serverId = 99999
)
@@ -120,6 +118,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
// StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
if canStopStreaming() {
return nil
}
for {
if canStopStreaming() {
break
@@ -150,3 +151,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
return nil
}
func (this *GoMySQLReader) Close() error {
this.binlogSyncer.Close()
return nil
}
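The canStopStreaming predicate pairs with the new CutOverCompleteFlag in migrator.go below. A self-contained sketch of the same polling pattern, with illustrative names:

package main

import (
	"sync/atomic"
	"time"
)

// streamEvents keeps producing until the stop predicate flips, mirroring
// the check-then-break loop in StreamEvents above.
func streamEvents(canStopStreaming func() bool, entries chan<- int) {
	for i := 0; ; i++ {
		if canStopStreaming() {
			return
		}
		entries <- i
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	var cutOverComplete int64
	entries := make(chan int, 16)
	go streamEvents(func() bool { return atomic.LoadInt64(&cutOverComplete) != 0 }, entries)
	time.Sleep(30 * time.Millisecond)
	atomic.StoreInt64(&cutOverComplete, 1) // the streamer observes this on its next check and drains out
	time.Sleep(20 * time.Millisecond)
}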


@@ -54,7 +54,7 @@ func main() {
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", false, "(with --exact-rowcount), when true: count rows after row-copy begins, concurrently, and adjust row estimate later on; defaults false: first count rows, then start row copy")
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
flag.BoolVar(&migrationContext.AllowedMasterMaster, "allow-master-master", false, "explicitly allow running in a master-master setup")
flag.BoolVar(&migrationContext.NullableUniqueKeyAllowed, "allow-nullable-unique-key", false, "allow gh-ost to migrate based on a unique key with nullable columns. As long as no NULL values exist, this should be OK. If NULL values exist in chosen key, data may be corrupted. Use at your own risk!")
@@ -62,6 +62,7 @@ func main() {
flag.BoolVar(&migrationContext.SkipRenamedColumns, "skip-renamed-columns", false, "in case your `ALTER` statement renames columns, gh-ost will note that and offer its interpretation of the rename. By default gh-ost does not proceed to execute. This flag tells gh-ost to skip the renamed columns, i.e. to treat what gh-ost thinks are renamed columns as unrelated columns. NOTE: you may lose column data")
flag.BoolVar(&migrationContext.IsTungsten, "tungsten", false, "explicitly let gh-ost know that you are running on a tungsten-replication based topology (you are likely to also provide --assume-master-host)")
flag.BoolVar(&migrationContext.DiscardForeignKeys, "discard-foreign-keys", false, "DANGER! This flag will migrate a table that has foreign keys and will NOT create foreign keys on the ghost table, thus your altered table will have NO foreign keys. This is useful for intentional dropping of foreign keys")
flag.BoolVar(&migrationContext.SkipForeignKeyChecks, "skip-foreign-key-checks", false, "set to 'true' when you know for certain there are no foreign keys on your table, and wish to skip the time it takes for gh-ost to verify that")
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")


@@ -305,6 +305,9 @@ func (this *Applier) InitiateHeartbeat() {
// Generally speaking, we would issue a goroutine, but I'd actually rather
// have this block the loop than spam the master in the event something
// goes wrong
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
continue
}
if err := injectHeartbeat(); err != nil {
return
}
@@ -691,8 +694,7 @@ func (this *Applier) DropAtomicCutOverSentryTableIfExists() error {
return this.dropTable(tableName)
}
// DropAtomicCutOverSentryTableIfExists checks if the "old" table name
// happens to be a cut-over magic table; if so, it drops it.
// CreateAtomicCutOverSentryTable
func (this *Applier) CreateAtomicCutOverSentryTable() error {
if err := this.DropAtomicCutOverSentryTableIfExists(); err != nil {
return err


@@ -11,6 +11,7 @@ import (
"reflect"
"strings"
"sync/atomic"
"time"
"github.com/github/gh-ost/go/base"
"github.com/github/gh-ost/go/mysql"
@@ -335,12 +336,27 @@ func (this *Inspector) validateLogSlaveUpdates() error {
if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil {
return err
}
if !logSlaveUpdates && !this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.IsTungsten {
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
if logSlaveUpdates {
log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
}
log.Infof("binary logs updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
if this.migrationContext.IsTungsten {
log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
}
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
return fmt.Errorf("%s:%d must have log_slave_updates enabled for testing/migrating on replica", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}
if this.migrationContext.InspectorIsAlsoApplier() {
log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
return nil
}
return fmt.Errorf("%s:%d must have log_slave_updates enabled for executing migration", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
}
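Branch order above matters. As a compact restatement of the decision tree (a sketch with made-up parameter names, not the shipped code):

package main

import "errors"

func logSlaveUpdatesDecision(on, tungsten, onReplica, inspectorIsApplier bool) error {
	switch {
	case on:
		return nil // validated
	case tungsten:
		return nil // warn and proceed: --tungsten topologies propagate changes themselves
	case onReplica:
		return errors.New("log_slave_updates required for testing/migrating on replica")
	case inspectorIsApplier:
		return nil // warn and proceed: executing directly on master
	default:
		return errors.New("log_slave_updates required for executing migration")
	}
}

func main() {
	_ = logSlaveUpdatesDecision(false, false, true, false) // error, as in the code above
}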
// validateTable makes sure the table we need to operate on actually exists
@@ -372,6 +388,10 @@ func (this *Inspector) validateTable() error {
// validateTableForeignKeys makes sure no foreign keys exist on the migrated table
func (this *Inspector) validateTableForeignKeys(allowChildForeignKeys bool) error {
if this.migrationContext.SkipForeignKeyChecks {
log.Warning("--skip-foreign-key-checks provided: will not check for foreign keys")
return nil
}
query := `
SELECT
SUM(REFERENCED_TABLE_NAME IS NOT NULL AND TABLE_SCHEMA=? AND TABLE_NAME=?) as num_child_side_fk,
@@ -675,3 +695,12 @@ func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.Connect
visitedKeys := mysql.NewInstanceKeyMap()
return mysql.GetMasterConnectionConfigSafe(this.connectionConfig, visitedKeys, this.migrationContext.AllowedMasterMaster)
}
func (this *Inspector) getReplicationLag() (replicationLag time.Duration, err error) {
replicationLagQuery := this.migrationContext.GetReplicationLagQuery()
replicationLag, err = mysql.GetReplicationLag(
this.migrationContext.InspectorConnectionConfig,
replicationLagQuery,
)
return replicationLag, err
}


@@ -11,6 +11,7 @@ import (
"math"
"os"
"os/signal"
"strings"
"sync/atomic"
"syscall"
"time"
@@ -30,6 +31,10 @@ const (
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
)
func ReadChangelogState(s string) ChangelogState {
return ChangelogState(strings.Split(s, ":")[0])
}
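ReadChangelogState strips an optional colon-delimited suffix; the cut-over code below exploits this by appending a challenge token to the state hint. A small worked example (the token value is made up):

package main

import (
	"fmt"
	"strings"
)

type ChangelogState string

func ReadChangelogState(s string) ChangelogState {
	return ChangelogState(strings.Split(s, ":")[0])
}

func main() {
	fmt.Println(ReadChangelogState("AllEventsUpToLockProcessed:1480582998000000000"))
	// prints: AllEventsUpToLockProcessed
}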
type tableWriteFunc func() error
const (
@@ -60,10 +65,9 @@ type Migrator struct {
firstThrottlingCollected chan bool
ghostTableMigrated chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
allEventsUpToLockProcessed chan string
rowCopyCompleteFlag int64
inCutOverCriticalActionFlag int64
rowCopyCompleteFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
copyRowsQueue chan tableWriteFunc
@@ -79,7 +83,7 @@ func NewMigrator() *Migrator {
ghostTableMigrated: make(chan bool),
firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
allEventsUpToLockProcessed: make(chan string),
copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
@@ -171,7 +175,7 @@ func (this *Migrator) consumeRowCopyComplete() {
}
func (this *Migrator) canStopStreaming() bool {
return false
return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
}
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
@@ -180,7 +184,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
if hint := dmlEvent.NewColumnValues.StringColumn(2); hint != "state" {
return nil
}
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
changelogStateString := dmlEvent.NewColumnValues.StringColumn(3)
changelogState := ReadChangelogState(changelogStateString)
log.Infof("Intercepted changelog state %s", changelogState)
switch changelogState {
case GhostTableMigrated:
{
@@ -189,7 +195,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
case AllEventsUpToLockProcessed:
{
applyEventFunc := func() error {
this.allEventsUpToLockProcessed <- true
this.allEventsUpToLockProcessed <- changelogStateString
return nil
}
// at this point we know all events up to lock have been read from the streamer,
@@ -206,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
}
}
log.Debugf("Received state %+v", changelogState)
log.Infof("Handled changelog state %s", changelogState)
return nil
}
@@ -291,7 +297,8 @@ func (this *Migrator) Migrate() (err error) {
return err
}
log.Infof("Waiting for ghost table to be migrated")
initialLag, _ := this.inspector.getReplicationLag()
log.Infof("Waiting for ghost table to be migrated. Current lag is %+v", initialLag)
<-this.ghostTableMigrated
log.Debugf("ghost table migrated")
// Yay! We now know the Ghost and Changelog tables are good to examine!
@@ -342,9 +349,10 @@
if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
return err
}
if err := this.cutOver(); err != nil {
if err := this.retryOperation(this.cutOver); err != nil {
return err
}
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
if err := this.finalCleanup(); err != nil {
return nil
@@ -375,16 +383,18 @@ func (this *Migrator) cutOver() (err error) {
})
this.migrationContext.MarkPointOfInterest()
log.Debugf("checking for cut-over postpone")
this.sleepWhileTrue(
func() (bool, error) {
if this.migrationContext.PostponeCutOverFlagFile == "" {
return false, nil
}
if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
atomic.StoreInt64(&this.migrationContext.UserCommandedUnpostponeFlag, 0)
return false, nil
}
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
// Throttle file defined and exists!
// Postpone file defined and exists!
if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) == 0 {
if err := this.hooksExecutor.onBeginPostponed(); err != nil {
return true, err
@@ -398,6 +408,7 @@
)
atomic.StoreInt64(&this.migrationContext.IsPostponingCutOver, 0)
this.migrationContext.MarkPointOfInterest()
log.Debugf("checking for cut-over postpone: complete")
if this.migrationContext.TestOnReplica {
// With `--test-on-replica` we stop replication thread, and then proceed to use
@@ -422,20 +433,11 @@
if this.migrationContext.CutOverType == base.CutOverAtomic {
// Atomic solution: we use low timeout and multiple attempts. But for
// each failed attempt, we throttle until replication lag is back to normal
err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.atomicCutOver)
},
)
err := this.atomicCutOver()
return err
}
if this.migrationContext.CutOverType == base.CutOverTwoStep {
err := this.retryOperation(
func() error {
return this.executeAndThrottleOnError(this.cutOverTwoStep)
},
)
return err
return this.cutOverTwoStep()
}
return log.Fatalf("Unknown cut-over type: %d; should never get here!", this.migrationContext.CutOverType)
}
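Retry responsibility thus moves from inside the per-strategy paths up to the cutOver() call site in Migrate(). A generic sketch of a retryOperation-style helper (illustrative only; gh-ost's actual helper honors the --default-retries setting):

package main

import (
	"errors"
	"fmt"
)

// retryOperation invokes operation up to maxRetries times, returning the
// first success; cut-over cleans up its locks via defers before each retry.
func retryOperation(operation func() error, maxRetries int) (err error) {
	for i := 0; i < maxRetries; i++ {
		if err = operation(); err == nil {
			return nil
		}
	}
	return fmt.Errorf("giving up after %d attempts: %v", maxRetries, err)
}

func main() {
	attempts := 0
	err := retryOperation(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("rename blocked")
		}
		return nil
	}, 5)
	fmt.Println(attempts, err) // 3 <nil>
}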
@@ -443,16 +445,35 @@
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) {
timeout := time.NewTimer(time.Second * time.Duration(this.migrationContext.CutOverLockTimeoutSeconds))
this.migrationContext.MarkPointOfInterest()
waitForEventsUpToLockStartTime := time.Now()
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
allEventsUpToLockProcessedChallenge := fmt.Sprintf("%s:%d", string(AllEventsUpToLockProcessed), waitForEventsUpToLockStartTime.UnixNano())
log.Infof("Writing changelog state: %+v", allEventsUpToLockProcessedChallenge)
if _, err := this.applier.WriteChangelogState(allEventsUpToLockProcessedChallenge); err != nil {
return err
}
log.Infof("Waiting for events up to lock")
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 1)
<-this.allEventsUpToLockProcessed
for found := false; !found; {
select {
case <-timeout.C:
{
return log.Errorf("Timeout while waiting for events up to lock")
}
case state := <-this.allEventsUpToLockProcessed:
{
if state == allEventsUpToLockProcessedChallenge {
log.Infof("Waiting for events up to lock: got %s", state)
found = true
} else {
log.Infof("Waiting for events up to lock: skipping %s", state)
}
}
}
}
waitForEventsUpToLockDuration := time.Since(waitForEventsUpToLockStartTime)
log.Infof("Done waiting for events up to lock; duration=%+v", waitForEventsUpToLockDuration)
@@ -466,8 +487,8 @@ func (this *Migrator) waitForEventsUpToLock() (err error) {
// There is a point in time where the "original" table does not exist and queries are non-blocked
// and failing.
func (this *Migrator) cutOverTwoStep() (err error) {
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
if err := this.retryOperation(this.applier.LockOriginalTable); err != nil {
@@ -492,10 +513,12 @@ func (this *Migrator) cutOverTwoStep() (err error) {
// atomicCutOver
func (this *Migrator) atomicCutOver() (err error) {
atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 1)
defer atomic.StoreInt64(&this.inCutOverCriticalActionFlag, 0)
atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 1)
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
okToUnlockTable := make(chan bool, 4)
defer func() {
okToUnlockTable <- true
this.applier.DropAtomicCutOverSentryTableIfExists()
}()
@@ -503,7 +526,6 @@
lockOriginalSessionIdChan := make(chan int64, 2)
tableLocked := make(chan error, 2)
okToUnlockTable := make(chan bool, 3)
tableUnlocked := make(chan error, 2)
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
@@ -517,7 +539,9 @@
log.Infof("Session locking original & magic tables is %+v", lockOriginalSessionId)
// At this point we know the original table is locked.
// We know any newly incoming DML on original table is blocked.
this.waitForEventsUpToLock()
if err := this.waitForEventsUpToLock(); err != nil {
return log.Errore(err)
}
// Step 2
// We now attempt an atomic RENAME on original & ghost tables, and expect it to block.
@@ -803,7 +827,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
eta = "due"
state = "postponing cut-over"
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
state = fmt.Sprintf("throttled, %s", throttleReason)
}
@@ -997,15 +1021,8 @@ func (this *Migrator) executeWriteFuncs() error {
return nil
}
for {
if atomic.LoadInt64(&this.inCutOverCriticalActionFlag) == 0 {
// we don't throttle when cutting over. We _do_ throttle:
// - during copy phase
// - just before cut-over
// - in between cut-over retries
this.throttler.throttle(nil)
// When cutting over, we need to be aggressive. Cut-over holds table locks.
// We need to release those asap.
}
this.throttler.throttle(nil)
// We give higher priority to event processing, then secondary priority to
// rowcopy
select {
@@ -1058,6 +1075,9 @@ func (this *Migrator) finalCleanup() error {
log.Errore(err)
}
}
if err := this.eventsStreamer.Close(); err != nil {
log.Errore(err)
}
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
return err
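The unconditional throttle() call restored above relies on IsThrottled() itself suppressing throttling inside the cut-over critical section (see context.go earlier). For the events-before-rowcopy priority noted at the select in executeWriteFuncs, here is a sketch of a two-stage select that prefers the events queue (illustrative names, not the exact gh-ost code):

package main

import "fmt"

func nextWriteFunc(applyEventsQueue, copyRowsQueue chan func()) func() {
	select {
	case f := <-applyEventsQueue: // binlog events take precedence
		return f
	default:
	}
	select { // no event pending: take whichever arrives first
	case f := <-applyEventsQueue:
		return f
	case f := <-copyRowsQueue:
		return f
	}
}

func main() {
	events := make(chan func(), 1)
	rows := make(chan func(), 1)
	rows <- func() { fmt.Println("copy a rows chunk") }
	events <- func() { fmt.Println("apply a binlog event") }
	nextWriteFunc(events, rows)() // apply a binlog event
	nextWriteFunc(events, rows)() // copy a rows chunk
}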


@@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
}
}
}
func (this *EventsStreamer) Close() (err error) {
err = this.binlogReader.Close()
log.Infof("Closed streamer connection. err=%+v", err)
return err
}


@@ -34,16 +34,16 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
// shouldThrottle performs checks to see whether we should currently be throttling.
// It merely observes the metrics collected by other components, it does not issue
// its own metric collection.
func (this *Throttler) shouldThrottle() (result bool, reason string) {
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
if generalCheckResult.ShouldThrottle {
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
}
// Replication lag throttle
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
}
checkThrottleControlReplicas := true
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
@@ -52,14 +52,14 @@ func (this *Throttler) shouldThrottle() (result bool, reason string) {
if checkThrottleControlReplicas {
lagResult := this.migrationContext.GetControlReplicasLagResult()
if lagResult.Err != nil {
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
}
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
}
}
// Got here? No metrics indicates we need throttling.
return false, ""
return false, "", base.NoThrottleReasonHint
}
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
@@ -147,8 +147,8 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
func (this *Throttler) collectGeneralThrottleMetrics() error {
setThrottle := func(throttle bool, reason string) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
return nil
}
@@ -161,7 +161,7 @@
criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
@@ -181,18 +181,18 @@
// User-based throttle
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
return setThrottle(true, "commanded by user")
return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
}
if this.migrationContext.ThrottleFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
// Throttle file defined and exists!
return setThrottle(true, "flag-file")
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
}
}
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
// 2nd Throttle file defined and exists!
return setThrottle(true, "flag-file")
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
}
}
@@ -200,19 +200,19 @@
for variableName, threshold := range maxLoad {
value, err := this.applier.ShowStatusVariable(variableName)
if err != nil {
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
}
if value >= threshold {
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
}
}
if this.migrationContext.GetThrottleQuery() != "" {
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
return setThrottle(true, "throttle-query")
return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
}
}
return setThrottle(false, "")
return setThrottle(false, "", base.NoThrottleReasonHint)
}
// initiateThrottlerMetrics initiates the various processes that collect measurements
@@ -237,8 +237,8 @@ func (this *Throttler) initiateThrottlerChecks() error {
throttlerTick := time.Tick(100 * time.Millisecond)
throttlerFunction := func() {
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason := this.shouldThrottle()
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
if shouldThrottle && !alreadyThrottling {
// New throttling
this.applier.WriteAndLogChangelog("throttle", throttleReason)
@@ -249,7 +249,7 @@
// End of throttling
this.applier.WriteAndLogChangelog("throttle", "done throttling")
}
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
}
throttlerFunction()
for range throttlerTick {
@@ -265,7 +265,7 @@ func (this *Throttler) throttle(onThrottled func()) {
for {
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
// Therefore calling IsThrottled() is cheap
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
return
}
if onThrottled != nil {


@@ -0,0 +1,28 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
dt0 datetime(6),
dt1 datetime(6),
ts2 timestamp(6),
updated tinyint unsigned default 0,
primary key(id),
key i_idx(i)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 11, now(), now(), now(), 0);
update gh_ost_test set dt1='2016-10-31 11:22:33.444', updated = 1 where i = 11 order by id desc limit 1;
insert into gh_ost_test values (null, 13, now(), now(), now(), 0);
update gh_ost_test set ts2='2016-11-01 11:22:33.444', updated = 1 where i = 13 order by id desc limit 1;
end ;;


@@ -1,7 +1,7 @@
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
t varchar(128),
t varchar(128) charset latin1 collate latin1_swedish_ci,
primary key(id)
) auto_increment=1 charset latin1 collate latin1_swedish_ci;
@@ -17,5 +17,9 @@ create event gh_ost_test
begin
insert into gh_ost_test values (null, md5(rand()));
insert into gh_ost_test values (null, 'átesting');
insert into gh_ost_test values (null, 'ádelete');
insert into gh_ost_test values (null, 'testátest');
update gh_ost_test set t='áupdated' order by id desc limit 1;
update gh_ost_test set t='áupdated1' where t='áupdated' order by id desc limit 1;
delete from gh_ost_test where t='ádelete';
end ;;

script/bootstrap Executable file

@@ -0,0 +1,16 @@
#!/bin/bash
set -e
# Make sure we have the version of Go we want to depend on, either from the
# system or one we grab ourselves.
. script/ensure-go-installed
# Since we want to be able to build this outside of GOPATH, we set it
# up so it points back to us and go is none the wiser
set -x
rm -rf .gopath
mkdir -p .gopath/src/github.com/github
ln -s "$PWD" .gopath/src/github.com/github/gh-ost
export GOPATH=$PWD/.gopath:$GOPATH

script/build Executable file

@@ -0,0 +1,20 @@
#!/bin/bash
set -e
. script/bootstrap
mkdir -p bin
bindir="$PWD"/bin
scriptdir="$PWD"/script
# We have a few binaries that we want to build, so let's put them into bin/
version=$(git rev-parse HEAD)
describe=$(git describe --tags --always --dirty)
export GOPATH="$PWD/.gopath"
cd .gopath/src/github.com/github/gh-ost
# We put the binaries directly into the bindir, because we have no need for shim wrappers
go build -o "$bindir/gh-ost" -ldflags "-X main.AppVersion=${version} -X main.BuildDescribe=${describe}" ./go/cmd/gh-ost/main.go

script/cibuild Executable file

@@ -0,0 +1,17 @@
#!/bin/bash
set -e
. script/bootstrap
echo "Verifying code is formatted via 'gofmt -s -w go/'"
gofmt -s -w go/
git diff --exit-code --quiet
echo "Building"
script/build
cd .gopath/src/github.com/github/gh-ost
echo "Running unit tests"
go test ./go/...


@@ -0,0 +1,37 @@
#!/bin/sh
set -e
script/cibuild
# Get a fresh directory and make sure to delete it afterwards
build_dir=tmp/build
rm -rf $build_dir
mkdir -p $build_dir
trap "rm -rf $build_dir" EXIT
commit_sha=$(git rev-parse HEAD)
if [ $(uname -s) = "Darwin" ]; then
build_arch="$(uname -sr | tr -d ' ' | tr '[:upper:]' '[:lower:]')-$(uname -m)"
else
build_arch="$(lsb_release -sc | tr -d ' ' | tr '[:upper:]' '[:lower:]')-$(uname -m)"
fi
tarball=$build_dir/${commit_sha}-${build_arch}.tar
# Create the tarball
tar cvf $tarball --mode="ugo=rx" bin/
# Compress it and copy it to the directory for the CI to upload it
gzip $tarball
mkdir -p "$BUILD_ARTIFACT_DIR"/gh-ost
cp ${tarball}.gz "$BUILD_ARTIFACT_DIR"/gh-ost/
### HACK HACK HACK ###
# Blame @carlosmn. In the good way.
# We don't have any jessie machines for building, but a pure-Go binary depends
# on a version of libc and ld which are widely available, so we can copy the
# tarball over with jessie in its name so we can deploy it on jessie machines.
jessie_tarball_name=$(echo $(basename "${tarball}") | sed s/-precise-/-jessie-/)
cp ${tarball}.gz "$BUILD_ARTIFACT_DIR/gh-ost/${jessie_tarball_name}.gz"

script/ensure-go-installed Executable file

@@ -0,0 +1,51 @@
#!/bin/bash
GO_VERSION=go1.7
GO_PKG_DARWIN=${GO_VERSION}.darwin-amd64.pkg
GO_PKG_DARWIN_SHA=e7089843bc7148ffcc147759985b213604d22bb9fd19bd930b515aa981bf1b22
GO_PKG_LINUX=${GO_VERSION}.linux-amd64.tar.gz
GO_PKG_LINUX_SHA=702ad90f705365227e902b42d91dd1a40e48ca7f67a2f4b2fd052aaa4295cd95
export ROOTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )/.." && pwd )"
cd $ROOTDIR
# If Go isn't installed globally, setup environment variables for local install.
if [ -z "$(which go)" ] || [ -z "$(go version | grep $GO_VERSION)" ]; then
GODIR="$ROOTDIR/.vendor/go17"
if [ $(uname -s) = "Darwin" ]; then
export GOROOT="$GODIR/usr/local/go"
else
export GOROOT="$GODIR/go"
fi
export PATH="$GOROOT/bin:$PATH"
fi
# Check if local install exists, and install otherwise.
if [ -z "$(which go)" ] || [ -z "$(go version | grep $GO_VERSION)" ]; then
[ -d "$GODIR" ] && rm -rf $GODIR
mkdir -p "$GODIR"
cd "$GODIR";
if [ $(uname -s) = "Darwin" ]; then
curl -L -O https://storage.googleapis.com/golang/$GO_PKG_DARWIN
shasum -a256 $GO_PKG_DARWIN | grep $GO_PKG_DARWIN_SHA
xar -xf $GO_PKG_DARWIN
cpio -i < com.googlecode.go.pkg/Payload
else
curl -L -O https://storage.googleapis.com/golang/$GO_PKG_LINUX
shasum -a256 $GO_PKG_LINUX | grep $GO_PKG_LINUX_SHA
tar xf $GO_PKG_LINUX
fi
# Prove we did something right
echo "$GO_VERSION installed in $GODIR: Go Binary: $(which go)"
fi
cd $ROOTDIR
# Configure the new go to be the first go found
export GOPATH=$ROOTDIR/.vendor

script/go Executable file

@@ -0,0 +1,11 @@
#!/bin/bash
set -e
. script/bootstrap
mkdir -p bin
bindir="$PWD"/bin
cd .gopath/src/github.com/github/gh-ost
go "$@"

vendor/github.com/outbrain/golib/tests/spec.go generated vendored Normal file

@@ -0,0 +1,76 @@
package tests
import (
"testing"
)
// Spec is an access point to test Expectations
type Spec struct {
t *testing.T
}
// S generates a spec. You will want to use it once in a test file, once in a test or once per check
func S(t *testing.T) *Spec {
return &Spec{t: t}
}
// ExpectNil expects given value to be nil, or errors
func (spec *Spec) ExpectNil(actual interface{}) {
if actual == nil {
return
}
spec.t.Errorf("Expected %+v to be nil", actual)
}
// ExpectNotNil expects given value to be not nil, or errors
func (spec *Spec) ExpectNotNil(actual interface{}) {
if actual != nil {
return
}
spec.t.Errorf("Expected %+v to be not nil", actual)
}
// ExpectEquals expects given values to be equal (comparison via `==`), or errors
func (spec *Spec) ExpectEquals(actual, value interface{}) {
if actual == value {
return
}
spec.t.Errorf("Expected %+v, got %+v", value, actual)
}
// ExpectNotEquals expects given values to be nonequal (comparison via `==`), or errors
func (spec *Spec) ExpectNotEquals(actual, value interface{}) {
if !(actual == value) {
return
}
spec.t.Errorf("Expected not %+v", value)
}
// ExpectEqualsAny expects given actual to equal (comparison via `==`) at least one of given values, or errors
func (spec *Spec) ExpectEqualsAny(actual interface{}, values ...interface{}) {
for _, value := range values {
if actual == value {
return
}
}
spec.t.Errorf("Expected %+v to equal any of given values", actual)
}
// ExpectNotEqualsAny expects given actual to be nonequal (comparison via `==`) to any of given values, or errors
func (spec *Spec) ExpectNotEqualsAny(actual interface{}, values ...interface{}) {
for _, value := range values {
if actual == value {
spec.t.Errorf("Expected not %+v", value)
}
}
}
// ExpectFalse expects given values to be false, or errors
func (spec *Spec) ExpectFalse(actual interface{}) {
spec.ExpectEquals(actual, false)
}
// ExpectTrue expects given values to be true, or errors
func (spec *Spec) ExpectTrue(actual interface{}) {
spec.ExpectEquals(actual, true)
}
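A hypothetical test file alongside this package, showing intended usage (not part of the vendored code):

package tests

import "testing"

func TestSpecExample(t *testing.T) {
	s := S(t)
	s.ExpectEquals(1+1, 2)
	s.ExpectNotNil("value")
	s.ExpectTrue(true)
	s.ExpectEqualsAny("b", "a", "b", "c")
}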


@@ -642,7 +642,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
}
//ignore second part, no precision now
//var secPart int64 = tmp % (1 << 24)
var secPart int64 = tmp % (1 << 24)
ymdhms := tmp >> 24
ymd := ymdhms >> 17
@@ -657,6 +657,9 @@
minute := int((hms >> 6) % (1 << 6))
hour := int((hms >> 12))
if secPart != 0 {
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%d", year, month, day, hour, minute, second, secPart), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
}
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
}
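For reference, a worked example of the bit layout being unpacked above (a sketch of the arithmetic only; the real function also handles the sign bit, the epoch offset and the `dec` width). Note that printing secPart with %d drops leading zeros, so a fractional part of .000123 would render as .123:

package main

import "fmt"

func main() {
	// Pack 2016-12-01 09:43:38.000000 into the same layout, then unpack it:
	ymd := int64((2016*13+12)<<5 | 1) // 17 bits of year*13+month, 5 bits of day
	hms := int64(9<<12 | 43<<6 | 38)  // 5 bits hour, 6 bits minute, 6 bits second
	fracPart := int64(0)
	tmp := (ymd<<17|hms)<<24 | fracPart // low 24 bits carry the fractional seconds

	secPart := tmp % (1 << 24)
	ymdhms := tmp >> 24
	ymd2 := ymdhms >> 17
	ym := ymd2 >> 5
	hms2 := ymdhms % (1 << 17)

	fmt.Printf("%04d-%02d-%02d %02d:%02d:%02d.%d\n",
		ym/13, ym%13, ymd2%(1<<5),
		hms2>>12, (hms2>>6)%(1<<6), hms2%(1<<6), secPart)
	// Output: 2016-12-01 09:43:38.0
}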