基于 swoole 的 websocket 服务二:路由注册

基于 swoole 的 websocket 服务二:路由注册

效果展示图

界面展示

动图地址:过程动态展示

以上演示存在两个路由:taskmessage,具体请查看前端代码。

终端展示

前端实现代码

上章节 swoole_websocket.html 文件内容保持不变,文件名称改为 co_task.html

新增 co_chat.html文件:

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Document</title>
</head>

<body onload="initialize()">
    <div>
        <div id="receiveMessage">
            <!-- 虚拟dom节点 -->
        </div>
        <div>
            <input type="text" id="sendingMessage" onkeydown="sendMessageByEnter()">
            <button onclick="sendMessage()">发送</button>
            <div><small style="color: #666666">按"enter"键发送</small></div>
        </div>
    </div>
</body>

<script>
    let ws;

    function initialize() {
          // 注意这里是 message 路由
        ws = new WebSocket("ws://localhost:9999/message");

        ws.onopen = function (evt) {
            console.log('connection start')
        };
    }

    function sendMessageByEnter() {
        let event = window.event;
        let code = event.keyCode || event.which || event.charCode
        // 13 == `enter`
        if (code == 13) {
            sendMessage()
        }
    }

    function sendMessage() {
        const inputDom = window.document.getElementById('sendingMessage')
        const recevDom = window.document.getElementById('receiveMessage')

        if (!inputDom.value) {
            alert('不能发送空消息哦~')
            return
        }

        const div1Dom = window.document.createElement('div')
        div1Dom.innerHTML = `<span>我:</span><span>${inputDom.value}</span>`
        div1Dom.setAttribute('style', 'color:#363636;')
        recevDom.appendChild(div1Dom)

        // 发送消息
        ws.send(inputDom.value)
        inputDom.value = ''

        ws.onmessage = function (evt) {
            const div2Dom = window.document.createElement('div')
            div2Dom.innerHTML = `<span>服务器:</span><span>${evt.data}</span>`
            div2Dom.setAttribute('style', 'color:#008B8B;')
            recevDom.appendChild(div2Dom)
        };

        ws.onclose = function (evt) {
            console.log("connection close")
            alert('连接已过期,3秒后将刷新页面~')
            setTimeout(function () {
                window.location.reload()
            }, 3000)
        }
    }
</script>
</html>

后端代码实现

app/Console/Commands/WebsocketServer.php

.
.
.
use App\Traits\CoRouter;

class WebsocketServer extends Command
{
    use CoRouter;
.
.
.
    protected function handleCo()
    {
        $server = new CoServer();

        $server->route('/task', $this->handleTask());
        $server->route('/message', $this->handleMessage());

        $server->start();
    }
}

app/Handlers/Websockets/CoServer.php

<?php

namespace App\Handlers\Websockets;

use App\Traits\CoLogger;
use Swoole\Runtime;
use Swoole\WebSocket\Server;

class CoServer
{
    use CoLogger;

    protected $routes;
    protected $paths;

    public function __construct()
    {
        // 开启一键协程化
        Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
    }

    public function start()
    {
        // 异步风格websocket服务器
        $ws = new Server('127.0.0.1', 9999);

        $ws->on('Open', function ($ws, $request) {
            $path = trim($request->server['request_uri'], '/');
            if (!in_array($path, array_keys($this->routes))) {
                $this->log('路由不存在', $request->fd);
                $ws->close($request->fd);
                return;
            }
            $this->paths[$request->fd] = $path;
            $this->log('连接成功', $request->fd);
        });

        $ws->on('Message', function ($ws, $frame) {
            // 1. 检查fid对应的路径
            if (!isset($this->paths[$frame->fd])) {
                $this->log('路径不存在', $frame->fd);
                $ws->close($frame->fd);
                return;
            }

            // 2. 检查fid对应的路由
            $path = $this->paths[$frame->fd];
            if (!isset($this->routes[$path])) {
                $this->log('路由不存在', $frame->fd);
                $ws->close($frame->fd);
                return;
            }

            // 3. 执行回调的闭包函数
            call_user_func($this->routes[$path], $ws, $frame);
        });

        $ws->on('close', function ($server, $fid) {
            if (isset($this->paths[$fid])) {
                unset($this->paths[$fid]);
            }
            echo "client {$fid} closed\n";
        });

        $ws->start();
    }

