Compare commits

...

7 Commits

Author SHA1 Message Date
Shlomi Noach
de8b13911c Merge branch 'master' into go-mysql-updates-201709 2017-10-24 15:27:45 +03:00
Shlomi Noach
fe6b11d73d better diff output in localtests 2017-10-24 15:24:31 +03:00
Shlomi Noach
e28c3c02b2 Merge branch 'master' into go-mysql-updates-201709 2017-10-24 14:45:53 +03:00
Shlomi Noach
aac063f6f9 Merge branch 'master' into go-mysql-updates-201709 2017-10-17 10:03:04 +03:00
Shlomi Noach
2635a26069 Merge branch 'master' into go-mysql-updates-201709 2017-10-02 08:43:25 +03:00
Shlomi Noach
3dd15dfd18 updated local code 2017-10-02 08:42:38 +03:00
Shlomi Noach
5a9745e2b6 updated go-mysql library 2017-10-02 08:40:33 +03:00
42 changed files with 1010 additions and 816 deletions

View File

@ -49,7 +49,7 @@ func NewGoMySQLReader(connectionConfig *mysql.ConnectionConfig) (binlogReader *G
User: connectionConfig.User, User: connectionConfig.User,
Password: connectionConfig.Password, Password: connectionConfig.Password,
} }
binlogReader.binlogSyncer = replication.NewBinlogSyncer(binlogSyncerConfig) binlogReader.binlogSyncer = replication.NewBinlogSyncer(*binlogSyncerConfig)
return binlogReader, err return binlogReader, err
} }

View File

@ -1,3 +0,0 @@
var
bin
.idea

View File

@ -1,8 +1,9 @@
language: go language: go
go: go:
- 1.6
- 1.7 - 1.7
- 1.8
- 1.9
dist: trusty dist: trusty
sudo: required sudo: required

View File

@ -25,7 +25,7 @@ cfg := replication.BinlogSyncerConfig {
User: "root", User: "root",
Password: "", Password: "",
} }
syncer := replication.NewBinlogSyncer(&cfg) syncer := replication.NewBinlogSyncer(cfg)
// Start sync with sepcified binlog file and position // Start sync with sepcified binlog file and position
streamer, _ := syncer.StartSync(mysql.Position{binlogFile, binlogPos}) streamer, _ := syncer.StartSync(mysql.Position{binlogFile, binlogPos})
@ -105,20 +105,21 @@ cfg.Dump.Tables = []string{"canal_test"}
c, err := NewCanal(cfg) c, err := NewCanal(cfg)
type myRowsEventHandler struct { type MyEventHandler struct {
DummyEventHandler
} }
func (h *myRowsEventHandler) Do(e *RowsEvent) error { func (h *MyEventHandler) OnRow(e *RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows) log.Infof("%s %v\n", e.Action, e.Rows)
return nil return nil
} }
func (h *myRowsEventHandler) String() string { func (h *MyEventHandler) String() string {
return "myRowsEventHandler" return "MyEventHandler"
} }
// Register a handler to handle RowsEvent // Register a handler to handle RowsEvent
c.RegRowsEventHandler(&MyRowsEventHandler{}) c.SetEventHandler(&MyEventHandler{})
// Start canal // Start canal
c.Start() c.Start()
@ -221,6 +222,14 @@ func main() {
We pass all tests in https://github.com/bradfitz/go-sql-test using go-mysql driver. :-) We pass all tests in https://github.com/bradfitz/go-sql-test using go-mysql driver. :-)
## Donate
If you like the project and want to buy me a cola, you can through:
|PayPal|微信|
|------|---|
|[![](https://www.paypalobjects.com/webstatic/paypalme/images/pp_logo_small.png)](https://paypal.me/siddontang)|[![](https://github.com/siddontang/blog/blob/master/donate/weixin.png)|
## Feedback ## Feedback
go-mysql is still in development, your feedback is very welcome. go-mysql is still in development, your feedback is very welcome.

View File

@ -1,27 +0,0 @@
Copyright (c) 2009 The Go Authors. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,22 +0,0 @@
Additional IP Rights Grant (Patents)
"This implementation" means the copyrightable works distributed by
Google as part of the Go project.
Google hereby grants to You a perpetual, worldwide, non-exclusive,
no-charge, royalty-free, irrevocable (except as stated in this section)
patent license to make, have made, use, offer to sell, sell, import,
transfer and otherwise run, modify and propagate the contents of this
implementation of Go, where such license applies only to those patent
claims, both currently owned or controlled by Google and acquired in
the future, licensable by Google that are necessarily infringed by this
implementation of Go. This grant does not include claims that would be
infringed only as a consequence of further modification of this
implementation. If you or your agent or exclusive licensee institute or
order or agree to the institution of patent litigation against any
entity (including a cross-claim or counterclaim in a lawsuit) alleging
that this implementation of Go or any code incorporated within this
implementation of Go constitutes direct or contributory patent
infringement, or inducement of patent infringement, then any patent
rights granted to you under this License for this implementation of Go
shall terminate as of the date such litigation is filed.

View File

@ -1,447 +0,0 @@
// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package context defines the Context type, which carries deadlines,
// cancelation signals, and other request-scoped values across API boundaries
// and between processes.
//
// Incoming requests to a server should create a Context, and outgoing calls to
// servers should accept a Context. The chain of function calls between must
// propagate the Context, optionally replacing it with a modified copy created
// using WithDeadline, WithTimeout, WithCancel, or WithValue.
//
// Programs that use Contexts should follow these rules to keep interfaces
// consistent across packages and enable static analysis tools to check context
// propagation:
//
// Do not store Contexts inside a struct type; instead, pass a Context
// explicitly to each function that needs it. The Context should be the first
// parameter, typically named ctx:
//
// func DoSomething(ctx context.Context, arg Arg) error {
// // ... use ctx ...
// }
//
// Do not pass a nil Context, even if a function permits it. Pass context.TODO
// if you are unsure about which Context to use.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The same Context may be passed to functions running in different goroutines;
// Contexts are safe for simultaneous use by multiple goroutines.
//
// See http://blog.golang.org/context for example code for a server that uses
// Contexts.
package context // import "golang.org/x/net/context"
import (
"errors"
"fmt"
"sync"
"time"
)
// A Context carries a deadline, a cancelation signal, and other values across
// API boundaries.
//
// Context's methods may be called by multiple goroutines simultaneously.
type Context interface {
// Deadline returns the time when work done on behalf of this context
// should be canceled. Deadline returns ok==false when no deadline is
// set. Successive calls to Deadline return the same results.
Deadline() (deadline time.Time, ok bool)
// Done returns a channel that's closed when work done on behalf of this
// context should be canceled. Done may return nil if this context can
// never be canceled. Successive calls to Done return the same value.
//
// WithCancel arranges for Done to be closed when cancel is called;
// WithDeadline arranges for Done to be closed when the deadline
// expires; WithTimeout arranges for Done to be closed when the timeout
// elapses.
//
// Done is provided for use in select statements:
//
// // Stream generates values with DoSomething and sends them to out
// // until DoSomething returns an error or ctx.Done is closed.
// func Stream(ctx context.Context, out <-chan Value) error {
// for {
// v, err := DoSomething(ctx)
// if err != nil {
// return err
// }
// select {
// case <-ctx.Done():
// return ctx.Err()
// case out <- v:
// }
// }
// }
//
// See http://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancelation.
Done() <-chan struct{}
// Err returns a non-nil error value after Done is closed. Err returns
// Canceled if the context was canceled or DeadlineExceeded if the
// context's deadline passed. No other values for Err are defined.
// After Done is closed, successive calls to Err return the same value.
Err() error
// Value returns the value associated with this context for key, or nil
// if no value is associated with key. Successive calls to Value with
// the same key returns the same result.
//
// Use context values only for request-scoped data that transits
// processes and API boundaries, not for passing optional parameters to
// functions.
//
// A key identifies a specific value in a Context. Functions that wish
// to store values in Context typically allocate a key in a global
// variable then use that key as the argument to context.WithValue and
// Context.Value. A key can be any type that supports equality;
// packages should define keys as an unexported type to avoid
// collisions.
//
// Packages that define a Context key should provide type-safe accessors
// for the values stores using that key:
//
// // Package user defines a User type that's stored in Contexts.
// package user
//
// import "golang.org/x/net/context"
//
// // User is the type of value stored in the Contexts.
// type User struct {...}
//
// // key is an unexported type for keys defined in this package.
// // This prevents collisions with keys defined in other packages.
// type key int
//
// // userKey is the key for user.User values in Contexts. It is
// // unexported; clients use user.NewContext and user.FromContext
// // instead of using this key directly.
// var userKey key = 0
//
// // NewContext returns a new Context that carries value u.
// func NewContext(ctx context.Context, u *User) context.Context {
// return context.WithValue(ctx, userKey, u)
// }
//
// // FromContext returns the User value stored in ctx, if any.
// func FromContext(ctx context.Context) (*User, bool) {
// u, ok := ctx.Value(userKey).(*User)
// return u, ok
// }
Value(key interface{}) interface{}
}
// Canceled is the error returned by Context.Err when the context is canceled.
var Canceled = errors.New("context canceled")
// DeadlineExceeded is the error returned by Context.Err when the context's
// deadline passes.
var DeadlineExceeded = errors.New("context deadline exceeded")
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int
func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (*emptyCtx) Done() <-chan struct{} {
return nil
}
func (*emptyCtx) Err() error {
return nil
}
func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}
func (e *emptyCtx) String() string {
switch e {
case background:
return "context.Background"
case todo:
return "context.TODO"
}
return "unknown empty Context"
}
var (
background = new(emptyCtx)
todo = new(emptyCtx)
)
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}
// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter). TODO is recognized by static analysis tools that determine
// whether Contexts are propagated correctly in a program.
func TODO() Context {
return todo
}
// A CancelFunc tells an operation to abandon its work.
// A CancelFunc does not wait for the work to stop.
// After the first call, subsequent calls to a CancelFunc do nothing.
type CancelFunc func()
// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}
// newCancelCtx returns an initialized cancelCtx.
func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{
Context: parent,
done: make(chan struct{}),
}
}
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
if parent.Done() == nil {
return // parent is never canceled
}
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]bool)
}
p.children[child] = true
}
p.mu.Unlock()
} else {
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}
// parentCancelCtx follows a chain of parent references until it finds a
// *cancelCtx. This function understands how each of the concrete types in this
// package represents its parent.
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
delete(p.children, child)
}
p.mu.Unlock()
}
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
Context
done chan struct{} // closed by the first cancel call.
mu sync.Mutex
children map[canceler]bool // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}
func (c *cancelCtx) Done() <-chan struct{} {
return c.done
}
func (c *cancelCtx) Err() error {
c.mu.Lock()
defer c.mu.Unlock()
return c.err
}
func (c *cancelCtx) String() string {
return fmt.Sprintf("%v.WithCancel", c.Context)
}
// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
close(c.done)
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()
if removeFromParent {
removeChild(c.Context, c)
}
}
// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc) {
if cur, ok := parent.Deadline(); ok && cur.Before(deadline) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: deadline,
}
propagateCancel(parent, c)
d := deadline.Sub(time.Now())
if d <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(true, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(d, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}
// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.
deadline time.Time
}
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}
func (c *timerCtx) String() string {
return fmt.Sprintf("%v.WithDeadline(%s [%s])", c.cancelCtx.Context, c.deadline, c.deadline.Sub(time.Now()))
}
func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete:
//
// func slowOperationWithTimeout(ctx context.Context) (Result, error) {
// ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
// defer cancel() // releases resources if slowOperation completes before timeout elapses
// return slowOperation(ctx)
// }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}
// WithValue returns a copy of parent in which the value associated with key is
// val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
func WithValue(parent Context, key interface{}, val interface{}) Context {
return &valueCtx{parent, key, val}
}
// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}
func (c *valueCtx) String() string {
return fmt.Sprintf("%v.WithValue(%#v, %#v)", c.Context, c.key, c.val)
}
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

View File

