- Creating and populating Changelog table

- Using heartbeat
- Throttling works based on heartbeat
- Refactored binlog_reader stuff. Now streaming events (into golang channel, which makes for nice buffering and throttling)
- Binlog table listeners work
- More Migrator logic; existing logic for waiting on `state` events (e.g. `TablesCreatedState`)
This commit is contained in:
Shlomi Noach 2016-04-07 15:57:12 +02:00
parent 4dd5a93ed7
commit 0e7b23e6fe
11 changed files with 417 additions and 80 deletions

View File

@ -7,6 +7,7 @@ package base
import (
"fmt"
"time"
"github.com/github/gh-osc/go/mysql"
"github.com/github/gh-osc/go/sql"
@ -21,28 +22,39 @@ const (
CountRowsEstimate = "CountRowsEstimate"
)
const (
// maxRetries is the value returned by MigrationContext.MaxRetries().
maxRetries = 10
)
// MigrationContext has the general, global state of migration. It is used by
// all components throughout the migration process.
type MigrationContext struct {
DatabaseName string
OriginalTableName string
AlterStatement string
TableEngine string
CountTableRows bool
RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
ChunkSize int
OriginalBinlogFormat string
OriginalBinlogRowImage string
AllowedRunningOnMaster bool
InspectorConnectionConfig *mysql.ConnectionConfig
MasterConnectionConfig *mysql.ConnectionConfig
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
UniqueKey *sql.UniqueKey
DatabaseName string
OriginalTableName string
AlterStatement string
TableEngine string
CountTableRows bool
RowsEstimate int64
UsedRowsEstimateMethod RowsEstimateMethod
ChunkSize int
OriginalBinlogFormat string
OriginalBinlogRowImage string
AllowedRunningOnMaster bool
InspectorConnectionConfig *mysql.ConnectionConfig
MasterConnectionConfig *mysql.ConnectionConfig
MigrationRangeMinValues *sql.ColumnValues
MigrationRangeMaxValues *sql.ColumnValues
Iteration int64
MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues
UniqueKey *sql.UniqueKey
StartTime time.Time
RowCopyStartTime time.Time
CurrentLag int64
MaxLagMillisecondsThrottleThreshold int64
IsThrottled func() bool
CanStopStreaming func() bool
}
var context *MigrationContext
@ -53,9 +65,10 @@ func init() {
func newMigrationContext() *MigrationContext {
return &MigrationContext{
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
MasterConnectionConfig: mysql.NewConnectionConfig(),
ChunkSize: 1000,
InspectorConnectionConfig: mysql.NewConnectionConfig(),
MasterConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1000,
}
}
@ -69,6 +82,11 @@ func (this *MigrationContext) GetGhostTableName() string {
return fmt.Sprintf("_%s_New", this.OriginalTableName)
}
// GetChangelogTableName builds the changelog table's name from the original
// table name, in the form `_<original table name>_OSC`.
func (this *MigrationContext) GetChangelogTableName() string {
	const changelogTableNameFormat = "_%s_OSC"
	return fmt.Sprintf(changelogTableNameFormat, this.OriginalTableName)
}
// RequiresBinlogFormatChange is `true` when the original binlog format isn't `ROW`
func (this *MigrationContext) RequiresBinlogFormatChange() bool {
return this.OriginalBinlogFormat != "ROW"
@ -85,3 +103,7 @@ func (this *MigrationContext) IsRunningOnMaster() bool {
func (this *MigrationContext) HasMigrationRange() bool {
	// A range exists only when both of its boundaries have been populated.
	if this.MigrationRangeMinValues == nil {
		return false
	}
	return this.MigrationRangeMaxValues != nil
}
// MaxRetries returns the package-level maxRetries constant.
func (this *MigrationContext) MaxRetries() int {
return maxRetries
}

View File

@ -15,7 +15,7 @@ type BinlogEntry struct {
Coordinates mysql.BinlogCoordinates
EndLogPos uint64
dmlEvent *BinlogDMLEvent
DmlEvent *BinlogDMLEvent
}
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
@ -43,5 +43,5 @@ func (this *BinlogEntry) Duplicate() *BinlogEntry {
// String returns a string representation of this binlog entry, showing its coordinates and DML event
func (this *BinlogEntry) String() string {
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.dmlEvent)
return fmt.Sprintf("[BinlogEntry at %+v; dml:%+v]", this.Coordinates, this.DmlEvent)
}

View File

@ -8,5 +8,5 @@ package binlog
// BinlogReader is a general interface whose implementations can choose their methods of reading
// a binary log file and parsing it into binlog entries
type BinlogReader interface {
ReadEntries(logFile string, startPos uint64, stopPos uint64) (entries [](*BinlogEntry), err error)
StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error
}

View File

@ -7,7 +7,6 @@ package binlog
import (
"fmt"
"strings"
"github.com/github/gh-osc/go/mysql"
"github.com/github/gh-osc/go/sql"
@ -26,6 +25,7 @@ const (
type GoMySQLReader struct {
// connectionConfig is the connection configuration passed to NewGoMySQLReader.
connectionConfig *mysql.ConnectionConfig
// binlogSyncer is created in NewGoMySQLReader; ConnectBinlogStreamer calls its StartSync.
binlogSyncer *replication.BinlogSyncer
// binlogStreamer is nil until ConnectBinlogStreamer stores the streamer returned by StartSync.
binlogStreamer *replication.BinlogStreamer
// tableMap maps the table ID of a TableMapEvent to that event's table name.
tableMap map[uint64]string
// currentCoordinates holds the binlog file name and position of the latest event read.
currentCoordinates mysql.BinlogCoordinates
}
@ -35,6 +35,7 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
connectionConfig: connectionConfig,
tableMap: make(map[uint64]string),
currentCoordinates: mysql.BinlogCoordinates{},
binlogStreamer: nil,
}
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
@ -47,18 +48,77 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
return binlogReader, err
}
func (this *GoMySQLReader) isDMLEvent(event *replication.BinlogEvent) bool {
eventType := event.Header.EventType.String()
if strings.HasPrefix(eventType, "WriteRows") {
return true
// ConnectBinlogStreamer starts a binlog sync at the given coordinates and
// stores the resulting streamer on the reader, where StreamEvents picks it up.
// The coordinates are recorded as the reader's current coordinates before the
// sync is attempted. The returned error is whatever StartSync reports.
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
	this.currentCoordinates = coordinates
	// Start sync with the specified binlog file and position.
	// Fields are keyed so the literal passes `go vet` and does not silently
	// break should the library reorder or extend its Position struct.
	startPosition := gomysql.Position{
		Name: coordinates.LogFile,
		Pos:  uint32(coordinates.LogPos),
	}
	this.binlogStreamer, err = this.binlogSyncer.StartSync(startPosition)
	return err
}
// StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
for {
if canStopStreaming() {
break
}
ev, err := this.binlogStreamer.GetEvent()
if err != nil {
return err
}
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
} else if tableMapEvent, ok := ev.Event.(*replication.TableMapEvent); ok {
// Actually not being used, since Table is available in RowsEvent.
// Keeping this here in case I'm wrong about this. Sometime in the near
// future I should remove this.
this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
dml := ToEventDML(ev.Header.EventType.String())
if dml == NotDML {
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
}
for i, row := range rowsEvent.Rows {
if dml == UpdateDML && i%2 == 1 {
// An update has two rows (WHERE+SET)
// We do both at the same time
continue
}
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
dml,
)
switch dml {
case InsertDML:
{
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
case UpdateDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
}
case DeleteDML:
{
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
}
}
// The channel will do the throttling. Whoever is reading from the channel
// decides whether action is taken synchronously (meaning we wait before
// next iteration) or asynchronously (we keep pushing more events)
// In reality, reads will be synchronous
entriesChannel <- binlogEntry
}
}
}
if strings.HasPrefix(eventType, "UpdateRows") {
return true
}
if strings.HasPrefix(eventType, "DeleteRows") {
return true
}
return false
log.Debugf("done streaming events")
return nil
}
// ReadEntries will read binlog entries from parsed text output of `mysqlbinlog` utility
@ -76,7 +136,6 @@ func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos
return entries, err
}
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
log.Infof("at: %+v", this.currentCoordinates)
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
@ -97,7 +156,7 @@ func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos
continue
}
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
binlogEntry.dmlEvent = NewBinlogDMLEvent(
binlogEntry.DmlEvent = NewBinlogDMLEvent(
string(rowsEvent.Table.Schema),
string(rowsEvent.Table.Table),
dml,
@ -105,19 +164,16 @@ func (this *GoMySQLReader) ReadEntries(logFile string, startPos uint64, stopPos
switch dml {
case InsertDML:
{
binlogEntry.dmlEvent.NewColumnValues = sql.ToColumnValues(row)
log.Debugf("insert: %+v", binlogEntry.dmlEvent.NewColumnValues)
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
}
case UpdateDML:
{
binlogEntry.dmlEvent.WhereColumnValues = sql.ToColumnValues(row)
binlogEntry.dmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
log.Debugf("update: %+v where %+v", binlogEntry.dmlEvent.NewColumnValues, binlogEntry.dmlEvent.WhereColumnValues)
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
}
case DeleteDML:
{
binlogEntry.dmlEvent.WhereColumnValues = sql.ToColumnValues(row)
log.Debugf("delete: %+v", binlogEntry.dmlEvent.WhereColumnValues)
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
}
}
}

View File

@ -103,7 +103,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
return InvalidState, binlogEntry, fmt.Errorf("Expected startLogPos %+v to equal previous endLogPos %+v", startLogPos, previousEndLogPos)
}
nextBinlogEntry = binlogEntry
if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.dmlEvent != nil {
if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = NewBinlogEntry(binlogEntry.Coordinates.LogFile, startLogPos)
}
@ -112,11 +112,11 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
onStatementEntry := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
nextBinlogEntry = binlogEntry
if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.dmlEvent != nil {
if binlogEntry.Coordinates.LogPos != 0 && binlogEntry.DmlEvent != nil {
// Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = binlogEntry.Duplicate()
}
nextBinlogEntry.dmlEvent = NewBinlogDMLEvent(submatch[2], submatch[3], ToEventDML(submatch[1]))
nextBinlogEntry.DmlEvent = NewBinlogDMLEvent(submatch[2], submatch[3], ToEventDML(submatch[1]))
return ExpectTokenState, nextBinlogEntry, nil
}
@ -126,7 +126,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
// onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
// columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
// if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
// return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.dmlEvent.DML)
// return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.DmlEvent.DML)
// }
// columnValue := submatch[2]
// columnValue = strings.TrimPrefix(columnValue, "'")
@ -186,12 +186,12 @@ func parseEntries(scanner *bufio.Scanner, logFile string) (entries [](*BinlogEnt
if binlogEntry.Coordinates.LogPos == 0 {
return
}
if binlogEntry.dmlEvent == nil {
if binlogEntry.DmlEvent == nil {
return
}
entries = append(entries, binlogEntry)
log.Debugf("entry: %+v", *binlogEntry)
fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.dmlEvent.DML, binlogEntry.dmlEvent.DatabaseName, binlogEntry.dmlEvent.TableName))
fmt.Println(fmt.Sprintf("%s `%s`.`%s`", binlogEntry.DmlEvent.DML, binlogEntry.DmlEvent.DatabaseName, binlogEntry.DmlEvent.TableName))
}
for scanner.Scan() {
switch state {

View File

@ -11,7 +11,6 @@ import (
"os"
"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/logic"
"github.com/outbrain/golib/log"
)
@ -79,17 +78,18 @@ func main() {
log.Info("starting gh-osc")
if *internalExperiment {
log.Debug("starting experiment")
var binlogReader binlog.BinlogReader
var err error
log.Debug("starting experiment with %+v", *binlogFile)
//binlogReader = binlog.NewMySQLBinlogReader(*mysqlBasedir, *mysqlDatadir)
binlogReader, err = binlog.NewGoMySQLReader(migrationContext.InspectorConnectionConfig)
if err != nil {
log.Fatale(err)
}
binlogReader.ReadEntries(*binlogFile, 0, 0)
return
// binlogReader, err := binlog.NewGoMySQLReader(migrationContext.InspectorConnectionConfig)
// if err != nil {
// log.Fatale(err)
// }
// if err := binlogReader.ConnectBinlogStreamer(mysql.BinlogCoordinates{LogFile: *binlogFile, LogPos: 0}); err != nil {
// log.Fatale(err)
// }
// binlogReader.StreamEvents(func() bool { return false })
// return
}
migrator := logic.NewMigrator()
err := migrator.Migrate()

View File

@ -8,6 +8,8 @@ package logic
import (
gosql "database/sql"
"fmt"
"time"
"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/mysql"
"github.com/github/gh-osc/go/sql"
@ -16,6 +18,10 @@ import (
"github.com/outbrain/golib/sqlutils"
)
const (
// heartbeatIntervalSeconds is the number of seconds between two heartbeat
// injections issued by Applier.InitiateHeartbeat.
heartbeatIntervalSeconds = 1
)
// Applier reads data from the read-MySQL-server (typically a replica, but can be the master)
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
type Applier struct {
@ -71,7 +77,7 @@ func (this *Applier) CreateGhostTable() error {
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Table created")
log.Infof("Ghost table created")
return nil
}
@ -90,10 +96,113 @@ func (this *Applier) AlterGhost() error {
if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
return err
}
log.Infof("Table altered")
log.Infof("Ghost table altered")
return nil
}
// CreateChangelogTable creates the changelog table on the master.
// The table has an auto-increment `id`, a self-updating `last_update`
// timestamp and a (`hint`, `value`) pair, with `hint` being unique.
// It returns the error of the CREATE statement, if any.
func (this *Applier) CreateChangelogTable() error {
	// Escape both identifiers once and reuse them for the statement and the log line.
	databaseName := sql.EscapeName(this.migrationContext.DatabaseName)
	changelogTableName := sql.EscapeName(this.migrationContext.GetChangelogTableName())

	query := fmt.Sprintf(`create /* gh-osc */ table %s.%s (
id int auto_increment,
last_update timestamp not null DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
hint varchar(64) charset ascii not null,
value varchar(64) charset ascii not null,
primary key(id),
unique key hint_uidx(hint)
) auto_increment=2
`,
		databaseName,
		changelogTableName,
	)
	log.Infof("Creating changelog table %s.%s", databaseName, changelogTableName)

	_, err := sqlutils.ExecNoPrepare(this.db, query)
	if err != nil {
		return err
	}
	log.Infof("Changelog table created")
	return nil
}
// DropChangelogTable drops the changelog table on the master.
// The statement is a `drop table if exists`. It returns the error of the DROP
// statement, if any.
func (this *Applier) DropChangelogTable() error {
	// Escape both identifiers once and reuse them for the statement and the log line.
	databaseName := sql.EscapeName(this.migrationContext.DatabaseName)
	changelogTableName := sql.EscapeName(this.migrationContext.GetChangelogTableName())

	query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`,
		databaseName,
		changelogTableName,
	)
	// Log message spelling fixed (was "Droppping").
	log.Infof("Dropping changelog table %s.%s", databaseName, changelogTableName)

	if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
		return err
	}
	log.Infof("Changelog table dropped")
	return nil
}
// WriteChangelog stores a (hint, value) pair in the changelog table. When a row
// with the same hint already exists, its value and last_update are overwritten.
// The hint is handed back unchanged, for the caller's convenience, together
// with the error of the statement (if any).
func (this *Applier) WriteChangelog(hint, value string) (string, error) {
	databaseName := sql.EscapeName(this.migrationContext.DatabaseName)
	changelogTableName := sql.EscapeName(this.migrationContext.GetChangelogTableName())

	statement := fmt.Sprintf(`
insert /* gh-osc */ into %s.%s
(id, hint, value)
values
(NULL, ?, ?)
on duplicate key update
last_update=NOW(),
value=VALUES(value)
`,
		databaseName,
		changelogTableName,
	)
	if _, err := sqlutils.Exec(this.db, statement, hint, value); err != nil {
		return hint, err
	}
	return hint, nil
}
// InitiateHeartbeat creates a heartbeat cycle, writing to the changelog table.
// This is done asynchronously: the function returns immediately and a goroutine
// upserts the `heartbeat` row (id=1) once right away and then once every
// heartbeatIntervalSeconds. The value written is the current time.
// The goroutine terminates once the number of successive write failures
// exceeds the migration context's MaxRetries(); a successful write resets
// the failure count.
func (this *Applier) InitiateHeartbeat() {
	go func() {
		numSuccessiveFailures := 0
		query := fmt.Sprintf(`
insert /* gh-osc */ into %s.%s
(id, hint, value)
values
(1, 'heartbeat', ?)
on duplicate key update
last_update=NOW(),
value=VALUES(value)
`,
			sql.EscapeName(this.migrationContext.DatabaseName),
			sql.EscapeName(this.migrationContext.GetChangelogTableName()),
		)
		// injectHeartbeat writes a single heartbeat. It only returns an error
		// once too many successive attempts have failed.
		injectHeartbeat := func() error {
			// Sub-second precision matters: lag computed from the heartbeat is
			// compared against a millisecond threshold. time.Parse with the
			// RFC3339 layout accepts the fractional seconds written here.
			heartbeatValue := time.Now().Format(time.RFC3339Nano)
			_, err := sqlutils.ExecNoPrepare(this.db, query, heartbeatValue)
			if err == nil {
				numSuccessiveFailures = 0
				return nil
			}
			numSuccessiveFailures++
			if numSuccessiveFailures > this.migrationContext.MaxRetries() {
				return log.Errore(err)
			}
			return nil
		}
		// Handle the first injection like any other instead of dropping its result.
		if err := injectHeartbeat(); err != nil {
			return
		}

		// An explicit ticker (rather than time.Tick) can be stopped when this
		// goroutine gives up, releasing the underlying timer.
		heartbeatTicker := time.NewTicker(time.Duration(heartbeatIntervalSeconds) * time.Second)
		defer heartbeatTicker.Stop()
		for range heartbeatTicker.C {
			// Generally speaking, we would issue a goroutine, but I'd actually rather
			// have this blocked rather than spam the master in the event something
			// goes wrong
			if err := injectHeartbeat(); err != nil {
				return
			}
		}
	}()
}
// ReadMigrationMinValues
func (this *Applier) ReadMigrationMinValues(uniqueKey *sql.UniqueKey) error {
log.Debugf("Reading migration range according to key: %s", uniqueKey.Name)

View File

@ -47,6 +47,10 @@ func (this *Inspector) InitDBConnections() (err error) {
if err := this.validateBinlogs(); err != nil {
return err
}
return nil
}
func (this *Inspector) ValidateOriginalTable() (err error) {
if err := this.validateTable(); err != nil {
return err
}

View File

@ -7,8 +7,11 @@ package logic
import (
"fmt"
"sync/atomic"
"time"
"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog"
"github.com/outbrain/golib/log"
)
@ -19,12 +22,70 @@ type Migrator struct {
applier *Applier
eventsStreamer *EventsStreamer
migrationContext *base.MigrationContext
tablesInPlace chan bool
}
func NewMigrator() *Migrator {
return &Migrator{
migrator := &Migrator{
migrationContext: base.GetMigrationContext(),
tablesInPlace: make(chan bool),
}
migrator.migrationContext.IsThrottled = func() bool {
return migrator.shouldThrottle()
}
return migrator
}
// shouldThrottle reports whether the last recorded lag exceeds the configured
// threshold. CurrentLag is read atomically and interpreted as a time.Duration;
// MaxLagMillisecondsThrottleThreshold is interpreted as milliseconds.
func (this *Migrator) shouldThrottle() bool {
	currentLag := time.Duration(atomic.LoadInt64(&this.migrationContext.CurrentLag))
	maxAllowedLag := time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold) * time.Millisecond
	return currentLag > maxAllowedLag
}
// canStopStreaming reports whether binlog event streaming may stop.
// As written it always returns false.
func (this *Migrator) canStopStreaming() bool {
return false
}
// onChangelogStateEvent handles a DML event on the changelog table whose hint
// is "state"; events with any other hint are ignored. On the TablesInPlace
// state it sends on the tablesInPlace channel (blocking until received); any
// other state yields an error.
func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
	// Hey, I created the changelog table, I know the type of columns it has!
	// Column 2 is `hint`, column 3 is `value`.
	if dmlEvent.NewColumnValues.StringColumn(2) != "state" {
		return nil
	}
	changelogState := ChangelogState(dmlEvent.NewColumnValues.StringColumn(3))
	if changelogState != TablesInPlace {
		return fmt.Errorf("Unknown changelog state: %+v", changelogState)
	}
	this.tablesInPlace <- true
	log.Debugf("---- - - - - - state %+v", changelogState)
	return nil
}
// onChangelogHeartbeatEvent handles a DML event on the changelog table whose
// hint is "heartbeat"; events with any other hint are ignored. The heartbeat
// value is parsed as an RFC3339 timestamp, and the time elapsed since then is
// stored atomically into the migration context's CurrentLag.
// It returns an error when the value cannot be parsed.
func (this *Migrator) onChangelogHeartbeatEvent(dmlEvent *binlog.BinlogDMLEvent) (err error) {
	// Column 2 is `hint`, column 3 is `value`.
	if dmlEvent.NewColumnValues.StringColumn(2) != "heartbeat" {
		return nil
	}
	heartbeatTime, parseErr := time.Parse(time.RFC3339, dmlEvent.NewColumnValues.StringColumn(3))
	if parseErr != nil {
		return log.Errore(parseErr)
	}
	lag := time.Since(heartbeatTime)

	atomic.StoreInt64(&this.migrationContext.CurrentLag, int64(lag))
	log.Debugf("---- - - - - - lag %+v", lag)
	return nil
}
func (this *Migrator) Migrate() (err error) {
@ -32,6 +93,14 @@ func (this *Migrator) Migrate() (err error) {
if err := this.inspector.InitDBConnections(); err != nil {
return err
}
if err := this.inspector.ValidateOriginalTable(); err != nil {
return err
}
uniqueKeys, err := this.inspector.InspectOriginalTable()
if err != nil {
return err
}
// So far so good, table is accessible and valid.
if this.migrationContext.MasterConnectionConfig, err = this.inspector.getMasterConnectionConfig(); err != nil {
return err
}
@ -39,15 +108,31 @@ func (this *Migrator) Migrate() (err error) {
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
}
log.Infof("Master found to be %+v", this.migrationContext.MasterConnectionConfig.Key)
uniqueKeys, err := this.inspector.InspectOriginalTable()
if err != nil {
return err
}
this.eventsStreamer = NewEventsStreamer()
if err := this.eventsStreamer.InitDBConnections(); err != nil {
return err
}
this.eventsStreamer.AddListener(
false,
this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error {
return this.onChangelogStateEvent(dmlEvent)
},
)
this.eventsStreamer.AddListener(
false,
this.migrationContext.DatabaseName,
this.migrationContext.GetChangelogTableName(),
func(dmlEvent *binlog.BinlogDMLEvent) error {
return this.onChangelogHeartbeatEvent(dmlEvent)
},
)
go func() {
log.Debugf("Beginning streaming")
this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() })
}()
this.applier = NewApplier()
if err := this.applier.InitDBConnections(); err != nil {
@ -61,11 +146,35 @@ func (this *Migrator) Migrate() (err error) {
log.Errorf("Unable to ALTER ghost table, see further error details. Bailing out")
return err
}
if err := this.applier.CreateChangelogTable(); err != nil {
log.Errorf("Unable to create changelog table, see further error details. Perhaps a previous migration failed without dropping the table? OR is there a running migration? Bailing out")
return err
}
this.applier.WriteChangelog("state", string(TablesInPlace))
this.applier.InitiateHeartbeat()
<-this.tablesInPlace
// Yay! We now know the Ghost and Changelog tables are good to examine!
// When running on replica, this means the replica has those tables. When running
// on master this is always true, of course, and yet it also implies this knowledge
// is in the binlogs.
this.migrationContext.UniqueKey = uniqueKeys[0] // TODO. Need to wait on replica till the ghost table exists and get shared keys
if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err
}
for {
throttleMigration(
this.migrationContext,
func() {
log.Debugf("throttling rowcopy")
},
nil,
func() {
log.Debugf("done throttling rowcopy")
},
)
isComplete, err := this.applier.IterationIsComplete()
if err != nil {
return err
@ -81,9 +190,11 @@ func (this *Migrator) Migrate() (err error) {
}
this.migrationContext.Iteration++
}
// if err := this.applier.IterateTable(uniqueKeys[0]); err != nil {
// return err
// }
// temporary wait:
heartbeatTick := time.Tick(10 * time.Second)
for range heartbeatTick {
return nil
}
return nil
}

View File

@ -8,6 +8,8 @@ package logic
import (
gosql "database/sql"
"fmt"
"strings"
"github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/mysql"
@ -23,6 +25,10 @@ type BinlogEventListener struct {
onDmlEvent func(event *binlog.BinlogDMLEvent) error
}
const (
// EventsChannelBufferSize is the buffer capacity of the events channel
// created in NewEventsStreamer.
EventsChannelBufferSize = 1
)
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
// and interested parties may subscribe for per-table events.
type EventsStreamer struct {
@ -31,6 +37,8 @@ type EventsStreamer struct {
migrationContext *base.MigrationContext
nextBinlogCoordinates *mysql.BinlogCoordinates
listeners [](*BinlogEventListener)
eventsChannel chan *binlog.BinlogEntry
binlogReader binlog.BinlogReader
}
func NewEventsStreamer() *EventsStreamer {
@ -38,6 +46,7 @@ func NewEventsStreamer() *EventsStreamer {
connectionConfig: base.GetMigrationContext().InspectorConnectionConfig,
migrationContext: base.GetMigrationContext(),
listeners: [](*BinlogEventListener){},
eventsChannel: make(chan *binlog.BinlogEntry, EventsChannelBufferSize),
}
}
@ -61,10 +70,10 @@ func (this *EventsStreamer) AddListener(
func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) {
for _, listener := range this.listeners {
if listener.databaseName != binlogEvent.DatabaseName {
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
continue
}
if listener.tableName != binlogEvent.TableName {
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
continue
}
onDmlEvent := listener.onDmlEvent
@ -89,6 +98,15 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
if err := this.readCurrentBinlogCoordinates(); err != nil {
return err
}
goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig)
if err != nil {
return err
}
if err := goMySQLReader.ConnectBinlogStreamer(*this.nextBinlogCoordinates); err != nil {
return err
}
this.binlogReader = goMySQLReader
return nil
}
@ -129,3 +147,16 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
log.Debugf("Streamer binlog coordinates: %+v", *this.nextBinlogCoordinates)
return nil
}
// StreamEvents will begin streaming events. It will be blocking, so should be
// executed by a goroutine.
// A background goroutine drains the events channel and notifies listeners of
// every entry carrying a DML event, while the binlog reader feeds that channel.
// The returned error is the one returned by the binlog reader.
func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
	dispatchEntries := func() {
		for binlogEntry := range this.eventsChannel {
			if binlogEntry.DmlEvent == nil {
				// Nothing to publish for entries without a DML event.
				continue
			}
			this.notifyListeners(binlogEntry.DmlEvent)
		}
	}
	go dispatchEntries()

	return this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel)
}

View File

@ -77,14 +77,18 @@ func (this *ColumnValues) AbstractValues() []interface{} {
return this.abstractValues
}
// StringColumn returns the value at the given index as a string. A []uint8
// value is converted directly to a string; any other value is formatted
// with `%+v`.
func (this *ColumnValues) StringColumn(index int) string {
	switch typedValue := this.AbstractValues()[index].(type) {
	case []uint8:
		return string(typedValue)
	default:
		return fmt.Sprintf("%+v", typedValue)
	}
}
func (this *ColumnValues) String() string {
stringValues := []string{}
for _, val := range this.AbstractValues() {
if ints, ok := val.([]uint8); ok {
stringValues = append(stringValues, string(ints))
} else {
stringValues = append(stringValues, fmt.Sprintf("%+v", val))
}
for i := range this.AbstractValues() {
stringValues = append(stringValues, this.StringColumn(i))
}
return strings.Join(stringValues, ",")
}