一个需求问题 求提供方案(可打赏)

问题描述

Laravel

接到一个开发任务,需求是当报警超过X分钟未处理则通知用户,
我打算用redis任务队列来做,当某个设备触发报警时 就把[设备id]和[触发时间]字段插入到任务队列,消费者进程获得[设备id]和[触发时间]字段,然后以[设备id]字段为条件查询数据库表判断[触发时间]是否超过报警时间.
超过报警时间则进行消息通知,未超过报警时间则插入任务队列进行下一个循环的消费

这个方案我总感觉不够好. 不知道论坛里的朋友们有没有做过类似的需求 能提供更优秀的解决方案 我可以打赏

—————————– 更新分割线————————————

非常感谢大家的反馈。但是还有一个问题 可能大家没有关注到,就是这个XX分钟报警超时是可以动态设置和取消的,并不是固定某分钟。
比如我设置某个设备超过30分钟报警未处理就消息通知,如果我在某一个设备触发报警30分钟以内修改了规则把时间进行提前或者延后,之前【30分钟】已经进入到任务队列里了 这个时候无法进行取消啊。
我现在的做法是每秒请求数据库查询【设备报警表】获得未处理的报警记录 获得【报警触发时间】以及上一次【消息推送时间】字段,获得之后进行规则匹配。
比如某个设备我分别设置5分钟、15分钟未处理就进行消息通知, 我就可以根据触发时间和报警X分钟进行比对 如果大于这个【报警X分钟】就消息通知,并在【设备报警表】记录此次消息推送时间。
下一次循环周期 把上一次【消息推送时间】字段也进行对比。这样 好像就能解决掉这个【报警未处理】时间动态设置的问题,但是缺点就是每秒请求数据库 感觉这个方案好糙啊

《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 31

可以采用延时队列+redis;当触发报警,走一个5分钟得延时队列,在这五分钟内如果有人处理了,那就标记一个缓存或者修改 mysql 数据,使那个延时队列标记为已结束

11个月前 评论

要是我的话就把超时时间(时间戳) hash 一下, 然后作为 redis hset 的 key , 然后把 key 加入一个 set . 然后写一个无限循环, 循环里面获取当前时间戳, 然后 hash 当前时间戳, 然后判断 hash 后的结果在 set 中存在不存在, 如果存在的话 从 hset 拿到 info 信息 然后 从set中移除 key 然后 dispatch 一个超时事件, 最后事件里面再进行发送消息的相关逻辑。

class checkWarnTimeout extends Command{
    function handle(){
        while(True){
            // 这里你如果预期警告信息会特别多,导致下面的逻辑需要的时间特别长,你就可以对当前时间求模, 插入的时候保证时间是 5 的倍数, 然后设置时间间隔。比如每五秒处理一次,就判断当前时间是否能被 5 整除,如果不能就跳过当次循环。 
            $nowTime = time()
            // todo 这里你自己确定怎么 hash
            $key = Hash::encode($nowTime);
            // todo 这里自己实现一个 RedisUtil 
            if(RedisUtil::has('warn:set', $key))
            {
                // 同上如果这里预期警告信息会特别多, 这里拿的就不是info了, 而是某个时间段内所有报警信息key的list。 
                $info = RedisUtil::get('warn:hset', $key);
                RedisUtil::del('warn:set', $key);
                SendWarnTimeOut::dispatch($info);
            }
            sleep(1);
        }
    }
}
11个月前 评论
徵羽宫 (作者) 11个月前
陈先生 11个月前
徵羽宫 (作者) 11个月前
徵羽宫 (作者) 11个月前
陈先生 11个月前
徵羽宫 (作者) 11个月前
陈先生 11个月前

延时队列处理呀,不是五分钟后未处理则通知相关人员么,触发告警的时候,就调用延时队列,队列里做判断是否已处理,如果已处理就结束,否则通知相关人员。伪代码如下:

if($bool){ /*如果触发告警 执行五分钟延时队列*/
    $delay = Carbon::now()->addminutes(5);
    YourJob::dispatch($data)->delay($delay);
}
11个月前 评论
陈先生

以下全是伪代码,需要加上自己的理解去变为自己的业务代码,我选择了在5分三秒的时候执行这条队列,因为可能要空出一点操作时间。但求别调这个 3s 的刺

控制器代码内容。


<?php

namespace App\Http\Controllers;

use App\Jobs\PoliceReport;
use Illuminate\Http\Request;

class PoliceController extends Controller
{
    //

    public function store(Request $request)
    {
        //自行处理
        [$notice_members,$is_send_notices,$is_send_notice_sms,$notice_timeout_minutes,$later_notices_members,$is_send_later_notice_sms] = $request->all();
        // 逻辑请自行处理
        $id = 'the id of the police that you just created by the above request data';

        dispatch(new PoliceReport(['report_id'=>$id]))->delay(now()->addMinutes($notice_timeout_minutes)->addSeconds(3));
    }
}

