Laravel 整合 workerman 做聊天室
在很多时候我们需要做这种聊天室的时候需要实时响应,所以今天介绍一下workerman,如果没有听过workerman的小伙伴,传送门:workerman文档 ,官网
本篇文章是整合laravel 5.8
的,使用artisan
命令去管理workerman
。
如果觉得workerman
不好搞,也可以直接使用GatewayWorker
,传送门:GatewayWorker手册 , 在 Laravel 中使用 GatewayWorker 进行 socket 通讯
第一步 安装workerman
在项目根目录执行
composer require workerman/workerman
第二部 创建命令类,自定义启停workerman
在项目根目录执行
php artisan make:command Workerman
执行该命令会在app/Console/Commands/
下面创建Workerman.php
文件,该文件代码如下:
<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Workerman\Autoloader;
use Workerman\Worker;
class Workerman extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'Workerman {action} {--daemonize}';
/**
* The console command description.
*
* @var string
*/
protected $description = 'workerman 启动停止';
/**
* Create a new command instance.
*
* @return void
*/
public function __construct()
{
parent::__construct();
}
/**
* Execute the console command.
*
* @return mixed
*/
public function handle()
{
global $argv;//定义全局变量
$arg = $this->argument('action');
$argv[1] = $arg;
$argv[2] = $this->option('daemonize') ? '-d' : '';//该参数是以daemon(守护进程)方式启动
global $text_worker;
// 创建一个Worker监听2345端口,使用websocket协议通讯
$text_worker = new Worker("websocket://0.0.0.0:2345");
$text_worker->uidConnections = array();//在线用户连接对象
$text_worker->uidInfo = array();//在线用户的用户信息
// 启动4个进程对外提供服务
$text_worker->count = 4;
//引用类文件
$handler = \App::make('Handler\WorkermanHandler');
$text_worker->onConnect = array($handler,"handle_connection");
$text_worker->onMessage = array($handler,"handle_message");
$text_worker->onClose = array($handler,"handle_close");
$text_worker->onWorkerStart = array($handler,"handle_start");
// 运行worker
Worker::runAll();
}
}
注意:action 是启动参数,daemonize选项是是否后台运行,如果不需要后台运行请把有关daemonize的代码删掉。
最最重要的地方来了$argv一定不能删掉,否则启动workerman的时候会报错
然后手动创建app/Handler/WorkermanHandler.php
文件,代码如下:
<?php
namespace Handler;
use Illuminate\Console\Command;
use Workerman\Lib\Timer;
use Workerman\Worker;
class WorkermanHandler
{
private $heartbeat_time = 55;//心跳间隔55秒
//当客户端连上来时分配uid,并保存链接,并通知所有客户端
public function handle_connection($connection){
global $text_worker;
//判断是否设置了UID
if(!isset($connection->uid)){
//给用户分配一个UID
$connection->uid = random_string(20);
//保存用户的uid
$text_worker->uidConnections["{$connection->uid}"] = $connection;
//向用户返回创建成功的信息
$connection->send("用户:[{$connection->uid}] 创建成功");
}
}
public function handle_start(){
global $text_worker;
//每秒都判断客户端是否已下线
Timer::add(1, function()use($text_worker){
$time_now = time();
foreach($text_worker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > $this->heartbeat_time) {
$connection->close();
}
}
});
//每隔30秒就向客户端发送一条心跳验证
Timer::add(50,function ()use ($text_worker){
foreach ($text_worker->connections as $conn){
$conn->send('{"type":"ping"}');
}
});
}
//当客户端发送消息过来时,转发给所有人
public function handle_message($connection,$data){
global $text_worker;
//debug
//echo "data_info:".$data.PHP_EOL;
$connection->lastMessageTime = time();
$data_info=json_decode($data,true);
if(!$data_info){
return ;
}
//判断业务类型
switch($data_info['type'])
{
case 'login':
//判断用户信息是否存在
if(empty($data_info['user_id'])){
$connection->send("{'type':'error','msg':'非法请求'}");
return $connection->close();
}
//判断用户是否已经登录了
$user_ids=array_column($text_worker->uidInfo,"user_id");
if(in_array($data_info['user_id'],$user_ids)){
$connection->send("{'type':'error','msg':'你在其它地方已登录'}");
return $connection->close();
}
//存储用户信息
$text_worker->uidInfo["{$connection->uid}"]=array(
"user_id"=>$data_info['user_id'],
"user_name"=>htmlspecialchars($data_info['user_name']),
"user_header"=>$data_info['user_header'],
"create_time"=>date("Y-m-d H:i"),
);
//返回数据
if($data_info['to_uid'] == "all"){
$return_data=array(
"type"=>"login",
"uid"=>$connection->uid,
"user_name"=>htmlspecialchars($data_info['user_name']),
"user_header"=>$data_info['user_header'],
"send_time"=>date("Y-m-d H:i",time()),
"user_lists"=>$text_worker->uidInfo
);
$curral_data=array(
"type"=>"login_uid",
"uid"=>$connection->uid,
);
$connection->send(json_encode($curral_data));
//给所有用户发送一条数据
foreach($text_worker->connections as $conn){
$conn->send(json_encode($return_data));
}
}else{
return ;
}
return;
//用户发消息
case 'say':
if(!isset($text_worker->uidInfo["{$connection->uid}"]) || empty($text_worker->uidInfo["{$connection->uid}"])){
$connection->send('{"type":"error","msg":"你已经掉线了"}');
}
//获取到当前用户的信息
$user_info=$text_worker->uidInfo["{$connection->uid}"];
//判断是私聊还是群聊
if($data_info['to_uid'] != "all"){
//私聊
$return_data=array(
"type"=>"say",
"from_uid"=>$connection->uid,
"from_user_name"=>$user_info['user_name'],
"from_user_header"=>$user_info['user_header'],
"to_uid"=>$data_info['to_uid'],
"content"=>nl2br(htmlspecialchars($data_info['content'])),
"send_time"=>date("Y-m-d H:i")
);
if($data_info['to_uid'] == $connection->uid){
$connection->send(json_encode($return_data));
return;
}
//判断用户是否存在,并向对方发送数据
if(isset($text_worker->uidConnections["{$data_info['to_uid']}"])){
$to_connection=$text_worker->uidConnections["{$data_info['to_uid']}"];
$to_connection->send(json_encode($return_data));
}
//向你自己发送一条数据
$connection->send(json_encode($return_data));
}else{
//群聊
$return_data=array(
"type"=>"say",
"from_uid"=>$connection->uid,
"from_user_name"=>$user_info['user_name'],
"from_user_header"=>$user_info['user_header'],
"to_uid"=>"all",
"content"=>nl2br(htmlspecialchars($data_info['content'])),
"send_time"=>date("Y-m-d H:i")
);
//向所有用户发送数据
foreach($text_worker->connections as $conn){
$conn->send(json_encode($return_data));
}
}
return;
case "pong":
return;
}
}
//当客户端断开时,广播给所有客户端
public function handle_close($connection){
global $text_worker;
$user_name=$text_worker->uidInfo[$connection->uid]['user_name'] ?? "";
unset($text_worker->uidConnections["{$connection->uid}"]);
unset($text_worker->uidInfo["{$connection->uid}"]);
if(!empty($user_name)){
$return_data=array(
"type"=>"logout",
"uid"=>$connection->uid,
"user_name"=>$user_name,
"create_time"=>date("Y-m-d H:i:s"),
);
foreach($text_worker->connections as $conn){
$conn->send(json_encode($return_data));
}
}
}
}
该代码是我这边自己的代码,有些东西没有贴出来,只做参考,比如怎么实现维持心跳和分配用户唯一id等。
打开composer.json
文件增加一段"app/Handler"
于classmap中,下面是我的部分内容:
...
"autoload": {
"classmap": [
"database/seeds",
"database/factories",
"app/Handler"
],
"psr-4": {
"App\\": "app/"
}
},
...
执行命令: composer dump-autoload
执行命令启动workerman
php artisan Workerman start --daemonize
再说一遍:app\Console\Commands\Workerman.php 里的代码 $arg = $this->argument('action'); $argv [1] = $arg; 如果这段代码不写那么就无法启动服务会报Usage: php yourfile.php {start|stop|restart|reload|status|connections} [-d]
原因: 参考接收位置错误. 代码位置: \vendor\workerman\workerman\Worker.php 673 684, 问题位置 673行中, $argv 这个全局变量取的key的位置错乱的.
如果你是debug模式启动workerman的话,就把app\Console\Commands\Workerman.php 里的 关于daemonize的代码删掉,启动命令就是php artisan Workerman start
参考资料
可以参观一下 我的博客
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: