Compare commits

...

6 Commits

Author SHA1 Message Date
Shlomi Noach
b3502ebc59
Merge branch 'master' into abort-missing-backend-tables 2020-02-04 09:06:20 +02:00
Shlomi Noach
0986970fc7
Merge branch 'master' into abort-missing-backend-tables 2019-08-15 11:28:28 +03:00
Shlomi Noach
9aef2707ea bailing out on missing _ghc table on failure to writing to it 2019-08-01 15:19:55 +03:00
Shlomi Noach
1a45b5fffc bailing out on missing _ghc table on failure to writing to it 2019-08-01 15:19:38 +03:00
Shlomi Noach
e2482d4e2e adding tests 2019-08-01 15:13:08 +03:00
Shlomi Noach
21a251b620 Panic on missing _ghc and _gho tables 2019-08-01 15:11:34 +03:00
16 changed files with 136 additions and 11 deletions

View File

@ -752,3 +752,20 @@ func (this *MigrationContext) ReadConfigFile() error {
return nil return nil
} }
// PanicAbortIfTableError escalates err to a panic-abort only when its text
// contains one of the table-related MySQL error markers
// (mysql.Error1146TableDoesntExist or mysql.Error1017CantFindFile).
// A nil err, or an error whose text carries neither marker, is ignored.
func (this *MigrationContext) PanicAbortIfTableError(err error) {
	if err == nil {
		return
	}
	message := err.Error()
	tableErrorMarkers := []string{
		mysql.Error1146TableDoesntExist,
		mysql.Error1017CantFindFile,
	}
	for _, marker := range tableErrorMarkers {
		if strings.Contains(message, marker) {
			// One match is enough; abort exactly once.
			this.PanicAbortOnError(err)
			return
		}
	}
	// Any other error is irrelevant here, so no panic-abort is raised.
}
// PanicAbortOnError sends err on the PanicAbort channel.
// A nil err is ignored, so callers may pass an unchecked error result.
func (this *MigrationContext) PanicAbortOnError(err error) {
	if err != nil {
		this.PanicAbort <- err
	}
}

View File

@ -290,6 +290,7 @@ func (this *Applier) WriteChangelog(hint, value string) (string, error) {
sql.EscapeName(this.migrationContext.GetChangelogTableName()), sql.EscapeName(this.migrationContext.GetChangelogTableName()),
) )
_, err := sqlutils.ExecNoPrepare(this.db, query, explicitId, hint, value) _, err := sqlutils.ExecNoPrepare(this.db, query, explicitId, hint, value)
this.migrationContext.PanicAbortIfTableError(err)
return hint, err return hint, err
} }
@ -916,6 +917,8 @@ func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed
) )
log.Infof("Issuing and expecting this to block: %s", query) log.Infof("Issuing and expecting this to block: %s", query)
if _, err := tx.Exec(query); err != nil { if _, err := tx.Exec(query); err != nil {
this.migrationContext.PanicAbortIfTableError(err)
tablesRenamed <- err tablesRenamed <- err
return log.Errore(err) return log.Errore(err)
} }

View File

