帮忙解决一个小问题,请喝咖啡,谢谢!
队列里面有一部分是耗时的,想要这部分队列单独拿出来执行,不影响其他队列,以下代码是否有问题,该怎么修改
<?php
namespace app\command\workerman;
use app\command\WorkerCommand;
use app\dict\schedule\ScheduleDict;
use app\model\sys\SysSchedule;
use app\service\core\addon\CoreAddonService;
use app\service\core\schedule\CoreScheduleService;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
use think\facade\Log;
use Workerman\Connection\TcpConnection;
use Workerman\Crontab\Crontab;
use Workerman\RedisQueue\Client;
use Workerman\Timer;
use Workerman\Worker;
class Workerman extends Command
{
use WorkerCommand;
public function configure()
{
// 指令配置
$this->setName('workerman')
->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start')
->addOption('mode', 'm', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')
->setDescription('Workerman,高性能PHP应用容器');
}
/**
* 执行任务
* @return void
*/
protected function execute(Input $input, Output $output)
{
$this->resetCli($input, $output);
//计划任务
Worker::$pidFile = runtime_path() . 'workerman_schedule.pid';
$worker = new Worker();
$worker->name = 'schedule_work';
$worker->count = 1;
// 设置时区,避免运行结果与预期不一致
date_default_timezone_set('PRC');
$worker->onWorkerStart = function () use ($output) {
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Starting...");
// // 每分钟的第1秒执行.用于计划任务是否仍在执行
new Crontab('*/10 * * * * *', function () {
$file = root_path('runtime') . '.schedule';
file_put_contents($file, time());
});
$core_schedule_service = new CoreScheduleService();
//查询所有的计划任务
$task_list = $core_schedule_service->getList(['status' => ScheduleDict::ON]);
foreach ($task_list as $item) {
//获取定时任务时间字符串
new Crontab($this->getCrontab($item['time']), function () use ($core_schedule_service, $item, $output) {
if (!empty($item['class'])) {
$core_schedule_service->execute($item, $output);
}
});
}
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Started.");
};
//消息队列
Worker::$pidFile = runtime_path() . 'workerman_queue.pid';
Worker::$logFile = runtime_path() . 'workerman.log';
$worker = new Worker();
$worker->name = 'queue_work';
$worker->count = 1;
$worker->onWorkerStart = function () use ($output) {
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Starting...");
// 定时,每10秒一次
Timer::add(30, function () use ($output) {
(new SysSchedule())->select();
});
$redis_option = [
'connect_timeout' => 10,
'max_attempts' => 3,
'retry_seconds' => 5,
'prefix' => md5(root_path())
];
if (!empty(env('redis.redis_password'))) {
$redis_option['auth'] = env('redis.redis_password');
}
$redis_option['db'] = env('redis.select');
$client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
$queue_list = $this->getAllQueue();
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . " queue_list" . json_encode($queue_list));
foreach ($queue_list as $queue_class_name) {
$queue_class_name = str_replace('.php', '', $queue_class_name);
// 订阅
$client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
$output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processing:" . $queue_class_name);
try {
$class_name = '\\' . $queue_class_name;
$class = new $class_name();
$class->fire($data);
} catch (\Throwable $e) {
Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
}
$output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processed:" . $queue_class_name);
});
}
// 消费失败触发的回调(可选)
$client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
$output->writeln('[queue]队列 ' . $package['queue'] . " 消费失败," . $exception->getMessage());
});
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Started.");
};
Worker::$pidFile = runtime_path() . 'workerman_extra_queue.pid';
Worker::$logFile = runtime_path() . 'workerman_extra_queue.log';
$worker = new Worker();
$worker->name = 'extra_queue_work';
$worker->count = 3;
$worker->onWorkerStart = function () use ($output) {
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . "Extra Queue Starting...");
$redis_option = [
'connect_timeout' => 10,
'max_attempts' => 3,
'retry_seconds' => 5,
'prefix' => md5(root_path())
];
if (!empty(env('redis.redis_password'))) {
$redis_option['auth'] = env('redis.redis_password');
}
$redis_option['db'] = env('redis.select');
$client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
$queue_list = $this->getExtraQueue();
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . " extra_queue_list" . json_encode($queue_list));
foreach ($queue_list as $queue_class_name) {
$queue_class_name = str_replace('.php', '', $queue_class_name);
// 订阅
$client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
$output->writeln('[extra_queue][' . date('Y-m-d H:i:s') . ']' . " Processing:" . $queue_class_name);
try {
$class_name = '\\' . $queue_class_name;
$class = new $class_name();
$class->fire($data);
} catch (\Throwable $e) {
Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
}
$output->writeln('[extra_queue][' . date('Y-m-d H:i:s') . ']' . " Processed:" . $queue_class_name);
});
}
// 消费失败触发的回调(可选)
$client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
$output->writeln('[extra_queue]队列 ' . $package['queue'] . " 消费失败," . $exception->getMessage());
});
$output->writeln('[' . date('Y-m-d H:i:s') . ']' . "Extra Queue Started.");
};
Worker::runAll();
}
/**
* 获取计划任务所需的时间字符串
* 0 1 2 3 4 5
* | | | | | |
* | | | | | +------ day of week (0 - 6) (Sunday=0)
* | | | | +------ month (1 - 12)
* | | | +-------- day of month (1 - 31)
* | | +---------- hour (0 - 23)
* | +------------ min (0 - 59)
* +-------------- sec (0-59)[可省略,如果没有0位,则最小时间粒度是分钟]
* @param $data
* @return string
*/
protected function getCrontab($data): string
{
$sec = $data['sec'] ?? '*';
$min = $data['min'] ?? '*';
$hour = $data['hour'] ?? '*';
$day = $data['day'] ?? '*';
$week = $data['week'] ?? '*';
$type = $data['type'] ?? '';
switch ($type) {
case 'sec':// 每隔几秒
$crontab = '*/' . $sec . ' * * * * *';
break;
case 'min':// 每隔几分
$crontab = '0 */' . $min . ' * * * *';
break;
case 'hour':// 每隔几时第几分钟执行
$crontab = '0 ' . $min . ' */' . $hour . ' * * *';
break;
case 'day':// 每隔几日第几小时第几分钟执行
$crontab = '0 ' . $min . ' ' . $hour . ' */' . $day . ' * *';
break;
case 'week':// 每周一次,周几具体时间执行
$crontab = '0 ' . $min . ' ' . $hour . ' * * ' . $week;
break;
case 'month':// 每月一次,某日具体时间执行
$crontab = '0 ' . $min . ' ' . $hour . ' ' . $day . ' * *';
break;
}
return $crontab ?? '0 */1 * * * *';
}
/**
* 捕获所有队列任务
* @return array
*/
public function getAllQueue()
{
$class_list = [];
$system_dir = root_path() . 'app' . DIRECTORY_SEPARATOR . 'job';
$addon_dir = root_path() . 'addon' . DIRECTORY_SEPARATOR;
if (is_dir($system_dir)) {
search_dir($system_dir, $app_data, root_path());
$class_list = array_merge($class_list, $app_data);
}
$addons = (new CoreAddonService())->getInstallAddonList();
foreach ($addons as $v) {
$addon_path = $addon_dir . $v['key'] . DIRECTORY_SEPARATOR . 'app' . DIRECTORY_SEPARATOR . 'job';
if (is_dir($addon_path)) {
search_dir($addon_path, $addon_data, root_path());
if ($addon_data) $class_list = array_merge($class_list, array_filter($addon_data));
}
}
foreach ($class_list as &$v) {
$v = str_replace('.php', '', $v);
$v = str_replace('/', '\\', $v);
}
if (!empty($class_list)) {
$extra_queue_list = $this->getExtraQueue();
$class_list = array_filter($class_list, function($class) use ($extra_queue_list) {
return !in_array($class, $extra_queue_list);
});
$class_list = array_values($class_list);
}
return $class_list;
}
/**
* 捕获所有队列任务
* @return array
*/
public function getExtraQueue()
{
return [
"app\\job\\weapp\\job\\WeappSyncAuthorizeDramaList",
"app\\job\\weapp\\job\\WeappSyncDramaList"
];
}
}
关于 LearnKu
推荐文章: