diff --git a/RELEASE_VERSION b/RELEASE_VERSION new file mode 100644 index 0000000..8b54409 --- /dev/null +++ b/RELEASE_VERSION @@ -0,0 +1 @@ +1.0.28 diff --git a/build.sh b/build.sh index c12b6a1..3e1ce6f 100755 --- a/build.sh +++ b/build.sh @@ -2,7 +2,7 @@ # # -RELEASE_VERSION="1.0.26" +RELEASE_VERSION=$(cat RELEASE_VERSION) function build { osname=$1 diff --git a/go/base/context.go b/go/base/context.go index fe201e4..f82fb22 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -37,6 +37,13 @@ const ( CutOverTwoStep = iota ) +type ThrottleReasonHint string + +const ( + NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint" + UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint" +) + var ( envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]") ) @@ -44,12 +51,14 @@ var ( type ThrottleCheckResult struct { ShouldThrottle bool Reason string + ReasonHint ThrottleReasonHint } -func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult { +func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleReasonHint) *ThrottleCheckResult { return &ThrottleCheckResult{ ShouldThrottle: throttle, Reason: reason, + ReasonHint: reasonHint, } } @@ -138,6 +147,7 @@ type MigrationContext struct { TotalDMLEventsApplied int64 isThrottled bool throttleReason string + throttleReasonHint ThrottleReasonHint throttleGeneralCheckResult ThrottleCheckResult throttleMutex *sync.Mutex IsPostponingCutOver int64 @@ -145,6 +155,7 @@ type MigrationContext struct { AllEventsUpToLockProcessedInjectedFlag int64 CleanupImminentFlag int64 UserCommandedUnpostponeFlag int64 + CutOverCompleteFlag int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList @@ -416,17 +427,18 @@ func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResu return &result } -func (this *MigrationContext) SetThrottled(throttle bool, reason string) { +func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonHint ThrottleReasonHint) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() this.isThrottled = throttle this.throttleReason = reason + this.throttleReasonHint = reasonHint } -func (this *MigrationContext) IsThrottled() (bool, string) { +func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() - return this.isThrottled, this.throttleReason + return this.isThrottled, this.throttleReason, this.throttleReasonHint } func (this *MigrationContext) GetReplicationLagQuery() string { diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 35f93f4..6d338ca 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -118,6 +118,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven // StreamEvents func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { + if canStopStreaming() { + return nil + } for { if canStopStreaming() { break @@ -148,3 +151,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha return nil } + +func (this *GoMySQLReader) Close() error { + this.binlogSyncer.Close() + return nil +} diff --git a/go/logic/applier.go b/go/logic/applier.go index 286728a..73d98e8 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -305,6 +305,9 @@ func (this *Applier) InitiateHeartbeat() { // Generally speaking, we would issue a goroutine, but I'd actually rather // have this block the loop rather than spam the master in the event something // goes wrong + if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) { + continue + } if err := injectHeartbeat(); err != nil { return } diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 595d76c..a6ecd3d 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -335,12 +335,27 @@ func (this *Inspector) validateLogSlaveUpdates() error { if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil { return err } - if !logSlaveUpdates && !this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.IsTungsten { - return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + + if logSlaveUpdates { + log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + return nil } - log.Infof("binary logs updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) - return nil + if this.migrationContext.IsTungsten { + log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + return nil + } + + if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica { + return fmt.Errorf("%s:%d must have log_slave_updates enabled for testing/migrating on replica", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + } + + if this.migrationContext.InspectorIsAlsoApplier() { + log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) + return nil + } + + return fmt.Errorf("%s:%d must have log_slave_updates enabled for executing migration", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) } // validateTable makes sure the table we need to operate on actually exists diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b0a6246..34728c8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() { } func (this *Migrator) canStopStreaming() bool { - return false + return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0 } // onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted. @@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) { if err := this.cutOver(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1) if err := this.finalCleanup(); err != nil { return nil @@ -803,7 +804,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { } else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 { eta = "due" state = "postponing cut-over" - } else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { + } else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled { state = fmt.Sprintf("throttled, %s", throttleReason) } @@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error { log.Errore(err) } } + if err := this.eventsStreamer.Close(); err != nil { + log.Errore(err) + } if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { return err diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 86faab1..dc5ba60 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { } } } + +func (this *EventsStreamer) Close() (err error) { + err = this.binlogReader.Close() + log.Infof("Closed streamer connection. err=%+v", err) + return err +} diff --git a/go/logic/throttler.go b/go/logic/throttler.go index ee22931..482087c 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -34,16 +34,16 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler { // shouldThrottle performs checks to see whether we should currently be throttling. // It merely observes the metrics collected by other components, it does not issue // its own metric collection. -func (this *Throttler) shouldThrottle() (result bool, reason string) { +func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) { generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult() if generalCheckResult.ShouldThrottle { - return generalCheckResult.ShouldThrottle, generalCheckResult.Reason + return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint } // Replication lag throttle maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold) lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()) + return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint } checkThrottleControlReplicas := true if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) { @@ -52,14 +52,14 @@ func (this *Throttler) shouldThrottle() (result bool, reason string) { if checkThrottleControlReplicas { lagResult := this.migrationContext.GetControlReplicasLagResult() if lagResult.Err != nil { - return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err) + return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint } if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond { - return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()) + return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint } } // Got here? No metrics indicates we need throttling. - return false, "" + return false, "", base.NoThrottleReasonHint } // parseChangelogHeartbeat is called when a heartbeat event is intercepted @@ -147,8 +147,8 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value // collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext func (this *Throttler) collectGeneralThrottleMetrics() error { - setThrottle := func(throttle bool, reason string) error { - this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason)) + setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error { + this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint)) return nil } @@ -161,7 +161,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet() if err != nil { - return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) + return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint) } if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) @@ -181,18 +181,18 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { // User-based throttle if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 { - return setThrottle(true, "commanded by user") + return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint) } if this.migrationContext.ThrottleFlagFile != "" { if base.FileExists(this.migrationContext.ThrottleFlagFile) { // Throttle file defined and exists! - return setThrottle(true, "flag-file") + return setThrottle(true, "flag-file", base.NoThrottleReasonHint) } } if this.migrationContext.ThrottleAdditionalFlagFile != "" { if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) { // 2nd Throttle file defined and exists! - return setThrottle(true, "flag-file") + return setThrottle(true, "flag-file", base.NoThrottleReasonHint) } } @@ -200,19 +200,19 @@ func (this *Throttler) collectGeneralThrottleMetrics() error { for variableName, threshold := range maxLoad { value, err := this.applier.ShowStatusVariable(variableName) if err != nil { - return setThrottle(true, fmt.Sprintf("%s %s", variableName, err)) + return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint) } if value >= threshold { - return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold)) + return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint) } } if this.migrationContext.GetThrottleQuery() != "" { if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 { - return setThrottle(true, "throttle-query") + return setThrottle(true, "throttle-query", base.NoThrottleReasonHint) } } - return setThrottle(false, "") + return setThrottle(false, "", base.NoThrottleReasonHint) } // initiateThrottlerMetrics initiates the various processes that collect measurements @@ -237,8 +237,8 @@ func (this *Throttler) initiateThrottlerChecks() error { throttlerTick := time.Tick(100 * time.Millisecond) throttlerFunction := func() { - alreadyThrottling, currentReason := this.migrationContext.IsThrottled() - shouldThrottle, throttleReason := this.shouldThrottle() + alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled() + shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle() if shouldThrottle && !alreadyThrottling { // New throttling this.applier.WriteAndLogChangelog("throttle", throttleReason) @@ -249,7 +249,7 @@ func (this *Throttler) initiateThrottlerChecks() error { // End of throttling this.applier.WriteAndLogChangelog("throttle", "done throttling") } - this.migrationContext.SetThrottled(shouldThrottle, throttleReason) + this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint) } throttlerFunction() for range throttlerTick { @@ -265,7 +265,7 @@ func (this *Throttler) throttle(onThrottled func()) { for { // IsThrottled() is non-blocking; the throttling decision making takes place asynchronously. // Therefore calling IsThrottled() is cheap - if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle { + if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle { return } if onThrottled != nil { diff --git a/localtests/datetime-submillis/create.sql b/localtests/datetime-submillis/create.sql new file mode 100644 index 0000000..b4e0b0b --- /dev/null +++ b/localtests/datetime-submillis/create.sql @@ -0,0 +1,28 @@ +drop table if exists gh_ost_test; +create table gh_ost_test ( + id int auto_increment, + i int not null, + dt0 datetime(6), + dt1 datetime(6), + ts2 timestamp(6), + updated tinyint unsigned default 0, + primary key(id), + key i_idx(i) +) auto_increment=1; + +drop event if exists gh_ost_test; +delimiter ;; +create event gh_ost_test + on schedule every 1 second + starts current_timestamp + ends current_timestamp + interval 60 second + on completion not preserve + enable + do +begin + insert into gh_ost_test values (null, 11, now(), now(), now(), 0); + update gh_ost_test set dt1='2016-10-31 11:22:33.444', updated = 1 where i = 11 order by id desc limit 1; + + insert into gh_ost_test values (null, 13, now(), now(), now(), 0); + update gh_ost_test set ts1='2016-11-01 11:22:33.444', updated = 1 where i = 13 order by id desc limit 1; +end ;; diff --git a/localtests/latin1/create.sql b/localtests/latin1/create.sql index 3ca9b47..bc14aaf 100644 --- a/localtests/latin1/create.sql +++ b/localtests/latin1/create.sql @@ -1,7 +1,7 @@ drop table if exists gh_ost_test; create table gh_ost_test ( id int auto_increment, - t varchar(128), + t varchar(128) charset latin1 collate latin1_swedish_ci, primary key(id) ) auto_increment=1 charset latin1 collate latin1_swedish_ci; @@ -17,5 +17,9 @@ create event gh_ost_test begin insert into gh_ost_test values (null, md5(rand())); insert into gh_ost_test values (null, 'átesting'); + insert into gh_ost_test values (null, 'ádelete'); insert into gh_ost_test values (null, 'testátest'); + update gh_ost_test set t='áupdated' order by id desc limit 1; + update gh_ost_test set t='áupdated1' where t='áupdated' order by id desc limit 1; + delete from gh_ost_test where t='ádelete'; end ;; diff --git a/script/cibuild-gh-ost-build-deploy-tarball b/script/cibuild-gh-ost-build-deploy-tarball index 13560cc..692b42b 100755 --- a/script/cibuild-gh-ost-build-deploy-tarball +++ b/script/cibuild-gh-ost-build-deploy-tarball @@ -27,3 +27,11 @@ tar cvf $tarball --mode="ugo=rx" bin/ gzip $tarball mkdir -p "$BUILD_ARTIFACT_DIR"/gh-ost cp ${tarball}.gz "$BUILD_ARTIFACT_DIR"/gh-ost/ + +### HACK HACK HACK ### +# Blame @carlosmn. In the good way. +# We don't have any jessie machines for building, but a pure-Go binary depends +# on a version of libc and ld which are widely available, so we can copy the +# tarball over with jessie in its name so we can deploy it on jessie machines. +jessie_tarball_name=$(echo $(basename "${tarball}") | sed s/-precise-/-jessie-/) +cp ${tarball}.gz "$BUILD_ARTIFACT_DIR/gh-ost/${jessie_tarball_name}.gz" diff --git a/vendor/github.com/siddontang/go-mysql/replication/row_event.go b/vendor/github.com/siddontang/go-mysql/replication/row_event.go index 9506b1e..ff6c913 100644 --- a/vendor/github.com/siddontang/go-mysql/replication/row_event.go +++ b/vendor/github.com/siddontang/go-mysql/replication/row_event.go @@ -642,7 +642,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { } //ingore second part, no precision now - //var secPart int64 = tmp % (1 << 24) + var secPart int64 = tmp % (1 << 24) ymdhms := tmp >> 24 ymd := ymdhms >> 17 @@ -657,6 +657,9 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) { minute := int((hms >> 6) % (1 << 6)) hour := int((hms >> 12)) + if secPart != 0 { + return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%d", year, month, day, hour, minute, second, secPart), n, nil // commented by Shlomi Noach. Yes I know about `git blame` + } return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame` }