帮忙解决一个小问题,请喝咖啡,谢谢!

AI摘要
用户询问如何将耗时队列单独执行以避免影响其他队列。当前代码已通过创建额外Worker进程(extra_queue_work)实现隔离,但需确认耗时队列是否已正确配置到getExtraQueue方法中。建议检查队列分类逻辑,确保耗时任务仅由额外进程处理。

队列里面有一部分是耗时的,想要这部分队列单独拿出来执行,不影响其他队列,以下代码是否有问题,该怎么修改

<?php

namespace app\command\workerman;

use app\command\WorkerCommand;
use app\dict\schedule\ScheduleDict;
use app\model\sys\SysSchedule;
use app\service\core\addon\CoreAddonService;
use app\service\core\schedule\CoreScheduleService;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Log;
use Workerman\Connection\TcpConnection;
use Workerman\Crontab\Crontab;
use Workerman\RedisQueue\Client;
use Workerman\Timer;
use Workerman\Worker;

class Workerman extends Command
{
    use WorkerCommand;

    public function configure()
    {
        // 指令配置
        $this->setName('workerman')
            ->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start')
            ->addOption('mode', 'm', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')
            ->setDescription('Workerman,高性能PHP应用容器');
    }

    /**
     * 执行任务
     * @return void
     */
    protected function execute(Input $input, Output $output)
    {
        $this->resetCli($input, $output);
        //计划任务
        Worker::$pidFile = runtime_path() . 'workerman_schedule.pid';
        $worker = new Worker();
        $worker->name = 'schedule_work';
        $worker->count = 1;
        // 设置时区,避免运行结果与预期不一致
        date_default_timezone_set('PRC');
        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Starting...");
//            // 每分钟的第1秒执行.用于计划任务是否仍在执行
            new Crontab('*/10 * * * * *', function () {
                $file = root_path('runtime') . '.schedule';
                file_put_contents($file, time());
            });
            $core_schedule_service = new CoreScheduleService();
            //查询所有的计划任务
            $task_list = $core_schedule_service->getList(['status' => ScheduleDict::ON]);

            foreach ($task_list as $item) {
                //获取定时任务时间字符串
                new Crontab($this->getCrontab($item['time']), function () use ($core_schedule_service, $item, $output) {
                    if (!empty($item['class'])) {
                        $core_schedule_service->execute($item, $output);
                    }
                });
            }
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Started.");
        };

        //消息队列
        Worker::$pidFile = runtime_path() . 'workerman_queue.pid';
        Worker::$logFile = runtime_path() . 'workerman.log';
        $worker = new Worker();
        $worker->name = 'queue_work';
        $worker->count = 1;
        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Starting...");
            // 定时,每10秒一次
            Timer::add(30, function () use ($output) {
                (new SysSchedule())->select();
            });
            $redis_option = [
                'connect_timeout' => 10,
                'max_attempts' => 3,
                'retry_seconds' => 5,
                'prefix' => md5(root_path())
            ];
            if (!empty(env('redis.redis_password'))) {
                $redis_option['auth'] = env('redis.redis_password');
            }
            $redis_option['db'] = env('redis.select');
            $client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
            $queue_list = $this->getAllQueue();
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " queue_list" . json_encode($queue_list));
            foreach ($queue_list as $queue_class_name) {
                $queue_class_name = str_replace('.php', '', $queue_class_name);
                // 订阅
                $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                    $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processing:" . $queue_class_name);
                    try {
                        $class_name = '\\' . $queue_class_name;
                        $class = new  $class_name();
                        $class->fire($data);
                    } catch (\Throwable $e) {
                        Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
                    }
                    $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processed:" . $queue_class_name);
                });
            }
            // 消费失败触发的回调(可选)
            $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
                $output->writeln('[queue]队列 ' . $package['queue'] . " 消费失败," . $exception->getMessage());
            });
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Started.");
        };

        Worker::$pidFile = runtime_path() . 'workerman_extra_queue.pid';
        Worker::$logFile = runtime_path() . 'workerman_extra_queue.log';
        $worker = new Worker();
        $worker->name = 'extra_queue_work';
        $worker->count = 3;
        $worker->onWorkerStart = function () use ($output) {
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . "Extra Queue Starting...");
            $redis_option = [
                'connect_timeout' => 10,
                'max_attempts' => 3,
                'retry_seconds' => 5,
                'prefix' => md5(root_path())
            ];
            if (!empty(env('redis.redis_password'))) {
                $redis_option['auth'] = env('redis.redis_password');
            }
            $redis_option['db'] = env('redis.select');
            $client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
            $queue_list = $this->getExtraQueue();
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " extra_queue_list" . json_encode($queue_list));

            foreach ($queue_list as $queue_class_name) {
                $queue_class_name = str_replace('.php', '', $queue_class_name);
                // 订阅
                $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                    $output->writeln('[extra_queue][' . date('Y-m-d H:i:s') . ']' . " Processing:" . $queue_class_name);
                    try {
                        $class_name = '\\' . $queue_class_name;
                        $class = new  $class_name();
                        $class->fire($data);
                    } catch (\Throwable $e) {
                        Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
                    }
                    $output->writeln('[extra_queue][' . date('Y-m-d H:i:s') . ']' . " Processed:" . $queue_class_name);
                });
            }
            // 消费失败触发的回调(可选)
            $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
                $output->writeln('[extra_queue]队列 ' . $package['queue'] . " 消费失败," . $exception->getMessage());
            });
            $output->writeln('[' . date('Y-m-d H:i:s') . ']' . "Extra Queue Started.");
        };

        Worker::runAll();
    }

    /**
     * 获取计划任务所需的时间字符串
     * 0   1   2   3   4   5
     * |   |   |   |   |   |
     * |   |   |   |   |   +------ day of week (0 - 6) (Sunday=0)
     * |   |   |   |   +------ month (1 - 12)
     * |   |   |   +-------- day of month (1 - 31)
     * |   |   +---------- hour (0 - 23)
     * |   +------------ min (0 - 59)
     * +-------------- sec (0-59)[可省略,如果没有0位,则最小时间粒度是分钟]
     * @param $data
     * @return string
     */
    protected function getCrontab($data): string
    {
        $sec = $data['sec'] ?? '*';
        $min = $data['min'] ?? '*';
        $hour = $data['hour'] ?? '*';
        $day = $data['day'] ?? '*';
        $week = $data['week'] ?? '*';
        $type = $data['type'] ?? '';
        switch ($type) {
            case 'sec':// 每隔几秒
                $crontab = '*/' . $sec . ' * * * * *';
                break;
            case 'min':// 每隔几分
                $crontab = '0 */' . $min . ' * * * *';
                break;
            case 'hour':// 每隔几时第几分钟执行
                $crontab = '0 ' . $min . ' */' . $hour . ' * * *';
                break;
            case 'day':// 每隔几日第几小时第几分钟执行
                $crontab = '0 ' . $min . ' ' . $hour . ' */' . $day . ' * *';
                break;
            case 'week':// 每周一次,周几具体时间执行
                $crontab = '0 ' . $min . ' ' . $hour . ' * * ' . $week;
                break;
            case 'month':// 每月一次,某日具体时间执行
                $crontab = '0 ' . $min . ' ' . $hour . ' ' . $day . ' * *';
                break;
        }
        return $crontab ?? '0 */1 * * * *';
    }

    /**
     * 捕获所有队列任务
     * @return array
     */
    public function getAllQueue()
    {
        $class_list = [];
        $system_dir = root_path() . 'app' . DIRECTORY_SEPARATOR . 'job';
        $addon_dir = root_path() . 'addon' . DIRECTORY_SEPARATOR;
        if (is_dir($system_dir)) {
            search_dir($system_dir, $app_data, root_path());
            $class_list = array_merge($class_list, $app_data);
        }

        $addons = (new CoreAddonService())->getInstallAddonList();
        foreach ($addons as $v) {

            $addon_path = $addon_dir . $v['key'] . DIRECTORY_SEPARATOR . 'app' . DIRECTORY_SEPARATOR . 'job';
            if (is_dir($addon_path)) {
                search_dir($addon_path, $addon_data, root_path());
                if ($addon_data) $class_list = array_merge($class_list, array_filter($addon_data));
            }
        }

        foreach ($class_list as &$v) {
            $v = str_replace('.php', '', $v);
            $v = str_replace('/', '\\', $v);
        }
        if (!empty($class_list)) {
            $extra_queue_list = $this->getExtraQueue();
            $class_list = array_filter($class_list, function($class) use ($extra_queue_list) {
                return !in_array($class, $extra_queue_list);
            });
            $class_list = array_values($class_list);
        }
        return $class_list;
    }

    /**
     * 捕获所有队列任务
     * @return array
     */
    public function getExtraQueue()
    {
        return [
            "app\\job\\weapp\\job\\WeappSyncAuthorizeDramaList",
            "app\\job\\weapp\\job\\WeappSyncDramaList"
        ];
    }
}
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
最佳答案

laravel 可以创建不同的队列吧。例如创建emails,imports, orders,任务分发的时候指定相关队列就行,互不影响的。

1天前 评论
讨论数量: 3

laravel 可以创建不同的队列吧。例如创建emails,imports, orders,任务分发的时候指定相关队列就行,互不影响的。

1天前 评论

运行一下,ps看下进程

1天前 评论

如果你在加入队列之前就知道这个任务耗时,则加入到一个不同的指定名称的队列。如果加入队列前并不知道其耗时,那么应该在执行过程中监测执行时间,超过指定时限则回退,并加入到另一个指定名称的队列。

16小时前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!