Laravel+workman+redis实现多进程异步任务处理
前言
PHP本身并不直接支持多线程编程,因为PHP的设计初衷是作为一个脚本语言,主要面向的是Web开发。不过我们可以使用一些扩展和库来实现多进程的功能,提高系统性能,比如workerman和swoole。通过多进程异步执行任务。
安装workman
简介
文档:
环境要求
- PHP >= 7.2
- Composer >= 2.0
- Linux系统
安装扩展
composer require workerman/gateway-worker
Laravel中使用workman创建多进程任务
1.创建 artisan命令
目录下建立命令行文件
WorkermanCommand
php artisan make:command WorkermanCommand
文件内容如下
protected $signature = 'workman {action} {--d}'; protected $description = 'Start a Workerman server.'; public function handle() { global $argv; $action = $this->argument('action'); $argv[0] = 'wk'; $argv[1] = $action; $argv[2] = $this->option('d') ? '-d' : ''; $this->start(); } private function start() { $this->startGateWay(); $this->startBusinessWorker(); $this->startRegister(); Worker::runAll(); } private function startBusinessWorker() { $worker = new BusinessWorker(); $worker->name = 'BusinessWorker'; $worker->count = 1; $worker->registerAddress = '127.0.0.1:1236'; $worker->eventHandler = \App\Workerman\Events::class; //指定workman监听事件文件 } private function startGateWay() { $gateway = new Gateway("websocket://0.0.0.0:2346"); $gateway->name = 'Gateway'; $gateway->count = 1; $gateway->lanIp = '127.0.0.1'; $gateway->startPort = 2300; $gateway->pingInterval = 30; $gateway->pingNotResponseLimit = 0; $gateway->pingData = '{"type":"@heart@"}'; $gateway->registerAddress = '127.0.0.1:1236'; } private function startRegister() { new Register('text://0.0.0.0:1236'); }
2.创建监听事件
创建
app/Workerman/Events.php
文件来监听处理Workman的各种事件<?php namespace App\Workerman; class Events { public static function onWorkerStart($worker) { echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL; //监听redis队列 $redis = app('redis.connection'); while (true) { //读取redis队列 $data = $redis->lPop('test-queue'); if ($data) { //处理业务 echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL; //模拟耗时任务 sleep(5); echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL; } else { echo '进程id ' . $worker->id . ' 空闲中,休息5秒' . PHP_EOL; sleep(5); } } } public static function onConnect($client_id) { } public static function onWebSocketConnect($client_id, $data) { } public static function onMessage($client_id, $message) { } public static function onClose($client_id) { } }
3.启动workman
php artisan workman start -d
看到下面的信息说明启动成功
root@2e870aece68b:/var/www/hello# php artisan workman start Workerman[wk] start in DEBUG mode -------------------------------------------- WORKERMAN --------------------------------------------- Workerman version:4.1.15 PHP version:7.4.33 Event-Loop:\Workerman\Events\Select --------------------------------------------- WORKERS ---------------------------------------------- proto user worker listen processes status tcp root Gateway websocket://0.0.0.0:2346 4 [OK] tcp root BusinessWorker none 4 [OK] tcp root Register text://0.0.0.0:1236 1 [OK] ----------------------------------------------------------------------------------------------------
4.在laravel框架中新增api接口将任务加入redis队列,例如我这里写一个添加redis列表元素的方法
<?php namespace App\Http\Controllers;
class IndexController extends Controller
{
//新增队列数据
public function addQueue()
{
$redis = app(‘redis.connection’);
$redis->rPush(‘test-queue’, ‘1’);
$redis->rPush(‘test-queue’, ‘2’);
$redis->rPush(‘test-queue’, ‘3’);
$redis->rPush(‘test-queue’, ‘4’);
$redis->rPush(‘test-queue’, ‘5’);
$redis->rPush(‘test-queue’, ‘6’);
$redis->rPush(‘test-queue’, ‘7’);
echo ‘success’;
}
}
加完方法后记得新增路由
- 5.访问上面的将任务加入redis队列接口,直接用浏览器或者命令行curl访问`http://网站域名/api/index/addQueue`即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:
root@2e870aece68b:/var/www/hello# php artisan workman start
Workerman[wk] start in DEBUG mode
——————————————– WORKERMAN ———————————————
Workerman version:4.1.15 PHP version:7.4.33 Event-Loop:\Workerman\Events\Select
——————————————— WORKERS ———————————————-
proto user worker listen processes status
tcp root Gateway websocket://0.0.0.0:2346 4 [OK]
tcp root BusinessWorker none 4 [OK]
tcp root Register text://0.0.0.0:1236 1 [OK]
Press Ctrl+C to stop. Start success.
workman进程启动,进程id 2
workman进程启动,进程id 0
workman进程启动,进程id 1
进程id 0 空闲中,休息5秒
workman进程启动,进程id 3
进程id 2 空闲中,休息5秒
进程id 1 空闲中,休息5秒
进程id 3 空闲中,休息5秒
进程id 0 开始处理业务数据1
进程id 2 开始处理业务数据3
进程id 3 开始处理业务数据4
进程id 1 开始处理业务数据2
进程id 0 处理业务数据1 完成
进程id 0 开始处理业务数据5
进程id 3 处理业务数据4 完成
进程id 2 处理业务数据3 完成
进程id 1 处理业务数据2 完成
进程id 3 开始处理业务数据6
进程id 2 开始处理业务数据7
进程id 1 空闲中,休息5秒
进程id 0 处理业务数据5 完成
进程id 0 空闲中,休息5秒
进程id 3 处理业务数据6 完成
进程id 2 处理业务数据7 完成
进程id 2 空闲中,休息5秒
进程id 3 空闲中,休息5秒
进程id 1 空闲中,休息5秒
本作品采用《CC 协议》,转载必须注明作者和本文链接
异步任务laravel扔到amqp hyperf起个消费进程去消费就好了 简单明了
哪里体现了多线程?
请问和把任务投递到队列,队列开启4个进程消费有区别吗?
并且、好像、应该 Workman 是多进程处理的吧

是4个worker进程啦,不是线程。