Merge pull request #22 from github/throttle-control-replicas
throttle via control replicas
This commit is contained in:
commit
93ac641455
@ -43,6 +43,8 @@ type MigrationContext struct {
|
||||
|
||||
ChunkSize int64
|
||||
MaxLagMillisecondsThrottleThreshold int64
|
||||
ReplictionLagQuery string
|
||||
ThrottleControlReplicaKeys *mysql.InstanceKeyMap
|
||||
ThrottleFlagFile string
|
||||
ThrottleAdditionalFlagFile string
|
||||
MaxLoad map[string]int64
|
||||
@ -102,6 +104,7 @@ func newMigrationContext() *MigrationContext {
|
||||
SwapTablesTimeoutSeconds: 3,
|
||||
MaxLoad: make(map[string]int64),
|
||||
throttleMutex: &sync.Mutex{},
|
||||
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -44,6 +44,8 @@ func main() {
|
||||
migrationContext.ChunkSize = 100000
|
||||
}
|
||||
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
|
||||
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-osc issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
|
||||
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
|
||||
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
|
||||
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-osc.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-osc operations")
|
||||
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
|
||||
@ -91,6 +93,9 @@ func main() {
|
||||
if migrationContext.QuickAndBumpySwapTables && migrationContext.TestOnReplica {
|
||||
log.Fatalf("--quick-and-bumpy-swap-tables and --test-on-replica are mutually exclusive (the former implies migrating on master)")
|
||||
}
|
||||
if err := migrationContext.ThrottleControlReplicaKeys.ReadCommaDelimitedList(*throttleControlReplicas); err != nil {
|
||||
log.Fatale(err)
|
||||
}
|
||||
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
|
||||
log.Fatale(err)
|
||||
}
|
||||
|
@ -188,7 +188,7 @@ func (this *Inspector) validateGrants() error {
|
||||
func (this *Inspector) restartReplication() error {
|
||||
log.Infof("Restarting replication on %s:%d to make sure binlog settings apply to replication thread", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
|
||||
|
||||
masterKey, _ := getMasterKeyFromSlaveStatus(this.connectionConfig)
|
||||
masterKey, _ := mysql.GetMasterKeyFromSlaveStatus(this.connectionConfig)
|
||||
if masterKey == nil {
|
||||
// This is not a replica
|
||||
return nil
|
||||
@ -503,45 +503,5 @@ func (this *Inspector) readChangelogState() (map[string]string, error) {
|
||||
|
||||
func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
|
||||
visitedKeys := mysql.NewInstanceKeyMap()
|
||||
return getMasterConnectionConfigSafe(this.connectionConfig, visitedKeys)
|
||||
}
|
||||
|
||||
func getMasterKeyFromSlaveStatus(connectionConfig *mysql.ConnectionConfig) (masterKey *mysql.InstanceKey, err error) {
|
||||
currentUri := connectionConfig.GetDBUri("information_schema")
|
||||
db, _, err := sqlutils.GetDB(currentUri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
|
||||
masterKey = &mysql.InstanceKey{
|
||||
Hostname: rowMap.GetString("Master_Host"),
|
||||
Port: rowMap.GetInt("Master_Port"),
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return masterKey, err
|
||||
}
|
||||
|
||||
func getMasterConnectionConfigSafe(connectionConfig *mysql.ConnectionConfig, visitedKeys *mysql.InstanceKeyMap) (masterConfig *mysql.ConnectionConfig, err error) {
|
||||
log.Debugf("Looking for master on %+v", connectionConfig.Key)
|
||||
|
||||
masterKey, err := getMasterKeyFromSlaveStatus(connectionConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if masterKey == nil {
|
||||
return connectionConfig, nil
|
||||
}
|
||||
if !masterKey.IsValid() {
|
||||
return connectionConfig, nil
|
||||
}
|
||||
masterConfig = connectionConfig.Duplicate()
|
||||
masterConfig.Key = *masterKey
|
||||
|
||||
log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key)
|
||||
if visitedKeys.HasKey(masterConfig.Key) {
|
||||
return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key)
|
||||
}
|
||||
visitedKeys.AddKey(masterConfig.Key)
|
||||
return getMasterConnectionConfigSafe(masterConfig, visitedKeys)
|
||||
return mysql.GetMasterConnectionConfigSafe(this.connectionConfig, visitedKeys)
|
||||
}
|
||||
|
@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/github/gh-osc/go/base"
|
||||
"github.com/github/gh-osc/go/binlog"
|
||||
"github.com/github/gh-osc/go/mysql"
|
||||
"github.com/github/gh-osc/go/sql"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
@ -87,11 +88,7 @@ func (this *Migrator) acceptSignals() {
|
||||
}
|
||||
|
||||
func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
||||
|
||||
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
|
||||
}
|
||||
// User-based throttle
|
||||
if this.migrationContext.ThrottleFlagFile != "" {
|
||||
if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil {
|
||||
// Throttle file defined and exists!
|
||||
@ -104,6 +101,20 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
|
||||
return true, "flag-file"
|
||||
}
|
||||
}
|
||||
// Replication lag throttle
|
||||
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
|
||||
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
|
||||
}
|
||||
if this.migrationContext.TestOnReplica {
|
||||
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
|
||||
if err != nil {
|
||||
return true, err.Error()
|
||||
}
|
||||
if replicationLag > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
|
||||
return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
for variableName, threshold := range this.migrationContext.MaxLoad {
|
||||
value, err := this.applier.ShowStatusVariable(variableName)
|
||||
@ -307,6 +318,8 @@ func (this *Migrator) Migrate() (err error) {
|
||||
return nil
|
||||
}
|
||||
|
||||
// stopWritesAndCompleteMigration performs the final step of migration, based on migration
|
||||
// type (on replica? bumpy? safe?)
|
||||
func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
||||
if this.migrationContext.Noop {
|
||||
log.Debugf("Noop operation; not really swapping tables")
|
||||
@ -335,6 +348,10 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
|
||||
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
|
||||
// There is a point in time where the "original" table does not exist and queries are non-blocked
|
||||
// and failing.
|
||||
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
|
||||
if err := this.retryOperation(this.applier.LockTables); err != nil {
|
||||
return err
|
||||
@ -366,6 +383,8 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
|
||||
return nil
|
||||
}
|
||||
|
||||
// stopWritesAndCompleteMigrationOnMasterViaLock will lock down the original table, execute
|
||||
// what's left of last DML entries, and atomically swap & unlock (original->old && new->original)
|
||||
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
|
||||
lockGrabbed := make(chan error, 1)
|
||||
okToReleaseLock := make(chan bool, 1)
|
||||
@ -430,6 +449,10 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
|
||||
return nil
|
||||
}
|
||||
|
||||
// stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply
|
||||
// what DML events are left, and that's it.
|
||||
// This only applies in --test-on-replica. It leaves replication stopped, with both tables
|
||||
// in sync. There is no table swap.
|
||||
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
|
||||
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
|
||||
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
|
||||
@ -469,6 +492,9 @@ func (this *Migrator) initiateInspector() (err error) {
|
||||
this.migrationContext.ApplierConnectionConfig.Key, this.migrationContext.InspectorConnectionConfig.Key,
|
||||
)
|
||||
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
|
||||
if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
|
||||
this.migrationContext.ThrottleControlReplicaKeys.AddKey(this.migrationContext.InspectorConnectionConfig.Key)
|
||||
}
|
||||
} else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster {
|
||||
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
|
||||
}
|
||||
|
@ -17,6 +17,10 @@ func NewInstanceKeyMap() *InstanceKeyMap {
|
||||
return &InstanceKeyMap{}
|
||||
}
|
||||
|
||||
func (this *InstanceKeyMap) Len() int {
|
||||
return len(*this)
|
||||
}
|
||||
|
||||
// AddKey adds a single key to this map
|
||||
func (this *InstanceKeyMap) AddKey(key InstanceKey) {
|
||||
(*this)[key] = true
|
||||
@ -83,6 +87,9 @@ func (this *InstanceKeyMap) ReadJson(jsonString string) error {
|
||||
|
||||
// ReadCommaDelimitedList parses a comma delimited list of instance keys (e.g. myhost1.com:3306,myhost2.com) into this map
|
||||
func (this *InstanceKeyMap) ReadCommaDelimitedList(list string) error {
|
||||
if list == "" {
|
||||
return nil
|
||||
}
|
||||
tokens := strings.Split(list, ",")
|
||||
for _, token := range tokens {
|
||||
key, err := ParseRawInstanceKeyLoose(token)
|
||||
|
109
go/mysql/utils.go
Normal file
109
go/mysql/utils.go
Normal file
@ -0,0 +1,109 @@
|
||||
/*
|
||||
Copyright 2016 GitHub Inc.
|
||||
See https://github.com/github/gh-osc/blob/master/LICENSE
|
||||
*/
|
||||
|
||||
package mysql
|
||||
|
||||
import (
|
||||
gosql "database/sql"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/outbrain/golib/log"
|
||||
"github.com/outbrain/golib/sqlutils"
|
||||
)
|
||||
|
||||
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
|
||||
// or via SHOW SLAVE STATUS
|
||||
func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
|
||||
dbUri := connectionConfig.GetDBUri("information_schema")
|
||||
var db *gosql.DB
|
||||
if db, _, err = sqlutils.GetDB(dbUri); err != nil {
|
||||
return replicationLag, err
|
||||
}
|
||||
|
||||
if replicationLagQuery != "" {
|
||||
var floatLag float64
|
||||
err = db.QueryRow(replicationLagQuery).Scan(&floatLag)
|
||||
return time.Duration(int64(floatLag*1000)) * time.Millisecond, err
|
||||
}
|
||||
// No explicit replication lag query.
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
|
||||
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
|
||||
if !secondsBehindMaster.Valid {
|
||||
return fmt.Errorf("Replication not running on %+v", connectionConfig.Key)
|
||||
}
|
||||
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
|
||||
return nil
|
||||
})
|
||||
return replicationLag, err
|
||||
}
|
||||
|
||||
// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
|
||||
// each via GetReplicationLag
|
||||
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (replicationLag time.Duration, err error) {
|
||||
if instanceKeyMap.Len() == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
lagsChan := make(chan time.Duration, instanceKeyMap.Len())
|
||||
errorsChan := make(chan error, instanceKeyMap.Len())
|
||||
for key := range *instanceKeyMap {
|
||||
connectionConfig := baseConnectionConfig.Duplicate()
|
||||
connectionConfig.Key = key
|
||||
go func() {
|
||||
lag, err := GetReplicationLag(connectionConfig, replicationLagQuery)
|
||||
lagsChan <- lag
|
||||
errorsChan <- err
|
||||
}()
|
||||
}
|
||||
for range *instanceKeyMap {
|
||||
if lagError := <-errorsChan; lagError != nil {
|
||||
err = lagError
|
||||
}
|
||||
if lag := <-lagsChan; lag.Nanoseconds() > replicationLag.Nanoseconds() {
|
||||
replicationLag = lag
|
||||
}
|
||||
}
|
||||
return replicationLag, err
|
||||
}
|
||||
|
||||
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
|
||||
currentUri := connectionConfig.GetDBUri("information_schema")
|
||||
db, _, err := sqlutils.GetDB(currentUri)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
|
||||
masterKey = &InstanceKey{
|
||||
Hostname: rowMap.GetString("Master_Host"),
|
||||
Port: rowMap.GetInt("Master_Port"),
|
||||
}
|
||||
return nil
|
||||
})
|
||||
return masterKey, err
|
||||
}
|
||||
|
||||
func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKeys *InstanceKeyMap) (masterConfig *ConnectionConfig, err error) {
|
||||
log.Debugf("Looking for master on %+v", connectionConfig.Key)
|
||||
|
||||
masterKey, err := GetMasterKeyFromSlaveStatus(connectionConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if masterKey == nil {
|
||||
return connectionConfig, nil
|
||||
}
|
||||
if !masterKey.IsValid() {
|
||||
return connectionConfig, nil
|
||||
}
|
||||
masterConfig = connectionConfig.Duplicate()
|
||||
masterConfig.Key = *masterKey
|
||||
|
||||
log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key)
|
||||
if visitedKeys.HasKey(masterConfig.Key) {
|
||||
return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key)
|
||||
}
|
||||
visitedKeys.AddKey(masterConfig.Key)
|
||||
return GetMasterConnectionConfigSafe(masterConfig, visitedKeys)
|
||||
}
|
Loading…
Reference in New Issue
Block a user