Compare commits

...

6 Commits

Author SHA1 Message Date
Shao Hou Kun
0a033c76c1
feat(feat-set-binlogsyncer-maxreconnectattempts): feat-set-binlogsyncer-maxreconnectattempts (#1279)
Co-authored-by: shaohoukun <shaohoukun@meituan.com>
2023-05-25 22:45:53 +02:00
Rashiq
b7db8c6ca7
Merge pull request #1180 from lmtwga/master 2023-02-06 14:42:04 +01:00
Tim Vaillancourt
bea5323816
Merge branch 'master' into master 2023-01-28 00:19:37 +01:00
dm-2
b5387331f8
Merge branch 'master' into master 2022-10-21 17:02:17 +01:00
dm-2
659e4401ba
Merge branch 'master' into master 2022-10-21 17:00:05 +01:00
lmtwga
321e5847ba fix: because lock is not release, drop cutover sentry table is hanged 2022-09-16 17:49:09 +08:00
6 changed files with 22 additions and 22 deletions

View File

@ -61,6 +61,9 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
`gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.
### binlogsyncer-max-reconnect-attempts
`--binlogsyncer-max-reconnect-attempts=0`, the maximum number of attempts to re-establish a broken inspector connection for sync binlog. `0` or `negative number` means infinite retry, default `0`
### conf
`--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:

View File

@ -232,6 +232,8 @@ type MigrationContext struct {
recentBinlogCoordinates mysql.BinlogCoordinates
BinlogSyncerMaxReconnectAttempts int
Log Logger
}

View File

@ -36,14 +36,15 @@ func NewGoMySQLReader(migrationContext *base.MigrationContext) *GoMySQLReader {
currentCoordinates: mysql.BinlogCoordinates{},
currentCoordinatesMutex: &sync.Mutex{},
binlogSyncer: replication.NewBinlogSyncer(replication.BinlogSyncerConfig{
ServerID: uint32(migrationContext.ReplicaServerId),
Flavor: gomysql.MySQLFlavor,
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Password: connectionConfig.Password,
TLSConfig: connectionConfig.TLSConfig(),
UseDecimal: true,
ServerID: uint32(migrationContext.ReplicaServerId),
Flavor: gomysql.MySQLFlavor,
Host: connectionConfig.Key.Hostname,
Port: uint16(connectionConfig.Key.Port),
User: connectionConfig.User,
Password: connectionConfig.Password,
TLSConfig: connectionConfig.TLSConfig(),
UseDecimal: true,
MaxReconnectAttempts: migrationContext.BinlogSyncerMaxReconnectAttempts,
}),
}
}

View File

@ -134,6 +134,7 @@ func main() {
flag.Int64Var(&migrationContext.HooksStatusIntervalSec, "hooks-status-interval", 60, "how many seconds to wait between calling onStatus hook")
flag.UintVar(&migrationContext.ReplicaServerId, "replica-server-id", 99999, "server id used by gh-ost process. Default: 99999")
flag.IntVar(&migrationContext.BinlogSyncerMaxReconnectAttempts, "binlogsyncer-max-reconnect-attempts", 0, "when master node fails, the maximum number of binlog synchronization attempts to reconnect. 0 is unlimited")
maxLoad := flag.String("max-load", "", "Comma delimited status-name=threshold. e.g: 'Threads_running=100,Threads_connected=500'. When status exceeds threshold, app throttles writes")
criticalLoad := flag.String("critical-load", "", "Comma delimited status-name=threshold, same format as --max-load. When status exceeds threshold, app panics and quits")

View File

@ -9,7 +9,6 @@ import (
gosql "database/sql"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
@ -935,7 +934,7 @@ func (this *Applier) CreateAtomicCutOverSentryTable() error {
}
// AtomicCutOverMagicLock
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error, dropCutOverSentryTableOnce *sync.Once) error {
func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocked chan<- error, okToUnlockTable <-chan bool, tableUnlocked chan<- error) error {
tx, err := this.db.Begin()
if err != nil {
tableLocked <- err
@ -946,6 +945,7 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
tableLocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tableUnlocked <- fmt.Errorf("Unexpected error in AtomicCutOverMagicLock(), injected to release blocking channel reads")
tx.Rollback()
this.DropAtomicCutOverSentryTableIfExists()
}()
var sessionId int64
@ -1014,12 +1014,10 @@ func (this *Applier) AtomicCutOverMagicLock(sessionIdChan chan int64, tableLocke
sql.EscapeName(this.migrationContext.GetOldTableName()),
)
dropCutOverSentryTableOnce.Do(func() {
if _, err := tx.Exec(query); err != nil {
this.migrationContext.Log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}
})
if _, err := tx.Exec(query); err != nil {
this.migrationContext.Log.Errore(err)
// We DO NOT return here because we must `UNLOCK TABLES`!
}
// Tables still locked
this.migrationContext.Log.Infof("Releasing lock from %s.%s, %s.%s",

View File

@ -13,7 +13,6 @@ import (
"math"
"os"
"strings"
"sync"
"sync/atomic"
"time"
@ -639,12 +638,8 @@ func (this *Migrator) atomicCutOver() (err error) {
defer atomic.StoreInt64(&this.migrationContext.InCutOverCriticalSectionFlag, 0)
okToUnlockTable := make(chan bool, 4)
var dropCutOverSentryTableOnce sync.Once
defer func() {
okToUnlockTable <- true
dropCutOverSentryTableOnce.Do(func() {
this.applier.DropAtomicCutOverSentryTableIfExists()
})
}()
atomic.StoreInt64(&this.migrationContext.AllEventsUpToLockProcessedInjectedFlag, 0)
@ -653,7 +648,7 @@ func (this *Migrator) atomicCutOver() (err error) {
tableLocked := make(chan error, 2)
tableUnlocked := make(chan error, 2)
go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked, &dropCutOverSentryTableOnce); err != nil {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
this.migrationContext.Log.Errore(err)
}
}()