Merge branch 'master' into more-charset-tests
This commit is contained in:
commit
5d9bac1c97
1
RELEASE_VERSION
Normal file
1
RELEASE_VERSION
Normal file
@ -0,0 +1 @@
|
|||||||
|
1.0.28
|
2
build.sh
2
build.sh
@ -2,7 +2,7 @@
|
|||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
RELEASE_VERSION="1.0.26"
|
RELEASE_VERSION=$(cat RELEASE_VERSION)
|
||||||
|
|
||||||
function build {
|
function build {
|
||||||
osname=$1
|
osname=$1
|
||||||
|
@ -37,6 +37,13 @@ const (
|
|||||||
CutOverTwoStep = iota
|
CutOverTwoStep = iota
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ThrottleReasonHint string
|
||||||
|
|
||||||
|
const (
|
||||||
|
NoThrottleReasonHint ThrottleReasonHint = "NoThrottleReasonHint"
|
||||||
|
UserCommandThrottleReasonHint = "UserCommandThrottleReasonHint"
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
|
envVariableRegexp = regexp.MustCompile("[$][{](.*)[}]")
|
||||||
)
|
)
|
||||||
@ -44,12 +51,14 @@ var (
|
|||||||
type ThrottleCheckResult struct {
|
type ThrottleCheckResult struct {
|
||||||
ShouldThrottle bool
|
ShouldThrottle bool
|
||||||
Reason string
|
Reason string
|
||||||
|
ReasonHint ThrottleReasonHint
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewThrottleCheckResult(throttle bool, reason string) *ThrottleCheckResult {
|
func NewThrottleCheckResult(throttle bool, reason string, reasonHint ThrottleReasonHint) *ThrottleCheckResult {
|
||||||
return &ThrottleCheckResult{
|
return &ThrottleCheckResult{
|
||||||
ShouldThrottle: throttle,
|
ShouldThrottle: throttle,
|
||||||
Reason: reason,
|
Reason: reason,
|
||||||
|
ReasonHint: reasonHint,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -138,6 +147,7 @@ type MigrationContext struct {
|
|||||||
TotalDMLEventsApplied int64
|
TotalDMLEventsApplied int64
|
||||||
isThrottled bool
|
isThrottled bool
|
||||||
throttleReason string
|
throttleReason string
|
||||||
|
throttleReasonHint ThrottleReasonHint
|
||||||
throttleGeneralCheckResult ThrottleCheckResult
|
throttleGeneralCheckResult ThrottleCheckResult
|
||||||
throttleMutex *sync.Mutex
|
throttleMutex *sync.Mutex
|
||||||
IsPostponingCutOver int64
|
IsPostponingCutOver int64
|
||||||
@ -145,6 +155,7 @@ type MigrationContext struct {
|
|||||||
AllEventsUpToLockProcessedInjectedFlag int64
|
AllEventsUpToLockProcessedInjectedFlag int64
|
||||||
CleanupImminentFlag int64
|
CleanupImminentFlag int64
|
||||||
UserCommandedUnpostponeFlag int64
|
UserCommandedUnpostponeFlag int64
|
||||||
|
CutOverCompleteFlag int64
|
||||||
PanicAbort chan error
|
PanicAbort chan error
|
||||||
|
|
||||||
OriginalTableColumnsOnApplier *sql.ColumnList
|
OriginalTableColumnsOnApplier *sql.ColumnList
|
||||||
@ -416,17 +427,18 @@ func (this *MigrationContext) GetThrottleGeneralCheckResult() *ThrottleCheckResu
|
|||||||
return &result
|
return &result
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MigrationContext) SetThrottled(throttle bool, reason string) {
|
func (this *MigrationContext) SetThrottled(throttle bool, reason string, reasonHint ThrottleReasonHint) {
|
||||||
this.throttleMutex.Lock()
|
this.throttleMutex.Lock()
|
||||||
defer this.throttleMutex.Unlock()
|
defer this.throttleMutex.Unlock()
|
||||||
this.isThrottled = throttle
|
this.isThrottled = throttle
|
||||||
this.throttleReason = reason
|
this.throttleReason = reason
|
||||||
|
this.throttleReasonHint = reasonHint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MigrationContext) IsThrottled() (bool, string) {
|
func (this *MigrationContext) IsThrottled() (bool, string, ThrottleReasonHint) {
|
||||||
this.throttleMutex.Lock()
|
this.throttleMutex.Lock()
|
||||||
defer this.throttleMutex.Unlock()
|
defer this.throttleMutex.Unlock()
|
||||||
return this.isThrottled, this.throttleReason
|
return this.isThrottled, this.throttleReason, this.throttleReasonHint
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *MigrationContext) GetReplicationLagQuery() string {
|
func (this *MigrationContext) GetReplicationLagQuery() string {
|
||||||
|
@ -118,6 +118,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
|
|||||||
|
|
||||||
// StreamEvents
|
// StreamEvents
|
||||||
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
||||||
|
if canStopStreaming() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
if canStopStreaming() {
|
if canStopStreaming() {
|
||||||
break
|
break
|
||||||
@ -148,3 +151,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *GoMySQLReader) Close() error {
|
||||||
|
this.binlogSyncer.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -305,6 +305,9 @@ func (this *Applier) InitiateHeartbeat() {
|
|||||||
// Generally speaking, we would issue a goroutine, but I'd actually rather
|
// Generally speaking, we would issue a goroutine, but I'd actually rather
|
||||||
// have this block the loop rather than spam the master in the event something
|
// have this block the loop rather than spam the master in the event something
|
||||||
// goes wrong
|
// goes wrong
|
||||||
|
if throttle, _, reasonHint := this.migrationContext.IsThrottled(); throttle && (reasonHint == base.UserCommandThrottleReasonHint) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
if err := injectHeartbeat(); err != nil {
|
if err := injectHeartbeat(); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -335,12 +335,27 @@ func (this *Inspector) validateLogSlaveUpdates() error {
|
|||||||
if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil {
|
if err := this.db.QueryRow(query).Scan(&logSlaveUpdates); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if !logSlaveUpdates && !this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.IsTungsten {
|
|
||||||
return fmt.Errorf("%s:%d must have log_slave_updates enabled", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
if logSlaveUpdates {
|
||||||
|
log.Infof("log_slave_updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("binary logs updates validated on %s:%d", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
if this.migrationContext.IsTungsten {
|
||||||
|
log.Warning("log_slave_updates not found on %s:%d, but --tungsten provided, so I'm proceeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica {
|
||||||
|
return fmt.Errorf("%s:%d must have log_slave_updates enabled for testing/migrating on replica", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||||
|
}
|
||||||
|
|
||||||
|
if this.migrationContext.InspectorIsAlsoApplier() {
|
||||||
|
log.Warning("log_slave_updates not found on %s:%d, but executing directly on master, so I'm proceeeding", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("%s:%d must have log_slave_updates enabled for executing migration", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||||
}
|
}
|
||||||
|
|
||||||
// validateTable makes sure the table we need to operate on actually exists
|
// validateTable makes sure the table we need to operate on actually exists
|
||||||
|
@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) canStopStreaming() bool {
|
func (this *Migrator) canStopStreaming() bool {
|
||||||
return false
|
return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
|
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
|
||||||
@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
if err := this.cutOver(); err != nil {
|
if err := this.cutOver(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
|
||||||
|
|
||||||
if err := this.finalCleanup(); err != nil {
|
if err := this.finalCleanup(); err != nil {
|
||||||
return nil
|
return nil
|
||||||
@ -803,7 +804,7 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
|||||||
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
} else if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
|
||||||
eta = "due"
|
eta = "due"
|
||||||
state = "postponing cut-over"
|
state = "postponing cut-over"
|
||||||
} else if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
} else if isThrottled, throttleReason, _ := this.migrationContext.IsThrottled(); isThrottled {
|
||||||
state = fmt.Sprintf("throttled, %s", throttleReason)
|
state = fmt.Sprintf("throttled, %s", throttleReason)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error {
|
|||||||
log.Errore(err)
|
log.Errore(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := this.eventsStreamer.Close(); err != nil {
|
||||||
|
log.Errore(err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
|
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *EventsStreamer) Close() (err error) {
|
||||||
|
err = this.binlogReader.Close()
|
||||||
|
log.Infof("Closed streamer connection. err=%+v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
@ -34,16 +34,16 @@ func NewThrottler(applier *Applier, inspector *Inspector) *Throttler {
|
|||||||
// shouldThrottle performs checks to see whether we should currently be throttling.
|
// shouldThrottle performs checks to see whether we should currently be throttling.
|
||||||
// It merely observes the metrics collected by other components, it does not issue
|
// It merely observes the metrics collected by other components, it does not issue
|
||||||
// its own metric collection.
|
// its own metric collection.
|
||||||
func (this *Throttler) shouldThrottle() (result bool, reason string) {
|
func (this *Throttler) shouldThrottle() (result bool, reason string, reasonHint base.ThrottleReasonHint) {
|
||||||
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
|
generalCheckResult := this.migrationContext.GetThrottleGeneralCheckResult()
|
||||||
if generalCheckResult.ShouldThrottle {
|
if generalCheckResult.ShouldThrottle {
|
||||||
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason
|
return generalCheckResult.ShouldThrottle, generalCheckResult.Reason, generalCheckResult.ReasonHint
|
||||||
}
|
}
|
||||||
// Replication lag throttle
|
// Replication lag throttle
|
||||||
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
|
maxLagMillisecondsThrottleThreshold := atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold)
|
||||||
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
||||||
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
if time.Duration(lag) > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||||
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
|
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds()), base.NoThrottleReasonHint
|
||||||
}
|
}
|
||||||
checkThrottleControlReplicas := true
|
checkThrottleControlReplicas := true
|
||||||
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
|
if (this.migrationContext.TestOnReplica || this.migrationContext.MigrateOnReplica) && (atomic.LoadInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag) > 0) {
|
||||||
@ -52,14 +52,14 @@ func (this *Throttler) shouldThrottle() (result bool, reason string) {
|
|||||||
if checkThrottleControlReplicas {
|
if checkThrottleControlReplicas {
|
||||||
lagResult := this.migrationContext.GetControlReplicasLagResult()
|
lagResult := this.migrationContext.GetControlReplicasLagResult()
|
||||||
if lagResult.Err != nil {
|
if lagResult.Err != nil {
|
||||||
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err)
|
return true, fmt.Sprintf("%+v %+v", lagResult.Key, lagResult.Err), base.NoThrottleReasonHint
|
||||||
}
|
}
|
||||||
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
if lagResult.Lag > time.Duration(maxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||||
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds())
|
return true, fmt.Sprintf("%+v replica-lag=%fs", lagResult.Key, lagResult.Lag.Seconds()), base.NoThrottleReasonHint
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Got here? No metrics indicates we need throttling.
|
// Got here? No metrics indicates we need throttling.
|
||||||
return false, ""
|
return false, "", base.NoThrottleReasonHint
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
|
// parseChangelogHeartbeat is called when a heartbeat event is intercepted
|
||||||
@ -147,8 +147,8 @@ func (this *Throttler) criticalLoadIsMet() (met bool, variableName string, value
|
|||||||
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
|
// collectGeneralThrottleMetrics reads the once-per-sec metrics, and stores them onto this.migrationContext
|
||||||
func (this *Throttler) collectGeneralThrottleMetrics() error {
|
func (this *Throttler) collectGeneralThrottleMetrics() error {
|
||||||
|
|
||||||
setThrottle := func(throttle bool, reason string) error {
|
setThrottle := func(throttle bool, reason string, reasonHint base.ThrottleReasonHint) error {
|
||||||
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason))
|
this.migrationContext.SetThrottleGeneralCheckResult(base.NewThrottleCheckResult(throttle, reason, reasonHint))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -161,7 +161,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
|
|
||||||
criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
|
criticalLoadMet, variableName, value, threshold, err := this.criticalLoadIsMet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
|
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
|
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
|
||||||
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
||||||
@ -181,18 +181,18 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
|
|
||||||
// User-based throttle
|
// User-based throttle
|
||||||
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
|
if atomic.LoadInt64(&this.migrationContext.ThrottleCommandedByUser) > 0 {
|
||||||
return setThrottle(true, "commanded by user")
|
return setThrottle(true, "commanded by user", base.UserCommandThrottleReasonHint)
|
||||||
}
|
}
|
||||||
if this.migrationContext.ThrottleFlagFile != "" {
|
if this.migrationContext.ThrottleFlagFile != "" {
|
||||||
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
if base.FileExists(this.migrationContext.ThrottleFlagFile) {
|
||||||
// Throttle file defined and exists!
|
// Throttle file defined and exists!
|
||||||
return setThrottle(true, "flag-file")
|
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
if this.migrationContext.ThrottleAdditionalFlagFile != "" {
|
||||||
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
|
if base.FileExists(this.migrationContext.ThrottleAdditionalFlagFile) {
|
||||||
// 2nd Throttle file defined and exists!
|
// 2nd Throttle file defined and exists!
|
||||||
return setThrottle(true, "flag-file")
|
return setThrottle(true, "flag-file", base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -200,19 +200,19 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
for variableName, threshold := range maxLoad {
|
for variableName, threshold := range maxLoad {
|
||||||
value, err := this.applier.ShowStatusVariable(variableName)
|
value, err := this.applier.ShowStatusVariable(variableName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err))
|
return setThrottle(true, fmt.Sprintf("%s %s", variableName, err), base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
if value >= threshold {
|
if value >= threshold {
|
||||||
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold))
|
return setThrottle(true, fmt.Sprintf("max-load %s=%d >= %d", variableName, value, threshold), base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if this.migrationContext.GetThrottleQuery() != "" {
|
if this.migrationContext.GetThrottleQuery() != "" {
|
||||||
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
|
if res, _ := this.applier.ExecuteThrottleQuery(); res > 0 {
|
||||||
return setThrottle(true, "throttle-query")
|
return setThrottle(true, "throttle-query", base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return setThrottle(false, "")
|
return setThrottle(false, "", base.NoThrottleReasonHint)
|
||||||
}
|
}
|
||||||
|
|
||||||
// initiateThrottlerMetrics initiates the various processes that collect measurements
|
// initiateThrottlerMetrics initiates the various processes that collect measurements
|
||||||
@ -237,8 +237,8 @@ func (this *Throttler) initiateThrottlerChecks() error {
|
|||||||
throttlerTick := time.Tick(100 * time.Millisecond)
|
throttlerTick := time.Tick(100 * time.Millisecond)
|
||||||
|
|
||||||
throttlerFunction := func() {
|
throttlerFunction := func() {
|
||||||
alreadyThrottling, currentReason := this.migrationContext.IsThrottled()
|
alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled()
|
||||||
shouldThrottle, throttleReason := this.shouldThrottle()
|
shouldThrottle, throttleReason, throttleReasonHint := this.shouldThrottle()
|
||||||
if shouldThrottle && !alreadyThrottling {
|
if shouldThrottle && !alreadyThrottling {
|
||||||
// New throttling
|
// New throttling
|
||||||
this.applier.WriteAndLogChangelog("throttle", throttleReason)
|
this.applier.WriteAndLogChangelog("throttle", throttleReason)
|
||||||
@ -249,7 +249,7 @@ func (this *Throttler) initiateThrottlerChecks() error {
|
|||||||
// End of throttling
|
// End of throttling
|
||||||
this.applier.WriteAndLogChangelog("throttle", "done throttling")
|
this.applier.WriteAndLogChangelog("throttle", "done throttling")
|
||||||
}
|
}
|
||||||
this.migrationContext.SetThrottled(shouldThrottle, throttleReason)
|
this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint)
|
||||||
}
|
}
|
||||||
throttlerFunction()
|
throttlerFunction()
|
||||||
for range throttlerTick {
|
for range throttlerTick {
|
||||||
@ -265,7 +265,7 @@ func (this *Throttler) throttle(onThrottled func()) {
|
|||||||
for {
|
for {
|
||||||
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
|
// IsThrottled() is non-blocking; the throttling decision making takes place asynchronously.
|
||||||
// Therefore calling IsThrottled() is cheap
|
// Therefore calling IsThrottled() is cheap
|
||||||
if shouldThrottle, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
|
if shouldThrottle, _, _ := this.migrationContext.IsThrottled(); !shouldThrottle {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if onThrottled != nil {
|
if onThrottled != nil {
|
||||||
|
28
localtests/datetime-submillis/create.sql
Normal file
28
localtests/datetime-submillis/create.sql
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
drop table if exists gh_ost_test;
|
||||||
|
create table gh_ost_test (
|
||||||
|
id int auto_increment,
|
||||||
|
i int not null,
|
||||||
|
dt0 datetime(6),
|
||||||
|
dt1 datetime(6),
|
||||||
|
ts2 timestamp(6),
|
||||||
|
updated tinyint unsigned default 0,
|
||||||
|
primary key(id),
|
||||||
|
key i_idx(i)
|
||||||
|
) auto_increment=1;
|
||||||
|
|
||||||
|
drop event if exists gh_ost_test;
|
||||||
|
delimiter ;;
|
||||||
|
create event gh_ost_test
|
||||||
|
on schedule every 1 second
|
||||||
|
starts current_timestamp
|
||||||
|
ends current_timestamp + interval 60 second
|
||||||
|
on completion not preserve
|
||||||
|
enable
|
||||||
|
do
|
||||||
|
begin
|
||||||
|
insert into gh_ost_test values (null, 11, now(), now(), now(), 0);
|
||||||
|
update gh_ost_test set dt1='2016-10-31 11:22:33.444', updated = 1 where i = 11 order by id desc limit 1;
|
||||||
|
|
||||||
|
insert into gh_ost_test values (null, 13, now(), now(), now(), 0);
|
||||||
|
update gh_ost_test set ts1='2016-11-01 11:22:33.444', updated = 1 where i = 13 order by id desc limit 1;
|
||||||
|
end ;;
|
5
vendor/github.com/siddontang/go-mysql/replication/row_event.go
generated
vendored
5
vendor/github.com/siddontang/go-mysql/replication/row_event.go
generated
vendored
@ -642,7 +642,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//ingore second part, no precision now
|
//ingore second part, no precision now
|
||||||
//var secPart int64 = tmp % (1 << 24)
|
var secPart int64 = tmp % (1 << 24)
|
||||||
ymdhms := tmp >> 24
|
ymdhms := tmp >> 24
|
||||||
|
|
||||||
ymd := ymdhms >> 17
|
ymd := ymdhms >> 17
|
||||||
@ -657,6 +657,9 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
|
|||||||
minute := int((hms >> 6) % (1 << 6))
|
minute := int((hms >> 6) % (1 << 6))
|
||||||
hour := int((hms >> 12))
|
hour := int((hms >> 12))
|
||||||
|
|
||||||
|
if secPart != 0 {
|
||||||
|
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%d", year, month, day, hour, minute, second, secPart), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
|
||||||
|
}
|
||||||
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
|
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user