From 3e28f462d88c9eb9cc84a26b6228fd278170fdb3 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 3 Jan 2017 13:44:52 +0200 Subject: [PATCH 1/7] Initial support for batching multiple DMLs when writing to ghost table --- go/logic/applier.go | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index afff578..8fb940a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -950,3 +950,48 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error { } return nil } + +// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table +func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { + + var totalDelta int64 + + err := func() error { + tx, err := this.db.Begin() + if err != nil { + return err + } + sessionQuery := `SET + SESSION time_zone = '+00:00', + sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') + ` + if _, err := tx.Exec(sessionQuery); err != nil { + return err + } + for _, dmlEvent := range dmlEvents { + query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) + if err != nil { + return err + } + if _, err := tx.Exec(query, args...); err != nil { + err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) + return err + } + totalDelta += rowDelta + } + if err := tx.Commit(); err != nil { + return err + } + return nil + }() + + if err != nil { + return log.Errore(err) + } + // no error + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents))) + if this.migrationContext.CountTableRows { + atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) + } + return nil +} From 445c67635dcaa4a42467a3f9ec86556bb6484ca1 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Tue, 3 Jan 2017 14:31:19 +0200 Subject: [PATCH 2/7] batching DML writes, configurable --dml-batch-size --- go/base/context.go | 12 +++++++++ go/cmd/gh-ost/main.go | 2 ++ 
go/logic/applier.go | 1 + go/logic/migrator.go | 62 ++++++++++++++++++++++++++++++++++--------- 4 files changed, 65 insertions(+), 12 deletions(-) diff --git a/go/base/context.go b/go/base/context.go index 0f74f97..8775408 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -149,6 +149,7 @@ type MigrationContext struct { controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 + DMLBatchSize int64 isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint @@ -208,6 +209,7 @@ func newMigrationContext() *MigrationContext { ApplierConnectionConfig: mysql.NewConnectionConfig(), MaxLagMillisecondsThrottleThreshold: 1500, CutOverLockTimeoutSeconds: 3, + DMLBatchSize: 10, maxLoad: NewLoadMap(), criticalLoad: NewLoadMap(), throttleMutex: &sync.Mutex{}, @@ -418,6 +420,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) { atomic.StoreInt64(&this.ChunkSize, chunkSize) } +func (this *MigrationContext) SetDMLBatchSize(batchSize int64) { + if batchSize < 1 { + batchSize = 1 + } + if batchSize > 100 { + batchSize = 100 + } + atomic.StoreInt64(&this.DMLBatchSize, batchSize) +} + func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult { this.throttleMutex.Lock() defer this.throttleMutex.Unlock() diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index fdfa919..4986d1e 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -83,6 +83,7 @@ func main() { flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running") flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. 
gh-ost is unable to tell, event after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") + dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 
1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") @@ -220,6 +221,7 @@ func main() { migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetChunkSize(*chunkSize) + migrationContext.SetDMLBatchSize(*dmlBatchSize) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetReplicationLagQuery(*replicationLagQuery) migrationContext.SetThrottleQuery(*throttleQuery) diff --git a/go/logic/applier.go b/go/logic/applier.go index 8fb940a..d530d11 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -993,5 +993,6 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if this.migrationContext.CountTableRows { atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) } + log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents)) return nil } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 423ed27..5db089a 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -37,6 +37,21 @@ func ReadChangelogState(s string) ChangelogState { type tableWriteFunc func() error +type applyEventStruct struct { + writeFunc *tableWriteFunc + dmlEvent *binlog.BinlogDMLEvent +} + +func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct { + result := &applyEventStruct{writeFunc: writeFunc} + return result +} + +func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct { + result := &applyEventStruct{dmlEvent: dmlEvent} + return result +} + const ( applyEventsQueueBuffer = 100 ) @@ -71,7 +86,7 @@ type Migrator struct { // copyRowsQueue should not be buffered; if buffered some non-damaging but // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete copyRowsQueue chan 
tableWriteFunc - applyEventsQueue chan tableWriteFunc + applyEventsQueue chan *applyEventStruct handledChangelogStates map[string]bool } @@ -86,7 +101,7 @@ func NewMigrator() *Migrator { allEventsUpToLockProcessed: make(chan string), copyRowsQueue: make(chan tableWriteFunc), - applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), + applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer), handledChangelogStates: make(map[string]bool), } return migrator @@ -194,7 +209,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er } case AllEventsUpToLockProcessed: { - applyEventFunc := func() error { + var applyEventFunc tableWriteFunc = func() error { this.allEventsUpToLockProcessed <- changelogStateString return nil } @@ -204,7 +219,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er // So as not to create a potential deadlock, we write this func to applyEventsQueue // asynchronously, understanding it doesn't really matter. 
go func() { - this.applyEventsQueue <- applyEventFunc + this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } default: @@ -917,11 +932,7 @@ func (this *Migrator) addDMLEventsListener() error { this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, func(dmlEvent *binlog.BinlogDMLEvent) error { - // Create a task to apply the DML event; this will be execute by executeWriteFuncs() - applyEventFunc := func() error { - return this.applier.ApplyDMLEventQuery(dmlEvent) - } - this.applyEventsQueue <- applyEventFunc + this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent) return nil }, ) @@ -1032,10 +1043,37 @@ func (this *Migrator) executeWriteFuncs() error { // We give higher priority to event processing, then secondary priority to // rowcopy select { - case applyEventFunc := <-this.applyEventsQueue: + case applyEventStruct := <-this.applyEventsQueue: { - if err := this.retryOperation(applyEventFunc); err != nil { - return log.Errore(err) + if applyEventStruct.writeFunc != nil { + if err := this.retryOperation(*applyEventStruct.writeFunc); err != nil { + return log.Errore(err) + } + } + if applyEventStruct.dmlEvent != nil { + dmlEvents := [](*binlog.BinlogDMLEvent){} + dmlEvents = append(dmlEvents, applyEventStruct.dmlEvent) + + availableEvents := len(this.applyEventsQueue) + batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize)) + if availableEvents > batchSize { + availableEvents = batchSize + } + for i := 0; i < availableEvents; i++ { + additionalStruct := <-this.applyEventsQueue + if additionalStruct.dmlEvent == nil { + // Not a DML. 
We don't group this, and we don't batch any further + break + } + dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + } + // Create a task to apply the DML event; this will be execute by executeWriteFuncs() + var applyEventFunc tableWriteFunc = func() error { + return this.applier.ApplyDMLEventQueries(dmlEvents) + } + if err := this.retryOperation(applyEventFunc); err != nil { + return log.Errore(err) + } } } default: From 8d0faa55e3f30954dd42f45f4d81e4358254dbc2 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 4 Jan 2017 08:44:04 +0200 Subject: [PATCH 3/7] explicit rollback in ApplyDMLEventQueries() --- go/logic/applier.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index d530d11..d1ad331 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -961,21 +961,27 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if err != nil { return err } + + rollback := func(err error) error { + tx.Rollback() + return err + } + sessionQuery := `SET SESSION time_zone = '+00:00', sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES') ` if _, err := tx.Exec(sessionQuery); err != nil { - return err + return rollback(err) } for _, dmlEvent := range dmlEvents { query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent) if err != nil { - return err + return rollback(err) } if _, err := tx.Exec(query, args...); err != nil { err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args) - return err + return rollback(err) } totalDelta += rowDelta } From 645af21d038138d421a059c804a6a5d0bdc8f679 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 4 Jan 2017 12:39:57 +0200 Subject: [PATCH 4/7] extracted onApplyEventStruct() --- go/logic/migrator.go | 67 ++++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 31 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 4478b0d..3d83d27 100644 --- 
a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1024,6 +1024,40 @@ func (this *Migrator) iterateChunks() error { return nil } +func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { + if eventStruct.writeFunc != nil { + if err := this.retryOperation(*eventStruct.writeFunc); err != nil { + return log.Errore(err) + } + } + if eventStruct.dmlEvent != nil { + dmlEvents := [](*binlog.BinlogDMLEvent){} + dmlEvents = append(dmlEvents, eventStruct.dmlEvent) + + availableEvents := len(this.applyEventsQueue) + batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize)) + if availableEvents > batchSize { + availableEvents = batchSize + } + for i := 0; i < availableEvents; i++ { + additionalStruct := <-this.applyEventsQueue + if additionalStruct.dmlEvent == nil { + // Not a DML. We don't group this, and we don't batch any further + break + } + dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) + } + // Create a task to apply the DML event; this will be execute by executeWriteFuncs() + var applyEventFunc tableWriteFunc = func() error { + return this.applier.ApplyDMLEventQueries(dmlEvents) + } + if err := this.retryOperation(applyEventFunc); err != nil { + return log.Errore(err) + } + } + return nil +} + // executeWriteFuncs writes data via applier: both the rowcopy and the events backlog. // This is where the ghost table gets the data. The function fills the data single-threaded. // Both event backlog and rowcopy events are polled; the backlog events have precedence. 
@@ -1038,38 +1072,9 @@ func (this *Migrator) executeWriteFuncs() error { // We give higher priority to event processing, then secondary priority to // rowcopy select { - case applyEventStruct := <-this.applyEventsQueue: + case eventStruct := <-this.applyEventsQueue: { - if applyEventStruct.writeFunc != nil { - if err := this.retryOperation(*applyEventStruct.writeFunc); err != nil { - return log.Errore(err) - } - } - if applyEventStruct.dmlEvent != nil { - dmlEvents := [](*binlog.BinlogDMLEvent){} - dmlEvents = append(dmlEvents, applyEventStruct.dmlEvent) - - availableEvents := len(this.applyEventsQueue) - batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize)) - if availableEvents > batchSize { - availableEvents = batchSize - } - for i := 0; i < availableEvents; i++ { - additionalStruct := <-this.applyEventsQueue - if additionalStruct.dmlEvent == nil { - // Not a DML. We don't group this, and we don't batch any further - break - } - dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) - } - // Create a task to apply the DML event; this will be execute by executeWriteFuncs() - var applyEventFunc tableWriteFunc = func() error { - return this.applier.ApplyDMLEventQueries(dmlEvents) - } - if err := this.retryOperation(applyEventFunc); err != nil { - return log.Errore(err) - } - } + this.onApplyEventStruct(eventStruct) } default: { From baaa255182fa22452b9a6a33451f3913f6dc367c Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Wed, 4 Jan 2017 12:42:21 +0200 Subject: [PATCH 5/7] bailing out from onApplyEventStruct() --- go/logic/migrator.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 3d83d27..7b3daf3 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1074,7 +1074,9 @@ func (this *Migrator) executeWriteFuncs() error { select { case eventStruct := <-this.applyEventsQueue: { - this.onApplyEventStruct(eventStruct) + if err := this.onApplyEventStruct(eventStruct); err 
!= nil { + return err + } } default: { From f7d2beb4d28b4aa1d7b0acaf4c9adf0cd6166363 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Thu, 5 Jan 2017 08:13:51 +0200 Subject: [PATCH 6/7] handling a non-DML event at the end of a dml-event sequence --- go/logic/migrator.go | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index 7b3daf3..61db6db 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1025,14 +1025,21 @@ func (this *Migrator) iterateChunks() error { } func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { - if eventStruct.writeFunc != nil { - if err := this.retryOperation(*eventStruct.writeFunc); err != nil { - return log.Errore(err) + handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error { + if eventStruct.writeFunc != nil { + if err := this.retryOperation(*eventStruct.writeFunc); err != nil { + return log.Errore(err) + } } + return nil + } + if eventStruct.dmlEvent == nil { + return handleNonDMLEventStruct(eventStruct) } if eventStruct.dmlEvent != nil { dmlEvents := [](*binlog.BinlogDMLEvent){} dmlEvents = append(dmlEvents, eventStruct.dmlEvent) + var nonDmlStructToApply *applyEventStruct availableEvents := len(this.applyEventsQueue) batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize)) @@ -1043,6 +1050,7 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { additionalStruct := <-this.applyEventsQueue if additionalStruct.dmlEvent == nil { // Not a DML. 
We don't group this, and we don't batch any further + nonDmlStructToApply = additionalStruct break } dmlEvents = append(dmlEvents, additionalStruct.dmlEvent) @@ -1054,6 +1062,13 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { if err := this.retryOperation(applyEventFunc); err != nil { return log.Errore(err) } + if nonDmlStructToApply != nil { + // We pulled DML events from the queue, and then we hit a non-DML event. Wait! + // We need to handle it! + if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil { + return log.Errore(err) + } + } } return nil } From 115702716146c07e824c99d5d77360a02c558d11 Mon Sep 17 00:00:00 2001 From: Shlomi Noach Date: Sun, 8 Jan 2017 09:46:12 +0200 Subject: [PATCH 7/7] documenting --dml-batch-size --- RELEASE_VERSION | 2 +- doc/command-line-flags.md | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/RELEASE_VERSION b/RELEASE_VERSION index 15245f3..ffcbe71 100644 --- a/RELEASE_VERSION +++ b/RELEASE_VERSION @@ -1 +1 @@ -1.0.32 +1.0.34 diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index d7bc97b..83bfe05 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -65,6 +65,17 @@ At this time (10-2016) `gh-ost` does not support foreign keys on migrated tables See also: [`skip-foreign-key-checks`](#skip-foreign-key-checks) + +### dml-batch-size + +`gh-ost` reads events from the binary log and applies them onto the _ghost_ table. It does so in batched writes: grouping multiple events to apply in a single transaction. This gives better write throughput as we don't need to sync the transaction log to disk for each event. + +The `--dml-batch-size` flag controls the size of the batched write. Allowed values are `1 - 100`, where `1` means no batching (every event from the binary log is applied onto the _ghost_ table in its own transaction). Default value is `10`. + +Why is this behavior configurable? 
Different workloads have different characteristics. Some workloads have very large writes, such that aggregating even `50` writes into a transaction makes for a significant transaction size. On other workloads write rate is high such that one just can't allow for a hundred more syncs to disk per second. The default value of `10` is a modest compromise that should probably work very well for most workloads. Your mileage may vary. + +Noteworthy is that setting `--dml-batch-size` to a higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size. + ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can, and often be, a large number. Exactly what that number is?