WIP
This commit is contained in:
parent
3c6abf9290
commit
8c3938cd05
@ -81,7 +81,7 @@ func main() {
|
||||
flag.BoolVar(&migrationContext.AliyunRDS, "aliyun-rds", false, "set to 'true' when you execute on Aliyun RDS.")
|
||||
flag.BoolVar(&migrationContext.GoogleCloudPlatform, "gcp", false, "set to 'true' when you execute on a 1st generation Google Cloud Platform (GCP).")
|
||||
flag.BoolVar(&migrationContext.AzureMySQL, "azure", false, "set to 'true' when you execute on Azure Database on MySQL.")
|
||||
flag.BoolVar(&migrationContext.UseGTIDs, "gtid", false, "set to 'true' to enable MySQL GTIDs for replication binlog positioning.")
|
||||
flag.BoolVar(&migrationContext.UseGTIDs, "gtid", false, "(experimental) set to 'true' to use MySQL GTIDs for binlog positioning.")
|
||||
|
||||
executeFlag := flag.Bool("execute", false, "actually execute the alter & migrate the table. Default is noop: do some tests and exit")
|
||||
flag.BoolVar(&migrationContext.TestOnReplica, "test-on-replica", false, "Have the migration run on a replica, not on the master. At the end of migration replication is stopped, and tables are swapped and immediately swap-revert. Replication remains stopped and you can compare the two tables for building trust")
|
||||
|
@ -702,7 +702,7 @@ func (this *Applier) StopReplication() error {
|
||||
return err
|
||||
}
|
||||
|
||||
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
|
||||
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db, this.migrationContext.UseGTIDs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -6,8 +6,9 @@
|
||||
|
||||
package mysql
|
||||
|
||||
import "errors"
|
||||
|
||||
type BinlogCoordinates interface {
|
||||
Name() string
|
||||
String() string
|
||||
DisplayString() string
|
||||
IsEmpty() bool
|
||||
@ -15,3 +16,20 @@ type BinlogCoordinates interface {
|
||||
SmallerThan(other BinlogCoordinates) bool
|
||||
SmallerThanOrEquals(other BinlogCoordinates) bool
|
||||
}
|
||||
|
||||
func binlogCoordinatesToImplementation(in BinlogCoordinates, out interface{}) (err error) {
|
||||
var ok bool
|
||||
switch out.(type) {
|
||||
case *FileBinlogCoordinates:
|
||||
out, ok = in.(*FileBinlogCoordinates)
|
||||
case *GTIDBinlogCoordinates:
|
||||
out, ok = in.(*GTIDBinlogCoordinates)
|
||||
default:
|
||||
err = errors.New("unrecognized BinlogCoordinates implementation")
|
||||
}
|
||||
|
||||
if !ok {
|
||||
err = errors.New("failed to reflect BinlogCoordinates implementation")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -20,18 +20,17 @@ func init() {
|
||||
detachPattern, _ = regexp.Compile(`//([^/:]+):([\d]+)`) // e.g. `//binlog.01234:567890`
|
||||
}
|
||||
|
||||
type BinlogType int
|
||||
|
||||
const (
|
||||
BinaryLog BinlogType = iota
|
||||
RelayLog
|
||||
)
|
||||
|
||||
// FileBinlogCoordinates described binary log coordinates in the form of a binlog file & log position.
|
||||
type FileBinlogCoordinates struct {
|
||||
LogFile string
|
||||
LogPos int64
|
||||
Type BinlogType
|
||||
}
|
||||
|
||||
func NewFileBinlogCoordinates(logFile string, logPos int64) *FileBinlogCoordinates {
|
||||
return &FileBinlogCoordinates{
|
||||
LogFile: logFile,
|
||||
LogPos: logPos,
|
||||
}
|
||||
}
|
||||
|
||||
// ParseFileBinlogCoordinates parses a log file/position string into a *BinlogCoordinates struct.
|
||||
@ -48,11 +47,6 @@ func ParseFileBinlogCoordinates(logFileLogPos string) (*FileBinlogCoordinates, e
|
||||
}
|
||||
}
|
||||
|
||||
// Name returns the name of the BinlogCoordinates interface implementation
|
||||
func (this *FileBinlogCoordinates) Name() string {
|
||||
return "file"
|
||||
}
|
||||
|
||||
// DisplayString returns a user-friendly string representation of these coordinates
|
||||
func (this *FileBinlogCoordinates) DisplayString() string {
|
||||
return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos)
|
||||
@ -69,7 +63,7 @@ func (this *FileBinlogCoordinates) Equals(other BinlogCoordinates) bool {
|
||||
if !ok || other == nil {
|
||||
return false
|
||||
}
|
||||
return this.LogFile == coord.LogFile && this.LogPos == coord.LogPos && this.Type == coord.Type
|
||||
return this.LogFile == coord.LogFile && this.LogPos == coord.LogPos
|
||||
}
|
||||
|
||||
// IsEmpty returns true if the log file is empty, unnamed
|
||||
@ -137,7 +131,7 @@ func (this *FileBinlogCoordinates) FileNumber() (int, int) {
|
||||
|
||||
// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back)
|
||||
func (this *FileBinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) {
|
||||
result := &FileBinlogCoordinates{LogPos: 0, Type: this.Type}
|
||||
result := &FileBinlogCoordinates{}
|
||||
|
||||
fileNum, numLen := this.FileNumber()
|
||||
if fileNum == 0 {
|
||||
@ -159,7 +153,7 @@ func (this *FileBinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates,
|
||||
|
||||
// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog
|
||||
func (this *FileBinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) {
|
||||
result := &FileBinlogCoordinates{LogPos: 0, Type: this.Type}
|
||||
result := &FileBinlogCoordinates{}
|
||||
|
||||
fileNum, numLen := this.FileNumber()
|
||||
newNumStr := fmt.Sprintf("%d", (fileNum + 1))
|
||||
|
@ -6,79 +6,83 @@
|
||||
package mysql
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
gomysql "github.com/go-mysql-org/go-mysql/mysql"
|
||||
)
|
||||
|
||||
// GTIDBinlogCoordinates described binary log coordinates in the form of a binlog file & log position.
|
||||
// GTIDBinlogCoordinates describe binary log coordinates in MySQL GTID format.
|
||||
type GTIDBinlogCoordinates struct {
|
||||
Set *gomysql.MysqlGTIDSet
|
||||
GTIDSet *gomysql.MysqlGTIDSet
|
||||
UUIDSet *gomysql.UUIDSet
|
||||
}
|
||||
|
||||
// ParseGTIDSetBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct.
|
||||
func ParseGTIDSetBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) {
|
||||
// NewGTIDBinlogCoordinates parses a MySQL GTID set into a *GTIDBinlogCoordinates struct.
|
||||
func NewGTIDBinlogCoordinates(gtidSet string) (*GTIDBinlogCoordinates, error) {
|
||||
set, err := gomysql.ParseMysqlGTIDSet(gtidSet)
|
||||
return >IDBinlogCoordinates{set.(*gomysql.MysqlGTIDSet)}, err
|
||||
return >IDBinlogCoordinates{
|
||||
GTIDSet: set.(*gomysql.MysqlGTIDSet),
|
||||
}, err
|
||||
}
|
||||
|
||||
// DisplayString returns a user-friendly string representation of these coordinates
|
||||
// DisplayString returns a user-friendly string representation of these current UUID set or the full GTID set.
|
||||
func (this *GTIDBinlogCoordinates) DisplayString() string {
|
||||
return this.Set.String()
|
||||
if this.UUIDSet != nil {
|
||||
return this.UUIDSet.String()
|
||||
}
|
||||
return this.String()
|
||||
}
|
||||
|
||||
// String returns a user-friendly string representation of these coordinates
|
||||
// String returns a user-friendly string representation of these full GTID set.
|
||||
func (this GTIDBinlogCoordinates) String() string {
|
||||
return this.DisplayString()
|
||||
return this.GTIDSet.String()
|
||||
}
|
||||
|
||||
// Equals tests equality of this coordinate and another one.
|
||||
func (this *GTIDBinlogCoordinates) Equals(other *GTIDBinlogCoordinates) bool {
|
||||
if other == nil {
|
||||
func (this *GTIDBinlogCoordinates) Equals(other BinlogCoordinates) bool {
|
||||
if other == nil || this.IsEmpty() || other.IsEmpty() {
|
||||
return false
|
||||
}
|
||||
return other.Set != nil && this.Set.Equal(other.Set)
|
||||
|
||||
otherBinlogCoordinates := >IDBinlogCoordinates{}
|
||||
if err := binlogCoordinatesToImplementation(other, otherBinlogCoordinates); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
return this.GTIDSet.Equal(otherBinlogCoordinates.GTIDSet)
|
||||
}
|
||||
|
||||
// IsEmpty returns true if the GTID set is empty, unnamed
|
||||
// IsEmpty returns true if the GTID set is empty.
|
||||
func (this *GTIDBinlogCoordinates) IsEmpty() bool {
|
||||
return this.Set == nil
|
||||
return this.GTIDSet == nil
|
||||
}
|
||||
|
||||
// SmallerThan returns true if this coordinate is strictly smaller than the other.
|
||||
func (this *GTIDBinlogCoordinates) SmallerThan(other *GTIDBinlogCoordinates) bool {
|
||||
// if GTID SIDs are equal we compare the interval stop points
|
||||
// if GTID SIDs differ we have to assume there is a new/larger event
|
||||
if other.Set == nil || other.Set.Sets == nil {
|
||||
return false
|
||||
func (this *GTIDBinlogCoordinates) SmallerThan(other BinlogCoordinates) bool {
|
||||
otherBinlogCoordinates := >IDBinlogCoordinates{}
|
||||
if err := binlogCoordinatesToImplementation(other, otherBinlogCoordinates); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if len(this.Set.Sets) < len(other.Set.Sets) {
|
||||
return true
|
||||
}
|
||||
for sid, otherSet := range other.Set.Sets {
|
||||
thisSet, ok := this.Set.Sets[sid]
|
||||
if !ok {
|
||||
return true // 'this' is missing an SID
|
||||
}
|
||||
if len(thisSet.Intervals) < len(otherSet.Intervals) {
|
||||
return true // 'this' has fewer intervals
|
||||
}
|
||||
for i, otherInterval := range otherSet.Intervals {
|
||||
if len(thisSet.Intervals)-1 > i {
|
||||
return true
|
||||
}
|
||||
thisInterval := thisSet.Intervals[i]
|
||||
if thisInterval.Start < otherInterval.Start || thisInterval.Stop < otherInterval.Stop {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
|
||||
// if 'this' does not contain the same sets we assume we are behind 'other'.
|
||||
// there are probably edge cases where this isn't true
|
||||
return !this.GTIDSet.Contain(other.GTIDSet)
|
||||
}
|
||||
|
||||
// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one.
|
||||
// We do NOT compare the type so we can not use this.Equals()
|
||||
func (this *GTIDBinlogCoordinates) SmallerThanOrEquals(other *GTIDBinlogCoordinates) bool {
|
||||
if this.SmallerThan(other) {
|
||||
return true
|
||||
}
|
||||
return other.Set != nil && this.Set.Equal(other.Set)
|
||||
func (this *GTIDBinlogCoordinates) SmallerThanOrEquals(other BinlogCoordinates) bool {
|
||||
return this.Equals(other) || this.SmallerThan(other)
|
||||
}
|
||||
|
||||
func (this *GTIDBinlogCoordinates) Update(update interface{}) error {
|
||||
switch u := update.(type) {
|
||||
case *gomysql.UUIDSet:
|
||||
this.GTIDSet.AddSet(u)
|
||||
this.UUIDSet = u
|
||||
case *gomysql.MysqlGTIDSet:
|
||||
this.GTIDSet = u
|
||||
default:
|
||||
return errors.New("unsupported update")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
36
go/mysql/binlog_test.go
Normal file
36
go/mysql/binlog_test.go
Normal file
@ -0,0 +1,36 @@
|
||||
/*
|
||||
Copyright 2022 GitHub Inc.
|
||||
See https://github.com/github/gh-ost/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package mysql
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/openark/golib/log"
|
||||
test "github.com/openark/golib/tests"
|
||||
)
|
||||
|
||||
func init() {
|
||||
log.SetLevel(log.ERROR)
|
||||
}
|
||||
|
||||
func TestBinlogCoordinatesToImplementation(t *testing.T) {
|
||||
test.S(t).ExpectNil(binlogCoordinatesToImplementation(
|
||||
&FileBinlogCoordinates{},
|
||||
&FileBinlogCoordinates{},
|
||||
))
|
||||
test.S(t).ExpectNil(binlogCoordinatesToImplementation(
|
||||
>IDBinlogCoordinates{},
|
||||
>IDBinlogCoordinates{},
|
||||
))
|
||||
test.S(t).ExpectNotNil(binlogCoordinatesToImplementation(
|
||||
&FileBinlogCoordinates{},
|
||||
>IDBinlogCoordinates{},
|
||||
))
|
||||
test.S(t).ExpectNotNil(binlogCoordinatesToImplementation(
|
||||
&FileBinlogCoordinates{},
|
||||
map[string]string{},
|
||||
))
|
||||
}
|
@ -14,7 +14,6 @@ import (
|
||||
|
||||
"github.com/github/gh-ost/go/sql"
|
||||
|
||||
gomysql "github.com/go-mysql-org/go-mysql/mysql"
|
||||
"github.com/openark/golib/log"
|
||||
"github.com/openark/golib/sqlutils"
|
||||
)
|
||||
@ -143,33 +142,41 @@ func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKe
|
||||
return GetMasterConnectionConfigSafe(masterConfig, visitedKeys, allowMasterMaster)
|
||||
}
|
||||
|
||||
func GetReplicationBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, executeBinlogCoordinates *BinlogCoordinates, err error) {
|
||||
func GetReplicationBinlogCoordinates(db *gosql.DB, gtid bool) (readBinlogCoordinates, executeBinlogCoordinates BinlogCoordinates, err error) {
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
|
||||
readBinlogCoordinates = &BinlogCoordinates{
|
||||
LogFile: m.GetString("Master_Log_File"),
|
||||
LogPos: m.GetInt64("Read_Master_Log_Pos"),
|
||||
}
|
||||
executeBinlogCoordinates = &BinlogCoordinates{
|
||||
LogFile: m.GetString("Relay_Master_Log_File"),
|
||||
LogPos: m.GetInt64("Exec_Master_Log_Pos"),
|
||||
if gtid {
|
||||
executeBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Executed_Gtid_Set"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
readBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Retrieved_Gtid_Set"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
readBinlogCoordinates = NewFileBinlogCoordinates(
|
||||
m.GetString("Master_Log_File"),
|
||||
m.GetInt64("Read_Master_Log_Pos"),
|
||||
)
|
||||
executeBinlogCoordinates = NewFileBinlogCoordinates(
|
||||
m.GetString("Relay_Master_Log_File"),
|
||||
m.GetInt64("Exec_Master_Log_Pos"),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return readBinlogCoordinates, executeBinlogCoordinates, err
|
||||
}
|
||||
|
||||
func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) {
|
||||
func GetSelfBinlogCoordinates(db *gosql.DB, gtid bool) (selfBinlogCoordinates BinlogCoordinates, err error) {
|
||||
err = sqlutils.QueryRowsMap(db, `show master status`, func(m sqlutils.RowMap) error {
|
||||
selfBinlogCoordinates = &BinlogCoordinates{
|
||||
LogFile: m.GetString("File"),
|
||||
LogPos: m.GetInt64("Position"),
|
||||
}
|
||||
if execGtidSet := m.GetString("Executed_Gtid_Set"); execGtidSet != "" {
|
||||
gtidSet, err := gomysql.ParseMysqlGTIDSet(execGtidSet)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
selfBinlogCoordinates.GTIDSet = gtidSet.(*gomysql.MysqlGTIDSet)
|
||||
if gtid {
|
||||
selfBinlogCoordinates, err = NewGTIDBinlogCoordinates(m.GetString("Executed_Gtid_Set"))
|
||||
} else {
|
||||
selfBinlogCoordinates = NewFileBinlogCoordinates(
|
||||
m.GetString("File"),
|
||||
m.GetInt64("Position"),
|
||||
)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
Loading…
Reference in New Issue
Block a user