Compare commits

...

7 Commits

Author SHA1 Message Date
Shlomi Noach
de8b13911c Merge branch 'master' into go-mysql-updates-201709 2017-10-24 15:27:45 +03:00
Shlomi Noach
fe6b11d73d better diff output in localtests 2017-10-24 15:24:31 +03:00
Shlomi Noach
e28c3c02b2 Merge branch 'master' into go-mysql-updates-201709 2017-10-24 14:45:53 +03:00
Shlomi Noach
aac063f6f9 Merge branch 'master' into go-mysql-updates-201709 2017-10-17 10:03:04 +03:00
Shlomi Noach
2635a26069 Merge branch 'master' into go-mysql-updates-201709 2017-10-02 08:43:25 +03:00
Shlomi Noach
3dd15dfd18 updated local code 2017-10-02 08:42:38 +03:00
Shlomi Noach
5a9745e2b6 updated go-mysql library 2017-10-02 08:40:33 +03:00
42 changed files with 1010 additions and 816 deletions

View File

@ -49,7 +49,7 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
User: connectionConfig.User,
Password: connectionConfig.Password,
}
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig)
binlogReader.binlogSyncer = replication.NewBinlogSyncer(*binlogSyncerConfig)
return binlogReader, err
}

View File

@ -1,3 +0,0 @@
var
bin
.idea

View File

@ -1,8 +1,9 @@
language: go
go:
- 1.6
- 1.7
- 1.8
- 1.9
dist: trusty
sudo: required

View File