    public function route(string $path,\Closure $callback)
    {
        $path = trim($path, '/');
        $this->routes[$path] = $callback;
    }
}

app/Traits/CoRouter.php

<?php

namespace App\Traits;

use Illuminate\Support\Facades\Cache;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Redis;
use Swoole\Coroutine\WaitGroup;
use Swoole\Timer;

trait CoRouter
{
    use CoLogger;

    /**
     * 处理任务
     * @return \Closure
     */
    public function handleTask(): \Closure
    {
        return function ($ws, $frame) {
            $fid = $frame->fd ?? 0;
            $data = json_decode($frame->data, true);
            if (!$data || !is_array($data)) {
                $this->log('消息格式错误!', $fid);
                return;
            }
            $this->log('服务器已接收消息.', $fid);

            // 协程间通信,类似 go 的 sync.WaitGroup
            $wg = new WaitGroup();

            // 创建请求远端协程任务
            $wg->add();
            Coroutine::create(function () use($wg, $data, $fid) {
                $this->log('已发送消息到远端.', $fid);

                // 协程 http 客户端
                $client = new Client('127.0.0.1', 9101);
                $client->post('/remote/message', [
                    'data' => $data,
                    'fid' => $fid,
                ]);
                $client->close();
                $wg->done();
            });

            // 创建监听消息协程任务
            $wg->add();
            Coroutine::create(function () use ($wg, $ws, $fid) {
                $this->log('开始监听消息.', $fid);

                // 协程 redis 客户端
                $redis = new Redis();
                $redis->connect('127.0.0.1', 6379);
                if ($redis->subscribe(['ws:fid:'.$fid])) {
                    while ($msg = $redis->recv()) {
                        list($type, $name, $cont) = $msg;
                        if ($type == 'message' && $name == 'ws:fid:'.$fid) {
                            $ws->push($fid, $cont);
                            $this->log('消息已回复', $fid);
                        }
                    }
                }
                $redis->close();
                $wg->done();
            });

            $wg->wait();
            // 处理完主动断开连接
            $ws->close($fid);
        };
    }

    /**
     * 处理消息
     * @return \Closure
     */
    public function handleMessage(): \Closure
    {
        return function ($ws, $frame) {
            // 定时器缓存key
            $key = 'ws:msg:'.$frame->fd;
            // 活跃时长(ms)
            $ttl = 5 * 60 * 1000;

            // 清除上个定时器
            if ($tid = Cache::get($key)) {
                Cache::forget($key);
                Timer::clear($tid);
            }

            $message = $frame->data ?? '';
            $message = $frame->fd.' : '.$message.' : '.random_int(1000, 9999);

            $ws->push($frame->fd, $message);
            $this->log('回复内容: '.$message, $frame->fd);

            // 创建主动断开连接定时器
            $tid = Timer::after($ttl, function () use($ws, $frame) {
                $ws->close($frame->fd);
            });

            // 记录定时器到缓存
            Cache::forever($key, $tid);
        };
    }
}

app/Traits/CoLogger.php

<?php

namespace App\Traits;

trait CoLogger
{
    /**
     * 输出日志
     * @param $msg
     * @param int $fd
     */
    public function log($msg, $fd = 0)
    {
        if ($fd) {
            echo sprintf('[%s]: %d -> %s'.PHP_EOL, date('H:i:s'), $fd, $msg);
        } else {
            echo sprintf('[%s]: %s'.PHP_EOL, date('H:i:s'), $msg);
        }
    }
}

启动服务

> php artisan ws:start co
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 1

大佬 请教一下 就是这种操作方式能不能就是remote服务器直接 $client = new Swoole\Client(SWOOLE_SOCK_TCP); 创建一个swoole客户端 然后$client->send往swoole服务端发送处理好的结果 不用创建redis监听消息协程任务 直接swoole服务端接收到了返回回去 这两种操作方式都有些什么区别

1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!