- added throttle-control-replicas flag, a list of control replicas

- when `--test-on-replica`, the tested replica is implicitly a control replica
- added `replication-lag-query`, an alternate query to `SHOW SLAVE STATUS` to get replication lag
- throttling takes both the above into consideration
This commit is contained in:
Shlomi Noach 2016-05-01 21:36:36 +03:00
parent 421ab0fc83
commit 07063a4181
6 changed files with 157 additions and 47 deletions

View File

@ -43,6 +43,8 @@ type MigrationContext struct {
ChunkSize int64 ChunkSize int64
MaxLagMillisecondsThrottleThreshold int64 MaxLagMillisecondsThrottleThreshold int64
ReplictionLagQuery string
ThrottleControlReplicaKeys *mysql.InstanceKeyMap
ThrottleFlagFile string ThrottleFlagFile string
ThrottleAdditionalFlagFile string ThrottleAdditionalFlagFile string
MaxLoad map[string]int64 MaxLoad map[string]int64
@ -102,6 +104,7 @@ func newMigrationContext() *MigrationContext {
SwapTablesTimeoutSeconds: 3, SwapTablesTimeoutSeconds: 3,
MaxLoad: make(map[string]int64), MaxLoad: make(map[string]int64),
throttleMutex: &sync.Mutex{}, throttleMutex: &sync.Mutex{},
ThrottleControlReplicaKeys: mysql.NewInstanceKeyMap(),
} }
} }

View File