@ -144,7 +144,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// there's an error. Let's try again. // there's an error. Let's try again.
} }
if len(notFatalHint) == 0 { if len(notFatalHint) == 0 {
this.migrationContext.PanicAbort <- err this.migrationContext.PanicAbortOnError(err)
} }
return err return err
} }
@ -172,7 +172,7 @@ func (this *Migrator) retryOperationWithExponentialBackoff(operation func() erro
} }
} }
if len(notFatalHint) == 0 { if len(notFatalHint) == 0 {
this.migrationContext.PanicAbort <- err this.migrationContext.PanicAbortOnError(err)
} }
return err return err
} }
@ -191,14 +191,14 @@ func (this *Migrator) executeAndThrottleOnError(operation func() error) (err err
// consumes and drops any further incoming events that may be left hanging. // consumes and drops any further incoming events that may be left hanging.
func (this *Migrator) consumeRowCopyComplete() { func (this *Migrator) consumeRowCopyComplete() {
if err := <-this.rowCopyComplete; err != nil { if err := <-this.rowCopyComplete; err != nil {
this.migrationContext.PanicAbort <- err this.migrationContext.PanicAbortOnError(err)
} }
atomic.StoreInt64(&this.rowCopyCompleteFlag, 1) atomic.StoreInt64(&this.rowCopyCompleteFlag, 1)
this.migrationContext.MarkRowCopyEndTime() this.migrationContext.MarkRowCopyEndTime()
go func() { go func() {
for err := range this.rowCopyComplete { for err := range this.rowCopyComplete {
if err != nil { if err != nil {
this.migrationContext.PanicAbort <- err this.migrationContext.PanicAbortOnError(err)
} }
} }
}() }()
@ -620,10 +620,12 @@ func (this *Migrator) atomicCutOver() (err error) {
tableUnlocked := make(chan error, 2) tableUnlocked := make(chan error, 2)
go func() { go func() {
if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil { if err := this.applier.AtomicCutOverMagicLock(lockOriginalSessionIdChan, tableLocked, okToUnlockTable, tableUnlocked); err != nil {
this.migrationContext.PanicAbortIfTableError(err)
log.Errore(err) log.Errore(err)
} }
}() }()
if err := <-tableLocked; err != nil { if err := <-tableLocked; err != nil {
this.migrationContext.PanicAbortIfTableError(err)
return log.Errore(err) return log.Errore(err)
} }
lockOriginalSessionId := <-lockOriginalSessionIdChan lockOriginalSessionId := <-lockOriginalSessionIdChan
@ -631,6 +633,7 @@ func (this *Migrator) atomicCutOver() (err error) {
// At this point we know the original table is locked. // At this point we know the original table is locked.
// We know any newly incoming DML on original table is blocked. // We know any newly incoming DML on original table is blocked.
if err := this.waitForEventsUpToLock(); err != nil { if err := this.waitForEventsUpToLock(); err != nil {
this.migrationContext.PanicAbortIfTableError(err)
return log.Errore(err) return log.Errore(err)
} }
@ -644,6 +647,7 @@ func (this *Migrator) atomicCutOver() (err error) {
go func() { go func() {
if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil { if err := this.applier.AtomicCutoverRename(renameSessionIdChan, tablesRenamed); err != nil {
// Abort! Release the lock // Abort! Release the lock
this.migrationContext.PanicAbortIfTableError(err)
atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1) atomic.StoreInt64(&tableRenameKnownToHaveFailed, 1)
okToUnlockTable <- true okToUnlockTable <- true
} }
@ -999,9 +1003,8 @@ func (this *Migrator) initiateStreaming() error {
go func() { go func() {
log.Debugf("Beginning streaming") log.Debugf("Beginning streaming")
err := this.eventsStreamer.StreamEvents(this.canStopStreaming) if err := this.eventsStreamer.StreamEvents(this.canStopStreaming); err != nil {
if err != nil { this.migrationContext.PanicAbortOnError(err)
this.migrationContext.PanicAbort <- err
} }
log.Debugf("Done streaming") log.Debugf("Done streaming")
}() }()

View File

@ -342,7 +342,7 @@ help # This message
return NoPrintStatusRule, err return NoPrintStatusRule, err
} }
err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. Please drop the gh-ost tables before trying again.") err := fmt.Errorf("User commanded 'panic'. The migration will be aborted without cleanup. Please drop the gh-ost tables before trying again.")
this.migrationContext.PanicAbort <- err this.migrationContext.PanicAbortOnError(err)
return NoPrintStatusRule, err return NoPrintStatusRule, err
} }
default: default:

View File

@ -209,6 +209,7 @@ func (this *Throttler) collectControlReplicasLag() {
lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key} lagResult := &mysql.ReplicationLagResult{Key: connectionConfig.Key}
go func() { go func() {
lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig) lagResult.Lag, lagResult.Err = readReplicaLag(connectionConfig)
this.migrationContext.PanicAbortIfTableError(lagResult.Err)
lagResults <- lagResult lagResults <- lagResult
}() }()
} }
@ -317,7 +318,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
// Regardless of throttle, we take opportunity to check for panic-abort // Regardless of throttle, we take opportunity to check for panic-abort
if this.migrationContext.PanicFlagFile != "" { if this.migrationContext.PanicFlagFile != "" {
if base.FileExists(this.migrationContext.PanicFlagFile) { if base.FileExists(this.migrationContext.PanicFlagFile) {
this.migrationContext.PanicAbort <- fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile) this.migrationContext.PanicAbortOnError(fmt.Errorf("Found panic-file %s. Aborting without cleanup", this.migrationContext.PanicFlagFile))
} }
} }
@ -340,7 +341,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
} }
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 { if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds == 0 {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold) this.migrationContext.PanicAbortOnError(fmt.Errorf("critical-load met: %s=%d, >=%d", variableName, value, threshold))
} }
if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 { if criticalLoadMet && this.migrationContext.CriticalLoadIntervalMilliseconds > 0 {
log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds) log.Errorf("critical-load met once: %s=%d, >=%d. Will check again in %d millis", variableName, value, threshold, this.migrationContext.CriticalLoadIntervalMilliseconds)
@ -348,7 +349,7 @@ func (this *Throttler) collectGeneralThrottleMetrics() error {
timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds)) timer := time.NewTimer(time.Millisecond * time.Duration(this.migrationContext.CriticalLoadIntervalMilliseconds))
<-timer.C <-timer.C
if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain { if criticalLoadMetAgain, variableName, value, threshold, _ := this.criticalLoadIsMet(); criticalLoadMetAgain {
this.migrationContext.PanicAbort <- fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold) this.migrationContext.PanicAbortOnError(fmt.Errorf("critical-load met again after %d millis: %s=%d, >=%d", this.migrationContext.CriticalLoadIntervalMilliseconds, variableName, value, threshold))
} }
}() }()
} }

