at resurrection, pointing streamer back at last known applied coordinates
This commit is contained in:
parent
1080b11d81
commit
e50361ab61
@ -31,7 +31,7 @@ const (
|
||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||
)
|
||||
|
||||
const contextDumpInterval time.Duration = 1 * time.Second
|
||||
const contextDumpInterval time.Duration = 1 * time.Minute
|
||||
|
||||
func ReadChangelogState(s string) ChangelogState {
|
||||
return ChangelogState(strings.Split(s, ":")[0])
|
||||
@ -55,14 +55,15 @@ const (
|
||||
|
||||
// Migrator is the main schema migration flow manager.
|
||||
type Migrator struct {
|
||||
parser *sql.Parser
|
||||
inspector *Inspector
|
||||
applier *Applier
|
||||
eventsStreamer *EventsStreamer
|
||||
server *Server
|
||||
throttler *Throttler
|
||||
hooksExecutor *HooksExecutor
|
||||
migrationContext *base.MigrationContext
|
||||
parser *sql.Parser
|
||||
inspector *Inspector
|
||||
applier *Applier
|
||||
eventsStreamer *EventsStreamer
|
||||
server *Server
|
||||
throttler *Throttler
|
||||
hooksExecutor *HooksExecutor
|
||||
migrationContext *base.MigrationContext
|
||||
resurrectedContext *base.MigrationContext
|
||||
|
||||
firstThrottlingCollected chan bool
|
||||
ghostTableMigrated chan bool
|
||||
@ -272,7 +273,7 @@ func (this *Migrator) countTableRows() (err error) {
|
||||
return countRowsFunc()
|
||||
}
|
||||
|
||||
func (this *Migrator) resurrect() error {
|
||||
func (this *Migrator) readResurrectedContext() error {
|
||||
encodedContext, err := this.inspector.readChangelogState("context")
|
||||
if err != nil {
|
||||
return err
|
||||
@ -280,25 +281,30 @@ func (this *Migrator) resurrect() error {
|
||||
if encodedContext == "" {
|
||||
return fmt.Errorf("No resurrect info found")
|
||||
}
|
||||
log.Infof("Proceeding to resurrection")
|
||||
|
||||
// Loading migration context to a temporary location:
|
||||
loadedContext := base.NewMigrationContext()
|
||||
if err := loadedContext.LoadJSON(encodedContext); err != nil {
|
||||
this.resurrectedContext = base.NewMigrationContext()
|
||||
if err := this.resurrectedContext.LoadJSON(encodedContext); err != nil {
|
||||
return err
|
||||
}
|
||||
// Sanity: heuristically verify loaded context truly reflects our very own context (e.g. is this the same migration on the same table?)
|
||||
if this.migrationContext.DatabaseName != loadedContext.DatabaseName {
|
||||
if this.migrationContext.DatabaseName != this.resurrectedContext.DatabaseName {
|
||||
return fmt.Errorf("Resurrection: given --database not identical to resurrected one. Bailing out")
|
||||
}
|
||||
if this.migrationContext.OriginalTableName != loadedContext.OriginalTableName {
|
||||
if this.migrationContext.OriginalTableName != this.resurrectedContext.OriginalTableName {
|
||||
return fmt.Errorf("Resurrection: given --table not identical to resurrected one. Bailing out")
|
||||
}
|
||||
if this.migrationContext.AlterStatement != loadedContext.AlterStatement {
|
||||
if this.migrationContext.AlterStatement != this.resurrectedContext.AlterStatement {
|
||||
return fmt.Errorf("Resurrection: given --alter statement not identical to resurrected one. Bailing out")
|
||||
}
|
||||
// Happy. Let's go live and update our real context
|
||||
this.migrationContext.ApplyResurrectedContext(loadedContext)
|
||||
if this.resurrectedContext.AppliedBinlogCoordinates.IsEmpty() {
|
||||
return fmt.Errorf("Resurrection: no applied binlog coordinates. Seems like the migration you're trying to resurrect crashed before applying a single binlog event. There's not enough info for resurrection, and not much point to it. Just run your migration again.")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (this *Migrator) applyResurrectedContext() error {
|
||||
this.migrationContext.ApplyResurrectedContext(this.resurrectedContext)
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -328,6 +334,11 @@ func (this *Migrator) Migrate() (err error) {
|
||||
if err := this.initiateInspector(); err != nil {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.Resurrect {
|
||||
if err := this.readResurrectedContext(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := this.initiateStreaming(); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -372,7 +383,7 @@ func (this *Migrator) Migrate() (err error) {
|
||||
return err
|
||||
}
|
||||
if this.migrationContext.Resurrect {
|
||||
if err := this.resurrect(); err != nil {
|
||||
if err := this.applyResurrectedContext(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -929,15 +940,19 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
|
||||
// initiateStreaming begins treaming of binary log events and registers listeners for such events
|
||||
func (this *Migrator) initiateStreaming() error {
|
||||
this.eventsStreamer = NewEventsStreamer()
|
||||
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
||||
if err := this.eventsStreamer.InitDBConnections(this.resurrectedContext); err != nil {
|
||||
return err
|
||||
}
|
||||
this.eventsStreamer.AddListener(
|
||||
false,
|
||||
this.migrationContext.DatabaseName,
|
||||
this.migrationContext.GetChangelogTableName(),
|
||||
func(dmlEvent *binlog.BinlogDMLEvent, _ *mysql.BinlogCoordinates) error {
|
||||
return this.onChangelogStateEvent(dmlEvent)
|
||||
func(dmlEvent *binlog.BinlogDMLEvent, coordinates *mysql.BinlogCoordinates) error {
|
||||
err := this.onChangelogStateEvent(dmlEvent)
|
||||
if err == nil {
|
||||
this.migrationContext.SetAppliedBinlogCoordinates(coordinates)
|
||||
}
|
||||
return err
|
||||
},
|
||||
)
|
||||
|
||||
@ -963,7 +978,7 @@ func (this *Migrator) addDMLEventsListener() error {
|
||||
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
|
||||
applyEventFunc := func() error {
|
||||
err := this.applier.ApplyDMLEventQuery(dmlEvent)
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
this.migrationContext.SetAppliedBinlogCoordinates(coordinates)
|
||||
}
|
||||
return err
|
||||
@ -1091,7 +1106,7 @@ func (this *Migrator) executeWriteFuncs() error {
|
||||
if !this.migrationContext.Resurrect || this.migrationContext.IsResurrected {
|
||||
if jsonString, err := this.migrationContext.ToJSON(); err == nil {
|
||||
this.applier.WriteChangelog("context", jsonString)
|
||||
log.Debugf("Context dumped")
|
||||
log.Debugf("Context dumped. Applied coordinates: %+v", this.migrationContext.AppliedBinlogCoordinates)
|
||||
}
|
||||
}
|
||||
// If we're about to resurrect (resurrect requested) but haven't done so yet, do not wrtie resurrect info.
|
||||
|
@ -105,7 +105,7 @@ func (this *EventsStreamer) notifyListeners(binlogEntry *binlog.BinlogEntry) {
|
||||
}
|
||||
}
|
||||
|
||||
func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
func (this *EventsStreamer) InitDBConnections(resurrectedContext *base.MigrationContext) (err error) {
|
||||
EventsStreamerUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName)
|
||||
if this.db, _, err = sqlutils.GetDB(EventsStreamerUri); err != nil {
|
||||
return err
|
||||
@ -113,8 +113,13 @@ func (this *EventsStreamer) InitDBConnections() (err error) {
|
||||
if err := this.validateConnection(); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
return err
|
||||
if this.migrationContext.Resurrect {
|
||||
log.Infof("Resurrection: initiating streamer at resurrected coordinates %+v", resurrectedContext.AppliedBinlogCoordinates)
|
||||
this.initialBinlogCoordinates = &resurrectedContext.AppliedBinlogCoordinates
|
||||
} else {
|
||||
if err := this.readCurrentBinlogCoordinates(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
if err := this.initBinlogReader(this.initialBinlogCoordinates); err != nil {
|
||||
return err
|
||||
|
Loading…
x
Reference in New Issue
Block a user