Merge branch 'master' into fix-issue-1117
This commit is contained in:
commit
b7f7e2c0b0
@ -421,8 +421,28 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy
|
||||
// ReadMigrationRangeValues reads min/max values that will be used for rowcopy.
|
||||
// Before read min/max, write a changelog state into the ghc table to avoid lost data in mysql two-phase commit.
|
||||
/*
|
||||
Detail description of the lost data in mysql two-phase commit issue by @Fanduzi:
|
||||
When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC,
|
||||
if an INSERT statement is being committed but blocks due to an unmet ack count,
|
||||
the data inserted by the transaction is not visible to ReadMigrationRangeValues,
|
||||
so the copy of the existing data in the table does not include the new row inserted by the transaction.
|
||||
However, the binlog event for the transaction is already written to the binlog,
|
||||
so the addDMLEventsListener only captures the binlog event after the transaction,
|
||||
and thus the transaction's binlog event is not captured, resulting in data loss.
|
||||
|
||||
If write a changelog into ghc table before ReadMigrationRangeValues, and the transaction commit blocks
|
||||
because the ack is not met, then the changelog will not be able to write, so the ReadMigrationRangeValues
|
||||
will not be run. When the changelog writes successfully, the ReadMigrationRangeValues will read the
|
||||
newly inserted data, thus Avoiding data loss due to the above problem.
|
||||
*/
|
||||
func (this *Applier) ReadMigrationRangeValues() error {
|
||||
if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ type ChangelogState string
|
||||
const (
|
||||
GhostTableMigrated ChangelogState = "GhostTableMigrated"
|
||||
AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed"
|
||||
ReadMigrationRangeValues = "ReadMigrationRangeValues"
|
||||
)
|
||||
|
||||
func ReadChangelogState(s string) ChangelogState {
|
||||
@ -234,6 +235,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er
|
||||
this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc)
|
||||
}()
|
||||
}
|
||||
case ReadMigrationRangeValues:
|
||||
// no-op event
|
||||
default:
|
||||
{
|
||||
return fmt.Errorf("Unknown changelog state: %+v", changelogState)
|
||||
|
Loading…
Reference in New Issue
Block a user