基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

介绍

基于Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用WebSocket协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端id。订阅发布者发布的消息针对已保存的客户端id进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件[config/autoload/server.php]

<?php

return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],

WebSocket 服务器端代码示例

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{

    /**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        //心跳刷新缓存
        $redis = $this->container->get(\Redis::class);
        //获取所有的客户端id
        $fdList = $redis->sMembers('websocket_sjd_1');
        //如果当前客户端在客户端集合中,就刷新
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd('websocket_sjd_1', $frame->fd);
            $redis->expire('websocket_sjd_1', 7200);
        }
        $server->push($frame->fd, 'Recv: ' . $frame->data);

    }

    /**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        //删掉客户端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem('websocket_sjd_1', $fd);
        var_dump('closed');

    }

    /**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        //保存客户端id
        $redis = $this->container->get(\Redis::class);

        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
        var_dump($res1);

        $res = $redis->expire('websocket_sjd_1', 7200);
        var_dump($res);

        $server->push($request->fd, 'Opened');

    }
}

WebSocket前端代码

    function WebSocketTest() {
        if ("WebSocket" in window) {
            console.log("您的浏览器支持 WebSocket!");
            var num = 0

            // 打开一个 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");

            ws.onopen = function () {
                // Web Socket 已连接上,使用 send() 方法发送数据
                //alert("数据发送中...");
                //ws.send("发送数据");
            };

            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }, 5000);

            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };

            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 关闭 websocket
                alert("连接已关闭...");
            };
        } else {
            alert("您的浏览器不支持 WebSocket!");
        }
    }

AMQP组件

composer require hyperf/amqp

配置文件[config/autoload/amqp.php]

<?php

return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];

MQ消费者代码

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);

        //获取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers('websocket_sjd_1');

        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }

        return Result::ACK;
    }
}

控制器代码


    /**
     * test
     * @return array
     */
    public function test()
    {
        $data = array(
            'code' => 200,
            'data' => [
                'userOutName' => 'ccflow',
                'userOutNum' => '9999',
                'recordOutTime' => date("Y-m-d H:i:s", time()),
                'doorOutName' => '教师公寓',
            ]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);

        $user = $this->request->input('user', 'Hyperf');
        $method = $this->request->getMethod();

        return [
            'method' => $method,
            'message' => "{$user}.",
        ];
    }

最终效果

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 4年前 自动加精
讨论数量: 27

感谢Hyperf开发组!

4年前 评论
李铭昕

不错

4年前 评论

提个小建议,注释格式应该是这样的

/**
 * 发送消息
 * @param WebSocketServer $server
 * @param Frame $frame
 */
4年前 评论
ccflow (楼主) 4年前

file
这个标注的地方 没看懂。。。

4年前 评论
ccflow (楼主) 4年前
oldyellow 4年前

发现还有这个地方,学习了。

4年前 评论
wangchunbo

66666

4年前 评论

楼主能给个git地址吗 :relaxed:

4年前 评论
刘天承

请问稳定性咋样

4年前 评论
_杭城浪子 4年前
JeffreyBool

像我直接写了。直接开撸,长连服务写过两次了。具体并发倒是没测过,但是应该比 php 性能好

4年前 评论
StringKe

您好,

$server=$this->container->get(ServerFactory::class)->getServer()->getServer();

这句话具体表现是什么?拿到webscoket的 server,我自己尝试时候这里应该存在 http和ws两个server的

3年前 评论
李铭昕 3年前
ccflow (楼主) 3年前
ccflow (楼主) 3年前

file

3年前 评论
(作者) 3年前
ccflow (楼主) 3年前
XialunuoteRoy 3年前

DemoProducer 没找到

3年前 评论

这个怎么看也像是Redis啊

3年前 评论

在SWOOLE_PROCESS模式下可以判断客户端连接状态,在SWOOLE_BASE模式下的时候就不能判断了($this->container->get(ServerFactory::class)->getServer()->getServer()->exist($fd);)但是消息还是能够发送,请问这个是什么问题呢?我要怎么才能通过fd去判断连接状态呢。

2年前 评论

一段时间过后websocket_sjd_1 集合内容越来越多

2年前 评论

你好,怎么做websocket的鉴权,我的做法是在握手阶段对请求头鉴权,但是实现了onHandShake方法并没有执行

4个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!