基于 swoole 的 websocket 服务一:状态同步

基于 swoole 的 websocket 服务一:状态同步

基础流程图

基础流程图

效果展示图

界面展示

界面展示

终端展示

前端实现代码

swoole_websocket.html

<html>
    <head>
        <title>WebSocket</title>
        <style>
            h2, hr, ul, li {
                margin: 0;
                padding: 0;
            }
            hr {
                width: 200px;
                margin-top: 5px;
                margin-bottom: 5px;
            }
            #main li {
                list-style: none;
            }
            #main li span {
                display: inline-block;
                width: 50px;
            }
        </style>
    </head>
    <!-- 初始化加载子节点 -->
    <body onload="initialize();">
        <h2>WebSocket</h2>
        <hr>
        <div>
            <ul id="main">
                <!-- 待渲染子节点列表 -->
            </ul>
        </div>
        <hr>
        <button onclick="connWebsocket()">发送</button>
    </body>
    <script>

        // 初始化页面加载函数
        function initialize() {
            for (let i = 0; i < 10; i++) {
                // 获取主节点
                let ulNode = window.document.getElementById('main')
                // 生成随机id
                let id = makeRandomNumber(5)
                // 创建子节点
                liNode = window.document.createElement('li')
                liNode.innerHTML = `<span>${id}</span><span></span><span>未开始</span>`
                liNode.firstElementChild.setAttribute('style', 'color:#363636;')
                liNode.lastElementChild.setAttribute('style', 'color:#363636;')
                // 应用子节点
                ulNode.appendChild(liNode)
            }
        }

        // 连接websocket服务器
        function connWebsocket() {
            // 所有的liNodes置为`进行中`
            let liNodes = window.document.getElementById('main').children
            // 待发送消息数据
            let allData = []
            for (let i = 0; i < liNodes.length; i++) {
                let firstNode = liNodes[i].firstElementChild
                let lastNode = liNodes[i].lastElementChild

                // 渲染`进行中`状态样式
                firstNode.setAttribute('style', 'color:#FF6347;')
                lastNode.setAttribute('style', 'color:#FF6347;')
                lastNode.textContent = '进行中'

                allData.push({
                    'id': firstNode.textContent,
                    'status': lastNode.textContent,
                })
            }

            // 进行weksocket通信服务
            let ws = new WebSocket("ws://localhost:9999");

            ws.onopen = function(evt) {
                console.log('connection start')
                // 打开连接就发送消息
                ws.send(JSON.stringify(allData))
            };

            // 已完成数量
            let completeNum = 0
            ws.onmessage = function(evt) {
                // 异常或错误处理
                try {
                    var obj = JSON.parse(evt.data);
                } catch (e) {
                    console.error(e)
                    return
                }
                if (!obj.id || !obj.status) {
                    console.error(`property is not undefined.`)
                    return
                }

                // 找到`id`对应节点
                let nodeList = [...liNodes]
                let liNode = nodeList.find((node) => {
                    return node.firstElementChild.textContent == obj.id
                })
                if (!liNode) {
                    console.error(`li node is not found.`)
                    return;
                }

                // 重新渲染界面,并自增已完成数量
                completeNum++
                liNode.lastElementChild.textContent = obj.status
                liNode.lastElementChild.setAttribute('style', 'color:#008B8B;')
                liNode.firstElementChild.setAttribute('style', 'color:#008B8B;')

                // 如果完成数和节点数相等,主动断开连接
                if (completeNum === liNodes.length) {
                    ws.close()
                }
            };

            ws.onclose = function(evt) {
                console.log("connection close");
            }
        }

        // 制作整型随机数
        function makeRandomNumber(digit = 6) {
            if (digit < 1 || digit > 10) {
                throw new RangeError('位数不能小于1且不能大于10')
            }
            const min = Math.pow(10, digit - 1)
            const max = Math.pow(10, digit) - 1

            let val = Math.floor(Math.random() * max + 1)
            while (val < min) {
                val = Math.floor(Math.random() * max + 1)
            }

            return val
        }

    </script>
</html>

后端代码实现

注意:演示框架是 laravel5.7+ 以上版本

1. 查看swoole扩展版本

> php --ri swoole

swoole

Swoole => enabled
Author => Swoole Team <team@swoole.com>
Version => 4.5.11
Built => Feb 21 2022 14:53:00
coroutine => enabled
kqueue => enabled
rwlock => enabled
pcre => enabled
zlib => 1.2.11
brotli => E16777225/D16777225
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 262144 => 262144

注意:以上演示 swoole 扩展版本为 4.5+

2. 创建自定义websocket服务类

app/Handlers/Websockets/CoServer.php

<?php

namespace App\Handlers\Websockets;

use Swoole\Coroutine;

class CoServer
{
    public function __construct()
    {
        // 开启一键协程化
        \Swoole\Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
    }

