一个需求问题 求提供方案(可打赏)

问题描述

Laravel

接到一个开发任务,需求是当报警超过X分钟未处理则通知用户,
我打算用redis任务队列来做,当某个设备触发报警时 就把[设备id]和[触发时间]字段插入到任务队列,消费者进程获得[设备id]和[触发时间]字段,然后以[设备id]字段为条件查询数据库表判断[触发时间]是否超过报警时间.
超过报警时间则进行消息通知,未超过报警时间则插入任务队列进行下一个循环的消费

这个方案我总感觉不够好. 不知道论坛里的朋友们有没有做过类似的需求 能提供更优秀的解决方案 我可以打赏

—————————– 更新分割线————————————

非常感谢大家的反馈。但是还有一个问题 可能大家没有关注到,就是这个XX分钟报警超时是可以动态设置和取消的,并不是固定某分钟。
比如我设置某个设备超过30分钟报警未处理就消息通知,如果我在某一个设备触发报警30分钟以内修改了规则把时间进行提前或者延后,之前【30分钟】已经进入到任务队列里了 这个时候无法进行取消啊。
我现在的做法是每秒请求数据库查询【设备报警表】获得未处理的报警记录 获得【报警触发时间】以及上一次【消息推送时间】字段,获得之后进行规则匹配。
比如某个设备我分别设置5分钟、15分钟未处理就进行消息通知, 我就可以根据触发时间和报警X分钟进行比对 如果大于这个【报警X分钟】就消息通知,并在【设备报警表】记录此次消息推送时间。
下一次循环周期 把上一次【消息推送时间】字段也进行对比。这样 好像就能解决掉这个【报警未处理】时间动态设置的问题,但是缺点就是每秒请求数据库 感觉这个方案好糙啊

《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 31

这是一个比较有意思的话题,于是专门写了一篇文章《一个延时任务问题引发的思考》来阐述我的想法。

有时间的话可以去看看,希望可以帮到你。

11个月前 评论

这个我正好做过。

首先别指望 redis ,他的工作不是做这个。
我的方案是这样的,启一个 rabbitmq ,增加延时插件。 rabbitmq 可以直接 docker 安装很方便。

写一个处理报警通知的功能,也即触发时将消息发给对应的人,相信这个你已经做好了。注意这里设置一个时间容差,比如半分钟,在任务的前后半分钟都认可并执行,超出时间不执行。
然后,建立第1道报警时,直接使用上述功能直接发送不谈。
第2道在 N 分钟,采用延时队列。往 rabbitmq 中写入延时任务,延时为对应的 N 分钟,并记录绑定任务编号。

使用 supervisorctl 启动一个监听队列,专门处理队列里的延时任务。
N 分钟之后,队列收到消息,执行发送警告操作。

如果在 N 分钟时间内没有更改设置,一切OK。

如果 在 N - 1 分钟的时候,用户更改了时间,改成了 N + 5 分钟的时刻,则将这个视为新任务继续直接给延时队列。前一个任务删除之或者放任不管(超过容差时间了不会执行)。

超过 N 分钟不让改了就是。

因为队列阻塞式的,不会频繁查表,性能没问题,整套方案都很稳定。

11个月前 评论
Neutrino (楼主) 11个月前
qufo (作者) 11个月前
巴啦啦

楼主的问题点在于,使用延时队列时,如何取消已设定的任务,或者重新设定时间。 而不在于使用何种方案,因为使用延时队列,本身就已经是最佳实践了,至于到底是用 rabbitmq 还是 redis 亦或是其他载体,都无所谓。

11个月前 评论
Neutrino (楼主) 11个月前
徵羽宫 11个月前
NeoHu

贴一段代码, 应该符合你可编辑定时任务的需求 代码是python 用的PPGo_Job 服务
相关文档 http://www.haodaquan.com/topics/1###
自己修改每隔N分钟报警并且记录 无处理则再触发下一个定时任务

def cron(cron_type=1, cron_id=0, task_name=None, description=None, cron_spec=None, command=None):
    """
    Cron配置
    """
    url = ‘cron_test.com/api/task’ # 定时请求的api接口 其实就是定时去请求触发对应的事件

    if cron_type == 1:
        url = url + '/task/apitask'  # 配置
        #
        d = {
            'task_name': task_name, # 每隔五分钟报警
            'description': description,
            'cron_spec': cron_spec,
            'command': command,
            'server_type': 1,
            'create_id': 1,
            'group_id': 1,
            'concurrent': 0,
            'server_ids': 0,
            'timeout': 0,
            'is_notify': 0,
            'notify_type': 0,
            'notify_tpl_id': 0,
            'notify_user_ids': 0,
            'id': cron_id,
        }
    elif cron_type == 2:
        url = url + '/task/apistart'  # 启动
        d = {
            'id': cron_id
        }

    elif cron_type == 3:
        url = url + '/task/apipause'  # 暂停
        d = {
            'id': cron_id
        }
    elif cron_type == 4:
        url = url + '/task/edit'  # 编辑
        d = {
            'id': cron_id,
            'task_name': task_name,
            'description': description,
            'cron_spec': cron_spec,  # 这里就是 定时任务时间的表达式
            'command': command,
        }

    try:
        r = requests.post(url=url, data=d, verify=False)
        text = json.loads(r.text)
        if r.text is not None and text['status'] == 0:
            return text['message']
        else:
            return
    except:
        print('定时任务设置错误')
        return


