Merge branch 'master' into datetime-6-millis-corruption
This commit is contained in:
commit
5af587a92b
@ -19,6 +19,7 @@ Both interfaces may serve at the same time. Both respond to simple text command,
|
|||||||
- `sup`: returns a brief status summary of migration progress
|
- `sup`: returns a brief status summary of migration progress
|
||||||
- `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
|
- `coordinates`: returns recent (though not exactly up to date) binary log coordinates of the inspected server
|
||||||
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
|
- `chunk-size=<newsize>`: modify the `chunk-size`; applies on next running copy-iteration
|
||||||
|
- `dml-batch-size=<newsize>`: modify the `dml-batch-size`; applies on next applying of binary log events
|
||||||
- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
|
- `max-lag-millis=<max-lag>`: modify the maximum replication lag threshold (milliseconds, minimum value is `100`, i.e. `0.1` second)
|
||||||
- `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
|
- `max-load=<max-load-thresholds>`: modify the `max-load` config; applies on next running copy-iteration
|
||||||
- The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`'
|
- The `max-load` format must be: `some_status=<numeric-threshold>[,some_status=<numeric-threshold>...]`'
|
||||||
@ -52,7 +53,7 @@ While migration is running:
|
|||||||
$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
$ echo status | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
||||||
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
|
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
|
||||||
# Migration started at Tue Jun 07 11:45:16 +0200 2016
|
# Migration started at Tue Jun 07 11:45:16 +0200 2016
|
||||||
# chunk-size: 200; max lag: 1500ms; max-load: map[Threads_connected:20]
|
# chunk-size: 200; max lag: 1500ms; dml-batch-size: 10; max-load: map[Threads_connected:20]
|
||||||
# Throttle additional flag file: /tmp/gh-ost.throttle
|
# Throttle additional flag file: /tmp/gh-ost.throttle
|
||||||
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
|
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
|
||||||
# Serving on TCP port: 10001
|
# Serving on TCP port: 10001
|
||||||
@ -63,7 +64,7 @@ Copy: 0/2915 0.0%; Applied: 0; Backlog: 0/100; Elapsed: 40s(copy), 41s(total); s
|
|||||||
$ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
$ echo "chunk-size=250" | nc -U /tmp/gh-ost.test.sample_data_0.sock
|
||||||
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
|
# Migrating `test`.`sample_data_0`; Ghost table is `test`.`_sample_data_0_gst`
|
||||||
# Migration started at Tue Jun 07 11:56:03 +0200 2016
|
# Migration started at Tue Jun 07 11:56:03 +0200 2016
|
||||||
# chunk-size: 250; max lag: 1500ms; max-load: map[Threads_connected:20]
|
# chunk-size: 250; max lag: 1500ms; dml-batch-size: 10; max-load: map[Threads_connected:20]
|
||||||
# Throttle additional flag file: /tmp/gh-ost.throttle
|
# Throttle additional flag file: /tmp/gh-ost.throttle
|
||||||
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
|
# Serving on unix socket: /tmp/gh-ost.test.sample_data_0.sock
|
||||||
# Serving on TCP port: 10001
|
# Serving on TCP port: 10001
|
||||||
|
@ -46,8 +46,8 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
HTTPStatusOK = 200
|
HTTPStatusOK = 200
|
||||||
maxBatchSize = 1000
|
MaxEventsBatchSize = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -243,9 +243,18 @@ func GetMigrationContext() *MigrationContext {
|
|||||||
return context
|
return context
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getSafeTableName(baseName string, suffix string) string {
|
||||||
|
name := fmt.Sprintf("_%s_%s", baseName, suffix)
|
||||||
|
if len(name) <= mysql.MaxTableNameLength {
|
||||||
|
return name
|
||||||
|
}
|
||||||
|
extraCharacters := len(name) - mysql.MaxTableNameLength
|
||||||
|
return fmt.Sprintf("_%s_%s", baseName[0:len(baseName)-extraCharacters], suffix)
|
||||||
|
}
|
||||||
|
|
||||||
// GetGhostTableName generates the name of ghost table, based on original table name
|
// GetGhostTableName generates the name of ghost table, based on original table name
|
||||||
func (this *MigrationContext) GetGhostTableName() string {
|
func (this *MigrationContext) GetGhostTableName() string {
|
||||||
return fmt.Sprintf("_%s_gho", this.OriginalTableName)
|
return getSafeTableName(this.OriginalTableName, "gho")
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
|
// GetOldTableName generates the name of the "old" table, into which the original table is renamed.
|
||||||
@ -255,14 +264,14 @@ func (this *MigrationContext) GetOldTableName() string {
|
|||||||
timestamp := fmt.Sprintf("%d%02d%02d%02d%02d%02d",
|
timestamp := fmt.Sprintf("%d%02d%02d%02d%02d%02d",
|
||||||
t.Year(), t.Month(), t.Day(),
|
t.Year(), t.Month(), t.Day(),
|
||||||
t.Hour(), t.Minute(), t.Second())
|
t.Hour(), t.Minute(), t.Second())
|
||||||
return fmt.Sprintf("_%s_%s_del", this.OriginalTableName, timestamp)
|
return getSafeTableName(this.OriginalTableName, fmt.Sprintf("%s_del", timestamp))
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("_%s_del", this.OriginalTableName)
|
return getSafeTableName(this.OriginalTableName, "del")
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetChangelogTableName generates the name of changelog table, based on original table name
|
// GetChangelogTableName generates the name of changelog table, based on original table name
|
||||||
func (this *MigrationContext) GetChangelogTableName() string {
|
func (this *MigrationContext) GetChangelogTableName() string {
|
||||||
return fmt.Sprintf("_%s_ghc", this.OriginalTableName)
|
return getSafeTableName(this.OriginalTableName, "ghc")
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
|
// GetVoluntaryLockName returns a name of a voluntary lock to be used throughout
|
||||||
@ -442,8 +451,8 @@ func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
|
|||||||
if batchSize < 1 {
|
if batchSize < 1 {
|
||||||
batchSize = 1
|
batchSize = 1
|
||||||
}
|
}
|
||||||
if batchSize > maxBatchSize {
|
if batchSize > MaxEventsBatchSize {
|
||||||
batchSize = maxBatchSize
|
batchSize = MaxEventsBatchSize
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
|
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
|
||||||
}
|
}
|
||||||
|
47
go/base/context_test.go
Normal file
47
go/base/context_test.go
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 GitHub Inc.
|
||||||
|
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||||
|
*/
|
||||||
|
|
||||||
|
package base
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/outbrain/golib/log"
|
||||||
|
test "github.com/outbrain/golib/tests"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
log.SetLevel(log.ERROR)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetTableNames(t *testing.T) {
|
||||||
|
context = newMigrationContext()
|
||||||
|
{
|
||||||
|
context.OriginalTableName = "some_table"
|
||||||
|
test.S(t).ExpectEquals(context.GetOldTableName(), "_some_table_del")
|
||||||
|
test.S(t).ExpectEquals(context.GetGhostTableName(), "_some_table_gho")
|
||||||
|
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_some_table_ghc")
|
||||||
|
}
|
||||||
|
{
|
||||||
|
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890"
|
||||||
|
test.S(t).ExpectEquals(context.GetOldTableName(), "_a1234567890123456789012345678901234567890123456789012345678_del")
|
||||||
|
test.S(t).ExpectEquals(context.GetGhostTableName(), "_a1234567890123456789012345678901234567890123456789012345678_gho")
|
||||||
|
test.S(t).ExpectEquals(context.GetChangelogTableName(), "_a1234567890123456789012345678901234567890123456789012345678_ghc")
|
||||||
|
}
|
||||||
|
{
|
||||||
|
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
|
||||||
|
oldTableName := context.GetOldTableName()
|
||||||
|
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123456789012345678_del")
|
||||||
|
}
|
||||||
|
{
|
||||||
|
context.OriginalTableName = "a123456789012345678901234567890123456789012345678901234567890123"
|
||||||
|
context.TimestampOldTable = true
|
||||||
|
longForm := "Jan 2, 2006 at 3:04pm (MST)"
|
||||||
|
context.StartTime, _ = time.Parse(longForm, "Feb 3, 2013 at 7:54pm (PST)")
|
||||||
|
oldTableName := context.GetOldTableName()
|
||||||
|
test.S(t).ExpectEquals(oldTableName, "_a1234567890123456789012345678901234567890123_20130203195400_del")
|
||||||
|
}
|
||||||
|
}
|
@ -52,10 +52,6 @@ func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct
|
|||||||
return result
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
|
||||||
applyEventsQueueBuffer = 100
|
|
||||||
)
|
|
||||||
|
|
||||||
type PrintStatusRule int
|
type PrintStatusRule int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -101,7 +97,7 @@ func NewMigrator() *Migrator {
|
|||||||
allEventsUpToLockProcessed: make(chan string),
|
allEventsUpToLockProcessed: make(chan string),
|
||||||
|
|
||||||
copyRowsQueue: make(chan tableWriteFunc),
|
copyRowsQueue: make(chan tableWriteFunc),
|
||||||
applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
|
applyEventsQueue: make(chan *applyEventStruct, base.MaxEventsBatchSize),
|
||||||
handledChangelogStates: make(map[string]bool),
|
handledChangelogStates: make(map[string]bool),
|
||||||
}
|
}
|
||||||
return migrator
|
return migrator
|
||||||
@ -767,9 +763,10 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
|
|||||||
))
|
))
|
||||||
maxLoad := this.migrationContext.GetMaxLoad()
|
maxLoad := this.migrationContext.GetMaxLoad()
|
||||||
criticalLoad := this.migrationContext.GetCriticalLoad()
|
criticalLoad := this.migrationContext.GetCriticalLoad()
|
||||||
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; max-load: %s; critical-load: %s; nice-ratio: %f",
|
fmt.Fprintln(w, fmt.Sprintf("# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f",
|
||||||
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
atomic.LoadInt64(&this.migrationContext.ChunkSize),
|
||||||
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
|
atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
|
||||||
|
atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
|
||||||
maxLoad.String(),
|
maxLoad.String(),
|
||||||
criticalLoad.String(),
|
criticalLoad.String(),
|
||||||
this.migrationContext.GetNiceRatio(),
|
this.migrationContext.GetNiceRatio(),
|
||||||
|
@ -146,6 +146,7 @@ status # Print a detailed status message
|
|||||||
sup # Print a short status message
|
sup # Print a short status message
|
||||||
coordinates # Print the currently inspected coordinates
|
coordinates # Print the currently inspected coordinates
|
||||||
chunk-size=<newsize> # Set a new chunk-size
|
chunk-size=<newsize> # Set a new chunk-size
|
||||||
|
dml-batch-size=<newsize> # Set a new dml-batch-size
|
||||||
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
|
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
|
||||||
critical-load=<load> # Set a new set of max-load thresholds
|
critical-load=<load> # Set a new set of max-load thresholds
|
||||||
max-lag-millis=<max-lag> # Set a new replication lag threshold
|
max-lag-millis=<max-lag> # Set a new replication lag threshold
|
||||||
@ -187,6 +188,19 @@ help # This message
|
|||||||
return ForcePrintStatusAndHintRule, nil
|
return ForcePrintStatusAndHintRule, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
case "dml-batch-size":
|
||||||
|
{
|
||||||
|
if argIsQuestion {
|
||||||
|
fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
|
||||||
|
return NoPrintStatusRule, nil
|
||||||
|
}
|
||||||
|
if dmlBatchSize, err := strconv.Atoi(arg); err != nil {
|
||||||
|
return NoPrintStatusRule, err
|
||||||
|
} else {
|
||||||
|
this.migrationContext.SetDMLBatchSize(int64(dmlBatchSize))
|
||||||
|
return ForcePrintStatusAndHintRule, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
case "max-lag-millis":
|
case "max-lag-millis":
|
||||||
{
|
{
|
||||||
if argIsQuestion {
|
if argIsQuestion {
|
||||||
|
Loading…
Reference in New Issue
Block a user