Merge pull request #352 from github/batch-apply-dml-events
Batch apply dml events
commit 8d8ef34c23
@@ -1 +1 @@
-1.0.32
+1.0.34
@@ -65,6 +65,17 @@ At this time (10-2016) `gh-ost` does not support foreign keys on migrated tables
 See also: [`skip-foreign-key-checks`](#skip-foreign-key-checks)
 
+### dml-batch-size
+
+`gh-ost` reads events from the binary log and applies them onto the _ghost_ table. It does so in batched writes: grouping multiple events to apply in a single transaction. This gives better write throughput, as we don't need to sync the transaction log to disk for each event.
+
+The `--dml-batch-size` flag controls the size of the batched write. Allowed values are `1 - 100`, where `1` means no batching (every event from the binary log is applied onto the _ghost_ table in its own transaction). The default value is `10`.
+
+Why is this behavior configurable? Different workloads have different characteristics. Some workloads have very large writes, such that aggregating even `50` writes into a transaction makes for a significant transaction size. On other workloads the write rate is so high that one just can't allow for a hundred more syncs to disk per second. The default value of `10` is a modest compromise that should work well for most workloads. Your mileage may vary.
+
+Noteworthy is that setting `--dml-batch-size` to a higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log; it just writes what it already has. This conveniently suggests that if the write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.
+
 ### exact-rowcount
 
 A `gh-ost` execution needs to copy whatever rows you have in your existing table onto the ghost table. This can be, and often is, a large number. What exactly is that number?
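To make the batching semantics described above concrete, here is a minimal, self-contained sketch (not gh-ost code; `drainBatch` and all other names are invented for illustration). It blocks only for the first event, then opportunistically groups whatever is already pending, up to the batch size, and returns a partial batch immediately rather than waiting. The actual implementation (`onApplyEventStruct`, further down in this diff) achieves the same effect by snapshotting the queue length instead of using `select`/`default`:

```go
package main

import "fmt"

// drainBatch takes one event (blocking), then greedily collects whatever
// else is already queued, up to batchSize, without ever waiting for more.
func drainBatch(events chan string, batchSize int) []string {
	batch := []string{<-events} // block only for the first event
	for len(batch) < batchSize {
		select {
		case ev := <-events:
			batch = append(batch, ev)
		default:
			return batch // nothing pending: apply a partial batch now
		}
	}
	return batch
}

func main() {
	events := make(chan string, 100)
	for i := 0; i < 7; i++ {
		events <- fmt.Sprintf("event-%d", i)
	}
	// With batchSize=10 but only 7 events queued, all 7 are grouped into
	// one (simulated) transaction; nobody waits for the remaining 3.
	batch := drainBatch(events, 10)
	fmt.Printf("applying %d events in one transaction\n", len(batch))
}
```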
@@ -148,6 +148,7 @@ type MigrationContext struct {
 	controlReplicasLagResult mysql.ReplicationLagResult
 	TotalRowsCopied          int64
 	TotalDMLEventsApplied    int64
+	DMLBatchSize             int64
 	isThrottled              bool
 	throttleReason           string
 	throttleReasonHint       ThrottleReasonHint
@@ -207,6 +208,7 @@ func newMigrationContext() *MigrationContext {
 		ApplierConnectionConfig:             mysql.NewConnectionConfig(),
 		MaxLagMillisecondsThrottleThreshold: 1500,
 		CutOverLockTimeoutSeconds:           3,
+		DMLBatchSize:                        10,
 		maxLoad:                             NewLoadMap(),
 		criticalLoad:                        NewLoadMap(),
 		throttleMutex:                       &sync.Mutex{},
@@ -417,6 +419,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
 	atomic.StoreInt64(&this.ChunkSize, chunkSize)
 }
 
+func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
+	if batchSize < 1 {
+		batchSize = 1
+	}
+	if batchSize > 100 {
+		batchSize = 100
+	}
+	atomic.StoreInt64(&this.DMLBatchSize, batchSize)
+}
+
 func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
 	this.throttleMutex.Lock()
 	defer this.throttleMutex.Unlock()
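A note on the design choice in `SetDMLBatchSize`: out-of-range values are silently clamped to the documented `1 - 100` range rather than rejected, so passing `--dml-batch-size=0` effectively yields `1`, and `--dml-batch-size=1000` effectively yields `100`. The value is stored and read with `atomic` operations, which suggests it is meant to be safe to read or adjust while the migration is running.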
@@ -83,6 +83,7 @@ func main() {
 	flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
 	flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, even after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")
 	chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
+	dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
 	defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
 	cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
 	niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spent in a rowcopy chunk, spend 7ms sleeping immediately after")
@@ -223,6 +224,7 @@ func main() {
 	migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
 	migrationContext.SetNiceRatio(*niceRatio)
 	migrationContext.SetChunkSize(*chunkSize)
+	migrationContext.SetDMLBatchSize(*dmlBatchSize)
 	migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
 	migrationContext.SetThrottleQuery(*throttleQuery)
 	migrationContext.SetDefaultNumRetries(*defaultRetries)
@@ -950,3 +950,55 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
 	}
 	return nil
 }
+
+// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
+func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
+
+	var totalDelta int64
+
+	err := func() error {
+		tx, err := this.db.Begin()
+		if err != nil {
+			return err
+		}
+
+		rollback := func(err error) error {
+			tx.Rollback()
+			return err
+		}
+
+		sessionQuery := `SET
+			SESSION time_zone = '+00:00',
+			sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
+		`
+		if _, err := tx.Exec(sessionQuery); err != nil {
+			return rollback(err)
+		}
+		for _, dmlEvent := range dmlEvents {
+			query, args, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
+			if err != nil {
+				return rollback(err)
+			}
+			if _, err := tx.Exec(query, args...); err != nil {
+				err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
+				return rollback(err)
+			}
+			totalDelta += rowDelta
+		}
+		if err := tx.Commit(); err != nil {
+			return err
+		}
+		return nil
+	}()
+
+	if err != nil {
+		return log.Errore(err)
+	}
+	// no error
+	atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents)))
+	if this.migrationContext.CountTableRows {
+		atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
+	}
+	log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
+	return nil
+}
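A note on the structure of `ApplyDMLEventQueries`: the transactional work is wrapped in an immediately-invoked closure so that every failure path funnels through the local `rollback` helper while still surfacing a single `err` to the outer scope. The counters (`TotalDMLEventsApplied`, `RowsDeltaEstimate`) are only updated after a successful commit, so a rolled-back batch is never counted, and the session settings (`time_zone`, `STRICT_ALL_TABLES`) appear to mirror those of the single-event `ApplyDMLEventQuery` path, so batched and unbatched writes behave identically.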
@@ -37,6 +37,21 @@ func ReadChangelogState(s string) ChangelogState {
 
 type tableWriteFunc func() error
 
+type applyEventStruct struct {
+	writeFunc *tableWriteFunc
+	dmlEvent  *binlog.BinlogDMLEvent
+}
+
+func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
+	result := &applyEventStruct{writeFunc: writeFunc}
+	return result
+}
+
+func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct {
+	result := &applyEventStruct{dmlEvent: dmlEvent}
+	return result
+}
+
 const (
 	applyEventsQueueBuffer = 100
 )
@@ -71,7 +86,7 @@ type Migrator struct {
 	// copyRowsQueue should not be buffered; if buffered some non-damaging but
 	// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
 	copyRowsQueue    chan tableWriteFunc
-	applyEventsQueue chan tableWriteFunc
+	applyEventsQueue chan *applyEventStruct
 
 	handledChangelogStates map[string]bool
 }
@@ -86,7 +101,7 @@ func NewMigrator() *Migrator {
 		allEventsUpToLockProcessed: make(chan string),
 
 		copyRowsQueue:    make(chan tableWriteFunc),
-		applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
+		applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
 		handledChangelogStates: make(map[string]bool),
 	}
 	return migrator
@@ -194,7 +209,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 	}
 	case AllEventsUpToLockProcessed:
 		{
-			applyEventFunc := func() error {
+			var applyEventFunc tableWriteFunc = func() error {
 				this.allEventsUpToLockProcessed <- changelogStateString
 				return nil
 			}
@@ -204,7 +219,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
 			// So as not to create a potential deadlock, we write this func to applyEventsQueue
 			// asynchronously, understanding it doesn't really matter.
 			go func() {
-				this.applyEventsQueue <- applyEventFunc
+				this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
 			}()
 		}
 	default:
@@ -912,11 +927,7 @@ func (this *Migrator) addDMLEventsListener() error {
 		this.migrationContext.DatabaseName,
 		this.migrationContext.OriginalTableName,
 		func(dmlEvent *binlog.BinlogDMLEvent) error {
-			applyEventFunc := func() error {
-				return this.applier.ApplyDMLEventQuery(dmlEvent)
-			}
-			this.applyEventsQueue <- applyEventFunc
+			// Create a task to apply the DML event; this will be executed by executeWriteFuncs()
+			this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent)
 			return nil
 		},
 	)
@@ -1013,6 +1024,55 @@ func (this *Migrator) iterateChunks() error {
 	return nil
 }
 
+func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error {
+	handleNonDMLEventStruct := func(eventStruct *applyEventStruct) error {
+		if eventStruct.writeFunc != nil {
+			if err := this.retryOperation(*eventStruct.writeFunc); err != nil {
+				return log.Errore(err)
+			}
+		}
+		return nil
+	}
+	if eventStruct.dmlEvent == nil {
+		return handleNonDMLEventStruct(eventStruct)
+	}
+	if eventStruct.dmlEvent != nil {
+		dmlEvents := [](*binlog.BinlogDMLEvent){}
+		dmlEvents = append(dmlEvents, eventStruct.dmlEvent)
+		var nonDmlStructToApply *applyEventStruct
+
+		availableEvents := len(this.applyEventsQueue)
+		batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
+		if availableEvents > batchSize {
+			availableEvents = batchSize
+		}
+		for i := 0; i < availableEvents; i++ {
+			additionalStruct := <-this.applyEventsQueue
+			if additionalStruct.dmlEvent == nil {
+				// Not a DML. We don't group this, and we don't batch any further
+				nonDmlStructToApply = additionalStruct
+				break
+			}
+			dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
+		}
+		// Create a task to apply the DML events; this will be executed by executeWriteFuncs()
+		var applyEventFunc tableWriteFunc = func() error {
+			return this.applier.ApplyDMLEventQueries(dmlEvents)
+		}
+		if err := this.retryOperation(applyEventFunc); err != nil {
+			return log.Errore(err)
+		}
+		if nonDmlStructToApply != nil {
+			// We pulled DML events from the queue, and then we hit a non-DML event. Wait!
+			// We need to handle it!
+			if err := handleNonDMLEventStruct(nonDmlStructToApply); err != nil {
+				return log.Errore(err)
+			}
+		}
+	}
+	return nil
+}
+
 // executeWriteFuncs writes data via applier: both the rowcopy and the events backlog.
 // This is where the ghost table gets the data. The function fills the data single-threaded.
 // Both event backlog and rowcopy events are polled; the backlog events have precedence.
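The batching loop in `onApplyEventStruct` deliberately stops at the first non-DML struct it pulls off the queue: since the queue carries both DML events and state-changing write functions (such as the `AllEventsUpToLockProcessed` marker), batching past a non-DML entry could reorder it relative to the surrounding DML. Instead, the partial batch is applied first and the non-DML entry is handled immediately afterwards, preserving queue order.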
@@ -1027,10 +1087,10 @@ func (this *Migrator) executeWriteFuncs() error {
 		// We give higher priority to event processing, then secondary priority to
 		// rowcopy
 		select {
-		case applyEventFunc := <-this.applyEventsQueue:
+		case eventStruct := <-this.applyEventsQueue:
 			{
-				if err := this.retryOperation(applyEventFunc); err != nil {
-					return log.Errore(err)
+				if err := this.onApplyEventStruct(eventStruct); err != nil {
+					return err
 				}
 			}
 		default: