基于Canal+Kafka实现缓存实时更新
前言#
相信对于大部分同学来说,缓存应该是平时开发中会经常接触的东西了。常规的逻辑一般是「查询」->「检查是否有缓存」如果有,就直接返回;如果没有,则从 DB 中读取数据。后续对数据进行过更新之后,再删除 / 更新缓存。
所以从流程上来看读缓存很容易,难的是保证缓存数据的有效性,大多数的做法是在业务代码中嵌入更新缓存的逻辑。比如在修改某篇文章成功之后,删除原有的 key。
正常情况下这种方式没有问题,但有时我们的系统中不止一个地方会修改这些数据,那么我们就不得不在每处业务代码中植入更新缓存的逻辑,随着时间的推移,我们的代码变得越来越臃肿,难以维护。
那么有没有什么方法可以让我们可以不用花费太多精力来关注业务之外的事情呢?当然有,这就是我们今天要介绍的工具 Canal。
Canal 介绍#
canal 是阿里巴巴旗下的一款开源项目,纯 Java 开发。基于数据库增量日志解析,提供增量数据订阅 & 消费,可以很方便地同步数据库的增量数据到其他的存储应用。
他的原理是把自己伪装成 MySQL slave,模拟 MySQL slave 的交互协议向 MySQL Mater 发送 dump 协议,MySQL master 收到 canal 发送过来的 dump 请求,开始推送 binary log 给 canal,然后 canal 解析 binary log,再发送到存储目的地,比如 MySQL,Kafka,Elastic Search 等等。
那么我们今天要分享的就是基于 Mysql+Canal+Kafka+Redis 来实现缓存数据的实时更新。
环境准备#
- Canal
- Mysql
- Zookeeper
- Kafka
- Redis
具体安装流程就不在这里说明了,网上都有,嫌麻烦的同学可以直接使用 docker 进行安装
工作流程#
说明:大体流程就是 canal 充当一个 mysql 的从服务器,从 master 拉取 binlog 变化,将更新内容推送至 kafka 中,然后客户端启动消费者订阅主题,根据数据变化执行对应的业务逻辑。
Mysql#
首先数据库需要开启 binlog
my.cnf
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
log-bin = /var/log/mysql/mysql-bin
binlog-format = row
server-id = 1
然后需要创建一个 canal 用户,用于 canal 实例拉取 binlog,具体库权限根据自身业务匹配,我这里直接给到所有。
create user canal@'%' identified by 'canal';
grant all privileges on *.* to 'canal'@'%';
flush privileges;
Canal 配置#
接下来就是 canal 的配置,主要分为两部分:全局配置 (canal.properties) 和 实例配置 (instance.properties),配置文件在 canal-server/conf 下,这里只列出一些修改项,其他的均采用默认配置。
canal.properties:
#canal参数
canal.register.ip = 172.17.0.4 # canal服务器地址
canal.port = 11111 # 端口
#zookeeper
canal.zkServers = 172.18.0.2:2181 #zookeeper的服务器地址及端口
canal.serverMode = kafka #服务模式 : tcp, kafka, rocketMQ, rabbitMQ
#destinations
canal.destinations = example # 实例
#kafka
kafka.bootstrap.servers = kafka_kafka_1:9092 #kafka地址
instance.properties:
canal.instance.mysql.slaveId=999 # mysql->slaveId,其实就是mysql的server-id,不要和数据库的server-id冲突
canal.instance.master.address=172.17.0.2:3306 # mysql连接地址
canal.instance.master.journal.name=mysql-bin.000005 #mysql-binlog日志
canal.instance.master.position=462 # 日志偏移位置
canal.instance.dbUsername=canal # mysql用户名
canal.instance.dbPassword=canal # mysql密码
canal.instance.filter.regex=.*\\..* # 监听库表,当前配置是监听所有库所有表
#canal.instance.filter.regex=test\\..* # 此配置是监听test库下所有表
canal.mq.topic=canal # Kafka topic名称
Kafka#
Kafka 相关内容可以参考「Kafka 应用」系列文章
操作演示#
启动 canal
docker run --name canal --network kafka_default -v /Users/admin/docker/conf/canal:/home/admin/canal-server/conf -p 11111:11111 -p 11112:11112 -p 11110:11110 -d canal/canal-server
修改数据库
MQ 消费者 (Client)
此时 Kafka 消费者会读取到一条消息,内容为本次更新内容。
MQ 消费者 (PHP)
使用 PHP 客户端来消费 Kafka 数据。public function handle() { $this->line("开启消费者..."); $conf = new \RdKafka\Conf(); $conf->set('group.id', 'test'); $conf->set('metadata.broker.list', '192.168.65.2:32768'); $conf->set('enable.auto.commit', 'false'); $conf->set('auto.offset.reset', 'earliest'); $consumer = new \RdKafka\KafkaConsumer($conf); $consumer->subscribe(['canal']); $this->line("订阅主题..."); while (true) { $message = $consumer->consume(120*10000); $this->line("接收消息..."); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: $payload = json_decode($message->payload, true); $this->line("产生的操作:".$payload['type']); $this->line("变更的库:".$payload['database']); $this->line("变更的表:".$payload['table']); echo "变更的数据:"; var_dump($payload['data']); // 根据变更数据执行具体业务 // redis->del("xxx") 删除缓存等 $consumer->commit($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "No more messages; will wait for more\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "Timed out\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } } }
执行结果:
至此,我们在客户端已经获取到数据库实时变更数据,对应的可以执行我们自身的业务逻辑,比如:发送通知、缓存更新等操作。
注意事项#
连接不上 mysql/zookeeper 的问题
检查配置文件中 mysql 或 zookeeper 的连接地址是否正确,canal 所在服务器是否可以 ping 通
docker 环境下需要提前配置网络环境,已达到容器可以互相访问的目的。
docker –link mysql:mysql # 创建和 mysql 容器的连接
docker –network kafka_default # 将当前容器加入至指定网络中。
注意:link 和 network 同时使用时,network 会将 link 覆盖。需要使用 docker network connect 命令来连接 link 所在的网络。
例如:
docker inspect mysql # 查看mysql的network-> bridge
docker network connect bridge canal #将canal容器加入至bridge中。
或者使用 compose.yml 的方式进行网络设置,docker run 命令下只能连接一个网络。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: