Hyperf搭建websocket集群项目(通过redis发布订阅)

Hyperf分布式websocket解决方案

场景:

1、多个websocket server
2、客户端 clientA 连接到 serverA 得到 fdA, clientB 连接到 serverB 得到 fdB
3、clientA 想要给 clientB 发送消息

注:fd 类似于当前链接的文件描述符,每一次新的连接会自增 1 ,可以理解为连接号

问题:clientA 和 clientB 连接到的是不同的服务器,fd作用域仅限于当前服务器,要想跨服务器想实现通讯,需要借助中间件来传递消息

解决方案

方案一、使用redis发布订阅(pub/sub):

第一步:生成分布式fd
这里的分布式FD, 目的是让不同服务器在推送消息的时候能够知道这个fd是哪个server的,然后进行定向推送

附代码:

//链接上服务器时的回调 需要在这里生成分布式fd,且保存用户与分布式fd的映射关系
public function onOpen($server, Request $request): void
    {
        $uid = $request->get['uid'];
        $server->push($request->fd, 'Opened');
        //我这里用了环境变量APP_NAME去区分了server 你们也可以看着办 只要能区分出服务器就行
        $server_name = env('APP_NAME');
        //这就是一个简易的分布式fd,能够确保我能通过某种方法解析出服务器名称和在这个服务器上的fd即可
        $fd = $server_name . '_' . $request->fd;
        redis()->set("user:$uid", $fd);
   }

redis中能够看到映射关系已经建立好了
Hyperf搭建websocket集群项目

以上已经保存好了用户与分布式fd的映射,接下来就是发送消息的时候怎么转发给指定服务器的问题了
//收到客户端消息回调
public function onMessage($server, Frame $frame): void
    {
        $data = json_decode($frame->data, true);
        $uid = (int)$data['uid'];
        $text = $data['data'];

        $target_fd = redis()->get("user:$uid");
        if (!$target_fd) {
            $server->push($frame->fd, 'not exist');
            return;
        }
        //这里咱们根据onOpen时生成分布式fd的规则 结析出服务器和fd
        [$server_name, $server_fd] = explode('_', $target_fd);

        //向订阅了这个服务器channel发布消息
        //这里如果封装的话 最好判断一下是不是本服务器,如果是的话就不需要通过(pub/sub)了
        redis()->publish($server_name, json_encode([
            'fd' => (int)$server_fd,
            'data' => $text
        ]));
    }
注意的点:由于redis的subscribe方法是阻塞的,所以需要在hyperf中使用自定义进程,该进程只负责订阅和回调,不影响其他进程,收到订阅消息后执行回调即可

有关于hyperf自定义进程请看自定义进程

/**
 * 订阅redis频道进程
 * @Process(name="subscribe_process")
 */
class SubscribeProcess extends AbstractProcess
{
    public function handle(): void
    {
       //这里依然是订阅了本服务器上的环境变量APP_NAME,这样就能够实现指定发布到某一个channel上
        $server_name = env('APP_NAME');
        $redis = redis();
        //订阅是阻塞的 如果这里的redis链接限制了超时时间 那么到时间后就会断开 该进程也就失效了,所以这里要解除超时限制
        $redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
        $redis->subscribe([
            $server_name,
        ], [
            $this,
            'dispatchChannel'
        ]);
    }

    /**
     * Notes: 订阅事件回调 这里其实就是让这个服务器推送消息了
     * User: 陈朋
     * DateTime: 2022/06/28 15:22
     * @param $redis
     * @param string $channel
     * @param string $msg
     * @return void
     */
    private function dispatchChannel($redis, string $channel, string $msg): void
    {
        //当然这里能做的不只是推送消息,也可以在$msg中传递一个自定义的type字段,根据type来做不同的处理,比如强制断开连接
        $msg = json_decode($msg, true);
        $data = $msg['data'];
        $fd = $msg['fd'];
        if (!server()->exists($fd)) {
            return;
        }
        //到这里就是给指定的fd发送消息了
        server()->push($fd, $data);
    }
}

上面的代码就能够解决不同客户端连接到不同server时的通讯问题

验证:

本地开启两个不同端口的server

我这里是9501 和 9503

配置env

Hyperf搭建websocket集群项目(通过redis发布订阅)

启动第一个server

Hyperf搭建websocket集群项目(通过redis发布订阅)

然后修改env

Hyperf搭建websocket集群项目(通过redis发布订阅)

启动第二个server

Hyperf搭建websocket集群项目(通过redis发布订阅)

写一个简易的view当作websocket客户端

我这里的demo是想要实现给指定的uid发送消息

然后分别连接到两个不同的websocket服务器

连接演示

Hyperf搭建websocket集群项目(通过redis发布订阅)

效果演示

Hyperf搭建websocket集群项目(通过redis发布订阅)

当然分布式解决方案还有消息队列,网关等。有空我会再出一个通过消息队列实现的帖子。本文简单的演示了使用redis的发布订阅实现的分布式websocket项目搭建,有不足之处请大神们指教

该项目已封装成composer包 支持配置发布订阅驱动或者异步队列驱动

AstonChenDev/hyperf-distribute-websocket: 基于redis发布订阅和异步队列实现的ws分布式通信 (github.com)

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
讨论数量: 2

可以用gethostname()获取主机名代替APP_NAME做区分

1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!