support rocksdb as transactional engine
This commit is contained in:
parent
9c0babdc07
commit
f053ccd9a6
@ -246,6 +246,9 @@ Allows `gh-ost` to connect to the MySQL servers using encrypted connections, but
|
|||||||
|
|
||||||
`--ssl-key=/path/to/ssl-key.key`: SSL private key file (in PEM format).
|
`--ssl-key=/path/to/ssl-key.key`: SSL private key file (in PEM format).
|
||||||
|
|
||||||
|
### storage-engine
|
||||||
|
default is `innodb`. When set to `rocksdb`, some necessary changes (e.g. sets isolation level to READ_COMMITTED) is made to support rocksdb as transactional engine.
|
||||||
|
|
||||||
### test-on-replica
|
### test-on-replica
|
||||||
|
|
||||||
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md)
|
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md)
|
||||||
|
@ -270,8 +270,6 @@ func NewMigrationContext() *MigrationContext {
|
|||||||
Uuid: uuid.NewV4().String(),
|
Uuid: uuid.NewV4().String(),
|
||||||
defaultNumRetries: 60,
|
defaultNumRetries: 60,
|
||||||
ChunkSize: 1000,
|
ChunkSize: 1000,
|
||||||
InspectorConnectionConfig: mysql.NewConnectionConfig(),
|
|
||||||
ApplierConnectionConfig: mysql.NewConnectionConfig(),
|
|
||||||
MaxLagMillisecondsThrottleThreshold: 1500,
|
MaxLagMillisecondsThrottleThreshold: 1500,
|
||||||
CutOverLockTimeoutSeconds: 3,
|
CutOverLockTimeoutSeconds: 3,
|
||||||
DMLBatchSize: 10,
|
DMLBatchSize: 10,
|
||||||
@ -290,6 +288,21 @@ func NewMigrationContext() *MigrationContext {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) SetConnectionConfig(storageEngine string) error {
|
||||||
|
transactionIsolation := "REPEATABLE-READ"
|
||||||
|
switch storageEngine {
|
||||||
|
case "rocksdb":
|
||||||
|
transactionIsolation = "READ-COMMITTED"
|
||||||
|
case "innodb":
|
||||||
|
transactionIsolation = "REPEATABLE-READ"
|
||||||
|
default:
|
||||||
|
transactionIsolation = "REPEATABLE-READ"
|
||||||
|
}
|
||||||
|
this.InspectorConnectionConfig = mysql.NewConnectionConfig(transactionIsolation)
|
||||||
|
this.ApplierConnectionConfig = mysql.NewConnectionConfig(transactionIsolation)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func getSafeTableName(baseName string, suffix string) string {
|
func getSafeTableName(baseName string, suffix string) string {
|
||||||
name := fmt.Sprintf("_%s_%s", baseName, suffix)
|
name := fmt.Sprintf("_%s_%s", baseName, suffix)
|
||||||
if len(name) <= mysql.MaxTableNameLength {
|
if len(name) <= mysql.MaxTableNameLength {
|
||||||
|
@ -68,6 +68,7 @@ func main() {
|
|||||||
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
|
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
|
||||||
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
|
||||||
flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first")
|
flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first")
|
||||||
|
storageEngine := flag.String("storage-engine", "innodb", "Specify table storage engine (default: 'innodb'). When 'rocksdb': change session transaction isolation level to READ_COMMITTED.")
|
||||||
|
|
||||||
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
|
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
|
||||||
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
|
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
|
||||||
@ -248,6 +249,10 @@ func main() {
|
|||||||
migrationContext.Log.Warning("--replication-lag-query is deprecated")
|
migrationContext.Log.Warning("--replication-lag-query is deprecated")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil {
|
||||||
|
migrationContext.Log.Fatale(err)
|
||||||
|
}
|
||||||
|
|
||||||
switch *cutOver {
|
switch *cutOver {
|
||||||
case "atomic", "default", "":
|
case "atomic", "default", "":
|
||||||
migrationContext.CutOverType = base.CutOverAtomic
|
migrationContext.CutOverType = base.CutOverAtomic
|
||||||
|
@ -18,7 +18,6 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
transactionIsolation = "REPEATABLE-READ"
|
|
||||||
TLS_CONFIG_KEY = "ghost"
|
TLS_CONFIG_KEY = "ghost"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -30,11 +29,13 @@ type ConnectionConfig struct {
|
|||||||
ImpliedKey *InstanceKey
|
ImpliedKey *InstanceKey
|
||||||
tlsConfig *tls.Config
|
tlsConfig *tls.Config
|
||||||
Timeout float64
|
Timeout float64
|
||||||
|
transactionIsolation string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConnectionConfig() *ConnectionConfig {
|
func NewConnectionConfig(transactionIsolation string) *ConnectionConfig {
|
||||||
config := &ConnectionConfig{
|
config := &ConnectionConfig{
|
||||||
Key: InstanceKey{},
|
Key: InstanceKey{},
|
||||||
|
transactionIsolation: transactionIsolation,
|
||||||
}
|
}
|
||||||
config.ImpliedKey = &config.Key
|
config.ImpliedKey = &config.Key
|
||||||
return config
|
return config
|
||||||
@ -126,7 +127,7 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
|
|||||||
"charset=utf8mb4,utf8,latin1",
|
"charset=utf8mb4,utf8,latin1",
|
||||||
"interpolateParams=true",
|
"interpolateParams=true",
|
||||||
fmt.Sprintf("tls=%s", tlsOption),
|
fmt.Sprintf("tls=%s", tlsOption),
|
||||||
fmt.Sprintf("transaction_isolation=%q", transactionIsolation),
|
fmt.Sprintf("transaction_isolation=%q", this.transactionIsolation),
|
||||||
fmt.Sprintf("timeout=%fs", this.Timeout),
|
fmt.Sprintf("timeout=%fs", this.Timeout),
|
||||||
fmt.Sprintf("readTimeout=%fs", this.Timeout),
|
fmt.Sprintf("readTimeout=%fs", this.Timeout),
|
||||||
fmt.Sprintf("writeTimeout=%fs", this.Timeout),
|
fmt.Sprintf("writeTimeout=%fs", this.Timeout),
|
||||||
|
@ -13,12 +13,17 @@ import (
|
|||||||
test "github.com/openark/golib/tests"
|
test "github.com/openark/golib/tests"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
transactionIsolation = "REPEATABLE-READ"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
log.SetLevel(log.ERROR)
|
log.SetLevel(log.ERROR)
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestNewConnectionConfig(t *testing.T) {
|
func TestNewConnectionConfig(t *testing.T) {
|
||||||
c := NewConnectionConfig()
|
c := NewConnectionConfig(transactionIsolation)
|
||||||
test.S(t).ExpectEquals(c.Key.Hostname, "")
|
test.S(t).ExpectEquals(c.Key.Hostname, "")
|
||||||
test.S(t).ExpectEquals(c.Key.Port, 0)
|
test.S(t).ExpectEquals(c.Key.Port, 0)
|
||||||
test.S(t).ExpectEquals(c.ImpliedKey.Hostname, "")
|
test.S(t).ExpectEquals(c.ImpliedKey.Hostname, "")
|
||||||
@ -28,7 +33,7 @@ func TestNewConnectionConfig(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestDuplicateCredentials(t *testing.T) {
|
func TestDuplicateCredentials(t *testing.T) {
|
||||||
c := NewConnectionConfig()
|
c := NewConnectionConfig(transactionIsolation)
|
||||||
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
||||||
c.User = "gromit"
|
c.User = "gromit"
|
||||||
c.Password = "penguin"
|
c.Password = "penguin"
|
||||||
@ -48,7 +53,7 @@ func TestDuplicateCredentials(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestDuplicate(t *testing.T) {
|
func TestDuplicate(t *testing.T) {
|
||||||
c := NewConnectionConfig()
|
c := NewConnectionConfig(transactionIsolation)
|
||||||
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
||||||
c.User = "gromit"
|
c.User = "gromit"
|
||||||
c.Password = "penguin"
|
c.Password = "penguin"
|
||||||
@ -63,7 +68,7 @@ func TestDuplicate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetDBUri(t *testing.T) {
|
func TestGetDBUri(t *testing.T) {
|
||||||
c := NewConnectionConfig()
|
c := NewConnectionConfig(transactionIsolation)
|
||||||
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
||||||
c.User = "gromit"
|
c.User = "gromit"
|
||||||
c.Password = "penguin"
|
c.Password = "penguin"
|
||||||
@ -74,7 +79,7 @@ func TestGetDBUri(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestGetDBUriWithTLSSetup(t *testing.T) {
|
func TestGetDBUriWithTLSSetup(t *testing.T) {
|
||||||
c := NewConnectionConfig()
|
c := NewConnectionConfig(transactionIsolation)
|
||||||
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
|
||||||
c.User = "gromit"
|
c.User = "gromit"
|
||||||
c.Password = "penguin"
|
c.Password = "penguin"
|
||||||
|
Loading…
Reference in New Issue
Block a user