基于 swoole 的 websocket 服务一:状态同步

基于 swoole 的 websocket 服务一:状态同步

基础流程图

基础流程图

效果展示图

界面展示

界面展示

终端展示

前端实现代码

swoole_websocket.html

<html>
    <head>
        <title>WebSocket</title>
        <style>
            h2, hr, ul, li {
                margin: 0;
                padding: 0;
            }
            hr {
                width: 200px;
                margin-top: 5px;
                margin-bottom: 5px;
            }
            #main li {
                list-style: none;
            }
            #main li span {
                display: inline-block;
                width: 50px;
            }
        </style>
    </head>
    <!-- 初始化加载子节点 -->
    <body onload="initialize();">
        <h2>WebSocket</h2>
        <hr>
        <div>
            <ul id="main">
                <!-- 待渲染子节点列表 -->
            </ul>
        </div>
        <hr>
        <button onclick="connWebsocket()">发送</button>
    </body>
    <script>

        // 初始化页面加载函数
        function initialize() {
            for (let i = 0; i < 10; i++) {
                // 获取主节点
                let ulNode = window.document.getElementById('main')
                // 生成随机id
                let id = makeRandomNumber(5)
                // 创建子节点
                liNode = window.document.createElement('li')
                liNode.innerHTML = `<span>${id}</span><span></span><span>未开始</span>`
                liNode.firstElementChild.setAttribute('style', 'color:#363636;')
                liNode.lastElementChild.setAttribute('style', 'color:#363636;')
                // 应用子节点
                ulNode.appendChild(liNode)
            }
        }

        // 连接websocket服务器
        function connWebsocket() {
            // 所有的liNodes置为`进行中`
            let liNodes = window.document.getElementById('main').children
            // 待发送消息数据
            let allData = []
            for (let i = 0; i < liNodes.length; i++) {
                let firstNode = liNodes[i].firstElementChild
                let lastNode = liNodes[i].lastElementChild

                // 渲染`进行中`状态样式
                firstNode.setAttribute('style', 'color:#FF6347;')
                lastNode.setAttribute('style', 'color:#FF6347;')
                lastNode.textContent = '进行中'

                allData.push({
                    'id': firstNode.textContent,
                    'status': lastNode.textContent,
                })
            }

            // 进行weksocket通信服务
            let ws = new WebSocket("ws://localhost:9999");

            ws.onopen = function(evt) {
                console.log('connection start')
                // 打开连接就发送消息
                ws.send(JSON.stringify(allData))
            };

            // 已完成数量
            let completeNum = 0
            ws.onmessage = function(evt) {
                // 异常或错误处理
                try {
                    var obj = JSON.parse(evt.data);
                } catch (e) {
                    console.error(e)
                    return
                }
                if (!obj.id || !obj.status) {
                    console.error(`property is not undefined.`)
                    return
                }

                // 找到`id`对应节点
                let nodeList = [...liNodes]
                let liNode = nodeList.find((node) => {
                    return node.firstElementChild.textContent == obj.id
                })
                if (!liNode) {
                    console.error(`li node is not found.`)
                    return;
                }

                // 重新渲染界面,并自增已完成数量
                completeNum++
                liNode.lastElementChild.textContent = obj.status
                liNode.lastElementChild.setAttribute('style', 'color:#008B8B;')
                liNode.firstElementChild.setAttribute('style', 'color:#008B8B;')

                // 如果完成数和节点数相等,主动断开连接
                if (completeNum === liNodes.length) {
                    ws.close()
                }
            };

            ws.onclose = function(evt) {
                console.log("connection close");
            }
        }

        // 制作整型随机数
        function makeRandomNumber(digit = 6) {
            if (digit < 1 || digit > 10) {
                throw new RangeError('位数不能小于1且不能大于10')
            }
            const min = Math.pow(10, digit - 1)
            const max = Math.pow(10, digit) - 1

            let val = Math.floor(Math.random() * max + 1)
            while (val < min) {
                val = Math.floor(Math.random() * max + 1)
            }

            return val
        }

    </script>
</html>

后端代码实现

注意:演示框架是 laravel5.7+ 以上版本

1. 查看swoole扩展版本

> php --ri swoole

swoole

Swoole => enabled
Author => Swoole Team <team@swoole.com>
Version => 4.5.11
Built => Feb 21 2022 14:53:00
coroutine => enabled
kqueue => enabled
rwlock => enabled
pcre => enabled
zlib => 1.2.11
brotli => E16777225/D16777225
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 262144 => 262144

注意:以上演示 swoole 扩展版本为 4.5+

2. 创建自定义websocket服务类

app/Handlers/Websockets/CoServer.php

<?php

namespace App\Handlers\Websockets;

use Swoole\Coroutine;

class CoServer
{
    public function __construct()
    {
        // 开启一键协程化
        \Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
    }

