laravel集成workerman实现websocket多端及时通讯

1、安装依赖并创建启动命令文件

[root@local]# composer require workerman/gateway-worker:3.0
[root@local]# composer require workerman/gatewayclient:^3.0

使用命令创建

[root@local]# php artisan make:command Workerman

或在app/Console/Commands/目录下创建WorkerMan.php

namespace App\Console\Commands;

use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;

class WorkerMan extends Command
{

    protected $signature = 'wk {action} {--d}';

    protected $description = 'Start a Workerman server.';

    public function handle()
    {
        global $argv;
        $action = $this->argument('action');

        $argv[0] = 'wk';
        $argv[1] = $action;
        $argv[2] = $this->option('d') ? '-d' : '';

        $this->start();
    }

    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();

        $workerPath = storage_path('workerman/');
        if (!is_dir($workerPath))
            mkdir($workerPath, 0755, true);
        Worker::$pidFile = $workerPath . config('app.name') . '_workman.pid';
        $logPath = $workerPath . date('Ym') . '/';
        if (!is_dir($logPath))
            mkdir($logPath, 0755, true);
        Worker::$logFile = $logPath . date('d') . '.log';

        Worker::runAll();
    }

    private function startBusinessWorker()
    {
        $worker = new BusinessWorker();
        $worker->name = 'BusinessWorker';
        $worker->count = 3;
        $worker->registerAddress = '127.0.0.1:1236';
        $worker->eventHandler = \App\Workerman\Events::class;
    }

    private function startGateWay()
    {
        $gateway = new Gateway("websocket://0.0.0.0:2346");
        $gateway->name = 'Gateway';
        $gateway->count = 1;
        $gateway->lanIp = '127.0.0.1';
        $gateway->startPort = 2300;
        $gateway->pingInterval = 30;
        $gateway->pingNotResponseLimit = 0;
        $gateway->pingData = '{"type":"ping"}';
        $gateway->registerAddress = '127.0.0.1:1236';
    }

    private function startRegister()
    {
        new Register('text://127.0.0.1:1236');
    }
}
  • 启动命令
    [root@local]# php artisan wk start
  • 守护模式启动
    [root@local]# php artisan wk start --d
  • 其它命令:
    查看状态:status / 停止: stop / 重载: reload / 重启: restart [–d]

2、创建windows环境启动命令文件
由于windows不支持批量启动服务,所以每个服务需要单独启动


namespace App\Console\Commands;

use GatewayWorker\BusinessWorker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;
use Illuminate\Console\Command;
use Workerman\Worker;

class WorkerManWin extends Command
{

    //兼容win
    protected $signature = 'wk
                            {action : action}
                            {--start=all : start}
                            {--d : daemon mode}';

    protected $description = 'Start a Workerman server.';

    public function handle()
    {
        global $argv;
        $action = $this->argument('action');

        //针对 Windows 一次执行,无法注册多个协议的特殊处理
        if ($action === 'single') {
            $start = $this->option('start');
            if ($start === 'register') {
                $this->startRegister();
            } elseif ($start === 'gateway') {
                $this->startGateWay();
            } elseif ($start === 'worker') {
                $this->startBusinessWorker();
            }
            Worker::runAll();

            return;
        }

        $argv[1] = $action;
        $argv[2] = $this->option('d') ? '-d' : '';

        $this->start();
    }

    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        Worker::runAll();
    }

    private function startBusinessWorker()
    {
        $worker = new BusinessWorker();
        $worker->name = 'BusinessWorker';
        $worker->count = 1;
        $worker->registerAddress = '127.0.0.1:1236';
        $worker->eventHandler = \App\Workerman\Events::class;
    }

    private function startGateWay()
    {
        $gateway = new Gateway("websocket://0.0.0.0:2346");
        $gateway->name = 'Gateway';
        $gateway->count = 1;
        $gateway->lanIp = '127.0.0.1';
        $gateway->startPort = 2300;
        $gateway->pingInterval = 30;
        $gateway->pingNotResponseLimit = 0;
        $gateway->pingData = '{"type":"ping"}';
        $gateway->registerAddress = '127.0.0.1:1236';
    }


    private function startRegister()
    {
        new Register('text://0.0.0.0:1236');
    }
}
  • 启动命令
    [root@local]# php artisan wk single --start=gateway
    [root@local]# php artisan wk single --start=worker
    [root@local]# php artisan wk single --start=register

    3、定义事件类
    在app/Workerman/目录下创建Events.php

namespace App\Workerman;

use App\Helpers\Jwt;
use App\Models\UserModel;
use GatewayWorker\BusinessWorker;
use GatewayWorker\Lib\Gateway;
use Illuminate\Support\Facades\Log;
use Workerman\Lib\Timer;

class Events
{


    /**
     * 业务服务启动事件
     * @param BusinessWorker $businessWorker
     * @return void
     */
    public static function onWorkerStart(BusinessWorker $businessWorker)
    {
        self::log(__FUNCTION__, $businessWorker->workerId);
        Timer::add(1, function () use ($businessWorker) {
            $time_now = time();
            foreach ($businessWorker->connections as $connection) {
                // 有可能该connection还没收到过消息,则lastMessageTime设置为当前时间
                if (empty($connection->lastMessageTime)) {
                    $connection->lastMessageTime = $time_now;
                    continue;
                }
                // 上次通讯时间间隔大于心跳间隔,则认为客户端已经下线,关闭连接
                if ($time_now - $connection->lastMessageTime > 30) {
                    if ($connection->id) {
                        //todo
                    }
                    //断开后的回调
                    echo "Client ip {$connection->getRemoteIp()} timeout!!!\n";
                    $connection->close();
                }
            }
        });
    }

