Compare commits
6 Commits
master
...
abort-miss
Author | SHA1 | Date | |
---|---|---|---|
|
b3502ebc59 | ||
|
0986970fc7 | ||
|
9aef2707ea | ||
|
1a45b5fffc | ||
|
e2482d4e2e | ||
|
21a251b620 |
@ -752,3 +752,20 @@ func (this *MigrationContext) ReadConfigFile() error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) PanicAbortIfTableError(err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if strings.Contains(err.Error(), mysql.Error1146TableDoesntExist) || strings.Contains(err.Error(), mysql.Error1017CantFindFile) {
|
||||||
|
this.PanicAbortOnError(err)
|
||||||
|
}
|
||||||
|
// otherwise irrelevant error and we do not panic
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *MigrationContext) PanicAbortOnError(err error) {
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
this.PanicAbort <- err
|
||||||
|
}
|
||||||
|
@ -290,6 +290,7 @@ func (this *Applier) WriteChangelog(hint, value string) (string, error) {
|
|||||||
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
|
sql.EscapeName(this.migrationContext.GetChangelogTableName()),
|
||||||
)
|
)
|
||||||
_, err := sqlutils.ExecNoPrepare(this.db, query, explicitId, hint, value)
|
_, err := sqlutils.ExecNoPrepare(this.db, query, explicitId, hint, value)
|
||||||
|
this.migrationContext.PanicAbortIfTableError(err)
|
||||||
return hint, err
|
return hint, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -916,6 +917,8 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
|
|||||||
)
|
)
|
||||||
log.Infof("Issuing and expecting this to block: %s", query)
|
log.Infof("Issuing and expecting this to block: %s", query)
|
||||||
if _, err := tx.Exec(query); err != nil {
|
if _, err := tx.Exec(query); err != nil {
|
||||||
|
this.migrationContext.PanicAbortIfTableError(err)
|
||||||
|
|
||||||
tablesRenamed <- err
|
tablesRenamed <- err
|
||||||
return log.Errore(err)
|
return log.Errore(err)
|
||||||
}
|
}
|
||||||
|
@ -144,7 +144,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
|
|||||||
// there's an error. Let's try again.
|
// there's an error. Let's try again.
|
||||||
}
|
}
|
||||||
if len(notFatalHint) == 0 {
|
if len(notFatalHint) == 0 {
|
||||||
this.migrationContext.PanicAbort <- err
|
this.migrationContext.PanicAbortOnError(err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -172,7 +172,7 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if len(notFatalHint) == 0 {
|
if len(notFatalHint) == 0 {
|
||||||
this.migrationContext.PanicAbort <- err
|
this.migrationContext.PanicAbortOnError(err)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -191,14 +191,14 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
|
|||||||
// consumes and drops any further incoming events that may be left hanging.
|
// consumes and drops any further incoming events that may be left hanging.
|
||||||
func (this *Migrator) consumeRowCopyComplete() {
|
func (this *Migrator) consumeRowCopyComplete() {
|
||||||
if err := <-this.rowCopyComplete; err != nil {
|
if err := <-this.rowCopyComplete; err != nil {
|
||||||
this.migrationContext.PanicAbort <- err
|
this.migrationContext.PanicAbortOnError(err)
|
||||||
}
|
}
|
||||||
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
|
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
|
||||||
this.migrationContext.MarkRowCopyEndTime()
|
this.migrationContext.MarkRowCopyEndTime()
|
||||||
go func() {
|
go func() {
|
||||||
for err := range this.rowCopyComplete {
|
for err := range this.rowCopyComplete {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
this.migrationContext.PanicAbort <- err
|
this.migrationContext.PanicAbortOnError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
@ -620,10 +620,12 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||||||
tableUnlocked := make(chan error, 2)
|
tableUnlocked := make(chan error, 2)
|
||||||
go func() {
|
go func() {
|
||||||
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
|
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
|
||||||
|
this.migrationContext.PanicAbortIfTableError(err)
|
||||||
log.Errore(err)
|
log.Errore(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if err := <-tableLocked; err != nil {
|
if err := <-tableLocked; err != nil {
|
||||||
|
this.migrationContext.PanicAbortIfTableError(err)
|
||||||
return log.Errore(err)
|
return log.Errore(err)
|
||||||
}
|
}
|
||||||
lockOriginalSessionId := <-lockOriginalSessionIdChan
|
lockOriginalSessionId := <-lockOriginalSessionIdChan
|
||||||
@ -631,6 +633,7 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||||||
// At this point we know the original table is locked.
|
// At this point we know the original table is locked.
|
||||||
// We know any newly incoming DML on original table is blocked.
|
// We know any newly incoming DML on original table is blocked.
|
||||||
if err := this.waitForEventsUpToLock(); err != nil {
|
if err := this.waitForEventsUpToLock(); err != nil {
|
||||||
|
this.migrationContext.PanicAbortIfTableError(err)
|
||||||
return log.Errore(err)
|
return log.Errore(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -644,6 +647,7 @@ func (this *Migrator) atomicCutOver() (err error) {
|
|||||||
go func() {
|
go func() {
|
||||||
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
|
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
|
||||||
// Abort! Release the lock
|
// Abort! Release the lock
|
||||||
|
this.migrationContext.PanicAbortIfTableError(err)
|
||||||
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
|
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
|
||||||
okToUnlockTable <- true
|
okToUnlockTable <- true
|
||||||
}
|
}
|
||||||
@ -999,9 +1003,8 @@ func (this *Migrator) initiateStreaming() error {
|
|||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Debugf("Beginning streaming")
|
log.Debugf("Beginning streaming")
|
||||||
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
|
if err := this.eventsStreamer.StreamEvents(this.canStopStreaming); err != nil {
|
||||||
if err != nil {
|
this.migrationContext.PanicAbortOnError(err)
|
||||||
this.migrationContext.PanicAbort <- err
|
|
||||||
}
|
}
|
||||||
log.Debugf("Done streaming")
|
log.Debugf("Done streaming")
|
||||||
}()
|
}()
|
||||||
|
@ -342,7 +342,7 @@ help # This message
|
|||||||
return NoPrintStatusRule, err
|
return NoPrintStatusRule, err
|
||||||
}
|
}
|
||||||
err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. Please drop the gh-ost tables before trying again.")
|
err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. Please drop the gh-ost tables before trying again.")
|
||||||
this.migrationContext.PanicAbort <- err
|
this.migrationContext.PanicAbortOnError(err)
|
||||||
return NoPrintStatusRule, err
|
return NoPrintStatusRule, err
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -209,6 +209,7 @@ func (this *Throttler) collectControlReplicasLag() {
|
|||||||
lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
|
lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
|
||||||
go func() {
|
go func() {
|
||||||
lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
|
lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
|
||||||
|
this.migrationContext.PanicAbortIfTableError(lagResult.Err)
|
||||||
lagResults <- lagResult
|
lagResults <- lagResult
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
@ -317,7 +318,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
// Regardless of throttle, we take opportunity to check for panic-abort
|
// Regardless of throttle, we take opportunity to check for panic-abort
|
||||||
if this.migrationContext.PanicFlagFile != "" {
|
if this.migrationContext.PanicFlagFile != "" {
|
||||||
if base.FileExists(this.migrationContext.PanicFlagFile) {
|
if base.FileExists(this.migrationContext.PanicFlagFile) {
|
||||||
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile)
|
this.migrationContext.PanicAbortOnError(fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,7 +341,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
|
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
|
||||||
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold)
|
this.migrationContext.PanicAbortOnError(fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold))
|
||||||
}
|
}
|
||||||
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
|
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
|
||||||
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
|
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
|
||||||
@ -348,7 +349,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
|
|||||||
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
|
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
|
||||||
<-timer.C
|
<-timer.C
|
||||||
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
|
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
|
||||||
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold)
|
this.migrationContext.PanicAbortOnError(fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
@ -18,6 +18,11 @@ import (
|
|||||||
"github.com/outbrain/golib/sqlutils"
|
"github.com/outbrain/golib/sqlutils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
Error1017CantFindFile = "Error 1017:"
|
||||||
|
Error1146TableDoesntExist = "Error 1146:"
|
||||||
|
)
|
||||||
|
|
||||||
const MaxTableNameLength = 64
|
const MaxTableNameLength = 64
|
||||||
const MaxReplicationPasswordLength = 32
|
const MaxReplicationPasswordLength = 32
|
||||||
|
|
||||||
|
23
localtests/fail-drop-ghc-table-no-read/create.sql
Normal file
23
localtests/fail-drop-ghc-table-no-read/create.sql
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
drop table if exists gh_ost_test;
|
||||||
|
create table gh_ost_test (
|
||||||
|
id int auto_increment,
|
||||||
|
i int not null,
|
||||||
|
color varchar(32),
|
||||||
|
primary key(id)
|
||||||
|
) auto_increment=1;
|
||||||
|
|
||||||
|
insert into gh_ost_test values (null, 1, 'red');
|
||||||
|
|
||||||
|
drop event if exists gh_ost_test;
|
||||||
|
delimiter ;;
|
||||||
|
create event gh_ost_test
|
||||||
|
on schedule every 1 second
|
||||||
|
starts current_timestamp + interval 3 second
|
||||||
|
ends current_timestamp + interval 60 second
|
||||||
|
on completion not preserve
|
||||||
|
enable
|
||||||
|
do
|
||||||
|
begin
|
||||||
|
insert into gh_ost_test values (null, 1, 'blue');
|
||||||
|
drop table if exists _gh_ost_test_ghc;
|
||||||
|
end ;;
|
1
localtests/fail-drop-ghc-table-no-read/expect_failure
Normal file
1
localtests/fail-drop-ghc-table-no-read/expect_failure
Normal file
@ -0,0 +1 @@
|
|||||||
|
Error 1146: Table 'test._gh_ost_test_ghc' doesn't exist
|
1
localtests/fail-drop-ghc-table-no-read/extra_args
Normal file
1
localtests/fail-drop-ghc-table-no-read/extra_args
Normal file
@ -0,0 +1 @@
|
|||||||
|
--throttle-query='select sleep(1)'
|
23
localtests/fail-drop-ghc-table/create.sql
Normal file
23
localtests/fail-drop-ghc-table/create.sql
Normal file
@ -0,0 +1,23 @@
|
|||||||
|
drop table if exists gh_ost_test;
|
||||||
|
create table gh_ost_test (
|
||||||
|
id int auto_increment,
|
||||||
|
i int not null,
|
||||||
|
color varchar(32),
|
||||||
|
primary key(id)
|
||||||
|
) auto_increment=1;
|
||||||
|
|
||||||
|
insert into gh_ost_test values (null, 1, 'red');
|
||||||
|
|
||||||
|
drop event if exists gh_ost_test;
|
||||||
|
delimiter ;;
|
||||||
|
create event gh_ost_test
|
||||||
|
on schedule every 1 second
|
||||||
|
starts current_timestamp + interval 3 second
|
||||||
|
ends current_timestamp + interval 60 second
|
||||||
|
on completion not preserve
|
||||||
|
enable
|
||||||
|
do
|
||||||
|
begin
|
||||||
|
insert into gh_ost_test values (null, 1, 'blue');
|
||||||
|
drop table if exists _gh_ost_test_ghc;
|
||||||
|
end ;;
|
1
localtests/fail-drop-ghc-table/expect_failure
Normal file
1
localtests/fail-drop-ghc-table/expect_failure
Normal file
@ -0,0 +1 @@
|
|||||||
|
Error 1146: Table 'test._gh_ost_test_ghc' doesn't exist
|
22
localtests/fail-drop-gho-table-no-data/create.sql
Normal file
22
localtests/fail-drop-gho-table-no-data/create.sql
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
drop table if exists gh_ost_test;
|
||||||
|
create table gh_ost_test (
|
||||||
|
id int auto_increment,
|
||||||
|
i int not null,
|
||||||
|
color varchar(32),
|
||||||
|
primary key(id)
|
||||||
|
) auto_increment=1;
|
||||||
|
|
||||||
|
insert into gh_ost_test values (null, 1, 'blue');
|
||||||
|
|
||||||
|
drop event if exists gh_ost_test;
|
||||||
|
delimiter ;;
|
||||||
|
create event gh_ost_test
|
||||||
|
on schedule every 1 second
|
||||||
|
starts current_timestamp
|
||||||
|
ends current_timestamp + interval 60 second
|
||||||
|
on completion not preserve
|
||||||
|
enable
|
||||||
|
do
|
||||||
|
begin
|
||||||
|
drop table if exists _gh_ost_test_gho;
|
||||||
|
end ;;
|
1
localtests/fail-drop-gho-table-no-data/expect_failure
Normal file
1
localtests/fail-drop-gho-table-no-data/expect_failure
Normal file
@ -0,0 +1 @@
|
|||||||
|
Error 1146: Table 'test._gh_ost_test_gho' doesn't exist
|
1
localtests/fail-drop-gho-table-no-data/extra_args
Normal file
1
localtests/fail-drop-gho-table-no-data/extra_args
Normal file
@ -0,0 +1 @@
|
|||||||
|
--throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc'
|
22
localtests/fail-drop-gho-table/create.sql
Normal file
22
localtests/fail-drop-gho-table/create.sql
Normal file
@ -0,0 +1,22 @@
|
|||||||
|
drop table if exists gh_ost_test;
|
||||||
|
create table gh_ost_test (
|
||||||
|
id int auto_increment,
|
||||||
|
i int not null,
|
||||||
|
color varchar(32),
|
||||||
|
primary key(id)
|
||||||
|
) auto_increment=1;
|
||||||
|
|
||||||
|
|
||||||
|
drop event if exists gh_ost_test;
|
||||||
|
delimiter ;;
|
||||||
|
create event gh_ost_test
|
||||||
|
on schedule every 1 second
|
||||||
|
starts current_timestamp
|
||||||
|
ends current_timestamp + interval 60 second
|
||||||
|
on completion not preserve
|
||||||
|
enable
|
||||||
|
do
|
||||||
|
begin
|
||||||
|
insert into gh_ost_test values (null, 1, 'blue');
|
||||||
|
drop table if exists _gh_ost_test_gho;
|
||||||
|
end ;;
|
1
localtests/fail-drop-gho-table/expect_failure
Normal file
1
localtests/fail-drop-gho-table/expect_failure
Normal file
@ -0,0 +1 @@
|
|||||||
|
Error 1146: Table 'test._gh_ost_test_gho' doesn't exist
|
Loading…
Reference in New Issue
Block a user