reading table (range) min/max values, right now according to hardcoded unique key
This commit is contained in:
parent
937491674c
commit
ea0906f4e5
@ -9,6 +9,7 @@ import (
|
||||
"fmt"
|
||||
|
||||
"github.com/github/gh-osc/go/mysql"
|
||||
"github.com/github/gh-osc/go/sql"
|
||||
)
|
||||
|
||||
type RowsEstimateMethod string
|
||||
@ -33,6 +34,8 @@ type MigrationContext struct {
|
||||
AllowedRunningOnMaster bool
|
||||
InspectorConnectionConfig *mysql.ConnectionConfig
|
||||
MasterConnectionConfig *mysql.ConnectionConfig
|
||||
MigrationRangeMinValues *sql.ColumnValues
|
||||
MigrationRangeMaxValues *sql.ColumnValues
|
||||
}
|
||||
|
||||
var context *MigrationContext
|
||||
@ -49,10 +52,12 @@ func newMigrationContext() *MigrationContext {
|
||||
}
|
||||
}
|
||||
|
||||
// GetMigrationContext
|
||||
func GetMigrationContext() *MigrationContext {
|
||||
return context
|
||||
}
|
||||
|
||||
// GetGhostTableName
|
||||
func (this *MigrationContext) GetGhostTableName() string {
|
||||
return fmt.Sprintf("_%s_New", this.OriginalTableName)
|
||||
}
|
||||
@ -62,7 +67,12 @@ func (this *MigrationContext) RequiresBinlogFormatChange() bool {
|
||||
return this.OriginalBinlogFormat != "ROW"
|
||||
}
|
||||
|
||||
// RequiresBinlogFormatChange
|
||||
// IsRunningOnMaster
|
||||
func (this *MigrationContext) IsRunningOnMaster() bool {
|
||||
return this.InspectorConnectionConfig.Equals(this.MasterConnectionConfig)
|
||||
}
|
||||
|
||||
// HasMigrationRange
|
||||
func (this *MigrationContext) HasMigrationRange() bool {
|
||||
return this.MigrationRangeMinValues != nil && MigrationRangeMaxValues != nil
|
||||
}
|
||||
|
@ -36,6 +36,8 @@ func main() {
|
||||
flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)")
|
||||
flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica")
|
||||
|
||||
flag.IntVar(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration")
|
||||
|
||||
quiet := flag.Bool("quiet", false, "quiet")
|
||||
verbose := flag.Bool("verbose", false, "verbose")
|
||||
debug := flag.Bool("debug", false, "debug mode (very verbose)")
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"github.com/github/gh-osc/go/base"
|
||||
"github.com/github/gh-osc/go/mysql"
|
||||
"github.com/github/gh-osc/go/sql"
|
||||
"reflect"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
@ -93,3 +94,85 @@ func (this *Applier) AlterGhost() error {
|
||||
log.Infof("Table altered")
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReadMigrationMinValues
|
||||
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
|
||||
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
|
||||
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := this.db.Query(query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
this.migrationContext.MigrationRangeMinValues = sql.NewColumnValues(len(uniqueKey.Columns))
|
||||
if err = rows.Scan(this.migrationContext.MigrationRangeMinValues.ValuesPointers...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues)
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadMigrationMinValues
|
||||
func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
|
||||
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)
|
||||
query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rows, err := this.db.Query(query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
this.migrationContext.MigrationRangeMaxValues = sql.NewColumnValues(len(uniqueKey.Columns))
|
||||
if err = rows.Scan(this.migrationContext.MigrationRangeMaxValues.ValuesPointers...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues)
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *Applier) ReadMigrationRangeValues(uniqueKey *sql.UniqueKey) error {
|
||||
if err := this.ReadMigrationMinValues(uniqueKey); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.ReadMigrationMaxValues(uniqueKey); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// IterateTable
|
||||
func (this *Applier) IterateTable(uniqueKey *sql.UniqueKey) error {
|
||||
query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
columnValues := sql.NewColumnValues(len(uniqueKey.Columns))
|
||||
|
||||
rows, err := this.db.Query(query)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for rows.Next() {
|
||||
if err = rows.Scan(columnValues.ValuesPointers...); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, val := range columnValues.BinaryValues() {
|
||||
log.Debugf("%s", reflect.TypeOf(val))
|
||||
log.Debugf("%s", string(val))
|
||||
}
|
||||
}
|
||||
log.Debugf("column values: %s", columnValues)
|
||||
query = `insert into test.sample_data_dump (category, ts) values (?, ?)`
|
||||
if _, err := sqlutils.Exec(this.db, query, columnValues.AbstractValues()...); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -50,6 +50,9 @@ func (this *Inspector) InitDBConnections() (err error) {
|
||||
if err := this.validateTable(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.validateTableForeignKeys(); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.CountTableRows {
|
||||
if err := this.countTableRows(); err != nil {
|
||||
return err
|
||||
@ -62,15 +65,15 @@ func (this *Inspector) InitDBConnections() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Inspector) InspectTables() (err error) {
|
||||
uniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName)
|
||||
func (this *Inspector) InspectTables() (uniqueKeys [](*sql.UniqueKey), err error) {
|
||||
uniqueKeys, err = this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName)
|
||||
if err != nil {
|
||||
return err
|
||||
return uniqueKeys, err
|
||||
}
|
||||
if len(uniqueKeys) == 0 {
|
||||
return fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
|
||||
return uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
|
||||
}
|
||||
return nil
|
||||
return uniqueKeys, err
|
||||
}
|
||||
|
||||
// validateConnection issues a simple can-connect to MySQL
|
||||
@ -194,6 +197,37 @@ func (this *Inspector) validateTable() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Inspector) validateTableForeignKeys() error {
|
||||
query := `
|
||||
SELECT COUNT(*) AS num_foreign_keys
|
||||
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
|
||||
WHERE
|
||||
REFERENCED_TABLE_NAME IS NOT NULL
|
||||
AND ((TABLE_SCHEMA=? AND TABLE_NAME=?)
|
||||
OR (REFERENCED_TABLE_SCHEMA=? AND REFERENCED_TABLE_NAME=?)
|
||||
)
|
||||
`
|
||||
numForeignKeys := 0
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
|
||||
numForeignKeys = rowMap.GetInt("num_foreign_keys")
|
||||
|
||||
return nil
|
||||
},
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.OriginalTableName,
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.OriginalTableName,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if numForeignKeys > 0 {
|
||||
return log.Errorf("Found %d foreign keys on %s.%s. Foreign keys are not supported. Bailing out", numForeignKeys, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
}
|
||||
log.Debugf("Validated no foreign keys exist on table")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Inspector) estimateTableRowsViaExplain() error {
|
||||
query := fmt.Sprintf(`explain select /* gh-osc */ * from %s.%s where 1=1`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
|
||||
|
@ -38,7 +38,8 @@ func (this *Migrator) Migrate() (err error) {
|
||||
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
|
||||
}
|
||||
log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key)
|
||||
if err := this.inspector.InspectTables(); err != nil {
|
||||
uniqueKeys, err := this.inspector.InspectTables()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -46,12 +47,19 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.applier.InitDBConnections(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.applier.CreateGhostTable(); err != nil {
|
||||
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
||||
// if err := this.applier.CreateGhostTable(); err != nil {
|
||||
// log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
||||
// return err
|
||||
// }
|
||||
// if err := this.applier.AlterGhost(); err != nil {
|
||||
// log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
||||
// return err
|
||||
// }
|
||||
|
||||
if err := this.applier.ReadMigrationRangeValues(uniqueKeys[0]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.applier.AlterGhost(); err != nil {
|
||||
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
||||
if err := this.applier.IterateTable(uniqueKeys[0]); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -163,12 +163,12 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa
|
||||
return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues)
|
||||
}
|
||||
|
||||
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName string, uniqueKeyColumns []string, chunkSize int) (string, error) {
|
||||
func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, chunkSize int) (string, error) {
|
||||
if len(uniqueKeyColumns) == 0 {
|
||||
return "", fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery")
|
||||
return "", fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery")
|
||||
}
|
||||
databaseName = EscapeName(databaseName)
|
||||
originalTableName = EscapeName(originalTableName)
|
||||
tableName = EscapeName(tableName)
|
||||
|
||||
rangeStartComparison, err := BuildRangePreparedComparison(uniqueKeyColumns, GreaterThanComparisonSign)
|
||||
if err != nil {
|
||||
@ -200,11 +200,45 @@ func BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName string,
|
||||
order by
|
||||
%s
|
||||
limit 1
|
||||
`, databaseName, originalTableName, strings.Join(uniqueKeyColumns, ", "),
|
||||
strings.Join(uniqueKeyColumns, ", "), databaseName, originalTableName,
|
||||
`, databaseName, tableName, strings.Join(uniqueKeyColumns, ", "),
|
||||
strings.Join(uniqueKeyColumns, ", "), databaseName, tableName,
|
||||
rangeStartComparison, rangeEndComparison,
|
||||
strings.Join(uniqueKeyColumnAscending, ", "), chunkSize,
|
||||
strings.Join(uniqueKeyColumnDescending, ", "),
|
||||
)
|
||||
return query, nil
|
||||
}
|
||||
|
||||
func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string) (string, error) {
|
||||
return buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName, uniqueKeyColumns, "asc")
|
||||
}
|
||||
|
||||
func BuildUniqueKeyMaxValuesPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string) (string, error) {
|
||||
return buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName, uniqueKeyColumns, "desc")
|
||||
}
|
||||
|
||||
func buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, order string) (string, error) {
|
||||
if len(uniqueKeyColumns) == 0 {
|
||||
return "", fmt.Errorf("Got 0 columns in BuildUniqueKeyMinMaxValuesPreparedQuery")
|
||||
}
|
||||
databaseName = EscapeName(databaseName)
|
||||
tableName = EscapeName(tableName)
|
||||
|
||||
uniqueKeyColumnOrder := make([]string, len(uniqueKeyColumns), len(uniqueKeyColumns))
|
||||
for i := range uniqueKeyColumns {
|
||||
uniqueKeyColumns[i] = EscapeName(uniqueKeyColumns[i])
|
||||
uniqueKeyColumnOrder[i] = fmt.Sprintf("%s %s", uniqueKeyColumns[i], order)
|
||||
}
|
||||
query := fmt.Sprintf(`
|
||||
select /* gh-osc %s.%s */ %s
|
||||
from
|
||||
%s.%s
|
||||
order by
|
||||
%s
|
||||
limit 1
|
||||
`, databaseName, tableName, strings.Join(uniqueKeyColumns, ", "),
|
||||
databaseName, tableName,
|
||||
strings.Join(uniqueKeyColumnOrder, ", "),
|
||||
)
|
||||
return query, nil
|
||||
}
|
||||
|
@ -205,3 +205,35 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) {
|
||||
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
|
||||
}
|
||||
}
|
||||
|
||||
func TestBuildUniqueKeyMinValuesPreparedQuery(t *testing.T) {
|
||||
databaseName := "mydb"
|
||||
originalTableName := "tbl"
|
||||
uniqueKeyColumns := []string{"name", "position"}
|
||||
{
|
||||
query, err := BuildUniqueKeyMinValuesPreparedQuery(databaseName, originalTableName, uniqueKeyColumns)
|
||||
test.S(t).ExpectNil(err)
|
||||
expected := `
|
||||
select /* gh-osc mydb.tbl */ name, position
|
||||
from
|
||||
mydb.tbl
|
||||
order by
|
||||
name asc, position asc
|
||||
limit 1
|
||||
`
|
||||
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
|
||||
}
|
||||
{
|
||||
query, err := BuildUniqueKeyMaxValuesPreparedQuery(databaseName, originalTableName, uniqueKeyColumns)
|
||||
test.S(t).ExpectNil(err)
|
||||
expected := `
|
||||
select /* gh-osc mydb.tbl */ name, position
|
||||
from
|
||||
mydb.tbl
|
||||
order by
|
||||
name desc, position desc
|
||||
limit 1
|
||||
`
|
||||
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
|
||||
}
|
||||
}
|
||||
|
@ -43,3 +43,40 @@ func (this *UniqueKey) IsPrimary() bool {
|
||||
func (this *UniqueKey) String() string {
|
||||
return fmt.Sprintf("%s: %s; has nullable: %+v", this.Name, this.Columns, this.HasNullable)
|
||||
}
|
||||
|
||||
type ColumnValues struct {
|
||||
abstractValues []interface{}
|
||||
ValuesPointers []interface{}
|
||||
}
|
||||
|
||||
func NewColumnValues(length int) *ColumnValues {
|
||||
result := &ColumnValues{
|
||||
abstractValues: make([]interface{}, length),
|
||||
ValuesPointers: make([]interface{}, length),
|
||||
}
|
||||
for i := 0; i < length; i++ {
|
||||
result.ValuesPointers[i] = &result.abstractValues[i]
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (this *ColumnValues) AbstractValues() []interface{} {
|
||||
return this.abstractValues
|
||||
}
|
||||
|
||||
func (this *ColumnValues) BinaryValues() [][]uint8 {
|
||||
result := make([][]uint8, len(this.abstractValues), len(this.abstractValues))
|
||||
for i, val := range this.abstractValues {
|
||||
result[i] = val.([]uint8)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (this *ColumnValues) String() string {
|
||||
stringValues := []string{}
|
||||
for _, val := range this.BinaryValues() {
|
||||
stringValues = append(stringValues, string(val))
|
||||
}
|
||||
return strings.Join(stringValues, ",")
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user