begin work on positional columns. Incomplete

This commit is contained in:
Shlomi Noach 2016-03-24 22:27:39 +01:00
parent 96a8fd50c3
commit 30a472f741
3 changed files with 65 additions and 8 deletions

View File

@ -5,10 +5,27 @@
package binlog package binlog
// BinlogEntry describes an entry in the binary log
type BinlogEntry struct { type BinlogEntry struct {
LogPos uint64 LogPos uint64
EndLogPos uint64 EndLogPos uint64
StatementType string // INSERT, UPDATE, DELETE StatementType string // INSERT, UPDATE, DELETE
DatabaseName string DatabaseName string
TableName string TableName string
PositionalColumns map[uint64]interface{}
}
// NewBinlogEntry creates an empty, ready to go BinlogEntry object
func NewBinlogEntry() *BinlogEntry {
binlogEntry := &BinlogEntry{}
binlogEntry.PositionalColumns = make(map[uint64]interface{})
return binlogEntry
}
// Duplicate creates and returns a new binlog entry, with some of the attributes pre-assigned
func (this *BinlogEntry) Duplicate() *BinlogEntry {
binlogEntry := NewBinlogEntry()
binlogEntry.LogPos = this.LogPos
binlogEntry.EndLogPos = this.EndLogPos
return binlogEntry
} }

View File

@ -25,6 +25,7 @@ var (
endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)") endLogPosRegexp = regexp.MustCompile("^#[0-9]{6} .*? end_log_pos ([0-9]+)")
statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`") statementRegxp = regexp.MustCompile("### (INSERT INTO|UPDATE|DELETE FROM) `(.*?)`[.]`(.*?)`")
tokenRegxp = regexp.MustCompile("### (WHERE|SET)$") tokenRegxp = regexp.MustCompile("### (WHERE|SET)$")
positionalColumnRegexp = regexp.MustCompile("### @([0-9]+)=(.+)$")
) )
// BinlogEntryState is a state in the binlog parser automaton / state machine // BinlogEntryState is a state in the binlog parser automaton / state machine
@ -104,7 +105,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
nextBinlogEntry = binlogEntry nextBinlogEntry = binlogEntry
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
// Current entry is already a true entry, with startpos and with statement // Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = &BinlogEntry{} nextBinlogEntry = NewBinlogEntry()
} }
nextBinlogEntry.LogPos = startLogPos nextBinlogEntry.LogPos = startLogPos
@ -115,7 +116,7 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
nextBinlogEntry = binlogEntry nextBinlogEntry = binlogEntry
if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" { if binlogEntry.LogPos != 0 && binlogEntry.StatementType != "" {
// Current entry is already a true entry, with startpos and with statement // Current entry is already a true entry, with startpos and with statement
nextBinlogEntry = &BinlogEntry{LogPos: binlogEntry.LogPos, EndLogPos: binlogEntry.EndLogPos} nextBinlogEntry = binlogEntry.Duplicate()
} }
nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0] nextBinlogEntry.StatementType = strings.Split(submatch[1], " ")[0]
@ -125,6 +126,19 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
return ExpectTokenState, nextBinlogEntry, nil return ExpectTokenState, nextBinlogEntry, nil
} }
onPositionalColumn := func(submatch []string) (BinlogEntryState, *BinlogEntry, error) {
columnIndex, _ := strconv.ParseUint(submatch[1], 10, 64)
if _, found := binlogEntry.PositionalColumns[columnIndex]; found {
return InvalidState, binlogEntry, fmt.Errorf("Positional column %+v found more than once in %+v, statement=%+v", columnIndex, binlogEntry.LogPos, binlogEntry.StatementType)
}
columnValue := submatch[2]
columnValue = strings.TrimPrefix(columnValue, "'")
columnValue = strings.TrimSuffix(columnValue, "'")
binlogEntry.PositionalColumns[columnIndex] = columnValue
return SearchForStartPosOrStatementState, binlogEntry, nil
}
line := scanner.Text() line := scanner.Text()
if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 { if submatch := startEntryRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onStartEntry(submatch) return onStartEntry(submatch)
@ -135,6 +149,9 @@ func searchForStartPosOrStatement(scanner *bufio.Scanner, binlogEntry *BinlogEnt
if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 { if submatch := statementRegxp.FindStringSubmatch(line); len(submatch) > 1 {
return onStatementEntry(submatch) return onStatementEntry(submatch)
} }
if submatch := positionalColumnRegexp.FindStringSubmatch(line); len(submatch) > 1 {
return onPositionalColumn(submatch)
}
// Haven't found a match // Haven't found a match
return SearchForStartPosOrStatementState, binlogEntry, nil return SearchForStartPosOrStatementState, binlogEntry, nil
} }
@ -164,7 +181,7 @@ func expectToken(scanner *bufio.Scanner, binlogEntry *BinlogEntry) (nextState Bi
// parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS` // parseEntries will parse output of `mysqlbinlog --verbose --base64-output=DECODE-ROWS`
// It issues an automaton / state machine to do its thang. // It issues an automaton / state machine to do its thang.
func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) { func parseEntries(scanner *bufio.Scanner) (entries [](*BinlogEntry), err error) {
binlogEntry := &BinlogEntry{} binlogEntry := NewBinlogEntry()
var state BinlogEntryState = SearchForStartPosOrStatementState var state BinlogEntryState = SearchForStartPosOrStatementState
var endLogPos uint64 var endLogPos uint64

View File

@ -1,3 +1,26 @@
/*
these are the statements that were used to execute the RBR log:
create table samplet(id int primary key, license int, name varchar(64), unique key license_uidx(license)) engine=innodb;
insert into samplet values (1,1,'a');
insert into samplet values (2,2,'extended'),(3,3,'extended');
begin;
insert into samplet values (4,4,'transaction');
insert into samplet values (5,5,'transaction');
insert into samplet values (6,6,'transaction');
commit;
update samplet set name='update' where id=5;
replace into samplet values (2,4,'replaced 2,4');
insert into samplet values (7,7,'7');
insert into samplet values (8,8,'8');
delete from samplet where id >= 7;
insert into samplet values (9,9,'9');
begin;
update samplet set name='update 9' where id=9;
delete from samplet where license=3;
insert into samplet values (10,10,'10');
commit;
*/
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/; /*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!40019 SET @@session.max_insert_delayed_threads=0*/; /*!40019 SET @@session.max_insert_delayed_threads=0*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/; /*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;