Lumen/Laravel 集成 GatewayWorker (Workerman),实现简单的聊天系统.

GatewayWorker 与 Workerman的关系
Workerman相比GatewayWorker也更底层,需要开发者有一定的多进程编程经验。因为绝大多数开发者的目标是基于Workerman开发TCP长连接应用,而长连接应用服务端有很多共同之处,例如它们有相同的进程模型以及单发、群发、广播等接口需求。所以才有了GatewayWorker框架,GatewayWorker是基于Workerman开发的一个TCP长连接框架,实现了单发、群送、广播等长连接必用的接口。GatewayWorker框架实现了Gateway Worker进程模型,天然支持分布式多服务器部署,扩容缩容非常方便,能够应对海量并发连接。可以说GatewayWorker是基于Workerman实现的一个更完善的专门用于实现TCP长连接的项目框架。项目是长连接并且需要客户端与客户端之间通讯,建议使用GatewayWorker。

Step1.引入包

  1. composer.json件的require引入这三行
        "workerman/gateway-worker": "^3.0",
        "workerman/gatewayclient": "^3.0",
        "workerman/workerman": "^3.5"

    file

  2. 终端的项目根目录输入 composer update

Step2. 创建一个Command

  1. App\Console\Commands目录创建GatewayWorkerServer.php文件,作为服务端口的启动文件
<?php

namespace App\Console\Commands;

use GatewayWorker\BusinessWorker;
use Illuminate\Console\Command;
use Workerman\Worker;
use GatewayWorker\Gateway;
use GatewayWorker\Register;

class GatewayWorkerServer extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'gateway-worker:server {action} {--daemon}';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Start a GatewayWorker Server.';

    /**
     * constructor
     */
    public function __construct()
    {
        parent::__construct();
    }

    /**
     * Execute the console command.
     *
     * [@return](https://learnku.com/users/31554) mixed
     */
    public function handle()
    {
        global $argv;

        if (!in_array($action = $this->argument('action'), ['start', 'stop'])) {
            $this->error('Error Arguments');
            exit;
        }

        $argv[0] = 'gateway-worker:server';
        $argv[1] = $action;
        $argv[2] = $this->option('daemon') ? '-d' : '';

        $this->start();
    }

    private function start()
    {
        $this->startGateWay();
        $this->startBusinessWorker();
        $this->startRegister();
        Worker::runAll();
    }

    private function startBusinessWorker()
    {
        $worker                  = new BusinessWorker();
        $worker->name            = 'BusinessWorker';                        #设置BusinessWorker进程的名称
        $worker->count           = 1;                                       #设置BusinessWorker进程的数量
        $worker->registerAddress = '127.0.0.1:12360';                        #注册服务地址
        $worker->eventHandler    = \App\GatewayWorker\Events::class;            #设置使用哪个类来处理业务,业务类至少要实现onMessage静态方法,onConnect和onClose静态方法可以不用实现
    }

    private function startGateWay()
    {
        $gateway = new Gateway("websocket://0.0.0.0:23460");
        $gateway->name                 = 'Gateway';                         #设置Gateway进程的名称,方便status命令中查看统计
        $gateway->count                = 1;                                 #进程的数量
        $gateway->lanIp                = '127.0.0.1';                       #内网ip,多服务器分布式部署的时候需要填写真实的内网ip
        $gateway->startPort            = 2300;                              #监听本机端口的起始端口
        $gateway->pingInterval         = 30;
        $gateway->pingNotResponseLimit = 0;                                 #服务端主动发送心跳
        $gateway->pingData             = '{"mode":"heart"}';
        $gateway->registerAddress      = '127.0.0.1:12360';                  #注册服务地址
    }

    private function startRegister()
    {
        new Register('text://0.0.0.0:12360');
    }

}

//ws = new WebSocket("ws://127.0.0.1:23460");
//ws.onopen = function() {
//    ws . send('{"mode":"say","order_id":"21",type:1,"content":"文字内容","user_id":21}');
//    ws . send('{"mode":"chats","order_id":"97"}');
//};
//ws.onmessage = function(e) {
//    console.log("收到服务端的消息:" + e.data);
//};
  1. App\Console\Kernel引入 GatewayWorkerServer::class
    file

Step3.创建监听文件

在app目录下创建 GatewayWorker/Events.php文件
file

<?php

namespace App\GatewayWorker;

use App\Models\Home\Order;
use App\Models\Home\OrderChat;
use GatewayWorker\Lib\Gateway;
use Illuminate\Support\Facades\Log;
use mysql_xdevapi\Exception;

class Events
{

    public static function onWorkerStart($businessWorker)
    {
        echo "BusinessWorker    Start\n";
    }

    public static function onConnect($client_id)
    {
        Gateway::sendToClient($client_id, json_encode(['type' => 'init', 'client_id' => $client_id]));
    }

    public static function onWebSocketConnect($client_id, $data)
    {

    }

    public static function onMessage($client_id, $message)
    {
        $response = ['errcode' => 0, 'msg' => 'ok', 'data' => []];
        $message = json_decode($message);

        if (!isset($message->mode)) {
            $response['msg'] = 'missing parameter mode';
            $response['errcode'] = ERROR_CHAT;
            Gateway::sendToClient($client_id, json_encode($response));
            return false;
        }

        switch ($message->mode) {
            case 'say':   #处理发送的聊天
                if (self::authentication($message->order_id, $message->user_id)) {
                    OrderChat::store($message->order_id, $message->type, $message->content, $message->user_id);
                } else {
                    $response['msg'] = 'Authentication failure';
                    $response['errcode'] = ERROR_CHAT;
                }
                break;
            case 'chats':  #获取聊天列表
                $chats = OrderChat::where('order_id', $message->order_id)->get();
                $response['data'] = ['chats' => $chats];
                break;
            default:
                $response['errcode'] = ERROR_CHAT;
                $response['msg'] = 'Undefined';
        }

        Gateway::sendToClient($client_id, json_encode($response));
    }

    public static function onClose($client_id)
    {
        Log::info('close connection' . $client_id);
    }

    private static function authentication($order_id, $user_id): bool
    {
        $order = Order::find($order_id);
        if (is_null($order)) {
            return false;
        }
        return in_array($user_id, [$order->user_id, $order->to_user_id]) ? true : false;   #判断属不属于这个订单的两个人
    }
}

注意 GatewayWorkerServer.php文件中的 startBusinessWorker方法中$worker->eventHandler属性的路径是Event.php的路径,namespace可以自己定义

最后执行一个 composer dump-autoload

Step4 测试

  1. 开启服务,在终端输入
    php artisan gateway-worker:server start
    file

  2. 在浏览器F12打开调试模式,在Console里输入
ws = new WebSocket("ws://127.0.0.1:23460");
ws = new WebSocket("ws://127.0.0.1:23460");
ws.onopen = function() {
    ws . send('{"mode":"say","order_id":"375","type":1,"content":"你好","user_id":100036}');
    ws . send('{"mode":"chats","order_id":"375"}');
};
ws.onmessage = function(e) {
    console.log("收到服务端的消息:" + e.data);
};

file

简单实现订单里的买家和卖家交流、获取聊天记录,也可以把相关的信息存到缓存里,
public static function onMessage() 里修改自己的业务逻辑

http://doc2.workerman.net/ GatewayWorker2.x 3.x 手册

《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 2

怎么重启

1天前

你好,没做守护进程吗

1天前

请勿发布不友善或者负能量的内容。与人为善,比聪明更重要!