@ -25,7 +25,7 @@ cfg := replication.BinlogSyncerConfig {
User: "root",
Password: "",
}
syncer := replication.NewBinlogSyncer(&cfg)
syncer := replication.NewBinlogSyncer(cfg)
// Start sync with sepcified binlog file and position
streamer, _ := syncer.StartSync(mysql.Position{binlogFile, binlogPos})
@ -105,20 +105,21 @@ cfg.Dump.Tables = []string{"canal_test"}
c, err := NewCanal(cfg)
type myRowsEventHandler struct {
type MyEventHandler struct {
DummyEventHandler
}
func (h *myRowsEventHandler) Do(e *RowsEvent) error {
func (h *MyEventHandler) OnRow(e *RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
return nil
}
func (h *myRowsEventHandler) String() string {
return "myRowsEventHandler"
func (h *MyEventHandler) String() string {
return "MyEventHandler"
}
// Register a handler to handle RowsEvent
c.RegRowsEventHandler(&MyRowsEventHandler{})
c.SetEventHandler(&MyEventHandler{})
// Start canal
c.Start()
@ -221,6 +222,14 @@ func main() {
We pass all tests in https://github.com/bradfitz/go-sql-test using go-mysql driver. :-)
## Donate
If you like the project and want to buy me a cola, you can through:
|PayPal|微信|
|------|---|
|[![](https://www.paypalobjects.com/webstatic/paypalme/images/pp_logo_small.png)](https://paypal.me/siddontang)|[![](https://github.com/siddontang/blog/blob/master/donate/weixin.png)|
## Feedback
go-mysql is still in development, your feedback is very welcome.

View File

@ -1,27 +0,0 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,22 +0,0 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

View File

@ -1,447 +0,0 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package context defines the Context type, which carries deadlines,
// cancelation signals, and other request-scoped values across API boundaries
// and between processes.
//
// Incoming requests to a server should create a Context, and outgoing calls to
// servers should accept a Context. The chain of function calls between must
// propagate the Context, optionally replacing it with a modified copy created
// using WithDeadline, WithTimeout, WithCancel, or WithValue.
//
// Programs that use Contexts should follow these rules to keep interfaces
// consistent across packages and enable static analysis tools to check context
// propagation:
//
// Do not store Contexts inside a struct type; instead, pass a Context
// explicitly to each function that needs it. The Context should be the first
// parameter, typically named ctx:
//
// func DoSomething(ctx context.Context, arg Arg) error {
// // ... use ctx ...
// }
//
// Do not pass a nil Context, even if a function permits it. Pass context.TODO
// if you are unsure about which Context to use.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The same Context may be passed to functions running in different goroutines;
// Contexts are safe for simultaneous use by multiple goroutines.
//
// See http://blog.golang.org/context for example code for a server that uses
// Contexts.
package context // import "golang.org/x/net/context"
import (
"errors"
"fmt"
"sync"
"time"
)
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// WithCancel arranges for Done to be closed when cancel is called;
// WithDeadline arranges for Done to be closed when the deadline
// expires; WithTimeout arranges for Done to be closed when the timeout
// elapses.
//
// Done is provided for use in select statements:
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out <-chan Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See http://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancelation.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed. Err returns
// Canceled if the context was canceled or DeadlineExceeded if the
// context's deadline passed. No other values for Err are defined.
// After Done is closed, successive calls to Err return the same value.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
//
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
//
// Packages that define a Context key should provide type-safe accessors
// for the values stores using that key:
//
// // Package user defines a User type that's stored in Contexts.
// package user
//
// import "golang.org/x/net/context"
//
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
//
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key = 0
//
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = errors.New("context deadline exceeded")
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter). TODO is recognized by static analysis tools that determine
// whether Contexts are propagated correctly in a program.
func TODO() Context {
return todo
}
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
done chan struct{} // closed by the first cancel call.
mu sync.Mutex
children map[canceler]bool // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
return c.done
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
close(c.done)
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

View File

@ -1,10 +1,10 @@
package canal
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
"strings"
"sync"
@ -16,11 +16,8 @@ import (
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
"github.com/siddontang/go/sync2"
)
var errCanalClosed = errors.New("canal was closed")
// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
// MySQL must open row format for binlog
type Canal struct {
@ -28,50 +25,42 @@ type Canal struct {
cfg *Config
useGTID bool
master *masterInfo
dumper *dump.Dumper
dumpDoneCh chan struct{}
syncer *replication.BinlogSyncer
rsLock sync.Mutex
rsHandlers []RowsEventHandler
eventHandler EventHandler
connLock sync.Mutex
conn *client.Conn
wg sync.WaitGroup
tableLock sync.Mutex
tableLock sync.RWMutex
tables map[string]*schema.Table
quit chan struct{}
closed sync2.AtomicBool
ctx context.Context
cancel context.CancelFunc
}
func NewCanal(cfg *Config) (*Canal, error) {
c := new(Canal)
c.cfg = cfg
c.closed.Set(false)
c.quit = make(chan struct{})
os.MkdirAll(cfg.DataDir, 0755)
c.ctx, c.cancel = context.WithCancel(context.Background())
c.dumpDoneCh = make(chan struct{})
c.rsHandlers = make([]RowsEventHandler, 0, 4)
c.eventHandler = &DummyEventHandler{}
c.tables = make(map[string]*schema.Table)
c.master = &masterInfo{}
var err error
if c.master, err = loadMasterInfo(c.masterInfoPath()); err != nil {
return nil, errors.Trace(err)
} else if len(c.master.Addr) != 0 && c.master.Addr != c.cfg.Addr {
log.Infof("MySQL addr %s in old master.info, but new %s, reset", c.master.Addr, c.cfg.Addr)
// may use another MySQL, reset
c.master = &masterInfo{}
}
c.master.Addr = c.cfg.Addr
if err := c.prepareDumper(); err != nil {
if err = c.prepareDumper(); err != nil {
return nil, errors.Trace(err)
}
@ -114,6 +103,12 @@ func (c *Canal) prepareDumper() error {
c.dumper.AddTables(tableDB, tables...)
}
charset := c.cfg.Charset
c.dumper.SetCharset(charset)
c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData)
c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB)
for _, ignoreTable := range c.cfg.Dump.IgnoreTables {
if seps := strings.Split(ignoreTable, ","); len(seps) == 2 {
c.dumper.AddIgnoreTables(seps[0], seps[1])
@ -129,6 +124,8 @@ func (c *Canal) prepareDumper() error {
return nil
}
// Start will first try to dump all data from MySQL master `mysqldump`,
// then sync from the binlog position in the dump data.
func (c *Canal) Start() error {
c.wg.Add(1)
go c.run()
@ -136,55 +133,57 @@ func (c *Canal) Start() error {
return nil
}
func (c *Canal) run() error {
defer c.wg.Done()
// StartFrom will sync from the binlog position directly, ignore mysqldump.
func (c *Canal) StartFrom(pos mysql.Position) error {
c.useGTID = false
c.master.Update(pos)
if err := c.tryDump(); err != nil {
return c.Start()
}
func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {
c.useGTID = true
c.master.UpdateGTID(set)
return c.Start()
}
func (c *Canal) run() error {
defer func() {
c.wg.Done()
c.cancel()
}()
err := c.tryDump()
close(c.dumpDoneCh)
if err != nil {
log.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err)
}
close(c.dumpDoneCh)
if err := c.startSyncBinlog(); err != nil {
if !c.isClosed() {
if err = c.runSyncBinlog(); err != nil {
log.Errorf("canal start sync binlog err: %v", err)
}
return errors.Trace(err)
}
return nil
}
func (c *Canal) isClosed() bool {
return c.closed.Get()
}
func (c *Canal) Close() {
log.Infof("close canal")
log.Infof("closing canal")
c.m.Lock()
defer c.m.Unlock()
if c.isClosed() {
return
}
c.closed.Set(true)
close(c.quit)
c.cancel()
c.connLock.Lock()
c.conn.Close()
c.conn = nil
c.connLock.Unlock()
if c.syncer != nil {
c.syncer.Close()
c.syncer = nil
}
c.master.Close()
c.eventHandler.OnPosSynced(c.master.Position(), true)
c.wg.Wait()
}
@ -193,11 +192,15 @@ func (c *Canal) WaitDumpDone() <-chan struct{} {
return c.dumpDoneCh
}
func (c *Canal) Ctx() context.Context {
return c.ctx
}
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock()
c.tableLock.RLock()
t, ok := c.tables[key]
c.tableLock.Unlock()
c.tableLock.RUnlock()
if ok {
return t, nil
@ -205,6 +208,11 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
t, err := schema.NewTable(c, db, table)
if err != nil {
// check table not exists
if ok, err1 := schema.IsTableExist(c, db, table); err1 == nil && !ok {
return nil, schema.ErrTableNotExist
}
return nil, errors.Trace(err)
}
@ -215,6 +223,14 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
return t, nil
}
// ClearTableCache clear table cache
func (c *Canal) ClearTableCache(db []byte, table []byte) {
key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock()
delete(c.tables, key)
c.tableLock.Unlock()
}
// Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
func (c *Canal) CheckBinlogRowImage(image string) error {
// need to check MySQL binlog row image? full, minimal or noblob?
@ -263,17 +279,16 @@ func (c *Canal) prepareSyncer() error {
Port: uint16(port),
User: c.cfg.User,
Password: c.cfg.Password,
Charset: c.cfg.Charset,
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
ReadTimeout: c.cfg.ReadTimeout,
}
c.syncer = replication.NewBinlogSyncer(&cfg)
c.syncer = replication.NewBinlogSyncer(cfg)
return nil
}
func (c *Canal) masterInfoPath() string {
return path.Join(c.cfg.DataDir, "master.info")
}
// Execute a SQL
func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
c.connLock.Lock()
@ -303,5 +318,5 @@ func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err
}
func (c *Canal) SyncedPosition() mysql.Position {
return c.master.Pos()
return c.master.Position()
}

View File

@ -1,10 +1,11 @@
package canal
import (
"bytes"
"flag"
"fmt"
"os"
"testing"
"time"
"github.com/ngaut/log"
. "github.com/pingcap/check"
@ -27,16 +28,16 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
cfg := NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:3306", *testHost)
cfg.User = "root"
cfg.HeartbeatPeriod = 200 * time.Millisecond
cfg.ReadTimeout = 300 * time.Millisecond
cfg.Dump.ExecutionPath = "mysqldump"
cfg.Dump.TableDB = "test"
cfg.Dump.Tables = []string{"canal_test"}
os.RemoveAll(cfg.DataDir)
var err error
s.c, err = NewCanal(cfg)
c.Assert(err, IsNil)
s.execute(c, "DROP TABLE IF EXISTS test.canal_test")
sql := `
CREATE TABLE IF NOT EXISTS test.canal_test (
id int AUTO_INCREMENT,
@ -52,12 +53,16 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
s.execute(c, "SET GLOBAL binlog_format = 'ROW'")
s.c.RegRowsEventHandler(&testRowsEventHandler{})
s.c.SetEventHandler(&testEventHandler{})
err = s.c.Start()
c.Assert(err, IsNil)
}
func (s *canalTestSuite) TearDownSuite(c *C) {
// To test the heartbeat and read timeout,so need to sleep 1 seconds without data transmission
log.Infof("Start testing the heartbeat and read timeout")
time.Sleep(time.Second)
if s.c != nil {
s.c.Close()
s.c = nil
@ -70,16 +75,17 @@ func (s *canalTestSuite) execute(c *C, query string, args ...interface{}) *mysql
return r
}
type testRowsEventHandler struct {
type testEventHandler struct {
DummyEventHandler
}
func (h *testRowsEventHandler) Do(e *RowsEvent) error {
func (h *testEventHandler) Do(e *RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows)
return nil
}
func (h *testRowsEventHandler) String() string {
return "testRowsEventHandler"
func (h *testEventHandler) String() string {
return "testEventHandler"
}
func (s *canalTestSuite) TestCanal(c *C) {
@ -88,7 +94,50 @@ func (s *canalTestSuite) TestCanal(c *C) {
for i := 1; i < 10; i++ {
s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i))
}
s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`")
s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18")
err := s.c.CatchMasterPos(100)
err := s.c.CatchMasterPos(10 * time.Second)
c.Assert(err, IsNil)
}
func TestAlterTableExp(t *testing.T) {
cases := []string{
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
}
table := []byte("mytable")
db := []byte("mydb")
for _, s := range cases {
m := expAlterTable.FindSubmatch([]byte(s))
if m == nil || !bytes.Equal(m[2], table) || (len(m[1]) > 0 && !bytes.Equal(m[1], db)) {
t.Fatalf("TestAlterTableExp: case %s failed\n", s)
}
}
}
func TestRenameTableExp(t *testing.T) {
cases := []string{
"rename table `mydb`.`mytable` to `mydb`.`mytable1`",
"rename table `mytable` to `mytable1`",
"rename table mydb.mytable to mydb.mytable1",
"rename table mytable to mytable1",
"rename table `mydb`.`mytable1` to `mydb`.`mytable2`, `mydb`.`mytable` to `mydb`.`mytable1`",
"rename table `mytable1` to `mytable2`, `mytable` to `mytable1`",
"rename table mydb.mytable1 to mydb.mytable2, mydb.mytable to mydb.mytable1",
"rename table mytable1 to mytable2, mytable to mytable1",
}
table := []byte("mytable1")
db := []byte("mydb")
for _, s := range cases {
m := expRenameTable.FindSubmatch([]byte(s))
if m == nil || !bytes.Equal(m[2], table) || (len(m[1]) > 0 && !bytes.Equal(m[1], db)) {
t.Fatalf("TestRenameTableExp: case %s failed\n", s)
}
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/BurntSushi/toml"
"github.com/juju/errors"
"github.com/siddontang/go-mysql/mysql"
)
type DumpConfig struct {
@ -25,6 +26,13 @@ type DumpConfig struct {
// If true, discard error msg, else, output to stderr
DiscardErr bool `toml:"discard_err"`
// Set true to skip --master-data if we have no privilege to do
// 'FLUSH TABLES WITH READ LOCK'
SkipMasterData bool `toml:"skip_master_data"`
// Set to change the default max_allowed_packet size
MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
}
type Config struct {
@ -32,9 +40,11 @@ type Config struct {
User string `toml:"user"`
Password string `toml:"password"`
Charset string `toml:"charset"`
ServerID uint32 `toml:"server_id"`
Flavor string `toml:"flavor"`
DataDir string `toml:"data_dir"`
HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
ReadTimeout time.Duration `toml:"read_timeout"`
Dump DumpConfig `toml:"dump"`
}
@ -66,14 +76,15 @@ func NewDefaultConfig() *Config {
c.User = "root"
c.Password = ""
c.Charset = mysql.DEFAULT_CHARSET
rand.Seed(time.Now().Unix())
c.ServerID = uint32(rand.Intn(1000)) + 1001
c.Flavor = "mysql"
c.DataDir = "./var"
c.Dump.ExecutionPath = "mysqldump"
c.Dump.DiscardErr = true
c.Dump.SkipMasterData = false
return c
}

View File

@ -7,6 +7,7 @@ import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/dump"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/schema"
)
@ -23,8 +24,8 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
}
func (h *dumpParseHandler) Data(db string, table string, values []string) error {
if h.c.isClosed() {
return errCanalClosed
if err := h.c.ctx.Err(); err != nil {
return err
}
tableInfo, err := h.c.GetTable(db, table)
@ -63,7 +64,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
}
events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs})
return h.c.travelRowsEventHandler(events)
return h.c.eventHandler.OnRow(events)
}
func (c *Canal) AddDumpDatabases(dbs ...string) {
@ -91,9 +92,11 @@ func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) {
}
func (c *Canal) tryDump() error {
if len(c.master.Name) > 0 && c.master.Position > 0 {
pos := c.master.Position()
gtid := c.master.GTID()
if (len(pos.Name) > 0 && pos.Pos > 0) || gtid != nil {
// we will sync with binlog name and position
log.Infof("skip dump, use last binlog replication pos (%s, %d)", c.master.Name, c.master.Position)
log.Infof("skip dump, use last binlog replication pos %s or GTID %s", pos, gtid)
return nil
}
@ -104,6 +107,16 @@ func (c *Canal) tryDump() error {
h := &dumpParseHandler{c: c}
if c.cfg.Dump.SkipMasterData {
pos, err := c.GetMasterPos()
if err != nil {
return errors.Trace(err)
}
log.Infof("skip master data, get current binlog position %v", pos)
h.name = pos.Name
h.pos = uint64(pos.Pos)
}
start := time.Now()
log.Info("try dump MySQL and parse")
if err := c.dumper.DumpAndParse(h); err != nil {
@ -113,8 +126,8 @@ func (c *Canal) tryDump() error {
log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at (%s, %d)",
time.Now().Sub(start).Seconds(), h.name, h.pos)
c.master.Update(h.name, uint32(h.pos))
c.master.Save(true)
pos = mysql.Position{h.name, uint32(h.pos)}
c.master.Update(pos)
c.eventHandler.OnPosSynced(pos, true)
return nil
}

View File

@ -1,41 +1,36 @@
package canal
import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
)
var (
ErrHandleInterrupted = errors.New("do handler error, interrupted")
)
type RowsEventHandler interface {
// Handle RowsEvent, if return ErrHandleInterrupted, canal will
// stop the sync
Do(e *RowsEvent) error
type EventHandler interface {
OnRotate(roateEvent *replication.RotateEvent) error
OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
OnXID(nextPos mysql.Position) error
OnGTID(gtid mysql.GTIDSet) error
// OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
OnPosSynced(pos mysql.Position, force bool) error
String() string
}
func (c *Canal) RegRowsEventHandler(h RowsEventHandler) {
c.rsLock.Lock()
c.rsHandlers = append(c.rsHandlers, h)
c.rsLock.Unlock()
type DummyEventHandler struct {
}
func (c *Canal) travelRowsEventHandler(e *RowsEvent) error {
c.rsLock.Lock()
defer c.rsLock.Unlock()
var err error
for _, h := range c.rsHandlers {
if err = h.Do(e); err != nil && !mysql.ErrorEqual(err, ErrHandleInterrupted) {
log.Errorf("handle %v err: %v", h, err)
} else if mysql.ErrorEqual(err, ErrHandleInterrupted) {
log.Errorf("handle %v err, interrupted", h)
return ErrHandleInterrupted
}
}
func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil }
func (h *DummyEventHandler) OnDDL(mysql.Position, *replication.QueryEvent) error {
return nil
}
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error { return nil }
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
// `SetEventHandler` registers the sync handler, you must register your
// own handler before starting Canal.
func (c *Canal) SetEventHandler(h EventHandler) {
c.eventHandler = h
}

View File

@ -1,89 +1,46 @@
package canal
import (
"bytes"
"os"
"sync"
"time"
"github.com/BurntSushi/toml"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/ioutil2"
)
type masterInfo struct {
Addr string `toml:"addr"`
Name string `toml:"bin_name"`
Position uint32 `toml:"bin_pos"`
sync.RWMutex
name string
pos mysql.Position
l sync.Mutex
lastSaveTime time.Time
gtid mysql.GTIDSet
}
func loadMasterInfo(name string) (*masterInfo, error) {
var m masterInfo
func (m *masterInfo) Update(pos mysql.Position) {
log.Debugf("update master position %s", pos)
m.name = name
f, err := os.Open(name)
if err != nil && !os.IsNotExist(errors.Cause(err)) {
return nil, errors.Trace(err)
} else if os.IsNotExist(errors.Cause(err)) {
return &m, nil
}
defer f.Close()
_, err = toml.DecodeReader(f, &m)
return &m, err
m.Lock()
m.pos = pos
m.Unlock()
}
func (m *masterInfo) Save(force bool) error {
m.l.Lock()
defer m.l.Unlock()
func (m *masterInfo) UpdateGTID(gtid mysql.GTIDSet) {
log.Debugf("update master gtid %s", gtid.String())
n := time.Now()
if !force && n.Sub(m.lastSaveTime) < time.Second {
return nil
}
var buf bytes.Buffer
e := toml.NewEncoder(&buf)
e.Encode(m)
var err error
if err = ioutil2.WriteFileAtomic(m.name, buf.Bytes(), 0644); err != nil {
log.Errorf("canal save master info to file %s err %v", m.name, err)
}
m.lastSaveTime = n
return errors.Trace(err)
m.Lock()
m.gtid = gtid
m.Unlock()
}
func (m *masterInfo) Update(name string, pos uint32) {
m.l.Lock()
m.Name = name
m.Position = pos
m.l.Unlock()
func (m *masterInfo) Position() mysql.Position {
m.RLock()
defer m.RUnlock()
return m.pos
}
func (m *masterInfo) Pos() mysql.Position {
var pos mysql.Position
m.l.Lock()
pos.Name = m.Name
pos.Pos = m.Position
m.l.Unlock()
func (m *masterInfo) GTID() mysql.GTIDSet {
m.RLock()
defer m.RUnlock()
return pos
}
func (m *masterInfo) Close() {
m.Save(true)
return m.gtid
}

View File

@ -53,6 +53,16 @@ func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error)
return values, nil
}
// Get term column's value
func GetColumnValue(table *schema.Table, column string, row []interface{}) (interface{}, error) {
index := table.FindColumn(column)
if index == -1 {
return nil, errors.Errorf("table %s has no column name %s", table, column)
}
return row[index], nil
}
// String implements fmt.Stringer interface.
func (r *RowsEvent) String() string {
return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows)

View File

@ -1,49 +1,66 @@
package canal
import (
"fmt"
"regexp"
"time"
"golang.org/x/net/context"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
)
func (c *Canal) startSyncBinlog() error {
pos := mysql.Position{c.master.Name, c.master.Position}
log.Infof("start sync binlog at %v", pos)
var (
expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
expRenameTable = regexp.MustCompile("(?i)^RENAME\\sTABLE.*TO\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}$")
)
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
if !c.useGTID {
pos := c.master.Position()
s, err := c.syncer.StartSync(pos)
if err != nil {
return errors.Errorf("start sync replication at %v error %v", pos, err)
return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
}
log.Infof("start sync binlog at binlog file %v", pos)
return s, nil
} else {
gset := c.master.GTID()
s, err := c.syncer.StartSyncGTID(gset)
if err != nil {
return nil, errors.Errorf("start sync replication at GTID %v error %v", gset, err)
}
log.Infof("start sync binlog at GTID %v", gset)
return s, nil
}
}
func (c *Canal) runSyncBinlog() error {
s, err := c.startSyncer()
if err != nil {
return err
}
timeout := time.Second
forceSavePos := false
savePos := false
force := false
for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
ev, err := s.GetEvent(ctx)
cancel()
if err == context.DeadlineExceeded {
timeout = 2 * timeout
continue
}
ev, err := s.GetEvent(c.ctx)
if err != nil {
return errors.Trace(err)
}
savePos = false
force = false
pos := c.master.Position()
timeout = time.Second
curPos := pos.Pos
//next binlog pos
pos.Pos = ev.Header.LogPos
forceSavePos = false
// We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over.
@ -52,24 +69,69 @@ func (c *Canal) startSyncBinlog() error {
case *replication.RotateEvent:
pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position)
// r.ev <- pos
forceSavePos = true
log.Infof("rotate binlog to %v", pos)
log.Infof("rotate binlog to %s", pos)
savePos = true
force = true
if err = c.eventHandler.OnRotate(e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent:
// we only focus row based event
if err = c.handleRowsEvent(ev); err != nil {
log.Errorf("handle rows event error %v", err)
err = c.handleRowsEvent(ev)
if err != nil && errors.Cause(err) != schema.ErrTableNotExist {
// We can ignore table not exist error
log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err)
}
continue
case *replication.XIDEvent:
savePos = true
// try to save the position later
if err := c.eventHandler.OnXID(pos); err != nil {
return errors.Trace(err)
}
case *replication.MariadbGTIDEvent:
// try to save the GTID later
gtid := e.GTID
c.master.UpdateGTID(gtid)
if err := c.eventHandler.OnGTID(gtid); err != nil {
return errors.Trace(err)
}
case *replication.GTIDEvent:
u, _ := uuid.FromBytes(e.SID)
gset, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
if err != nil {
return errors.Trace(err)
}
c.master.UpdateGTID(gset)
if err := c.eventHandler.OnGTID(gset); err != nil {
return errors.Trace(err)
}
case *replication.QueryEvent:
if mb := checkRenameTable(e); mb != nil {
if len(mb[1]) == 0 {
mb[1] = e.Schema
}
savePos = true
force = true
c.ClearTableCache(mb[1], mb[2])
log.Infof("table structure changed, clear table cache: %s.%s\n", mb[1], mb[2])
if err = c.eventHandler.OnDDL(pos, e); err != nil {
return errors.Trace(err)
}
} else {
// skip others
continue
}
default:
continue
}
c.master.Update(pos.Name, pos.Pos)
c.master.Save(forceSavePos)
if savePos {
c.master.Update(pos)
c.eventHandler.OnPosSynced(pos, force)
}
}
return nil
@ -98,24 +160,21 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
return errors.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows)
return c.travelRowsEventHandler(events)
return c.eventHandler.OnRow(events)
}
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
if timeout <= 0 {
timeout = 60
}
timer := time.NewTimer(time.Duration(timeout) * time.Second)
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
timer := time.NewTimer(timeout)
for {
select {
case <-timer.C:
return errors.Errorf("wait position %v err", pos)
return errors.Errorf("wait position %v too long > %s", pos, timeout)
default:
curpos := c.master.Pos()
if curpos.Compare(pos) >= 0 {
curPos := c.master.Position()
if curPos.Compare(pos) >= 0 {
return nil
} else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond)
}
}
@ -124,14 +183,32 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
return nil
}
func (c *Canal) CatchMasterPos(timeout int) error {
func (c *Canal) GetMasterPos() (mysql.Position, error) {
rr, err := c.Execute("SHOW MASTER STATUS")
if err != nil {
return errors.Trace(err)
return mysql.Position{"", 0}, errors.Trace(err)
}
name, _ := rr.GetString(0, 0)
pos, _ := rr.GetInt(0, 1)
return c.WaitUntilPos(mysql.Position{name, uint32(pos)}, timeout)
return mysql.Position{name, uint32(pos)}, nil
}
func (c *Canal) CatchMasterPos(timeout time.Duration) error {
pos, err := c.GetMasterPos()
if err != nil {
return errors.Trace(err)
}
return c.WaitUntilPos(pos, timeout)
}
func checkRenameTable(e *replication.QueryEvent) [][]byte {
var mb = [][]byte{}
if mb = expAlterTable.FindSubmatch(e.Query); mb != nil {
return mb
}
mb = expRenameTable.FindSubmatch(e.Query)
return mb
}

View File

@ -7,8 +7,10 @@ import (
"os/signal"
"strings"
"syscall"
"time"
"github.com/siddontang/go-mysql/canal"
"github.com/siddontang/go-mysql/mysql"
)
var host = flag.String("host", "127.0.0.1", "MySQL host")
@ -18,8 +20,6 @@ var password = flag.String("password", "", "MySQL password")
var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
var dataDir = flag.String("data-dir", "./var", "Path to store data, like master.info")
var serverID = flag.Int("server-id", 101, "Unique Server ID")
var mysqldump = flag.String("mysqldump", "mysqldump", "mysqldump execution path")
@ -28,6 +28,12 @@ var tables = flag.String("tables", "", "dump tables, seperated by comma, will ov
var tableDB = flag.String("table_db", "test", "database for dump tables")
var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, separated by comma")
var startName = flag.String("bin_name", "", "start sync from binlog name")
var startPos = flag.Uint("bin_pos", 0, "start sync from binlog position of")
var heartbeatPeriod = flag.Duration("heartbeat", 60*time.Second, "master heartbeat period")
var readTimeout = flag.Duration("read_timeout", 90*time.Second, "connection read timeout")
func main() {
flag.Parse()
@ -36,8 +42,9 @@ func main() {
cfg.User = *user
cfg.Password = *password
cfg.Flavor = *flavor
cfg.DataDir = *dataDir
cfg.ReadTimeout = *readTimeout
cfg.HeartbeatPeriod = *heartbeatPeriod
cfg.ServerID = uint32(*serverID)
cfg.Dump.ExecutionPath = *mysqldump
cfg.Dump.DiscardErr = false
@ -65,9 +72,14 @@ func main() {
c.AddDumpDatabases(subs...)
}
c.RegRowsEventHandler(&handler{})
c.SetEventHandler(&handler{})
err = c.Start()
startPos := mysql.Position{
*startName,
uint32(*startPos),
}
err = c.StartFrom(startPos)
if err != nil {
fmt.Printf("start canal err %V", err)
os.Exit(1)
@ -88,9 +100,10 @@ func main() {
}
type handler struct {
canal.DummyEventHandler
}
func (h *handler) Do(e *canal.RowsEvent) error {
func (h *handler) OnRow(e *canal.RowsEvent) error {
fmt.Printf("%v\n", e)
return nil

View File

@ -4,12 +4,11 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"golang.org/x/net/context"
"github.com/juju/errors"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
@ -41,11 +40,11 @@ func main() {
Port: uint16(*port),
User: *user,
Password: *password,
RawModeEanbled: *rawMode,
RawModeEnabled: *rawMode,
SemiSyncEnabled: *semiSync,
}
b := replication.NewBinlogSyncer(&cfg)
b := replication.NewBinlogSyncer(cfg)
pos := mysql.Position{*file, uint32(*pos)}
if len(*backupPath) > 0 {

View File

@ -8,6 +8,7 @@ import (
"strings"
"github.com/juju/errors"
. "github.com/siddontang/go-mysql/mysql"
)
// Unlick mysqldump, Dumper is designed for parsing and syning data easily.
@ -25,9 +26,14 @@ type Dumper struct {
Databases []string
Charset string
IgnoreTables map[string][]string
ErrOut io.Writer
masterDataSkipped bool
maxAllowedPacket int
}
func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
@ -47,17 +53,32 @@ func NewDumper(executionPath string, addr string, user string, password string)
d.Password = password
d.Tables = make([]string, 0, 16)
d.Databases = make([]string, 0, 16)
d.Charset = DEFAULT_CHARSET
d.IgnoreTables = make(map[string][]string)
d.masterDataSkipped = false
d.ErrOut = os.Stderr
return d, nil
}
func (d *Dumper) SetCharset(charset string) {
d.Charset = charset
}
func (d *Dumper) SetErrOut(o io.Writer) {
d.ErrOut = o
}
// In some cloud MySQL, we have no privilege to use `--master-data`.
func (d *Dumper) SkipMasterData(v bool) {
d.masterDataSkipped = v
}
func (d *Dumper) SetMaxAllowedPacket(i int) {
d.maxAllowedPacket = i
}
func (d *Dumper) AddDatabases(dbs ...string) {
d.Databases = append(d.Databases, dbs...)
}
@ -97,7 +118,14 @@ func (d *Dumper) Dump(w io.Writer) error {
args = append(args, fmt.Sprintf("--user=%s", d.User))
args = append(args, fmt.Sprintf("--password=%s", d.Password))
if !d.masterDataSkipped {
args = append(args, "--master-data")
}
if d.maxAllowedPacket > 0 {
args = append(args, fmt.Sprintf("--max_allowed_packet=%dM", d.maxAllowedPacket))
}
args = append(args, "--single-transaction")
args = append(args, "--skip-lock-tables")
@ -133,6 +161,10 @@ func (d *Dumper) Dump(w io.Writer) error {
w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB)))
}
if len(d.Charset) != 0 {
args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset))
}
cmd := exec.Command(d.ExecutionPath, args...)
cmd.Stderr = d.ErrOut
@ -147,7 +179,7 @@ func (d *Dumper) DumpAndParse(h ParseHandler) error {
done := make(chan error, 1)
go func() {
err := Parse(r, h)
err := Parse(r, h, !d.masterDataSkipped)
r.CloseWithError(err)
done <- err
}()

View File

@ -38,6 +38,7 @@ func (s *schemaTestSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil)
c.Assert(s.d, NotNil)
s.d.SetCharset("utf8")
s.d.SetErrOut(os.Stderr)
_, err = s.conn.Execute("CREATE DATABASE IF NOT EXISTS test1")
@ -177,7 +178,7 @@ func (s *schemaTestSuite) TestParse(c *C) {
err := s.d.Dump(&buf)
c.Assert(err, IsNil)
err = Parse(&buf, new(testParseHandler))
err = Parse(&buf, new(testParseHandler), true)
c.Assert(err, IsNil)
}

View File

@ -34,7 +34,7 @@ func init() {
// Parse the dump data with Dumper generate.
// It can not parse all the data formats with mysqldump outputs
func Parse(r io.Reader, h ParseHandler) error {
func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
rb := bufio.NewReaderSize(r, 1024*16)
var db string
@ -50,7 +50,7 @@ func Parse(r io.Reader, h ParseHandler) error {
line = line[0 : len(line)-1]
if !binlogParsed {
if parseBinlogPos && !binlogParsed {
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
name := m[0][1]
pos, err := strconv.ParseUint(m[0][2], 10, 64)

View File

@ -23,8 +23,4 @@ imports:
- hack
- ioutil2
- sync2
- name: golang.org/x/net
version: 6acef71eb69611914f7a30939ea9f6e194c78172
subpackages:
- context
testImports: []

View File

@ -22,5 +22,3 @@ import:
- hack
- ioutil2
- sync2
- package: golang.org/x/net
version: 6acef71eb69611914f7a30939ea9f6e194c78172

View File

@ -57,3 +57,10 @@ func NewError(errCode uint16, message string) *MyError {
return e
}
func ErrorCode(errMsg string) (code int) {
var tmpStr string
// golang scanf doesn't support %*,so I used a temporary variable
fmt.Sscanf(errMsg, "%s%d", &tmpStr, &code)
return
}

View File

@ -11,6 +11,8 @@ type GTIDSet interface {
Equal(o GTIDSet) bool
Contain(o GTIDSet) bool
Update(GTIDStr string) error
}
func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {

View File

@ -78,3 +78,14 @@ func (gtid MariadbGTID) Contain(o GTIDSet) bool {
return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
}
func (gtid MariadbGTID) Update(GTIDStr string) error {
newGTID, err := ParseMariadbGTIDSet(GTIDStr)
if err != nil {
return err
}
gtid = newGTID.(MariadbGTID)
return nil
}

View File

@ -291,11 +291,13 @@ type MysqlGTIDSet struct {
func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
s := new(MysqlGTIDSet)
s.Sets = make(map[string]*UUIDSet)
if str == "" {
return s, nil
}
sp := strings.Split(str, ",")
s.Sets = make(map[string]*UUIDSet, len(sp))
//todo, handle redundant same uuid
for i := 0; i < len(sp); i++ {
if set, err := ParseUUIDSet(sp[i]); err != nil {
@ -334,6 +336,9 @@ func DecodeMysqlGTIDSet(data []byte) (*MysqlGTIDSet, error) {
}
func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
if set == nil {
return
}
sid := set.SID.String()
o, ok := s.Sets[sid]
if ok {
@ -343,6 +348,17 @@ func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
}
}
func (s *MysqlGTIDSet) Update(GTIDStr string) error {
uuidSet, err := ParseUUIDSet(GTIDStr)
if err != nil {
return err
}
s.AddSet(uuidSet)
return nil
}
func (s *MysqlGTIDSet) Contain(o GTIDSet) bool {
sub, ok := o.(*MysqlGTIDSet)
if !ok {

View File

@ -151,3 +151,19 @@ func (t *mysqlTestSuite) TestMysqlParseBinaryUint64(c *check.C) {
u64 := ParseBinaryUint64([]byte{1, 2, 3, 4, 5, 6, 7, 128})
c.Assert(u64, check.Equals, 128*uint64(72057594037927936)+7*uint64(281474976710656)+6*uint64(1099511627776)+5*uint64(4294967296)+4*16777216+3*65536+2*256+1)
}
func (t *mysqlTestSuite) TestErrorCode(c *check.C) {
tbls := []struct {
msg string
code int
}{
{"ERROR 1094 (HY000): Unknown thread id: 1094", 1094},
{"error string", 0},
{"abcdefg", 0},
{"123455 ks094", 0},
{"ERROR 1046 (3D000): Unknown error 1046", 1046},
}
for _, v := range tbls {
c.Assert(ErrorCode(v.msg), check.Equals, v.code)
}
}

View File

@ -10,6 +10,7 @@ import (
"strings"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go/hack"
)
@ -351,4 +352,7 @@ func init() {
EncodeMap[byte(i)] = to
}
}
// Disable highlight by default
log.SetHighlighting(false)
}

View File

@ -1,13 +1,12 @@
package replication
import (
"context"
"io"
"os"
"path"
"time"
"golang.org/x/net/context"
"github.com/juju/errors"
. "github.com/siddontang/go-mysql/mysql"
)

View File

@ -1,7 +1,7 @@
package replication
import (
"golang.org/x/net/context"
"context"
"github.com/juju/errors"
"github.com/ngaut/log"

View File

@ -1,17 +1,18 @@
package replication
import (
"context"
"crypto/tls"
"encoding/binary"
"fmt"
"net"
"os"
"sync"
"time"
"golang.org/x/net/context"
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/client"
. "github.com/siddontang/go-mysql/mysql"
)
@ -40,21 +41,37 @@ type BinlogSyncerConfig struct {
// If not set, use os.Hostname() instead.
Localhost string
// Charset is for MySQL client character set
Charset string
// SemiSyncEnabled enables semi-sync or not.
SemiSyncEnabled bool
// RawModeEanbled is for not parsing binlog event.
RawModeEanbled bool
// RawModeEnabled is for not parsing binlog event.
RawModeEnabled bool
// If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
TLSConfig *tls.Config
// Use replication.Time structure for timestamp and datetime.
// We will use Local location for timestamp and UTC location for datatime.
ParseTime bool
// RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
RecvBufferSize int
// master heartbeat period
HeartbeatPeriod time.Duration
// read timeout
ReadTimeout time.Duration
}
// BinlogSyncer syncs binlog event from server.
type BinlogSyncer struct {
m sync.RWMutex
cfg *BinlogSyncerConfig
cfg BinlogSyncerConfig
c *client.Conn
@ -64,22 +81,33 @@ type BinlogSyncer struct {
nextPos Position
useGTID bool
gset GTIDSet
running bool
ctx context.Context
cancel context.CancelFunc
lastConnectionID uint32
}
// NewBinlogSyncer creates the BinlogSyncer with cfg.
func NewBinlogSyncer(cfg *BinlogSyncerConfig) *BinlogSyncer {
func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
// Clear the Password to avoid outputing it in log.
pass := cfg.Password
cfg.Password = ""
log.Infof("create BinlogSyncer with config %v", cfg)
cfg.Password = pass
b := new(BinlogSyncer)
b.cfg = cfg
b.parser = NewBinlogParser()
b.parser.SetRawMode(b.cfg.RawModeEanbled)
b.parser.SetRawMode(b.cfg.RawModeEnabled)
b.parser.SetParseTime(b.cfg.ParseTime)
b.useGTID = false
b.running = false
b.ctx, b.cancel = context.WithCancel(context.Background())
@ -140,6 +168,37 @@ func (b *BinlogSyncer) registerSlave() error {
return errors.Trace(err)
}
if len(b.cfg.Charset) != 0 {
b.c.SetCharset(b.cfg.Charset)
}
//set read timeout
if b.cfg.ReadTimeout > 0 {
b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
}
if b.cfg.RecvBufferSize > 0 {
if tcp, ok := b.c.Conn.Conn.(*net.TCPConn); ok {
tcp.SetReadBuffer(b.cfg.RecvBufferSize)
}
}
// kill last connection id
if b.lastConnectionID > 0 {
cmd := fmt.Sprintf("KILL %d", b.lastConnectionID)
if _, err := b.c.Execute(cmd); err != nil {
log.Errorf("kill connection %d error %v", b.lastConnectionID, err)
// Unknown thread id
if code := ErrorCode(err.Error()); code != ER_NO_SUCH_THREAD {
return errors.Trace(err)
}
}
log.Infof("kill last connection id %d", b.lastConnectionID)
}
// save last last connection id for kill
b.lastConnectionID = b.c.GetConnectionID()
//for mysql 5.6+, binlog has a crc32 checksum
//before mysql 5.6, this will not work, don't matter.:-)
if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil {
@ -175,6 +234,14 @@ func (b *BinlogSyncer) registerSlave() error {
}
}
if b.cfg.HeartbeatPeriod > 0 {
_, err = b.c.Execute(fmt.Sprintf("SET @master_heartbeat_period=%d;", b.cfg.HeartbeatPeriod))
if err != nil {
log.Error("failed to set @master_heartbeat_period=%d", b.cfg.HeartbeatPeriod, err)
return errors.Trace(err)
}
}
if err = b.writeRegisterSlaveCommand(); err != nil {
return errors.Trace(err)
}
@ -236,6 +303,11 @@ func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
return s
}
// GetNextPosition returns the next position of the syncer
func (b *BinlogSyncer) GetNextPosition() Position {
return b.nextPos
}
// StartSync starts syncing from the `pos` position.
func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
log.Infof("begin to sync binlog from position %s", pos)
@ -258,6 +330,9 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
log.Infof("begin to sync binlog from GTID %s", gset)
b.useGTID = true
b.gset = gset
b.m.Lock()
defer b.m.Unlock()
@ -284,7 +359,7 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
return b.startDumpStream(), nil
}
func (b *BinlogSyncer) writeBinglogDumpCommand(p Position) error {
func (b *BinlogSyncer) writeBinlogDumpCommand(p Position) error {
b.c.ResetSequence()
data := make([]byte, 4+1+4+2+4+len(p.Name))
@ -364,7 +439,7 @@ func (b *BinlogSyncer) writeBinlogDumpMariadbGTIDCommand(gset GTIDSet) error {
}
// Since we use @slave_connect_state, the file and position here are ignored.
return b.writeBinglogDumpCommand(Position{"", 0})
return b.writeBinlogDumpCommand(Position{"", 0})
}
// localHostname returns the hostname that register slave would register as.
@ -449,12 +524,20 @@ func (b *BinlogSyncer) retrySync() error {
b.m.Lock()
defer b.m.Unlock()
log.Infof("begin to re-sync from %s", b.nextPos)
b.parser.Reset()
if b.useGTID {
log.Infof("begin to re-sync from %s", b.gset.String())
if err := b.prepareSyncGTID(b.gset); err != nil {
return errors.Trace(err)
}
} else {
log.Infof("begin to re-sync from %s", b.nextPos)
if err := b.prepareSyncPos(b.nextPos); err != nil {
return errors.Trace(err)
}
}
return nil
}
@ -469,13 +552,33 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
return errors.Trace(err)
}
if err := b.writeBinglogDumpCommand(pos); err != nil {
if err := b.writeBinlogDumpCommand(pos); err != nil {
return errors.Trace(err)
}
return nil
}
func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
var err error
if err = b.prepare(); err != nil {
return errors.Trace(err)
}
if b.cfg.Flavor != MariaDBFlavor {
// default use MySQL
err = b.writeBinlogDumpMysqlGTIDCommand(gset)
} else {
err = b.writeBinlogDumpMariadbGTIDCommand(gset)
}
if err != nil {
return err
}
return nil
}
func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
defer func() {
if e := recover(); e != nil {
@ -490,8 +593,8 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
log.Error(err)
// we meet connection error, should re-connect again with
// last nextPos we got.
if len(b.nextPos.Name) == 0 {
// last nextPos or nextGTID we got.
if len(b.nextPos.Name) == 0 && b.gset == nil {
// we can't get the correct position, close.
s.closeWithError(err)
return
@ -517,6 +620,11 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
continue
}
//set read timeout
if b.cfg.ReadTimeout > 0 {
b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
}
switch data[0] {
case OK_HEADER:
if err = b.parseEvent(s, data); err != nil {
@ -552,7 +660,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
data = data[2:]
}
e, err := b.parser.parse(data)
e, err := b.parser.Parse(data)
if err != nil {
return errors.Trace(err)
}
@ -561,11 +669,33 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
// Some events like FormatDescriptionEvent return 0, ignore.
b.nextPos.Pos = e.Header.LogPos
}
if re, ok := e.Event.(*RotateEvent); ok {
b.nextPos.Name = string(re.NextLogName)
b.nextPos.Pos = uint32(re.Position)
switch event := e.Event.(type) {
case *RotateEvent:
b.nextPos.Name = string(event.NextLogName)
b.nextPos.Pos = uint32(event.Position)
log.Infof("rotate to %s", b.nextPos)
case *GTIDEvent:
if !b.useGTID {
break
}
u, _ := uuid.FromBytes(event.SID)
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
if err != nil {
return errors.Trace(err)
}
case *MariadbGTIDEvent:
if !b.useGTID {
break
}
GTID := event.GTID
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
if err != nil {
return errors.Trace(err)
}
case *XIDEvent:
event.GSet = b.getGtidSet()
case *QueryEvent:
event.GSet = b.getGtidSet()
}
needStop := false
@ -588,3 +718,19 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
return nil
}
func (b *BinlogSyncer) getGtidSet() GTIDSet {
var gtidSet GTIDSet
if !b.useGTID {
return nil
}
if b.cfg.Flavor != MariaDBFlavor {
gtidSet, _ = ParseGTIDSet(MySQLFlavor, b.gset.String())
} else {
gtidSet, _ = ParseGTIDSet(MariaDBFlavor, b.gset.String())
}
return gtidSet
}

View File

@ -216,6 +216,9 @@ func (e *RotateEvent) Dump(w io.Writer) {
type XIDEvent struct {
XID uint64
// in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use
GSet GTIDSet
}
func (e *XIDEvent) Decode(data []byte) error {
@ -225,6 +228,9 @@ func (e *XIDEvent) Decode(data []byte) error {
func (e *XIDEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "XID: %d\n", e.XID)
if e.GSet != nil {
fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
}
fmt.Fprintln(w)
}
@ -235,6 +241,9 @@ type QueryEvent struct {
StatusVars []byte
Schema []byte
Query []byte
// in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
GSet GTIDSet
}
func (e *QueryEvent) Decode(data []byte) error {
@ -275,6 +284,9 @@ func (e *QueryEvent) Dump(w io.Writer) {
//fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
fmt.Fprintf(w, "Schema: %s\n", e.Schema)
fmt.Fprintf(w, "Query: %s\n", e.Query)
if e.GSet != nil {
fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
}
fmt.Fprintln(w)
}

View File

@ -338,7 +338,7 @@ func (d *jsonBinaryDecoder) decodeString(data []byte) string {
l, n := d.decodeVariableLength(data)
if d.isDataShort(data, int(l)+n) {
if d.isDataShort(data, l+n) {
return ""
}
@ -358,11 +358,11 @@ func (d *jsonBinaryDecoder) decodeOpaque(data []byte) interface{} {
l, n := d.decodeVariableLength(data)
if d.isDataShort(data, int(l)+n) {
if d.isDataShort(data, l+n) {
return nil
}
data = data[n : int(l)+n]
data = data[n : l+n]
switch tp {
case MYSQL_TYPE_NEWDECIMAL:
@ -459,7 +459,7 @@ func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) {
length := uint64(0)
for ; pos < maxCount; pos++ {
v := data[pos]
length = (length << 7) + uint64(v&0x7F)
length |= uint64(v & 0x7F) << uint(7 * pos)
if v&0x80 == 0 {
if length > math.MaxUint32 {

View File

@ -16,6 +16,8 @@ type BinlogParser struct {
// for rawMode, we only parse FormatDescriptionEvent and RotateEvent
rawMode bool
parseTime bool
}
func NewBinlogParser() *BinlogParser {
@ -54,12 +56,10 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc)
return errors.Errorf("seek %s to %d error %v", name, offset, err)
}
return p.parseReader(f, onEvent)
return p.ParseReader(f, onEvent)
}
func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error {
p.Reset()
func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
var err error
var n int64
@ -100,7 +100,10 @@ func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error {
var e Event
e, err = p.parseEvent(h, data)
if err != nil {
break
if _, ok := err.(errMissingTableMapEvent); ok {
continue
}
return errors.Trace(err)
}
if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil {
@ -115,6 +118,10 @@ func (p *BinlogParser) SetRawMode(mode bool) {
p.rawMode = mode
}
func (p *BinlogParser) SetParseTime(parseTime bool) {
p.parseTime = parseTime
}
func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader)
err := h.Decode(data)
@ -206,7 +213,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) {
return e, nil
}
func (p *BinlogParser) parse(data []byte) (*BinlogEvent, error) {
// Given the bytes for a a binary log event: return the decoded event.
// With the exception of the FORMAT_DESCRIPTION_EVENT event type
// there must have previously been passed a FORMAT_DESCRIPTION_EVENT
// into the parser for this to work properly on any given event.
// Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace
// an existing one.
func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) {
rawData := data
h, err := p.parseHeader(data)
@ -240,6 +253,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
e.needBitmap2 = false
e.tables = p.tables
e.parseTime = p.parseTime
switch h.EventType {
case WRITE_ROWS_EVENTv0:

View File

@ -29,7 +29,7 @@ func (t *testSyncerSuite) TestIndexOutOfRange(c *C) {
0x3065f: &TableMapEvent{tableIDSize: 6, TableID: 0x3065f, Flags: 0x1, Schema: []uint8{0x73, 0x65, 0x69, 0x75, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72}, Table: []uint8{0x63, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x6f, 0x75, 0x74, 0x5f, 0x6c, 0x65, 0x74, 0x74, 0x65, 0x72}, ColumnCount: 0xd, ColumnType: []uint8{0x3, 0x3, 0x3, 0x3, 0x1, 0x12, 0xf, 0xf, 0x12, 0xf, 0xf, 0x3, 0xf}, ColumnMeta: []uint16{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x180, 0x180, 0x0, 0x180, 0x180, 0x0, 0x2fd}, NullBitmap: []uint8{0xe0, 0x17}},
}
_, err := parser.parse([]byte{
_, err := parser.Parse([]byte{
/* 0x00, */ 0xc1, 0x86, 0x8e, 0x55, 0x1e, 0xa5, 0x14, 0x80, 0xa, 0x55, 0x0, 0x0, 0x0, 0x7, 0xc,
0xbf, 0xe, 0x0, 0x0, 0x5f, 0x6, 0x3, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0xd, 0xff,
0x0, 0x0, 0x19, 0x63, 0x7, 0x0, 0xca, 0x61, 0x5, 0x0, 0x5e, 0xf7, 0xc, 0x0, 0xf5, 0x7,

View File

@ -1,6 +1,7 @@
package replication
import (
"context"
"flag"
"fmt"
"os"
@ -8,8 +9,6 @@ import (
"testing"
"time"
"golang.org/x/net/context"
. "github.com/pingcap/check"
uuid "github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/client"
@ -230,6 +229,28 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) {
}
}
str = `DROP TABLE IF EXISTS test_parse_time`
t.testExecute(c, str)
// Must allow zero time.
t.testExecute(c, `SET sql_mode=''`)
str = `CREATE TABLE test_parse_time (
a1 DATETIME,
a2 DATETIME(3),
a3 DATETIME(6),
b1 TIMESTAMP,
b2 TIMESTAMP(3) ,
b3 TIMESTAMP(6))`
t.testExecute(c, str)
t.testExecute(c, `INSERT INTO test_parse_time VALUES
("2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456",
"2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456"),
("0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000",
"0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000"),
("2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456",
"2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456")`)
t.wg.Wait()
}
@ -271,7 +292,7 @@ func (t *testSyncerSuite) setupTest(c *C, flavor string) {
Password: "",
}
t.b = NewBinlogSyncer(&cfg)
t.b = NewBinlogSyncer(cfg)
}
func (t *testSyncerSuite) testPositionSync(c *C) {

View File

@ -15,6 +15,8 @@ import (
"github.com/siddontang/go/hack"
)
type errMissingTableMapEvent error
type TableMapEvent struct {
tableIDSize int
@ -223,6 +225,8 @@ type RowsEvent struct {
//rows: invalid: int64, float64, bool, []byte, string
Rows [][]interface{}
parseTime bool
}
func (e *RowsEvent) Decode(data []byte) error {
@ -257,7 +261,11 @@ func (e *RowsEvent) Decode(data []byte) error {
var ok bool
e.Table, ok = e.tables[e.TableID]
if !ok {
return errors.Errorf("invalid table id %d, no correspond table map event", e.TableID)
if len(e.tables) > 0 {
return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
} else {
return errMissingTableMapEvent(errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID))
}
}
var err error
@ -336,6 +344,21 @@ func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte)
return pos, nil
}
func (e *RowsEvent) parseFracTime(t interface{}) interface{} {
v, ok := t.(fracTime)
if !ok {
return t
}
if !e.parseTime {
// Don't parse time, return string directly
return v.String()
}
// return Golang time directly
return v.Time
}
// see mysql sql/log_event.cc log_event_print_value
func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{}, n int, err error) {
var length int = 0
@ -394,24 +417,26 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
case MYSQL_TYPE_TIMESTAMP:
n = 4
t := binary.LittleEndian.Uint32(data)
v = time.Unix(int64(t), 0)
v = e.parseFracTime(fracTime{time.Unix(int64(t), 0), 0})
case MYSQL_TYPE_TIMESTAMP2:
v, n, err = decodeTimestamp2(data, meta)
v = e.parseFracTime(v)
case MYSQL_TYPE_DATETIME:
n = 8
i64 := binary.LittleEndian.Uint64(data)
d := i64 / 1000000
t := i64 % 1000000
v = time.Date(int(d/10000),
v = e.parseFracTime(fracTime{time.Date(int(d/10000),
time.Month((d%10000)/100),
int(d%100),
int(t/10000),
int((t%10000)/100),
int(t%100),
0,
time.UTC).Format(TimeFormat)
time.UTC), 0})
case MYSQL_TYPE_DATETIME2:
v, n, err = decodeDatetime2(data, meta)
v = e.parseFracTime(v)
case MYSQL_TYPE_TIME:
n = 3
i32 := uint32(FixedLengthInt(data[0:3]))
@ -464,8 +489,8 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
case MYSQL_TYPE_STRING:
v, n = decodeString(data, length)
case MYSQL_TYPE_JSON:
// Refer https://github.com/shyiko/mysql-binlog-connector-java/blob/8f9132ee773317e00313204beeae8ddcaa43c1b4/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L344
length = int(binary.LittleEndian.Uint32(data[0:]))
// Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404
length = int(FixedLengthInt(data[0:meta]))
n = length + int(meta)
v, err = decodeJsonBinary(data[meta:n])
case MYSQL_TYPE_GEOMETRY:
@ -615,11 +640,10 @@ func decodeTimestamp2(data []byte, dec uint16) (interface{}, int, error) {
}
if sec == 0 {
return "0000-00-00 00:00:00", n, nil
return formatZeroTime(int(usec), int(dec)), n, nil
}
t := time.Unix(sec, usec*1000)
return t, n, nil
return fracTime{time.Unix(sec, usec*1000), int(dec)}, n, nil
}
const DATETIMEF_INT_OFS int64 = 0x8000000000
@ -641,7 +665,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
}
if intPart == 0 {
return "0000-00-00 00:00:00", n, nil
return formatZeroTime(int(frac), int(dec)), n, nil
}
tmp := intPart<<24 + frac
@ -650,7 +674,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
tmp = -tmp
}
var secPart int64 = tmp % (1 << 24)
// var secPart int64 = tmp % (1 << 24)
ymdhms := tmp >> 24
ymd := ymdhms >> 17
@ -665,10 +689,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
minute := int((hms >> 6) % (1 << 6))
hour := int((hms >> 12))
if secPart != 0 {
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%06d", year, month, day, hour, minute, second, secPart), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
}
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
return fracTime{time.Date(year, time.Month(month), day, hour, minute, second, int(frac*1000), time.UTC), int(dec)}, n, nil
}
const TIMEF_OFS int64 = 0x800000000000

View File

@ -403,7 +403,8 @@ func (_ *testDecodeSuite) TestParseJson(c *C) {
// INSERT INTO `t10` (`c2`) VALUES (1);
// INSERT INTO `t10` (`c1`, `c2`) VALUES ('{"key1": "value1", "key2": "value2"}', 1);
// test json deserialization
// INSERT INTO `t10`(`c1`,`c2`) VALUES ('{"text":"Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus. Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a, tellus. Phasellus viverra nulla ut metus varius laoreet. Quisque rutrum. Aenean imperdiet. Etiam ultricies nisi vel augue. Curabitur ullamcorper ultricies nisi. Nam eget dui. Etiam rhoncus. Maecenas tempus, tellus eget condimentum rhoncus, sem quam semper libero, sit amet adipiscing sem neque sed ipsum. Nam quam nunc, blandit vel, luctus pulvinar, hendrerit id, lorem. Maecenas nec odio et ante tincidunt tempus. Donec vitae sapien ut libero venenatis faucibus. Nullam quis ante. Etiam sit amet orci eget eros faucibus tincidunt. Duis leo. Sed fringilla mauris sit amet nibh. Donec sodales sagittis magna. Sed consequat, leo eget bibendum sodales, augue velit cursus nunc, quis gravida magna mi a libero. Fusce vulputate eleifend sapien. Vestibulum purus quam, scelerisque ut, mollis sed, nonummy id, metus. Nullam accumsan lorem in dui. Cras ultricies mi eu turpis hendrerit fringilla. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; In ac dui quis mi consectetuer lacinia. Nam pretium turpis et arcu. Duis arcu tortor, suscipit eget, imperdiet nec, imperdiet iaculis, ipsum. Sed aliquam ultrices mauris. Integer ante arcu, accumsan a, consectetuer eget, posuere ut, mauris. Praesent adipiscing. Phasellus ullamcorper ipsum rutrum nunc. Nunc nonummy metus. Vestibulum volutpat pretium libero. Cras id dui. Aenean ut eros et nisl sagittis vestibulum. Nullam nulla eros, ultricies sit amet, nonummy id, imperdiet feugiat, pede. Sed lectus. Donec mollis hendrerit risus. Phasellus nec sem in justo pellentesque facilisis. Etiam imperdiet imperdiet orci. Nunc nec neque. Phasellus leo dolor, tempus non, auctor et, hendrerit quis, nisi. Curabitur ligula sapien, tincidunt non, euismod vitae, posuere imperdiet, leo. Maecenas malesuada. Praesent congue erat at massa. Sed cursus turpis vitae tortor. Donec posuere vulputate arcu. Phasellus accumsan cursus velit. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Sed aliquam, nisi quis porttitor congue, elit erat euismod orci, ac"}',101);
tableMapEventData := []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x03t10\x00\x02\xf5\xf6\x03\x04\n\x00\x03")
tableMapEvent := new(TableMapEvent)
@ -428,4 +429,15 @@ func (_ *testDecodeSuite) TestParseJson(c *C) {
c.Assert(err, IsNil)
c.Assert(rows.Rows[0][1], Equals, float64(1))
}
longTbls := [][]byte{
[]byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x02\xff\xfc\xd0\n\x00\x00\x00\x01\x00\xcf\n\v\x00\x04\x00\f\x0f\x00text\xbe\x15Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus. Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a, tellus. Phasellus viverra nulla ut metus varius laoreet. Quisque rutrum. Aenean imperdiet. Etiam ultricies nisi vel augue. Curabitur ullamcorper ultricies nisi. Nam eget dui. Etiam rhoncus. Maecenas tempus, tellus eget condimentum rhoncus, sem quam semper libero, sit amet adipiscing sem neque sed ipsum. Nam quam nunc, blandit vel, luctus pulvinar, hendrerit id, lorem. Maecenas nec odio et ante tincidunt tempus. Donec vitae sapien ut libero venenatis faucibus. Nullam quis ante. Etiam sit amet orci eget eros faucibus tincidunt. Duis leo. Sed fringilla mauris sit amet nibh. Donec sodales sagittis magna. Sed consequat, leo eget bibendum sodales, augue velit cursus nunc, quis gravida magna mi a libero. Fusce vulputate eleifend sapien. Vestibulum purus quam, scelerisque ut, mollis sed, nonummy id, metus. Nullam accumsan lorem in dui. Cras ultricies mi eu turpis hendrerit fringilla. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; In ac dui quis mi consectetuer lacinia. Nam pretium turpis et arcu. Duis arcu tortor, suscipit eget, imperdiet nec, imperdiet iaculis, ipsum. Sed aliquam ultrices mauris. Integer ante arcu, accumsan a, consectetuer eget, posuere ut, mauris. Praesent adipiscing. Phasellus ullamcorper ipsum rutrum nunc. Nunc nonummy metus. Vestibulum volutpat pretium libero. Cras id dui. Aenean ut eros et nisl sagittis vestibulum. Nullam nulla eros, ultricies sit amet, nonummy id, imperdiet feugiat, pede. Sed lectus. Donec mollis hendrerit risus. Phasellus nec sem in justo pellentesque facilisis. Etiam imperdiet imperdiet orci. Nunc nec neque. Phasellus leo dolor, tempus non, auctor et, hendrerit quis, nisi. Curabitur ligula sapien, tincidunt non, euismod vitae, posuere imperdiet, leo. Maecenas malesuada. Praesent congue erat at massa. Sed cursus turpis vitae tortor. Donec posuere vulputate arcu. Phasellus accumsan cursus velit. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Sed aliquam, nisi quis porttitor congue, elit erat euismod orci, ac\x80\x00\x00\x00e"),
}
for _, ltbl := range longTbls {
rows.Rows = nil
err = rows.Decode(ltbl)
c.Assert(err, IsNil)
c.Assert(rows.Rows[0][1], Equals, float64(101))
}
}

View File

@ -0,0 +1,43 @@
package replication
import (
"fmt"
"strings"
"time"
)
var (
fracTimeFormat []string
)
// fracTime is a help structure wrapping Golang Time.
type fracTime struct {
time.Time
// Dec must in [0, 6]
Dec int
}
func (t fracTime) String() string {
return t.Format(fracTimeFormat[t.Dec])
}
func formatZeroTime(frac int, dec int) string {
if dec == 0 {
return "0000-00-00 00:00:00"
}
s := fmt.Sprintf("0000-00-00 00:00:00.%06d", frac)
// dec must < 6, if frac is 924000, but dec is 3, we must output 924 here.
return s[0 : len(s)-(6-dec)]
}
func init() {
fracTimeFormat = make([]string, 7)
fracTimeFormat[0] = "2006-01-02 15:04:05"
for i := 1; i <= 6; i++ {
fracTimeFormat[i] = fmt.Sprintf("2006-01-02 15:04:05.%s", strings.Repeat("0", i))
}
}

View File

@ -0,0 +1,51 @@
package replication
import (
"time"
. "github.com/pingcap/check"
)
type testTimeSuite struct{}
var _ = Suite(&testTimeSuite{})
func (s *testSyncerSuite) TestTime(c *C) {
tbls := []struct {
year int
month int
day int
hour int
min int
sec int
microSec int
frac int
expected string
}{
{2000, 1, 1, 1, 1, 1, 1, 0, "2000-01-01 01:01:01"},
{2000, 1, 1, 1, 1, 1, 1, 1, "2000-01-01 01:01:01.0"},
{2000, 1, 1, 1, 1, 1, 1, 6, "2000-01-01 01:01:01.000001"},
}
for _, t := range tbls {
t1 := fracTime{time.Date(t.year, time.Month(t.month), t.day, t.hour, t.min, t.sec, t.microSec*1000, time.UTC), t.frac}
c.Assert(t1.String(), Equals, t.expected)
}
zeroTbls := []struct {
frac int
dec int
expected string
}{
{0, 1, "0000-00-00 00:00:00.0"},
{1, 1, "0000-00-00 00:00:00.0"},
{123, 3, "0000-00-00 00:00:00.000"},
{123000, 3, "0000-00-00 00:00:00.123"},
{123, 6, "0000-00-00 00:00:00.000123"},
{123000, 6, "0000-00-00 00:00:00.123000"},
}
for _, t := range zeroTbls {
c.Assert(formatZeroTime(t.frac, t.dec), Equals, t.expected)
}
}

View File

@ -5,6 +5,7 @@
package schema
import (
"database/sql"
"fmt"
"strings"
@ -12,6 +13,8 @@ import (
"github.com/siddontang/go-mysql/mysql"
)
var ErrTableNotExist = errors.New("table is not exist")
const (
TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year
TYPE_FLOAT // float, double
@ -29,7 +32,10 @@ const (
type TableColumn struct {
Name string
Type int
Collation string
RawType string
IsAuto bool
IsUnsigned bool
EnumValues []string
SetValues []string
}
@ -53,9 +59,10 @@ func (ta *Table) String() string {
return fmt.Sprintf("%s.%s", ta.Schema, ta.Name)
}
func (ta *Table) AddColumn(name string, columnType string, extra string) {
func (ta *Table) AddColumn(name string, columnType string, collation string, extra string) {
index := len(ta.Columns)
ta.Columns = append(ta.Columns, TableColumn{Name: name})
ta.Columns = append(ta.Columns, TableColumn{Name: name, Collation: collation})
ta.Columns[index].RawType = columnType
if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") {
ta.Columns[index].Type = TYPE_NUMBER
@ -97,6 +104,10 @@ func (ta *Table) AddColumn(name string, columnType string, extra string) {
ta.Columns[index].Type = TYPE_STRING
}
if strings.Contains(columnType, "unsigned") || strings.Contains(columnType, "zerofill") {
ta.Columns[index].IsUnsigned = true
}
if extra == "auto_increment" {
ta.Columns[index].IsAuto = true
}
@ -142,6 +153,35 @@ func (idx *Index) FindColumn(name string) int {
return -1
}
func IsTableExist(conn mysql.Executer, schema string, name string) (bool, error) {
query := fmt.Sprintf("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' and TABLE_NAME = '%s' LIMIT 1", schema, name)
r, err := conn.Execute(query)
if err != nil {
return false, errors.Trace(err)
}
return r.RowNumber() == 1, nil
}
func NewTableFromSqlDB(conn *sql.DB, schema string, name string) (*Table, error) {
ta := &Table{
Schema: schema,
Name: name,
Columns: make([]TableColumn, 0, 16),
Indexes: make([]*Index, 0, 8),
}
if err := ta.fetchColumnsViaSqlDB(conn); err != nil {
return nil, errors.Trace(err)
}
if err := ta.fetchIndexesViaSqlDB(conn); err != nil {
return nil, errors.Trace(err)
}
return ta, nil
}
func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) {
ta := &Table{
Schema: schema,
@ -151,18 +191,18 @@ func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) {
}
if err := ta.fetchColumns(conn); err != nil {
return nil, err
return nil, errors.Trace(err)
}
if err := ta.fetchIndexes(conn); err != nil {
return nil, err
return nil, errors.Trace(err)
}
return ta, nil
}
func (ta *Table) fetchColumns(conn mysql.Executer) error {
r, err := conn.Execute(fmt.Sprintf("describe `%s`.`%s`", ta.Schema, ta.Name))
r, err := conn.Execute(fmt.Sprintf("show full columns from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil {
return errors.Trace(err)
}
@ -170,14 +210,39 @@ func (ta *Table) fetchColumns(conn mysql.Executer) error {
for i := 0; i < r.RowNumber(); i++ {
name, _ := r.GetString(i, 0)
colType, _ := r.GetString(i, 1)
extra, _ := r.GetString(i, 5)
collation, _ := r.GetString(i, 2)
extra, _ := r.GetString(i, 6)
ta.AddColumn(name, colType, extra)
ta.AddColumn(name, colType, collation, extra)
}
return nil
}
func (ta *Table) fetchColumnsViaSqlDB(conn *sql.DB) error {
r, err := conn.Query(fmt.Sprintf("show full columns from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil {
return errors.Trace(err)
}
defer r.Close()
var unusedVal interface{}
unused := &unusedVal
for r.Next() {
var name, colType, extra string
var collation sql.NullString
err := r.Scan(&name, &colType, &collation, &unused, &unused, &unused, &extra, &unused, &unused)
if err != nil {
return errors.Trace(err)
}
ta.AddColumn(name, colType, collation.String, extra)
}
return r.Err()
}
func (ta *Table) fetchIndexes(conn mysql.Executer) error {
r, err := conn.Execute(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil {
@ -197,6 +262,59 @@ func (ta *Table) fetchIndexes(conn mysql.Executer) error {
currentIndex.AddColumn(colName, cardinality)
}
return ta.fetchPrimaryKeyColumns()
}
func (ta *Table) fetchIndexesViaSqlDB(conn *sql.DB) error {
r, err := conn.Query(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil {
return errors.Trace(err)
}
defer r.Close()
var currentIndex *Index
currentName := ""
var unusedVal interface{}
unused := &unusedVal
for r.Next() {
var indexName, colName string
var cardinality uint64
err := r.Scan(
&unused,
&unused,
&indexName,
&unused,
&colName,
&unused,
&cardinality,
&unused,
&unused,
&unused,
&unused,
&unused,
&unused,
)
if err != nil {
return errors.Trace(err)
}
if currentName != indexName {
currentIndex = ta.AddIndex(indexName)
currentName = indexName
}
currentIndex.AddColumn(colName, cardinality)
}
return ta.fetchPrimaryKeyColumns()
}
func (ta *Table) fetchPrimaryKeyColumns() error {
if len(ta.Indexes) == 0 {
return nil
}

View File

@ -1,12 +1,14 @@
package schema
import (
"database/sql"
"flag"
"fmt"
"testing"
. "github.com/pingcap/check"
"github.com/siddontang/go-mysql/client"
_ "github.com/siddontang/go-mysql/driver"
)
// use docker mysql for test
@ -18,6 +20,7 @@ func Test(t *testing.T) {
type schemaTestSuite struct {
conn *client.Conn
sqlDB *sql.DB
}
var _ = Suite(&schemaTestSuite{})
@ -26,12 +29,19 @@ func (s *schemaTestSuite) SetUpSuite(c *C) {
var err error
s.conn, err = client.Connect(fmt.Sprintf("%s:%d", *host, 3306), "root", "", "test")
c.Assert(err, IsNil)
s.sqlDB, err = sql.Open("mysql", fmt.Sprintf("root:@%s:3306", *host))
c.Assert(err, IsNil)
}
func (s *schemaTestSuite) TearDownSuite(c *C) {
if s.conn != nil {
s.conn.Close()
}
if s.sqlDB != nil {
s.sqlDB.Close()
}
}
func (s *schemaTestSuite) TestSchema(c *C) {
@ -48,6 +58,10 @@ func (s *schemaTestSuite) TestSchema(c *C) {
se SET('a', 'b', 'c'),
f FLOAT,
d DECIMAL(2, 1),
uint INT UNSIGNED,
zfint INT ZEROFILL,
name_ucs VARCHAR(256) CHARACTER SET ucs2,
name_utf8 VARCHAR(256) CHARACTER SET utf8,
PRIMARY KEY(id2, id),
UNIQUE (id1),
INDEX name_idx (name)
@ -60,7 +74,7 @@ func (s *schemaTestSuite) TestSchema(c *C) {
ta, err := NewTable(s.conn, "test", "schema_test")
c.Assert(err, IsNil)
c.Assert(ta.Columns, HasLen, 8)
c.Assert(ta.Columns, HasLen, 12)
c.Assert(ta.Indexes, HasLen, 3)
c.Assert(ta.PKColumns, DeepEquals, []int{2, 0})
c.Assert(ta.Indexes[0].Columns, HasLen, 2)
@ -69,6 +83,16 @@ func (s *schemaTestSuite) TestSchema(c *C) {
c.Assert(ta.Columns[4].EnumValues, DeepEquals, []string{"a", "b", "c"})
c.Assert(ta.Columns[5].SetValues, DeepEquals, []string{"a", "b", "c"})
c.Assert(ta.Columns[7].Type, Equals, TYPE_FLOAT)
c.Assert(ta.Columns[0].IsUnsigned, IsFalse)
c.Assert(ta.Columns[8].IsUnsigned, IsTrue)
c.Assert(ta.Columns[9].IsUnsigned, IsTrue)
c.Assert(ta.Columns[10].Collation, Matches, "^ucs2.*")
c.Assert(ta.Columns[11].Collation, Matches, "^utf8.*")
taSqlDb, err := NewTableFromSqlDB(s.sqlDB, "test", "schema_test")
c.Assert(err, IsNil)
c.Assert(taSqlDb, DeepEquals, ta)
}
func (s *schemaTestSuite) TestQuoteSchema(c *C) {