Merge pull request #49 from github/end-cleanup

cleanup at end of operation
This commit is contained in:
Shlomi Noach 2016-06-01 10:41:52 +02:00
commit 78a39d90f8

View File

@ -335,6 +335,9 @@ func (this *Migrator) Migrate() (err error) {
return err return err
} }
if err := this.finalCleanup(); err != nil {
return nil
}
log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) log.Infof("Done migrating %s.%s", sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
return nil return nil
} }
@ -381,26 +384,10 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
}); err != nil { }); err != nil {
return err return err
} }
if err := this.dropOldTableIfRequired(); err != nil {
return err
}
} }
return return
} }
func (this *Migrator) dropOldTableIfRequired() (err error) {
if !this.migrationContext.OkToDropTable {
return nil
}
dropTableFunc := func() error {
return this.applier.dropTable(this.migrationContext.GetOldTableName())
}
if err := this.retryOperation(dropTableFunc); err != nil {
return err
}
return nil
}
// Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs, // Inject the "AllEventsUpToLockProcessed" state hint, wait for it to appear in the binary logs,
// make sure the queue is drained. // make sure the queue is drained.
func (this *Migrator) waitForEventsUpToLock() (err error) { func (this *Migrator) waitForEventsUpToLock() (err error) {
@ -435,9 +422,6 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
if err := this.retryOperation(this.applier.UnlockTables); err != nil { if err := this.retryOperation(this.applier.UnlockTables); err != nil {
return err return err
} }
if err := this.dropOldTableIfRequired(); err != nil {
return err
}
lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime) lockAndRenameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.LockTablesStartTime)
renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime) renameDuration := this.migrationContext.RenameTablesEndTime.Sub(this.migrationContext.RenameTablesStartTime)
@ -520,6 +504,7 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
this.waitForEventsUpToLock() this.waitForEventsUpToLock()
this.printMigrationStatusHint()
log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation") log.Info("Table duplicated with new schema. Am not touching the original table. Replication is stopped. You may now compare the two tables to gain trust into this tool's operation")
return nil return nil
} }
@ -569,6 +554,17 @@ func (this *Migrator) initiateStatus() error {
return nil return nil
} }
func (this *Migrator) printMigrationStatusHint() {
hint := fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s; migration started at %+v",
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
this.migrationContext.StartTime.Format(time.RubyDate),
)
fmt.Println(hint)
}
func (this *Migrator) printStatus() { func (this *Migrator) printStatus() {
elapsedTime := this.migrationContext.ElapsedTime() elapsedTime := this.migrationContext.ElapsedTime()
elapsedSeconds := int64(elapsedTime.Seconds()) elapsedSeconds := int64(elapsedTime.Seconds())
@ -580,16 +576,9 @@ func (this *Migrator) printStatus() {
} }
// Before status, let's see if we should print a nice reminder for what exactly we're doing here. // Before status, let's see if we should print a nice reminder for what exactly we're doing here.
shouldPrintCourtesyReminder := (elapsedSeconds%600 == 0) shouldPrintMigrationStatusHint := (elapsedSeconds%600 == 0)
if shouldPrintCourtesyReminder { if shouldPrintMigrationStatusHint {
courtesyReminder := fmt.Sprintf("# Migrating %s.%s; Ghost table is %s.%s; migration started at %+v", this.printMigrationStatusHint()
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.OriginalTableName),
sql.EscapeName(this.migrationContext.DatabaseName),
sql.EscapeName(this.migrationContext.GetGhostTableName()),
this.migrationContext.StartTime.Format(time.RubyDate),
)
fmt.Println(courtesyReminder)
} }
var etaSeconds float64 = math.MaxFloat64 var etaSeconds float64 = math.MaxFloat64
@ -816,3 +805,20 @@ func (this *Migrator) executeWriteFuncs() error {
} }
return nil return nil
} }
// finalCleanup takes actions at very end of migration, dropping tables etc.
func (this *Migrator) finalCleanup() error {
if err := this.retryOperation(this.applier.DropChangelogTable); err != nil {
return err
}
if this.migrationContext.OkToDropTable && !this.migrationContext.TestOnReplica {
dropTableFunc := func() error {
return this.applier.dropTable(this.migrationContext.GetOldTableName())
}
if err := this.retryOperation(dropTableFunc); err != nil {
return err
}
}
return nil
}