storing and updating streamer binlog coordinates

This commit is contained in:
Shlomi Noach 2016-12-20 16:47:06 +02:00
parent 3223a9389e
commit 6f81d62a31
2 changed files with 10 additions and 0 deletions

View File

@ -177,6 +177,7 @@ type MigrationContext struct {
MigrationIterationRangeMinValues *sql.ColumnValues MigrationIterationRangeMinValues *sql.ColumnValues
MigrationIterationRangeMaxValues *sql.ColumnValues MigrationIterationRangeMaxValues *sql.ColumnValues
EncodedRangeValues map[string]string EncodedRangeValues map[string]string
StreamerBinlogCoordinates mysql.BinlogCoordinates
} }
type ContextConfig struct { type ContextConfig struct {
@ -212,6 +213,7 @@ func newMigrationContext() *MigrationContext {
throttleControlReplicaKeys: mysql.NewInstanceKeyMap(), throttleControlReplicaKeys: mysql.NewInstanceKeyMap(),
configMutex: &sync.Mutex{}, configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{}, pointOfInterestTimeMutex: &sync.Mutex{},
StreamerBinlogCoordinates: mysql.BinlogCoordinates{},
ColumnRenameMap: make(map[string]string), ColumnRenameMap: make(map[string]string),
EncodedRangeValues: make(map[string]string), EncodedRangeValues: make(map[string]string),
} }
@ -546,6 +548,13 @@ func (this *MigrationContext) SetNiceRatio(newRatio float64) {
this.niceRatio = newRatio this.niceRatio = newRatio
} }
func (this *MigrationContext) SetStreamerBinlogCoordinates(binlogCoordinates *mysql.BinlogCoordinates) {
this.throttleMutex.Lock()
defer this.throttleMutex.Unlock()
this.StreamerBinlogCoordinates = *binlogCoordinates
}
// ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format, // ReadMaxLoad parses the `--max-load` flag, which is in multiple key-value format,
// such as: 'Threads_running=100,Threads_connected=500' // such as: 'Threads_running=100,Threads_connected=500'
// It only applies changes in case there's no parsing error. // It only applies changes in case there's no parsing error.

View File

@ -1042,6 +1042,7 @@ func (this *Migrator) executeWriteFuncs() error {
select { select {
case <-contextDumpTick: case <-contextDumpTick:
{ {
this.migrationContext.SetStreamerBinlogCoordinates(this.eventsStreamer.GetCurrentBinlogCoordinates())
if jsonString, err := this.migrationContext.ToJSON(); err == nil { if jsonString, err := this.migrationContext.ToJSON(); err == nil {
this.applier.WriteChangelog("context", jsonString) this.applier.WriteChangelog("context", jsonString)
log.Debugf("Context dumped") log.Debugf("Context dumped")