proper cleanup of streamer connection
This commit is contained in:
parent
8add1c8203
commit
7b63b4a275
@ -145,6 +145,7 @@ type MigrationContext struct {
|
|||||||
AllEventsUpToLockProcessedInjectedFlag int64
|
AllEventsUpToLockProcessedInjectedFlag int64
|
||||||
CleanupImminentFlag int64
|
CleanupImminentFlag int64
|
||||||
UserCommandedUnpostponeFlag int64
|
UserCommandedUnpostponeFlag int64
|
||||||
|
CutOverCompleteFlag int64
|
||||||
PanicAbort chan error
|
PanicAbort chan error
|
||||||
|
|
||||||
OriginalTableColumnsOnApplier *sql.ColumnList
|
OriginalTableColumnsOnApplier *sql.ColumnList
|
||||||
|
@ -120,6 +120,9 @@ func (this *GoMySQLReader) handleRowsEvent(ev *replication.BinlogEvent, rowsEven
|
|||||||
|
|
||||||
// StreamEvents
|
// StreamEvents
|
||||||
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesChannel chan<- *BinlogEntry) error {
|
||||||
|
if canStopStreaming() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
for {
|
for {
|
||||||
if canStopStreaming() {
|
if canStopStreaming() {
|
||||||
break
|
break
|
||||||
@ -150,3 +153,8 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *GoMySQLReader) Close() error {
|
||||||
|
this.binlogSyncer.Close()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -171,7 +171,7 @@ func (this *Migrator) consumeRowCopyComplete() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) canStopStreaming() bool {
|
func (this *Migrator) canStopStreaming() bool {
|
||||||
return false
|
return atomic.LoadInt64(&this.migrationContext.CutOverCompleteFlag) != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
|
// onChangelogStateEvent is called when a binlog event operation on the changelog table is intercepted.
|
||||||
@ -345,6 +345,7 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
if err := this.cutOver(); err != nil {
|
if err := this.cutOver(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
atomic.StoreInt64(&this.migrationContext.CutOverCompleteFlag, 1)
|
||||||
|
|
||||||
if err := this.finalCleanup(); err != nil {
|
if err := this.finalCleanup(); err != nil {
|
||||||
return nil
|
return nil
|
||||||
@ -1058,6 +1059,9 @@ func (this *Migrator) finalCleanup() error {
|
|||||||
log.Errore(err)
|
log.Errore(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err := this.eventsStreamer.Close(); err != nil {
|
||||||
|
log.Errore(err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
|
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -217,3 +217,9 @@ func (this *EventsStreamer) StreamEvents(canStopStreaming func() bool) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *EventsStreamer) Close() (err error) {
|
||||||
|
err = this.binlogReader.Close()
|
||||||
|
log.Infof("Closed streamer connection. err=%+v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user