diff --git a/go/logic/applier.go b/go/logic/applier.go index 9bd2ba8..d6e5d5d 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -421,8 +421,28 @@ func (this *Applier) ReadMigrationMaxValues(uniqueKey *sql.UniqueKey) error { return err } -// ReadMigrationRangeValues reads min/max values that will be used for rowcopy +// ReadMigrationRangeValues reads min/max values that will be used for rowcopy. +// Before reading min/max, write a changelog state into the ghc table to avoid losing data during MySQL two-phase commit. +/* +Detailed description of the data-loss issue in MySQL two-phase commit, by @Fanduzi: + When using semi-sync and setting rpl_semi_sync_master_wait_point=AFTER_SYNC, + if an INSERT statement is being committed but blocks due to an unmet ack count, + the data inserted by the transaction is not visible to ReadMigrationRangeValues, + so the copy of the existing data in the table does not include the new row inserted by the transaction. + However, the binlog event for the transaction is already written to the binlog, + so the addDMLEventsListener only captures the binlog events after the transaction, + and thus the transaction's binlog event is not captured, resulting in data loss. + + If we write a changelog state into the ghc table before ReadMigrationRangeValues, and the transaction commit blocks + because the ack is not met, then the changelog cannot be written either, so ReadMigrationRangeValues + will not run. Once the changelog is written successfully, ReadMigrationRangeValues will read the + newly inserted data, thus avoiding data loss due to the above problem.
+*/ func (this *Applier) ReadMigrationRangeValues() error { + if _, err := this.WriteChangelogState(string(ReadMigrationRangeValues)); err != nil { + return err + } + if err := this.ReadMigrationMinValues(this.migrationContext.UniqueKey); err != nil { return err } diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b5c405a..6956149 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -26,6 +26,7 @@ type ChangelogState string const ( GhostTableMigrated ChangelogState = "GhostTableMigrated" AllEventsUpToLockProcessed = "AllEventsUpToLockProcessed" + ReadMigrationRangeValues = "ReadMigrationRangeValues" ) func ReadChangelogState(s string) ChangelogState { @@ -234,6 +235,8 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er this.applyEventsQueue <- newApplyEventStructByFunc(&applyEventFunc) }() } + case ReadMigrationRangeValues: + // no-op event default: { return fmt.Errorf("Unknown changelog state: %+v", changelogState)