- Better, fewer NOOP checks around the code
- Keeping track of `TotalDMLEventsApplied`
This commit is contained in:
parent
4efbfd6e0f
commit
3c85298b77
@ -65,6 +65,7 @@ type MigrationContext struct {
|
|||||||
RenameTablesEndTime time.Time
|
RenameTablesEndTime time.Time
|
||||||
CurrentLag int64
|
CurrentLag int64
|
||||||
TotalRowsCopied int64
|
TotalRowsCopied int64
|
||||||
|
TotalDMLEventsApplied int64
|
||||||
isThrottled bool
|
isThrottled bool
|
||||||
throttleReason string
|
throttleReason string
|
||||||
throttleMutex *sync.Mutex
|
throttleMutex *sync.Mutex
|
||||||
|
@ -134,10 +134,6 @@ func (this *Applier) CreateChangelogTable() error {
|
|||||||
|
|
||||||
// dropTable drops a given table on the applied host
|
// dropTable drops a given table on the applied host
|
||||||
func (this *Applier) dropTable(tableName string) error {
|
func (this *Applier) dropTable(tableName string) error {
|
||||||
if this.migrationContext.Noop {
|
|
||||||
log.Debugf("Noop operation; not really dropping table %s", sql.EscapeName(tableName))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`,
|
query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`,
|
||||||
sql.EscapeName(this.migrationContext.DatabaseName),
|
sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
sql.EscapeName(tableName),
|
sql.EscapeName(tableName),
|
||||||
@ -399,11 +395,6 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected
|
|||||||
|
|
||||||
// LockTables
|
// LockTables
|
||||||
func (this *Applier) LockTables() error {
|
func (this *Applier) LockTables() error {
|
||||||
if this.migrationContext.Noop {
|
|
||||||
log.Debugf("Noop operation; not really locking tables")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`,
|
// query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`,
|
||||||
// sql.EscapeName(this.migrationContext.DatabaseName),
|
// sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
// sql.EscapeName(this.migrationContext.OriginalTableName),
|
// sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||||
@ -438,11 +429,6 @@ func (this *Applier) UnlockTables() error {
|
|||||||
|
|
||||||
// LockTables
|
// LockTables
|
||||||
func (this *Applier) SwapTables() error {
|
func (this *Applier) SwapTables() error {
|
||||||
if this.migrationContext.Noop {
|
|
||||||
log.Debugf("Noop operation; not really swapping tables")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
// query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`,
|
||||||
// sql.EscapeName(this.migrationContext.DatabaseName),
|
// sql.EscapeName(this.migrationContext.DatabaseName),
|
||||||
// sql.EscapeName(this.migrationContext.OriginalTableName),
|
// sql.EscapeName(this.migrationContext.OriginalTableName),
|
||||||
@ -529,7 +515,9 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Errorf(query)
|
|
||||||
_, err = sqlutils.Exec(this.db, query, args...)
|
_, err = sqlutils.Exec(this.db, query, args...)
|
||||||
|
if err == nil {
|
||||||
|
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -294,6 +294,10 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
||||||
|
if this.migrationContext.Noop {
|
||||||
|
log.Debugf("Noop operation; not really swapping tables")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
this.throttle(func() {
|
this.throttle(func() {
|
||||||
log.Debugf("throttling before LOCK TABLES")
|
log.Debugf("throttling before LOCK TABLES")
|
||||||
})
|
})
|
||||||
@ -319,11 +323,9 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
|||||||
if err := this.retryOperation(this.applier.SwapTables); err != nil {
|
if err := this.retryOperation(this.applier.SwapTables); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Unlock
|
|
||||||
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
|
if err := this.retryOperation(this.applier.UnlockTables); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Drop old table
|
|
||||||
if this.migrationContext.OkToDropTable {
|
if this.migrationContext.OkToDropTable {
|
||||||
dropTableFunc := func() error {
|
dropTableFunc := func() error {
|
||||||
return this.applier.dropTable(this.migrationContext.GetOldTableName())
|
return this.applier.dropTable(this.migrationContext.GetOldTableName())
|
||||||
@ -408,8 +410,9 @@ func (this *Migrator) printStatus() {
|
|||||||
if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled {
|
||||||
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
eta = fmt.Sprintf("throttled, %s", throttleReason)
|
||||||
}
|
}
|
||||||
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s",
|
status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s",
|
||||||
totalRowsCopied, rowsEstimate, progressPct,
|
totalRowsCopied, rowsEstimate, progressPct,
|
||||||
|
atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied),
|
||||||
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
len(this.applyEventsQueue), cap(this.applyEventsQueue),
|
||||||
base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
|
base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime),
|
||||||
eta,
|
eta,
|
||||||
@ -448,10 +451,6 @@ func (this *Migrator) initiateStreaming() error {
|
|||||||
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
if err := this.eventsStreamer.InitDBConnections(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if this.migrationContext.Noop {
|
|
||||||
log.Debugf("Noop operation; not really listening on binlog events")
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
this.eventsStreamer.AddListener(
|
this.eventsStreamer.AddListener(
|
||||||
false,
|
false,
|
||||||
this.migrationContext.DatabaseName,
|
this.migrationContext.DatabaseName,
|
||||||
@ -540,7 +539,7 @@ func (this *Migrator) iterateChunks() error {
|
|||||||
|
|
||||||
func (this *Migrator) executeWriteFuncs() error {
|
func (this *Migrator) executeWriteFuncs() error {
|
||||||
if this.migrationContext.Noop {
|
if this.migrationContext.Noop {
|
||||||
log.Debugf("Noop operation; not really doing writes")
|
log.Debugf("Noop operation; not really executing write funcs")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
|
Loading…
Reference in New Issue
Block a user