batching DML writes, configurable --dml-batch-size

This commit is contained in:
Shlomi Noach 2017-01-03 14:31:19 +02:00
parent 3e28f462d8
commit 445c67635d
4 changed files with 65 additions and 12 deletions

View File

@ -149,6 +149,7 @@ type MigrationContext struct {
controlReplicasLagResult mysql.ReplicationLagResult controlReplicasLagResult mysql.ReplicationLagResult
TotalRowsCopied int64 TotalRowsCopied int64
TotalDMLEventsApplied int64 TotalDMLEventsApplied int64
DMLBatchSize int64
isThrottled bool isThrottled bool
throttleReason string throttleReason string
throttleReasonHint ThrottleReasonHint throttleReasonHint ThrottleReasonHint
@ -208,6 +209,7 @@ func newMigrationContext() *MigrationContext {
ApplierConnectionConfig: mysql.NewConnectionConfig(), ApplierConnectionConfig: mysql.NewConnectionConfig(),
MaxLagMillisecondsThrottleThreshold: 1500, MaxLagMillisecondsThrottleThreshold: 1500,
CutOverLockTimeoutSeconds: 3, CutOverLockTimeoutSeconds: 3,
DMLBatchSize: 10,
maxLoad: NewLoadMap(), maxLoad: NewLoadMap(),
criticalLoad: NewLoadMap(), criticalLoad: NewLoadMap(),
throttleMutex: &sync.Mutex{}, throttleMutex: &sync.Mutex{},
@ -418,6 +420,16 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
atomic.StoreInt64(&this.ChunkSize, chunkSize) atomic.StoreInt64(&this.ChunkSize, chunkSize)
} }
func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
if batchSize < 1 {
batchSize = 1
}
if batchSize > 100 {
batchSize = 100
}
atomic.StoreInt64(&this.DMLBatchSize, batchSize)
}
func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult { func (this *MigrationContext) SetThrottleGeneralCheckResult(checkResult *ThrottleCheckResult) *ThrottleCheckResult {
this.throttleMutex.Lock() this.throttleMutex.Lock()
defer this.throttleMutex.Unlock() defer this.throttleMutex.Unlock()

View File

@ -83,6 +83,7 @@ func main() {
flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running") flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, event after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges") flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, event after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")
chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)") chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 100-100,000)")
dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
@ -220,6 +221,7 @@ func main() {
migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis) migrationContext.SetHeartbeatIntervalMilliseconds(*heartbeatIntervalMillis)
migrationContext.SetNiceRatio(*niceRatio) migrationContext.SetNiceRatio(*niceRatio)
migrationContext.SetChunkSize(*chunkSize) migrationContext.SetChunkSize(*chunkSize)
migrationContext.SetDMLBatchSize(*dmlBatchSize)
migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis) migrationContext.SetMaxLagMillisecondsThrottleThreshold(*maxLagMillis)
migrationContext.SetReplicationLagQuery(*replicationLagQuery) migrationContext.SetReplicationLagQuery(*replicationLagQuery)
migrationContext.SetThrottleQuery(*throttleQuery) migrationContext.SetThrottleQuery(*throttleQuery)

View File

@ -993,5 +993,6 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
if this.migrationContext.CountTableRows { if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta)
} }
log.Debugf("ApplyDMLEventQueries() applied %d events in one transaction", len(dmlEvents))
return nil return nil
} }

View File

