详解 canal 同步 MySQL 增量数据到 ES

canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES

1 集群模式

图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。

server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务

instance 包含如下模块 :

  • eventParser

    数据源接入,模拟 slave 协议和 master 进行交互,协议解析

  • eventSink

    Parser 和 Store 链接器,进行数据过滤,加工,分发的工作

  • eventStore

    数据存储

  • metaManager

    增量订阅 & 消费信息管理器

真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式MQ 模式

实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。

顺序消费

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

2 MySQL配置

1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下

[mysqld]

log-bin=mysql-bin # 开启 binlog

binlog-format=ROW # 选择 ROW 模式

server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。

2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。

CREATE USER canal IDENTIFIED BY ‘canal’;

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO ‘canal‘@’%’;

– GRANT ALL PRIVILEGES ON . TO ‘canal‘@’%’ ;

FLUSH PRIVILEGES;

3、创建数据库商品表 t_product

CREATE TABLE t_product (

id BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,

name VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,

price DECIMAL ( 10, 2 ) NOT NULL,

status TINYINT ( 4 ) NOT NULL,

create_time datetime NOT NULL,

update_time datetime NOT NULL,

PRIMARY KEY ( id )

) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置

使用 Kibana 创建商品索引

PUT /t_product

{

“settings”: {

“number_of_shards”: 2,

“number_of_replicas”: 1

},

“mappings”: {

“properties”: {

“id”: {

“type”:”keyword”

},

“name”: {

“type”:”text”

},

“price”: {

“type”:”double”

},

“status”: {

“type”:”integer”

},

“createTime”: {

“type”: “date”,

“format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis”

},

“updateTime”: {

“type”: “date”,

“format”: “yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis”

}

}

}

}

执行完成,如图所示 :

4 RocketMQ 配置

创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。

5 canal 配置

我们选取 canal 版本 1.1.6 ,进入 conf 目录。

1、配置 canal.properties

#集群模式 zk地址

canal.zkServers = localhost:2181

#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ

canal.serverMode = rocketMQ

#instance 列表

canal.destinations = product-syn

#conf root dir

canal.conf.dir = ../conf

#全局的spring配置方式的组件文件 生产环境,集群化部署

canal.instance.global.spring.xml = classpath:spring/default-instance.xml

以下部分是默认值 展示出来

Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)

canal.mq.canalBatchSize = 50

Canal get数据的超时时间, 单位: 毫秒, 空为不限超时

canal.mq.canalGetTimeout = 100

是否为 flat json格式对象

canal.mq.flatMessage = true

2、instance 配置文件

conf 目录下创建实例目录 product-syn , 在 product-syn 目录创建配置文件 :instance.properties

按需修改成自己的数据库信息

#################################################

canal.instance.master.address=192.168.1.20:3306

username/password,数据库的用户名和密码

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

table regex

canal.instance.filter.regex=mytest.t_product

mq config

canal.mq.topic=product-syn-topic

针对库名或者表名发送动态topic

#canal.mq.dynamicTopic=mytest,.,mytest.user,mytest\..,.\..

canal.mq.partition=0

hash partition config

#canal.mq.partitionsNum=3

#库名.表名: 唯一主键,多个表之间用逗号分隔

#canal.mq.partitionHash=mytest.person:id,mytest.role:id

#################################################

3、服务启动

启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。

修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。

6 消费者

1、产品索引操作服务

2、消费监听器

消费者逻辑重点有两点:

  • 顺序消费监听器

  • 将消息数据转换成 JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATEINSERTDELETE 执行产品索引操作服务的方法。

7 写到最后

canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。

推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。

这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。

github.com/makemyownlife/rocketmq4...


如果我的文章对你有所帮助,还请帮忙点赞、在看、转发一下,你的支持会激励我输出更高质量的文章,非常感谢!

本作品采用《CC 协议》,转载必须注明作者和本文链接
公众号: 勇哥Java实战
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!