From 3c85298b77a07030bf783723be53883894206467 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 19 Apr 2016 04:25:32 -0700 Subject: [PATCH] - Better, fewer NOOP checks around the code - Keeping track of `TotalDMLEventsApplied` --- go/base/context.go | 1 + go/logic/applier.go | 18 +++--------------- go/logic/migrator.go | 15 +++++++-------- 3 files changed, 11 insertions(+), 23 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index e6c5c15..074193f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -65,6 +65,7 @@ type MigrationContext struct { RenameTablesEndTime time.Time CurrentLag int64 TotalRowsCopied int64 + TotalDMLEventsApplied int64 isThrottled bool throttleReason string throttleMutex *sync.Mutex diff --git a/go/logic/applier.go b/go/logic/applier.go index 8d355b8..c7ff916 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -134,10 +134,6 @@ func (this *Applier) CreateChangelogTable() error { // dropTable drops a given table on the applied host func (this *Applier) dropTable(tableName string) error { - if this.migrationContext.Noop { - log.Debugf("Noop operation; not really dropping table %s", sql.EscapeName(tableName)) - return nil - } query := fmt.Sprintf(`drop /* gh-osc */ table if exists %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(tableName), @@ -399,11 +395,6 @@ func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected // LockTables func (this *Applier) LockTables() error { - if this.migrationContext.Noop { - log.Debugf("Noop operation; not really locking tables") - return nil - } - // query := fmt.Sprintf(`lock /* gh-osc */ tables %s.%s write, %s.%s write, %s.%s write`, // sql.EscapeName(this.migrationContext.DatabaseName), // sql.EscapeName(this.migrationContext.OriginalTableName), @@ -438,11 +429,6 @@ func (this *Applier) UnlockTables() error { // LockTables func (this *Applier) SwapTables() error { - if this.migrationContext.Noop { - log.Debugf("Noop operation; not really swapping tables") - return nil - } - // query := fmt.Sprintf(`rename /* gh-osc */ table %s.%s to %s.%s, %s.%s to %s.%s`, // sql.EscapeName(this.migrationContext.DatabaseName), // sql.EscapeName(this.migrationContext.OriginalTableName), @@ -529,7 +515,9 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { if err != nil { return err } - log.Errorf(query) _, err = sqlutils.Exec(this.db, query, args...) + if err == nil { + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1) + } return err } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 1b5b406..25b910b 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -294,6 +294,10 @@ func (this *Migrator) Migrate() (err error) { } func (this *Migrator) stopWritesAndCompleteMigration() (err error) { + if this.migrationContext.Noop { + log.Debugf("Noop operation; not really swapping tables") + return nil + } this.throttle(func() { log.Debugf("throttling before LOCK TABLES") }) @@ -319,11 +323,9 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) { if err := this.retryOperation(this.applier.SwapTables); err != nil { return err } - // Unlock if err := this.retryOperation(this.applier.UnlockTables); err != nil { return err } - // Drop old table if this.migrationContext.OkToDropTable { dropTableFunc := func() error { return this.applier.dropTable(this.migrationContext.GetOldTableName()) @@ -408,8 +410,9 @@ func (this *Migrator) printStatus() { if isThrottled, throttleReason := this.migrationContext.IsThrottled(); isThrottled { eta = fmt.Sprintf("throttled, %s", throttleReason) } - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Elapsed: %+v(copy), %+v(total); ETA: %s", totalRowsCopied, rowsEstimate, progressPct, + atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), base.PrettifyDurationOutput(elapsedTime), eta, @@ -448,10 +451,6 @@ func (this *Migrator) initiateStreaming() error { if err := this.eventsStreamer.InitDBConnections(); err != nil { return err } - if this.migrationContext.Noop { - log.Debugf("Noop operation; not really listening on binlog events") - return nil - } this.eventsStreamer.AddListener( false, this.migrationContext.DatabaseName, @@ -540,7 +539,7 @@ func (this *Migrator) iterateChunks() error { func (this *Migrator) executeWriteFuncs() error { if this.migrationContext.Noop { - log.Debugf("Noop operation; not really doing writes") + log.Debugf("Noop operation; not really executing write funcs") return nil } for {