基于 swoole 的 websocket 服务二:路由注册
基于 swoole 的 websocket 服务二:路由注册
效果展示图
界面展示
动图地址:过程动态展示
以上演示存在两个路由:task 和 message,具体请查看前端代码。
终端展示
前端实现代码
上章节 swoole_websocket.html 文件内容保持不变,文件名称改为 co_task.html
新增 co_chat.html文件:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta http-equiv="X-UA-Compatible" content="IE=edge">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Document</title>
</head>
<body onload="initialize()">
<div>
    <div id="receiveMessage">
        <!-- message list: chat lines are appended here at runtime -->
    </div>
    <div>
        <input type="text" id="sendingMessage" onkeydown="sendMessageByEnter(event)">
        <button onclick="sendMessage()">发送</button>
        <div><small style="color: #666666">按"enter"键发送</small></div>
    </div>
</div>
<script>
    // The single WebSocket connection used by this page; created in initialize().
    let ws;

    /**
     * Append one chat line to the #receiveMessage list.
     *
     * The label and the text are written through textContent, so neither the
     * user's input nor the server's reply is ever parsed as HTML.
     *
     * @param {string} label prefix shown before the text (who is speaking)
     * @param {string} text  message body
     * @param {string} color CSS colour applied to the whole line
     */
    function appendMessage(label, text, color) {
        const listDom = window.document.getElementById('receiveMessage')
        const rowDom = window.document.createElement('div')
        const labelDom = window.document.createElement('span')
        const textDom = window.document.createElement('span')
        labelDom.textContent = label
        textDom.textContent = text
        rowDom.appendChild(labelDom)
        rowDom.appendChild(textDom)
        rowDom.setAttribute('style', `color:${color};`)
        listDom.appendChild(rowDom)
    }

    /**
     * Open the connection and install all socket handlers once.
     *
     * Handlers are registered here (not on the first send) so that a reply or
     * a close that happens before the user has sent anything is still handled.
     */
    function initialize() {
        // Note: this page talks to the `message` route.
        ws = new WebSocket("ws://localhost:9999/message");
        ws.onopen = function (evt) {
            console.log('connection start')
        };
        ws.onmessage = function (evt) {
            appendMessage('服务器:', evt.data, '#008B8B')
        };
        ws.onclose = function (evt) {
            console.log("connection close")
            alert('连接已过期,3秒后将刷新页面~')
            setTimeout(function () {
                window.location.reload()
            }, 3000)
        }
    }

    /**
     * keydown handler for the input box: send the message when Enter is pressed.
     *
     * @param {KeyboardEvent} [evt] the keydown event; falls back to the legacy
     *        global window.event when the caller does not pass one
     */
    function sendMessageByEnter(evt) {
        const event = evt || window.event
        if (!event) {
            return
        }
        // Enter pressed while an input method is composing only confirms the
        // candidate text; it must not send the message.
        if (event.isComposing) {
            return
        }
        const code = event.keyCode || event.which || event.charCode
        // 13 == `enter`
        if (event.key === 'Enter' || code === 13) {
            sendMessage()
        }
    }

    /**
     * Send the current content of the input box and echo it into the list.
     * Refuses empty messages and refuses to send while the socket is not open.
     */
    function sendMessage() {
        const inputDom = window.document.getElementById('sendingMessage')
        if (!inputDom.value) {
            alert('不能发送空消息哦~')
            return
        }
        // send() throws while the socket is still connecting and silently drops
        // data once it is closing/closed, so check the state first.
        if (!ws || ws.readyState !== WebSocket.OPEN) {
            alert('连接尚未建立或已断开,请稍后再试~')
            return
        }
        appendMessage('我:', inputDom.value, '#363636')
        // send the message
        ws.send(inputDom.value)
        inputDom.value = ''
    }