    /**
     * 客户端连接事件
     * @param string $clientId
     * @return void
     */
    public static function onConnect(string $clientId)
    {
        self::log(__FUNCTION__, $clientId);
    }

    /**
     * 客户端websocket 连接事件
     * @param string $clientId
     * @param mixed $data
     * @return void
     */
    public static function onWebSocketConnect(string $clientId, $data)
    {
        self::log(__FUNCTION__, $clientId, $data);
    }

    /**
     * 客户端websocket消息
     * @param string $clientId
     * @param string $messageJson
     * @return void
     */
    public static function onMessage(string $clientId, string $messageJson)
    {
        self::log(__FUNCTION__, $clientId, $messageJson);
        $message = json_decode($messageJson);
        if (empty($message->type)) {
            self::sendMessage(500, '请配置type');
            return;
        }
        switch ($message->type) {
            case 'login':
                // 登录业务
                break;
            case 'ping':
                self::sendMessage(201, 'pong');
                break;
            default:
                self::sendMessage(500, '消息类型不支持');
        }
    }


    /**
     * 关闭客户端websocket
     * @param string $clientId
     * @return void
     */
    public static function onClose(string $clientId)
    {
        self::log(__FUNCTION__, $clientId);
        Gateway::destoryClient($clientId);
    }


    /**
     * 写日志
     * @param string $title
     * @param $data
     * @return void
     */
    protected static function log(string $title, ...$data): void
    {
        if (config('app.debug')) {
            var_dump("========== {$title} ==========");
            var_dump($data);
            Log::info("{$title} | " . json_encode($data, 256));
        }
    }


    /**
     * 发送客户端消息
     * @param int $code
     * @param mixed $message
     * @param array|null $data
     * @param string $clientId
     * @return void
     */
    protected static function sendMessage(int $code, $message, ?array $data = null, string $clientId = ''): void
    {
        $sendMessage = json_encode([
            'code' => $code,
            'msg' => $message,
            'data' => $data,
        ]);
        if ($clientId)
            Gateway::sendToClient($clientId, $sendMessage);
        else
            Gateway::sendToCurrentClient($sendMessage);
    }

}

4、js连接websocket服务

let ws = new WebSocket('ws://192.168.0.100:2346');
// 获取连接状态
console.log('ws连接状态:' + ws.readyState);
//监听是否连接成功
ws.onopen = function () {
    console.log('ws连接状态:' + ws.readyState);
    //连接成功则发送登录请求
 let message =  {type: "login", token: "JWT授权码"};
 ws.send(JSON.stringify(message));
}
// 接听服务器发回的信息并处理展示
ws.onmessage = function (data) {
    console.log('接收到来自服务器的消息:');
    console.log(data);
    //完成通信后关闭WebSocket连接
    ws.close();
}
// 监听连接关闭事件
ws.onclose = function () {
    // 监听整个过程中websocket的状态
    console.log('ws连接状态:' + ws.readyState);
}
// 监听并处理error事件
ws.onerror = function (error) {
    console.log(error);
}

5、后端向客户端发送消息

\GatewayClient\Gateway::$registerAddress = '127.0.0.1:1236';
        Gateway::sendToUid(29, '{"type": "update", "data": {"name": "张三", "aratar": "..."}}');
  • 注意:此处是单向api方式发送消息,registerAddress地址与启动服务注册的registerAddress地址保持对应,如本地可直接使用默认设置。内网调用时,服务与客户端使用内网IP。

6、补充使用域名时,配置nginx代理

在之前的项目中发现使用域名连接时,存在有时连接不上的问题,解决办法是使用nginx代理,在站点配置vhost文件,加上以下代码

# websocket端
        location ~ /websocket {
          proxy_pass http://127.0.0.1:2346;
          proxy_http_version 1.1;
          proxy_set_header Host $http_host;
          proxy_set_header X-Scheme $scheme;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "Upgrade";
          proxy_set_header X-Real-IP $remote_addr;
          proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        }
  • 连接地址使用: ws://xxx.xxx.com/websocket,带证书则使用:wss://xxx.xxx.com/websocket

  • Gateway服务端与客户端使用方法,请查阅官方手册:www.workerman.net/doc/gateway-work...

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 1年前 自动加精
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
讨论数量: 7
猪猪

不错收藏了 :+1:

1年前 评论

同样的效果 我的例子,实现在线客服和聊天室功能。(https://element.wmhello.cn)

10个月前 评论

许多项目我都是独立使用 workman 做 websocket 服务器,使用 GatewayWorker 能快速开发相关功能,集成到 laravel 或者 tp 等 php 常规框架一起使用,能完美解决 websocket 通讯问题,实现了聊天室、客服,包括小程序中的各种实时交互与应用相关的功能。可以看 demo(https://element.wmhello.cn)

10个月前 评论
Su 5个月前
游离不2

最近准备弃坑了,GatewayWorker 的架构设计存在缺陷,横向扩展会导致 Gateway 和 BusinessWorker 之间的效率变低。

2个月前 评论
zfb 1个月前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!