    public function start()
    {
        // 异步风格websocket服务器
        $ws = new \Swoole\WebSocket\Server('127.0.0.1', 9999);

        $ws->on('Open', function ($ws, $request) {
            $this->log('连接成功', $request->fd);
        });

        $ws->on('Message', function ($ws, $frame) {

            $fid = $frame->fd ?? 0;
            $data = json_decode($frame->data, true);
            if (!$data || !is_array($data)) {
                $this->log('消息格式错误!', $fid);
                return;
            }
            $this->log('服务器已接收消息.', $fid);

            // 协程间通信,类似 go 的 sync.WaitGroup
            $wg = new \Swoole\Coroutine\WaitGroup();

            // 创建请求远端协程任务
            $wg->add();
            Coroutine::create(function () use($wg, $data, $fid) {
                $this->log('已发送消息到远端.', $fid);

                // 协程 http 客户端
                $client = new \Swoole\Coroutine\Http\Client('127.0.0.1', 9101);
                $client->post('/remote/message', [
                    'data' => $data,
                    'fid' => $fid,
                ]);
                $client->close();
                $wg->done();
            });

            // 创建监听消息协程任务
            $wg->add();
            Coroutine::create(function () use ($wg, $ws, $fid) {
                $this->log('开始监听消息.', $fid);

                // 协程 redis 客户端
                $redis = new \Swoole\Coroutine\Redis();
                $redis->connect('127.0.0.1', 6379);
                if ($redis->subscribe(['ws:fid:'.$fid])) {
                    while ($msg = $redis->recv()) {
                        list($type, $name, $cont) = $msg;
                        if ($type == 'message' && $name == 'ws:fid:'.$fid) {
                            $ws->push($fid, $cont);
                            $this->log('消息已回复', $fid);
                        }
                    }
                }
                $redis->close();
                $wg->done();
            });

            $wg->wait();
            // 处理完主动断开连接
            $ws->close($fid);
        });

        $ws->on('close', function ($server, $fid) {
            echo "client {$fid} closed\n";
        });

        $ws->start();
    }

    protected function log($msg, $fd = 0)
    {
        if ($fd) {
            echo sprintf('[%s]: %d -> %s'.PHP_EOL, date('H:i:s'), $fd, $msg);
        } else {
            echo sprintf('[%s]: %s'.PHP_EOL, date('H:i:s'), $msg);
        }
    }
}

3. 模拟远端请求接口方法

app/Http/Controllers/RemoteController.php

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

class RemoteController extends Controller
{
      // 对应路由:/remote/message POST
    public function message(Request $request)
    {
        $data = $request->input('data');
        $fid = $request->input('fid');

        Log::info('请求参数: '.json_encode($request->all()));

        while (true) {
            if (empty($data)) {
                break;
            }

            usleep(500000);

            foreach ($data as $key => $val) {
                if (random_int(0, 100) > 80) {
                    $val['status'] = '已完成';
                    Log::info('获取消息中...');
                    Redis::connection()->publish('ws:fid:'.$fid, json_encode($val));
                    unset($data[$key]);
                }
            }
        }

        return response()->json([
            'status' => 0,
            'msg'    => 'success',
            'data'   => null,
        ]);
    }
}

4. 创建命令执行websocket服务

app/Console/Commands/WebsocketServer.php

<?php

namespace App\Console\Commands;

use App\Handlers\Websockets\CoServer;
use Illuminate\Console\Command;
use Ratchet\Http\HttpServer;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;

class WebsocketServer extends Command
{
    protected $signature = 'ws:start {server}';

    protected $description = 'Command description';

    const SERVER_HTTP = 'http';
    const SERVER_CO = 'co';

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        $allowServers = [self::SERVER_HTTP, self::SERVER_CO];
        $inputServer = $this->argument('server');
        if (!in_array($inputServer, $allowServers)) {
            $this->error('服务器类型错误');
            return;
        }

        switch ($inputServer) {
            case self::SERVER_HTTP:
                $this->handleHttp();
                break;
            case self::SERVER_CO:
                $this->handleCo();
                break;
            default:
                $this->error('服务器类型不存在');
        }

    }

    protected function handleHttp()
    {
//        $server = IoServer::factory(
//            new HttpServer(
//                new WsServer(
//                    new \App\Handlers\Websockets\HttpServer()
//                )
//            )
//            , 9999);
//        $server->run();
    }

    protected function handleCo()
    {
        $server = new CoServer();
        $server->start();
    }
}

5. 运行websocket服务

> php artisan ws:start co

后续扩展

  1. 权限校验

  2. 超时机制

  3. 数据同步

  4. 配置参数

    ……

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 7
laravel_peng

每遇到用 swoole 的,就觉得是时候停止 CURD 了。哎!啥时候我能吧 swoole 能玩的转哦 :relieved:~~

1年前 评论
romp (楼主) 1年前
romp (楼主) 1年前

:+1:好文章

1年前 评论
romp (楼主) 1年前

期待后续

1年前 评论
romp (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!