Hyperf搭建websocket集群项目(通过redis发布订阅)
Hyperf分布式websocket解决方案
场景:
1、多个websocket server
2、客户端 clientA 连接到 serverA 得到 fdA, clientB 连接到 serverB 得到 fdB
3、clientA 想要给 clientB 发送消息
注:fd 类似于当前链接的文件描述符,每一次新的连接会自增 1 ,可以理解为连接号
问题:clientA 和 clientB 连接到的是不同的服务器,fd作用域仅限于当前服务器,要想跨服务器想实现通讯,需要借助中间件来传递消息
解决方案
方案一、使用redis发布订阅(pub/sub):
第一步:生成分布式fd
这里的分布式FD, 目的是让不同服务器在推送消息的时候能够知道这个fd是哪个server的,然后进行定向推送
附代码:
//链接上服务器时的回调 需要在这里生成分布式fd,且保存用户与分布式fd的映射关系
public function onOpen($server, Request $request): void
{
$uid = $request->get['uid'];
$server->push($request->fd, 'Opened');
//我这里用了环境变量APP_NAME去区分了server 你们也可以看着办 只要能区分出服务器就行
$server_name = env('APP_NAME');
//这就是一个简易的分布式fd,能够确保我能通过某种方法解析出服务器名称和在这个服务器上的fd即可
$fd = $server_name . '_' . $request->fd;
redis()->set("user:$uid", $fd);
}
redis中能够看到映射关系已经建立好了
以上已经保存好了用户与分布式fd的映射,接下来就是发送消息的时候怎么转发给指定服务器的问题了
//收到客户端消息回调
public function onMessage($server, Frame $frame): void
{
$data = json_decode($frame->data, true);
$uid = (int)$data['uid'];
$text = $data['data'];
$target_fd = redis()->get("user:$uid");
if (!$target_fd) {
$server->push($frame->fd, 'not exist');
return;
}
//这里咱们根据onOpen时生成分布式fd的规则 结析出服务器和fd
[$server_name, $server_fd] = explode('_', $target_fd);
//向订阅了这个服务器channel发布消息
//这里如果封装的话 最好判断一下是不是本服务器,如果是的话就不需要通过(pub/sub)了
redis()->publish($server_name, json_encode([
'fd' => (int)$server_fd,
'data' => $text
]));
}
注意的点:由于redis的subscribe方法是阻塞的,所以需要在hyperf中使用自定义进程,该进程只负责订阅和回调,不影响其他进程,收到订阅消息后执行回调即可
有关于hyperf自定义进程请看自定义进程
/**
* 订阅redis频道进程
* @Process(name="subscribe_process")
*/
class SubscribeProcess extends AbstractProcess
{
public function handle(): void
{
//这里依然是订阅了本服务器上的环境变量APP_NAME,这样就能够实现指定发布到某一个channel上
$server_name = env('APP_NAME');
$redis = redis();
//订阅是阻塞的 如果这里的redis链接限制了超时时间 那么到时间后就会断开 该进程也就失效了,所以这里要解除超时限制
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
$redis->subscribe([
$server_name,
], [
$this,
'dispatchChannel'
]);
}
/**
* Notes: 订阅事件回调 这里其实就是让这个服务器推送消息了
* User: 陈朋
* DateTime: 2022/06/28 15:22
* @param $redis
* @param string $channel
* @param string $msg
* @return void
*/
private function dispatchChannel($redis, string $channel, string $msg): void
{
//当然这里能做的不只是推送消息,也可以在$msg中传递一个自定义的type字段,根据type来做不同的处理,比如强制断开连接
$msg = json_decode($msg, true);
$data = $msg['data'];
$fd = $msg['fd'];
if (!server()->exists($fd)) {
return;
}
//到这里就是给指定的fd发送消息了
server()->push($fd, $data);
}
}
上面的代码就能够解决不同客户端连接到不同server时的通讯问题
验证:
本地开启两个不同端口的server
我这里是9501 和 9503
配置env
启动第一个server
然后修改env
启动第二个server
写一个简易的view当作websocket客户端
我这里的demo是想要实现给指定的uid发送消息
然后分别连接到两个不同的websocket服务器
连接演示
效果演示
当然分布式解决方案还有消息队列,网关等。有空我会再出一个通过消息队列实现的帖子。本文简单的演示了使用redis的发布订阅实现的分布式websocket项目搭建,有不足之处请大神们指教
该项目已封装成composer包 支持配置发布订阅驱动或者异步队列驱动
AstonChenDev/hyperf-distribute-websocket: 基于redis发布订阅和异步队列实现的ws分布式通信 (github.com)
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 2年前 自动加精
支持支持
可以用gethostname()获取主机名代替APP_NAME做区分