</script>
</body>
</html>
后端代码实现
app/Console/Commands/WebsocketServer.php
.
.
.
use App\Traits\CoRouter;
class WebsocketServer extends Command
{
use CoRouter;
.
.
.
/**
 * Build the coroutine WebSocket server, register the `/task` and
 * `/message` handlers on it, then start it.
 */
protected function handleCo()
{
    $server = new CoServer();

    // Path => handler closure, registered in declaration order.
    $handlers = [
        '/task' => $this->handleTask(),
        '/message' => $this->handleMessage(),
    ];
    foreach ($handlers as $path => $handler) {
        $server->route($path, $handler);
    }

    $server->start();
}
}
app/Handlers/Websockets/CoServer.php
<?php
namespace App\Handlers\Websockets;
use App\Traits\CoLogger;
use Swoole\Runtime;
use Swoole\WebSocket\Server;
/**
 * Minimal path-based router on top of Swoole's WebSocket server.
 *
 * Handlers are registered per request path with route(). start() then accepts
 * connections, remembers the path each connection was opened on, and passes
 * every incoming frame to the handler registered for that path.
 */
class CoServer
{
    use CoLogger;

    /**
     * Registered handlers, keyed by path with surrounding slashes removed.
     *
     * Initialised to an empty array so that start() works even when route()
     * was never called.
     *
     * @var array<string, \Closure>
     */
    protected $routes = [];

    /**
     * Path each open connection was opened on, keyed by connection fd.
     *
     * @var array<int, string>
     */
    protected $paths = [];

    public function __construct()
    {
        // Turn on Swoole's runtime coroutine hooks for everything (SWOOLE_HOOK_ALL).
        Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
    }

    /**
     * Create the WebSocket server on 127.0.0.1:9999, wire the Open / Message /
     * close callbacks and start it.
     */
    public function start()
    {
        // Asynchronous-style WebSocket server.
        $ws = new Server('127.0.0.1', 9999);

        $ws->on('Open', function ($ws, $request) {
            $path = trim($request->server['request_uri'], '/');
            // Same lookup the Message callback uses: a direct key check on the
            // route table instead of a loose in_array() over its keys.
            if (!isset($this->routes[$path])) {
                $this->log('路由不存在', $request->fd);
                $ws->close($request->fd);
                return;
            }
            $this->paths[$request->fd] = $path;
            $this->log('连接成功', $request->fd);
        });

        $ws->on('Message', function ($ws, $frame) {
            // 1. Look up the path this fd connected on.
            if (!isset($this->paths[$frame->fd])) {
                $this->log('路径不存在', $frame->fd);
                $ws->close($frame->fd);
                return;
            }
            // 2. Look up the handler registered for that path.
            $path = $this->paths[$frame->fd];
            if (!isset($this->routes[$path])) {
                $this->log('路由不存在', $frame->fd);
                $ws->close($frame->fd);
                return;
            }
            // 3. Run the handler closure with the server and the frame.
            $handler = $this->routes[$path];
            $handler($ws, $frame);
        });

        $ws->on('close', function ($server, $fid) {
            // Forget the path of the closed connection; unset() is a no-op
            // when the fd was never recorded.
            unset($this->paths[$fid]);
            echo "client {$fid} closed\n";
        });

        $ws->start();
    }

    /**
     * Register the handler for a request path.
     *
     * @param string   $path     request path; leading and trailing slashes are ignored
     * @param \Closure $callback invoked as $callback($server, $frame) for each frame
     */
    public function route(string $path,\Closure $callback)
    {
        $path = trim($path, '/');
        $this->routes[$path] = $callback;
    }
}
app/Traits/CoRouter.php
<?php
namespace App\Traits;
use Illuminate\Support\Facades\Cache;
use Swoole\Coroutine;
use Swoole\Coroutine\Http\Client;
use Swoole\Coroutine\Redis;
use Swoole\Coroutine\WaitGroup;
use Swoole\Timer;
/**
 * Route handlers for the WebSocket server.
 *
 * Each method returns a closure with the signature ($ws, $frame) that is meant
 * to be registered as the handler of one route.
 */
trait CoRouter
{
    use CoLogger;

