From ea0906f4e5b388b1c16e351d7e68083012581508 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Mon, 4 Apr 2016 18:19:46 +0200 Subject: [PATCH] reading table (range) min/max values, right now according to hardcoded unique key --- go/base/context.go | 12 +++++- go/cmd/gh-osc/main.go | 2 + go/logic/applier.go | 83 ++++++++++++++++++++++++++++++++++++++++++ go/logic/inspect.go | 44 +++++++++++++++++++--- go/logic/migrator.go | 18 ++++++--- go/sql/builder.go | 44 +++++++++++++++++++--- go/sql/builder_test.go | 32 ++++++++++++++++ go/sql/types.go | 37 +++++++++++++++++++ 8 files changed, 256 insertions(+), 16 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index d10b23e..5aea516 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/github/gh-osc/go/mysql" + "github.com/github/gh-osc/go/sql" ) type RowsEstimateMethod string @@ -33,6 +34,8 @@ type MigrationContext struct { AllowedRunningOnMaster bool InspectorConnectionConfig *mysql.ConnectionConfig MasterConnectionConfig *mysql.ConnectionConfig + MigrationRangeMinValues *sql.ColumnValues + MigrationRangeMaxValues *sql.ColumnValues } var context *MigrationContext @@ -49,10 +52,12 @@ func newMigrationContext() *MigrationContext { } } +// GetMigrationContext func GetMigrationContext() *MigrationContext { return context } +// GetGhostTableName func (this *MigrationContext) GetGhostTableName() string { return fmt.Sprintf("_%s_New", this.OriginalTableName) } @@ -62,7 +67,12 @@ func (this *MigrationContext) RequiresBinlogFormatChange() bool { return this.OriginalBinlogFormat != "ROW" } -// RequiresBinlogFormatChange +// IsRunningOnMaster func (this *MigrationContext) IsRunningOnMaster() bool { return this.InspectorConnectionConfig.Equals(this.MasterConnectionConfig) } + +// HasMigrationRange +func (this *MigrationContext) HasMigrationRange() bool { + return this.MigrationRangeMinValues != nil && MigrationRangeMaxValues != nil +} diff --git a/go/cmd/gh-osc/main.go b/go/cmd/gh-osc/main.go index 7f73993..af1f431 100644 --- a/go/cmd/gh-osc/main.go +++ b/go/cmd/gh-osc/main.go @@ -36,6 +36,8 @@ func main() { flag.BoolVar(&migrationContext.CountTableRows, "exact-rowcount", false, "actually count table rows as opposed to estimate them (results in more accurate progress estimation)") flag.BoolVar(&migrationContext.AllowedRunningOnMaster, "allow-on-master", false, "allow this migration to run directly on master. Preferably it would run on a replica") + flag.IntVar(&migrationContext.ChunkSize, "chunk-size", 1000, "amount of rows to handle in each iteration") + quiet := flag.Bool("quiet", false, "quiet") verbose := flag.Bool("verbose", false, "verbose") debug := flag.Bool("debug", false, "debug mode (very verbose)") diff --git a/go/logic/applier.go b/go/logic/applier.go index 57a796d..cd0ad41 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -11,6 +11,7 @@ import ( "github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/mysql" "github.com/github/gh-osc/go/sql" + "reflect" "github.com/outbrain/golib/log" "github.com/outbrain/golib/sqlutils" @@ -93,3 +94,85 @@ func (this *Applier) AlterGhost() error { log.Infof("Table altered") return nil } + +// ReadMigrationMinValues +func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error { + log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) + query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns) + if err != nil { + return err + } + rows, err := this.db.Query(query) + if err != nil { + return err + } + for rows.Next() { + this.migrationContext.MigrationRangeMinValues = sql.NewColumnValues(len(uniqueKey.Columns)) + if err = rows.Scan(this.migrationContext.MigrationRangeMinValues.ValuesPointers...); err != nil { + return err + } + } + log.Infof("Migration min values: [%s]", this.migrationContext.MigrationRangeMinValues) + return err +} + +// ReadMigrationMinValues +func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error { + log.Debugf("Reading migration range according to key: %s", uniqueKey.Name) + query, err := sql.BuildUniqueKeyMaxValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns) + if err != nil { + return err + } + rows, err := this.db.Query(query) + if err != nil { + return err + } + for rows.Next() { + this.migrationContext.MigrationRangeMaxValues = sql.NewColumnValues(len(uniqueKey.Columns)) + if err = rows.Scan(this.migrationContext.MigrationRangeMaxValues.ValuesPointers...); err != nil { + return err + } + } + log.Infof("Migration max values: [%s]", this.migrationContext.MigrationRangeMaxValues) + return err +} + +func (this *Applier) ReadMigrationRangeValues(uniqueKey *sql.UniqueKey) error { + if err := this.ReadMigrationMinValues(uniqueKey); err != nil { + return err + } + if err := this.ReadMigrationMaxValues(uniqueKey); err != nil { + return err + } + return nil +} + +// IterateTable +func (this *Applier) IterateTable(uniqueKey *sql.UniqueKey) error { + query, err := sql.BuildUniqueKeyMinValuesPreparedQuery(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, uniqueKey.Columns) + if err != nil { + return err + } + columnValues := sql.NewColumnValues(len(uniqueKey.Columns)) + + rows, err := this.db.Query(query) + if err != nil { + return err + } + for rows.Next() { + if err = rows.Scan(columnValues.ValuesPointers...); err != nil { + return err + } + for _, val := range columnValues.BinaryValues() { + log.Debugf("%s", reflect.TypeOf(val)) + log.Debugf("%s", string(val)) + } + } + log.Debugf("column values: %s", columnValues) + query = `insert into test.sample_data_dump (category, ts) values (?, ?)` + if _, err := sqlutils.Exec(this.db, query, columnValues.AbstractValues()...); err != nil { + return err + } + + return nil +} diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 9da7abc..b793432 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -50,6 +50,9 @@ func (this *Inspector) InitDBConnections() (err error) { if err := this.validateTable(); err != nil { return err } + if err := this.validateTableForeignKeys(); err != nil { + return err + } if this.migrationContext.CountTableRows { if err := this.countTableRows(); err != nil { return err @@ -62,15 +65,15 @@ func (this *Inspector) InitDBConnections() (err error) { return nil } -func (this *Inspector) InspectTables() (err error) { - uniqueKeys, err := this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName) +func (this *Inspector) InspectTables() (uniqueKeys [](*sql.UniqueKey), err error) { + uniqueKeys, err = this.getCandidateUniqueKeys(this.migrationContext.OriginalTableName) if err != nil { - return err + return uniqueKeys, err } if len(uniqueKeys) == 0 { - return fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out") + return uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out") } - return nil + return uniqueKeys, err } // validateConnection issues a simple can-connect to MySQL @@ -194,6 +197,37 @@ func (this *Inspector) validateTable() error { return nil } +func (this *Inspector) validateTableForeignKeys() error { + query := ` + SELECT COUNT(*) AS num_foreign_keys + FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE + WHERE + REFERENCED_TABLE_NAME IS NOT NULL + AND ((TABLE_SCHEMA=? AND TABLE_NAME=?) + OR (REFERENCED_TABLE_SCHEMA=? AND REFERENCED_TABLE_NAME=?) + ) + ` + numForeignKeys := 0 + err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error { + numForeignKeys = rowMap.GetInt("num_foreign_keys") + + return nil + }, + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + this.migrationContext.DatabaseName, + this.migrationContext.OriginalTableName, + ) + if err != nil { + return err + } + if numForeignKeys > 0 { + return log.Errorf("Found %d foreign keys on %s.%s. Foreign keys are not supported. Bailing out", numForeignKeys, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) + } + log.Debugf("Validated no foreign keys exist on table") + return nil +} + func (this *Inspector) estimateTableRowsViaExplain() error { query := fmt.Sprintf(`explain select /* gh-osc */ * from %s.%s where 1=1`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index d35346e..549a4d4 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -38,7 +38,8 @@ func (this *Migrator) Migrate() (err error) { return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master") } log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key) - if err := this.inspector.InspectTables(); err != nil { + uniqueKeys, err := this.inspector.InspectTables() + if err != nil { return err } @@ -46,12 +47,19 @@ func (this *Migrator) Migrate() (err error) { if err := this.applier.InitDBConnections(); err != nil { return err } - if err := this.applier.CreateGhostTable(); err != nil { - log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + // if err := this.applier.CreateGhostTable(); err != nil { + // log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out") + // return err + // } + // if err := this.applier.AlterGhost(); err != nil { + // log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") + // return err + // } + + if err := this.applier.ReadMigrationRangeValues(uniqueKeys[0]); err != nil { return err } - if err := this.applier.AlterGhost(); err != nil { - log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out") + if err := this.applier.IterateTable(uniqueKeys[0]); err != nil { return err } diff --git a/go/sql/builder.go b/go/sql/builder.go index e5c5659..1bae136 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -163,12 +163,12 @@ func BuildRangeInsertPreparedQuery(databaseName, originalTableName, ghostTableNa return BuildRangeInsertQuery(databaseName, originalTableName, ghostTableName, sharedColumns, uniqueKey, uniqueKeyColumns, rangeStartValues, rangeEndValues) } -func BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName string, uniqueKeyColumns []string, chunkSize int) (string, error) { +func BuildUniqueKeyRangeEndPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, chunkSize int) (string, error) { if len(uniqueKeyColumns) == 0 { - return "", fmt.Errorf("Got 0 shared columns in BuildRangeInsertQuery") + return "", fmt.Errorf("Got 0 columns in BuildUniqueKeyRangeEndPreparedQuery") } databaseName = EscapeName(databaseName) - originalTableName = EscapeName(originalTableName) + tableName = EscapeName(tableName) rangeStartComparison, err := BuildRangePreparedComparison(uniqueKeyColumns, GreaterThanComparisonSign) if err != nil { @@ -200,11 +200,45 @@ func BuildUniqueKeyRangeEndPreparedQuery(databaseName, originalTableName string, order by %s limit 1 - `, databaseName, originalTableName, strings.Join(uniqueKeyColumns, ", "), - strings.Join(uniqueKeyColumns, ", "), databaseName, originalTableName, + `, databaseName, tableName, strings.Join(uniqueKeyColumns, ", "), + strings.Join(uniqueKeyColumns, ", "), databaseName, tableName, rangeStartComparison, rangeEndComparison, strings.Join(uniqueKeyColumnAscending, ", "), chunkSize, strings.Join(uniqueKeyColumnDescending, ", "), ) return query, nil } + +func BuildUniqueKeyMinValuesPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string) (string, error) { + return buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName, uniqueKeyColumns, "asc") +} + +func BuildUniqueKeyMaxValuesPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string) (string, error) { + return buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName, uniqueKeyColumns, "desc") +} + +func buildUniqueKeyMinMaxValuesPreparedQuery(databaseName, tableName string, uniqueKeyColumns []string, order string) (string, error) { + if len(uniqueKeyColumns) == 0 { + return "", fmt.Errorf("Got 0 columns in BuildUniqueKeyMinMaxValuesPreparedQuery") + } + databaseName = EscapeName(databaseName) + tableName = EscapeName(tableName) + + uniqueKeyColumnOrder := make([]string, len(uniqueKeyColumns), len(uniqueKeyColumns)) + for i := range uniqueKeyColumns { + uniqueKeyColumns[i] = EscapeName(uniqueKeyColumns[i]) + uniqueKeyColumnOrder[i] = fmt.Sprintf("%s %s", uniqueKeyColumns[i], order) + } + query := fmt.Sprintf(` + select /* gh-osc %s.%s */ %s + from + %s.%s + order by + %s + limit 1 + `, databaseName, tableName, strings.Join(uniqueKeyColumns, ", "), + databaseName, tableName, + strings.Join(uniqueKeyColumnOrder, ", "), + ) + return query, nil +} diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 289cacf..52bd814 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -205,3 +205,35 @@ func TestBuildUniqueKeyRangeEndPreparedQuery(t *testing.T) { test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) } } + +func TestBuildUniqueKeyMinValuesPreparedQuery(t *testing.T) { + databaseName := "mydb" + originalTableName := "tbl" + uniqueKeyColumns := []string{"name", "position"} + { + query, err := BuildUniqueKeyMinValuesPreparedQuery(databaseName, originalTableName, uniqueKeyColumns) + test.S(t).ExpectNil(err) + expected := ` + select /* gh-osc mydb.tbl */ name, position + from + mydb.tbl + order by + name asc, position asc + limit 1 + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + } + { + query, err := BuildUniqueKeyMaxValuesPreparedQuery(databaseName, originalTableName, uniqueKeyColumns) + test.S(t).ExpectNil(err) + expected := ` + select /* gh-osc mydb.tbl */ name, position + from + mydb.tbl + order by + name desc, position desc + limit 1 + ` + test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) + } +} diff --git a/go/sql/types.go b/go/sql/types.go index 2646e2f..9ff719c 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -43,3 +43,40 @@ func (this *UniqueKey) IsPrimary() bool { func (this *UniqueKey) String() string { return fmt.Sprintf("%s: %s; has nullable: %+v", this.Name, this.Columns, this.HasNullable) } + +type ColumnValues struct { + abstractValues []interface{} + ValuesPointers []interface{} +} + +func NewColumnValues(length int) *ColumnValues { + result := &ColumnValues{ + abstractValues: make([]interface{}, length), + ValuesPointers: make([]interface{}, length), + } + for i := 0; i < length; i++ { + result.ValuesPointers[i] = &result.abstractValues[i] + } + + return result +} + +func (this *ColumnValues) AbstractValues() []interface{} { + return this.abstractValues +} + +func (this *ColumnValues) BinaryValues() [][]uint8 { + result := make([][]uint8, len(this.abstractValues), len(this.abstractValues)) + for i, val := range this.abstractValues { + result[i] = val.([]uint8) + } + return result +} + +func (this *ColumnValues) String() string { + stringValues := []string{} + for _, val := range this.BinaryValues() { + stringValues = append(stringValues, string(val)) + } + return strings.Join(stringValues, ",") +}