Merge pull request #1190 from wangzihuacool/add-rocksdb-as-transactional-engine

add rocksdb as transactional engine
This commit is contained in:
dm-2 2022-11-29 11:38:54 +00:00 committed by GitHub
commit 7320fda848
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 108 additions and 23 deletions

View File

@ -8,7 +8,7 @@ jobs:
runs-on: ubuntu-20.04 runs-on: ubuntu-20.04
strategy: strategy:
matrix: matrix:
version: [mysql-5.7.25,mysql-8.0.16] version: [mysql-5.7.25,mysql-8.0.16,PerconaServer-8.0.21]
steps: steps:
- uses: actions/checkout@v2 - uses: actions/checkout@v2

View File

@ -246,6 +246,18 @@ Allows `gh-ost` to connect to the MySQL servers using encrypted connections, but
`--ssl-key=/path/to/ssl-key.key`: SSL private key file (in PEM format). `--ssl-key=/path/to/ssl-key.key`: SSL private key file (in PEM format).
### storage-engine
Default is `innodb`, and `rocksdb` support is currently experimental. InnoDB and RocksDB are both transactional engines, supporting both shared and exclusive row locks.
But RocksDB currently lacks a few features support compared to InnoDB:
- Gap Locks
- Foreign Key
- Generated Columns
- Spatial
- Geometry
When `--storage-engine=rocksdb`, `gh-ost` will make some changes necessary (e.g. sets isolation level to `READ_COMMITTED`) to support RocksDB.
### test-on-replica ### test-on-replica
Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md) Issue the migration on a replica; do not modify data on master. Useful for validating, testing and benchmarking. See [`testing-on-replica`](testing-on-replica.md)

View File

