some cleanup
This commit is contained in:
parent
fc787dd75f
commit
2f6975d39a
@ -891,9 +891,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query
|
|||||||
{
|
{
|
||||||
if this.migrationContext.UniqueKey.IsPrimary() {
|
if this.migrationContext.UniqueKey.IsPrimary() {
|
||||||
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
|
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
|
||||||
log.Errorf("-------------- insert")
|
|
||||||
log.Errorf("query: %+v", query)
|
|
||||||
log.Errorf("argss: %+v", sharedArgs)
|
|
||||||
return query, sharedArgs, false, 1, err
|
return query, sharedArgs, false, 1, err
|
||||||
}
|
}
|
||||||
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
|
query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
|
||||||
@ -903,10 +900,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query
|
|||||||
{
|
{
|
||||||
if this.migrationContext.UniqueKey.IsPrimary() {
|
if this.migrationContext.UniqueKey.IsPrimary() {
|
||||||
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
|
query, sharedArgs, err := sql.BuildPKInsertPreparedQuery(dmlEvent.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns.Names(), this.migrationContext.MappedSharedColumns.Names(), this.migrationContext.UniqueKey, dmlEvent.NewColumnValues.AbstractValues(), this.migrationContext.IsTransactionalTable())
|
||||||
log.Errorf("-------------- update")
|
|
||||||
log.Errorf("query: %+v", query)
|
|
||||||
log.Errorf("argss: %+v", sharedArgs)
|
|
||||||
|
|
||||||
return query, sharedArgs, false, 1, err
|
return query, sharedArgs, false, 1, err
|
||||||
}
|
}
|
||||||
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
|
query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
|
||||||
@ -918,64 +911,6 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query
|
|||||||
return "", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
|
return "", args, false, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ApplyDMLEventQuery writes an entry to the ghost table, in response to an intercepted
|
|
||||||
// original-table binlog event
|
|
||||||
func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
|
||||||
query, args, dataViaBinlog, rowDelta, err := this.buildDMLEventQuery(dmlEvent)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// TODO The below is in preparation for transactional writes on the ghost tables.
|
|
||||||
// Such writes would be, for example:
|
|
||||||
// - prepended with sql_mode setup
|
|
||||||
// - prepended with time zone setup
|
|
||||||
// - prepended with SET SQL_LOG_BIN=0
|
|
||||||
// - prepended with SET FK_CHECKS=0
|
|
||||||
// etc.
|
|
||||||
//
|
|
||||||
// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
|
|
||||||
// is solved by silently converting unsigned bigints to string values.
|
|
||||||
//
|
|
||||||
|
|
||||||
err = func() error {
|
|
||||||
tx, err := this.db.Begin()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
extraSessionChanges := ""
|
|
||||||
if dataViaBinlog {
|
|
||||||
extraSessionChanges = `
|
|
||||||
SESSION time_zone = '+00:00',
|
|
||||||
`
|
|
||||||
}
|
|
||||||
sessionQuery := fmt.Sprintf(`SET
|
|
||||||
%s
|
|
||||||
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
|
|
||||||
`, extraSessionChanges)
|
|
||||||
if _, err := tx.Exec(sessionQuery); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if _, err := tx.Exec(query, args...); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := tx.Commit(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
|
|
||||||
return log.Errore(err)
|
|
||||||
}
|
|
||||||
// no error
|
|
||||||
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
|
||||||
if this.migrationContext.CountTableRows {
|
|
||||||
atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, rowDelta)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
|
// ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table
|
||||||
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
|
func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error {
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user