轻松上手 PHP RabbitMQ 消息发布与订阅

场景

之前开发一个电竞比分网系统,有许多模块依赖实时比赛状态(待开始、进行中、已结束、异常),比赛状态 进行中->已结束 由图像识别处理,识别到比赛结束后向消息队列发送某场比赛的状态信息,其他模块只需订阅队列消息获取比赛状态更新并进行对于逻辑处理

RabbitMQ 概念

  • 交换器(Exchanges)

    RabbitMQ消息传递模型的核心思想是,生产者不发送任何信息直接到队列。事实上,生产者甚至不知道消息是否会发送到任何队列。生产者只能向交换器发送消息(也叫交换机,默认交换器使用””空字符标记)。交换器需要知道如何处理接收的消息,将消息推入到指定的队列中,决定消息是否入列和抛弃。如下图,P表示消息发布者,X表示交换机,Q1和Q2表示不同的队列
    rabbitmq-exchanges

  • 交换类型

    • direct:
      消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为”dog”,则只转发 routing key 标记为”A1”的消息,不会转发”A2”,也不会转发”A3”等等。它是完全匹配、单播的模式
    • fanout:
      广播订阅,向所有的消费者发布消息。每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息(fanout 类型转发消息是最快的)
    • topic:
      交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
    • 两个通配符:符号”#”和符号”*”
      • #:匹配0个或多个单词
      • *:匹配不多不少一个单词

RabbitMQ 安装运行

  • 使用Docker安装RabbitMQ

    $ docker pull rabbitmq:3.8.3-management
  • 运行

    • 服务端口:5672
    • 管理端端口:15672
      $ docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -v /data:/var/lib/rabbitmq rabbitmq:3.8.3-management
  • web管理端登录查看(127.0.0.1:15672)

    默认账号:guest,默认密码:guest

rabbitmq-admin

  • 添加管理员

    • 命令行
      $ docker exec -it 89e8e968aebc bash
      root@89e8e968aebc:/# rabbitmqctl add_user ar414 ar414 
      root@89e8e968aebc:/# rabbitmqctl set_user_tags ar414 administrator 
    • Web管理端
      rabbitmq-add-admin
  • 添加vhost
    rabbitmq-add-vhost

PHP 简单使用

安装

$ composer require php-amqplib/php-amqplib

发布者

<?php

require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$exchange = 'Gaming';

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'ar414', 'ar414', 'test');
$channel = $connection->channel();
$channel->exchange_declare($exchange, 'direct', false, false, false);
for ($i = 0; $i < 100; $i++) {
    $routes = ['dota', 'csgo', 'lol'];
    $key = array_rand($routes);
    $arr = [
        'match_id' => $i,
        'status' => rand(0,3)
    ];
    $data = json_encode($arr);
    $msg = new AMQPMessage($data);

    $channel->basic_publish($msg, $exchange, $routes[$key]);
    echo '发送 '.$routes[$key].' 消息: ' . $data . PHP_EOL;
}
$channel->close();
$connection->close();

订阅者

<?php

require_once __DIR__ . '/../vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
$exchange = 'Gaming';
$routerKey = 'lol'; //只订阅LOL消息

$connection = new AMQPStreamConnection('127.0.0.1', 5672, 'ar414', 'ar414', 'test');
$channel = $connection->channel(); $channel->exchange_declare($exchange, 'direct', false, false, false);
list($queueName, ,) = $channel->queue_declare("", false, false, true, false);
$channel->queue_bind($queueName, $exchange, $routerKey);

echo " 等待消息中..." .PHP_EOL;
$callback = function ($msg) {
    echo '接收到消息:',$msg->delivery_info['routing_key'], ':', $msg->body, PHP_EOL;
    sleep(1);  //模拟耗时执行
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);

while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();

运行

1. 运行某一个订阅者程序监听LOL消息队列(LolSub.php)
2. 运行发送者程序(Send.php)

发送者

$ php Send.php 

rabbitmq-send

LOL订阅者

$ php LolSub.php 

lol-sub

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 2个月前 自动加精
AR414
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 8

直接看官方文档吧,教程更多
想问一下你们的laravel版本是5.5以上的版本还是5.5的,如果是以上的你们有把rabbitmq适配laravel的队列吗?

3个月前 评论
青风百里

感觉和腾讯的 CMQ 是类似的。

3个月前 评论
wangchunbo

非常棒!厉害,我要学习下!

3个月前 评论
wangchunbo

请问这及时变化的数据,在前端使用socket好些,还是使用轮询?

3个月前 评论
AR414 (楼主) 3个月前
wangchunbo (作者) 3个月前
AR414 (楼主) 3个月前
wangchunbo (作者) 3个月前
fatrbaby

重要的是你在业务中是如使用的? 使用脚本一直监听消息吗?然后更改数据库?

2个月前 评论
AR414 (楼主) 2个月前
xxxie

啊啊啊。感谢老哥分享,正好有求于这一块 :+1: :+1:

2个月前 评论
AR414 (楼主) 2个月前

收藏等于学习了 :smile:

2个月前 评论

貌似长时间没有消息会断开链接,这个没做处理了。可以加心跳解决

1个月前 评论

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!