package replication import ( "encoding/binary" "fmt" "io" "strconv" "strings" "time" "unicode" "github.com/juju/errors" "github.com/satori/go.uuid" . "github.com/siddontang/go-mysql/mysql" ) const ( EventHeaderSize = 19 SidLength = 16 LogicalTimestampTypeCode = 2 PartLogicalTimestampLength = 8 BinlogChecksumLength = 4 ) type BinlogEvent struct { // raw binlog data which contains all data, including binlog header and event body, and including crc32 checksum if exists RawData []byte Header *EventHeader Event Event } func (e *BinlogEvent) Dump(w io.Writer) { e.Header.Dump(w) e.Event.Dump(w) } type Event interface { //Dump Event, format like python-mysql-replication Dump(w io.Writer) Decode(data []byte) error } type EventError struct { Header *EventHeader //Error message Err string //Event data Data []byte } func (e *EventError) Error() string { return fmt.Sprintf("Header %#v, Data %q, Err: %v", e.Header, e.Data, e.Err) } type EventHeader struct { Timestamp uint32 EventType EventType ServerID uint32 EventSize uint32 LogPos uint32 Flags uint16 } func (h *EventHeader) Decode(data []byte) error { if len(data) < EventHeaderSize { return errors.Errorf("header size too short %d, must 19", len(data)) } pos := 0 h.Timestamp = binary.LittleEndian.Uint32(data[pos:]) pos += 4 h.EventType = EventType(data[pos]) pos++ h.ServerID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 h.EventSize = binary.LittleEndian.Uint32(data[pos:]) pos += 4 h.LogPos = binary.LittleEndian.Uint32(data[pos:]) pos += 4 h.Flags = binary.LittleEndian.Uint16(data[pos:]) pos += 2 if h.EventSize < uint32(EventHeaderSize) { return errors.Errorf("invalid event size %d, must >= 19", h.EventSize) } return nil } func (h *EventHeader) Dump(w io.Writer) { fmt.Fprintf(w, "=== %s ===\n", EventType(h.EventType)) fmt.Fprintf(w, "Date: %s\n", time.Unix(int64(h.Timestamp), 0).Format(TimeFormat)) fmt.Fprintf(w, "Log position: %d\n", h.LogPos) fmt.Fprintf(w, "Event size: %d\n", h.EventSize) } var ( checksumVersionSplitMysql []int = []int{5, 6, 1} checksumVersionProductMysql int = (checksumVersionSplitMysql[0]*256+checksumVersionSplitMysql[1])*256 + checksumVersionSplitMysql[2] checksumVersionSplitMariaDB []int = []int{5, 3, 0} checksumVersionProductMariaDB int = (checksumVersionSplitMariaDB[0]*256+checksumVersionSplitMariaDB[1])*256 + checksumVersionSplitMariaDB[2] ) // server version format X.Y.Zabc, a is not . or number func splitServerVersion(server string) []int { seps := strings.Split(server, ".") if len(seps) < 3 { return []int{0, 0, 0} } x, _ := strconv.Atoi(seps[0]) y, _ := strconv.Atoi(seps[1]) index := 0 for i, c := range seps[2] { if !unicode.IsNumber(c) { index = i break } } z, _ := strconv.Atoi(seps[2][0:index]) return []int{x, y, z} } func calcVersionProduct(server string) int { versionSplit := splitServerVersion(server) return ((versionSplit[0]*256+versionSplit[1])*256 + versionSplit[2]) } type FormatDescriptionEvent struct { Version uint16 //len = 50 ServerVersion []byte CreateTimestamp uint32 EventHeaderLength uint8 EventTypeHeaderLengths []byte // 0 is off, 1 is for CRC32, 255 is undefined ChecksumAlgorithm byte } func (e *FormatDescriptionEvent) Decode(data []byte) error { pos := 0 e.Version = binary.LittleEndian.Uint16(data[pos:]) pos += 2 e.ServerVersion = make([]byte, 50) copy(e.ServerVersion, data[pos:]) pos += 50 e.CreateTimestamp = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.EventHeaderLength = data[pos] pos++ if e.EventHeaderLength != byte(EventHeaderSize) { return errors.Errorf("invalid event header length %d, must 19", e.EventHeaderLength) } server := string(e.ServerVersion) checksumProduct := checksumVersionProductMysql if strings.Contains(strings.ToLower(server), "mariadb") { checksumProduct = checksumVersionProductMariaDB } if calcVersionProduct(string(e.ServerVersion)) >= checksumProduct { // here, the last 5 bytes is 1 byte check sum alg type and 4 byte checksum if exists e.ChecksumAlgorithm = data[len(data)-5] e.EventTypeHeaderLengths = data[pos : len(data)-5] } else { e.ChecksumAlgorithm = BINLOG_CHECKSUM_ALG_UNDEF e.EventTypeHeaderLengths = data[pos:] } return nil } func (e *FormatDescriptionEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Version: %d\n", e.Version) fmt.Fprintf(w, "Server version: %s\n", e.ServerVersion) //fmt.Fprintf(w, "Create date: %s\n", time.Unix(int64(e.CreateTimestamp), 0).Format(TimeFormat)) fmt.Fprintf(w, "Checksum algorithm: %d\n", e.ChecksumAlgorithm) //fmt.Fprintf(w, "Event header lengths: \n%s", hex.Dump(e.EventTypeHeaderLengths)) fmt.Fprintln(w) } type RotateEvent struct { Position uint64 NextLogName []byte } func (e *RotateEvent) Decode(data []byte) error { e.Position = binary.LittleEndian.Uint64(data[0:]) e.NextLogName = data[8:] return nil } func (e *RotateEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Position: %d\n", e.Position) fmt.Fprintf(w, "Next log name: %s\n", e.NextLogName) fmt.Fprintln(w) } type XIDEvent struct { XID uint64 // in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use GSet GTIDSet } func (e *XIDEvent) Decode(data []byte) error { e.XID = binary.LittleEndian.Uint64(data) return nil } func (e *XIDEvent) Dump(w io.Writer) { fmt.Fprintf(w, "XID: %d\n", e.XID) if e.GSet != nil { fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String()) } fmt.Fprintln(w) } type QueryEvent struct { SlaveProxyID uint32 ExecutionTime uint32 ErrorCode uint16 StatusVars []byte Schema []byte Query []byte // in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use GSet GTIDSet } func (e *QueryEvent) Decode(data []byte) error { pos := 0 e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:]) pos += 4 schemaLength := uint8(data[pos]) pos++ e.ErrorCode = binary.LittleEndian.Uint16(data[pos:]) pos += 2 statusVarsLength := binary.LittleEndian.Uint16(data[pos:]) pos += 2 e.StatusVars = data[pos : pos+int(statusVarsLength)] pos += int(statusVarsLength) e.Schema = data[pos : pos+int(schemaLength)] pos += int(schemaLength) //skip 0x00 pos++ e.Query = data[pos:] return nil } func (e *QueryEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID) fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime) fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode) //fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars)) fmt.Fprintf(w, "Schema: %s\n", e.Schema) fmt.Fprintf(w, "Query: %s\n", e.Query) if e.GSet != nil { fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String()) } fmt.Fprintln(w) } type GTIDEvent struct { CommitFlag uint8 SID []byte GNO int64 LastCommitted int64 SequenceNumber int64 } func (e *GTIDEvent) Decode(data []byte) error { pos := 0 e.CommitFlag = uint8(data[pos]) pos++ e.SID = data[pos : pos+SidLength] pos += SidLength e.GNO = int64(binary.LittleEndian.Uint64(data[pos:])) pos += 8 if len(data) >= 42 { if uint8(data[pos]) == LogicalTimestampTypeCode { pos++ e.LastCommitted = int64(binary.LittleEndian.Uint64(data[pos:])) pos += PartLogicalTimestampLength e.SequenceNumber = int64(binary.LittleEndian.Uint64(data[pos:])) } } return nil } func (e *GTIDEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Commit flag: %d\n", e.CommitFlag) u, _ := uuid.FromBytes(e.SID) fmt.Fprintf(w, "GTID_NEXT: %s:%d\n", u.String(), e.GNO) fmt.Fprintf(w, "LAST_COMMITTED: %d\n", e.LastCommitted) fmt.Fprintf(w, "SEQUENCE_NUMBER: %d\n", e.SequenceNumber) fmt.Fprintln(w) } type BeginLoadQueryEvent struct { FileID uint32 BlockData []byte } func (e *BeginLoadQueryEvent) Decode(data []byte) error { pos := 0 e.FileID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.BlockData = data[pos:] return nil } func (e *BeginLoadQueryEvent) Dump(w io.Writer) { fmt.Fprintf(w, "File ID: %d\n", e.FileID) fmt.Fprintf(w, "Block data: %s\n", e.BlockData) fmt.Fprintln(w) } type ExecuteLoadQueryEvent struct { SlaveProxyID uint32 ExecutionTime uint32 SchemaLength uint8 ErrorCode uint16 StatusVars uint16 FileID uint32 StartPos uint32 EndPos uint32 DupHandlingFlags uint8 } func (e *ExecuteLoadQueryEvent) Decode(data []byte) error { pos := 0 e.SlaveProxyID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.ExecutionTime = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.SchemaLength = uint8(data[pos]) pos++ e.ErrorCode = binary.LittleEndian.Uint16(data[pos:]) pos += 2 e.StatusVars = binary.LittleEndian.Uint16(data[pos:]) pos += 2 e.FileID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.StartPos = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.EndPos = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.DupHandlingFlags = uint8(data[pos]) return nil } func (e *ExecuteLoadQueryEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Slave proxy ID: %d\n", e.SlaveProxyID) fmt.Fprintf(w, "Execution time: %d\n", e.ExecutionTime) fmt.Fprintf(w, "Schame length: %d\n", e.SchemaLength) fmt.Fprintf(w, "Error code: %d\n", e.ErrorCode) fmt.Fprintf(w, "Status vars length: %d\n", e.StatusVars) fmt.Fprintf(w, "File ID: %d\n", e.FileID) fmt.Fprintf(w, "Start pos: %d\n", e.StartPos) fmt.Fprintf(w, "End pos: %d\n", e.EndPos) fmt.Fprintf(w, "Dup handling flags: %d\n", e.DupHandlingFlags) fmt.Fprintln(w) } // case MARIADB_ANNOTATE_ROWS_EVENT: // return "MariadbAnnotateRowsEvent" type MariadbAnnotateRowsEvent struct { Query []byte } func (e *MariadbAnnotateRowsEvent) Decode(data []byte) error { e.Query = data return nil } func (e *MariadbAnnotateRowsEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Query: %s\n", e.Query) fmt.Fprintln(w) } type MariadbBinlogCheckPointEvent struct { Info []byte } func (e *MariadbBinlogCheckPointEvent) Decode(data []byte) error { e.Info = data return nil } func (e *MariadbBinlogCheckPointEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Info: %s\n", e.Info) fmt.Fprintln(w) } type MariadbGTIDEvent struct { GTID MariadbGTID } func (e *MariadbGTIDEvent) Decode(data []byte) error { e.GTID.SequenceNumber = binary.LittleEndian.Uint64(data) e.GTID.DomainID = binary.LittleEndian.Uint32(data[8:]) // we don't care commit id now, maybe later return nil } func (e *MariadbGTIDEvent) Dump(w io.Writer) { fmt.Fprintf(w, "GTID: %v\n", e.GTID) fmt.Fprintln(w) } type MariadbGTIDListEvent struct { GTIDs []MariadbGTID } func (e *MariadbGTIDListEvent) Decode(data []byte) error { pos := 0 v := binary.LittleEndian.Uint32(data[pos:]) pos += 4 count := v & uint32((1<<28)-1) e.GTIDs = make([]MariadbGTID, count) for i := uint32(0); i < count; i++ { e.GTIDs[i].DomainID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.GTIDs[i].ServerID = binary.LittleEndian.Uint32(data[pos:]) pos += 4 e.GTIDs[i].SequenceNumber = binary.LittleEndian.Uint64(data[pos:]) } return nil } func (e *MariadbGTIDListEvent) Dump(w io.Writer) { fmt.Fprintf(w, "Lists: %v\n", e.GTIDs) fmt.Fprintln(w) }