基于Canal+Kafka实现缓存实时更新

前言

相信对于大部分同学来说,缓存应该是平时开发中会经常接触的东西了。常规的逻辑一般是「查询」->「检查是否有缓存」如果有,就直接返回;如果没有,则从DB中读取数据。后续对数据进行过更新之后,再删除/更新缓存。

所以从流程上来看读缓存很容易,难的是保证缓存数据的有效性,大多数的做法是在业务代码中嵌入更新缓存的逻辑。比如在修改某篇文章成功之后,删除原有的 key。

正常情况下这种方式没有问题,但有时我们的系统中不止一个地方会修改这些数据,那么我们就不得不在每处业务代码中植入更新缓存的逻辑,随着时间的推移,我们的代码变得越来越臃肿,难以维护。

那么有没有什么方法可以让我们可以不用花费太多精力来关注业务之外的事情呢?当然有,这就是我们今天要介绍的工具 Canal。

Canal介绍

canal 是阿里巴巴旗下的一款开源项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅&消费,可以很方便地同步数据库的增量数据到其他的存储应用。

他的原理是把自己伪装成 MySQL slave,模拟 MySQL slave 的交互协议向 MySQL Mater发送 dump协议,MySQL master 收到 canal 发送过来的 dump 请求,开始推送 binary log 给 canal,然后 canal 解析 binary log,再发送到存储目的地,比如 MySQL,Kafka,Elastic Search 等等。

那么我们今天要分享的就是基于 Mysql+Canal+Kafka+Redis 来实现缓存数据的实时更新。

环境准备

  • Canal
  • Mysql
  • Zookeeper
  • Kafka
  • Redis

    具体安装流程就不在这里说明了,网上都有,嫌麻烦的同学可以直接使用docker进行安装

image

工作流程

image

说明:大体流程就是 canal 充当一个 mysql 的从服务器,从 master 拉取 binlog 变化,将更新内容推送至 kafka 中,然后客户端启动消费者订阅主题,根据数据变化执行对应的业务逻辑。

Mysql

首先数据库需要开启 binlog
my.cnf

[mysqld]
pid-file        = /var/run/mysqld/mysqld.pid
socket          = /var/run/mysqld/mysqld.sock
datadir         = /var/lib/mysql
log-bin         = /var/log/mysql/mysql-bin
binlog-format   = row
server-id       = 1

然后需要创建一个 canal 用户,用于 canal 实例拉取 binlog,具体库权限根据自身业务匹配,我这里直接给到所有。

create user canal@'%' identified by 'canal';
grant all privileges on *.* to 'canal'@'%';
flush privileges;

Canal配置

接下来就是 canal 的配置,主要分为两部分:全局配置(canal.properties) 和 实例配置(instance.properties),配置文件在 canal-server/conf 下,这里只列出一些修改项,其他的均采用默认配置。

canal.properties:

#canal参数
canal.register.ip = 172.17.0.4        # canal服务器地址
canal.port = 11111                    # 端口

#zookeeper
canal.zkServers = 172.18.0.2:2181    #zookeeper的服务器地址及端口
canal.serverMode = kafka            #服务模式 : tcp, kafka, rocketMQ, rabbitMQ

#destinations
canal.destinations = example        # 实例

#kafka
kafka.bootstrap.servers = kafka_kafka_1:9092    #kafka地址

instance.properties:

canal.instance.mysql.slaveId=999                # mysql->slaveId,其实就是mysql的server-id,不要和数据库的server-id冲突

canal.instance.master.address=172.17.0.2:3306    # mysql连接地址
canal.instance.master.journal.name=mysql-bin.000005    #mysql-binlog日志
canal.instance.master.position=462                # 日志偏移位置

canal.instance.dbUsername=canal                    # mysql用户名
canal.instance.dbPassword=canal                    # mysql密码

canal.instance.filter.regex=.*\\..*                # 监听库表,当前配置是监听所有库所有表
#canal.instance.filter.regex=test\\..*            # 此配置是监听test库下所有表        

