From 7b63b4a275aaed7ea0cf9a87b6579b6e22596a70 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 27 Oct 2016 13:52:37 +0200 Subject: [PATCH] proper cleanup of streamer connection --- go/base/context.go | 1 + go/binlog/gomysql_reader.go | 8 ++++++++ go/logic/migrator.go | 6 +++++- go/logic/streamer.go | 6 ++++++ 4 files changed, 20 insertions(+), 1 deletion(-) diff --git a/go/base/context.go b/go/base/context.go index fe201e4..f405fdc 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -145,6 +145,7 @@ type MigrationContext struct { AllEventsUpToLockProcessedInjectedFlag int64 CleanupImminentFlag int64 UserCommandedUnpostponeFlag int64 + CutOverCompleteFlag int64 PanicAbort chan error OriginalTableColumnsOnApplier *sql.ColumnList diff --git a/go/binlog/gomysql_reader.go b/go/binlog/gomysql_reader.go index ac6d890..61f8e3b 100644 --- a/go/binlog/gomysql_reader.go +++ b/go/binlog/gomysql_reader.go @@ -120,6 +120,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven // StreamEvents func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error { + if canStopStreaming() { + return nil + } for { if canStopStreaming() { break @@ -150,3 +153,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha return nil } + +func (this *GoMySQLReader) Close() error { + this.binlogSyncer.Close() + return nil +} diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b0a6246..c135ef8 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() { } func (this *Migrator) canStopStreaming() bool { - return false + return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0 } // onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted. @@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) { if err := this.cutOver(); err != nil { return err } + atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1) if err := this.finalCleanup(); err != nil { return nil @@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error { log.Errore(err) } } + if err := this.eventsStreamer.Close(); err != nil { + log.Errore(err) + } if err := this.retryOperation(this.applier.DropChangelogTable); err != nil { return err diff --git a/go/logic/streamer.go b/go/logic/streamer.go index 86faab1..dc5ba60 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error { } } } + +func (this *EventsStreamer) Close() (err error) { + err = this.binlogReader.Close() + log.Infof("Closed streamer connection. err=%+v", err) + return err +}