队列文件内容


<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeUnique;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class PoliceReport implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public int $report_id;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        //当前已经确认到时间未处理
        $report = Report::findOrFail($this->report_id);
        // 发送消息通知到相关人员 短信 & notice
    }
}
11个月前 评论
陈先生 (作者) 11个月前
Neutrino (楼主) 11个月前

你可以去了解一下 rabbitmq, 有一个延迟队列功能(死信队列)
触发警报时插入N分钟的延迟队列, 这时候不会直接消费消息
N分钟后死信交换机会把未处理死心消息, 交给处理队列
然后自己再判断这条报警消息是否被处理,然后走业务逻辑就好了

11个月前 评论
Neutrino (楼主) 11个月前
laisxn

用zadd,然后取出对应的时间周期?

11个月前 评论

要实现你这个需求,得在修改过期时间后触发更新已产生报警的过期时间。 其实在修改过期时间之前的数据,我觉得可以不管了,就沿用之前的过期时间。就好比屎都拉到一半了,要求等一哈儿再拉

11个月前 评论
Neutrino (楼主) 11个月前
sanders

我们一般采用的策略是:延迟队列任务+定时脚本补救,两者都可在 laravel 框架特性里实现。置于你补充的,配置更改的问题:若要适配最新的配置可标注一个配置版本,将所有进入延迟队列的任务记录下来,并据此重新写入队列,而旧的队列任务读取最新的配置版本,发现版本号变动就终止执行。

11个月前 评论

你新增的需求也很好解决啊, 就是队列执行的时候判断一下这个报警的状态,如果报警没处理,并且没超时, 那么重新发一个延迟队列任务。 如果报警没处理并且超时了, 那么发送上报提醒。 如果报警已经处理了, 那么不进行任何操作。

就算不考虑你补充的需求,你发上报提醒之前肯定也是要检查报警状态的呀。 不然不就成了只要有报警必然发上报提醒了。

报警的状态可以弄几个 比如 等待中,处理中,已挂起(表示推迟了报警时间, 但是也没处理),已上报, 已处理, 然后要有日志表记录下操作记录啥的。

11个月前 评论

这个我正好做过。

首先别指望 redis ,他的工作不是做这个。
我的方案是这样的,启一个 rabbitmq ,增加延时插件。 rabbitmq 可以直接 docker 安装很方便。

写一个处理报警通知的功能,也即触发时将消息发给对应的人,相信这个你已经做好了。注意这里设置一个时间容差,比如半分钟,在任务的前后半分钟都认可并执行,超出时间不执行。
然后,建立第1道报警时,直接使用上述功能直接发送不谈。
第2道在 N 分钟,采用延时队列。往 rabbitmq 中写入延时任务,延时为对应的 N 分钟,并记录绑定任务编号。

使用 supervisorctl 启动一个监听队列,专门处理队列里的延时任务。
N 分钟之后,队列收到消息,执行发送警告操作。

如果在 N 分钟时间内没有更改设置,一切OK。

如果 在 N - 1 分钟的时候,用户更改了时间,改成了 N + 5 分钟的时刻,则将这个视为新任务继续直接给延时队列。前一个任务删除之或者放任不管(超过容差时间了不会执行)。

超过 N 分钟不让改了就是。

因为队列阻塞式的,不会频繁查表,性能没问题,整套方案都很稳定。

11个月前 评论
Neutrino (楼主) 11个月前
qufo (作者) 11个月前
// 队列类
class Task implements ShouldQueue
{
      public $version;
      public $task;

     public function __construct($task, $version) 
     {
           $this->version = $version;
           $this->task= $task;
    }

    /**
     * Execute the job.
     */
    public function handle(): void
    {
         // 这里判断一下版本号
          if ($this->task->job_version !== $this->version) {
              return;
          }
    }
}

