轻松上手 PHP RabbitMQ 消息发布与订阅

场景

之前开发一个电竞比分网系统,有许多模块依赖实时比赛状态(待开始、进行中、已结束、异常),比赛状态 进行中->已结束 由图像识别处理,识别到比赛结束后向消息队列发送某场比赛的状态信息,其他模块只需订阅队列消息获取比赛状态更新并进行对于逻辑处理

RabbitMQ 概念

  • 交换器(Exchanges)

    RabbitMQ消息传递模型的核心思想是,生产者不发送任何信息直接到队列。事实上,生产者甚至不知道消息是否会发送到任何队列。生产者只能向交换器发送消息(也叫交换机,默认交换器使用””空字符标记)。交换器需要知道如何处理接收的消息,将消息推入到指定的队列中,决定消息是否入列和抛弃。如下图,P表示消息发布者,X表示交换机,Q1和Q2表示不同的队列
    rabbitmq-exchanges

  • 交换类型

    • direct:
      消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为”dog”,则只转发 routing key 标记为”A1”的消息,不会转发”A2”,也不会转发”A3”等等。它是完全匹配、单播的模式
    • fanout:
      广播订阅,向所有的消费者发布消息。每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息(fanout 类型转发消息是最快的)
    • topic:
      交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
    • 两个通配符:符号”#”和符号”*”
      • #:匹配0个或多个单词
      • *:匹配不多不少一个单词

RabbitMQ 安装运行

  • 使用Docker安装RabbitMQ

    $ docker pull rabbitmq:3.8.3-management
  • 运行

    • 服务端口:5672
    • 管理端端口:15672
      $ docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -v /data:/var/lib/rabbitmq rabbitmq:3.8.3-management
  • web管理端登录查看(127.0.0.1:15672)

    默认账号:guest,默认密码:guest

rabbitmq-admin

  • 添加管理员

    • 命令行
      $ docker exec -it 89e8e968aebc bash
      root@89e8e968aebc:/# rabbitmqctl add_user ar414 ar414 
      root@89e8e968aebc:/# rabbitmqctl set_user_tags ar414 administrator 
    • Web管理端
      rabbitmq-add-admin
  • 添加vhost
    rabbitmq-add-vhost

PHP 简单使用

安装

$ composer require php-amqplib/php-amqplib

发布者

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'Gaming';

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'ar414', 'ar414', 'test');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'direct', false, false, false);
for ($i = 0; $i < 100; $i++) {
    $routes = ['dota', 'csgo', 'lol'];
    $key = array_rand($routes);
    $arr = [
        'match_id' => $i,
        'status' => rand(0,3)
    ];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data);

    $channel->basic_publish($msg, $exchange, $routes[$key]);
    echo '发送 '.$routes[$key].' 消息: ' . $data . PHP_EOL;
}
$channel->close();
$connection->close();

订阅者

<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = 'Gaming';
$routerKey = 'lol'; //只订阅LOL消息

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'ar414', 'ar414', 'test');
$channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false);
list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queueName, $exchange, $routerKey);

echo " 等待消息中..." .PHP_EOL;
$callback = function ($msg) {
    echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    sleep(1);  //模拟耗时执行
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

运行

ar414
1. 运行某一个订阅者程序监听LOL消息队列(LolSub.php)
2. 运行发送者程序(Send.php)

发送者

$ php Send.php 

rabbitmq-send

LOL订阅者

$ php LolSub.php 

lol-sub

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 3年前 自动加精
AR414
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 10

直接看官方文档吧,教程更多
想问一下你们的laravel版本是5.5以上的版本还是5.5的,如果是以上的你们有把rabbitmq适配laravel的队列吗?

3年前 评论

感觉和腾讯的 CMQ 是类似的。

3年前 评论
wangchunbo

非常棒!厉害,我要学习下!

3年前 评论
wangchunbo

请问这及时变化的数据,在前端使用socket好些,还是使用轮询?

3年前 评论
AR414 (楼主) 3年前
wangchunbo (作者) 3年前
AR414 (楼主) 3年前
wangchunbo (作者) 3年前
fatrbaby

重要的是你在业务中是如使用的? 使用脚本一直监听消息吗?然后更改数据库?

3年前 评论
AR414 (楼主) 3年前
xxxie

啊啊啊。感谢老哥分享,正好有求于这一块 :+1: :+1:

3年前 评论
AR414 (楼主) 3年前

收藏等于学习了 :smile:

3年前 评论

貌似长时间没有消息会断开链接,这个没做处理了。可以加心跳解决

3年前 评论
AR414 (楼主) 3年前

请问消费者在生产环境是怎么运行的,总不可能是 php LolSub.php 吧。如何保证进程一直运行

3年前 评论
AR414 (楼主) 3年前

我遇到了时间一长消费者队列就收不到消息的情况

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!