Merge pull request #294 from github/close-streamer-connection

Close streamer connection
This commit is contained in:
Shlomi Noach 2016-11-01 12:22:29 +01:00 committed by GitHub
commit ccaa840125
6 changed files with 22 additions and 2 deletions

1
RELEASE_VERSION Normal file
View File

@ -0,0 +1 @@
1.0.28

View File

@ -2,7 +2,7 @@
# #
# #
RELEASE_VERSION="1.0.26" RELEASE_VERSION=$(cat RELEASE_VERSION)
function build { function build {
osname=$1 osname=$1

View File

@ -155,6 +155,7 @@ type MigrationContext struct {
AllEventsUpToLockProcessedInjectedFlag int64 AllEventsUpToLockProcessedInjectedFlag int64
CleanupImminentFlag int64 CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64 UserCommandedUnpostponeFlag int64
CutOverCompleteFlag int64
PanicAbort chan error PanicAbort chan error
OriginalTableColumnsOnApplier *sql.ColumnList OriginalTableColumnsOnApplier *sql.ColumnList

View File

@ -118,6 +118,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
// StreamEvents // StreamEvents
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
if canStopStreaming() {
return nil
}
for { for {
if canStopStreaming() { if canStopStreaming() {
break break
@ -148,3 +151,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
return nil return nil
} }
func (this *GoMySQLReader) Close() error {
this.binlogSyncer.Close()
return nil
}

View File

@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() {
} }
func (this *Migrator) canStopStreaming() bool { func (this *Migrator) canStopStreaming() bool {
return false return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
} }
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted. // onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) {
if err := this.cutOver(); err != nil { if err := this.cutOver(); err != nil {
return err return err
} }
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
if err := this.finalCleanup(); err != nil { if err := this.finalCleanup(); err != nil {
return nil return nil
@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error {
log.Errore(err) log.Errore(err)
} }
} }
if err := this.eventsStreamer.Close(); err != nil {
log.Errore(err)
}
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
return err return err

View File

@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
} }
} }
} }
func (this *EventsStreamer) Close() (err error) {
err = this.binlogReader.Close()
log.Infof("Closed streamer connection. err=%+v", err)
return err
}