    /**
     * Handler for the task route.
     *
     * Decodes the frame as JSON, forwards it to the remote HTTP service and,
     * concurrently, relays to the client whatever is published on the Redis
     * channel `ws:fid:<fd>`. When both coroutines have finished the
     * connection is closed.
     *
     * @return \Closure
     */
    public function handleTask(): \Closure
    {
        return function ($ws, $frame) {
            $fid = $frame->fd ?? 0;
            $data = json_decode($frame->data, true);
            if (!$data || !is_array($data)) {
                $this->log('消息格式错误!', $fid);
                return;
            }
            $this->log('服务器已接收消息.', $fid);

            // Redis channel on which replies for this connection are published.
            $channel = 'ws:fid:'.$fid;

            // Coordinates the two coroutines below, similar to Go's sync.WaitGroup.
            $wg = new WaitGroup();

            // Coroutine 1: forward the payload to the remote service.
            $wg->add();
            Coroutine::create(function () use ($wg, $data, $fid) {
                $this->log('已发送消息到远端.', $fid);
                // Coroutine HTTP client.
                $client = new Client('127.0.0.1', 9101);
                $client->post('/remote/message', [
                    'data' => $data,
                    'fid' => $fid,
                ]);
                $client->close();
                $wg->done();
            });

            // Coroutine 2: listen for replies and relay them to the client.
            // NOTE(review): the request above is issued before the subscription
            // below is established; a reply published in between would be
            // missed — confirm this ordering is acceptable.
            $wg->add();
            Coroutine::create(function () use ($wg, $ws, $fid, $channel) {
                $this->log('开始监听消息.', $fid);
                // Coroutine Redis client.
                $redis = new Redis();
                $redis->connect('127.0.0.1', 6379);
                if ($redis->subscribe([$channel])) {
                    // NOTE(review): this loop only ends when recv() returns a
                    // falsy value or the push fails; whether recv() times out
                    // depends on the Redis client options — confirm.
                    while ($msg = $redis->recv()) {
                        [$type, $name, $cont] = $msg;
                        if ($type === 'message' && $name === $channel) {
                            if (!$ws->push($fid, $cont)) {
                                // The push was rejected, so there is nobody to
                                // relay to: stop listening instead of keeping
                                // the subscription alive.
                                break;
                            }
                            $this->log('消息已回复', $fid);
                        }
                    }
                }
                $redis->close();
                $wg->done();
            });

            $wg->wait();
            // Both coroutines are done: actively close the connection.
            $ws->close($fid);
        };
    }

    /**
     * Handler for the message route.
     *
     * Replies to every frame with "<fd> : <data> : <random 4-digit number>"
     * and (re)arms a timer that closes the connection after five minutes
     * without a further message.
     *
     * @return \Closure
     */
    public function handleMessage(): \Closure
    {
        // fd => id of the pending idle-disconnect timer.
        // Kept in memory next to the closure rather than in the cache: a timer
        // id only identifies a timer inside the process that created it, so an
        // id read back from a shared or persistent store could belong to an
        // unrelated timer.
        $timers = [];

        return function ($ws, $frame) use (&$timers) {
            $fd = $frame->fd;
            // Idle time allowed before the connection is closed (ms).
            $ttl = 5 * 60 * 1000;

            // Cancel the timer armed by the previous message, if any.
            if (isset($timers[$fd])) {
                Timer::clear($timers[$fd]);
                unset($timers[$fd]);
            }

            $message = $frame->data ?? '';
            $message = $fd.' : '.$message.' : '.random_int(1000, 9999);
            $ws->push($fd, $message);
            $this->log('回复内容: '.$message, $fd);

            // Arm the idle-disconnect timer.
            $tid = Timer::after($ttl, function () use ($ws, $fd, &$timers) {
                unset($timers[$fd]);
                // The client may already have left on its own.
                if ($ws->exist($fd)) {
                    $ws->close($fd);
                }
            });
            if ($tid !== false) {
                $timers[$fd] = $tid;
            }
        };
    }
}
app/Traits/CoLogger.php
<?php
namespace App\Traits;
trait CoLogger
{
    /**
     * Print one timestamped log line to standard output.
     *
     * The line is "[HH:MM:SS]: <fd> -> <msg>" when a non-zero fd is given and
     * "[HH:MM:SS]: <msg>" otherwise, followed by PHP_EOL.
     *
     * @param $msg
     * @param int $fd
     */
    public function log($msg, $fd = 0)
    {
        $stamp = date('H:i:s');

        // A falsy fd (the default 0) selects the short form without the fd.
        $line = $fd
            ? sprintf('[%s]: %d -> %s', $stamp, $fd, $msg)
            : sprintf('[%s]: %s', $stamp, $msg);

        echo $line.PHP_EOL;
    }
}
启动服务
> php artisan ws:start co
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: