diff --git a/build.sh b/build.sh index 56d8601..4b9b55a 100644 --- a/build.sh +++ b/build.sh @@ -1,7 +1,7 @@ #!/bin/bash # # -RELEASE_VERSION="0.7.5" +RELEASE_VERSION="0.7.13" buildpath=/tmp/gh-ost target=gh-ost diff --git a/go/binlog/binlog.go b/go/binlog/binlog.go deleted file mode 100644 index 8f7c67e..0000000 --- a/go/binlog/binlog.go +++ /dev/null @@ -1,147 +0,0 @@ -/* - Copyright 2015 Shlomi Noach -*/ - -package binlog - -import ( - "errors" - "fmt" - "strconv" - "strings" -) - -// BinlogType identifies the type of the log: relay or binary log -type BinlogType int - -// BinaryLog, RelayLog are binlog types -const ( - BinaryLog BinlogType = iota - RelayLog -) - -// BinlogCoordinates described binary log coordinates in the form of log file & log position. -type BinlogCoordinates struct { - LogFile string - LogPos int64 - Type BinlogType -} - -// ParseBinlogCoordinates will parse an InstanceKey from a string representation such as 127.0.0.1:3306 -func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) { - tokens := strings.SplitN(logFileLogPos, ":", 2) - if len(tokens) != 2 { - return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos) - } - - if logPos, err := strconv.ParseInt(tokens[1], 10, 0); err != nil { - return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1]) - } else { - return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil - } -} - -// DisplayString returns a user-friendly string representation of these coordinates -func (this *BinlogCoordinates) DisplayString() string { - return fmt.Sprintf("%s:%d", this.LogFile, this.LogPos) -} - -// String returns a user-friendly string representation of these coordinates -func (this BinlogCoordinates) String() string { - return this.DisplayString() -} - -// Equals tests equality of this corrdinate and another one. -func (this *BinlogCoordinates) Equals(other *BinlogCoordinates) bool { - if other == nil { - return false - } - return this.LogFile == other.LogFile && this.LogPos == other.LogPos && this.Type == other.Type -} - -// IsEmpty returns true if the log file is empty, unnamed -func (this *BinlogCoordinates) IsEmpty() bool { - return this.LogFile == "" -} - -// SmallerThan returns true if this coordinate is strictly smaller than the other. -func (this *BinlogCoordinates) SmallerThan(other *BinlogCoordinates) bool { - if this.LogFile < other.LogFile { - return true - } - if this.LogFile == other.LogFile && this.LogPos < other.LogPos { - return true - } - return false -} - -// SmallerThanOrEquals returns true if this coordinate is the same or equal to the other one. -// We do NOT compare the type so we can not use this.Equals() -func (this *BinlogCoordinates) SmallerThanOrEquals(other *BinlogCoordinates) bool { - if this.SmallerThan(other) { - return true - } - return this.LogFile == other.LogFile && this.LogPos == other.LogPos // No Type comparison -} - -// FileSmallerThan returns true if this coordinate's file is strictly smaller than the other's. -func (this *BinlogCoordinates) FileSmallerThan(other *BinlogCoordinates) bool { - return this.LogFile < other.LogFile -} - -// FileNumberDistance returns the numeric distance between this corrdinate's file number and the other's. -// Effectively it means "how many roatets/FLUSHes would make these coordinates's file reach the other's" -func (this *BinlogCoordinates) FileNumberDistance(other *BinlogCoordinates) int { - thisNumber, _ := this.FileNumber() - otherNumber, _ := other.FileNumber() - return otherNumber - thisNumber -} - -// FileNumber returns the numeric value of the file, and the length in characters representing the number in the filename. -// Example: FileNumber() of mysqld.log.000789 is (789, 6) -func (this *BinlogCoordinates) FileNumber() (int, int) { - tokens := strings.Split(this.LogFile, ".") - numPart := tokens[len(tokens)-1] - numLen := len(numPart) - fileNum, err := strconv.Atoi(numPart) - if err != nil { - return 0, 0 - } - return fileNum, numLen -} - -// PreviousFileCoordinatesBy guesses the filename of the previous binlog/relaylog, by given offset (number of files back) -func (this *BinlogCoordinates) PreviousFileCoordinatesBy(offset int) (BinlogCoordinates, error) { - result := BinlogCoordinates{LogPos: 0, Type: this.Type} - - fileNum, numLen := this.FileNumber() - if fileNum == 0 { - return result, errors.New("Log file number is zero, cannot detect previous file") - } - newNumStr := fmt.Sprintf("%d", (fileNum - offset)) - newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr - - tokens := strings.Split(this.LogFile, ".") - tokens[len(tokens)-1] = newNumStr - result.LogFile = strings.Join(tokens, ".") - return result, nil -} - -// PreviousFileCoordinates guesses the filename of the previous binlog/relaylog -func (this *BinlogCoordinates) PreviousFileCoordinates() (BinlogCoordinates, error) { - return this.PreviousFileCoordinatesBy(1) -} - -// NextFileCoordinates guesses the filename of the next binlog/relaylog -func (this *BinlogCoordinates) NextFileCoordinates() (BinlogCoordinates, error) { - result := BinlogCoordinates{LogPos: 0, Type: this.Type} - - fileNum, numLen := this.FileNumber() - newNumStr := fmt.Sprintf("%d", (fileNum + 1)) - newNumStr = strings.Repeat("0", numLen-len(newNumStr)) + newNumStr - - tokens := strings.Split(this.LogFile, ".") - tokens[len(tokens)-1] = newNumStr - result.LogFile = strings.Join(tokens, ".") - return result, nil -} diff --git a/go/binlog/binlog_reader.go b/go/binlog/binlog_reader.go index fa8e62e..5720a43 100644 --- a/go/binlog/binlog_reader.go +++ b/go/binlog/binlog_reader.go @@ -5,8 +5,14 @@ package binlog +import ( + "github.com/github/gh-ost/go/mysql" +) + // BinlogReader is a general interface whose implementations can choose their methods of reading // a binary log file and parsing it into binlog entries type BinlogReader interface { StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error + GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates + Reconnect() error } diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index 88a48d5..a566523 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -23,11 +23,12 @@ const ( ) type GoMySQLReader struct { - connectionConfig *mysql.ConnectionConfig - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - tableMap map[uint64]string - currentCoordinates mysql.BinlogCoordinates + connectionConfig *mysql.ConnectionConfig + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + tableMap map[uint64]string + currentCoordinates mysql.BinlogCoordinates + lastHandledCoordinates mysql.BinlogCoordinates } func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { @@ -39,24 +40,91 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G } binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") - // Register slave, the MySQL master is at 127.0.0.1:3306, with user root and an empty password - err = binlogReader.binlogSyncer.RegisterSlave(connectionConfig.Key.Hostname, uint16(connectionConfig.Key.Port), connectionConfig.User, connectionConfig.Password) - if err != nil { - return binlogReader, err - } - return binlogReader, err } // ConnectBinlogStreamer func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordinates) (err error) { + if coordinates.IsEmpty() { + return log.Errorf("Emptry coordinates at ConnectBinlogStreamer()") + } + log.Infof("Registering replica at %+v:%+v", this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port)) + if err := this.binlogSyncer.RegisterSlave(this.connectionConfig.Key.Hostname, uint16(this.connectionConfig.Key.Port), this.connectionConfig.User, this.connectionConfig.Password); err != nil { + return err + } + this.currentCoordinates = coordinates + log.Infof("Connecting binlog streamer at %+v", this.currentCoordinates) // Start sync with sepcified binlog file and position - this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{coordinates.LogFile, uint32(coordinates.LogPos)}) + this.binlogStreamer, err = this.binlogSyncer.StartSync(gomysql.Position{this.currentCoordinates.LogFile, uint32(this.currentCoordinates.LogPos)}) return err } +func (this *GoMySQLReader) Reconnect() error { + this.binlogSyncer.Close() + + connectCoordinates := &this.lastHandledCoordinates + if connectCoordinates.IsEmpty() { + connectCoordinates = &this.currentCoordinates + } + if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil { + return err + } + return nil +} + +func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { + return &this.currentCoordinates +} + +// StreamEvents +func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { + if this.currentCoordinates.SmallerThanOrEquals(&this.lastHandledCoordinates) && !this.lastHandledCoordinates.IsEmpty() { + log.Infof("Skipping handled query at %+v", this.currentCoordinates) + return nil + } + + dml := ToEventDML(ev.Header.EventType.String()) + if dml == NotDML { + return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) + } + for i, row := range rowsEvent.Rows { + if dml == UpdateDML && i%2 == 1 { + // An update has two rows (WHERE+SET) + // We do both at the same time + continue + } + binlogEntry := NewBinlogEntryAt(this.currentCoordinates) + binlogEntry.DmlEvent = NewBinlogDMLEvent( + string(rowsEvent.Table.Schema), + string(rowsEvent.Table.Table), + dml, + ) + switch dml { + case InsertDML: + { + binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row) + } + case UpdateDML: + { + binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) + binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1]) + } + case DeleteDML: + { + binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) + } + } + // The channel will do the throttling. Whoever is reding from the channel + // decides whether action is taken sycnhronously (meaning we wait before + // next iteration) or asynchronously (we keep pushing more events) + // In reality, reads will be synchronous + entriesChannel <- binlogEntry + } + return nil +} + // StreamEvents func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { for { @@ -77,44 +145,11 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha // future I should remove this. this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table) } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { - dml := ToEventDML(ev.Header.EventType.String()) - if dml == NotDML { - return fmt.Errorf("Unknown DML type: %s", ev.Header.EventType.String()) - } - for i, row := range rowsEvent.Rows { - if dml == UpdateDML && i%2 == 1 { - // An update has two rows (WHERE+SET) - // We do both at the same time - continue - } - binlogEntry := NewBinlogEntryAt(this.currentCoordinates) - binlogEntry.DmlEvent = NewBinlogDMLEvent( - string(rowsEvent.Table.Schema), - string(rowsEvent.Table.Table), - dml, - ) - switch dml { - case InsertDML: - { - binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(row) - } - case UpdateDML: - { - binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) - binlogEntry.DmlEvent.NewColumnValues = sql.ToColumnValues(rowsEvent.Rows[i+1]) - } - case DeleteDML: - { - binlogEntry.DmlEvent.WhereColumnValues = sql.ToColumnValues(row) - } - } - // The channel will do the throttling. Whoever is reding from the channel - // decides whether action is taken sycnhronously (meaning we wait before - // next iteration) or asynchronously (we keep pushing more events) - // In reality, reads will be synchronous - entriesChannel <- binlogEntry + if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { + return err } } + this.lastHandledCoordinates = this.currentCoordinates } log.Debugf("done streaming events") diff --git a/go/logic/applier.go b/go/logic/applier.go index 460154c..b206503 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -552,8 +552,8 @@ func (this *Applier) StopSlaveIOThread() error { // MasterPosWait is applicable with --test-on-replica func (this *Applier) MasterPosWait(binlogCoordinates *mysql.BinlogCoordinates) error { var appliedRows int64 - if err := this.db.QueryRow(`select ifnull(master_pos_wait(?, ?, ?), 0)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 1).Scan(&appliedRows); err != nil { - return err + if err := this.db.QueryRow(`select master_pos_wait(?, ?, ?)`, binlogCoordinates.LogFile, binlogCoordinates.LogPos, 3).Scan(&appliedRows); err != nil { + return log.Errore(err) } if appliedRows < 0 { return fmt.Errorf("Timeout waiting on master_pos_wait()") @@ -565,15 +565,17 @@ func (this *Applier) StopSlaveNicely() error { if err := this.StopSlaveIOThread(); err != nil { return err } - binlogCoordinates, err := mysql.GetReadBinlogCoordinates(this.db) + readBinlogCoordinates, executeBinlogCoordinates, err := mysql.GetReplicationBinlogCoordinates(this.db) if err != nil { return err } - log.Infof("Replication stopped at %+v. Will wait for SQL thread to apply", *binlogCoordinates) - if err := this.MasterPosWait(binlogCoordinates); err != nil { + log.Infof("Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates) + log.Infof("Will wait for SQL thread to catch up with IO thread") + if err := this.MasterPosWait(readBinlogCoordinates); err != nil { + log.Errorf("Error waiting for SQL thread to catch up. Replication IO thread at %+v. SQL thread is at %+v", *readBinlogCoordinates, *executeBinlogCoordinates) return err } - log.Infof("Replication SQL thread applied all events") + log.Infof("Replication SQL thread applied all events up to %+v", *readBinlogCoordinates) if selfBinlogCoordinates, err := mysql.GetSelfBinlogCoordinates(this.db); err != nil { return err } else { diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 59802f4..040cf7f 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -7,6 +7,7 @@ package logic import ( "fmt" + "math" "os" "os/signal" "sync/atomic" @@ -286,6 +287,7 @@ func (this *Migrator) listenOnPanicAbort() { } func (this *Migrator) Migrate() (err error) { + log.Infof("Migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) this.migrationContext.StartTime = time.Now() go this.listenOnPanicAbort() @@ -326,13 +328,14 @@ func (this *Migrator) Migrate() (err error) { log.Debugf("Operating until row copy is complete") this.consumeRowCopyComplete() - log.Debugf("Row copy complete") + log.Infof("Row copy complete") this.printStatus() if err := this.stopWritesAndCompleteMigration(); err != nil { return err } + log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) return nil } @@ -401,13 +404,14 @@ func (this *Migrator) dropOldTableIfRequired() (err error) { // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // make sure the queue is drained. func (this *Migrator) waitForEventsUpToLock() (err error) { + log.Infof("Writing changelog state: %+v", AllEventsUpToLockProcessed) if _, err := this.applier.WriteChangelogState(string(AllEventsUpToLockProcessed)); err != nil { return err } - log.Debugf("Waiting for events up to lock") + log.Infof("Waiting for events up to lock") atomic.StoreInt64(&this.allEventsUpToLockProcessedInjectedFlag, 1) <-this.allEventsUpToLockProcessed - log.Debugf("Done waiting for events up to lock") + log.Infof("Done waiting for events up to lock") this.printStatus() return nil @@ -570,16 +574,37 @@ func (this *Migrator) printStatus() { elapsedSeconds := int64(elapsedTime.Seconds()) totalRowsCopied := this.migrationContext.GetTotalRowsCopied() rowsEstimate := atomic.LoadInt64(&this.migrationContext.RowsEstimate) - progressPct := 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) + var progressPct float64 + if rowsEstimate > 0 { + progressPct = 100.0 * float64(totalRowsCopied) / float64(rowsEstimate) + } + var etaSeconds float64 = math.MaxFloat64 + + eta := "N/A" + if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { + eta = fmt.Sprintf("throttled, %s", throttleReason) + } else if progressPct > 100.0 { + eta = "Due" + } else if progressPct >= 1.0 { + elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() + totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) + etaSeconds = totalExpectedSeconds - elapsedRowCopySeconds + if etaSeconds >= 0 { + etaDuration := time.Duration(etaSeconds) * time.Second + eta = base.PrettifyDurationOutput(etaDuration) + } else { + eta = "Due" + } + } shouldPrintStatus := false if elapsedSeconds <= 60 { shouldPrintStatus = true - } else if progressPct >= 99.0 { + } else if etaSeconds <= 60 { shouldPrintStatus = true - } else if progressPct >= 95.0 { + } else if etaSeconds <= 180 { shouldPrintStatus = (elapsedSeconds%5 == 0) - } else if elapsedSeconds <= 120 { + } else if elapsedSeconds <= 180 { shouldPrintStatus = (elapsedSeconds%5 == 0) } else { shouldPrintStatus = (elapsedSeconds%30 == 0) @@ -588,27 +613,14 @@ func (this *Migrator) printStatus() { return } - eta := "N/A" - if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { - eta = fmt.Sprintf("throttled, %s", throttleReason) - } else if progressPct > 100.0 { - eta = "Due" - } else if progressPct >= 2.0 { - elapsedRowCopySeconds := this.migrationContext.ElapsedRowCopyTime().Seconds() - totalExpectedSeconds := elapsedRowCopySeconds * float64(rowsEstimate) / float64(totalRowsCopied) - etaSeconds := totalExpectedSeconds - elapsedRowCopySeconds - etaDuration := time.Duration(etaSeconds) * time.Second - if etaDuration >= 0 { - eta = base.PrettifyDurationOutput(etaDuration) - } else { - eta = "Due" - } - } - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s", + currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() + + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); streamer: %+v; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime), + currentBinlogCoordinates, eta, ) this.applier.WriteChangelog( @@ -656,7 +668,11 @@ func (this *Migrator) initiateStreaming() error { go func() { log.Debugf("Beginning streaming") - this.eventsStreamer.StreamEvents(func() bool { return this.canStopStreaming() }) + err := this.eventsStreamer.StreamEvents(this.canStopStreaming) + if err != nil { + this.panicAbort <- err + } + log.Debugf("Done streaming") }() return nil } diff --git a/go/logic/streamer.go b/go/logic/streamer.go index e855aad..905494e 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -33,14 +33,14 @@ const ( // EventsStreamer reads data from binary logs and streams it on. It acts as a publisher, // and interested parties may subscribe for per-table events. type EventsStreamer struct { - connectionConfig *mysql.ConnectionConfig - db *gosql.DB - migrationContext *base.MigrationContext - nextBinlogCoordinates *mysql.BinlogCoordinates - listeners [](*BinlogEventListener) - listenersMutex *sync.Mutex - eventsChannel chan *binlog.BinlogEntry - binlogReader binlog.BinlogReader + connectionConfig *mysql.ConnectionConfig + db *gosql.DB + migrationContext *base.MigrationContext + initialBinlogCoordinates *mysql.BinlogCoordinates + listeners [](*BinlogEventListener) + listenersMutex *sync.Mutex + eventsChannel chan *binlog.BinlogEntry + binlogReader binlog.BinlogReader } func NewEventsStreamer() *EventsStreamer { @@ -80,19 +80,19 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) defer this.listenersMutex.Unlock() for _, listener := range this.listeners { + listener := listener if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) { continue } if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) { continue } - onDmlEvent := listener.onDmlEvent if listener.async { go func() { - onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEvent) }() } else { - onDmlEvent(binlogEvent) + listener.onDmlEvent(binlogEvent) } } } @@ -112,7 +112,7 @@ func (this *EventsStreamer) InitDBConnections() (err error) { if err != nil { return err } - if err := goMySQLReader.ConnectBinlogStreamer(*this.nextBinlogCoordinates); err != nil { + if err := goMySQLReader.ConnectBinlogStreamer(*this.initialBinlogCoordinates); err != nil { return err } this.binlogReader = goMySQLReader @@ -134,13 +134,17 @@ func (this *EventsStreamer) validateConnection() error { return nil } +func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { + return this.binlogReader.GetCurrentBinlogCoordinates() +} + // validateGrants verifies the user by which we're executing has necessary grants // to do its thang. func (this *EventsStreamer) readCurrentBinlogCoordinates() error { query := `show /* gh-ost readCurrentBinlogCoordinates */ master status` foundMasterStatus := false err := sqlutils.QueryRowsMap(this.db, query, func(m sqlutils.RowMap) error { - this.nextBinlogCoordinates = &mysql.BinlogCoordinates{ + this.initialBinlogCoordinates = &mysql.BinlogCoordinates{ LogFile: m.GetString("File"), LogPos: m.GetInt64("Position"), } @@ -154,7 +158,7 @@ func (this *EventsStreamer) readCurrentBinlogCoordinates() error { if !foundMasterStatus { return fmt.Errorf("Got no results from SHOW MASTER STATUS. Bailing out") } - log.Debugf("Streamer binlog coordinates: %+v", *this.nextBinlogCoordinates) + log.Debugf("Streamer binlog coordinates: %+v", *this.initialBinlogCoordinates) return nil } @@ -168,5 +172,15 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { } } }() - return this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel) + // The next should block and execute forever, unless there's a serious error + for { + if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { + // Reposition at same coordinates. Single attempt (TODO: make multiple attempts?) + log.Infof("StreamEvents encountered unexpected error: %+v", err) + err = this.binlogReader.Reconnect() + if err != nil { + return err + } + } + } } diff --git a/go/mysql/utils.go b/go/mysql/utils.go index 80ee6b0..9abfa5a 100644 --- a/go/mysql/utils.go +++ b/go/mysql/utils.go @@ -108,15 +108,19 @@ func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKe return GetMasterConnectionConfigSafe(masterConfig, visitedKeys) } -func GetReadBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, err error) { +func GetReplicationBinlogCoordinates(db *gosql.DB) (readBinlogCoordinates *BinlogCoordinates, executeBinlogCoordinates *BinlogCoordinates, err error) { err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error { readBinlogCoordinates = &BinlogCoordinates{ LogFile: m.GetString("Master_Log_File"), LogPos: m.GetInt64("Read_Master_Log_Pos"), } + executeBinlogCoordinates = &BinlogCoordinates{ + LogFile: m.GetString("Relay_Master_Log_File"), + LogPos: m.GetInt64("Exec_Master_Log_Pos"), + } return nil }) - return readBinlogCoordinates, err + return readBinlogCoordinates, executeBinlogCoordinates, err } func GetSelfBinlogCoordinates(db *gosql.DB) (selfBinlogCoordinates *BinlogCoordinates, err error) {