@ -44,6 +44,8 @@ func main() {
migrationContext.ChunkSize = 100000 migrationContext.ChunkSize = 100000
} }
flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation") flag.Int64Var(&migrationContext.MaxLagMillisecondsThrottleThreshold, "max-lag-millis", 1500, "replication lag at which to throttle operation")
flag.StringVar(&migrationContext.ReplictionLagQuery, "replication-lag-query", "", "Query that detects replication lag in seconds. Result can be a floating point (by default gh-osc issues SHOW SLAVE STATUS and reads Seconds_behind_master). If you're using pt-heartbeat, query would be something like: SELECT ROUND(UNIX_TIMESTAMP() - MAX(UNIX_TIMESTAMP(ts))) AS delay FROM my_schema.heartbeat")
throttleControlReplicas := flag.String("throttle-control-replicas", "", "List of replicas on which to check for lag; comma delimited. Example: myhost1.com:3306,myhost2.com,myhost3.com:3307")
flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered") flag.StringVar(&migrationContext.ThrottleFlagFile, "throttle-flag-file", "", "operation pauses when this file exists; hint: use a file that is specific to the table being altered")
flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-osc.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-osc operations") flag.StringVar(&migrationContext.ThrottleAdditionalFlagFile, "throttle-additional-flag-file", "/tmp/gh-osc.throttle", "operation pauses when this file exists; hint: keep default, use for throttling multiple gh-osc operations")
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'") maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'")
@ -91,6 +93,9 @@ func main() {
if migrationContext.QuickAndBumpySwapTables && migrationContext.TestOnReplica { if migrationContext.QuickAndBumpySwapTables && migrationContext.TestOnReplica {
log.Fatalf("--quick-and-bumpy-swap-tables and --test-on-replica are mutually exclusive (the former implies migrating on master)") log.Fatalf("--quick-and-bumpy-swap-tables and --test-on-replica are mutually exclusive (the former implies migrating on master)")
} }
if err := migrationContext.ThrottleControlReplicaKeys.ReadCommaDelimitedList(*throttleControlReplicas); err != nil {
log.Fatale(err)
}
if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil { if err := migrationContext.ReadMaxLoad(*maxLoad); err != nil {
log.Fatale(err) log.Fatale(err)
} }

View File

@ -188,7 +188,7 @@ func (this *Inspector) validateGrants() error {
func (this *Inspector) restartReplication() error { func (this *Inspector) restartReplication() error {
log.Infof("Restarting replication on %s:%d to make sure binlog settings apply to replication thread", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port) log.Infof("Restarting replication on %s:%d to make sure binlog settings apply to replication thread", this.connectionConfig.Key.Hostname, this.connectionConfig.Key.Port)
masterKey, _ := getMasterKeyFromSlaveStatus(this.connectionConfig) masterKey, _ := mysql.GetMasterKeyFromSlaveStatus(this.connectionConfig)
if masterKey == nil { if masterKey == nil {
// This is not a replica // This is not a replica
return nil return nil
@ -503,45 +503,5 @@ func (this *Inspector) readChangelogState() (map[string]string, error) {
func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) { func (this *Inspector) getMasterConnectionConfig() (applierConfig *mysql.ConnectionConfig, err error) {
visitedKeys := mysql.NewInstanceKeyMap() visitedKeys := mysql.NewInstanceKeyMap()
return getMasterConnectionConfigSafe(this.connectionConfig, visitedKeys) return mysql.GetMasterConnectionConfigSafe(this.connectionConfig, visitedKeys)
}
func getMasterKeyFromSlaveStatus(connectionConfig *mysql.ConnectionConfig) (masterKey *mysql.InstanceKey, err error) {
currentUri := connectionConfig.GetDBUri("information_schema")
db, _, err := sqlutils.GetDB(currentUri)
if err != nil {
return nil, err
}
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
masterKey = &mysql.InstanceKey{
Hostname: rowMap.GetString("Master_Host"),
Port: rowMap.GetInt("Master_Port"),
}
return nil
})
return masterKey, err
}
func getMasterConnectionConfigSafe(connectionConfig *mysql.ConnectionConfig, visitedKeys *mysql.InstanceKeyMap) (masterConfig *mysql.ConnectionConfig, err error) {
log.Debugf("Looking for master on %+v", connectionConfig.Key)
masterKey, err := getMasterKeyFromSlaveStatus(connectionConfig)
if err != nil {
return nil, err
}
if masterKey == nil {
return connectionConfig, nil
}
if !masterKey.IsValid() {
return connectionConfig, nil
}
masterConfig = connectionConfig.Duplicate()
masterConfig.Key = *masterKey
log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key)
if visitedKeys.HasKey(masterConfig.Key) {
return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key)
}
visitedKeys.AddKey(masterConfig.Key)
return getMasterConnectionConfigSafe(masterConfig, visitedKeys)
} }

View File

@ -15,6 +15,7 @@ import (
"github.com/github/gh-osc/go/base" "github.com/github/gh-osc/go/base"
"github.com/github/gh-osc/go/binlog" "github.com/github/gh-osc/go/binlog"
"github.com/github/gh-osc/go/mysql"
"github.com/github/gh-osc/go/sql" "github.com/github/gh-osc/go/sql"
"github.com/outbrain/golib/log" "github.com/outbrain/golib/log"
@ -87,11 +88,7 @@ func (this *Migrator) acceptSignals() {
} }
func (this *Migrator) shouldThrottle() (result bool, reason string) { func (this *Migrator) shouldThrottle() (result bool, reason string) {
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag) // User-based throttle
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
}
if this.migrationContext.ThrottleFlagFile != "" { if this.migrationContext.ThrottleFlagFile != "" {
if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil { if _, err := os.Stat(this.migrationContext.ThrottleFlagFile); err == nil {
// Throttle file defined and exists! // Throttle file defined and exists!
@ -104,6 +101,20 @@ func (this *Migrator) shouldThrottle() (result bool, reason string) {
return true, "flag-file" return true, "flag-file"
} }
} }
// Replication lag throttle
lag := atomic.LoadInt64(&this.migrationContext.CurrentLag)
if time.Duration(lag) > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("lag=%fs", time.Duration(lag).Seconds())
}
if this.migrationContext.TestOnReplica {
replicationLag, err := mysql.GetMaxReplicationLag(this.migrationContext.InspectorConnectionConfig, this.migrationContext.ThrottleControlReplicaKeys, this.migrationContext.ReplictionLagQuery)
if err != nil {
return true, err.Error()
}
if replicationLag > time.Duration(this.migrationContext.MaxLagMillisecondsThrottleThreshold)*time.Millisecond {
return true, fmt.Sprintf("replica-lag=%fs", replicationLag.Seconds())
}
}
for variableName, threshold := range this.migrationContext.MaxLoad { for variableName, threshold := range this.migrationContext.MaxLoad {
value, err := this.applier.ShowStatusVariable(variableName) value, err := this.applier.ShowStatusVariable(variableName)
@ -307,6 +318,8 @@ func (this *Migrator) Migrate() (err error) {
return nil return nil
} }
// stopWritesAndCompleteMigration performs the final step of migration, based on migration
// type (on replica? bumpy? safe?)
func (this *Migrator) stopWritesAndCompleteMigration() (err error) { func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
if this.migrationContext.Noop { if this.migrationContext.Noop {
log.Debugf("Noop operation; not really swapping tables") log.Debugf("Noop operation; not really swapping tables")
@ -335,6 +348,10 @@ func (this *Migrator) stopWritesAndCompleteMigration() (err error) {
return return
} }
// stopWritesAndCompleteMigrationOnMasterQuickAndBumpy will lock down the original table, execute
// what's left of last DML entries, and **non-atomically** swap original->old, then new->original.
// There is a point in time where the "original" table does not exist and queries are non-blocked
// and failing.
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) { func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err error) {
if err := this.retryOperation(this.applier.LockTables); err != nil { if err := this.retryOperation(this.applier.LockTables); err != nil {
return err return err
@ -366,6 +383,8 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterQuickAndBumpy() (err
return nil return nil
} }
// stopWritesAndCompleteMigrationOnMasterViaLock will lock down the original table, execute
// what's left of last DML entries, and atomically swap & unlock (original->old && new->original)
func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) { func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error) {
lockGrabbed := make(chan error, 1) lockGrabbed := make(chan error, 1)
okToReleaseLock := make(chan bool, 1) okToReleaseLock := make(chan bool, 1)
@ -430,6 +449,10 @@ func (this *Migrator) stopWritesAndCompleteMigrationOnMasterViaLock() (err error
return nil return nil
} }
// stopWritesAndCompleteMigrationOnReplica will stop replication IO thread, apply
// what DML events are left, and that's it.
// This only applies in --test-on-replica. It leaves replication stopped, with both tables
// in sync. There is no table swap.
func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) { func (this *Migrator) stopWritesAndCompleteMigrationOnReplica() (err error) {
log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE") log.Debugf("testing on replica. Instead of LOCK tables I will STOP SLAVE")
if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil { if err := this.retryOperation(this.applier.StopSlaveIOThread); err != nil {
@ -469,6 +492,9 @@ func (this *Migrator) initiateInspector() (err error) {
this.migrationContext.ApplierConnectionConfig.Key, this.migrationContext.InspectorConnectionConfig.Key, this.migrationContext.ApplierConnectionConfig.Key, this.migrationContext.InspectorConnectionConfig.Key,
) )
this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate() this.migrationContext.ApplierConnectionConfig = this.migrationContext.InspectorConnectionConfig.Duplicate()
if this.migrationContext.ThrottleControlReplicaKeys.Len() == 0 {
this.migrationContext.ThrottleControlReplicaKeys.AddKey(this.migrationContext.InspectorConnectionConfig.Key)
}
} else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster { } else if this.migrationContext.InspectorIsAlsoApplier() && !this.migrationContext.AllowedRunningOnMaster {
return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master") return fmt.Errorf("It seems like this migration attempt to run directly on master. Preferably it would be executed on a replica (and this reduces load from the master). To proceed please provide --allow-on-master")
} }

View File

@ -17,6 +17,10 @@ func NewInstanceKeyMap() *InstanceKeyMap {
return &InstanceKeyMap{} return &InstanceKeyMap{}
} }
func (this *InstanceKeyMap) Len() int {
return len(*this)
}
// AddKey adds a single key to this map // AddKey adds a single key to this map
func (this *InstanceKeyMap) AddKey(key InstanceKey) { func (this *InstanceKeyMap) AddKey(key InstanceKey) {
(*this)[key] = true (*this)[key] = true
@ -83,6 +87,9 @@ func (this *InstanceKeyMap) ReadJson(jsonString string) error {
// ReadJson unmarshalls a json into this map // ReadJson unmarshalls a json into this map
func (this *InstanceKeyMap) ReadCommaDelimitedList(list string) error { func (this *InstanceKeyMap) ReadCommaDelimitedList(list string) error {
if list == "" {
return nil
}
tokens := strings.Split(list, ",") tokens := strings.Split(list, ",")
for _, token := range tokens { for _, token := range tokens {
key, err := ParseRawInstanceKeyLoose(token) key, err := ParseRawInstanceKeyLoose(token)

109
go/mysql/utils.go Normal file
View File

@ -0,0 +1,109 @@
/*
Copyright 2016 GitHub Inc.
See https://github.com/github/gh-osc/blob/master/LICENSE
*/
package mysql
import (
gosql "database/sql"
"fmt"
"time"
"github.com/outbrain/golib/log"
"github.com/outbrain/golib/sqlutils"
)
// GetReplicationLag returns replication lag for a given connection config; either by explicit query
// or via SHOW SLAVE STATUS
func GetReplicationLag(connectionConfig *ConnectionConfig, replicationLagQuery string) (replicationLag time.Duration, err error) {
dbUri := connectionConfig.GetDBUri("information_schema")
var db *gosql.DB
if db, _, err = sqlutils.GetDB(dbUri); err != nil {
return replicationLag, err
}
if replicationLagQuery != "" {
var floatLag float64
err = db.QueryRow(replicationLagQuery).Scan(&floatLag)
return time.Duration(int64(floatLag*1000)) * time.Millisecond, err
}
// No explicit replication lag query.
err = sqlutils.QueryRowsMap(db, `show slave status`, func(m sqlutils.RowMap) error {
secondsBehindMaster := m.GetNullInt64("Seconds_Behind_Master")
if !secondsBehindMaster.Valid {
return fmt.Errorf("Replication not running on %+v", connectionConfig.Key)
}
replicationLag = time.Duration(secondsBehindMaster.Int64) * time.Second
return nil
})
return replicationLag, err
}
// GetMaxReplicationLag concurrently checks for replication lag on given list of instance keys,
// each via GetReplicationLag
func GetMaxReplicationLag(baseConnectionConfig *ConnectionConfig, instanceKeyMap *InstanceKeyMap, replicationLagQuery string) (replicationLag time.Duration, err error) {
if instanceKeyMap.Len() == 0 {
return 0, nil
}
lagsChan := make(chan time.Duration, instanceKeyMap.Len())
errorsChan := make(chan error, instanceKeyMap.Len())
for key := range *instanceKeyMap {
connectionConfig := baseConnectionConfig.Duplicate()
connectionConfig.Key = key
go func() {
lag, err := GetReplicationLag(connectionConfig, replicationLagQuery)
lagsChan <- lag
errorsChan <- err
}()
}
for range *instanceKeyMap {
if lagError := <-errorsChan; lagError != nil {
err = lagError
}
if lag := <-lagsChan; lag.Nanoseconds() > replicationLag.Nanoseconds() {
replicationLag = lag
}
}
return replicationLag, err
}
func GetMasterKeyFromSlaveStatus(connectionConfig *ConnectionConfig) (masterKey *InstanceKey, err error) {
currentUri := connectionConfig.GetDBUri("information_schema")
db, _, err := sqlutils.GetDB(currentUri)
if err != nil {
return nil, err
}
err = sqlutils.QueryRowsMap(db, `show slave status`, func(rowMap sqlutils.RowMap) error {
masterKey = &InstanceKey{
Hostname: rowMap.GetString("Master_Host"),
Port: rowMap.GetInt("Master_Port"),
}
return nil
})
return masterKey, err
}
func GetMasterConnectionConfigSafe(connectionConfig *ConnectionConfig, visitedKeys *InstanceKeyMap) (masterConfig *ConnectionConfig, err error) {
log.Debugf("Looking for master on %+v", connectionConfig.Key)
masterKey, err := GetMasterKeyFromSlaveStatus(connectionConfig)
if err != nil {
return nil, err
}
if masterKey == nil {
return connectionConfig, nil
}
if !masterKey.IsValid() {
return connectionConfig, nil
}
masterConfig = connectionConfig.Duplicate()
masterConfig.Key = *masterKey
log.Debugf("Master of %+v is %+v", connectionConfig.Key, masterConfig.Key)
if visitedKeys.HasKey(masterConfig.Key) {
return nil, fmt.Errorf("There seems to be a master-master setup at %+v. This is unsupported. Bailing out", masterConfig.Key)
}
visitedKeys.AddKey(masterConfig.Key)
return GetMasterConnectionConfigSafe(masterConfig, visitedKeys)
}