Merge pull request #1141 from shaohk/fix-two-phase-commit-lost-data_v2
fix(lost data in mysql two-phase commit): lost data in mysql two-phas…
This commit is contained in:
commit
8d9761d616
@ -421,8 +421,28 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
|
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy.
|
||||||
|
// Before read min/max, write a changelog state into the ghc table to avoid lost data in mysql two-phase commit.
|
||||||
|
/*
|
||||||
|
Detail description of the lost data in mysql two-phase commit issue by @Fanduzi:
|
||||||
|
When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
|
||||||
|
if an INSERT statement is being committed but blocks due to an unmet ack count,
|
||||||
|
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
|
||||||
|
so the copy of the existing data in the table does not include the new row inserted by the transaction.
|
||||||
|
However, the binlog event for the transaction is already written to the binlog,
|
||||||
|
so the addDMLEventsListener only captures the binlog event after the transaction,
|
||||||
|
and thus the transaction's binlog event is not captured, resulting in data loss.
|
||||||
|
|
||||||
|
If write a changelog into ghc table before ReadMigrationRangeValues, and the transaction commit blocks
|
||||||
|
because the ack is not met, then the changelog will not be able to write, so the ReadMigrationRangeValues
|
||||||
|
will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the
|
||||||
|
newly inserted data, thus Avoiding data loss due to the above problem.
|
||||||
|
*/
|
||||||
func (this *Applier) ReadMigrationRangeValues() error {
|
func (this *Applier) ReadMigrationRangeValues() error {
|
||||||
|
if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
|
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@ type ChangelogState string
|
|||||||
const (
|
const (
|
||||||
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
||||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||||
|
ReadMigrationRangeValues = "ReadMigrationRangeValues"
|
||||||
)
|
)
|
||||||
|
|
||||||
func ReadChangelogState(s string) ChangelogState {
|
func ReadChangelogState(s string) ChangelogState {
|
||||||
@ -234,6 +235,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
|||||||
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
|
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
case ReadMigrationRangeValues:
|
||||||
|
// no-op event
|
||||||
default:
|
default:
|
||||||
{
|
{
|
||||||
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
|
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
|
||||||
|
Loading…
Reference in New Issue
Block a user