merged master, resolved conflicts
This commit is contained in:
commit
ac6159791d
2
build.sh
2
build.sh
@ -2,7 +2,7 @@
|
|||||||
#
|
#
|
||||||
#
|
#
|
||||||
|
|
||||||
RELEASE_VERSION="1.0.24"
|
RELEASE_VERSION="1.0.26"
|
||||||
|
|
||||||
function build {
|
function build {
|
||||||
osname=$1
|
osname=$1
|
||||||
|
@ -147,6 +147,7 @@ type MigrationContext struct {
|
|||||||
UserCommandedUnpostponeFlag int64
|
UserCommandedUnpostponeFlag int64
|
||||||
PanicAbort chan error
|
PanicAbort chan error
|
||||||
|
|
||||||
|
OriginalTableColumnsOnApplier *sql.ColumnList
|
||||||
OriginalTableColumns *sql.ColumnList
|
OriginalTableColumns *sql.ColumnList
|
||||||
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
OriginalTableUniqueKeys [](*sql.UniqueKey)
|
||||||
GhostTableColumns *sql.ColumnList
|
GhostTableColumns *sql.ColumnList
|
||||||
|
@ -67,6 +67,9 @@ func (this *Applier) InitDBConnections() (err error) {
|
|||||||
} else {
|
} else {
|
||||||
this.connectionConfig.ImpliedKey = impliedKey
|
this.connectionConfig.ImpliedKey = impliedKey
|
||||||
}
|
}
|
||||||
|
if err := this.readTableColumns(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,6 +98,16 @@ func (this *Applier) validateAndReadTimeZone() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// readTableColumns reads table columns on applier
|
||||||
|
func (this *Applier) readTableColumns() (err error) {
|
||||||
|
log.Infof("Examining table structure on applier")
|
||||||
|
this.migrationContext.OriginalTableColumnsOnApplier, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// showTableStatus returns the output of `show table status like '...'` command
|
// showTableStatus returns the output of `show table status like '...'` command
|
||||||
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
func (this *Applier) showTableStatus(tableName string) (rowMap sqlutils.RowMap) {
|
||||||
rowMap = nil
|
rowMap = nil
|
||||||
|
@ -8,6 +8,7 @@ package logic
|
|||||||
import (
|
import (
|
||||||
gosql "database/sql"
|
gosql "database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
@ -83,7 +84,7 @@ func (this *Inspector) InspectTableColumnsAndUniqueKeys(tableName string) (colum
|
|||||||
if len(uniqueKeys) == 0 {
|
if len(uniqueKeys) == 0 {
|
||||||
return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
|
return columns, uniqueKeys, fmt.Errorf("No PRIMARY nor UNIQUE key found in table! Bailing out")
|
||||||
}
|
}
|
||||||
columns, err = this.getTableColumns(this.migrationContext.DatabaseName, tableName)
|
columns, err = mysql.GetTableColumns(this.db, this.migrationContext.DatabaseName, tableName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return columns, uniqueKeys, err
|
return columns, uniqueKeys, err
|
||||||
}
|
}
|
||||||
@ -99,9 +100,15 @@ func (this *Inspector) InspectOriginalTable() (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// InspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
|
// inspectOriginalAndGhostTables compares original and ghost tables to see whether the migration
|
||||||
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
|
// makes sense and is valid. It extracts the list of shared columns and the chosen migration unique key
|
||||||
func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
|
func (this *Inspector) inspectOriginalAndGhostTables() (err error) {
|
||||||
|
originalNamesOnApplier := this.migrationContext.OriginalTableColumnsOnApplier.Names()
|
||||||
|
originalNames := this.migrationContext.OriginalTableColumns.Names()
|
||||||
|
if !reflect.DeepEqual(originalNames, originalNamesOnApplier) {
|
||||||
|
return fmt.Errorf("It seems like table structure is not identical between master and replica. This scenario is not supported.")
|
||||||
|
}
|
||||||
|
|
||||||
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
|
this.migrationContext.GhostTableColumns, this.migrationContext.GhostTableUniqueKeys, err = this.InspectTableColumnsAndUniqueKeys(this.migrationContext.GetGhostTableName())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@ -479,31 +486,6 @@ func (this *Inspector) CountTableRows() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// getTableColumns reads column list from given table
|
|
||||||
func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.ColumnList, error) {
|
|
||||||
query := fmt.Sprintf(`
|
|
||||||
show columns from %s.%s
|
|
||||||
`,
|
|
||||||
sql.EscapeName(databaseName),
|
|
||||||
sql.EscapeName(tableName),
|
|
||||||
)
|
|
||||||
columnNames := []string{}
|
|
||||||
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
|
|
||||||
columnNames = append(columnNames, rowMap.GetString("Field"))
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
if len(columnNames) == 0 {
|
|
||||||
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
|
|
||||||
sql.EscapeName(databaseName),
|
|
||||||
sql.EscapeName(tableName),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return sql.NewColumnList(columnNames), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// applyColumnTypes
|
// applyColumnTypes
|
||||||
func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
|
func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
|
||||||
query := `
|
query := `
|
||||||
|
@ -26,7 +26,7 @@ import (
|
|||||||
type ChangelogState string
|
type ChangelogState string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
TablesInPlace ChangelogState = "TablesInPlace"
|
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
||||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ type Migrator struct {
|
|||||||
migrationContext *base.MigrationContext
|
migrationContext *base.MigrationContext
|
||||||
|
|
||||||
firstThrottlingCollected chan bool
|
firstThrottlingCollected chan bool
|
||||||
tablesInPlace chan bool
|
ghostTableMigrated chan bool
|
||||||
rowCopyComplete chan bool
|
rowCopyComplete chan bool
|
||||||
allEventsUpToLockProcessed chan bool
|
allEventsUpToLockProcessed chan bool
|
||||||
|
|
||||||
@ -76,7 +76,7 @@ func NewMigrator() *Migrator {
|
|||||||
migrator := &Migrator{
|
migrator := &Migrator{
|
||||||
migrationContext: base.GetMigrationContext(),
|
migrationContext: base.GetMigrationContext(),
|
||||||
parser: sql.NewParser(),
|
parser: sql.NewParser(),
|
||||||
tablesInPlace: make(chan bool),
|
ghostTableMigrated: make(chan bool),
|
||||||
firstThrottlingCollected: make(chan bool, 1),
|
firstThrottlingCollected: make(chan bool, 1),
|
||||||
rowCopyComplete: make(chan bool),
|
rowCopyComplete: make(chan bool),
|
||||||
allEventsUpToLockProcessed: make(chan bool),
|
allEventsUpToLockProcessed: make(chan bool),
|
||||||
@ -182,9 +182,9 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
|||||||
}
|
}
|
||||||
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
|
changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
|
||||||
switch changelogState {
|
switch changelogState {
|
||||||
case TablesInPlace:
|
case GhostTableMigrated:
|
||||||
{
|
{
|
||||||
this.tablesInPlace <- true
|
this.ghostTableMigrated <- true
|
||||||
}
|
}
|
||||||
case AllEventsUpToLockProcessed:
|
case AllEventsUpToLockProcessed:
|
||||||
{
|
{
|
||||||
@ -291,14 +291,14 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("Waiting for tables to be in place")
|
log.Infof("Waiting for ghost table to be migrated")
|
||||||
<-this.tablesInPlace
|
<-this.ghostTableMigrated
|
||||||
log.Debugf("Tables are in place")
|
log.Debugf("ghost table migrated")
|
||||||
// Yay! We now know the Ghost and Changelog tables are good to examine!
|
// Yay! We now know the Ghost and Changelog tables are good to examine!
|
||||||
// When running on replica, this means the replica has those tables. When running
|
// When running on replica, this means the replica has those tables. When running
|
||||||
// on master this is always true, of course, and yet it also implies this knowledge
|
// on master this is always true, of course, and yet it also implies this knowledge
|
||||||
// is in the binlogs.
|
// is in the binlogs.
|
||||||
if err := this.inspector.InspectOriginalAndGhostTables(); err != nil {
|
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Validation complete! We're good to execute this migration
|
// Validation complete! We're good to execute this migration
|
||||||
@ -926,12 +926,13 @@ func (this *Migrator) initiateApplier() error {
|
|||||||
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
log.Errorf("Unable to create ghost table, see further error details. Perhaps a previous migration failed without dropping the table? Bailing out")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := this.applier.AlterGhost(); err != nil {
|
if err := this.applier.AlterGhost(); err != nil {
|
||||||
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
this.applier.WriteChangelogState(string(TablesInPlace))
|
this.applier.WriteChangelogState(string(GhostTableMigrated))
|
||||||
go this.applier.InitiateHeartbeat()
|
go this.applier.InitiateHeartbeat()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -10,6 +10,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/github/gh-ost/go/sql"
|
||||||
|
|
||||||
"github.com/outbrain/golib/log"
|
"github.com/outbrain/golib/log"
|
||||||
"github.com/outbrain/golib/sqlutils"
|
"github.com/outbrain/golib/sqlutils"
|
||||||
)
|
)
|
||||||
@ -149,3 +151,28 @@ func GetInstanceKey(db *gosql.DB) (instanceKey *InstanceKey, err error) {
|
|||||||
err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port)
|
err = db.QueryRow(`select @@global.hostname, @@global.port`).Scan(&instanceKey.Hostname, &instanceKey.Port)
|
||||||
return instanceKey, err
|
return instanceKey, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetTableColumns reads column list from given table
|
||||||
|
func GetTableColumns(db *gosql.DB, databaseName, tableName string) (*sql.ColumnList, error) {
|
||||||
|
query := fmt.Sprintf(`
|
||||||
|
show columns from %s.%s
|
||||||
|
`,
|
||||||
|
sql.EscapeName(databaseName),
|
||||||
|
sql.EscapeName(tableName),
|
||||||
|
)
|
||||||
|
columnNames := []string{}
|
||||||
|
err := sqlutils.QueryRowsMap(db, query, func(rowMap sqlutils.RowMap) error {
|
||||||
|
columnNames = append(columnNames, rowMap.GetString("Field"))
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(columnNames) == 0 {
|
||||||
|
return nil, log.Errorf("Found 0 columns on %s.%s. Bailing out",
|
||||||
|
sql.EscapeName(databaseName),
|
||||||
|
sql.EscapeName(tableName),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
return sql.NewColumnList(columnNames), nil
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user