Canal 介绍及使用

一、介绍

1.1、什么是 Canal

Canal 是用 Java 开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 来处理获得的相关数据。(数据库同步需要阿里的Otter中间件,基于Canal)。

1.2、MySQL 的 Binlog

1.2.1、什么是 Binglog

MySQL 的二进制日志可以说 MySQL 最重要的日志了,它记录了所有的 DDL 和 DML (除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。

一般来说开启二进制日志大概会有1%的性能损耗。二进制有两个最重要的使用场景:

  • 其一: MySQL Replication在Master端开启Binlog, Master把它的二进制日志传递给Slaves 来达到Master-Slave数据一致的目的。
  • 其二:自然就是数据恢复了,通过使用MySQL Binlog工具来使恢复数据。

二进制日志包括两类文件:二进制日志索引文件(文件名后缀为.index)用于记录所有
的二进制文件,二进制日志文件(文件名后缀为.00000*)记录数据库所有的DDL和DML(除
了数据查询语句)语句事件。

1.2.2、Binglog 分类

MySQL Binlog 的格式有三种,分别是STATEMENTMIXEDROW。在配置文件中可以选择配
binlog_ format= statement| mixed |row。三种格式的区别:

  1. statement:语句级,binlog会记录每次一执行写操作的语句。相对row模式节省空,但是可能会会产生不一致,比如一些函数,例如update t set create_date = now(),如果使用 binlog 日志,进行恢复,由于执行时间不同,可能产生的数据就不同
    1. 优点:节省空间
    2. 缺点:又可能造成数据不一致
  2. row:行级,binglog 会记录每次操作后每行记录的变化
    1. 优点:保持数据的绝对一致。因为不管 sql 是什么,引用了什么函数,它只记录执行后的结果
    2. 缺点:占用空间较大
  3. mixed:stetement 的升级版,一定程度上解决了一些情况而造成的 statement 模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含UUID() 时;包含AUTO_ INCREMENT字段的表被更新时;执行INSERT DELAYED语句时;用UDF时;会按照 ROW 的方式进行处理。
    1. 优点:节省空间,同时兼顾了一定的一致性。
    2. 缺点:还有些极个别情况依旧会造成不一致,另外 stetement 和 mixed 对于需要 binlog 的监控情况都不方便。

综上所述,Canal 先做监控分析,选择 row 格式比较合适。

1.3、Canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

1.4、使用场景

1.4.1、原始场景

阿里 Otter 中间件的一部分,Otter 是阿里用于进行异步数据库之间同步框架,Canal 是其中一部分

Canal 介绍及使用

1.4.2、常用场景一

更新缓存,例如当数据写入到 MySQL 中时,将增加或修改的数据同步到缓存中,用户每次直接从缓存中拿取数据,如果没获取到,再从数据库查询。

1.4.3、常用场景二

抓取业务表的新增变化数据,用于制作实时统计

二、MySQL 准备

这里以mysql8.0.28canal-1.1.7-alpha-1,为例,由于使用的是 mysql8,如果 canal 版本过低会导致各种 bug。

2.1、创建数据库

CREATE DATABASE `canal_test` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */

2.2、创建数据库表

CREATE TABLE user_info(
  `id` VARCHAR(255),
    `name` VARCHAR(255),
    `sex` VARCHAR(255)
)

2.3、配置文件开启 Binglog

$ vim /etc/mysql/my.cnf

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
binglog-do-db=canal_test
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意: binglog-do-db 根据自己情况进行修改,指定具体要同步的数据库,如果不配置,表示所有的数据均开启 Binglog

2.4、重启 MySQL 生效

$ sudo systemctl restart mysqld

2.5、创建用户并赋权

-- 由于默认密码比较严格,降低密码策略严格程度
SHOW VARIABLES LIKE 'validate_password%';
SET GLOBAL validate_password.policy = LOW;
SET GLOBAL validate_password.length = 4;

CREATE USER 'canal'@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

三、准备 canal

3.1、下载并解压

Canal 下载地址
Canal 官方文档地址

$ mkdir canal
$ tar -zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz -C canal

3.2、修改 canal.properties 文件

修改canal.properties文件

canal.serverMode = tcp

说明:这个文件是 canal 的基本迪用配置,canal 端口号默认就是 11111,修改 canal 的输出 model,默认tcp,改为输出到 kafka 等消息中间件。

多实例配置如果创建多个实例,通过前面canal架构,我们可以知道,一个canal服务中可以有多个instance,conf下的每一个 example 即是一个实例,每个实例下面都有独立的配置文件。默认只有一个实例 example,如果需要多个实例处理不同的 MySQL 数据的话,直接拷贝出多个 example,并对其重新命名,命名和配置文件中指定的名称一致,然后修改 canal.properties 中的 canal.destinations=实例1,实例2,实例3

canal.destinations = example

3.3、修改 instance.properties

这里按一个数据库为例

$ cd canal/conf/example
$ vim instance.properties

3.3.1、配置 MySQL 服务器地址

canal.instance.mysql.slaveId=20
canal.instance.master.address=ip:port

# 关闭 tsdb
canal.instance.tsdb.enable=false

3.3.2、配置连接 MySQL 用户名和密码

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false

3.3.3、 开放端口

# canal admin 端口
$ firewall-cmd --zone=public --add-port=11110/tcp --permanent
# canal 监听端口
$ firewall-cmd --zone=public --add-port=11111/tcp --permanent
# canal 指标短裤哦
$ firewall-cmd --zone=public --add-port=11112/tcp --permanent
$ firewall-cmd --reload

四、Docker 安装

4.1、拉取镜像

# 拉取镜像
$ docker pull canal/canal-server:v1.1.6
# 下载脚本
$ wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh

# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scan=false
          -e canal.destinations=test
          -e canal.instance.master.address=127.0.0.1:3306
          -e canal.instance.dbUsername=canal
          -e canal.instance.dbPassword=canal
          -e canal.instance.connectionCharset=UTF-8
          -e canal.instance.tsdb.enable=true
          -e canal.instance.gtidon=false

五、问题

问题说明

canal 启动时,报错

Canal 介绍及使用

因为自MySQL 8.0.3开始,身份验证插件默认使用caching_sha2_password

问题解决

修改canal用户对应的身份验证插件为 mysql_native_password

mysql> select host,user,plugin from mysql.user ;
mysql> ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'password';

再次启动即可。

六、整合 SpringBoot 测试

6.1、引入依赖

<dependencies>
  <dependency>
  <groupId>com.alibaba.otter</groupId>
 <artifactId>canal.client</artifactId>
 <version>1.1.6</version>
  </dependency>
 <dependency>
  <groupId>com.alibaba.otter</groupId>
 <artifactId>canal.protocol</artifactId>
 <version>1.1.6</version>
  </dependency>
</dependencies>

6.2、客户端代码

public class CanalClient {

    private static final Logger log = LoggerFactory.getLogger(CanalClient.class);

    public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
        // 获取连接
        CanalConnector canalConnector =
                CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.33.81", 11111), "example", "", "");

        // 连接
        canalConnector.connect();

        // 订阅数据库,这里一定要是world.*否则无法获取到数据
        canalConnector.subscribe("world.*");

         while (true) {
            // 获取数据,一次拉取 100 条数据
            Message message = canalConnector.get(100);

            List<CanalEntry.Entry> entries = message.getEntries();

            // 判断集合是否为空,如果为空则等待之后再拉取
            if (entries.isEmpty()) {
                log.warn("当此抓取没有数据,等待片刻");
                TimeUnit.SECONDS.sleep(5);
            } else {
                // 遍历 entries
                for (CanalEntry.Entry entry : entries) {
                    // 1.获取表名
                    String tableName = entry.getHeader().getTableName();

                    // 2.获取类型
                    CanalEntry.EntryType entryType = entry.getEntryType();

                    // 3.获取序列化的数据
                    ByteString storeValue = entry.getStoreValue();
                    // 4.判断 entryType 是否为 ROWDATA 类型
                    if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                        // 5.反序列化数据
                        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

                        //6.获取当前时间的操作类型
                        CanalEntry.EventType eventType = rowChange.getEventType();

                        //7.获取行数据集
                        List<CanalEntry.RowData> rowDatesList = rowChange.getRowDatasList();

                        //8.遍历  rowDatesList 并打印数据集
                        for (CanalEntry.RowData rowData : rowDatesList) {
                            JSONObject beforeDate = new JSONObject();
                            List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                            for (CanalEntry.Column column : beforeColumnsList) {
                                beforeDate.put(column.getName(), column.getValue());
                            }

                            JSONObject afterDate = new JSONObject();
                            List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                            for (CanalEntry.Column column : afterColumnsList) {
                                afterDate.put(column.getName(), column.getValue());
                            }

                            // 数据的打印
                            log.info("Table: {},EventType: {},Before: {},After: {}", tableName, eventType, beforeDate, afterDate);
                        }
                    } else {
                        log.warn("当前操作类型为: {}", entryType);
                    }
                }
            }
         }
    }
}

6.3、测试

6.3.1、新增数据

INSERT INTO user_info VALUES('1002','test2','female'),('1003','test3','male');

效果如下

Canal 介绍及使用

6.3.2、删除数据

DELETE FROM user_info WHERE id = '1001';

Canal 介绍及使用

6.3.3、修改数据

UPDATE user_info SET name = 'female' WHERE id = '1003';

Canal 介绍及使用

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商