def cronSpec(warning_type, target_id, start_time, end_time, interval_minute, config_id):
    """
    Cron表达式转换
    """
    warning_type = int(warning_type)
    host = current_app.config.get('DOMAIN_HOST')
    api_route = WarningTarget.query.filter_by(target_id=target_id).with_entities(WarningTarget.route).first()['route']
    cron_route = host + api_route
    # curl localhost:3000/api/json -X POST -d '{"hello": "world"}' --header "Content-Type: application/json"
    command = 'curl ' + cron_route + ' -X POST -F ' + '"' + 'config_id=' + str(config_id) + '"'
    spec = '0 */15  * * *'
    if warning_type == 1:
        spec = '0 */10  * * *'
    elif warning_type == 2:
        # interval_minute
        # 0 0/5 14,15 * * ? 每天下午2点到255  下午3点到355 每隔五分钟执行一次
        # 0 0/30 9-17 * * ? 每天早上9点到晚上5点 每隔30分钟执行一次
        # 0 5,10 14,15 * * ? 每天下午25210 分别触发一次  下午35310 分别触发一次
        # 提取时间段中的整点 9 15
        start_int = timeFormat(start_time)
        end_int = timeFormat(end_time)
        section = str(start_int) + '-' + str(end_int)
        spec = '0 0/' + str(interval_minute) + ' ' + section + ' * * ?'
    elif warning_type == 3:
        # 列如 230-630 要转换成 300  400 500 6:00 各执行一次
        # 0,0,0, 10,14,16 * * ? 代表每天上午10点 下午24点各执行一次
        interval_minute = 59  # 固定每隔一小时 todo
        start_num = timeFormat(start_time)
        end_num = timeFormat(end_time)
        section = []
        if int(start_num) + 1 == int(end_num):
            section_str = int(end_num)
        else:
            for i in range(int(start_num) + 1, int(end_num) + 1):
                section.append(str(i))
            section_str = ','.join(section)
        # 0 59 1-3 ? * *
        spec = '0 0' + ' ' + str(section_str) + ' * * ?'
    db.session.close()
    return {'spec': spec, 'command': command}
10个月前 评论
Neutrino (楼主) 10个月前

可以采用延时队列+redis;当触发报警,走一个5分钟得延时队列,在这五分钟内如果有人处理了,那就标记一个缓存或者修改 mysql 数据,使那个延时队列标记为已结束

11个月前 评论

要是我的话就把超时时间(时间戳) hash 一下, 然后作为 redis hset 的 key , 然后把 key 加入一个 set . 然后写一个无限循环, 循环里面获取当前时间戳, 然后 hash 当前时间戳, 然后判断 hash 后的结果在 set 中存在不存在, 如果存在的话 从 hset 拿到 info 信息 然后 从set中移除 key 然后 dispatch 一个超时事件, 最后事件里面再进行发送消息的相关逻辑。

class checkWarnTimeout extends Command{
    function handle(){
        while(True){
            // 这里你如果预期警告信息会特别多,导致下面的逻辑需要的时间特别长,你就可以对当前时间求模, 插入的时候保证时间是 5 的倍数, 然后设置时间间隔。比如每五秒处理一次,就判断当前时间是否能被 5 整除,如果不能就跳过当次循环。 
            $nowTime = time()
            // todo 这里你自己确定怎么 hash
            $key = Hash::encode($nowTime);
            // todo 这里自己实现一个 RedisUtil 
            if(RedisUtil::has('warn:set', $key))
            {
                // 同上如果这里预期警告信息会特别多, 这里拿的就不是info了, 而是某个时间段内所有报警信息key的list。 
                $info = RedisUtil::get('warn:hset', $key);
                RedisUtil::del('warn:set', $key);
                SendWarnTimeOut::dispatch($info);
            }
            sleep(1);
        }
    }
}
11个月前 评论
徵羽宫 (作者) 11个月前
陈先生 11个月前
徵羽宫 (作者) 11个月前
徵羽宫 (作者) 11个月前
陈先生 11个月前
徵羽宫 (作者) 11个月前
陈先生 11个月前