@ -1,10 +1,10 @@
package canal package canal
import ( import (
"context"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -16,11 +16,8 @@ import (
"github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema" "github.com/siddontang/go-mysql/schema"
"github.com/siddontang/go/sync2"
) )
var errCanalClosed = errors.New("canal was closed")
// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... // Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
// MySQL must open row format for binlog // MySQL must open row format for binlog
type Canal struct { type Canal struct {
@ -28,50 +25,42 @@ type Canal struct {
cfg *Config cfg *Config
useGTID bool
master *masterInfo master *masterInfo
dumper *dump.Dumper dumper *dump.Dumper
dumpDoneCh chan struct{} dumpDoneCh chan struct{}
syncer *replication.BinlogSyncer syncer *replication.BinlogSyncer
rsLock sync.Mutex eventHandler EventHandler
rsHandlers []RowsEventHandler
connLock sync.Mutex connLock sync.Mutex
conn *client.Conn conn *client.Conn
wg sync.WaitGroup wg sync.WaitGroup
tableLock sync.Mutex tableLock sync.RWMutex
tables map[string]*schema.Table tables map[string]*schema.Table
quit chan struct{} ctx context.Context
closed sync2.AtomicBool cancel context.CancelFunc
} }
func NewCanal(cfg *Config) (*Canal, error) { func NewCanal(cfg *Config) (*Canal, error) {
c := new(Canal) c := new(Canal)
c.cfg = cfg c.cfg = cfg
c.closed.Set(false)
c.quit = make(chan struct{})
os.MkdirAll(cfg.DataDir, 0755) c.ctx, c.cancel = context.WithCancel(context.Background())
c.dumpDoneCh = make(chan struct{}) c.dumpDoneCh = make(chan struct{})
c.rsHandlers = make([]RowsEventHandler, 0, 4) c.eventHandler = &DummyEventHandler{}
c.tables = make(map[string]*schema.Table) c.tables = make(map[string]*schema.Table)
c.master = &masterInfo{}
var err error var err error
if c.master, err = loadMasterInfo(c.masterInfoPath()); err != nil {
return nil, errors.Trace(err)
} else if len(c.master.Addr) != 0 && c.master.Addr != c.cfg.Addr {
log.Infof("MySQL addr %s in old master.info, but new %s, reset", c.master.Addr, c.cfg.Addr)
// may use another MySQL, reset
c.master = &masterInfo{}
}
c.master.Addr = c.cfg.Addr if err = c.prepareDumper(); err != nil {
if err := c.prepareDumper(); err != nil {
return nil, errors.Trace(err) return nil, errors.Trace(err)
} }
@ -114,6 +103,12 @@ func (c *Canal) prepareDumper() error {
c.dumper.AddTables(tableDB, tables...) c.dumper.AddTables(tableDB, tables...)
} }
charset := c.cfg.Charset
c.dumper.SetCharset(charset)
c.dumper.SkipMasterData(c.cfg.Dump.SkipMasterData)
c.dumper.SetMaxAllowedPacket(c.cfg.Dump.MaxAllowedPacketMB)
for _, ignoreTable := range c.cfg.Dump.IgnoreTables { for _, ignoreTable := range c.cfg.Dump.IgnoreTables {
if seps := strings.Split(ignoreTable, ","); len(seps) == 2 { if seps := strings.Split(ignoreTable, ","); len(seps) == 2 {
c.dumper.AddIgnoreTables(seps[0], seps[1]) c.dumper.AddIgnoreTables(seps[0], seps[1])
@ -129,6 +124,8 @@ func (c *Canal) prepareDumper() error {
return nil return nil
} }
// Start will first try to dump all data from MySQL master `mysqldump`,
// then sync from the binlog position in the dump data.
func (c *Canal) Start() error { func (c *Canal) Start() error {
c.wg.Add(1) c.wg.Add(1)
go c.run() go c.run()
@ -136,55 +133,57 @@ func (c *Canal) Start() error {
return nil return nil
} }
func (c *Canal) run() error { // StartFrom will sync from the binlog position directly, ignore mysqldump.
defer c.wg.Done() func (c *Canal) StartFrom(pos mysql.Position) error {
c.useGTID = false
c.master.Update(pos)
if err := c.tryDump(); err != nil { return c.Start()
}
func (c *Canal) StartFromGTID(set mysql.GTIDSet) error {
c.useGTID = true
c.master.UpdateGTID(set)
return c.Start()
}
func (c *Canal) run() error {
defer func() {
c.wg.Done()
c.cancel()
}()
err := c.tryDump()
close(c.dumpDoneCh)
if err != nil {
log.Errorf("canal dump mysql err: %v", err) log.Errorf("canal dump mysql err: %v", err)
return errors.Trace(err) return errors.Trace(err)
} }
close(c.dumpDoneCh) if err = c.runSyncBinlog(); err != nil {
if err := c.startSyncBinlog(); err != nil {
if !c.isClosed() {
log.Errorf("canal start sync binlog err: %v", err) log.Errorf("canal start sync binlog err: %v", err)
}
return errors.Trace(err) return errors.Trace(err)
} }
return nil return nil
} }
func (c *Canal) isClosed() bool {
return c.closed.Get()
}
func (c *Canal) Close() { func (c *Canal) Close() {
log.Infof("close canal") log.Infof("closing canal")
c.m.Lock() c.m.Lock()
defer c.m.Unlock() defer c.m.Unlock()
if c.isClosed() { c.cancel()
return
}
c.closed.Set(true)
close(c.quit)
c.connLock.Lock() c.connLock.Lock()
c.conn.Close() c.conn.Close()
c.conn = nil c.conn = nil
c.connLock.Unlock() c.connLock.Unlock()
if c.syncer != nil {
c.syncer.Close() c.syncer.Close()
c.syncer = nil
}
c.master.Close() c.eventHandler.OnPosSynced(c.master.Position(), true)
c.wg.Wait() c.wg.Wait()
} }
@ -193,11 +192,15 @@ func (c *Canal) WaitDumpDone() <-chan struct{} {
return c.dumpDoneCh return c.dumpDoneCh
} }
func (c *Canal) Ctx() context.Context {
return c.ctx
}
func (c *Canal) GetTable(db string, table string) (*schema.Table, error) { func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
key := fmt.Sprintf("%s.%s", db, table) key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock() c.tableLock.RLock()
t, ok := c.tables[key] t, ok := c.tables[key]
c.tableLock.Unlock() c.tableLock.RUnlock()
if ok { if ok {
return t, nil return t, nil
@ -205,6 +208,11 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
t, err := schema.NewTable(c, db, table) t, err := schema.NewTable(c, db, table)
if err != nil { if err != nil {
// check table not exists
if ok, err1 := schema.IsTableExist(c, db, table); err1 == nil && !ok {
return nil, schema.ErrTableNotExist
}
return nil, errors.Trace(err) return nil, errors.Trace(err)
} }
@ -215,6 +223,14 @@ func (c *Canal) GetTable(db string, table string) (*schema.Table, error) {
return t, nil return t, nil
} }
// ClearTableCache clear table cache
func (c *Canal) ClearTableCache(db []byte, table []byte) {
key := fmt.Sprintf("%s.%s", db, table)
c.tableLock.Lock()
delete(c.tables, key)
c.tableLock.Unlock()
}
// Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB // Check MySQL binlog row image, must be in FULL, MINIMAL, NOBLOB
func (c *Canal) CheckBinlogRowImage(image string) error { func (c *Canal) CheckBinlogRowImage(image string) error {
// need to check MySQL binlog row image? full, minimal or noblob? // need to check MySQL binlog row image? full, minimal or noblob?
@ -263,17 +279,16 @@ func (c *Canal) prepareSyncer() error {
Port: uint16(port), Port: uint16(port),
User: c.cfg.User, User: c.cfg.User,
Password: c.cfg.Password, Password: c.cfg.Password,
Charset: c.cfg.Charset,
HeartbeatPeriod: c.cfg.HeartbeatPeriod,
ReadTimeout: c.cfg.ReadTimeout,
} }
c.syncer = replication.NewBinlogSyncer(&cfg) c.syncer = replication.NewBinlogSyncer(cfg)
return nil return nil
} }
func (c *Canal) masterInfoPath() string {
return path.Join(c.cfg.DataDir, "master.info")
}
// Execute a SQL // Execute a SQL
func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) { func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err error) {
c.connLock.Lock() c.connLock.Lock()
@ -303,5 +318,5 @@ func (c *Canal) Execute(cmd string, args ...interface{}) (rr *mysql.Result, err
} }
func (c *Canal) SyncedPosition() mysql.Position { func (c *Canal) SyncedPosition() mysql.Position {
return c.master.Pos() return c.master.Position()
} }

View File

@ -1,10 +1,11 @@
package canal package canal
import ( import (
"bytes"
"flag" "flag"
"fmt" "fmt"
"os"
"testing" "testing"
"time"
"github.com/ngaut/log" "github.com/ngaut/log"
. "github.com/pingcap/check" . "github.com/pingcap/check"
@ -27,16 +28,16 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
cfg := NewDefaultConfig() cfg := NewDefaultConfig()
cfg.Addr = fmt.Sprintf("%s:3306", *testHost) cfg.Addr = fmt.Sprintf("%s:3306", *testHost)
cfg.User = "root" cfg.User = "root"
cfg.HeartbeatPeriod = 200 * time.Millisecond
cfg.ReadTimeout = 300 * time.Millisecond
cfg.Dump.ExecutionPath = "mysqldump" cfg.Dump.ExecutionPath = "mysqldump"
cfg.Dump.TableDB = "test" cfg.Dump.TableDB = "test"
cfg.Dump.Tables = []string{"canal_test"} cfg.Dump.Tables = []string{"canal_test"}
os.RemoveAll(cfg.DataDir)
var err error var err error
s.c, err = NewCanal(cfg) s.c, err = NewCanal(cfg)
c.Assert(err, IsNil) c.Assert(err, IsNil)
s.execute(c, "DROP TABLE IF EXISTS test.canal_test")
sql := ` sql := `
CREATE TABLE IF NOT EXISTS test.canal_test ( CREATE TABLE IF NOT EXISTS test.canal_test (
id int AUTO_INCREMENT, id int AUTO_INCREMENT,
@ -52,12 +53,16 @@ func (s *canalTestSuite) SetUpSuite(c *C) {
s.execute(c, "SET GLOBAL binlog_format = 'ROW'") s.execute(c, "SET GLOBAL binlog_format = 'ROW'")
s.c.RegRowsEventHandler(&testRowsEventHandler{}) s.c.SetEventHandler(&testEventHandler{})
err = s.c.Start() err = s.c.Start()
c.Assert(err, IsNil) c.Assert(err, IsNil)
} }
func (s *canalTestSuite) TearDownSuite(c *C) { func (s *canalTestSuite) TearDownSuite(c *C) {
// To test the heartbeat and read timeout,so need to sleep 1 seconds without data transmission
log.Infof("Start testing the heartbeat and read timeout")
time.Sleep(time.Second)
if s.c != nil { if s.c != nil {
s.c.Close() s.c.Close()
s.c = nil s.c = nil
@ -70,16 +75,17 @@ func (s *canalTestSuite) execute(c *C, query string, args ...interface{}) *mysql
return r return r
} }
type testRowsEventHandler struct { type testEventHandler struct {
DummyEventHandler
} }
func (h *testRowsEventHandler) Do(e *RowsEvent) error { func (h *testEventHandler) Do(e *RowsEvent) error {
log.Infof("%s %v\n", e.Action, e.Rows) log.Infof("%s %v\n", e.Action, e.Rows)
return nil return nil
} }
func (h *testRowsEventHandler) String() string { func (h *testEventHandler) String() string {
return "testRowsEventHandler" return "testEventHandler"
} }
func (s *canalTestSuite) TestCanal(c *C) { func (s *canalTestSuite) TestCanal(c *C) {
@ -88,7 +94,50 @@ func (s *canalTestSuite) TestCanal(c *C) {
for i := 1; i < 10; i++ { for i := 1; i < 10; i++ {
s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i)) s.execute(c, "INSERT INTO test.canal_test (name) VALUES (?)", fmt.Sprintf("%d", i))
} }
s.execute(c, "ALTER TABLE test.canal_test ADD `age` INT(5) NOT NULL AFTER `name`")
s.execute(c, "INSERT INTO test.canal_test (name,age) VALUES (?,?)", "d", "18")
err := s.c.CatchMasterPos(100) err := s.c.CatchMasterPos(10 * time.Second)
c.Assert(err, IsNil) c.Assert(err, IsNil)
} }
func TestAlterTableExp(t *testing.T) {
cases := []string{
"ALTER TABLE `mydb`.`mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE `mytable` ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mytable ADD `field2` DATE NULL AFTER `field1`;",
"ALTER TABLE mydb.mytable ADD field2 DATE NULL AFTER `field1`;",
}
table := []byte("mytable")
db := []byte("mydb")
for _, s := range cases {
m := expAlterTable.FindSubmatch([]byte(s))
if m == nil || !bytes.Equal(m[2], table) || (len(m[1]) > 0 && !bytes.Equal(m[1], db)) {
t.Fatalf("TestAlterTableExp: case %s failed\n", s)
}
}
}
func TestRenameTableExp(t *testing.T) {
cases := []string{
"rename table `mydb`.`mytable` to `mydb`.`mytable1`",
"rename table `mytable` to `mytable1`",
"rename table mydb.mytable to mydb.mytable1",
"rename table mytable to mytable1",
"rename table `mydb`.`mytable1` to `mydb`.`mytable2`, `mydb`.`mytable` to `mydb`.`mytable1`",
"rename table `mytable1` to `mytable2`, `mytable` to `mytable1`",
"rename table mydb.mytable1 to mydb.mytable2, mydb.mytable to mydb.mytable1",
"rename table mytable1 to mytable2, mytable to mytable1",
}
table := []byte("mytable1")
db := []byte("mydb")
for _, s := range cases {
m := expRenameTable.FindSubmatch([]byte(s))
if m == nil || !bytes.Equal(m[2], table) || (len(m[1]) > 0 && !bytes.Equal(m[1], db)) {
t.Fatalf("TestRenameTableExp: case %s failed\n", s)
}
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/BurntSushi/toml" "github.com/BurntSushi/toml"
"github.com/juju/errors" "github.com/juju/errors"
"github.com/siddontang/go-mysql/mysql"
) )
type DumpConfig struct { type DumpConfig struct {
@ -25,6 +26,13 @@ type DumpConfig struct {
// If true, discard error msg, else, output to stderr // If true, discard error msg, else, output to stderr
DiscardErr bool `toml:"discard_err"` DiscardErr bool `toml:"discard_err"`
// Set true to skip --master-data if we have no privilege to do
// 'FLUSH TABLES WITH READ LOCK'
SkipMasterData bool `toml:"skip_master_data"`
// Set to change the default max_allowed_packet size
MaxAllowedPacketMB int `toml:"max_allowed_packet_mb"`
} }
type Config struct { type Config struct {
@ -32,9 +40,11 @@ type Config struct {
User string `toml:"user"` User string `toml:"user"`
Password string `toml:"password"` Password string `toml:"password"`
Charset string `toml:"charset"`
ServerID uint32 `toml:"server_id"` ServerID uint32 `toml:"server_id"`
Flavor string `toml:"flavor"` Flavor string `toml:"flavor"`
DataDir string `toml:"data_dir"` HeartbeatPeriod time.Duration `toml:"heartbeat_period"`
ReadTimeout time.Duration `toml:"read_timeout"`
Dump DumpConfig `toml:"dump"` Dump DumpConfig `toml:"dump"`
} }
@ -66,14 +76,15 @@ func NewDefaultConfig() *Config {
c.User = "root" c.User = "root"
c.Password = "" c.Password = ""
c.Charset = mysql.DEFAULT_CHARSET
rand.Seed(time.Now().Unix()) rand.Seed(time.Now().Unix())
c.ServerID = uint32(rand.Intn(1000)) + 1001 c.ServerID = uint32(rand.Intn(1000)) + 1001
c.Flavor = "mysql" c.Flavor = "mysql"
c.DataDir = "./var"
c.Dump.ExecutionPath = "mysqldump" c.Dump.ExecutionPath = "mysqldump"
c.Dump.DiscardErr = true c.Dump.DiscardErr = true
c.Dump.SkipMasterData = false
return c return c
} }

View File

@ -7,6 +7,7 @@ import (
"github.com/juju/errors" "github.com/juju/errors"
"github.com/ngaut/log" "github.com/ngaut/log"
"github.com/siddontang/go-mysql/dump" "github.com/siddontang/go-mysql/dump"
"github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/schema" "github.com/siddontang/go-mysql/schema"
) )
@ -23,8 +24,8 @@ func (h *dumpParseHandler) BinLog(name string, pos uint64) error {
} }
func (h *dumpParseHandler) Data(db string, table string, values []string) error { func (h *dumpParseHandler) Data(db string, table string, values []string) error {
if h.c.isClosed() { if err := h.c.ctx.Err(); err != nil {
return errCanalClosed return err
} }
tableInfo, err := h.c.GetTable(db, table) tableInfo, err := h.c.GetTable(db, table)
@ -63,7 +64,7 @@ func (h *dumpParseHandler) Data(db string, table string, values []string) error
} }
events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs}) events := newRowsEvent(tableInfo, InsertAction, [][]interface{}{vs})
return h.c.travelRowsEventHandler(events) return h.c.eventHandler.OnRow(events)
} }
func (c *Canal) AddDumpDatabases(dbs ...string) { func (c *Canal) AddDumpDatabases(dbs ...string) {
@ -91,9 +92,11 @@ func (c *Canal) AddDumpIgnoreTables(db string, tables ...string) {
} }
func (c *Canal) tryDump() error { func (c *Canal) tryDump() error {
if len(c.master.Name) > 0 && c.master.Position > 0 { pos := c.master.Position()
gtid := c.master.GTID()
if (len(pos.Name) > 0 && pos.Pos > 0) || gtid != nil {
// we will sync with binlog name and position // we will sync with binlog name and position
log.Infof("skip dump, use last binlog replication pos (%s, %d)", c.master.Name, c.master.Position) log.Infof("skip dump, use last binlog replication pos %s or GTID %s", pos, gtid)
return nil return nil
} }
@ -104,6 +107,16 @@ func (c *Canal) tryDump() error {
h := &dumpParseHandler{c: c} h := &dumpParseHandler{c: c}
if c.cfg.Dump.SkipMasterData {
pos, err := c.GetMasterPos()
if err != nil {
return errors.Trace(err)
}
log.Infof("skip master data, get current binlog position %v", pos)
h.name = pos.Name
h.pos = uint64(pos.Pos)
}
start := time.Now() start := time.Now()
log.Info("try dump MySQL and parse") log.Info("try dump MySQL and parse")
if err := c.dumper.DumpAndParse(h); err != nil { if err := c.dumper.DumpAndParse(h); err != nil {
@ -113,8 +126,8 @@ func (c *Canal) tryDump() error {
log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at (%s, %d)", log.Infof("dump MySQL and parse OK, use %0.2f seconds, start binlog replication at (%s, %d)",
time.Now().Sub(start).Seconds(), h.name, h.pos) time.Now().Sub(start).Seconds(), h.name, h.pos)
c.master.Update(h.name, uint32(h.pos)) pos = mysql.Position{h.name, uint32(h.pos)}
c.master.Save(true) c.master.Update(pos)
c.eventHandler.OnPosSynced(pos, true)
return nil return nil
} }

View File

@ -1,41 +1,36 @@
package canal package canal
import ( import (
"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication"
) )
var ( type EventHandler interface {
ErrHandleInterrupted = errors.New("do handler error, interrupted") OnRotate(roateEvent *replication.RotateEvent) error
) OnDDL(nextPos mysql.Position, queryEvent *replication.QueryEvent) error
OnRow(e *RowsEvent) error
type RowsEventHandler interface { OnXID(nextPos mysql.Position) error
// Handle RowsEvent, if return ErrHandleInterrupted, canal will OnGTID(gtid mysql.GTIDSet) error
// stop the sync // OnPosSynced Use your own way to sync position. When force is true, sync position immediately.
Do(e *RowsEvent) error OnPosSynced(pos mysql.Position, force bool) error
String() string String() string
} }
func (c *Canal) RegRowsEventHandler(h RowsEventHandler) { type DummyEventHandler struct {
c.rsLock.Lock()
c.rsHandlers = append(c.rsHandlers, h)
c.rsLock.Unlock()
} }
func (c *Canal) travelRowsEventHandler(e *RowsEvent) error { func (h *DummyEventHandler) OnRotate(*replication.RotateEvent) error { return nil }
c.rsLock.Lock() func (h *DummyEventHandler) OnDDL(mysql.Position, *replication.QueryEvent) error {
defer c.rsLock.Unlock()
var err error
for _, h := range c.rsHandlers {
if err = h.Do(e); err != nil && !mysql.ErrorEqual(err, ErrHandleInterrupted) {
log.Errorf("handle %v err: %v", h, err)
} else if mysql.ErrorEqual(err, ErrHandleInterrupted) {
log.Errorf("handle %v err, interrupted", h)
return ErrHandleInterrupted
}
}
return nil return nil
} }
func (h *DummyEventHandler) OnRow(*RowsEvent) error { return nil }
func (h *DummyEventHandler) OnXID(mysql.Position) error { return nil }
func (h *DummyEventHandler) OnGTID(mysql.GTIDSet) error { return nil }
func (h *DummyEventHandler) OnPosSynced(mysql.Position, bool) error { return nil }
func (h *DummyEventHandler) String() string { return "DummyEventHandler" }
// `SetEventHandler` registers the sync handler, you must register your
// own handler before starting Canal.
func (c *Canal) SetEventHandler(h EventHandler) {
c.eventHandler = h
}

View File

@ -1,89 +1,46 @@
package canal package canal
import ( import (
"bytes"
"os"
"sync" "sync"
"time"
"github.com/BurntSushi/toml"
"github.com/juju/errors"
"github.com/ngaut/log" "github.com/ngaut/log"
"github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go/ioutil2"
) )
type masterInfo struct { type masterInfo struct {
Addr string `toml:"addr"` sync.RWMutex
Name string `toml:"bin_name"`
Position uint32 `toml:"bin_pos"`
name string pos mysql.Position
l sync.Mutex gtid mysql.GTIDSet
lastSaveTime time.Time
} }
func loadMasterInfo(name string) (*masterInfo, error) { func (m *masterInfo) Update(pos mysql.Position) {
var m masterInfo log.Debugf("update master position %s", pos)
m.name = name m.Lock()
m.pos = pos
f, err := os.Open(name) m.Unlock()
if err != nil && !os.IsNotExist(errors.Cause(err)) {
return nil, errors.Trace(err)
} else if os.IsNotExist(errors.Cause(err)) {
return &m, nil
}
defer f.Close()
_, err = toml.DecodeReader(f, &m)
return &m, err
} }
func (m *masterInfo) Save(force bool) error { func (m *masterInfo) UpdateGTID(gtid mysql.GTIDSet) {
m.l.Lock() log.Debugf("update master gtid %s", gtid.String())
defer m.l.Unlock()
n := time.Now() m.Lock()
if !force && n.Sub(m.lastSaveTime) < time.Second { m.gtid = gtid
return nil m.Unlock()
} }
var buf bytes.Buffer func (m *masterInfo) Position() mysql.Position {
e := toml.NewEncoder(&buf) m.RLock()
defer m.RUnlock()
e.Encode(m) return m.pos
var err error
if err = ioutil2.WriteFileAtomic(m.name, buf.Bytes(), 0644); err != nil {
log.Errorf("canal save master info to file %s err %v", m.name, err)
} }
m.lastSaveTime = n func (m *masterInfo) GTID() mysql.GTIDSet {
m.RLock()
defer m.RUnlock()
return errors.Trace(err) return m.gtid
}
func (m *masterInfo) Update(name string, pos uint32) {
m.l.Lock()
m.Name = name
m.Position = pos
m.l.Unlock()
}
func (m *masterInfo) Pos() mysql.Position {
var pos mysql.Position
m.l.Lock()
pos.Name = m.Name
pos.Pos = m.Position
m.l.Unlock()
return pos
}
func (m *masterInfo) Close() {
m.Save(true)
} }

View File

@ -53,6 +53,16 @@ func GetPKValues(table *schema.Table, row []interface{}) ([]interface{}, error)
return values, nil return values, nil
} }
// Get term column's value
func GetColumnValue(table *schema.Table, column string, row []interface{}) (interface{}, error) {
index := table.FindColumn(column)
if index == -1 {
return nil, errors.Errorf("table %s has no column name %s", table, column)
}
return row[index], nil
}
// String implements fmt.Stringer interface. // String implements fmt.Stringer interface.
func (r *RowsEvent) String() string { func (r *RowsEvent) String() string {
return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows) return fmt.Sprintf("%s %s %v", r.Action, r.Table, r.Rows)

View File

@ -1,49 +1,66 @@
package canal package canal
import ( import (
"fmt"
"regexp"
"time" "time"
"golang.org/x/net/context"
"github.com/juju/errors" "github.com/juju/errors"
"github.com/ngaut/log" "github.com/ngaut/log"
"github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/replication"
"github.com/siddontang/go-mysql/schema"
) )
func (c *Canal) startSyncBinlog() error { var (
pos := mysql.Position{c.master.Name, c.master.Position} expAlterTable = regexp.MustCompile("(?i)^ALTER\\sTABLE\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}\\s.*")
expRenameTable = regexp.MustCompile("(?i)^RENAME\\sTABLE.*TO\\s.*?`{0,1}(.*?)`{0,1}\\.{0,1}`{0,1}([^`\\.]+?)`{0,1}$")
log.Infof("start sync binlog at %v", pos) )
func (c *Canal) startSyncer() (*replication.BinlogStreamer, error) {
if !c.useGTID {
pos := c.master.Position()
s, err := c.syncer.StartSync(pos) s, err := c.syncer.StartSync(pos)
if err != nil { if err != nil {
return errors.Errorf("start sync replication at %v error %v", pos, err) return nil, errors.Errorf("start sync replication at binlog %v error %v", pos, err)
}
log.Infof("start sync binlog at binlog file %v", pos)
return s, nil
} else {
gset := c.master.GTID()
s, err := c.syncer.StartSyncGTID(gset)
if err != nil {
return nil, errors.Errorf("start sync replication at GTID %v error %v", gset, err)
}
log.Infof("start sync binlog at GTID %v", gset)
return s, nil
}
} }
timeout := time.Second func (c *Canal) runSyncBinlog() error {
forceSavePos := false
s, err := c.startSyncer()
if err != nil {
return err
}
savePos := false
force := false
for { for {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) ev, err := s.GetEvent(c.ctx)
ev, err := s.GetEvent(ctx)
cancel()
if err == context.DeadlineExceeded {
timeout = 2 * timeout
continue
}
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
savePos = false
force = false
pos := c.master.Position()
timeout = time.Second curPos := pos.Pos
//next binlog pos //next binlog pos
pos.Pos = ev.Header.LogPos pos.Pos = ev.Header.LogPos
forceSavePos = false
// We only save position with RotateEvent and XIDEvent. // We only save position with RotateEvent and XIDEvent.
// For RowsEvent, we can't save the position until meeting XIDEvent // For RowsEvent, we can't save the position until meeting XIDEvent
// which tells the whole transaction is over. // which tells the whole transaction is over.
@ -52,24 +69,69 @@ func (c *Canal) startSyncBinlog() error {
case *replication.RotateEvent: case *replication.RotateEvent:
pos.Name = string(e.NextLogName) pos.Name = string(e.NextLogName)
pos.Pos = uint32(e.Position) pos.Pos = uint32(e.Position)
// r.ev <- pos log.Infof("rotate binlog to %s", pos)
forceSavePos = true savePos = true
log.Infof("rotate binlog to %v", pos) force = true
if err = c.eventHandler.OnRotate(e); err != nil {
return errors.Trace(err)
}
case *replication.RowsEvent: case *replication.RowsEvent:
// we only focus row based event // we only focus row based event
if err = c.handleRowsEvent(ev); err != nil { err = c.handleRowsEvent(ev)
log.Errorf("handle rows event error %v", err) if err != nil && errors.Cause(err) != schema.ErrTableNotExist {
// We can ignore table not exist error
log.Errorf("handle rows event at (%s, %d) error %v", pos.Name, curPos, err)
return errors.Trace(err) return errors.Trace(err)
} }
continue continue
case *replication.XIDEvent: case *replication.XIDEvent:
savePos = true
// try to save the position later // try to save the position later
if err := c.eventHandler.OnXID(pos); err != nil {
return errors.Trace(err)
}
case *replication.MariadbGTIDEvent:
// try to save the GTID later
gtid := e.GTID
c.master.UpdateGTID(gtid)
if err := c.eventHandler.OnGTID(gtid); err != nil {
return errors.Trace(err)
}
case *replication.GTIDEvent:
u, _ := uuid.FromBytes(e.SID)
gset, err := mysql.ParseMysqlGTIDSet(fmt.Sprintf("%s:%d", u.String(), e.GNO))
if err != nil {
return errors.Trace(err)
}
c.master.UpdateGTID(gset)
if err := c.eventHandler.OnGTID(gset); err != nil {
return errors.Trace(err)
}
case *replication.QueryEvent:
if mb := checkRenameTable(e); mb != nil {
if len(mb[1]) == 0 {
mb[1] = e.Schema
}
savePos = true
force = true
c.ClearTableCache(mb[1], mb[2])
log.Infof("table structure changed, clear table cache: %s.%s\n", mb[1], mb[2])
if err = c.eventHandler.OnDDL(pos, e); err != nil {
return errors.Trace(err)
}
} else {
// skip others
continue
}
default: default:
continue continue
} }
c.master.Update(pos.Name, pos.Pos) if savePos {
c.master.Save(forceSavePos) c.master.Update(pos)
c.eventHandler.OnPosSynced(pos, force)
}
} }
return nil return nil
@ -98,24 +160,21 @@ func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
return errors.Errorf("%s not supported now", e.Header.EventType) return errors.Errorf("%s not supported now", e.Header.EventType)
} }
events := newRowsEvent(t, action, ev.Rows) events := newRowsEvent(t, action, ev.Rows)
return c.travelRowsEventHandler(events) return c.eventHandler.OnRow(events)
} }
func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error { func (c *Canal) WaitUntilPos(pos mysql.Position, timeout time.Duration) error {
if timeout <= 0 { timer := time.NewTimer(timeout)
timeout = 60
}
timer := time.NewTimer(time.Duration(timeout) * time.Second)
for { for {
select { select {
case <-timer.C: case <-timer.C:
return errors.Errorf("wait position %v err", pos) return errors.Errorf("wait position %v too long > %s", pos, timeout)
default: default:
curpos := c.master.Pos() curPos := c.master.Position()
if curpos.Compare(pos) >= 0 { if curPos.Compare(pos) >= 0 {
return nil return nil
} else { } else {
log.Debugf("master pos is %v, wait catching %v", curPos, pos)
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
} }
} }
@ -124,14 +183,32 @@ func (c *Canal) WaitUntilPos(pos mysql.Position, timeout int) error {
return nil return nil
} }
func (c *Canal) CatchMasterPos(timeout int) error { func (c *Canal) GetMasterPos() (mysql.Position, error) {
rr, err := c.Execute("SHOW MASTER STATUS") rr, err := c.Execute("SHOW MASTER STATUS")
if err != nil { if err != nil {
return errors.Trace(err) return mysql.Position{"", 0}, errors.Trace(err)
} }
name, _ := rr.GetString(0, 0) name, _ := rr.GetString(0, 0)
pos, _ := rr.GetInt(0, 1) pos, _ := rr.GetInt(0, 1)
return c.WaitUntilPos(mysql.Position{name, uint32(pos)}, timeout) return mysql.Position{name, uint32(pos)}, nil
}
func (c *Canal) CatchMasterPos(timeout time.Duration) error {
pos, err := c.GetMasterPos()
if err != nil {
return errors.Trace(err)
}
return c.WaitUntilPos(pos, timeout)
}
func checkRenameTable(e *replication.QueryEvent) [][]byte {
var mb = [][]byte{}
if mb = expAlterTable.FindSubmatch(e.Query); mb != nil {
return mb
}
mb = expRenameTable.FindSubmatch(e.Query)
return mb
} }

View File

@ -7,8 +7,10 @@ import (
"os/signal" "os/signal"
"strings" "strings"
"syscall" "syscall"
"time"
"github.com/siddontang/go-mysql/canal" "github.com/siddontang/go-mysql/canal"
"github.com/siddontang/go-mysql/mysql"
) )
var host = flag.String("host", "127.0.0.1", "MySQL host") var host = flag.String("host", "127.0.0.1", "MySQL host")
@ -18,8 +20,6 @@ var password = flag.String("password", "", "MySQL password")
var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb") var flavor = flag.String("flavor", "mysql", "Flavor: mysql or mariadb")
var dataDir = flag.String("data-dir", "./var", "Path to store data, like master.info")
var serverID = flag.Int("server-id", 101, "Unique Server ID") var serverID = flag.Int("server-id", 101, "Unique Server ID")
var mysqldump = flag.String("mysqldump", "mysqldump", "mysqldump execution path") var mysqldump = flag.String("mysqldump", "mysqldump", "mysqldump execution path")
@ -28,6 +28,12 @@ var tables = flag.String("tables", "", "dump tables, seperated by comma, will ov
var tableDB = flag.String("table_db", "test", "database for dump tables") var tableDB = flag.String("table_db", "test", "database for dump tables")
var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, separated by comma") var ignoreTables = flag.String("ignore_tables", "", "ignore tables, must be database.table format, separated by comma")
var startName = flag.String("bin_name", "", "start sync from binlog name")
var startPos = flag.Uint("bin_pos", 0, "start sync from binlog position of")
var heartbeatPeriod = flag.Duration("heartbeat", 60*time.Second, "master heartbeat period")
var readTimeout = flag.Duration("read_timeout", 90*time.Second, "connection read timeout")
func main() { func main() {
flag.Parse() flag.Parse()
@ -36,8 +42,9 @@ func main() {
cfg.User = *user cfg.User = *user
cfg.Password = *password cfg.Password = *password
cfg.Flavor = *flavor cfg.Flavor = *flavor
cfg.DataDir = *dataDir
cfg.ReadTimeout = *readTimeout
cfg.HeartbeatPeriod = *heartbeatPeriod
cfg.ServerID = uint32(*serverID) cfg.ServerID = uint32(*serverID)
cfg.Dump.ExecutionPath = *mysqldump cfg.Dump.ExecutionPath = *mysqldump
cfg.Dump.DiscardErr = false cfg.Dump.DiscardErr = false
@ -65,9 +72,14 @@ func main() {
c.AddDumpDatabases(subs...) c.AddDumpDatabases(subs...)
} }
c.RegRowsEventHandler(&handler{}) c.SetEventHandler(&handler{})
err = c.Start() startPos := mysql.Position{
*startName,
uint32(*startPos),
}
err = c.StartFrom(startPos)
if err != nil { if err != nil {
fmt.Printf("start canal err %V", err) fmt.Printf("start canal err %V", err)
os.Exit(1) os.Exit(1)
@ -88,9 +100,10 @@ func main() {
} }
type handler struct { type handler struct {
canal.DummyEventHandler
} }
func (h *handler) Do(e *canal.RowsEvent) error { func (h *handler) OnRow(e *canal.RowsEvent) error {
fmt.Printf("%v\n", e) fmt.Printf("%v\n", e)
return nil return nil

View File

@ -4,12 +4,11 @@
package main package main
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"os" "os"
"golang.org/x/net/context"
"github.com/juju/errors" "github.com/juju/errors"
"github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/mysql"
"github.com/siddontang/go-mysql/replication" "github.com/siddontang/go-mysql/replication"
@ -41,11 +40,11 @@ func main() {
Port: uint16(*port), Port: uint16(*port),
User: *user, User: *user,
Password: *password, Password: *password,
RawModeEanbled: *rawMode, RawModeEnabled: *rawMode,
SemiSyncEnabled: *semiSync, SemiSyncEnabled: *semiSync,
} }
b := replication.NewBinlogSyncer(&cfg) b := replication.NewBinlogSyncer(cfg)
pos := mysql.Position{*file, uint32(*pos)} pos := mysql.Position{*file, uint32(*pos)}
if len(*backupPath) > 0 { if len(*backupPath) > 0 {

View File

@ -8,6 +8,7 @@ import (
"strings" "strings"
"github.com/juju/errors" "github.com/juju/errors"
. "github.com/siddontang/go-mysql/mysql"
) )
// Unlick mysqldump, Dumper is designed for parsing and syning data easily. // Unlick mysqldump, Dumper is designed for parsing and syning data easily.
@ -25,9 +26,14 @@ type Dumper struct {
Databases []string Databases []string
Charset string
IgnoreTables map[string][]string IgnoreTables map[string][]string
ErrOut io.Writer ErrOut io.Writer
masterDataSkipped bool
maxAllowedPacket int
} }
func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) { func NewDumper(executionPath string, addr string, user string, password string) (*Dumper, error) {
@ -47,17 +53,32 @@ func NewDumper(executionPath string, addr string, user string, password string)
d.Password = password d.Password = password
d.Tables = make([]string, 0, 16) d.Tables = make([]string, 0, 16)
d.Databases = make([]string, 0, 16) d.Databases = make([]string, 0, 16)
d.Charset = DEFAULT_CHARSET
d.IgnoreTables = make(map[string][]string) d.IgnoreTables = make(map[string][]string)
d.masterDataSkipped = false
d.ErrOut = os.Stderr d.ErrOut = os.Stderr
return d, nil return d, nil
} }
func (d *Dumper) SetCharset(charset string) {
d.Charset = charset
}
func (d *Dumper) SetErrOut(o io.Writer) { func (d *Dumper) SetErrOut(o io.Writer) {
d.ErrOut = o d.ErrOut = o
} }
// In some cloud MySQL, we have no privilege to use `--master-data`.
func (d *Dumper) SkipMasterData(v bool) {
d.masterDataSkipped = v
}
func (d *Dumper) SetMaxAllowedPacket(i int) {
d.maxAllowedPacket = i
}
func (d *Dumper) AddDatabases(dbs ...string) { func (d *Dumper) AddDatabases(dbs ...string) {
d.Databases = append(d.Databases, dbs...) d.Databases = append(d.Databases, dbs...)
} }
@ -97,7 +118,14 @@ func (d *Dumper) Dump(w io.Writer) error {
args = append(args, fmt.Sprintf("--user=%s", d.User)) args = append(args, fmt.Sprintf("--user=%s", d.User))
args = append(args, fmt.Sprintf("--password=%s", d.Password)) args = append(args, fmt.Sprintf("--password=%s", d.Password))
if !d.masterDataSkipped {
args = append(args, "--master-data") args = append(args, "--master-data")
}
if d.maxAllowedPacket > 0 {
args = append(args, fmt.Sprintf("--max_allowed_packet=%dM", d.maxAllowedPacket))
}
args = append(args, "--single-transaction") args = append(args, "--single-transaction")
args = append(args, "--skip-lock-tables") args = append(args, "--skip-lock-tables")
@ -133,6 +161,10 @@ func (d *Dumper) Dump(w io.Writer) error {
w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB))) w.Write([]byte(fmt.Sprintf("USE `%s`;\n", d.TableDB)))
} }
if len(d.Charset) != 0 {
args = append(args, fmt.Sprintf("--default-character-set=%s", d.Charset))
}
cmd := exec.Command(d.ExecutionPath, args...) cmd := exec.Command(d.ExecutionPath, args...)
cmd.Stderr = d.ErrOut cmd.Stderr = d.ErrOut
@ -147,7 +179,7 @@ func (d *Dumper) DumpAndParse(h ParseHandler) error {
done := make(chan error, 1) done := make(chan error, 1)
go func() { go func() {
err := Parse(r, h) err := Parse(r, h, !d.masterDataSkipped)
r.CloseWithError(err) r.CloseWithError(err)
done <- err done <- err
}() }()

View File

@ -38,6 +38,7 @@ func (s *schemaTestSuite) SetUpSuite(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(s.d, NotNil) c.Assert(s.d, NotNil)
s.d.SetCharset("utf8")
s.d.SetErrOut(os.Stderr) s.d.SetErrOut(os.Stderr)
_, err = s.conn.Execute("CREATE DATABASE IF NOT EXISTS test1") _, err = s.conn.Execute("CREATE DATABASE IF NOT EXISTS test1")
@ -177,7 +178,7 @@ func (s *schemaTestSuite) TestParse(c *C) {
err := s.d.Dump(&buf) err := s.d.Dump(&buf)
c.Assert(err, IsNil) c.Assert(err, IsNil)
err = Parse(&buf, new(testParseHandler)) err = Parse(&buf, new(testParseHandler), true)
c.Assert(err, IsNil) c.Assert(err, IsNil)
} }

View File

@ -34,7 +34,7 @@ func init() {
// Parse the dump data with Dumper generate. // Parse the dump data with Dumper generate.
// It can not parse all the data formats with mysqldump outputs // It can not parse all the data formats with mysqldump outputs
func Parse(r io.Reader, h ParseHandler) error { func Parse(r io.Reader, h ParseHandler, parseBinlogPos bool) error {
rb := bufio.NewReaderSize(r, 1024*16) rb := bufio.NewReaderSize(r, 1024*16)
var db string var db string
@ -50,7 +50,7 @@ func Parse(r io.Reader, h ParseHandler) error {
line = line[0 : len(line)-1] line = line[0 : len(line)-1]
if !binlogParsed { if parseBinlogPos && !binlogParsed {
if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 { if m := binlogExp.FindAllStringSubmatch(line, -1); len(m) == 1 {
name := m[0][1] name := m[0][1]
pos, err := strconv.ParseUint(m[0][2], 10, 64) pos, err := strconv.ParseUint(m[0][2], 10, 64)

View File

@ -23,8 +23,4 @@ imports:
- hack - hack
- ioutil2 - ioutil2
- sync2 - sync2
- name: golang.org/x/net
version: 6acef71eb69611914f7a30939ea9f6e194c78172
subpackages:
- context
testImports: [] testImports: []

View File

@ -22,5 +22,3 @@ import:
- hack - hack
- ioutil2 - ioutil2
- sync2 - sync2
- package: golang.org/x/net
version: 6acef71eb69611914f7a30939ea9f6e194c78172

View File

@ -57,3 +57,10 @@ func NewError(errCode uint16, message string) *MyError {
return e return e
} }
func ErrorCode(errMsg string) (code int) {
var tmpStr string
// golang scanf doesn't support %*,so I used a temporary variable
fmt.Sscanf(errMsg, "%s%d", &tmpStr, &code)
return
}

View File

@ -11,6 +11,8 @@ type GTIDSet interface {
Equal(o GTIDSet) bool Equal(o GTIDSet) bool
Contain(o GTIDSet) bool Contain(o GTIDSet) bool
Update(GTIDStr string) error
} }
func ParseGTIDSet(flavor string, s string) (GTIDSet, error) { func ParseGTIDSet(flavor string, s string) (GTIDSet, error) {

View File

@ -78,3 +78,14 @@ func (gtid MariadbGTID) Contain(o GTIDSet) bool {
return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber return gtid.DomainID == other.DomainID && gtid.SequenceNumber >= other.SequenceNumber
} }
func (gtid MariadbGTID) Update(GTIDStr string) error {
newGTID, err := ParseMariadbGTIDSet(GTIDStr)
if err != nil {
return err
}
gtid = newGTID.(MariadbGTID)
return nil
}

View File

@ -291,11 +291,13 @@ type MysqlGTIDSet struct {
func ParseMysqlGTIDSet(str string) (GTIDSet, error) { func ParseMysqlGTIDSet(str string) (GTIDSet, error) {
s := new(MysqlGTIDSet) s := new(MysqlGTIDSet)
s.Sets = make(map[string]*UUIDSet)
if str == "" {
return s, nil
}
sp := strings.Split(str, ",") sp := strings.Split(str, ",")
s.Sets = make(map[string]*UUIDSet, len(sp))
//todo, handle redundant same uuid //todo, handle redundant same uuid
for i := 0; i < len(sp); i++ { for i := 0; i < len(sp); i++ {
if set, err := ParseUUIDSet(sp[i]); err != nil { if set, err := ParseUUIDSet(sp[i]); err != nil {
@ -334,6 +336,9 @@ func DecodeMysqlGTIDSet(data []byte) (*MysqlGTIDSet, error) {
} }
func (s *MysqlGTIDSet) AddSet(set *UUIDSet) { func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
if set == nil {
return
}
sid := set.SID.String() sid := set.SID.String()
o, ok := s.Sets[sid] o, ok := s.Sets[sid]
if ok { if ok {
@ -343,6 +348,17 @@ func (s *MysqlGTIDSet) AddSet(set *UUIDSet) {
} }
} }
func (s *MysqlGTIDSet) Update(GTIDStr string) error {
uuidSet, err := ParseUUIDSet(GTIDStr)
if err != nil {
return err
}
s.AddSet(uuidSet)
return nil
}
func (s *MysqlGTIDSet) Contain(o GTIDSet) bool { func (s *MysqlGTIDSet) Contain(o GTIDSet) bool {
sub, ok := o.(*MysqlGTIDSet) sub, ok := o.(*MysqlGTIDSet)
if !ok { if !ok {

View File

@ -151,3 +151,19 @@ func (t *mysqlTestSuite) TestMysqlParseBinaryUint64(c *check.C) {
u64 := ParseBinaryUint64([]byte{1, 2, 3, 4, 5, 6, 7, 128}) u64 := ParseBinaryUint64([]byte{1, 2, 3, 4, 5, 6, 7, 128})
c.Assert(u64, check.Equals, 128*uint64(72057594037927936)+7*uint64(281474976710656)+6*uint64(1099511627776)+5*uint64(4294967296)+4*16777216+3*65536+2*256+1) c.Assert(u64, check.Equals, 128*uint64(72057594037927936)+7*uint64(281474976710656)+6*uint64(1099511627776)+5*uint64(4294967296)+4*16777216+3*65536+2*256+1)
} }
func (t *mysqlTestSuite) TestErrorCode(c *check.C) {
tbls := []struct {
msg string
code int
}{
{"ERROR 1094 (HY000): Unknown thread id: 1094", 1094},
{"error string", 0},
{"abcdefg", 0},
{"123455 ks094", 0},
{"ERROR 1046 (3D000): Unknown error 1046", 1046},
}
for _, v := range tbls {
c.Assert(ErrorCode(v.msg), check.Equals, v.code)
}
}

View File

@ -10,6 +10,7 @@ import (
"strings" "strings"
"github.com/juju/errors" "github.com/juju/errors"
"github.com/ngaut/log"
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
) )
@ -351,4 +352,7 @@ func init() {
EncodeMap[byte(i)] = to EncodeMap[byte(i)] = to
} }
} }
// Disable highlight by default
log.SetHighlighting(false)
} }

View File

@ -1,13 +1,12 @@
package replication package replication
import ( import (
"context"
"io" "io"
"os" "os"
"path" "path"
"time" "time"
"golang.org/x/net/context"
"github.com/juju/errors" "github.com/juju/errors"
. "github.com/siddontang/go-mysql/mysql" . "github.com/siddontang/go-mysql/mysql"
) )

View File

@ -1,7 +1,7 @@
package replication package replication
import ( import (
"golang.org/x/net/context" "context"
"github.com/juju/errors" "github.com/juju/errors"
"github.com/ngaut/log" "github.com/ngaut/log"

View File

@ -1,17 +1,18 @@
package replication package replication
import ( import (
"context"
"crypto/tls" "crypto/tls"
"encoding/binary" "encoding/binary"
"fmt" "fmt"
"net"
"os" "os"
"sync" "sync"
"time" "time"
"golang.org/x/net/context"
"github.com/juju/errors" "github.com/juju/errors"
"github.com/ngaut/log" "github.com/ngaut/log"
"github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/client" "github.com/siddontang/go-mysql/client"
. "github.com/siddontang/go-mysql/mysql" . "github.com/siddontang/go-mysql/mysql"
) )
@ -40,21 +41,37 @@ type BinlogSyncerConfig struct {
// If not set, use os.Hostname() instead. // If not set, use os.Hostname() instead.
Localhost string Localhost string
// Charset is for MySQL client character set
Charset string
// SemiSyncEnabled enables semi-sync or not. // SemiSyncEnabled enables semi-sync or not.
SemiSyncEnabled bool SemiSyncEnabled bool
// RawModeEanbled is for not parsing binlog event. // RawModeEnabled is for not parsing binlog event.
RawModeEanbled bool RawModeEnabled bool
// If not nil, use the provided tls.Config to connect to the database using TLS/SSL. // If not nil, use the provided tls.Config to connect to the database using TLS/SSL.
TLSConfig *tls.Config TLSConfig *tls.Config
// Use replication.Time structure for timestamp and datetime.
// We will use Local location for timestamp and UTC location for datatime.
ParseTime bool
// RecvBufferSize sets the size in bytes of the operating system's receive buffer associated with the connection.
RecvBufferSize int
// master heartbeat period
HeartbeatPeriod time.Duration
// read timeout
ReadTimeout time.Duration
} }
// BinlogSyncer syncs binlog event from server. // BinlogSyncer syncs binlog event from server.
type BinlogSyncer struct { type BinlogSyncer struct {
m sync.RWMutex m sync.RWMutex
cfg *BinlogSyncerConfig cfg BinlogSyncerConfig
c *client.Conn c *client.Conn
@ -64,22 +81,33 @@ type BinlogSyncer struct {
nextPos Position nextPos Position
useGTID bool
gset GTIDSet
running bool running bool
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
lastConnectionID uint32
} }
// NewBinlogSyncer creates the BinlogSyncer with cfg. // NewBinlogSyncer creates the BinlogSyncer with cfg.
func NewBinlogSyncer(cfg *BinlogSyncerConfig) *BinlogSyncer { func NewBinlogSyncer(cfg BinlogSyncerConfig) *BinlogSyncer {
// Clear the Password to avoid outputing it in log.
pass := cfg.Password
cfg.Password = ""
log.Infof("create BinlogSyncer with config %v", cfg) log.Infof("create BinlogSyncer with config %v", cfg)
cfg.Password = pass
b := new(BinlogSyncer) b := new(BinlogSyncer)
b.cfg = cfg b.cfg = cfg
b.parser = NewBinlogParser() b.parser = NewBinlogParser()
b.parser.SetRawMode(b.cfg.RawModeEanbled) b.parser.SetRawMode(b.cfg.RawModeEnabled)
b.parser.SetParseTime(b.cfg.ParseTime)
b.useGTID = false
b.running = false b.running = false
b.ctx, b.cancel = context.WithCancel(context.Background()) b.ctx, b.cancel = context.WithCancel(context.Background())
@ -140,6 +168,37 @@ func (b *BinlogSyncer) registerSlave() error {
return errors.Trace(err) return errors.Trace(err)
} }
if len(b.cfg.Charset) != 0 {
b.c.SetCharset(b.cfg.Charset)
}
//set read timeout
if b.cfg.ReadTimeout > 0 {
b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
}
if b.cfg.RecvBufferSize > 0 {
if tcp, ok := b.c.Conn.Conn.(*net.TCPConn); ok {
tcp.SetReadBuffer(b.cfg.RecvBufferSize)
}
}
// kill last connection id
if b.lastConnectionID > 0 {
cmd := fmt.Sprintf("KILL %d", b.lastConnectionID)
if _, err := b.c.Execute(cmd); err != nil {
log.Errorf("kill connection %d error %v", b.lastConnectionID, err)
// Unknown thread id
if code := ErrorCode(err.Error()); code != ER_NO_SUCH_THREAD {
return errors.Trace(err)
}
}
log.Infof("kill last connection id %d", b.lastConnectionID)
}
// save last last connection id for kill
b.lastConnectionID = b.c.GetConnectionID()
//for mysql 5.6+, binlog has a crc32 checksum //for mysql 5.6+, binlog has a crc32 checksum
//before mysql 5.6, this will not work, don't matter.:-) //before mysql 5.6, this will not work, don't matter.:-)
if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil { if r, err := b.c.Execute("SHOW GLOBAL VARIABLES LIKE 'BINLOG_CHECKSUM'"); err != nil {
@ -175,6 +234,14 @@ func (b *BinlogSyncer) registerSlave() error {
} }
} }
if b.cfg.HeartbeatPeriod > 0 {
_, err = b.c.Execute(fmt.Sprintf("SET @master_heartbeat_period=%d;", b.cfg.HeartbeatPeriod))
if err != nil {
log.Error("failed to set @master_heartbeat_period=%d", b.cfg.HeartbeatPeriod, err)
return errors.Trace(err)
}
}
if err = b.writeRegisterSlaveCommand(); err != nil { if err = b.writeRegisterSlaveCommand(); err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -236,6 +303,11 @@ func (b *BinlogSyncer) startDumpStream() *BinlogStreamer {
return s return s
} }
// GetNextPosition returns the next position of the syncer
func (b *BinlogSyncer) GetNextPosition() Position {
return b.nextPos
}
// StartSync starts syncing from the `pos` position. // StartSync starts syncing from the `pos` position.
func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) { func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
log.Infof("begin to sync binlog from position %s", pos) log.Infof("begin to sync binlog from position %s", pos)
@ -258,6 +330,9 @@ func (b *BinlogSyncer) StartSync(pos Position) (*BinlogStreamer, error) {
func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) { func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
log.Infof("begin to sync binlog from GTID %s", gset) log.Infof("begin to sync binlog from GTID %s", gset)
b.useGTID = true
b.gset = gset
b.m.Lock() b.m.Lock()
defer b.m.Unlock() defer b.m.Unlock()
@ -284,7 +359,7 @@ func (b *BinlogSyncer) StartSyncGTID(gset GTIDSet) (*BinlogStreamer, error) {
return b.startDumpStream(), nil return b.startDumpStream(), nil
} }
func (b *BinlogSyncer) writeBinglogDumpCommand(p Position) error { func (b *BinlogSyncer) writeBinlogDumpCommand(p Position) error {
b.c.ResetSequence() b.c.ResetSequence()
data := make([]byte, 4+1+4+2+4+len(p.Name)) data := make([]byte, 4+1+4+2+4+len(p.Name))
@ -364,7 +439,7 @@ func (b *BinlogSyncer) writeBinlogDumpMariadbGTIDCommand(gset GTIDSet) error {
} }
// Since we use @slave_connect_state, the file and position here are ignored. // Since we use @slave_connect_state, the file and position here are ignored.
return b.writeBinglogDumpCommand(Position{"", 0}) return b.writeBinlogDumpCommand(Position{"", 0})
} }
// localHostname returns the hostname that register slave would register as. // localHostname returns the hostname that register slave would register as.
@ -449,12 +524,20 @@ func (b *BinlogSyncer) retrySync() error {
b.m.Lock() b.m.Lock()
defer b.m.Unlock() defer b.m.Unlock()
log.Infof("begin to re-sync from %s", b.nextPos)
b.parser.Reset() b.parser.Reset()
if b.useGTID {
log.Infof("begin to re-sync from %s", b.gset.String())
if err := b.prepareSyncGTID(b.gset); err != nil {
return errors.Trace(err)
}
} else {
log.Infof("begin to re-sync from %s", b.nextPos)
if err := b.prepareSyncPos(b.nextPos); err != nil { if err := b.prepareSyncPos(b.nextPos); err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
}
return nil return nil
} }
@ -469,13 +552,33 @@ func (b *BinlogSyncer) prepareSyncPos(pos Position) error {
return errors.Trace(err) return errors.Trace(err)
} }
if err := b.writeBinglogDumpCommand(pos); err != nil { if err := b.writeBinlogDumpCommand(pos); err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
return nil return nil
} }
func (b *BinlogSyncer) prepareSyncGTID(gset GTIDSet) error {
var err error
if err = b.prepare(); err != nil {
return errors.Trace(err)
}
if b.cfg.Flavor != MariaDBFlavor {
// default use MySQL
err = b.writeBinlogDumpMysqlGTIDCommand(gset)
} else {
err = b.writeBinlogDumpMariadbGTIDCommand(gset)
}
if err != nil {
return err
}
return nil
}
func (b *BinlogSyncer) onStream(s *BinlogStreamer) { func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
defer func() { defer func() {
if e := recover(); e != nil { if e := recover(); e != nil {
@ -490,8 +593,8 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
log.Error(err) log.Error(err)
// we meet connection error, should re-connect again with // we meet connection error, should re-connect again with
// last nextPos we got. // last nextPos or nextGTID we got.
if len(b.nextPos.Name) == 0 { if len(b.nextPos.Name) == 0 && b.gset == nil {
// we can't get the correct position, close. // we can't get the correct position, close.
s.closeWithError(err) s.closeWithError(err)
return return
@ -517,6 +620,11 @@ func (b *BinlogSyncer) onStream(s *BinlogStreamer) {
continue continue
} }
//set read timeout
if b.cfg.ReadTimeout > 0 {
b.c.SetReadDeadline(time.Now().Add(b.cfg.ReadTimeout))
}
switch data[0] { switch data[0] {
case OK_HEADER: case OK_HEADER:
if err = b.parseEvent(s, data); err != nil { if err = b.parseEvent(s, data); err != nil {
@ -552,7 +660,7 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
data = data[2:] data = data[2:]
} }
e, err := b.parser.parse(data) e, err := b.parser.Parse(data)
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -561,11 +669,33 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
// Some events like FormatDescriptionEvent return 0, ignore. // Some events like FormatDescriptionEvent return 0, ignore.
b.nextPos.Pos = e.Header.LogPos b.nextPos.Pos = e.Header.LogPos
} }
switch event := e.Event.(type) {
if re, ok := e.Event.(*RotateEvent); ok { case *RotateEvent:
b.nextPos.Name = string(re.NextLogName) b.nextPos.Name = string(event.NextLogName)
b.nextPos.Pos = uint32(re.Position) b.nextPos.Pos = uint32(event.Position)
log.Infof("rotate to %s", b.nextPos) log.Infof("rotate to %s", b.nextPos)
case *GTIDEvent:
if !b.useGTID {
break
}
u, _ := uuid.FromBytes(event.SID)
err := b.gset.Update(fmt.Sprintf("%s:%d", u.String(), event.GNO))
if err != nil {
return errors.Trace(err)
}
case *MariadbGTIDEvent:
if !b.useGTID {
break
}
GTID := event.GTID
err := b.gset.Update(fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber))
if err != nil {
return errors.Trace(err)
}
case *XIDEvent:
event.GSet = b.getGtidSet()
case *QueryEvent:
event.GSet = b.getGtidSet()
} }
needStop := false needStop := false
@ -588,3 +718,19 @@ func (b *BinlogSyncer) parseEvent(s *BinlogStreamer, data []byte) error {
return nil return nil
} }
func (b *BinlogSyncer) getGtidSet() GTIDSet {
var gtidSet GTIDSet
if !b.useGTID {
return nil
}
if b.cfg.Flavor != MariaDBFlavor {
gtidSet, _ = ParseGTIDSet(MySQLFlavor, b.gset.String())
} else {
gtidSet, _ = ParseGTIDSet(MariaDBFlavor, b.gset.String())
}
return gtidSet
}

View File

@ -216,6 +216,9 @@ func (e *RotateEvent) Dump(w io.Writer) {
type XIDEvent struct { type XIDEvent struct {
XID uint64 XID uint64
// in fact XIDEvent dosen't have the GTIDSet information, just for beneficial to use
GSet GTIDSet
} }
func (e *XIDEvent) Decode(data []byte) error { func (e *XIDEvent) Decode(data []byte) error {
@ -225,6 +228,9 @@ func (e *XIDEvent) Decode(data []byte) error {
func (e *XIDEvent) Dump(w io.Writer) { func (e *XIDEvent) Dump(w io.Writer) {
fmt.Fprintf(w, "XID: %d\n", e.XID) fmt.Fprintf(w, "XID: %d\n", e.XID)
if e.GSet != nil {
fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
}
fmt.Fprintln(w) fmt.Fprintln(w)
} }
@ -235,6 +241,9 @@ type QueryEvent struct {
StatusVars []byte StatusVars []byte
Schema []byte Schema []byte
Query []byte Query []byte
// in fact QueryEvent dosen't have the GTIDSet information, just for beneficial to use
GSet GTIDSet
} }
func (e *QueryEvent) Decode(data []byte) error { func (e *QueryEvent) Decode(data []byte) error {
@ -275,6 +284,9 @@ func (e *QueryEvent) Dump(w io.Writer) {
//fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars)) //fmt.Fprintf(w, "Status vars: \n%s", hex.Dump(e.StatusVars))
fmt.Fprintf(w, "Schema: %s\n", e.Schema) fmt.Fprintf(w, "Schema: %s\n", e.Schema)
fmt.Fprintf(w, "Query: %s\n", e.Query) fmt.Fprintf(w, "Query: %s\n", e.Query)
if e.GSet != nil {
fmt.Fprintf(w, "GTIDSet: %s\n", e.GSet.String())
}
fmt.Fprintln(w) fmt.Fprintln(w)
} }

View File

@ -338,7 +338,7 @@ func (d *jsonBinaryDecoder) decodeString(data []byte) string {
l, n := d.decodeVariableLength(data) l, n := d.decodeVariableLength(data)
if d.isDataShort(data, int(l)+n) { if d.isDataShort(data, l+n) {
return "" return ""
} }
@ -358,11 +358,11 @@ func (d *jsonBinaryDecoder) decodeOpaque(data []byte) interface{} {
l, n := d.decodeVariableLength(data) l, n := d.decodeVariableLength(data)
if d.isDataShort(data, int(l)+n) { if d.isDataShort(data, l+n) {
return nil return nil
} }
data = data[n : int(l)+n] data = data[n : l+n]
switch tp { switch tp {
case MYSQL_TYPE_NEWDECIMAL: case MYSQL_TYPE_NEWDECIMAL:
@ -459,7 +459,7 @@ func (d *jsonBinaryDecoder) decodeVariableLength(data []byte) (int, int) {
length := uint64(0) length := uint64(0)
for ; pos < maxCount; pos++ { for ; pos < maxCount; pos++ {
v := data[pos] v := data[pos]
length = (length << 7) + uint64(v&0x7F) length |= uint64(v & 0x7F) << uint(7 * pos)
if v&0x80 == 0 { if v&0x80 == 0 {
if length > math.MaxUint32 { if length > math.MaxUint32 {

View File

@ -16,6 +16,8 @@ type BinlogParser struct {
// for rawMode, we only parse FormatDescriptionEvent and RotateEvent // for rawMode, we only parse FormatDescriptionEvent and RotateEvent
rawMode bool rawMode bool
parseTime bool
} }
func NewBinlogParser() *BinlogParser { func NewBinlogParser() *BinlogParser {
@ -54,12 +56,10 @@ func (p *BinlogParser) ParseFile(name string, offset int64, onEvent OnEventFunc)
return errors.Errorf("seek %s to %d error %v", name, offset, err) return errors.Errorf("seek %s to %d error %v", name, offset, err)
} }
return p.parseReader(f, onEvent) return p.ParseReader(f, onEvent)
} }
func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error { func (p *BinlogParser) ParseReader(r io.Reader, onEvent OnEventFunc) error {
p.Reset()
var err error var err error
var n int64 var n int64
@ -100,7 +100,10 @@ func (p *BinlogParser) parseReader(r io.Reader, onEvent OnEventFunc) error {
var e Event var e Event
e, err = p.parseEvent(h, data) e, err = p.parseEvent(h, data)
if err != nil { if err != nil {
break if _, ok := err.(errMissingTableMapEvent); ok {
continue
}
return errors.Trace(err)
} }
if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil { if err = onEvent(&BinlogEvent{rawData, h, e}); err != nil {
@ -115,6 +118,10 @@ func (p *BinlogParser) SetRawMode(mode bool) {
p.rawMode = mode p.rawMode = mode
} }
func (p *BinlogParser) SetParseTime(parseTime bool) {
p.parseTime = parseTime
}
func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) { func (p *BinlogParser) parseHeader(data []byte) (*EventHeader, error) {
h := new(EventHeader) h := new(EventHeader)
err := h.Decode(data) err := h.Decode(data)
@ -206,7 +213,13 @@ func (p *BinlogParser) parseEvent(h *EventHeader, data []byte) (Event, error) {
return e, nil return e, nil
} }
func (p *BinlogParser) parse(data []byte) (*BinlogEvent, error) { // Given the bytes for a a binary log event: return the decoded event.
// With the exception of the FORMAT_DESCRIPTION_EVENT event type
// there must have previously been passed a FORMAT_DESCRIPTION_EVENT
// into the parser for this to work properly on any given event.
// Passing a new FORMAT_DESCRIPTION_EVENT into the parser will replace
// an existing one.
func (p *BinlogParser) Parse(data []byte) (*BinlogEvent, error) {
rawData := data rawData := data
h, err := p.parseHeader(data) h, err := p.parseHeader(data)
@ -240,6 +253,7 @@ func (p *BinlogParser) newRowsEvent(h *EventHeader) *RowsEvent {
e.needBitmap2 = false e.needBitmap2 = false
e.tables = p.tables e.tables = p.tables
e.parseTime = p.parseTime
switch h.EventType { switch h.EventType {
case WRITE_ROWS_EVENTv0: case WRITE_ROWS_EVENTv0:

View File

@ -29,7 +29,7 @@ func (t *testSyncerSuite) TestIndexOutOfRange(c *C) {
0x3065f: &TableMapEvent{tableIDSize: 6, TableID: 0x3065f, Flags: 0x1, Schema: []uint8{0x73, 0x65, 0x69, 0x75, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72}, Table: []uint8{0x63, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x6f, 0x75, 0x74, 0x5f, 0x6c, 0x65, 0x74, 0x74, 0x65, 0x72}, ColumnCount: 0xd, ColumnType: []uint8{0x3, 0x3, 0x3, 0x3, 0x1, 0x12, 0xf, 0xf, 0x12, 0xf, 0xf, 0x3, 0xf}, ColumnMeta: []uint16{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x180, 0x180, 0x0, 0x180, 0x180, 0x0, 0x2fd}, NullBitmap: []uint8{0xe0, 0x17}}, 0x3065f: &TableMapEvent{tableIDSize: 6, TableID: 0x3065f, Flags: 0x1, Schema: []uint8{0x73, 0x65, 0x69, 0x75, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72}, Table: []uint8{0x63, 0x6f, 0x6e, 0x73, 0x5f, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x73, 0x70, 0x65, 0x61, 0x6b, 0x6f, 0x75, 0x74, 0x5f, 0x6c, 0x65, 0x74, 0x74, 0x65, 0x72}, ColumnCount: 0xd, ColumnType: []uint8{0x3, 0x3, 0x3, 0x3, 0x1, 0x12, 0xf, 0xf, 0x12, 0xf, 0xf, 0x3, 0xf}, ColumnMeta: []uint16{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x180, 0x180, 0x0, 0x180, 0x180, 0x0, 0x2fd}, NullBitmap: []uint8{0xe0, 0x17}},
} }
_, err := parser.parse([]byte{ _, err := parser.Parse([]byte{
/* 0x00, */ 0xc1, 0x86, 0x8e, 0x55, 0x1e, 0xa5, 0x14, 0x80, 0xa, 0x55, 0x0, 0x0, 0x0, 0x7, 0xc, /* 0x00, */ 0xc1, 0x86, 0x8e, 0x55, 0x1e, 0xa5, 0x14, 0x80, 0xa, 0x55, 0x0, 0x0, 0x0, 0x7, 0xc,
0xbf, 0xe, 0x0, 0x0, 0x5f, 0x6, 0x3, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0xd, 0xff, 0xbf, 0xe, 0x0, 0x0, 0x5f, 0x6, 0x3, 0x0, 0x0, 0x0, 0x1, 0x0, 0x2, 0x0, 0xd, 0xff,
0x0, 0x0, 0x19, 0x63, 0x7, 0x0, 0xca, 0x61, 0x5, 0x0, 0x5e, 0xf7, 0xc, 0x0, 0xf5, 0x7, 0x0, 0x0, 0x19, 0x63, 0x7, 0x0, 0xca, 0x61, 0x5, 0x0, 0x5e, 0xf7, 0xc, 0x0, 0xf5, 0x7,

View File

@ -1,6 +1,7 @@
package replication package replication
import ( import (
"context"
"flag" "flag"
"fmt" "fmt"
"os" "os"
@ -8,8 +9,6 @@ import (
"testing" "testing"
"time" "time"
"golang.org/x/net/context"
. "github.com/pingcap/check" . "github.com/pingcap/check"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
"github.com/siddontang/go-mysql/client" "github.com/siddontang/go-mysql/client"
@ -230,6 +229,28 @@ func (t *testSyncerSuite) testSync(c *C, s *BinlogStreamer) {
} }
} }
str = `DROP TABLE IF EXISTS test_parse_time`
t.testExecute(c, str)
// Must allow zero time.
t.testExecute(c, `SET sql_mode=''`)
str = `CREATE TABLE test_parse_time (
a1 DATETIME,
a2 DATETIME(3),
a3 DATETIME(6),
b1 TIMESTAMP,
b2 TIMESTAMP(3) ,
b3 TIMESTAMP(6))`
t.testExecute(c, str)
t.testExecute(c, `INSERT INTO test_parse_time VALUES
("2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456", "2014-09-08 17:51:04.123456",
"2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456","2014-09-08 17:51:04.123456"),
("0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000",
"0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000", "0000-00-00 00:00:00.000000"),
("2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456", "2014-09-08 17:51:04.000456",
"2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456","2014-09-08 17:51:04.000456")`)
t.wg.Wait() t.wg.Wait()
} }
@ -271,7 +292,7 @@ func (t *testSyncerSuite) setupTest(c *C, flavor string) {
Password: "", Password: "",
} }
t.b = NewBinlogSyncer(&cfg) t.b = NewBinlogSyncer(cfg)
} }
func (t *testSyncerSuite) testPositionSync(c *C) { func (t *testSyncerSuite) testPositionSync(c *C) {

View File

@ -15,6 +15,8 @@ import (
"github.com/siddontang/go/hack" "github.com/siddontang/go/hack"
) )
type errMissingTableMapEvent error
type TableMapEvent struct { type TableMapEvent struct {
tableIDSize int tableIDSize int
@ -223,6 +225,8 @@ type RowsEvent struct {
//rows: invalid: int64, float64, bool, []byte, string //rows: invalid: int64, float64, bool, []byte, string
Rows [][]interface{} Rows [][]interface{}
parseTime bool
} }
func (e *RowsEvent) Decode(data []byte) error { func (e *RowsEvent) Decode(data []byte) error {
@ -257,7 +261,11 @@ func (e *RowsEvent) Decode(data []byte) error {
var ok bool var ok bool
e.Table, ok = e.tables[e.TableID] e.Table, ok = e.tables[e.TableID]
if !ok { if !ok {
return errors.Errorf("invalid table id %d, no correspond table map event", e.TableID) if len(e.tables) > 0 {
return errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID)
} else {
return errMissingTableMapEvent(errors.Errorf("invalid table id %d, no corresponding table map event", e.TableID))
}
} }
var err error var err error
@ -336,6 +344,21 @@ func (e *RowsEvent) decodeRows(data []byte, table *TableMapEvent, bitmap []byte)
return pos, nil return pos, nil
} }
func (e *RowsEvent) parseFracTime(t interface{}) interface{} {
v, ok := t.(fracTime)
if !ok {
return t
}
if !e.parseTime {
// Don't parse time, return string directly
return v.String()
}
// return Golang time directly
return v.Time
}
// see mysql sql/log_event.cc log_event_print_value // see mysql sql/log_event.cc log_event_print_value
func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{}, n int, err error) { func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{}, n int, err error) {
var length int = 0 var length int = 0
@ -394,24 +417,26 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
case MYSQL_TYPE_TIMESTAMP: case MYSQL_TYPE_TIMESTAMP:
n = 4 n = 4
t := binary.LittleEndian.Uint32(data) t := binary.LittleEndian.Uint32(data)
v = time.Unix(int64(t), 0) v = e.parseFracTime(fracTime{time.Unix(int64(t), 0), 0})
case MYSQL_TYPE_TIMESTAMP2: case MYSQL_TYPE_TIMESTAMP2:
v, n, err = decodeTimestamp2(data, meta) v, n, err = decodeTimestamp2(data, meta)
v = e.parseFracTime(v)
case MYSQL_TYPE_DATETIME: case MYSQL_TYPE_DATETIME:
n = 8 n = 8
i64 := binary.LittleEndian.Uint64(data) i64 := binary.LittleEndian.Uint64(data)
d := i64 / 1000000 d := i64 / 1000000
t := i64 % 1000000 t := i64 % 1000000
v = time.Date(int(d/10000), v = e.parseFracTime(fracTime{time.Date(int(d/10000),
time.Month((d%10000)/100), time.Month((d%10000)/100),
int(d%100), int(d%100),
int(t/10000), int(t/10000),
int((t%10000)/100), int((t%10000)/100),
int(t%100), int(t%100),
0, 0,
time.UTC).Format(TimeFormat) time.UTC), 0})
case MYSQL_TYPE_DATETIME2: case MYSQL_TYPE_DATETIME2:
v, n, err = decodeDatetime2(data, meta) v, n, err = decodeDatetime2(data, meta)
v = e.parseFracTime(v)
case MYSQL_TYPE_TIME: case MYSQL_TYPE_TIME:
n = 3 n = 3
i32 := uint32(FixedLengthInt(data[0:3])) i32 := uint32(FixedLengthInt(data[0:3]))
@ -464,8 +489,8 @@ func (e *RowsEvent) decodeValue(data []byte, tp byte, meta uint16) (v interface{
case MYSQL_TYPE_STRING: case MYSQL_TYPE_STRING:
v, n = decodeString(data, length) v, n = decodeString(data, length)
case MYSQL_TYPE_JSON: case MYSQL_TYPE_JSON:
// Refer https://github.com/shyiko/mysql-binlog-connector-java/blob/8f9132ee773317e00313204beeae8ddcaa43c1b4/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L344 // Refer: https://github.com/shyiko/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/event/deserialization/AbstractRowsEventDataDeserializer.java#L404
length = int(binary.LittleEndian.Uint32(data[0:])) length = int(FixedLengthInt(data[0:meta]))
n = length + int(meta) n = length + int(meta)
v, err = decodeJsonBinary(data[meta:n]) v, err = decodeJsonBinary(data[meta:n])
case MYSQL_TYPE_GEOMETRY: case MYSQL_TYPE_GEOMETRY:
@ -615,11 +640,10 @@ func decodeTimestamp2(data []byte, dec uint16) (interface{}, int, error) {
} }
if sec == 0 { if sec == 0 {
return "0000-00-00 00:00:00", n, nil return formatZeroTime(int(usec), int(dec)), n, nil
} }
t := time.Unix(sec, usec*1000) return fracTime{time.Unix(sec, usec*1000), int(dec)}, n, nil
return t, n, nil
} }
const DATETIMEF_INT_OFS int64 = 0x8000000000 const DATETIMEF_INT_OFS int64 = 0x8000000000
@ -641,7 +665,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
} }
if intPart == 0 { if intPart == 0 {
return "0000-00-00 00:00:00", n, nil return formatZeroTime(int(frac), int(dec)), n, nil
} }
tmp := intPart<<24 + frac tmp := intPart<<24 + frac
@ -650,7 +674,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
tmp = -tmp tmp = -tmp
} }
var secPart int64 = tmp % (1 << 24) // var secPart int64 = tmp % (1 << 24)
ymdhms := tmp >> 24 ymdhms := tmp >> 24
ymd := ymdhms >> 17 ymd := ymdhms >> 17
@ -665,10 +689,7 @@ func decodeDatetime2(data []byte, dec uint16) (interface{}, int, error) {
minute := int((hms >> 6) % (1 << 6)) minute := int((hms >> 6) % (1 << 6))
hour := int((hms >> 12)) hour := int((hms >> 12))
if secPart != 0 { return fracTime{time.Date(year, time.Month(month), day, hour, minute, second, int(frac*1000), time.UTC), int(dec)}, n, nil
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d.%06d", year, month, day, hour, minute, second, secPart), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
}
return fmt.Sprintf("%04d-%02d-%02d %02d:%02d:%02d", year, month, day, hour, minute, second), n, nil // commented by Shlomi Noach. Yes I know about `git blame`
} }
const TIMEF_OFS int64 = 0x800000000000 const TIMEF_OFS int64 = 0x800000000000

View File

@ -403,7 +403,8 @@ func (_ *testDecodeSuite) TestParseJson(c *C) {
// INSERT INTO `t10` (`c2`) VALUES (1); // INSERT INTO `t10` (`c2`) VALUES (1);
// INSERT INTO `t10` (`c1`, `c2`) VALUES ('{"key1": "value1", "key2": "value2"}', 1); // INSERT INTO `t10` (`c1`, `c2`) VALUES ('{"key1": "value1", "key2": "value2"}', 1);
// test json deserialization
// INSERT INTO `t10`(`c1`,`c2`) VALUES ('{"text":"Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus. Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a, tellus. Phasellus viverra nulla ut metus varius laoreet. Quisque rutrum. Aenean imperdiet. Etiam ultricies nisi vel augue. Curabitur ullamcorper ultricies nisi. Nam eget dui. Etiam rhoncus. Maecenas tempus, tellus eget condimentum rhoncus, sem quam semper libero, sit amet adipiscing sem neque sed ipsum. Nam quam nunc, blandit vel, luctus pulvinar, hendrerit id, lorem. Maecenas nec odio et ante tincidunt tempus. Donec vitae sapien ut libero venenatis faucibus. Nullam quis ante. Etiam sit amet orci eget eros faucibus tincidunt. Duis leo. Sed fringilla mauris sit amet nibh. Donec sodales sagittis magna. Sed consequat, leo eget bibendum sodales, augue velit cursus nunc, quis gravida magna mi a libero. Fusce vulputate eleifend sapien. Vestibulum purus quam, scelerisque ut, mollis sed, nonummy id, metus. Nullam accumsan lorem in dui. Cras ultricies mi eu turpis hendrerit fringilla. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; In ac dui quis mi consectetuer lacinia. Nam pretium turpis et arcu. Duis arcu tortor, suscipit eget, imperdiet nec, imperdiet iaculis, ipsum. Sed aliquam ultrices mauris. Integer ante arcu, accumsan a, consectetuer eget, posuere ut, mauris. Praesent adipiscing. Phasellus ullamcorper ipsum rutrum nunc. Nunc nonummy metus. Vestibulum volutpat pretium libero. Cras id dui. Aenean ut eros et nisl sagittis vestibulum. Nullam nulla eros, ultricies sit amet, nonummy id, imperdiet feugiat, pede. Sed lectus. Donec mollis hendrerit risus. Phasellus nec sem in justo pellentesque facilisis. Etiam imperdiet imperdiet orci. Nunc nec neque. Phasellus leo dolor, tempus non, auctor et, hendrerit quis, nisi. Curabitur ligula sapien, tincidunt non, euismod vitae, posuere imperdiet, leo. Maecenas malesuada. Praesent congue erat at massa. Sed cursus turpis vitae tortor. Donec posuere vulputate arcu. Phasellus accumsan cursus velit. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Sed aliquam, nisi quis porttitor congue, elit erat euismod orci, ac"}',101);
tableMapEventData := []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x03t10\x00\x02\xf5\xf6\x03\x04\n\x00\x03") tableMapEventData := []byte("m\x00\x00\x00\x00\x00\x01\x00\x04test\x00\x03t10\x00\x02\xf5\xf6\x03\x04\n\x00\x03")
tableMapEvent := new(TableMapEvent) tableMapEvent := new(TableMapEvent)
@ -428,4 +429,15 @@ func (_ *testDecodeSuite) TestParseJson(c *C) {
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(rows.Rows[0][1], Equals, float64(1)) c.Assert(rows.Rows[0][1], Equals, float64(1))
} }
longTbls := [][]byte{
[]byte("m\x00\x00\x00\x00\x00\x01\x00\x02\x00\x02\xff\xfc\xd0\n\x00\x00\x00\x01\x00\xcf\n\v\x00\x04\x00\f\x0f\x00text\xbe\x15Lorem ipsum dolor sit amet, consectetuer adipiscing elit. Aenean commodo ligula eget dolor. Aenean massa. Cum sociis natoque penatibus et magnis dis parturient montes, nascetur ridiculus mus. Donec quam felis, ultricies nec, pellentesque eu, pretium quis, sem. Nulla consequat massa quis enim. Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincidunt. Cras dapibus. Vivamus elementum semper nisi. Aenean vulputate eleifend tellus. Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a, tellus. Phasellus viverra nulla ut metus varius laoreet. Quisque rutrum. Aenean imperdiet. Etiam ultricies nisi vel augue. Curabitur ullamcorper ultricies nisi. Nam eget dui. Etiam rhoncus. Maecenas tempus, tellus eget condimentum rhoncus, sem quam semper libero, sit amet adipiscing sem neque sed ipsum. Nam quam nunc, blandit vel, luctus pulvinar, hendrerit id, lorem. Maecenas nec odio et ante tincidunt tempus. Donec vitae sapien ut libero venenatis faucibus. Nullam quis ante. Etiam sit amet orci eget eros faucibus tincidunt. Duis leo. Sed fringilla mauris sit amet nibh. Donec sodales sagittis magna. Sed consequat, leo eget bibendum sodales, augue velit cursus nunc, quis gravida magna mi a libero. Fusce vulputate eleifend sapien. Vestibulum purus quam, scelerisque ut, mollis sed, nonummy id, metus. Nullam accumsan lorem in dui. Cras ultricies mi eu turpis hendrerit fringilla. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; In ac dui quis mi consectetuer lacinia. Nam pretium turpis et arcu. Duis arcu tortor, suscipit eget, imperdiet nec, imperdiet iaculis, ipsum. Sed aliquam ultrices mauris. Integer ante arcu, accumsan a, consectetuer eget, posuere ut, mauris. Praesent adipiscing. Phasellus ullamcorper ipsum rutrum nunc. Nunc nonummy metus. Vestibulum volutpat pretium libero. Cras id dui. Aenean ut eros et nisl sagittis vestibulum. Nullam nulla eros, ultricies sit amet, nonummy id, imperdiet feugiat, pede. Sed lectus. Donec mollis hendrerit risus. Phasellus nec sem in justo pellentesque facilisis. Etiam imperdiet imperdiet orci. Nunc nec neque. Phasellus leo dolor, tempus non, auctor et, hendrerit quis, nisi. Curabitur ligula sapien, tincidunt non, euismod vitae, posuere imperdiet, leo. Maecenas malesuada. Praesent congue erat at massa. Sed cursus turpis vitae tortor. Donec posuere vulputate arcu. Phasellus accumsan cursus velit. Vestibulum ante ipsum primis in faucibus orci luctus et ultrices posuere cubilia Curae; Sed aliquam, nisi quis porttitor congue, elit erat euismod orci, ac\x80\x00\x00\x00e"),
}
for _, ltbl := range longTbls {
rows.Rows = nil
err = rows.Decode(ltbl)
c.Assert(err, IsNil)
c.Assert(rows.Rows[0][1], Equals, float64(101))
}
} }

View File

@ -0,0 +1,43 @@
package replication
import (
"fmt"
"strings"
"time"
)
var (
fracTimeFormat []string
)
// fracTime is a help structure wrapping Golang Time.
type fracTime struct {
time.Time
// Dec must in [0, 6]
Dec int
}
func (t fracTime) String() string {
return t.Format(fracTimeFormat[t.Dec])
}
func formatZeroTime(frac int, dec int) string {
if dec == 0 {
return "0000-00-00 00:00:00"
}
s := fmt.Sprintf("0000-00-00 00:00:00.%06d", frac)
// dec must < 6, if frac is 924000, but dec is 3, we must output 924 here.
return s[0 : len(s)-(6-dec)]
}
func init() {
fracTimeFormat = make([]string, 7)
fracTimeFormat[0] = "2006-01-02 15:04:05"
for i := 1; i <= 6; i++ {
fracTimeFormat[i] = fmt.Sprintf("2006-01-02 15:04:05.%s", strings.Repeat("0", i))
}
}

View File

@ -0,0 +1,51 @@
package replication
import (
"time"
. "github.com/pingcap/check"
)
type testTimeSuite struct{}
var _ = Suite(&testTimeSuite{})
func (s *testSyncerSuite) TestTime(c *C) {
tbls := []struct {
year int
month int
day int
hour int
min int
sec int
microSec int
frac int
expected string
}{
{2000, 1, 1, 1, 1, 1, 1, 0, "2000-01-01 01:01:01"},
{2000, 1, 1, 1, 1, 1, 1, 1, "2000-01-01 01:01:01.0"},
{2000, 1, 1, 1, 1, 1, 1, 6, "2000-01-01 01:01:01.000001"},
}
for _, t := range tbls {
t1 := fracTime{time.Date(t.year, time.Month(t.month), t.day, t.hour, t.min, t.sec, t.microSec*1000, time.UTC), t.frac}
c.Assert(t1.String(), Equals, t.expected)
}
zeroTbls := []struct {
frac int
dec int
expected string
}{
{0, 1, "0000-00-00 00:00:00.0"},
{1, 1, "0000-00-00 00:00:00.0"},
{123, 3, "0000-00-00 00:00:00.000"},
{123000, 3, "0000-00-00 00:00:00.123"},
{123, 6, "0000-00-00 00:00:00.000123"},
{123000, 6, "0000-00-00 00:00:00.123000"},
}
for _, t := range zeroTbls {
c.Assert(formatZeroTime(t.frac, t.dec), Equals, t.expected)
}
}

View File

@ -5,6 +5,7 @@
package schema package schema
import ( import (
"database/sql"
"fmt" "fmt"
"strings" "strings"
@ -12,6 +13,8 @@ import (
"github.com/siddontang/go-mysql/mysql" "github.com/siddontang/go-mysql/mysql"
) )
var ErrTableNotExist = errors.New("table is not exist")
const ( const (
TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year TYPE_NUMBER = iota + 1 // tinyint, smallint, mediumint, int, bigint, year
TYPE_FLOAT // float, double TYPE_FLOAT // float, double
@ -29,7 +32,10 @@ const (
type TableColumn struct { type TableColumn struct {
Name string Name string
Type int Type int
Collation string
RawType string
IsAuto bool IsAuto bool
IsUnsigned bool
EnumValues []string EnumValues []string
SetValues []string SetValues []string
} }
@ -53,9 +59,10 @@ func (ta *Table) String() string {
return fmt.Sprintf("%s.%s", ta.Schema, ta.Name) return fmt.Sprintf("%s.%s", ta.Schema, ta.Name)
} }
func (ta *Table) AddColumn(name string, columnType string, extra string) { func (ta *Table) AddColumn(name string, columnType string, collation string, extra string) {
index := len(ta.Columns) index := len(ta.Columns)
ta.Columns = append(ta.Columns, TableColumn{Name: name}) ta.Columns = append(ta.Columns, TableColumn{Name: name, Collation: collation})
ta.Columns[index].RawType = columnType
if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") { if strings.Contains(columnType, "int") || strings.HasPrefix(columnType, "year") {
ta.Columns[index].Type = TYPE_NUMBER ta.Columns[index].Type = TYPE_NUMBER
@ -97,6 +104,10 @@ func (ta *Table) AddColumn(name string, columnType string, extra string) {
ta.Columns[index].Type = TYPE_STRING ta.Columns[index].Type = TYPE_STRING
} }
if strings.Contains(columnType, "unsigned") || strings.Contains(columnType, "zerofill") {
ta.Columns[index].IsUnsigned = true
}
if extra == "auto_increment" { if extra == "auto_increment" {
ta.Columns[index].IsAuto = true ta.Columns[index].IsAuto = true
} }
@ -142,6 +153,35 @@ func (idx *Index) FindColumn(name string) int {
return -1 return -1
} }
func IsTableExist(conn mysql.Executer, schema string, name string) (bool, error) {
query := fmt.Sprintf("SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '%s' and TABLE_NAME = '%s' LIMIT 1", schema, name)
r, err := conn.Execute(query)
if err != nil {
return false, errors.Trace(err)
}
return r.RowNumber() == 1, nil
}
func NewTableFromSqlDB(conn *sql.DB, schema string, name string) (*Table, error) {
ta := &Table{
Schema: schema,
Name: name,
Columns: make([]TableColumn, 0, 16),
Indexes: make([]*Index, 0, 8),
}
if err := ta.fetchColumnsViaSqlDB(conn); err != nil {
return nil, errors.Trace(err)
}
if err := ta.fetchIndexesViaSqlDB(conn); err != nil {
return nil, errors.Trace(err)
}
return ta, nil
}
func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) { func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) {
ta := &Table{ ta := &Table{
Schema: schema, Schema: schema,
@ -151,18 +191,18 @@ func NewTable(conn mysql.Executer, schema string, name string) (*Table, error) {
} }
if err := ta.fetchColumns(conn); err != nil { if err := ta.fetchColumns(conn); err != nil {
return nil, err return nil, errors.Trace(err)
} }
if err := ta.fetchIndexes(conn); err != nil { if err := ta.fetchIndexes(conn); err != nil {
return nil, err return nil, errors.Trace(err)
} }
return ta, nil return ta, nil
} }
func (ta *Table) fetchColumns(conn mysql.Executer) error { func (ta *Table) fetchColumns(conn mysql.Executer) error {
r, err := conn.Execute(fmt.Sprintf("describe `%s`.`%s`", ta.Schema, ta.Name)) r, err := conn.Execute(fmt.Sprintf("show full columns from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil { if err != nil {
return errors.Trace(err) return errors.Trace(err)
} }
@ -170,14 +210,39 @@ func (ta *Table) fetchColumns(conn mysql.Executer) error {
for i := 0; i < r.RowNumber(); i++ { for i := 0; i < r.RowNumber(); i++ {
name, _ := r.GetString(i, 0) name, _ := r.GetString(i, 0)
colType, _ := r.GetString(i, 1) colType, _ := r.GetString(i, 1)
extra, _ := r.GetString(i, 5) collation, _ := r.GetString(i, 2)
extra, _ := r.GetString(i, 6)
ta.AddColumn(name, colType, extra) ta.AddColumn(name, colType, collation, extra)
} }
return nil return nil
} }
func (ta *Table) fetchColumnsViaSqlDB(conn *sql.DB) error {
r, err := conn.Query(fmt.Sprintf("show full columns from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil {
return errors.Trace(err)
}
defer r.Close()
var unusedVal interface{}
unused := &unusedVal
for r.Next() {
var name, colType, extra string
var collation sql.NullString
err := r.Scan(&name, &colType, &collation, &unused, &unused, &unused, &extra, &unused, &unused)
if err != nil {
return errors.Trace(err)
}
ta.AddColumn(name, colType, collation.String, extra)
}
return r.Err()
}
func (ta *Table) fetchIndexes(conn mysql.Executer) error { func (ta *Table) fetchIndexes(conn mysql.Executer) error {
r, err := conn.Execute(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name)) r, err := conn.Execute(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil { if err != nil {
@ -197,6 +262,59 @@ func (ta *Table) fetchIndexes(conn mysql.Executer) error {
currentIndex.AddColumn(colName, cardinality) currentIndex.AddColumn(colName, cardinality)
} }
return ta.fetchPrimaryKeyColumns()
}
func (ta *Table) fetchIndexesViaSqlDB(conn *sql.DB) error {
r, err := conn.Query(fmt.Sprintf("show index from `%s`.`%s`", ta.Schema, ta.Name))
if err != nil {
return errors.Trace(err)
}
defer r.Close()
var currentIndex *Index
currentName := ""
var unusedVal interface{}
unused := &unusedVal
for r.Next() {
var indexName, colName string
var cardinality uint64
err := r.Scan(
&unused,
&unused,
&indexName,
&unused,
&colName,
&unused,
&cardinality,
&unused,
&unused,
&unused,
&unused,
&unused,
&unused,
)
if err != nil {
return errors.Trace(err)
}
if currentName != indexName {
currentIndex = ta.AddIndex(indexName)
currentName = indexName
}
currentIndex.AddColumn(colName, cardinality)
}
return ta.fetchPrimaryKeyColumns()
}
func (ta *Table) fetchPrimaryKeyColumns() error {
if len(ta.Indexes) == 0 { if len(ta.Indexes) == 0 {
return nil return nil
} }

View File

@ -1,12 +1,14 @@
package schema package schema
import ( import (
"database/sql"
"flag" "flag"
"fmt" "fmt"
"testing" "testing"
. "github.com/pingcap/check" . "github.com/pingcap/check"
"github.com/siddontang/go-mysql/client" "github.com/siddontang/go-mysql/client"
_ "github.com/siddontang/go-mysql/driver"
) )
// use docker mysql for test // use docker mysql for test
@ -18,6 +20,7 @@ func Test(t *testing.T) {
type schemaTestSuite struct { type schemaTestSuite struct {
conn *client.Conn conn *client.Conn
sqlDB *sql.DB
} }
var _ = Suite(&schemaTestSuite{}) var _ = Suite(&schemaTestSuite{})
@ -26,12 +29,19 @@ func (s *schemaTestSuite) SetUpSuite(c *C) {
var err error var err error
s.conn, err = client.Connect(fmt.Sprintf("%s:%d", *host, 3306), "root", "", "test") s.conn, err = client.Connect(fmt.Sprintf("%s:%d", *host, 3306), "root", "", "test")
c.Assert(err, IsNil) c.Assert(err, IsNil)
s.sqlDB, err = sql.Open("mysql", fmt.Sprintf("root:@%s:3306", *host))
c.Assert(err, IsNil)
} }
func (s *schemaTestSuite) TearDownSuite(c *C) { func (s *schemaTestSuite) TearDownSuite(c *C) {
if s.conn != nil { if s.conn != nil {
s.conn.Close() s.conn.Close()
} }
if s.sqlDB != nil {
s.sqlDB.Close()
}
} }
func (s *schemaTestSuite) TestSchema(c *C) { func (s *schemaTestSuite) TestSchema(c *C) {
@ -48,6 +58,10 @@ func (s *schemaTestSuite) TestSchema(c *C) {
se SET('a', 'b', 'c'), se SET('a', 'b', 'c'),
f FLOAT, f FLOAT,
d DECIMAL(2, 1), d DECIMAL(2, 1),
uint INT UNSIGNED,
zfint INT ZEROFILL,
name_ucs VARCHAR(256) CHARACTER SET ucs2,
name_utf8 VARCHAR(256) CHARACTER SET utf8,
PRIMARY KEY(id2, id), PRIMARY KEY(id2, id),
UNIQUE (id1), UNIQUE (id1),
INDEX name_idx (name) INDEX name_idx (name)
@ -60,7 +74,7 @@ func (s *schemaTestSuite) TestSchema(c *C) {
ta, err := NewTable(s.conn, "test", "schema_test") ta, err := NewTable(s.conn, "test", "schema_test")
c.Assert(err, IsNil) c.Assert(err, IsNil)
c.Assert(ta.Columns, HasLen, 8) c.Assert(ta.Columns, HasLen, 12)
c.Assert(ta.Indexes, HasLen, 3) c.Assert(ta.Indexes, HasLen, 3)
c.Assert(ta.PKColumns, DeepEquals, []int{2, 0}) c.Assert(ta.PKColumns, DeepEquals, []int{2, 0})
c.Assert(ta.Indexes[0].Columns, HasLen, 2) c.Assert(ta.Indexes[0].Columns, HasLen, 2)
@ -69,6 +83,16 @@ func (s *schemaTestSuite) TestSchema(c *C) {
c.Assert(ta.Columns[4].EnumValues, DeepEquals, []string{"a", "b", "c"}) c.Assert(ta.Columns[4].EnumValues, DeepEquals, []string{"a", "b", "c"})
c.Assert(ta.Columns[5].SetValues, DeepEquals, []string{"a", "b", "c"}) c.Assert(ta.Columns[5].SetValues, DeepEquals, []string{"a", "b", "c"})
c.Assert(ta.Columns[7].Type, Equals, TYPE_FLOAT) c.Assert(ta.Columns[7].Type, Equals, TYPE_FLOAT)
c.Assert(ta.Columns[0].IsUnsigned, IsFalse)
c.Assert(ta.Columns[8].IsUnsigned, IsTrue)
c.Assert(ta.Columns[9].IsUnsigned, IsTrue)
c.Assert(ta.Columns[10].Collation, Matches, "^ucs2.*")
c.Assert(ta.Columns[11].Collation, Matches, "^utf8.*")
taSqlDb, err := NewTableFromSqlDB(s.sqlDB, "test", "schema_test")
c.Assert(err, IsNil)
c.Assert(taSqlDb, DeepEquals, ta)
} }
func (s *schemaTestSuite) TestQuoteSchema(c *C) { func (s *schemaTestSuite) TestQuoteSchema(c *C) {