Merge pull request #172 from github/transactional-apply-dml
DML write wrapped in transaction
This commit is contained in:
commit
41d43933c8
@ -854,40 +854,40 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO The below is commented, and is in preparation for transactional writes on the ghost tables.
|
||||
// TODO The below is in preparation for transactional writes on the ghost tables.
|
||||
// Such writes would be, for example:
|
||||
// - prepended with sql_mode setup
|
||||
// - prepended with time zone setup
|
||||
// - prepended with SET SQL_LOG_BIN=0
|
||||
// - prepended with SET FK_CHECKS=0
|
||||
// etc.
|
||||
//
|
||||
// Current known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
|
||||
// a known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
|
||||
// is solved by silently converting unsigned bigints to string values.
|
||||
//
|
||||
|
||||
// err = func() error {
|
||||
// tx, err := this.db.Begin()
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if _, err := tx.Exec(query, args...); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// if err := tx.Commit(); err != nil {
|
||||
// return err
|
||||
// }
|
||||
// return nil
|
||||
// }()
|
||||
err = func() error {
|
||||
tx, err := this.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := tx.Exec(query, args...); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}()
|
||||
|
||||
_, err = sqlutils.Exec(this.db, query, args...)
|
||||
if err == nil {
|
||||
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
|
||||
return log.Errore(err)
|
||||
}
|
||||
// no error
|
||||
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
|
||||
if this.migrationContext.CountTableRows {
|
||||
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
|
||||
}
|
||||
if err != nil {
|
||||
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
|
||||
log.Errore(err)
|
||||
}
|
||||
return err
|
||||
return nil
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func fixArgType(arg interface{}, isUnsigned bool) interface{} {
|
||||
return uint32(i)
|
||||
}
|
||||
if i, ok := arg.(int64); ok {
|
||||
return uint64(i)
|
||||
return strconv.FormatUint(uint64(i), 10)
|
||||
}
|
||||
if i, ok := arg.(int); ok {
|
||||
return uint(i)
|
||||
|
Loading…
x
Reference in New Issue
Block a user