MongoDB changeStream 的 PHP 实现

什么是 mongodb 的 changeStream#

  • changeStream 是 mongodb 基于 oplog (操作日志) 的实时的数据变更流,应用程序可以通过 changeStream 实现对 mongo 数据变更的订阅,我们可以理解为 MySQL 中的触发器或者基于 binlog 复制的从节点。

如何使用#

  • changeSteam 可以对某个 db/colletion/deployment 进行监听 (watch)
  • 监听的事件 (event) 包括 insert Event、Update Event、Replace Event、 delete Event 等事件

使用场景#

  • 可以利用这个特性实现对 mongo 中数据变更的监听,进行数据同步、复制等操作,举个栗子,公司原始系统使用 PHP+MySQL 进行服务,后续经过发展,账号服务越来越复杂,账号属性多变,数据存储切换至 MongoDB,采用 Java 进行新服务提供,但是老系统又不能一时砍掉,砍掉我们就要丢饭碗了,那数据流从 mongo 到 MySQL 如何进行同步呢,一个方式就是可以利用 changeStream 进行。

代码实现#

  • 这里我通过一个简单的 demo 来学习 changeStream

  • 前提准备

  • 安装 MongoDB

  • mongodb 需要开启复制集,也就是必须要主从,部分版本需要开启 enableMajorityReadConcern: true

  • 安装 PHPmongdoDB 扩展,注意事项 mongodb 扩展而不是 mongo

  • composer 安装 php-mongodb 包

实现 具体实现后面贴图#

  • 配置 mongodb 的复制集,mongo.conf 在代码实例中,这里我们使用一主两从
  • 启动 mongod -f /Users/gaoz/project13/mongo/db1/mongod.conf
  • 启动监听客户端 php index.php
  • 通过 mongoshell 进行数据 insert/update 操作

注意事项#

  • 在实际的使用过程中,要注意数据丢失的情况
  • 考虑大量数据变更的发生,加入 Mq 中间件进行消息处理
  • 做好服务故障监听

启动 mongod 服务,这里创建三个 /mongo/db 分别为 db1, db2, db3, 端口分别为 28017,28018,28019#

启动 mongo 监听脚本#

通过 mongoshell 操作数据#

更新数据#

监听脚本接受到数据变更#

看一下接受到数据#

这里重点关注三个内容#

  • operationType: 操作类型,可能是 update、insert、replace 等
  • fullDocument: 文档内容
  • 如果是更新的话,updateFields 则是变更内容

后续的使用,比如同步到 MySQL,大家可以在监听脚本中加入同步逻辑即可。#

参考资料#

参考代码

个人学习使用,欢迎指正讨论。

本作品采用《CC 协议》,转载必须注明作者和本文链接