利用swoole扩展进行 延时异步定时任务

本次使用的业务环境在laravel9 + swoole4 + docker lnmp 没有docker环境的只能linux上操作了 宝塔的话就一键安装swoole 看到最近挺多异步任务的 要是用tp3或者旧版本一点的其他框架没有延迟异步任务就不好实现需求了 因此就发了这篇文章
swoole本身是一个扩展可以用在任何框架层面上 写法比较简洁 具体自己封装 只提供个思路

业务场景:需要自定义处理每个不同任务的时间 毫秒级别 比如:每个学校 自定义设置一个通知时间 到点了就提醒下面的学生上传健康码
比如
A学校 设置每天10点通知
B学校 设置每天15点通知
可以进行一个编辑修改 动态的任务

就拿laravel进行一个演示 不进行artisan命令这些封装了 就简单点的用法

先在app 目录创建 service目录 定义一个文件 SwooleService.php 这个文件不依赖laravel
代码 直接cd到当前目录

<?php

//创建TCP Server对象,监听 127.0.0.1:9501 端口 就类似nginx一样的一种东西 
$server = new Swoole\Server('127.0.0.1', 2345);

//监听连接进入事件
$server->on('Connect', function ($server, $fd) {

});

//监听数据接收事件 接收后端laravel控制器发送过来的数据(也可以叫客户端 怕被误解)
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {

    //接收到的数据
    /**
    【time=>3】时间 单位秒
    【type=>['open','close']】类型 发布任务 ,接收任务
    【tick_id->1】任务发布成功获得的id 可存起来用于后续关闭这个任务
    **/
    $data    = trim($data);
    $info = json_decode($data);
    echo $info->type;

    //此处应该封装个方法调用的 
    //如果数据类型未open 那么代表发布任务 
    if($info->type=="open"){
        $time = $info->time;
        echo "我收到这个任务啦要求我每".$time."秒跑一次这个任务\n";

        //swoole_timer_tick本身异步+协成 
        //此处发布任务
        $tick_id = swoole_timer_tick($time*1000,function()use($time){

            /** 此处的业务逻辑可以用curl 请求到laravel对应的控制器方法去做一个操作此处                简写 自行替换 */
            $cli = new \Swoole\Coroutine\Http\Client('httpbin.org', 80);
            $cli->get('/get');
            echo $cli->body;
            $cli->close();
            echo "执行每".$time."秒一个的任务\n";
        });

        echo "这个定时器id是".$tick_id;
        //返回数据给客户端检测是否发布了这个任务  对应SwooleClient.php的 $client->recv();方法
        $server->send($fd, json_encode(['msg'=>"每".$time."秒一个定时器id任务id为".$tick_id."\n",'tick_id'=>$tick_id]));
    }

    if($info->type=="close"){
        $tick_id =  $info->tick_id;
        echo "我收到中断信号 要求中断关闭定时器ID".$tick_id." \n";
        //关闭当前在执行的定时器
        swoole_timer_clear($tick_id);
        //返回数据给客户端检测是否发布了这个任务  对应SwooleClient.php的 $client->recv();方法
        $server->send($fd, json_encode(['msg'=>"中断定时器id为".$tick_id,'tick_id'=>$tick_id]));
    }

});

//监听连接关闭事件
$server->on('Close', function ($server, $fd) {
    //echo "Client: Close.\n";
});

//启动服务器
$server->start();

执行 php SwooleService.php 开始监听请求

如遇到报错 Error: Address already in use
代表端口被占用 执行
netstat -anp | grep 2345
获取到进程号kill 上面获取到的进程id

laravel 定义路由

Route::get('/send','\App\Http\Controllers\IndexController@send');
Route::get('/close','\App\Http\Controllers\IndexController@close');

IndexController控制器代码

<?php

namespace App\Http\Controllers;

class IndexController extends Controller
{
    //发布任务方法
    public function send(){

        //动态填写参数多久后执行 秒单位
        $time = request()->get("time",2);

        //链接刚刚创建的service 127.0.0.1:2345端口
        $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
        if (!$client->connect('127.0.0.1', 2345, -1)) {
            exit("connect failed. Error: {$client->errCode}\n");
        }
        //发送数据到服务端127.0.0.1:2345端口
        $client->send(json_encode(['type'=>'open','time'=>$time]));

        //返回tick_id 执行任务的id 此处是能接收到SwooleService.php服务端返回的任务id的
        echo $client->recv();
        //*****$client->recv()返回值 进行业务逻辑处理存起来这个任务的id 方便下次任务关闭 监控等
        //关闭客户链接
        $client->close();
    }

    //停止任务方法
    public function close(){

        //获取任务的id 上面send方法 $client->recv();这行代码接收到的id
        $tick_id = request()->get("tick_id");

        $client = new \Swoole\Client(SWOOLE_SOCK_TCP);
        if (!$client->connect('127.0.0.1', 2345, -1)) {
            exit("connect failed. Error: {$client->errCode}\n");
        }
        //类型为close 触发 SwooleService.php的关闭事件
        $client->send(json_encode(['type'=>'close','tick_id'=>$tick_id]));
        //接收服务器返回的消息
        echo $client->recv();
        //关闭客户端
        $client->close();
    }


}

附上apipost请求的结果按顺序 先开启监听 发布任务 再关闭任务
运行开始监听
利用swoole扩展进行 延时异步任务

利用swoole扩展进行 延时异步任务

利用swoole扩展进行 延时异步任务

任务已经成功关闭

大体的业务逻辑就是
普通http请求 请求到laravel对应的控制器 接着laravel控制器 TCP请求携带参数到
TCP服务端也就是对应的SwooleService.php 文件写的那一串 服务端接收到了 用swoole的毫秒定时器再用curl请求http回laravel对应的控制器进行业务处理实现计划任务

本作品采用《CC 协议》,转载必须注明作者和本文链接
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 7

感觉不可靠,进程保活,进程挂掉重启怎么恢复任务,还是延时队列用着简单靠谱

1年前 评论
y1415181920 (楼主) 1年前

确实进程一挂就g了 我的建议:弄个任务计划表。守护进程守护这个启动的tcp服务 。tcp服务重启或者挂了就重新从计划表里面加载下计划任务。计划表需要一个是否已执行的执行状态。
如楼上所言当然还是延迟队列靠谱些。

1年前 评论
y1415181920 (楼主) 1年前
y1415181920 (楼主) 1年前
Latent (作者) 1年前

创建服务模式改成new Swoole\Server('127.0.0.1', 2345,SWOOLE_PROCESS);
稳定性会高很多 附上官方文档 wiki.swoole.com/#/learn?id=swoole_...

file

1年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!