- Handling gomysql.replication connection timeouts: reconnecting on last known position
- `printStatus()` takes ETA into account - More info around `master_pos_wait()`
This commit is contained in:
parent
ec34a5ef75
commit
9b54d0208f
2
build.sh
2
build.sh
@ -1,7 +1,7 @@
|
||||
#!/bin/bash
|
||||
#
|
||||
#
|
||||
RELEASE_VERSION="0.7.5"
|
||||
RELEASE_VERSION="0.7.13"
|
||||
|
||||
buildpath=/tmp/gh-ost
|
||||
target=gh-ost
|
||||
|
@ -1,147 +0,0 @@
|
||||
/*
|
||||
Copyright 2015 Shlomi Noach
|
||||
*/
|
||||
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// BinlogType identifies the type of the log: relay or binary log
|
||||
type BinlogType int
|
||||
|
||||
// BinaryLog, RelayLog are binlog types
|
||||
const (
|
||||
BinaryLog BinlogType = iota
|
||||
RelayLog
|
||||
)
|
||||
|
||||
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
|
||||
type BinlogCoordinates struct {
|
||||
LogFile string
|
||||
LogPos int64
|
||||
Type BinlogType
|
||||
}
|
||||
|
||||
// ParseBinlogCoordinates will parse an InstanceKey from a string representation such as 127.0.0.1:3306
|
||||
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
|
||||
tokens := strings.SplitN(logFileLogPos, ":", 2)
|
||||
if len(tokens) != 2 {
|
||||
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
|
||||
}
|
||||
|
||||
if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil {
|
||||
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
|
||||
} else {
|
||||
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
|
||||
}
|
||||
}
|
||||
|
||||
// DisplayString returns a user-friendly string representation of these coordinates
|
||||
func (this *BinlogCoordinates) DisplayString() string {
|
||||
return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos)
|
||||
}
|
||||
|
||||
// String returns a user-friendly string representation of these coordinates
|
||||
func (this BinlogCoordinates) String() string {
|
||||
return this.DisplayString()
|
||||
}
|
||||
|
||||
// Equals tests equality of this corrdinate and another one.
|
||||
func (this *BinlogCoordinates) Equals(other *BinlogCoordinates) bool {
|
||||
if other == nil {
|
||||
return false
|
||||
}
|
||||
return this.LogFile == other.LogFile && this.LogPos == other.LogPos && this.Type == other.Type
|
||||
}
|
||||
|
||||
// IsEmpty returns true if the log file is empty, unnamed
|
||||
func (this *BinlogCoordinates) IsEmpty() bool {
|
||||
return this.LogFile == ""
|
||||
}
|
||||
|
||||
// SmallerThan returns true if this coordinate is strictly smaller than the other.
|
||||
func (this *BinlogCoordinates) SmallerThan(other *BinlogCoordinates) bool {
|
||||
if this.LogFile < other.LogFile {
|
||||
return true
|
||||
}
|
||||
if this.LogFile == other.LogFile && this.LogPos < other.LogPos {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one.
|
||||
// We do NOT compare the type so we can not use this.Equals()
|
||||
func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) bool {
|
||||
if this.SmallerThan(other) {
|
||||
return true
|
||||
}
|
||||
return this.LogFile == other.LogFile && this.LogPos == other.LogPos // No Type comparison
|
||||
}
|
||||
|
||||
// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's.
|
||||
func (this *BinlogCoordinates) FileSmallerThan(other *BinlogCoordinates) bool {
|
||||
return this.LogFile < other.LogFile
|
||||
}
|
||||
|
||||
// FileNumberDistance returns the numeric distance between this corrdinate's file number and the other's.
|
||||
// Effectively it means "how many roatets/FLUSHes would make these coordinates's file reach the other's"
|
||||
func (this *BinlogCoordinates) FileNumberDistance(other *BinlogCoordinates) int {
|
||||
thisNumber, _ := this.FileNumber()
|
||||
otherNumber, _ := other.FileNumber()
|
||||
return otherNumber - thisNumber
|
||||
}
|
||||
|
||||
// FileNumber returns the numeric value of the file, and the length in characters representing the number in the filename.
|
||||
// Example: FileNumber() of mysqld.log.000789 is (789, 6)
|
||||
func (this *BinlogCoordinates) FileNumber() (int, int) {
|
||||
tokens := strings.Split(this.LogFile, ".")
|
||||
numPart := tokens[len(tokens)-1]
|
||||
numLen := len(numPart)
|
||||
fileNum, err := strconv.Atoi(numPart)
|
||||
if err != nil {
|
||||
return 0, 0
|
||||
}
|
||||
return fileNum, numLen
|
||||
}
|
||||
|
||||
// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back)
|
||||
func (this *BinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) {
|
||||
result := BinlogCoordinates{LogPos: 0, Type: this.Type}
|
||||
|
||||
fileNum, numLen := this.FileNumber()
|
||||
if fileNum == 0 {
|
||||
return result, errors.New("Log file number is zero, cannot detect previous file")
|
||||
}
|
||||
newNumStr := fmt.Sprintf("%d", (fileNum - offset))
|
||||
newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr
|
||||
|
||||
tokens := strings.Split(this.LogFile, ".")
|
||||
tokens[len(tokens)-1] = newNumStr
|
||||
result.LogFile = strings.Join(tokens, ".")
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog
|
||||
func (this *BinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates, error) {
|
||||
return this.PreviousFileCoordinatesBy(1)
|
||||
}
|
||||
|
||||
// NextFileCoordinates guesses the filename of the next binlog/relaylog
|
||||
func (this *BinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) {
|
||||
result := BinlogCoordinates{LogPos: 0, Type: this.Type}
|
||||
|
||||
fileNum, numLen := this.FileNumber()
|
||||
newNumStr := fmt.Sprintf("%d", (fileNum + 1))
|
||||
newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr
|
||||
|
||||
tokens := strings.Split(this.LogFile, ".")
|
||||
tokens[len(tokens)-1] = newNumStr
|
||||
result.LogFile = strings.Join(tokens, ".")
|
||||
return result, nil
|
||||
}
|
@ -5,8 +5,14 @@
|
||||
|
||||
package binlog
|
||||
|
||||
import (
|
||||
"github.com/github/gh-ost/go/mysql"
|
||||
)
|
||||
|
||||
// BinlogReader is a general interface whose implementations can choose their methods of reading
|
||||
// a binary log file and parsing it into binlog entries
|
||||
type BinlogReader interface {
|
||||
StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error
|
||||
GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates
|
||||
Reconnect() error
|
||||
}
|
||||
|
@ -23,11 +23,12 @@ const (
|
||||
)
|
||||
|
||||
type GoMySQLReader struct {
|
||||
connectionConfig *mysql.ConnectionConfig
|
||||
binlogSyncer *replication.BinlogSyncer
|
||||
binlogStreamer *replication.BinlogStreamer
|
||||
tableMap map[uint64]string
|
||||
currentCoordinates mysql.BinlogCoordinates
|
||||
connectionConfig *mysql.ConnectionConfig
|
||||
binlogSyncer *replication.BinlogSyncer
|
||||
binlogStreamer *replication.BinlogStreamer
|
||||
tableMap map[uint64]string
|
||||
currentCoordinates mysql.BinlogCoordinates
|
||||
lastHandledCoordinates mysql.BinlogCoordinates
|
||||
}
|
||||
|
||||
func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) {
|
||||
@ -39,24 +40,91 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
|
||||
}
|
||||
binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql")
|
||||
|
||||
// Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password
|
||||
err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Key.Hostname, uint16(connectionConfig.Key.Port), connectionConfig.User, connectionConfig.Password)
|
||||
if err != nil {
|
||||
return binlogReader, err
|
||||
}
|
||||
|
||||
return binlogReader, err
|
||||
}
|
||||
|
||||
// ConnectBinlogStreamer
|
||||
func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) {
|
||||
if coordinates.IsEmpty() {
|
||||
return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()")
|
||||
}
|
||||
log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port))
|
||||
if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
this.currentCoordinates = coordinates
|
||||
log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates)
|
||||
// Start sync with sepcified binlog file and position
|
||||
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{coordinates.LogFile, uint32(coordinates.LogPos)})
|
||||
this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (this *GoMySQLReader) Reconnect() error {
|
||||
this.binlogSyncer.Close()
|
||||
|
||||
connectCoordinates := &this.lastHandledCoordinates
|
||||
if connectCoordinates.IsEmpty() {
|
||||
connectCoordinates = &this.currentCoordinates
|
||||
}
|
||||
if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
|
||||
return &this.currentCoordinates
|
||||
}
|
||||
|
||||
// StreamEvents
|
||||
func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error {
|
||||
if this.currentCoordinates.SmallerThanOrEquals(&this.lastHandledCoordinates) && !this.lastHandledCoordinates.IsEmpty() {
|
||||
log.Infof("Skipping handled query at %+v", this.currentCoordinates)
|
||||
return nil
|
||||
}
|
||||
|
||||
dml := ToEventDML(ev.Header.EventType.String())
|
||||
if dml == NotDML {
|
||||
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
|
||||
}
|
||||
for i, row := range rowsEvent.Rows {
|
||||
if dml == UpdateDML && i%2 == 1 {
|
||||
// An update has two rows (WHERE+SET)
|
||||
// We do both at the same time
|
||||
continue
|
||||
}
|
||||
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
|
||||
binlogEntry.DmlEvent = NewBinlogDMLEvent(
|
||||
string(rowsEvent.Table.Schema),
|
||||
string(rowsEvent.Table.Table),
|
||||
dml,
|
||||
)
|
||||
switch dml {
|
||||
case InsertDML:
|
||||
{
|
||||
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
|
||||
}
|
||||
case UpdateDML:
|
||||
{
|
||||
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
||||
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
|
||||
}
|
||||
case DeleteDML:
|
||||
{
|
||||
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
||||
}
|
||||
}
|
||||
// The channel will do the throttling. Whoever is reding from the channel
|
||||
// decides whether action is taken sycnhronously (meaning we wait before
|
||||
// next iteration) or asynchronously (we keep pushing more events)
|
||||
// In reality, reads will be synchronous
|
||||
entriesChannel <- binlogEntry
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// StreamEvents
|
||||
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
||||
for {
|
||||
@ -77,44 +145,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
|
||||
// future I should remove this.
|
||||
this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table)
|
||||
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
|
||||
dml := ToEventDML(ev.Header.EventType.String())
|
||||
if dml == NotDML {
|
||||
return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String())
|
||||
}
|
||||
for i, row := range rowsEvent.Rows {
|
||||
if dml == UpdateDML && i%2 == 1 {
|
||||
// An update has two rows (WHERE+SET)
|
||||
// We do both at the same time
|
||||
continue
|
||||
}
|
||||
binlogEntry := NewBinlogEntryAt(this.currentCoordinates)
|
||||
binlogEntry.DmlEvent = NewBinlogDMLEvent(
|
||||
string(rowsEvent.Table.Schema),
|
||||
string(rowsEvent.Table.Table),
|
||||
dml,
|
||||
)
|
||||
switch dml {
|
||||
case InsertDML:
|
||||
{
|
||||
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row)
|
||||
}
|
||||
case UpdateDML:
|
||||
{
|
||||
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
||||
binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1])
|
||||
}
|
||||
case DeleteDML:
|
||||
{
|
||||
binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row)
|
||||
}
|
||||
}
|
||||
// The channel will do the throttling. Whoever is reding from the channel
|
||||
// decides whether action is taken sycnhronously (meaning we wait before
|
||||
// next iteration) or asynchronously (we keep pushing more events)
|
||||
// In reality, reads will be synchronous
|
||||
entriesChannel <- binlogEntry
|
||||
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
this.lastHandledCoordinates = this.currentCoordinates
|
||||
}
|
||||
log.Debugf("done streaming events")
|
||||
|
||||
|
@ -552,8 +552,8 @@ func (this *Applier) StopSlaveIOThread() error {
|
||||
// MasterPosWait is applicable with --test-on-replica
|
||||
func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error {
|
||||
var appliedRows int64
|
||||
if err := this.db.QueryRow(`select ifnull(master_pos_wait(?, ?, ?), 0)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 1).Scan(&appliedRows); err != nil {
|
||||
return err
|
||||
if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 3).Scan(&appliedRows); err != nil {
|
||||
return log.Errore(err)
|
||||
}
|
||||
if appliedRows < 0 {
|
||||
return fmt.Errorf("Timeout waiting on master_pos_wait()")
|
||||
@ -565,15 +565,17 @@ func (this *Applier) StopSlaveNicely() error {
|
||||
if err := this.StopSlaveIOThread(); err != nil {
|
||||
return err
|
||||
}
|
||||
binlogCoordinates, err := mysql.GetReadBinlogCoordinates(this.db)
|
||||
readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("Replication stopped at %+v. Will wait for SQL thread to apply", *binlogCoordinates)
|
||||
if err := this.MasterPosWait(binlogCoordinates); err != nil {
|
||||
log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
|
||||
log.Infof("Will wait for SQL thread to catch up with IO thread")
|
||||
if err := this.MasterPosWait(readBinlogCoordinates); err != nil {
|
||||
log.Errorf("Error waiting for SQL thread to catch up. Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates)
|
||||
return err
|
||||
}
|
||||
log.Infof("Replication SQL thread applied all events")
|
||||
log.Infof("Replication SQL thread applied all events up to %+v", *readBinlogCoordinates)
|
||||
if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil {
|
||||
return err
|
||||
} else {
|
||||
|
@ -7,6 +7,7 @@ package logic
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync/atomic"
|
||||
@ -286,6 +287,7 @@ func (this *Migrator) listenOnPanicAbort() {
|
||||
}
|
||||
|
||||
func (this *Migrator) Migrate() (err error) {
|
||||
log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
this.migrationContext.StartTime = time.Now()
|
||||
|
||||
go this.listenOnPanicAbort()
|
||||
@ -326,13 +328,14 @@ func (this *Migrator) Migrate() (err error) {
|
||||
|
||||
log.Debugf("Operating until row copy is complete")
|
||||
this.consumeRowCopyComplete()
|
||||
log.Debugf("Row copy complete")
|
||||
log.Infof("Row copy complete")
|
||||
this.printStatus()
|
||||
|
||||
if err := this.stopWritesAndCompleteMigration(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -401,13 +404,14 @@ func (this *Migrator) dropOldTableIfRequired() (err error) {
|
||||
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
|
||||
// make sure the queue is drained.
|
||||
func (this *Migrator) waitForEventsUpToLock() (err error) {
|
||||
log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed)
|
||||
if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("Waiting for events up to lock")
|
||||
log.Infof("Waiting for events up to lock")
|
||||
atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1)
|
||||
<-this.allEventsUpToLockProcessed
|
||||
log.Debugf("Done waiting for events up to lock")
|
||||
log.Infof("Done waiting for events up to lock")
|
||||
this.printStatus()
|
||||
|
||||
return nil
|
||||
@ -570,16 +574,37 @@ func (this *Migrator) printStatus() {
|
||||
elapsedSeconds := int64(elapsedTime.Seconds())
|
||||
totalRowsCopied := this.migrationContext.GetTotalRowsCopied()
|
||||
rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate)
|
||||
progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
|
||||
var progressPct float64
|
||||
if rowsEstimate > 0 {
|
||||
progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate)
|
||||
}
|
||||
var etaSeconds float64 = math.MaxFloat64
|
||||
|
||||
eta := "N/A"
|
||||
if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
||||
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
||||
} else if progressPct > 100.0 {
|
||||
eta = "Due"
|
||||
} else if progressPct >= 1.0 {
|
||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
||||
etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds
|
||||
if etaSeconds >= 0 {
|
||||
etaDuration := time.Duration(etaSeconds) * time.Second
|
||||
eta = base.PrettifyDurationOutput(etaDuration)
|
||||
} else {
|
||||
eta = "Due"
|
||||
}
|
||||
}
|
||||
|
||||
shouldPrintStatus := false
|
||||
if elapsedSeconds <= 60 {
|
||||
shouldPrintStatus = true
|
||||
} else if progressPct >= 99.0 {
|
||||
} else if etaSeconds <= 60 {
|
||||
shouldPrintStatus = true
|
||||
} else if progressPct >= 95.0 {
|
||||
} else if etaSeconds <= 180 {
|
||||
shouldPrintStatus = (elapsedSeconds%5 == 0)
|
||||
} else if elapsedSeconds <= 120 {
|
||||
} else if elapsedSeconds <= 180 {
|
||||
shouldPrintStatus = (elapsedSeconds%5 == 0)
|
||||
} else {
|
||||
shouldPrintStatus = (elapsedSeconds%30 == 0)
|
||||
@ -588,27 +613,14 @@ func (this *Migrator) printStatus() {
|
||||
return
|
||||
}
|
||||
|
||||
eta := "N/A"
|
||||
if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
||||
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
||||
} else if progressPct > 100.0 {
|
||||
eta = "Due"
|
||||
} else if progressPct >= 2.0 {
|
||||
elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds()
|
||||
totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied)
|
||||
etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds
|
||||
etaDuration := time.Duration(etaSeconds) * time.Second
|
||||
if etaDuration >= 0 {
|
||||
eta = base.PrettifyDurationOutput(etaDuration)
|
||||
} else {
|
||||
eta = "Due"
|
||||
}
|
||||
}
|
||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s",
|
||||
currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates()
|
||||
|
||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); streamer: %+v; ETA: %s",
|
||||
totalRowsCopied, rowsEstimate, progressPct,
|
||||
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
||||
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
||||
base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
|
||||
currentBinlogCoordinates,
|
||||
eta,
|
||||
)
|
||||
this.applier.WriteChangelog(
|
||||
@ -656,7 +668,11 @@ func (this *Migrator) initiateStreaming() error {
|
||||
|
||||
go func() {
|
||||
log.Debugf("Beginning streaming")
|
||||
this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() })
|
||||
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
|
||||
if err != nil {
|
||||
this.panicAbort <- err
|
||||
}
|
||||
log.Debugf("Done streaming")
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
@ -33,14 +33,14 @@ const (
|
||||
// EventsStreamer reads data from binary logs and streams it on. It acts as a publisher,
|
||||
// and interested parties may subscribe for per-table events.
|
||||
type EventsStreamer struct {
|
||||
connectionConfig *mysql.ConnectionConfig
|
||||
db *gosql.DB
|
||||
migrationContext *base.MigrationContext
|
||||
nextBinlogCoordinates *mysql.BinlogCoordinates
|
||||
listeners [](*BinlogEventListener)
|
||||
listenersMutex *sync.Mutex
|
||||
eventsChannel chan *binlog.BinlogEntry
|
||||
binlogReader binlog.BinlogReader
|
||||
connectionConfig *mysql.ConnectionConfig
|
||||
db *gosql.DB
|
||||
migrationContext *base.MigrationContext
|
||||
initialBinlogCoordinates *mysql.BinlogCoordinates
|
||||
listeners [](*BinlogEventListener)
|
||||
listenersMutex *sync.Mutex
|
||||
eventsChannel chan *binlog.BinlogEntry
|
||||
binlogReader binlog.BinlogReader
|
||||
}
|
||||
|
||||
func NewEventsStreamer() *EventsStreamer {
|
||||
@ -80,19 +80,19 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent)
|
||||
defer this.listenersMutex.Unlock()
|
||||
|
||||
for _, listener := range this.listeners {
|
||||
listener := listener
|
||||
if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) {
|
||||
continue
|
||||
}
|
||||
if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) {
|
||||
continue
|
||||
}
|
||||
onDmlEvent := listener.onDmlEvent
|
||||
if listener.async {
|
||||
go func() {
|
||||
onDmlEvent(binlogEvent)
|
||||
listener.onDmlEvent(binlogEvent)
|
||||
}()
|
||||
} else {
|
||||
onDmlEvent(binlogEvent)
|
||||
listener.onDmlEvent(binlogEvent)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -112,7 +112,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := goMySQLReader.ConnectBinlogStreamer(*this.nextBinlogCoordinates); err != nil {
|
||||
if err := goMySQLReader.ConnectBinlogStreamer(*this.initialBinlogCoordinates); err != nil {
|
||||
return err
|
||||
}
|
||||
this.binlogReader = goMySQLReader
|
||||
@ -134,13 +134,17 @@ func (this *EventsStreamer) validateConnection() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates {
|
||||
return this.binlogReader.GetCurrentBinlogCoordinates()
|
||||
}
|
||||
|
||||
// validateGrants verifies the user by which we're executing has necessary grants
|
||||
// to do its thang.
|
||||
func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
|
||||
query := `show /* gh-ost readCurrentBinlogCoordinates */ master status`
|
||||
foundMasterStatus := false
|
||||
err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error {
|
||||
this.nextBinlogCoordinates = &mysql.BinlogCoordinates{
|
||||
this.initialBinlogCoordinates = &mysql.BinlogCoordinates{
|
||||
LogFile: m.GetString("File"),
|
||||
LogPos: m.GetInt64("Position"),
|
||||
}
|
||||
@ -154,7 +158,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error {
|
||||
if !foundMasterStatus {
|
||||
return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out")
|
||||
}
|
||||
log.Debugf("Streamer binlog coordinates: %+v", *this.nextBinlogCoordinates)
|
||||
log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates)
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -168,5 +172,15 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
||||
}
|
||||
}
|
||||
}()
|
||||
return this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel)
|
||||
// The next should block and execute forever, unless there's a serious error
|
||||
for {
|
||||
if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil {
|
||||
// Reposition at same coordinates. Single attempt (TODO: make multiple attempts?)
|
||||
log.Infof("StreamEvents encountered unexpected error: %+v", err)
|
||||
err = this.binlogReader.Reconnect()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -108,15 +108,19 @@ func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKe
|
||||
return GetMasterConnectionConfigSafe(masterConfig, visitedKeys)
|
||||
}
|
||||
|
||||
func GetReadBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, err error) {
|
||||
func GetReplicationBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, executeBinlogCoordinates *BinlogCoordinates, err error) {
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
|
||||
readBinlogCoordinates = &BinlogCoordinates{
|
||||
LogFile: m.GetString("Master_Log_File"),
|
||||
LogPos: m.GetInt64("Read_Master_Log_Pos"),
|
||||
}
|
||||
executeBinlogCoordinates = &BinlogCoordinates{
|
||||
LogFile: m.GetString("Relay_Master_Log_File"),
|
||||
LogPos: m.GetInt64("Exec_Master_Log_Pos"),
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return readBinlogCoordinates, err
|
||||
return readBinlogCoordinates, executeBinlogCoordinates, err
|
||||
}
|
||||
|
||||
func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) {
|
||||
|
Loading…
Reference in New Issue
Block a user