laravel集成workerman实现websocket多端及时通讯
1、安装依赖并创建启动命令文件
[root@local]# composer require workerman/gateway-worker:3.0 [root@local]# composer require workerman/gatewayclient:^3.0
使用命令创建
[root@local]# php artisan make:command Workerman
或在app/Console/Commands/目录下创建WorkerMan.php
namespace App\Console\Commands;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkerMan extends Command
{
protected $signature = 'wk {action} {--d}';
protected $description = 'Start a Workerman server.';
public function handle()
{
global $argv;
$action = $this->argument('action');
$argv[0] = 'wk';
$argv[1] = $action;
$argv[2] = $this->option('d') ? '-d' : '';
$this->start();
}
private function start()
{
$this->startGateWay();
$this->startBusinessWorker();
$this->startRegister();
$workerPath = storage_path('workerman/');
if (!is_dir($workerPath))
mkdir($workerPath, 0755, true);
Worker::$pidFile = $workerPath . config('app.name') . '_workman.pid';
$logPath = $workerPath . date('Ym') . '/';
if (!is_dir($logPath))
mkdir($logPath, 0755, true);
Worker::$logFile = $logPath . date('d') . '.log';
Worker::runAll();
}
private function startBusinessWorker()
{
$worker = new BusinessWorker();
$worker->name = 'BusinessWorker';
$worker->count = 3;
$worker->registerAddress = '127.0.0.1:1236';
$worker->eventHandler = \App\Workerman\Events::class;
}
private function startGateWay()
{
$gateway = new Gateway("websocket://0.0.0.0:2346");
$gateway->name = 'Gateway';
$gateway->count = 1;
$gateway->lanIp = '127.0.0.1';
$gateway->startPort = 2300;
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 0;
$gateway->pingData = '{"type":"ping"}';
$gateway->registerAddress = '127.0.0.1:1236';
}
private function startRegister()
{
new Register('text://127.0.0.1:1236');
}
}
- 启动命令
[root@local]# php artisan wk start
- 守护模式启动
[root@local]# php artisan wk start --d
- 其它命令:
查看状态:status / 停止: stop / 重载: reload / 重启: restart [–d]
2、创建windows环境启动命令文件
由于windows不支持批量启动服务,所以每个服务需要单独启动
namespace App\Console\Commands;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;
class WorkerManWin extends Command
{
//兼容win
protected $signature = 'wk
{action : action}
{--start=all : start}
{--d : daemon mode}';
protected $description = 'Start a Workerman server.';
public function handle()
{
global $argv;
$action = $this->argument('action');
//针对 Windows 一次执行,无法注册多个协议的特殊处理
if ($action === 'single') {
$start = $this->option('start');
if ($start === 'register') {
$this->startRegister();
} elseif ($start === 'gateway') {
$this->startGateWay();
} elseif ($start === 'worker') {
$this->startBusinessWorker();
}
Worker::runAll();
return;
}
$argv[1] = $action;
$argv[2] = $this->option('d') ? '-d' : '';
$this->start();
}
private function start()
{
$this->startGateWay();
$this->startBusinessWorker();
$this->startRegister();
Worker::runAll();
}
private function startBusinessWorker()
{
$worker = new BusinessWorker();
$worker->name = 'BusinessWorker';
$worker->count = 1;
$worker->registerAddress = '127.0.0.1:1236';
$worker->eventHandler = \App\Workerman\Events::class;
}
private function startGateWay()
{
$gateway = new Gateway("websocket://0.0.0.0:2346");
$gateway->name = 'Gateway';
$gateway->count = 1;
$gateway->lanIp = '127.0.0.1';
$gateway->startPort = 2300;
$gateway->pingInterval = 30;
$gateway->pingNotResponseLimit = 0;
$gateway->pingData = '{"type":"ping"}';
$gateway->registerAddress = '127.0.0.1:1236';
}
private function startRegister()
{
new Register('text://0.0.0.0:1236');
}
}
- 启动命令
[root@local]# php artisan wk single --start=gateway [root@local]# php artisan wk single --start=worker [root@local]# php artisan wk single --start=register
3、定义事件类
在app/Workerman/目录下创建Events.php
namespace App\Workerman;
use App\Helpers\Jwt;
use App\Models\UserModel;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Lib\Gateway;
use Illuminate\Support\Facades\Log;
use Workerman\Lib\Timer;
class Events
{
/**
* 业务服务启动事件
* @param BusinessWorker $businessWorker
* @return void
*/
public static function onWorkerStart(BusinessWorker $businessWorker)
{
self::log(__FUNCTION__, $businessWorker->workerId);
Timer::add(1, function () use ($businessWorker) {
$time_now = time();
foreach ($businessWorker->connections as $connection) {
// 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
if (empty($connection->lastMessageTime)) {
$connection->lastMessageTime = $time_now;
continue;
}
// 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
if ($time_now - $connection->lastMessageTime > 30) {
if ($connection->id) {
//todo
}
//断开后的回调
echo "Client ip {$connection->getRemoteIp()} timeout!!!\n";
$connection->close();
}
}
});
}
/**
* 客户端连接事件
* @param string $clientId
* @return void
*/
public static function onConnect(string $clientId)
{
self::log(__FUNCTION__, $clientId);
}
/**
* 客户端websocket 连接事件
* @param string $clientId
* @param mixed $data
* @return void
*/
public static function onWebSocketConnect(string $clientId, $data)
{
self::log(__FUNCTION__, $clientId, $data);
}
/**
* 客户端websocket消息
* @param string $clientId
* @param string $messageJson
* @return void
*/
public static function onMessage(string $clientId, string $messageJson)
{
self::log(__FUNCTION__, $clientId, $messageJson);
$message = json_decode($messageJson);
if (empty($message->type)) {
self::sendMessage(500, '请配置type');
return;
}
switch ($message->type) {
case 'login':
// 登录业务
break;
case 'ping':
self::sendMessage(201, 'pong');
break;
default:
self::sendMessage(500, '消息类型不支持');
}
}
/**
* 关闭客户端websocket
* @param string $clientId
* @return void
*/
public static function onClose(string $clientId)
{
self::log(__FUNCTION__, $clientId);
Gateway::destoryClient($clientId);
}
/**
* 写日志
* @param string $title
* @param $data
* @return void
*/
protected static function log(string $title, ...$data): void
{
if (config('app.debug')) {
var_dump("========== {$title} ==========");
var_dump($data);
Log::info("{$title} | " . json_encode($data, 256));
}
}
/**
* 发送客户端消息
* @param int $code
* @param mixed $message
* @param array|null $data
* @param string $clientId
* @return void
*/
protected static function sendMessage(int $code, $message, ?array $data = null, string $clientId = ''): void
{
$sendMessage = json_encode([
'code' => $code,
'msg' => $message,
'data' => $data,
]);
if ($clientId)
Gateway::sendToClient($clientId, $sendMessage);
else
Gateway::sendToCurrentClient($sendMessage);
}
}
- 测试可以在:www.websocket-test.com/ 连接服务并接收消息
4、js连接websocket服务
let ws = new WebSocket('ws://192.168.0.100:2346');
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//监听是否连接成功
ws.onopen = function () {
console.log('ws连接状态:' + ws.readyState);
//连接成功则发送登录请求
let message = {type: "login", token: "JWT授权码"};
ws.send(JSON.stringify(message));
}
// 接听服务器发回的信息并处理展示
ws.onmessage = function (data) {
console.log('接收到来自服务器的消息:');
console.log(data);
//完成通信后关闭WebSocket连接
ws.close();
}
// 监听连接关闭事件
ws.onclose = function () {
// 监听整个过程中websocket的状态
console.log('ws连接状态:' + ws.readyState);
}
// 监听并处理error事件
ws.onerror = function (error) {
console.log(error);
}
5、后端向客户端发送消息
\GatewayClient\Gateway::$registerAddress = '127.0.0.1:1236';
Gateway::sendToUid(29, '{"type": "update", "data": {"name": "张三", "aratar": "..."}}');
- 注意:此处是单向api方式发送消息,registerAddress地址与启动服务注册的registerAddress地址保持对应,如本地可直接使用默认设置。内网调用时,服务与客户端使用内网IP。
6、补充使用域名时,配置nginx代理
在之前的项目中发现使用域名连接时,存在有时连接不上的问题,解决办法是使用nginx代理,在站点配置vhost文件,加上以下代码
# websocket端
location ~ /websocket {
proxy_pass http://127.0.0.1:2346;
proxy_http_version 1.1;
proxy_set_header Host $http_host;
proxy_set_header X-Scheme $scheme;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
}
连接地址使用: ws://xxx.xxx.com/websocket,带证书则使用:wss://xxx.xxx.com/websocket
Gateway服务端与客户端使用方法,请查阅官方手册:www.workerman.net/doc/gateway-work...
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
不错收藏了 :+1:
同样的效果 我的例子,实现在线客服和聊天室功能。(https://element.wmhello.cn)
许多项目我都是独立使用 workman 做 websocket 服务器,使用 GatewayWorker 能快速开发相关功能,集成到 laravel 或者 tp 等 php 常规框架一起使用,能完美解决 websocket 通讯问题,实现了聊天室、客服,包括小程序中的各种实时交互与应用相关的功能。可以看 demo(https://element.wmhello.cn)
mark
最近准备弃坑了,GatewayWorker 的架构设计存在缺陷,横向扩展会导致 Gateway 和 BusinessWorker 之间的效率变低。