Merge branch 'master' into master

This commit is contained in:
dm-2 2022-10-21 17:02:17 +01:00 committed by GitHub
commit b5387331f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 29 additions and 26 deletions

View File

@ -4,14 +4,19 @@ linters:
disable: disable:
- errcheck - errcheck
enable: enable:
- bodyclose
- containedctx
- contextcheck - contextcheck
- dogsled
- durationcheck - durationcheck
- errname - errname
- errorlint
- execinquery - execinquery
- gofmt - gofmt
- ifshort - ifshort
- misspell - misspell
- nilerr - nilerr
- nilnil
- noctx - noctx
- nolintlint - nolintlint
- nosprintfhostport - nosprintfhostport
@ -19,6 +24,7 @@ linters:
- rowserrcheck - rowserrcheck
- sqlclosecheck - sqlclosecheck
- unconvert - unconvert
- unparam
- unused - unused
- wastedassign - wastedassign
- whitespace - whitespace

View File

@ -858,7 +858,7 @@ func (this *MigrationContext) ReadConfigFile() error {
if cfg.Section("osc").HasKey("chunk_size") { if cfg.Section("osc").HasKey("chunk_size") {
this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64() this.config.Osc.Chunk_Size, err = cfg.Section("osc").Key("chunk_size").Int64()
if err != nil { if err != nil {
return fmt.Errorf("Unable to read osc chunk size: %s", err.Error()) return fmt.Errorf("Unable to read osc chunk size: %w", err)
} }
} }
@ -873,7 +873,7 @@ func (this *MigrationContext) ReadConfigFile() error {
if cfg.Section("osc").HasKey("max_lag_millis") { if cfg.Section("osc").HasKey("max_lag_millis") {
this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64() this.config.Osc.Max_Lag_Millis, err = cfg.Section("osc").Key("max_lag_millis").Int64()
if err != nil { if err != nil {
return fmt.Errorf("Unable to read max lag millis: %s", err.Error()) return fmt.Errorf("Unable to read max lag millis: %w", err)
} }
} }

View File

@ -1132,7 +1132,7 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent))
} }
result, err := tx.Exec(buildResult.query, buildResult.args...) result, err := tx.Exec(buildResult.query, buildResult.args...)
if err != nil { if err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), buildResult.query, buildResult.args) err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args)
return rollback(err) return rollback(err)
} }

View File

@ -8,6 +8,7 @@ package logic
import ( import (
"context" "context"
gosql "database/sql" gosql "database/sql"
"errors"
"fmt" "fmt"
"reflect" "reflect"
"strings" "strings"
@ -554,13 +555,11 @@ func (this *Inspector) CountTableRows(ctx context.Context) error {
query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName)) query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
var rowsEstimate int64 var rowsEstimate int64
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil { if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
switch err { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
case context.Canceled, context.DeadlineExceeded:
this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err()) this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
return mysql.Kill(this.db, connectionID) return mysql.Kill(this.db, connectionID)
default:
return err
} }
return err
} }
// row count query finished. nil out the cancel func, so the main migration thread // row count query finished. nil out the cancel func, so the main migration thread

View File

@ -390,9 +390,9 @@ func (this *Migrator) Migrate() (err error) {
if err := this.applier.ReadMigrationRangeValues(); err != nil { if err := this.applier.ReadMigrationRangeValues(); err != nil {
return err return err
} }
if err := this.initiateThrottler(); err != nil {
return err this.initiateThrottler()
}
if err := this.hooksExecutor.onBeforeRowCopy(); err != nil { if err := this.hooksExecutor.onBeforeRowCopy(); err != nil {
return err return err
} }
@ -1091,7 +1091,7 @@ func (this *Migrator) addDMLEventsListener() error {
} }
// initiateThrottler kicks in the throttling collection and the throttling checks. // initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error { func (this *Migrator) initiateThrottler() {
this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion) this.throttler = NewThrottler(this.migrationContext, this.applier, this.inspector, this.appVersion)
go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected) go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
@ -1101,8 +1101,6 @@ func (this *Migrator) initiateThrottler() error {
<-this.firstThrottlingCollected // other, general metrics <-this.firstThrottlingCollected // other, general metrics
this.migrationContext.Log.Infof("First throttle metrics collected") this.migrationContext.Log.Infof("First throttle metrics collected")
go this.throttler.initiateThrottlerChecks() go this.throttler.initiateThrottlerChecks()
return nil
} }
func (this *Migrator) initiateApplier() error { func (this *Migrator) initiateApplier() error {

View File

@ -308,6 +308,8 @@ func (this *Throttler) collectThrottleHTTPStatus(firstThrottlingCollected chan<-
if err != nil { if err != nil {
return false, err return false, err
} }
defer resp.Body.Close()
atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode)) atomic.StoreInt64(&this.migrationContext.ThrottleHTTPStatusCode, int64(resp.StatusCode))
return false, nil return false, nil
} }