canal.mq.topic=canal                            # Kafka topic名称

Kafka

Kafka相关内容可以参考「Kafka应用」系列文章

操作演示

  1. 启动canal

    docker run --name canal --network kafka_default -v /Users/admin/docker/conf/canal:/home/admin/canal-server/conf -p 11111:11111 -p 11112:11112 -p 11110:11110 -d canal/canal-server
  2. 修改数据库
    image

  3. MQ消费者(Client)

此时Kafka消费者会读取到一条消息,内容为本次更新内容。

image

  1. MQ消费者(PHP)
    使用 PHP 客户端来消费 Kafka 数据。

    public function handle()
     {
         $this->line("开启消费者...");
         $conf = new \RdKafka\Conf();
         $conf->set('group.id', 'test');
         $conf->set('metadata.broker.list', '192.168.65.2:32768');
         $conf->set('enable.auto.commit', 'false');
         $conf->set('auto.offset.reset', 'earliest');
    
         $consumer = new \RdKafka\KafkaConsumer($conf);
         $consumer->subscribe(['canal']);
         $this->line("订阅主题...");
    
         while (true) {
             $message = $consumer->consume(120*10000);
             $this->line("接收消息...");
             switch ($message->err) {
                 case RD_KAFKA_RESP_ERR_NO_ERROR:
                     $payload = json_decode($message->payload, true);
                     $this->line("产生的操作:".$payload['type']);
                     $this->line("变更的库:".$payload['database']);
                     $this->line("变更的表:".$payload['table']);
                     echo "变更的数据:";
                     var_dump($payload['data']);
                     // 根据变更数据执行具体业务
                     // redis->del("xxx")    删除缓存等
    
                     $consumer->commit($message);
                     break;
    
                 case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                     echo "No more messages; will wait for more\n";
                     break;
    
                 case RD_KAFKA_RESP_ERR__TIMED_OUT:
                     echo "Timed out\n";
                     break;
    
                 default:
                     throw new \Exception($message->errstr(), $message->err);
                     break;
             }
         }
     }

执行结果:
image
至此,我们在客户端已经获取到数据库实时变更数据,对应的可以执行我们自身的业务逻辑,比如:发送通知、缓存更新等操作。

注意事项

  1. 连接不上mysql/zookeeper的问题

    检查配置文件中mysql或zookeeper的连接地址是否正确,canal所在服务器是否可以ping通
    docker环境下需要提前配置网络环境,已达到容器可以互相访问的目的。
    docker –link mysql:mysql # 创建和mysql容器的连接
    docker –network kafka_default # 将当前容器加入至指定网络中。
    注意:link和network同时使用时,network会将link覆盖。需要使用docker network connect命令来连接link所在的网络。
    例如:
    docker inspect mysql # 查看mysql的network-> bridge
    docker network connect bridge canal #将canal容器加入至bridge中。
    或者使用compose.yml的方式进行网络设置,docker run命令下只能连接一个网络。

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 5个月前 自动加精
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 8
朕略显ぼうっと萌

牛逼

5个月前 评论
ligkwww (楼主) 5个月前
xingkong 5个月前
xingkong 5个月前

赞! canal 我们正在用,架构是 mysql -> canal_server -> canal_client(PHP实现的) -> mysql2 在 client 根据需求过滤表,甚至更改字段,比如原表是 id 和 name,新表是 id 和 nickname,则在client 做个转换就行了。redis 和 es 也有考虑去实现。

5个月前 评论
ligkwww (楼主) 5个月前

又学习到新知识了,爆赞!

5个月前 评论

只有一半的数据,都不知道怎样入手测试,能搞一个整套的吗

5个月前 评论

可以,思路不错

5个月前 评论

go-mysql-transfer这个工具也可以实现,而且支持Redis、MongoDB、Elasticsearch、RocketMQ、Kafka、RabbitMQ等多种接收端。github.com/wj596/go-mysql-transfer

4个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!