@ -37,6 +37,21 @@ func ReadChangelogState(s string) ChangelogState {
type tableWriteFunc func() error type tableWriteFunc func() error
type applyEventStruct struct {
writeFunc *tableWriteFunc
dmlEvent *binlog.BinlogDMLEvent
}
func newApplyEventStructByFunc(writeFunc *tableWriteFunc) *applyEventStruct {
result := &applyEventStruct{writeFunc: writeFunc}
return result
}
func newApplyEventStructByDML(dmlEvent *binlog.BinlogDMLEvent) *applyEventStruct {
result := &applyEventStruct{dmlEvent: dmlEvent}
return result
}
const ( const (
applyEventsQueueBuffer = 100 applyEventsQueueBuffer = 100
) )
@ -71,7 +86,7 @@ type Migrator struct {
// copyRowsQueue should not be buffered; if buffered some non-damaging but // copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete // excessive work happens at the end of the iteration as new copy-jobs arrive befroe realizing the copy is complete
copyRowsQueue chan tableWriteFunc copyRowsQueue chan tableWriteFunc
applyEventsQueue chan tableWriteFunc applyEventsQueue chan *applyEventStruct
handledChangelogStates map[string]bool handledChangelogStates map[string]bool
} }
@ -86,7 +101,7 @@ func NewMigrator() *Migrator {
allEventsUpToLockProcessed: make(chan string), allEventsUpToLockProcessed: make(chan string),
copyRowsQueue: make(chan tableWriteFunc), copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer), applyEventsQueue: make(chan *applyEventStruct, applyEventsQueueBuffer),
handledChangelogStates: make(map[string]bool), handledChangelogStates: make(map[string]bool),
} }
return migrator return migrator
@ -194,7 +209,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
} }
case AllEventsUpToLockProcessed: case AllEventsUpToLockProcessed:
{ {
applyEventFunc := func() error { var applyEventFunc tableWriteFunc = func() error {
this.allEventsUpToLockProcessed <- changelogStateString this.allEventsUpToLockProcessed <- changelogStateString
return nil return nil
} }
@ -204,7 +219,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
// So as not to create a potential deadlock, we write this func to applyEventsQueue // So as not to create a potential deadlock, we write this func to applyEventsQueue
// asynchronously, understanding it doesn't really matter. // asynchronously, understanding it doesn't really matter.
go func() { go func() {
this.applyEventsQueue <- applyEventFunc this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
}() }()
} }
default: default:
@ -917,11 +932,7 @@ func (this *Migrator) addDMLEventsListener() error {
this.migrationContext.DatabaseName, this.migrationContext.DatabaseName,
this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableName,
func(dmlEvent *binlog.BinlogDMLEvent) error { func(dmlEvent *binlog.BinlogDMLEvent) error {
// Create a task to apply the DML event; this will be execute by executeWriteFuncs() this.applyEventsQueue <- newApplyEventStructByDML(dmlEvent)
applyEventFunc := func() error {
return this.applier.ApplyDMLEventQuery(dmlEvent)
}
this.applyEventsQueue <- applyEventFunc
return nil return nil
}, },
) )
@ -1032,12 +1043,39 @@ func (this *Migrator) executeWriteFuncs() error {
// We give higher priority to event processing, then secondary priority to // We give higher priority to event processing, then secondary priority to
// rowcopy // rowcopy
select { select {
case applyEventFunc := <-this.applyEventsQueue: case applyEventStruct := <-this.applyEventsQueue:
{ {
if applyEventStruct.writeFunc != nil {
if err := this.retryOperation(*applyEventStruct.writeFunc); err != nil {
return log.Errore(err)
}
}
if applyEventStruct.dmlEvent != nil {
dmlEvents := [](*binlog.BinlogDMLEvent){}
dmlEvents = append(dmlEvents, applyEventStruct.dmlEvent)
availableEvents := len(this.applyEventsQueue)
batchSize := int(atomic.LoadInt64(&this.migrationContext.DMLBatchSize))
if availableEvents > batchSize {
availableEvents = batchSize
}
for i := 0; i < availableEvents; i++ {
additionalStruct := <-this.applyEventsQueue
if additionalStruct.dmlEvent == nil {
// Not a DML. We don't group this, and we don't batch any further
break
}
dmlEvents = append(dmlEvents, additionalStruct.dmlEvent)
}
// Create a task to apply the DML event; this will be execute by executeWriteFuncs()
var applyEventFunc tableWriteFunc = func() error {
return this.applier.ApplyDMLEventQueries(dmlEvents)
}
if err := this.retryOperation(applyEventFunc); err != nil { if err := this.retryOperation(applyEventFunc); err != nil {
return log.Errore(err) return log.Errore(err)
} }
} }
}
default: default:
{ {
select { select {