diff --git a/.golangci.yml b/.golangci.yml index a71dcdd..4e0bc4f 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -5,10 +5,7 @@ run: linters: disable: - errcheck - - staticcheck enable: - - gosimple - - govet - noctx - rowserrcheck - sqlclosecheck diff --git a/go/base/context.go b/go/base/context.go index 38c82a3..e9dae69 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -846,30 +846,30 @@ func (this *MigrationContext) ReadConfigFile() error { return err } - if cfg.Section("client").Haskey("user") { + if cfg.Section("client").HasKey("user") { this.config.Client.User = cfg.Section("client").Key("user").String() } - if cfg.Section("client").Haskey("password") { + if cfg.Section("client").HasKey("password") { this.config.Client.Password = cfg.Section("client").Key("password").String() } - if cfg.Section("osc").Haskey("chunk_size") { + if cfg.Section("osc").HasKey("chunk_size") { this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64() if err != nil { return fmt.Errorf("Unable to read osc chunk size: %s", err.Error()) } } - if cfg.Section("osc").Haskey("max_load") { + if cfg.Section("osc").HasKey("max_load") { this.config.Osc.Max_Load = cfg.Section("osc").Key("max_load").String() } - if cfg.Section("osc").Haskey("replication_lag_query") { + if cfg.Section("osc").HasKey("replication_lag_query") { this.config.Osc.Replication_Lag_Query = cfg.Section("osc").Key("replication_lag_query").String() } - if cfg.Section("osc").Haskey("max_lag_millis") { + if cfg.Section("osc").HasKey("max_lag_millis") { this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64() if err != nil { return fmt.Errorf("Unable to read max lag millis: %s", err.Error()) diff --git a/go/base/utils.go b/go/base/utils.go index c0e3293..e3950f2 100644 --- a/go/base/utils.go +++ b/go/base/utils.go @@ -69,7 +69,7 @@ func ValidateConnection(db *gosql.DB, connectionConfig *mysql.ConnectionConfig, return "", err } extraPortQuery := `select @@global.extra_port` - if err := db.QueryRow(extraPortQuery).Scan(&extraPort); err != nil { + if err := db.QueryRow(extraPortQuery).Scan(&extraPort); err != nil { // nolint:staticcheck // swallow this error. not all servers support extra_port } // AliyunRDS set users port to "NULL", replace it by gh-ost param diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index d0ae4c3..d046fe8 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -19,7 +19,8 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/openark/golib/log" - "golang.org/x/crypto/ssh/terminal" + // TODO: move to golang.org/x/term + "golang.org/x/crypto/ssh/terminal" // nolint:staticcheck ) var AppVersion string diff --git a/go/logic/applier.go b/go/logic/applier.go index aa6dda3..52b3570 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1,5 +1,5 @@ /* - Copyright 2021 GitHub Inc. + Copyright 2022 GitHub Inc. See https://github.com/github/gh-ost/blob/master/LICENSE */ @@ -71,7 +71,6 @@ func NewApplier(migrationContext *base.MigrationContext) *Applier { } func (this *Applier) InitDBConnections() (err error) { - applierUri := this.connectionConfig.GetDBUri(this.migrationContext.DatabaseName) if this.db, _, err = mysql.GetDB(this.migrationContext.Uuid, applierUri); err != nil { return err @@ -344,8 +343,10 @@ func (this *Applier) InitiateHeartbeat() { } injectHeartbeat() - heartbeatTick := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) - for range heartbeatTick { + ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) + defer ticker.Stop() + for { + <-ticker.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index bc2a03f..74772ed 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -26,8 +26,8 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" - AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" - ReadMigrationRangeValues = "ReadMigrationRangeValues" + AllEventsUpToLockProcessed ChangelogState = "AllEventsUpToLockProcessed" + ReadMigrationRangeValues ChangelogState = "ReadMigrationRangeValues" ) func ReadChangelogState(s string) ChangelogState { @@ -809,17 +809,17 @@ func (this *Migrator) initiateInspector() (err error) { } // initiateStatus sets and activates the printStatus() ticker -func (this *Migrator) initiateStatus() error { +func (this *Migrator) initiateStatus() { this.printStatus(ForcePrintStatusAndHintRule) - statusTick := time.Tick(1 * time.Second) - for range statusTick { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + <-ticker.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { - return nil + return } go this.printStatus(HeuristicPrintStatusRule) } - - return nil } // printMigrationStatusHint prints a detailed configuration dump, that is useful @@ -1052,8 +1052,10 @@ func (this *Migrator) initiateStreaming() error { }() go func() { - ticker := time.Tick(1 * time.Second) - for range ticker { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for { + <-ticker.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } diff --git a/go/logic/server.go b/go/logic/server.go index 3d128b1..5356a82 100644 --- a/go/logic/server.go +++ b/go/logic/server.go @@ -134,7 +134,7 @@ func (this *Server) applyServerCommand(command string, writer *bufio.Writer) (pr } } argIsQuestion := (arg == "?") - throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n" + throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged" if err := this.hooksExecutor.onInteractiveCommand(command); err != nil { return NoPrintStatusRule, err @@ -282,7 +282,7 @@ help # This message return NoPrintStatusRule, nil } this.migrationContext.SetThrottleQuery(arg) - fmt.Fprintf(writer, throttleHint) + fmt.Fprintln(writer, throttleHint) return ForcePrintStatusAndHintRule, nil } case "throttle-http": @@ -292,7 +292,7 @@ help # This message return NoPrintStatusRule, nil } this.migrationContext.SetThrottleHTTP(arg) - fmt.Fprintf(writer, throttleHint) + fmt.Fprintln(writer, throttleHint) return ForcePrintStatusAndHintRule, nil } case "throttle-control-replicas": @@ -315,7 +315,7 @@ help # This message return NoPrintStatusRule, err } atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1) - fmt.Fprintf(writer, throttleHint) + fmt.Fprintln(writer, throttleHint) return ForcePrintStatusAndHintRule, nil } case "no-throttle", "unthrottle", "resume", "continue": diff --git a/go/logic/streamer.go b/go/logic/streamer.go index f7215dc..3041d7b 100644 --- a/go/logic/streamer.go +++ b/go/logic/streamer.go @@ -87,10 +87,10 @@ func (this *EventsStreamer) notifyListeners(binlogEvent *binlog.BinlogDMLEvent) for _, listener := range this.listeners { listener := listener - if strings.ToLower(listener.databaseName) != strings.ToLower(binlogEvent.DatabaseName) { + if !strings.EqualFold(listener.databaseName, binlogEvent.DatabaseName) { continue } - if strings.ToLower(listener.tableName) != strings.ToLower(binlogEvent.TableName) { + if !strings.EqualFold(listener.tableName, binlogEvent.TableName) { continue } if listener.async { diff --git a/go/logic/throttler.go b/go/logic/throttler.go index 8f3d0db..33b08f3 100644 --- a/go/logic/throttler.go +++ b/go/logic/throttler.go @@ -168,8 +168,10 @@ func (this *Throttler) collectReplicationLag(firstThrottlingCollected chan<- boo collectFunc() firstThrottlingCollected <- true - ticker := time.Tick(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) - for range ticker { + ticker := time.NewTicker(time.Duration(this.migrationContext.HeartbeatIntervalMilliseconds) * time.Millisecond) + defer ticker.Stop() + for { + <-ticker.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } @@ -244,12 +246,16 @@ func (this *Throttler) collectControlReplicasLag() { } this.migrationContext.SetControlReplicasLagResult(readControlReplicasLag()) } - aggressiveTicker := time.Tick(100 * time.Millisecond) + + aggressiveTicker := time.NewTicker(100 * time.Millisecond) + defer aggressiveTicker.Stop() + relaxedFactor := 10 counter := 0 shouldReadLagAggressively := false - for range aggressiveTicker { + for { + <-aggressiveTicker.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } @@ -321,8 +327,10 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<- firstThrottlingCollected <- true collectInterval := time.Duration(this.migrationContext.ThrottleHTTPIntervalMillis) * time.Millisecond - ticker := time.Tick(collectInterval) - for range ticker { + ticker := time.NewTicker(collectInterval) + defer ticker.Stop() + for { + <-ticker.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } @@ -441,8 +449,10 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan this.collectGeneralThrottleMetrics() firstThrottlingCollected <- true - throttlerMetricsTick := time.Tick(1 * time.Second) - for range throttlerMetricsTick { + throttlerMetricsTick := time.NewTicker(1 * time.Second) + defer throttlerMetricsTick.Stop() + for { + <-throttlerMetricsTick.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { return } @@ -453,8 +463,9 @@ func (this *Throttler) initiateThrottlerCollection(firstThrottlingCollected chan } // initiateThrottlerChecks initiates the throttle ticker and sets the basic behavior of throttling. -func (this *Throttler) initiateThrottlerChecks() error { - throttlerTick := time.Tick(100 * time.Millisecond) +func (this *Throttler) initiateThrottlerChecks() { + throttlerTick := time.NewTicker(100 * time.Millisecond) + defer throttlerTick.Stop() throttlerFunction := func() { alreadyThrottling, currentReason, _ := this.migrationContext.IsThrottled() @@ -472,14 +483,13 @@ func (this *Throttler) initiateThrottlerChecks() error { this.migrationContext.SetThrottled(shouldThrottle, throttleReason, throttleReasonHint) } throttlerFunction() - for range throttlerTick { + for { + <-throttlerTick.C if atomic.LoadInt64(&this.finishedMigrating) > 0 { - return nil + return } throttlerFunction() } - - return nil } // throttle sees if throttling needs take place, and if so, continuously sleeps (blocks) diff --git a/go/sql/builder.go b/go/sql/builder.go index 15199ff..b24a848 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -501,6 +501,9 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol } equalsComparison, err := BuildEqualsPreparedComparison(uniqueKeyColumns.Names()) + if err != nil { + return "", sharedArgs, uniqueKeyArgs, err + } result = fmt.Sprintf(` update /* gh-ost %s.%s */ %s.%s