View File

@ -18,6 +18,11 @@ import (
"github.com/outbrain/golib/sqlutils" "github.com/outbrain/golib/sqlutils"
) )
// Message prefixes used to recognise specific MySQL errors by searching an
// error's text for them.
const (
// Error1017CantFindFile is the text prefix of MySQL error number 1017
// (per the name: the server cannot find a file).
Error1017CantFindFile = "Error 1017:"
// Error1146TableDoesntExist is the text prefix of MySQL error number 1146
// (per the name: the referenced table does not exist).
Error1146TableDoesntExist = "Error 1146:"
)
const MaxTableNameLength = 64 const MaxTableNameLength = 64
const MaxReplicationPasswordLength = 32 const MaxReplicationPasswordLength = 32

View File

@ -0,0 +1,23 @@
-- Test fixture: (re)creates table gh_ost_test with a single seed row ('red'),
-- then schedules an event of the same name. The event first fires 3 seconds
-- after creation, repeats every second, and ends 60 seconds after creation.
-- On each run it inserts a 'blue' row and drops _gh_ost_test_ghc if it exists.
-- NOTE(review): _gh_ost_test_ghc is presumably gh-ost's changelog table for
-- gh_ost_test, so the drop is meant to pull it out from under a running
-- migration -- confirm.
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
color varchar(32),
primary key(id)
) auto_increment=1;
insert into gh_ost_test values (null, 1, 'red');
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp + interval 3 second
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 1, 'blue');
drop table if exists _gh_ost_test_ghc;
end ;;

View File

@ -0,0 +1 @@
Error 1146: Table 'test._gh_ost_test_ghc' doesn't exist

View File

@ -0,0 +1 @@
--throttle-query='select sleep(1)'

View File

@ -0,0 +1,23 @@
-- Test fixture: (re)creates table gh_ost_test with a single seed row ('red'),
-- then schedules an event of the same name. The event first fires 3 seconds
-- after creation, repeats every second, and ends 60 seconds after creation.
-- On each run it inserts a 'blue' row and drops _gh_ost_test_ghc if it exists.
-- NOTE(review): _gh_ost_test_ghc is presumably gh-ost's changelog table for
-- gh_ost_test, so the drop is meant to pull it out from under a running
-- migration -- confirm.
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
color varchar(32),
primary key(id)
) auto_increment=1;
insert into gh_ost_test values (null, 1, 'red');
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp + interval 3 second
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 1, 'blue');
drop table if exists _gh_ost_test_ghc;
end ;;

View File

@ -0,0 +1 @@
Error 1146: Table 'test._gh_ost_test_ghc' doesn't exist

View File

@ -0,0 +1,22 @@
-- Test fixture: (re)creates table gh_ost_test with a single seed row ('blue'),
-- then schedules an event of the same name. The event starts immediately,
-- repeats every second, and ends 60 seconds after creation.
-- On each run it only drops _gh_ost_test_gho if it exists; it inserts no rows.
-- NOTE(review): _gh_ost_test_gho is presumably gh-ost's ghost table for
-- gh_ost_test, so the drop is meant to pull it out from under a running
-- migration -- confirm.
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
color varchar(32),
primary key(id)
) auto_increment=1;
insert into gh_ost_test values (null, 1, 'blue');
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
drop table if exists _gh_ost_test_gho;
end ;;

View File

@ -0,0 +1 @@
Error 1146: Table 'test._gh_ost_test_gho' doesn't exist

View File

@ -0,0 +1 @@
--throttle-query='select timestampdiff(second, min(last_update), now()) < 5 from _gh_ost_test_ghc'

View File

@ -0,0 +1,22 @@
-- Test fixture: (re)creates table gh_ost_test with no seed rows, then
-- schedules an event of the same name. The event starts immediately,
-- repeats every second, and ends 60 seconds after creation.
-- On each run it inserts a 'blue' row and drops _gh_ost_test_gho if it exists.
-- NOTE(review): _gh_ost_test_gho is presumably gh-ost's ghost table for
-- gh_ost_test, so the drop is meant to pull it out from under a running
-- migration -- confirm.
drop table if exists gh_ost_test;
create table gh_ost_test (
id int auto_increment,
i int not null,
color varchar(32),
primary key(id)
) auto_increment=1;
drop event if exists gh_ost_test;
delimiter ;;
create event gh_ost_test
on schedule every 1 second
starts current_timestamp
ends current_timestamp + interval 60 second
on completion not preserve
enable
do
begin
insert into gh_ost_test values (null, 1, 'blue');
drop table if exists _gh_ost_test_gho;
end ;;

View File

@ -0,0 +1 @@
Error 1146: Table 'test._gh_ost_test_gho' doesn't exist