基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

介绍

基于Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用WebSocket协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端id。订阅发布者发布的消息针对已保存的客户端id进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件[config/autoload/server.php]

<?php

return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],

WebSocket 服务器端代码示例

<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\Websocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{

    /**
     * 发送消息
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        //心跳刷新缓存
        $redis = $this->container->get(\Redis::class);
        //获取所有的客户端id
        $fdList = $redis->sMembers('websocket_sjd_1');
        //如果当前客户端在客户端集合中,就刷新
        if (in_array($frame->fd, $fdList)) {
            $redis->sAdd('websocket_sjd_1', $frame->fd);
            $redis->expire('websocket_sjd_1', 7200);
        }
        $server->push($frame->fd, 'Recv: ' . $frame->data);

    }

    /**
     * 客户端失去链接
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        //删掉客户端id
        $redis = $this->container->get(\Redis::class);
        //移除集合中指定的value
        $redis->sRem('websocket_sjd_1', $fd);
        var_dump('closed');

    }

    /**
     * 客户端链接
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        //保存客户端id
        $redis = $this->container->get(\Redis::class);

        $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
        var_dump($res1);

        $res = $redis->expire('websocket_sjd_1', 7200);
        var_dump($res);

        $server->push($request->fd, 'Opened');

    }
}

WebSocket前端代码

    function WebSocketTest() {
        if ("WebSocket" in window) {
            console.log("您的浏览器支持 WebSocket!");
            var num = 0

            // 打开一个 web socket
            var ws = new WebSocket("ws://127.0.0.1:12222");

            ws.onopen = function () {
                // Web Socket 已连接上,使用 send() 方法发送数据
                //alert("数据发送中...");
                //ws.send("发送数据");
            };

            window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }, 5000);

            ws.onmessage = function (evt) {
                var d = JSON.parse(evt.data);
                console.log(d);
                if (d.code == 300) {
                    $(".address").text(d.address)
                }
                if (d.code == 200) {
                    var v = d.data
                    console.log(v);
                    num++
                    var str = `<div class="item">
                                    <p>${v.recordOutTime}</p>
                                    <p>${v.userOutName}</p>
                                    <p>${v.userOutNum}</p>
                                    <p>${v.doorOutName}</p>
                                </div>`
                    $(".tableHead").after(str)
                    if (num > 7) {
                        num--
                        $(".table .item:nth-last-child(1)").remove()
                    }
                }
            };

            ws.error = function (e) {
                console.log(e)
                alert(e)
            }
            ws.onclose = function () {
                // 关闭 websocket
                alert("连接已关闭...");
            };
        } else {
            alert("您的浏览器不支持 WebSocket!");
        }
    }

AMQP组件

composer require hyperf/amqp

配置文件[config/autoload/amqp.php]

<?php

return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];

MQ消费者代码

<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * rabbmitMQ消费端代码
     * @param $data
     * @return string
     */
    public function consume($data): string
    {
        print_r($data);

        //获取集合中所有的value
        $redis = $this->container->get(\Redis::class);
        $fdList=$redis->sMembers('websocket_sjd_1');

        $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
        foreach($fdList as $key=>$v){
            if(!empty($v)){
                $server->push((int)$v, $data);
            }
        }

        return Result::ACK;
    }
}

控制器代码


    /**
     * test
     * @return array
     */
    public function test()
    {
        $data = array(
            'code' => 200,
            'data' => [
                'userOutName' => 'ccflow',
                'userOutNum' => '9999',
                'recordOutTime' => date("Y-m-d H:i:s", time()),
                'doorOutName' => '教师公寓',
            ]
        );
        $data = \GuzzleHttp\json_encode($data);
        $message = new DemoProducer($data);
        $producer = ApplicationContext::getContainer()->get(Producer::class);
        $result = $producer->produce($message);
        var_dump($result);

        $user = $this->request->input('user', 'Hyperf');
        $method = $this->request->getMethod();

        return [
            'method' => $method,
            'message' => "{$user}.",
        ];
    }

最终效果

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
讨论数量: 14

感谢Hyperf开发组!

1年前 评论
李铭昕

不错

1年前 评论

提个小建议,注释格式应该是这样的

/**
 * 发送消息
 * @param WebSocketServer $server
 * @param Frame $frame
 */
1年前 评论
ccflow (楼主) 1年前

file
这个标注的地方 没看懂。。。

1年前 评论
ccflow (楼主) 1年前
oldyellow 9个月前

发现还有这个地方,学习了。

1年前 评论
wangchunbo

66666

1年前 评论

楼主能给个git地址吗 :relaxed:

1年前 评论
刘天承

请问稳定性咋样

9个月前 评论
_杭城浪子 9个月前
JeffreyBool

像我直接写了。直接开撸,长连服务写过两次了。具体并发倒是没测过,但是应该比 php 性能好

9个月前 评论
StringKe

您好,

$server=$this->container->get(ServerFactory::class)->getServer()->getServer();

这句话具体表现是什么?拿到webscoket的 server,我自己尝试时候这里应该存在 http和ws两个server的

8个月前 评论
李铭昕 8个月前
ccflow (楼主) 8个月前
ccflow (楼主) 8个月前

file

8个月前 评论
(作者) 8个月前
ccflow (楼主) 8个月前
XialunuoteRoy 7个月前

DemoProducer 没找到

5个月前 评论

这个怎么看也像是Redis啊

1个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!