@ -290,6 +290,19 @@ func NewMigrationContext() *MigrationContext {
} }
} }
func (this *MigrationContext) SetConnectionConfig(storageEngine string) error {
var transactionIsolation string
switch storageEngine {
case "rocksdb":
transactionIsolation = "READ-COMMITTED"
default:
transactionIsolation = "REPEATABLE-READ"
}
this.InspectorConnectionConfig.TransactionIsolation = transactionIsolation
this.ApplierConnectionConfig.TransactionIsolation = transactionIsolation
return nil
}
func getSafeTableName(baseName string, suffix string) string { func getSafeTableName(baseName string, suffix string) string {
name := fmt.Sprintf("_%s_%s", baseName, suffix) name := fmt.Sprintf("_%s_%s", baseName, suffix)
if len(name) <= mysql.MaxTableNameLength { if len(name) <= mysql.MaxTableNameLength {
@ -428,6 +441,10 @@ func (this *MigrationContext) IsTransactionalTable() bool {
{ {
return true return true
} }
case "rocksdb":
{
return true
}
} }
return false return false
} }

View File

@ -68,6 +68,7 @@ func main() {
flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)") flag.StringVar(&migrationContext.OriginalTableName, "table", "", "table name (mandatory)")
flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)") flag.StringVar(&migrationContext.AlterStatement, "alter", "", "alter statement (mandatory)")
flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first") flag.BoolVar(&migrationContext.AttemptInstantDDL, "attempt-instant-ddl", false, "Attempt to use instant DDL for this migration first")
storageEngine := flag.String("storage-engine", "innodb", "Specify table storage engine (default: 'innodb'). When 'rocksdb': the session transaction isolation level is changed from REPEATABLE_READ to READ_COMMITTED.")
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy") flag.BoolVar(&migrationContext.ConcurrentCountTableRows, "concurrent-rowcount", true, "(with --exact-rowcount), when true (default): count rows after row-copy begins, concurrently, and adjust row estimate later on; when false: first count rows, then start row copy")
@ -182,6 +183,10 @@ func main() {
migrationContext.Log.SetLevel(log.ERROR) migrationContext.Log.SetLevel(log.ERROR)
} }
if err := migrationContext.SetConnectionConfig(*storageEngine); err != nil {
migrationContext.Log.Fatale(err)
}
if migrationContext.AlterStatement == "" { if migrationContext.AlterStatement == "" {
log.Fatal("--alter must be provided and statement must not be empty") log.Fatal("--alter must be provided and statement must not be empty")
} }
@ -247,6 +252,9 @@ func main() {
if *replicationLagQuery != "" { if *replicationLagQuery != "" {
migrationContext.Log.Warning("--replication-lag-query is deprecated") migrationContext.Log.Warning("--replication-lag-query is deprecated")
} }
if *storageEngine == "rocksdb" {
migrationContext.Log.Warning("RocksDB storage engine support is experimental")
}
switch *cutOver { switch *cutOver {
case "atomic", "default", "": case "atomic", "default", "":

View File

@ -18,18 +18,18 @@ import (
) )
const ( const (
transactionIsolation = "REPEATABLE-READ" TLS_CONFIG_KEY = "ghost"
TLS_CONFIG_KEY = "ghost"
) )
// ConnectionConfig is the minimal configuration required to connect to a MySQL server // ConnectionConfig is the minimal configuration required to connect to a MySQL server
type ConnectionConfig struct { type ConnectionConfig struct {
Key InstanceKey Key InstanceKey
User string User string
Password string Password string
ImpliedKey *InstanceKey ImpliedKey *InstanceKey
tlsConfig *tls.Config tlsConfig *tls.Config
Timeout float64 Timeout float64
TransactionIsolation string
} }
func NewConnectionConfig() *ConnectionConfig { func NewConnectionConfig() *ConnectionConfig {
@ -43,11 +43,12 @@ func NewConnectionConfig() *ConnectionConfig {
// DuplicateCredentials creates a new connection config with given key and with same credentials as this config // DuplicateCredentials creates a new connection config with given key and with same credentials as this config
func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionConfig { func (this *ConnectionConfig) DuplicateCredentials(key InstanceKey) *ConnectionConfig {
config := &ConnectionConfig{ config := &ConnectionConfig{
Key: key, Key: key,
User: this.User, User: this.User,
Password: this.Password, Password: this.Password,
tlsConfig: this.tlsConfig, tlsConfig: this.tlsConfig,
Timeout: this.Timeout, Timeout: this.Timeout,
TransactionIsolation: this.TransactionIsolation,
} }
config.ImpliedKey = &config.Key config.ImpliedKey = &config.Key
return config return config
@ -126,7 +127,7 @@ func (this *ConnectionConfig) GetDBUri(databaseName string) string {
"charset=utf8mb4,utf8,latin1", "charset=utf8mb4,utf8,latin1",
"interpolateParams=true", "interpolateParams=true",
fmt.Sprintf("tls=%s", tlsOption), fmt.Sprintf("tls=%s", tlsOption),
fmt.Sprintf("transaction_isolation=%q", transactionIsolation), fmt.Sprintf("transaction_isolation=%q", this.TransactionIsolation),
fmt.Sprintf("timeout=%fs", this.Timeout), fmt.Sprintf("timeout=%fs", this.Timeout),
fmt.Sprintf("readTimeout=%fs", this.Timeout), fmt.Sprintf("readTimeout=%fs", this.Timeout),
fmt.Sprintf("writeTimeout=%fs", this.Timeout), fmt.Sprintf("writeTimeout=%fs", this.Timeout),

View File

@ -13,6 +13,10 @@ import (
test "github.com/openark/golib/tests" test "github.com/openark/golib/tests"
) )
const (
transactionIsolation = "REPEATABLE-READ"
)
func init() { func init() {
log.SetLevel(log.ERROR) log.SetLevel(log.ERROR)
} }
@ -25,6 +29,7 @@ func TestNewConnectionConfig(t *testing.T) {
test.S(t).ExpectEquals(c.ImpliedKey.Port, 0) test.S(t).ExpectEquals(c.ImpliedKey.Port, 0)
test.S(t).ExpectEquals(c.User, "") test.S(t).ExpectEquals(c.User, "")
test.S(t).ExpectEquals(c.Password, "") test.S(t).ExpectEquals(c.Password, "")
test.S(t).ExpectEquals(c.TransactionIsolation, "")
} }
func TestDuplicateCredentials(t *testing.T) { func TestDuplicateCredentials(t *testing.T) {
@ -36,6 +41,7 @@ func TestDuplicateCredentials(t *testing.T) {
InsecureSkipVerify: true, InsecureSkipVerify: true,
ServerName: "feathers", ServerName: "feathers",
} }
c.TransactionIsolation = transactionIsolation
dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310}) dup := c.DuplicateCredentials(InstanceKey{Hostname: "otherhost", Port: 3310})
test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost") test.S(t).ExpectEquals(dup.Key.Hostname, "otherhost")
@ -45,6 +51,7 @@ func TestDuplicateCredentials(t *testing.T) {
test.S(t).ExpectEquals(dup.User, "gromit") test.S(t).ExpectEquals(dup.User, "gromit")
test.S(t).ExpectEquals(dup.Password, "penguin") test.S(t).ExpectEquals(dup.Password, "penguin")
test.S(t).ExpectEquals(dup.tlsConfig, c.tlsConfig) test.S(t).ExpectEquals(dup.tlsConfig, c.tlsConfig)
test.S(t).ExpectEquals(dup.TransactionIsolation, c.TransactionIsolation)
} }
func TestDuplicate(t *testing.T) { func TestDuplicate(t *testing.T) {
@ -52,6 +59,7 @@ func TestDuplicate(t *testing.T) {
c.Key = InstanceKey{Hostname: "myhost", Port: 3306} c.Key = InstanceKey{Hostname: "myhost", Port: 3306}
c.User = "gromit" c.User = "gromit"
c.Password = "penguin" c.Password = "penguin"
c.TransactionIsolation = transactionIsolation
dup := c.Duplicate() dup := c.Duplicate()
test.S(t).ExpectEquals(dup.Key.Hostname, "myhost") test.S(t).ExpectEquals(dup.Key.Hostname, "myhost")
@ -60,6 +68,7 @@ func TestDuplicate(t *testing.T) {
test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3306) test.S(t).ExpectEquals(dup.ImpliedKey.Port, 3306)
test.S(t).ExpectEquals(dup.User, "gromit") test.S(t).ExpectEquals(dup.User, "gromit")
test.S(t).ExpectEquals(dup.Password, "penguin") test.S(t).ExpectEquals(dup.Password, "penguin")
test.S(t).ExpectEquals(dup.TransactionIsolation, transactionIsolation)
} }
func TestGetDBUri(t *testing.T) { func TestGetDBUri(t *testing.T) {
@ -68,6 +77,7 @@ func TestGetDBUri(t *testing.T) {
c.User = "gromit" c.User = "gromit"
c.Password = "penguin" c.Password = "penguin"
c.Timeout = 1.2345 c.Timeout = 1.2345
c.TransactionIsolation = transactionIsolation
uri := c.GetDBUri("test") uri := c.GetDBUri("test")
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`) test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=false&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)
@ -80,6 +90,7 @@ func TestGetDBUriWithTLSSetup(t *testing.T) {
c.Password = "penguin" c.Password = "penguin"
c.Timeout = 1.2345 c.Timeout = 1.2345
c.tlsConfig = &tls.Config{} c.tlsConfig = &tls.Config{}
c.TransactionIsolation = transactionIsolation
uri := c.GetDBUri("test") uri := c.GetDBUri("test")
test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`) test.S(t).ExpectEquals(uri, `gromit:penguin@tcp(myhost:3306)/test?autocommit=true&charset=utf8mb4,utf8,latin1&interpolateParams=true&tls=ghost&transaction_isolation="REPEATABLE-READ"&timeout=1.234500s&readTimeout=1.234500s&writeTimeout=1.234500s`)

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -0,0 +1 @@
Percona

View File

@ -11,6 +11,7 @@ tests_path=$(dirname $0)
test_logfile=/tmp/gh-ost-test.log test_logfile=/tmp/gh-ost-test.log
default_ghost_binary=/tmp/gh-ost-test default_ghost_binary=/tmp/gh-ost-test
ghost_binary="" ghost_binary=""
storage_engine=innodb
exec_command_file=/tmp/gh-ost-test.bash exec_command_file=/tmp/gh-ost-test.bash
ghost_structure_output_file=/tmp/gh-ost-test.ghost.structure.sql ghost_structure_output_file=/tmp/gh-ost-test.ghost.structure.sql
orig_content_output_file=/tmp/gh-ost-test.orig.content.csv orig_content_output_file=/tmp/gh-ost-test.orig.content.csv
@ -24,12 +25,13 @@ replica_port=
original_sql_mode= original_sql_mode=
OPTIND=1 OPTIND=1
while getopts "b:" OPTION while getopts "b:s:" OPTION
do do
case $OPTION in case $OPTION in
b) b)
ghost_binary="$OPTARG" ghost_binary="$OPTARG";;
;; s)
storage_engine="$OPTARG";;
esac esac
done done
shift $((OPTIND-1)) shift $((OPTIND-1))
@ -99,9 +101,13 @@ test_single() {
if [ -f $tests_path/$test_name/ignore_versions ] ; then if [ -f $tests_path/$test_name/ignore_versions ] ; then
ignore_versions=$(cat $tests_path/$test_name/ignore_versions) ignore_versions=$(cat $tests_path/$test_name/ignore_versions)
mysql_version=$(gh-ost-test-mysql-master -s -s -e "select @@version") mysql_version=$(gh-ost-test-mysql-master -s -s -e "select @@version")
mysql_version_comment=$(gh-ost-test-mysql-master -s -s -e "select @@version_comment")
if echo "$mysql_version" | egrep -q "^${ignore_versions}" ; then if echo "$mysql_version" | egrep -q "^${ignore_versions}" ; then
echo -n "Skipping: $test_name" echo -n "Skipping: $test_name"
return 0 return 0
elif echo "$mysql_version_comment" | egrep -i -q "^${ignore_versions}" ; then
echo -n "Skipping: $test_name"
return 0
fi fi
fi fi
@ -154,7 +160,8 @@ test_single() {
--assume-master-host=${master_host}:${master_port} --assume-master-host=${master_host}:${master_port}
--database=test \ --database=test \
--table=gh_ost_test \ --table=gh_ost_test \
--alter='engine=innodb' \ --storage-engine=${storage_engine} \
--alter='engine=${storage_engine}' \
--exact-rowcount \ --exact-rowcount \
--assume-rbr \ --assume-rbr \
--initially-drop-old-table \ --initially-drop-old-table \

View File

@ -36,8 +36,16 @@ test_mysql_version() {
mkdir -p sandbox/binary mkdir -p sandbox/binary
rm -rf sandbox/binary/* rm -rf sandbox/binary/*
gh-ost-ci-env/bin/linux/dbdeployer unpack gh-ost-ci-env/mysql-tarballs/"$mysql_version".tar.xz --sandbox-binary ${PWD}/sandbox/binary local mysql_server=${mysql_version%-*}
if echo "$mysql_server" | egrep -i "percona" ; then
tarball_name=Percona-Server-${mysql_version#*-}-12-Linux.x86_64.glibc2.12-minimal.tar.gz
rm -f gh-ost-ci-env/mysql-tarballs/${tarball_name}
ln -s "$mysql_version".tar.xz gh-ost-ci-env/mysql-tarballs/${tarball_name}
gh-ost-ci-env/bin/linux/dbdeployer unpack gh-ost-ci-env/mysql-tarballs/${tarball_name} --sandbox-binary ${PWD}/sandbox/binary
rm -f gh-ost-ci-env/mysql-tarballs/${tarball_name}
else
gh-ost-ci-env/bin/linux/dbdeployer unpack gh-ost-ci-env/mysql-tarballs/"$mysql_version".tar.xz --sandbox-binary ${PWD}/sandbox/binary
fi
mkdir -p sandboxes mkdir -p sandboxes
rm -rf sandboxes/* rm -rf sandboxes/*
@ -60,9 +68,21 @@ test_mysql_version() {
gh-ost-test-mysql-master -uroot -e "create user 'gh-ost'@'%' identified by 'gh-ost'" gh-ost-test-mysql-master -uroot -e "create user 'gh-ost'@'%' identified by 'gh-ost'"
gh-ost-test-mysql-master -uroot -e "grant all on *.* to 'gh-ost'@'%'" gh-ost-test-mysql-master -uroot -e "grant all on *.* to 'gh-ost'@'%'"
echo "### Running gh-ost tests for $mysql_version" if echo "$mysql_server" | egrep -i "percona" ; then
./localtests/test.sh -b bin/gh-ost echo "### Preparing for rocksdb in PerconaServer"
gh-ost-test-mysql-master -uroot -e 'INSTALL PLUGIN ROCKSDB SONAME "ha_rocksdb.so"'
gh-ost-test-mysql-master -uroot -e 'set global default_storage_engine="ROCKSDB"'
gh-ost-test-mysql-master -uroot -e 'set global transaction_isolation="READ-COMMITTED"'
gh-ost-test-mysql-replica -uroot -e 'INSTALL PLUGIN ROCKSDB SONAME "ha_rocksdb.so"'
gh-ost-test-mysql-replica -uroot -e 'set global default_storage_engine="ROCKSDB"'
gh-ost-test-mysql-replica -uroot -e 'set global transaction_isolation="READ-COMMITTED"'
echo "### Running gh-ost tests for $mysql_version"
./localtests/test.sh -b bin/gh-ost -s rocksdb
else
echo "### Running gh-ost tests for $mysql_version"
./localtests/test.sh -b bin/gh-ost -s innodb
fi
find sandboxes -name "stop_all" | bash find sandboxes -name "stop_all" | bash
} }