## 把这个模型的延迟时间推入队列
$task = new Task();
$task->name = '测试任务';
$task->delay_seconds = 300;
$task->job_version = 1;
$task->save();
dispatch($task, $task->job_version)->delay((now()->addSeconds($task->delay_seconds);

// 上一个有版本的判断, 就不会执行了
// 这个 job_version 直接写到模型观察者的: saving 事件就行
$task->delay_seconds = 100;
$task->job_version += 1;
$task->save();
dispatch($task, $task->job_version)->delay((now()->addSeconds($task->delay_seconds);
11个月前 评论

这是一个比较有意思的话题,于是专门写了一篇文章《一个延时任务问题引发的思考》来阐述我的想法。

有时间的话可以去看看,希望可以帮到你。

11个月前 评论
巴啦啦

楼主的问题点在于,使用延时队列时,如何取消已设定的任务,或者重新设定时间。 而不在于使用何种方案,因为使用延时队列,本身就已经是最佳实践了,至于到底是用 rabbitmq 还是 redis 亦或是其他载体,都无所谓。

11个月前 评论
Neutrino (楼主) 11个月前
徵羽宫 11个月前
wenber

下发报警任务时, 根据当前设置的报警时间下发, 每个报警延时任务执行时, 再查一下最新设置的报警时间, 如果较下发前增加, 则重新下发任务, 如何未重新设置, 则执行当前任务. 这样就可以满足报警时间动态配置的问题.

11个月前 评论
NeoHu

贴一段代码, 应该符合你可编辑定时任务的需求 代码是python 用的PPGo_Job 服务
相关文档 http://www.haodaquan.com/topics/1###
自己修改每隔N分钟报警并且记录 无处理则再触发下一个定时任务

def cron(cron_type=1, cron_id=0, task_name=None, description=None, cron_spec=None, command=None):
    """
    Cron配置
    """
    url = ‘cron_test.com/api/task’ # 定时请求的api接口 其实就是定时去请求触发对应的事件

    if cron_type == 1:
        url = url + '/task/apitask'  # 配置
        #
        d = {
            'task_name': task_name, # 每隔五分钟报警
            'description': description,
            'cron_spec': cron_spec,
            'command': command,
            'server_type': 1,
            'create_id': 1,
            'group_id': 1,
            'concurrent': 0,
            'server_ids': 0,
            'timeout': 0,
            'is_notify': 0,
            'notify_type': 0,
            'notify_tpl_id': 0,
            'notify_user_ids': 0,
            'id': cron_id,
        }
    elif cron_type == 2:
        url = url + '/task/apistart'  # 启动
        d = {
            'id': cron_id
        }

    elif cron_type == 3:
        url = url + '/task/apipause'  # 暂停
        d = {
            'id': cron_id
        }
    elif cron_type == 4:
        url = url + '/task/edit'  # 编辑
        d = {
            'id': cron_id,
            'task_name': task_name,
            'description': description,
            'cron_spec': cron_spec,  # 这里就是 定时任务时间的表达式
            'command': command,
        }

    try:
        r = requests.post(url=url, data=d, verify=False)
        text = json.loads(r.text)
        if r.text is not None and text['status'] == 0:
            return text['message']
        else:
            return
    except:
        print('定时任务设置错误')
        return


def cronSpec(warning_type, target_id, start_time, end_time, interval_minute, config_id):
    """
    Cron表达式转换
    """
    warning_type = int(warning_type)
    host = current_app.config.get('DOMAIN_HOST')
    api_route = WarningTarget.query.filter_by(target_id=target_id).with_entities(WarningTarget.route).first()['route']
    cron_route = host + api_route
    # curl localhost:3000/api/json -X POST -d '{"hello": "world"}' --header "Content-Type: application/json"
    command = 'curl ' + cron_route + ' -X POST -F ' + '"' + 'config_id=' + str(config_id) + '"'
    spec = '0 */15  * * *'
    if warning_type == 1:
        spec = '0 */10  * * *'
    elif warning_type == 2:
        # interval_minute
        # 0 0/5 14,15 * * ? 每天下午2点到255  下午3点到355 每隔五分钟执行一次
        # 0 0/30 9-17 * * ? 每天早上9点到晚上5点 每隔30分钟执行一次
        # 0 5,10 14,15 * * ? 每天下午25210 分别触发一次  下午35310 分别触发一次
        # 提取时间段中的整点 9 15
        start_int = timeFormat(start_time)
        end_int = timeFormat(end_time)
        section = str(start_int) + '-' + str(end_int)
        spec = '0 0/' + str(interval_minute) + ' ' + section + ' * * ?'
    elif warning_type == 3:
        # 列如 230-630 要转换成 300  400 500 6:00 各执行一次
        # 0,0,0, 10,14,16 * * ? 代表每天上午10点 下午24点各执行一次
        interval_minute = 59  # 固定每隔一小时 todo
        start_num = timeFormat(start_time)
        end_num = timeFormat(end_time)
        section = []
        if int(start_num) + 1 == int(end_num):
            section_str = int(end_num)
        else:
            for i in range(int(start_num) + 1, int(end_num) + 1):
                section.append(str(i))
            section_str = ','.join(section)
        # 0 59 1-3 ? * *
        spec = '0 0' + ' ' + str(section_str) + ' * * ?'
    db.session.close()
    return {'spec': spec, 'command': command}
10个月前 评论
Neutrino (楼主) 10个月前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!