    public function start()
    {
        // 异步风格websocket服务器
        $ws = new \Swoole\WebSocket\Server('127.0.0.1', 9999);

        $ws->on('Open', function ($ws, $request) {
            $this->log('连接成功', $request->fd);
        });

        $ws->on('Message', function ($ws, $frame) {

            $fid = $frame->fd ?? 0;
            $data = json_decode($frame->data, true);
            if (!$data || !is_array($data)) {
                $this->log('消息格式错误!', $fid);
                return;
            }
            $this->log('服务器已接收消息.', $fid);

            // 协程间通信,类似 go 的 sync.WaitGroup
            $wg = new \Swoole\Coroutine\WaitGroup();

            // 创建请求远端协程任务
            $wg->add();
            Coroutine::create(function () use($wg, $data, $fid) {
                $this->log('已发送消息到远端.', $fid);

                // 协程 http 客户端
                $client = new \Swoole\Coroutine\Http\Client('127.0.0.1', 9101);
                $client->post('/remote/message', [
                    'data' => $data,
                    'fid' => $fid,
                ]);
                $client->close();
                $wg->done();
            });

            // 创建监听消息协程任务
            $wg->add();
            Coroutine::create(function () use ($wg, $ws, $fid) {
                $this->log('开始监听消息.', $fid);

                // 协程 redis 客户端
                $redis = new \Swoole\Coroutine\Redis();
                $redis->connect('127.0.0.1', 6379);
                if ($redis->subscribe(['ws:fid:'.$fid])) {
                    while ($msg = $redis->recv()) {
                        list($type, $name, $cont) = $msg;
                        if ($type == 'message' && $name == 'ws:fid:'.$fid) {
                            $ws->push($fid, $cont);
                            $this->log('消息已回复', $fid);
                        }
                    }
                }
                $redis->close();
                $wg->done();
            });

            $wg->wait();
            // 处理完主动断开连接
            $ws->close($fid);
        });

        $ws->on('close', function ($server, $fid) {
            echo "client {$fid} closed\n";
        });

        $ws->start();
    }

    protected function log($msg, $fd = 0)
    {
        if ($fd) {
            echo sprintf('[%s]: %d -> %s'.PHP_EOL, date('H:i:s'), $fd, $msg);
        } else {
            echo sprintf('[%s]: %s'.PHP_EOL, date('H:i:s'), $msg);
        }
    }
}

3. 模拟远端请求接口方法

app/Http/Controllers/RemoteController.php

<?php

namespace App\Http\Controllers;

use Illuminate\Http\Request;
use Illuminate\Support\Facades\Log;
use Illuminate\Support\Facades\Redis;

class RemoteController extends Controller
{
      // 对应路由:/remote/message POST
    public function message(Request $request)
    {
        $data = $request->input('data');
        $fid = $request->input('fid');

        Log::info('请求参数: '.json_encode($request->all()));

        while (true) {
            if (empty($data)) {
                break;
            }

            usleep(500000);

            foreach ($data as $key => $val) {
                if (random_int(0, 100) > 80) {
                    $val['status'] = '已完成';
                    Log::info('获取消息中...');
                    Redis::connection()->publish('ws:fid:'.$fid, json_encode($val));
                    unset($data[$key]);
                }
            }
        }

        return response()->json([
            'status' => 0,
            'msg'    => 'success',
            'data'   => null,
        ]);
    }
}

4. 创建命令执行websocket服务

app/Console/Commands/WebsocketServer.php

<?php

namespace App\Console\Commands;

use App\Handlers\Websockets\CoServer;
use Illuminate\Console\Command;
use Ratchet\Http\HttpServer;
use Ratchet\Server\IoServer;
use Ratchet\WebSocket\WsServer;

class WebsocketServer extends Command
{
    protected $signature = 'ws:start {server}';

    protected $description = 'Command description';

    const SERVER_HTTP = 'http';
    const SERVER_CO = 'co';

    public function __construct()
    {
        parent::__construct();
    }

    public function handle()
    {
        $allowServers = [self::SERVER_HTTP, self::SERVER_CO];
        $inputServer = $this->argument('server');
        if (!in_array($inputServer, $allowServers)) {
            $this->error('服务器类型错误');
            return;
        }

        switch ($inputServer) {
            case self::SERVER_HTTP:
                $this->handleHttp();
                break;
            case self::SERVER_CO:
                $this->handleCo();
                break;
            default:
                $this->error('服务器类型不存在');
        }

    }

    protected function handleHttp()
    {
//        $server = IoServer::factory(
//            new HttpServer(
//                new WsServer(
//                    new \App\Handlers\Websockets\HttpServer()
//                )
//            )
//            , 9999);
//        $server->run();
    }

    protected function handleCo()
    {
        $server = new CoServer();
        $server->start();
    }
}

5. 运行websocket服务

> php artisan ws:start co

后续扩展

  1. 权限校验

  2. 超时机制

  3. 数据同步

  4. 配置参数

    ……

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L03 构架 API 服务器》
你将学到如 RESTFul 设计风格、PostMan 的使用、OAuth 流程,JWT 概念及使用 和 API 开发相关的进阶知识。
《L02 从零构建论坛系统》
以构建论坛项目 LaraBBS 为线索,展开对 Laravel 框架的全面学习。应用程序架构思路贴近 Laravel 框架的设计哲学。
讨论数量: 7
laravel_peng

每遇到用 swoole 的,就觉得是时候停止 CURD 了。哎!啥时候我能吧 swoole 能玩的转哦 :relieved:~~

1年前 评论
romp (楼主) 1年前
romp (楼主) 1年前

:+1:好文章

1年前 评论
romp (楼主) 1年前

期待后续

1年前 评论
romp (楼主) 1年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!