基于 swoole 的 websocket 服务一:状态同步
基于 swoole 的 websocket 服务一:状态同步
基础流程图
效果展示图
界面展示
终端展示
前端实现代码
swoole_websocket.html
<html>
<head>
<title>WebSocket</title>
<style>
h2, hr, ul, li {
margin: 0;
padding: 0;
}
hr {
width: 200px;
margin-top: 5px;
margin-bottom: 5px;
}
#main li {
list-style: none;
}
#main li span {
display: inline-block;
width: 50px;
}
</style>
</head>
<!-- 初始化加载子节点 -->
<body onload="initialize();">
<h2>WebSocket</h2>
<hr>
<div>
<ul id="main">
<!-- 待渲染子节点列表 -->
</ul>
</div>
<hr>
<button onclick="connWebsocket()">发送</button>
</body>
<script>
// 初始化页面加载函数
function initialize() {
for (let i = 0; i < 10; i++) {
// 获取主节点
let ulNode = window.document.getElementById('main')
// 生成随机id
let id = makeRandomNumber(5)
// 创建子节点
liNode = window.document.createElement('li')
liNode.innerHTML = `<span>${id}</span><span></span><span>未开始</span>`
liNode.firstElementChild.setAttribute('style', 'color:#363636;')
liNode.lastElementChild.setAttribute('style', 'color:#363636;')
// 应用子节点
ulNode.appendChild(liNode)
}
}
// 连接websocket服务器
function connWebsocket() {
// 所有的liNodes置为`进行中`
let liNodes = window.document.getElementById('main').children
// 待发送消息数据
let allData = []
for (let i = 0; i < liNodes.length; i++) {
let firstNode = liNodes[i].firstElementChild
let lastNode = liNodes[i].lastElementChild
// 渲染`进行中`状态样式
firstNode.setAttribute('style', 'color:#FF6347;')
lastNode.setAttribute('style', 'color:#FF6347;')
lastNode.textContent = '进行中'
allData.push({
'id': firstNode.textContent,
'status': lastNode.textContent,
})
}
// 进行weksocket通信服务
let ws = new WebSocket("ws://localhost:9999");
ws.onopen = function(evt) {
console.log('connection start')
// 打开连接就发送消息
ws.send(JSON.stringify(allData))
};
// 已完成数量
let completeNum = 0
ws.onmessage = function(evt) {
// 异常或错误处理
try {
var obj = JSON.parse(evt.data);
} catch (e) {
console.error(e)
return
}
if (!obj.id || !obj.status) {
console.error(`property is not undefined.`)
return
}
// 找到`id`对应节点
let nodeList = [...liNodes]
let liNode = nodeList.find((node) => {
return node.firstElementChild.textContent == obj.id
})
if (!liNode) {
console.error(`li node is not found.`)
return;
}
// 重新渲染界面,并自增已完成数量
completeNum++
liNode.lastElementChild.textContent = obj.status
liNode.lastElementChild.setAttribute('style', 'color:#008B8B;')
liNode.firstElementChild.setAttribute('style', 'color:#008B8B;')
// 如果完成数和节点数相等,主动断开连接
if (completeNum === liNodes.length) {
ws.close()
}
};
ws.onclose = function(evt) {
console.log("connection close");
}
}
// 制作整型随机数
function makeRandomNumber(digit = 6) {
if (digit < 1 || digit > 10) {
throw new RangeError('位数不能小于1且不能大于10')
}
const min = Math.pow(10, digit - 1)
const max = Math.pow(10, digit) - 1
let val = Math.floor(Math.random() * max + 1)
while (val < min) {
val = Math.floor(Math.random() * max + 1)
}
return val
}
</script>
</html>
后端代码实现
注意:演示框架是 laravel5.7+ 以上版本
1. 查看swoole扩展版本
> php --ri swoole
swoole
Swoole => enabled
Author => Swoole Team <team@swoole.com>
Version => 4.5.11
Built => Feb 21 2022 14:53:00
coroutine => enabled
kqueue => enabled
rwlock => enabled
pcre => enabled
zlib => 1.2.11
brotli => E16777225/D16777225
async_redis => enabled
Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 262144 => 262144
注意:以上演示 swoole 扩展版本为 4.5+
2. 创建自定义websocket服务类
app/Handlers/Websockets/CoServer.php
<?php
namespace App\Handlers\Websockets;
use Swoole\Coroutine;
class CoServer
{
public function __construct()
{
// 开启一键协程化
\Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
}
public function start()
{
// 异步风格websocket服务器
$ws = new \Swoole\WebSocket\Server('127.0.0.1', 9999);
$ws->on('Open', function ($ws, $request) {
$this->log('连接成功', $request->fd);
});
$ws->on('Message', function ($ws, $frame) {
$fid = $frame->fd ?? 0;
$data = json_decode($frame->data, true);
if (!$data || !is_array($data)) {
$this->log('消息格式错误!', $fid);
return;
}
$this->log('服务器已接收消息.', $fid);
// 协程间通信,类似 go 的 sync.WaitGroup
$wg = new \Swoole\Coroutine\WaitGroup();
// 创建请求远端协程任务
$wg->add();
Coroutine::create(function () use($wg, $data, $fid) {
$this->log('已发送消息到远端.', $fid);
// 协程 http 客户端
$client = new \Swoole\Coroutine\Http\Client('127.0.0.1', 9101);
$client->post('/remote/message', [
'data' => $data,
'fid' => $fid,
]);
$client->close();
$wg->done();
});
// 创建监听消息协程任务
$wg->add();
Coroutine::create(function () use ($wg, $ws, $fid) {
$this->log('开始监听消息.', $fid);
// 协程 redis 客户端
$redis = new \Swoole\Coroutine\Redis();
$redis->connect('127.0.0.1', 6379);
if ($redis->subscribe(['ws:fid:'.$fid])) {
while ($msg = $redis->recv()) {
list($type, $name, $cont) = $msg;
if ($type == 'message' && $name == 'ws:fid:'.$fid) {
$ws->push($fid, $cont);
$this->log('消息已回复', $fid);
}
}
}
$redis->close();
$wg->done();
});
$wg->wait();
// 处理完主动断开连接
$ws->close($fid);
});
$ws->on('close', function ($server, $fid) {
echo "client {$fid} closed\n";
});
$ws->start();
}
protected function log($msg, $fd = 0)
{
if ($fd) {
echo sprintf('[%s]: %d -> %s'.PHP_EOL, date('H:i:s'), $fd, $msg);
} else {
echo sprintf('[%s]: %s'.PHP_EOL, date('H:i:s'), $msg);
}
}
}
3. 模拟远端请求接口方法
app/Http/Controllers/RemoteController.php
<?php
namespace App\Http\Controllers;
use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;
class RemoteController extends Controller
{
// 对应路由:/remote/message POST
public function message(Request $request)
{
$data = $request->input('data');
$fid = $request->input('fid');
Log::info('请求参数: '.json_encode($request->all()));
while (true) {
if (empty($data)) {
break;
}
usleep(500000);
foreach ($data as $key => $val) {
if (random_int(0, 100) > 80) {
$val['status'] = '已完成';
Log::info('获取消息中...');
Redis::connection()->publish('ws:fid:'.$fid, json_encode($val));
unset($data[$key]);
}
}
}
return response()->json([
'status' => 0,
'msg' => 'success',
'data' => null,
]);
}
}
4. 创建命令执行websocket服务
app/Console/Commands/WebsocketServer.php
<?php
namespace App\Console\Commands;
use App\Handlers\Websockets\CoServer;
use Illuminate\Console\Command;
use Ratchet\Http\HttpServer;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;
class WebsocketServer extends Command
{
protected $signature = 'ws:start {server}';
protected $description = 'Command description';
const SERVER_HTTP = 'http';
const SERVER_CO = 'co';
public function __construct()
{
parent::__construct();
}
public function handle()
{
$allowServers = [self::SERVER_HTTP, self::SERVER_CO];
$inputServer = $this->argument('server');
if (!in_array($inputServer, $allowServers)) {
$this->error('服务器类型错误');
return;
}
switch ($inputServer) {
case self::SERVER_HTTP:
$this->handleHttp();
break;
case self::SERVER_CO:
$this->handleCo();
break;
default:
$this->error('服务器类型不存在');
}
}
protected function handleHttp()
{
// $server = IoServer::factory(
// new HttpServer(
// new WsServer(
// new \App\Handlers\Websockets\HttpServer()
// )
// )
// , 9999);
// $server->run();
}
protected function handleCo()
{
$server = new CoServer();
$server->start();
}
}
5. 运行websocket服务
> php artisan ws:start co
后续扩展
权限校验
超时机制
数据同步
配置参数
……
本作品采用《CC 协议》,转载必须注明作者和本文链接
每遇到用
swoole
的,就觉得是时候停止CURD
了。哎!啥时候我能吧swoole
能玩的转哦 :relieved:~~:+1:好文章
期待后续