利用swoole扩展进行 延时异步定时任务
本次使用的业务环境在laravel9 + swoole4 + docker lnmp 没有docker环境的只能linux上操作了 宝塔的话就一键安装swoole 看到最近挺多异步任务的 要是用tp3或者旧版本一点的其他框架没有延迟异步任务就不好实现需求了 因此就发了这篇文章
swoole本身是一个扩展可以用在任何框架层面上 写法比较简洁 具体自己封装 只提供个思路
业务场景:需要自定义处理每个不同任务的时间 毫秒级别 比如:每个学校 自定义设置一个通知时间 到点了就提醒下面的学生上传健康码
比如
A学校 设置每天10点通知
B学校 设置每天15点通知
可以进行一个编辑修改 动态的任务
就拿laravel进行一个演示 不进行artisan命令这些封装了 就简单点的用法
先在app 目录创建 service目录 定义一个文件 SwooleService.php 这个文件不依赖laravel
代码 直接cd到当前目录
<?php
//创建TCP Server对象,监听 127.0.0.1:9501 端口 就类似nginx一样的一种东西
$server = new Swoole\Server('127.0.0.1', 2345);
//监听连接进入事件
$server->on('Connect', function ($server, $fd) {
});
//监听数据接收事件 接收后端laravel控制器发送过来的数据(也可以叫客户端 怕被误解)
$server->on('Receive', function ($server, $fd, $reactor_id, $data) {
//接收到的数据
/**
【time=>3】时间 单位秒
【type=>['open','close']】类型 发布任务 ,接收任务
【tick_id->1】任务发布成功获得的id 可存起来用于后续关闭这个任务
**/
$data = trim($data);
$info = json_decode($data);
echo $info->type;
//此处应该封装个方法调用的
//如果数据类型未open 那么代表发布任务
if($info->type=="open"){
$time = $info->time;
echo "我收到这个任务啦要求我每".$time."秒跑一次这个任务\n";
//swoole_timer_tick本身异步+协成
//此处发布任务
$tick_id = swoole_timer_tick($time*1000,function()use($time){
/** 此处的业务逻辑可以用curl 请求到laravel对应的控制器方法去做一个操作此处 简写 自行替换 */
$cli = new \Swoole\Coroutine\Http\Client('httpbin.org', 80);
$cli->get('/get');
echo $cli->body;
$cli->close();
echo "执行每".$time."秒一个的任务\n";
});
echo "这个定时器id是".$tick_id;
//返回数据给客户端检测是否发布了这个任务 对应SwooleClient.php的 $client->recv();方法
$server->send($fd, json_encode(['msg'=>"每".$time."秒一个定时器id任务id为".$tick_id."\n",'tick_id'=>$tick_id]));
}
if($info->type=="close"){
$tick_id = $info->tick_id;
echo "我收到中断信号 要求中断关闭定时器ID".$tick_id." \n";
//关闭当前在执行的定时器
swoole_timer_clear($tick_id);
//返回数据给客户端检测是否发布了这个任务 对应SwooleClient.php的 $client->recv();方法
$server->send($fd, json_encode(['msg'=>"中断定时器id为".$tick_id,'tick_id'=>$tick_id]));
}
});
//监听连接关闭事件
$server->on('Close', function ($server, $fd) {
//echo "Client: Close.\n";
});
//启动服务器
$server->start();
执行 php SwooleService.php 开始监听请求
如遇到报错 Error: Address already in use
代表端口被占用 执行
netstat -anp | grep 2345
获取到进程号kill 上面获取到的进程id
laravel 定义路由
Route::get('/send','\App\Http\Controllers\IndexController@send');
Route::get('/close','\App\Http\Controllers\IndexController@close');
IndexController控制器代码
<?php
namespace App\Http\Controllers;
class IndexController extends Controller
{
//发布任务方法
public function send(){
//动态填写参数多久后执行 秒单位
$time = request()->get("time",2);
//链接刚刚创建的service 127.0.0.1:2345端口
$client = new \Swoole\Client(SWOOLE_SOCK_TCP);
if (!$client->connect('127.0.0.1', 2345, -1)) {
exit("connect failed. Error: {$client->errCode}\n");
}
//发送数据到服务端127.0.0.1:2345端口
$client->send(json_encode(['type'=>'open','time'=>$time]));
//返回tick_id 执行任务的id 此处是能接收到SwooleService.php服务端返回的任务id的
echo $client->recv();
//*****$client->recv()返回值 进行业务逻辑处理存起来这个任务的id 方便下次任务关闭 监控等
//关闭客户链接
$client->close();
}
//停止任务方法
public function close(){
//获取任务的id 上面send方法 $client->recv();这行代码接收到的id
$tick_id = request()->get("tick_id");
$client = new \Swoole\Client(SWOOLE_SOCK_TCP);
if (!$client->connect('127.0.0.1', 2345, -1)) {
exit("connect failed. Error: {$client->errCode}\n");
}
//类型为close 触发 SwooleService.php的关闭事件
$client->send(json_encode(['type'=>'close','tick_id'=>$tick_id]));
//接收服务器返回的消息
echo $client->recv();
//关闭客户端
$client->close();
}
}
附上apipost请求的结果按顺序 先开启监听 发布任务 再关闭任务
运行开始监听
任务已经成功关闭
大体的业务逻辑就是
普通http请求 请求到laravel对应的控制器 接着laravel控制器 TCP请求携带参数到
TCP服务端也就是对应的SwooleService.php 文件写的那一串 服务端接收到了 用swoole的毫秒定时器再用curl请求http回laravel对应的控制器进行业务处理实现计划任务
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: