diff --git a/build.sh b/build.sh index d79fab0..deb0e83 100644 --- a/build.sh +++ b/build.sh @@ -1,7 +1,7 @@ #!/bin/bash # # -RELEASE_VERSION="0.7.16" +RELEASE_VERSION="0.7.17" buildpath=/tmp/gh-ost target=gh-ost diff --git a/go/binlog/binlog_reader.go b/go/binlog/binlog_reader.go index 5720a43..e7dc2cf 100644 --- a/go/binlog/binlog_reader.go +++ b/go/binlog/binlog_reader.go @@ -5,14 +5,11 @@ package binlog -import ( - "github.com/github/gh-ost/go/mysql" -) +import () // BinlogReader is a general interface whose implementations can choose their methods of reading // a binary log file and parsing it into binlog entries type BinlogReader interface { StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error - GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates Reconnect() error } diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index a566523..3cae0e6 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -7,6 +7,7 @@ package binlog import ( "fmt" + "sync" "github.com/github/gh-ost/go/mysql" "github.com/github/gh-ost/go/sql" @@ -23,20 +24,21 @@ const ( ) type GoMySQLReader struct { - connectionConfig *mysql.ConnectionConfig - binlogSyncer *replication.BinlogSyncer - binlogStreamer *replication.BinlogStreamer - tableMap map[uint64]string - currentCoordinates mysql.BinlogCoordinates - lastHandledCoordinates mysql.BinlogCoordinates + connectionConfig *mysql.ConnectionConfig + binlogSyncer *replication.BinlogSyncer + binlogStreamer *replication.BinlogStreamer + currentCoordinates mysql.BinlogCoordinates + currentCoordinatesMutex *sync.Mutex + LastAppliedRowsEventHint mysql.BinlogCoordinates } func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *GoMySQLReader, err error) { binlogReader = &GoMySQLReader{ - connectionConfig: connectionConfig, - tableMap: make(map[uint64]string), - currentCoordinates: mysql.BinlogCoordinates{}, - binlogStreamer: nil, + connectionConfig: connectionConfig, + currentCoordinates: mysql.BinlogCoordinates{}, + currentCoordinatesMutex: &sync.Mutex{}, + binlogSyncer: nil, + binlogStreamer: nil, } binlogReader.binlogSyncer = replication.NewBinlogSyncer(serverId, "mysql") @@ -63,11 +65,7 @@ func (this *GoMySQLReader) ConnectBinlogStreamer(coordinates mysql.BinlogCoordin func (this *GoMySQLReader) Reconnect() error { this.binlogSyncer.Close() - - connectCoordinates := &this.lastHandledCoordinates - if connectCoordinates.IsEmpty() { - connectCoordinates = &this.currentCoordinates - } + connectCoordinates := &mysql.BinlogCoordinates{LogFile: this.currentCoordinates.LogFile, LogPos: 4} if err := this.ConnectBinlogStreamer(*connectCoordinates); err != nil { return err } @@ -75,13 +73,16 @@ func (this *GoMySQLReader) Reconnect() error { } func (this *GoMySQLReader) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinates { - return &this.currentCoordinates + this.currentCoordinatesMutex.Lock() + defer this.currentCoordinatesMutex.Unlock() + returnCoordinates := this.currentCoordinates + return &returnCoordinates } // StreamEvents func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEvent *replication.RowsEvent, entriesChannel chan<- *BinlogEntry) error { - if this.currentCoordinates.SmallerThanOrEquals(&this.lastHandledCoordinates) && !this.lastHandledCoordinates.IsEmpty() { - log.Infof("Skipping handled query at %+v", this.currentCoordinates) + if this.currentCoordinates.SmallerThanOrEquals(&this.LastAppliedRowsEventHint) { + log.Debugf("Skipping handled query at %+v", this.currentCoordinates) return nil } @@ -122,6 +123,7 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven // In reality, reads will be synchronous entriesChannel <- binlogEntry } + this.LastAppliedRowsEventHint = this.currentCoordinates return nil } @@ -135,21 +137,33 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha if err != nil { return err } - this.currentCoordinates.LogPos = int64(ev.Header.LogPos) + // if rand.Intn(1000) == 0 { + // this.binlogSyncer.Close() + // log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint) + // return log.Errorf(".............haha got random error") + // } + // log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO + func() { + this.currentCoordinatesMutex.Lock() + defer this.currentCoordinatesMutex.Unlock() + this.currentCoordinates.LogPos = int64(ev.Header.LogPos) + }() if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok { - this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) + // log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO + // ev.Dump(os.Stdout) + func() { + this.currentCoordinatesMutex.Lock() + defer this.currentCoordinatesMutex.Unlock() + this.currentCoordinates.LogFile = string(rotateEvent.NextLogName) + }() + // log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO log.Infof("rotate to next log name: %s", rotateEvent.NextLogName) - } else if tableMapEvent, ok := ev.Event.(*replication.TableMapEvent); ok { - // Actually not being used, since Table is available in RowsEvent. - // Keeping this here in case I'm wrong about this. Sometime in the near - // future I should remove this. - this.tableMap[tableMapEvent.TableID] = string(tableMapEvent.Table) } else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok { if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil { return err } } - this.lastHandledCoordinates = this.currentCoordinates + // log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO } log.Debugf("done streaming events") diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 9485c4d..e6beaa2 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -42,7 +42,7 @@ type EventsStreamer struct { listeners [](*BinlogEventListener) listenersMutex *sync.Mutex eventsChannel chan *binlog.BinlogEntry - binlogReader binlog.BinlogReader + binlogReader *binlog.GoMySQLReader } func NewEventsStreamer() *EventsStreamer { @@ -110,15 +110,22 @@ func (this *EventsStreamer) InitDBConnections() (err error) { if err := this.readCurrentBinlogCoordinates(); err != nil { return err } + if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil { + return err + } + + return nil +} + +func (this *EventsStreamer) initBinlogReader(binlogCoordinates *mysql.BinlogCoordinates) error { goMySQLReader, err := binlog.NewGoMySQLReader(this.migrationContext.InspectorConnectionConfig) if err != nil { return err } - if err := goMySQLReader.ConnectBinlogStreamer(*this.initialBinlogCoordinates); err != nil { + if err := goMySQLReader.ConnectBinlogStreamer(*binlogCoordinates); err != nil { return err } this.binlogReader = goMySQLReader - return nil } @@ -140,6 +147,10 @@ func (this *EventsStreamer) GetCurrentBinlogCoordinates() *mysql.BinlogCoordinat return this.binlogReader.GetCurrentBinlogCoordinates() } +func (this *EventsStreamer) GetReconnectBinlogCoordinates() *mysql.BinlogCoordinates { + return &mysql.BinlogCoordinates{LogFile: this.GetCurrentBinlogCoordinates().LogFile, LogPos: 4} +} + // validateGrants verifies the user by which we're executing has necessary grants // to do its thang. func (this *EventsStreamer) readCurrentBinlogCoordinates() error { @@ -177,14 +188,19 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { // The next should block and execute forever, unless there's a serious error for { if err := this.binlogReader.StreamEvents(canStopStreaming, this.eventsChannel); err != nil { - // Reposition at same coordinates. Single attempt (TODO: make multiple attempts?) log.Infof("StreamEvents encountered unexpected error: %+v", err) time.Sleep(ReconnectStreamerSleepSeconds * time.Second) - log.Infof("Reconnecting...") - err = this.binlogReader.Reconnect() - if err != nil { + + // Reposition at same binlog file. Single attempt (TODO: make multiple attempts?) + lastAppliedRowsEventHint := this.binlogReader.LastAppliedRowsEventHint + log.Infof("Reconnecting... Will resume at %+v", lastAppliedRowsEventHint) + // if err := this.binlogReader.Reconnect(); err != nil { + // return err + // } + if err := this.initBinlogReader(this.GetReconnectBinlogCoordinates()); err != nil { return err } + this.binlogReader.LastAppliedRowsEventHint = lastAppliedRowsEventHint } } }