延时队列处理呀,不是五分钟后未处理则通知相关人员么,触发告警的时候,就调用延时队列,队列里做判断是否已处理,如果已处理就结束,否则通知相关人员。伪代码如下:

if($bool){ /*如果触发告警 执行五分钟延时队列*/
    $delay = Carbon::now()->addminutes(5);
    YourJob::dispatch($data)->delay($delay);
}
11个月前 评论
陈先生

以下全是伪代码,需要加上自己的理解去变为自己的业务代码,我选择了在5分三秒的时候执行这条队列,因为可能要空出一点操作时间。但求别调这个 3s 的刺

控制器代码内容。


<?php

namespace App\Http\Controllers;

use App\Jobs\PoliceReport;
use Illuminate\Http\Request;

class PoliceController extends Controller
{
    //

    public function store(Request $request)
    {
        //自行处理
        [$notice_members,$is_send_notices,$is_send_notice_sms,$notice_timeout_minutes,$later_notices_members,$is_send_later_notice_sms] = $request->all();
        // 逻辑请自行处理
        $id = 'the id of the police that you just created by the above request data';

        dispatch(new PoliceReport(['report_id'=>$id]))->delay(now()->addMinutes($notice_timeout_minutes)->addSeconds(3));
    }
}

队列文件内容


<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class PoliceReport implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $report_id;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        //当前已经确认到时间未处理
        $report = Report::findOrFail($this->report_id);
        // 发送消息通知到相关人员 短信 & notice
    }
}
11个月前 评论
陈先生 (作者) 11个月前
Neutrino (楼主) 11个月前

你可以去了解一下 rabbitmq, 有一个延迟队列功能(死信队列)
触发警报时插入N分钟的延迟队列, 这时候不会直接消费消息
N分钟后死信交换机会把未处理死心消息, 交给处理队列
然后自己再判断这条报警消息是否被处理,然后走业务逻辑就好了

11个月前 评论
Neutrino (楼主) 11个月前
laisxn

用zadd,然后取出对应的时间周期?

11个月前 评论

要实现你这个需求,得在修改过期时间后触发更新已产生报警的过期时间。 其实在修改过期时间之前的数据,我觉得可以不管了,就沿用之前的过期时间。就好比屎都拉到一半了,要求等一哈儿再拉

11个月前 评论
Neutrino (楼主) 11个月前
sanders

我们一般采用的策略是:延迟队列任务+定时脚本补救,两者都可在 laravel 框架特性里实现。置于你补充的,配置更改的问题:若要适配最新的配置可标注一个配置版本,将所有进入延迟队列的任务记录下来,并据此重新写入队列,而旧的队列任务读取最新的配置版本,发现版本号变动就终止执行。

11个月前 评论

你新增的需求也很好解决啊, 就是队列执行的时候判断一下这个报警的状态,如果报警没处理,并且没超时, 那么重新发一个延迟队列任务。 如果报警没处理并且超时了, 那么发送上报提醒。 如果报警已经处理了, 那么不进行任何操作。

就算不考虑你补充的需求,你发上报提醒之前肯定也是要检查报警状态的呀。 不然不就成了只要有报警必然发上报提醒了。

报警的状态可以弄几个 比如 等待中,处理中,已挂起(表示推迟了报警时间, 但是也没处理),已上报, 已处理, 然后要有日志表记录下操作记录啥的。

11个月前 评论
// 队列类
class Task implements ShouldQueue
{
      public $version;
      public $task;

     public function __construct($task, $version) 
     {
           $this->version = $version;
           $this->task= $task;
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
         // 这里判断一下版本号
          if ($this->task->job_version !== $this->version) {
              return;
          }
    }
}

## 把这个模型的延迟时间推入队列
$task = new Task();
$task->name = '测试任务';
$task->delay_seconds = 300;
$task->job_version = 1;
$task->save();
dispatch($task, $task->job_version)->delay((now()->addSeconds($task->delay_seconds);

// 上一个有版本的判断, 就不会执行了
// 这个 job_version 直接写到模型观察者的: saving 事件就行
$task->delay_seconds = 100;
$task->job_version += 1;
$task->save();
dispatch($task, $task->job_version)->delay((now()->addSeconds($task->delay_seconds);
11个月前 评论
wenber

下发报警任务时, 根据当前设置的报警时间下发, 每个报警延时任务执行时, 再查一下最新设置的报警时间, 如果较下发前增加, 则重新下发任务, 如何未重新设置, 则执行当前任务. 这样就可以满足报警时间动态配置的问题.

11个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!