42 lines
927 B
Go
42 lines
927 B
Go
|
package canal
|
||
|
|
||
|
import (
|
||
|
"github.com/juju/errors"
|
||
|
"github.com/ngaut/log"
|
||
|
"github.com/siddontang/go-mysql/mysql"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
ErrHandleInterrupted = errors.New("do handler error, interrupted")
|
||
|
)
|
||
|
|
||
|
type RowsEventHandler interface {
|
||
|
// Handle RowsEvent, if return ErrHandleInterrupted, canal will
|
||
|
// stop the sync
|
||
|
Do(e *RowsEvent) error
|
||
|
String() string
|
||
|
}
|
||
|
|
||
|
func (c *Canal) RegRowsEventHandler(h RowsEventHandler) {
|
||
|
c.rsLock.Lock()
|
||
|
c.rsHandlers = append(c.rsHandlers, h)
|
||
|
c.rsLock.Unlock()
|
||
|
}
|
||
|
|
||
|
func (c *Canal) travelRowsEventHandler(e *RowsEvent) error {
|
||
|
c.rsLock.Lock()
|
||
|
defer c.rsLock.Unlock()
|
||
|
|
||
|
var err error
|
||
|
for _, h := range c.rsHandlers {
|
||
|
if err = h.Do(e); err != nil && !mysql.ErrorEqual(err, ErrHandleInterrupted) {
|
||
|
log.Errorf("handle %v err: %v", h, err)
|
||
|
} else if mysql.ErrorEqual(err, ErrHandleInterrupted) {
|
||
|
log.Errorf("handle %v err, interrupted", h)
|
||
|
return ErrHandleInterrupted
|
||
|
}
|
||
|
|
||
|
}
|
||
|
return nil
|
||
|
}
|