View File

@ -62,7 +62,7 @@ func NewParserFromAlterStatement(alterStatement string) *AlterTableParser {
return parser return parser
} }
func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string, err error) { func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tokens []string) {
terminatingQuote := rune(0) terminatingQuote := rune(0)
f := func(c rune) bool { f := func(c rune) bool {
switch { switch {
@ -86,7 +86,7 @@ func (this *AlterTableParser) tokenizeAlterStatement(alterStatement string) (tok
for i := range tokens { for i := range tokens {
tokens[i] = strings.TrimSpace(tokens[i]) tokens[i] = strings.TrimSpace(tokens[i])
} }
return tokens, nil return tokens
} }
func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) { func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement string) (strippedStatement string) {
@ -95,7 +95,7 @@ func (this *AlterTableParser) sanitizeQuotesFromAlterStatement(alterStatement st
return strippedStatement return strippedStatement
} }
func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) { func (this *AlterTableParser) parseAlterToken(alterToken string) {
{ {
// rename // rename
allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1) allStringSubmatch := renameColumnRegexp.FindAllStringSubmatch(alterToken, -1)
@ -131,7 +131,6 @@ func (this *AlterTableParser) parseAlterToken(alterToken string) (err error) {
this.isAutoIncrementDefined = true this.isAutoIncrementDefined = true
} }
} }
return nil
} }
func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) { func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err error) {
@ -151,8 +150,7 @@ func (this *AlterTableParser) ParseAlterStatement(alterStatement string) (err er
break break
} }
} }
alterTokens, _ := this.tokenizeAlterStatement(this.alterStatementOptions) for _, alterToken := range this.tokenizeAlterStatement(this.alterStatementOptions) {
for _, alterToken := range alterTokens {
alterToken = this.sanitizeQuotesFromAlterStatement(alterToken) alterToken = this.sanitizeQuotesFromAlterStatement(alterToken)
this.parseAlterToken(alterToken) this.parseAlterToken(alterToken)
this.alterTokens = append(this.alterTokens, alterToken) this.alterTokens = append(this.alterTokens, alterToken)

View File

@ -99,37 +99,37 @@ func TestTokenizeAlterStatement(t *testing.T) {
parser := NewAlterTableParser() parser := NewAlterTableParser()
{ {
alterStatement := "add column t int" alterStatement := "add column t int"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int"}))
} }
{ {
alterStatement := "add column t int, change column i int" alterStatement := "add column t int, change column i int"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int"}))
} }
{ {
alterStatement := "add column t int, change column i int 'some comment'" alterStatement := "add column t int, change column i int 'some comment'"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment'"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment'"}))
} }
{ {
alterStatement := "add column t int, change column i int 'some comment, with comma'" alterStatement := "add column t int, change column i int 'some comment, with comma'"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment, with comma'"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "change column i int 'some comment, with comma'"}))
} }
{ {
alterStatement := "add column t int, add column d decimal(10,2)" alterStatement := "add column t int, add column d decimal(10,2)"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column d decimal(10,2)"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column d decimal(10,2)"}))
} }
{ {
alterStatement := "add column t int, add column e enum('a','b','c')" alterStatement := "add column t int, add column e enum('a','b','c')"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column e enum('a','b','c')"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int", "add column e enum('a','b','c')"}))
} }
{ {
alterStatement := "add column t int(11), add column e enum('a','b','c')" alterStatement := "add column t int(11), add column e enum('a','b','c')"
tokens, _ := parser.tokenizeAlterStatement(alterStatement) tokens := parser.tokenizeAlterStatement(alterStatement)
test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int(11)", "add column e enum('a','b','c')"})) test.S(t).ExpectTrue(reflect.DeepEqual(tokens, []string{"add column t int(11)", "add column e enum('a','b','c')"}))
} }
} }