Cancel any row count queries before attempting to cut over (#846)
* Cancel any row count queries before attempting to cut over
  Closes #830. Switches from using `QueryRow` to `QueryRowContext`, and stores a context.CancelFunc in the migration context, which is called to halt any running row count query before beginning the cut over.
* Make it threadsafe
* Kill the count query on the database side as well
* Explicitly grab a connection to run the count, store its connection id
* When the query context is canceled, run a `KILL QUERY ?` on that connection id
* Rewrite these to use the threadsafe functions, stop exporting the cancel func
* Update logger
* Update logger
Co-authored-by: Tim Vaillancourt <timvaillancourt@github.com>
Co-authored-by: Tim Vaillancourt <tim@timvaillancourt.com>
Co-authored-by: dm-2 <45519614+dm-2@users.noreply.github.com>
This commit is contained in:
parent
308ba7f915
commit
b751499091
@ -82,6 +82,8 @@ type MigrationContext struct {
|
|||||||
AlterStatement string
|
AlterStatement string
|
||||||
AlterStatementOptions string // anything following the 'ALTER TABLE [schema.]table' from AlterStatement
|
AlterStatementOptions string // anything following the 'ALTER TABLE [schema.]table' from AlterStatement
|
||||||
|
|
||||||
|
countMutex sync.Mutex
|
||||||
|
countTableRowsCancelFunc func()
|
||||||
CountTableRows bool
|
CountTableRows bool
|
||||||
ConcurrentCountTableRows bool
|
ConcurrentCountTableRows bool
|
||||||
AllowedRunningOnMaster bool
|
AllowedRunningOnMaster bool
|
||||||
@ -428,6 +430,36 @@ func (this *MigrationContext) IsTransactionalTable() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetCountTableRowsCancelFunc sets the cancel function for the CountTableRows query context
|
||||||
|
func (this *MigrationContext) SetCountTableRowsCancelFunc(f func()) {
|
||||||
|
this.countMutex.Lock()
|
||||||
|
defer this.countMutex.Unlock()
|
||||||
|
|
||||||
|
this.countTableRowsCancelFunc = f
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsCountingTableRows returns true if the migration has a table count query running
|
||||||
|
func (this *MigrationContext) IsCountingTableRows() bool {
|
||||||
|
this.countMutex.Lock()
|
||||||
|
defer this.countMutex.Unlock()
|
||||||
|
|
||||||
|
return this.countTableRowsCancelFunc != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelTableRowsCount cancels the CountTableRows query context. It is safe to
|
||||||
|
// call function even when IsCountingTableRows is false.
|
||||||
|
func (this *MigrationContext) CancelTableRowsCount() {
|
||||||
|
this.countMutex.Lock()
|
||||||
|
defer this.countMutex.Unlock()
|
||||||
|
|
||||||
|
if this.countTableRowsCancelFunc == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this.countTableRowsCancelFunc()
|
||||||
|
this.countTableRowsCancelFunc = nil
|
||||||
|
}
|
||||||
|
|
||||||
// ElapsedTime returns time since very beginning of the process
|
// ElapsedTime returns time since very beginning of the process
|
||||||
func (this *MigrationContext) ElapsedTime() time.Duration {
|
func (this *MigrationContext) ElapsedTime() time.Duration {
|
||||||
return time.Since(this.StartTime)
|
return time.Since(this.StartTime)
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
package logic
|
package logic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
gosql "database/sql"
|
gosql "database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
@ -532,18 +533,48 @@ func (this *Inspector) estimateTableRowsViaExplain() error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Kill issues a `KILL QUERY` for the statement currently running on the
// connection identified by connectionID, using db to send the command.
// It returns whatever error db.Exec reports.
//
// The connection id is passed as a bind argument through the `?` placeholder.
// database/sql does not perform printf-style formatting, so a `%s` verb in the
// statement text would reach the server verbatim and the argument would not be
// bound to anything.
//   - @amason: this should go somewhere _other_ than `logic`, but I couldn't decide
//     between `base`, `sql`, or `mysql`.
func Kill(db *gosql.DB, connectionID string) error {
	_, err := db.Exec(`KILL QUERY ?`, connectionID)
	return err
}
|
||||||
|
|
||||||
// CountTableRows counts exact number of rows on the original table
|
// CountTableRows counts exact number of rows on the original table
|
||||||
func (this *Inspector) CountTableRows() error {
|
func (this *Inspector) CountTableRows(ctx context.Context) error {
|
||||||
atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1)
|
atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 1)
|
||||||
defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0)
|
defer atomic.StoreInt64(&this.migrationContext.CountingRowsFlag, 0)
|
||||||
|
|
||||||
this.migrationContext.Log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
this.migrationContext.Log.Infof("As instructed, I'm issuing a SELECT COUNT(*) on the table. This may take a while")
|
||||||
|
|
||||||
query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
conn, err := this.db.Conn(ctx)
|
||||||
var rowsEstimate int64
|
if err != nil {
|
||||||
if err := this.db.QueryRow(query).Scan(&rowsEstimate); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
var connectionID string
|
||||||
|
if err := conn.QueryRowContext(ctx, `SELECT /* gh-ost */ CONNECTION_ID()`).Scan(&connectionID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
query := fmt.Sprintf(`select /* gh-ost */ count(*) as count_rows from %s.%s`, sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName))
|
||||||
|
var rowsEstimate int64
|
||||||
|
if err := conn.QueryRowContext(ctx, query).Scan(&rowsEstimate); err != nil {
|
||||||
|
switch err {
|
||||||
|
case context.Canceled, context.DeadlineExceeded:
|
||||||
|
this.migrationContext.Log.Infof("exact row count cancelled (%s), likely because I'm about to cut over. I'm going to kill that query.", ctx.Err())
|
||||||
|
return Kill(this.db, connectionID)
|
||||||
|
default:
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// row count query finished. nil out the cancel func, so the main migration thread
|
||||||
|
// doesn't bother calling it after row copy is done.
|
||||||
|
this.migrationContext.SetCountTableRowsCancelFunc(nil)
|
||||||
|
|
||||||
atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate)
|
atomic.StoreInt64(&this.migrationContext.RowsEstimate, rowsEstimate)
|
||||||
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate
|
this.migrationContext.UsedRowsEstimateMethod = base.CountRowsEstimate
|
||||||
|
|
||||||
|
@ -6,6 +6,7 @@
|
|||||||
package logic
|
package logic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
@ -295,8 +296,8 @@ func (this *Migrator) countTableRows() (err error) {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
countRowsFunc := func() error {
|
countRowsFunc := func(ctx context.Context) error {
|
||||||
if err := this.inspector.CountTableRows(); err != nil {
|
if err := this.inspector.CountTableRows(ctx); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err := this.hooksExecutor.onRowCountComplete(); err != nil {
|
if err := this.hooksExecutor.onRowCountComplete(); err != nil {
|
||||||
@ -306,12 +307,17 @@ func (this *Migrator) countTableRows() (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if this.migrationContext.ConcurrentCountTableRows {
|
if this.migrationContext.ConcurrentCountTableRows {
|
||||||
|
// store a cancel func so we can stop this query before a cut over
|
||||||
|
rowCountContext, rowCountCancel := context.WithCancel(context.Background())
|
||||||
|
this.migrationContext.SetCountTableRowsCancelFunc(rowCountCancel)
|
||||||
|
|
||||||
this.migrationContext.Log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on")
|
this.migrationContext.Log.Infof("As instructed, counting rows in the background; meanwhile I will use an estimated count, and will update it later on")
|
||||||
go countRowsFunc()
|
go countRowsFunc(rowCountContext)
|
||||||
|
|
||||||
// and we ignore errors, because this turns to be a background job
|
// and we ignore errors, because this turns to be a background job
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
return countRowsFunc()
|
return countRowsFunc(context.Background())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *Migrator) createFlagFiles() (err error) {
|
func (this *Migrator) createFlagFiles() (err error) {
|
||||||
@ -415,6 +421,10 @@ func (this *Migrator) Migrate() (err error) {
|
|||||||
}
|
}
|
||||||
this.printStatus(ForcePrintStatusRule)
|
this.printStatus(ForcePrintStatusRule)
|
||||||
|
|
||||||
|
if this.migrationContext.IsCountingTableRows() {
|
||||||
|
this.migrationContext.Log.Info("stopping query for exact row count, because that can accidentally lock out the cut over")
|
||||||
|
this.migrationContext.CancelTableRowsCount()
|
||||||
|
}
|
||||||
if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
|
if err := this.hooksExecutor.onBeforeCutOver(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user