Thinkphp+workman+redis实现多进程异步任务处理 
                                                    
                        
                    
                    
  
                    
                    前言
PHP本身并不直接支持多线程编程,因为PHP的设计初衷是作为一个脚本语言,主要面向的是Web开发。不过我们可以使用一些扩展和库来实现多进程的功能,提高系统性能,比如workerman和swoole。通过多进程异步执行任务。
安装workman
- 简介 - 文档: - Workerman · ThinkPHP5.0完全开发手册 · 看云 (kancloud.cn) - ThinkPHP 5.1 Workerman 快速上手指南 · ThinkPHP5.1 Workerman上手指南 · 看云 (kancloud.cn) 
- 环境要求 - PHP >= 7.2
- Composer >= 2.0
 
- 安装扩展 - composer require topthink/think-worker
Thinkphp5.0使用workman创建多进程任务
- 1.在项目根目录(注意不是pubcli目录)下创建文件 - server.php,文件内容如下- <?php define('APP_PATH', __DIR__ . '/application/'); define('BIND_MODULE','workman/Worker'); // 加载框架引导文件 require __DIR__ . '/thinkphp/start.php';
- 2.在根目录创建 - \application\workman\controller目录,然后在该目录下新建- Worker.php,文件内容如下- <?php namespace app\workman\controller; use think\worker\Server; class Worker extends Server { //websocket服务端地址和端口 protected $socket = 'websocket://0.0.0.0:2346'; //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍 protected $processes = 4; /** * 收到信息 * @param $connection * @param $data */ public function onMessage($connection, $data) { } /** * 当连接建立时触发的回调函数 * @param $connection */ public function onConnect($connection) { } /** * 当连接断开时触发的回调函数 * @param $connection */ public function onClose($connection) { } /** * 当客户端的连接上发生错误时触发 * @param $connection * @param $code * @param $msg */ public function onError($connection, $code, $msg) { echo "error $code $msg\n"; } /** * 每个进程启动 * @param $worker */ public function onWorkerStart($worker) { echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL; //监听redis队列 $redis = new \Redis(); $redis->connect('192.168.204.128', 6379); while (true) { //读取redis队列 $data = $redis->lPop('test-queue'); if ($data) { //处理业务 echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL; //模拟耗时任务 sleep(5); echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL; } else { echo '进程id ' . $worker->id . ' 空闲中,休息5秒'. PHP_EOL; sleep(5); } } } }- 这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。 
- 3.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法 - <?php namespace app\index\controller; class Index { //新增队列数据 public function addQueue() { $redis = new \Redis(); $redis->connect('192.168.204.128', 6379); $redis->rPush('test-queue', '1'); $redis->rPush('test-queue', '2'); $redis->rPush('test-queue', '3'); $redis->rPush('test-queue', '4'); $redis->rPush('test-queue', '5'); $redis->rPush('test-queue', '6'); $redis->rPush('test-queue', '7'); echo 'success'; } }
- 4.启动workman - 直接在根目录下运行第一步创建的 - server.php- php server.php start- 可以看到下面的输出 - Workerman[server.php] start in DEBUG mode -------------------------------------------- WORKERMAN -------------------------------------------- Workerman version:3.5.35 PHP version:7.4.33 Event-Loop:\Workerman\Events\Select --------------------------------------------- WORKERS --------------------------------------------- proto user worker listen processes status tcp root none websocket://0.0.0.0:2346 4 [OK] --------------------------------------------------------------------------------------------------- Press Ctrl+C to stop. Start success. workman进程启动,进程id 0 workman进程启动,进程id 3 workman进程启动,进程id 2 workman进程启动,进程id 1 进程id 1 空闲中,休息5秒 进程id 0 空闲中,休息5秒 进程id 2 空闲中,休息5秒 进程id 3 空闲中,休息5秒- 注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止 
- 5.访问tp框架中的将任务加入redis队列接口,直接用浏览器或者命令行curl访问 - http://网站域名/index/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:- Workerman[server.php] start in DEBUG mode -------------------------------------------- WORKERMAN -------------------------------------------- Workerman version:3.5.35 PHP version:7.4.33 Event-Loop:\Workerman\Events\Select --------------------------------------------- WORKERS --------------------------------------------- proto user worker listen processes status tcp root none websocket://0.0.0.0:2346 4 [OK] --------------------------------------------------------------------------------------------------- Press Ctrl+C to stop. Start success. workman进程启动,进程id 0 workman进程启动,进程id 3 workman进程启动,进程id 2 workman进程启动,进程id 1 进程id 1 空闲中,休息5秒 进程id 0 空闲中,休息5秒 进程id 2 空闲中,休息5秒 进程id 3 空闲中,休息5秒 进程id 2 空闲中,休息5秒 进程id 1 空闲中,休息5秒 进程id 3 空闲中,休息5秒 进程id 0 空闲中,休息5秒 (这里开始将任务加入redis队列,然后workman开始消费队列) 进程id 0 开始处理业务数据1 进程id 1 开始处理业务数据3 进程id 3 开始处理业务数据4 进程id 2 开始处理业务数据2 进程id 0 处理业务数据1 完成 进程id 1 处理业务数据3 完成 进程id 2 处理业务数据2 完成 进程id 1 开始处理业务数据6 进程id 3 处理业务数据4 完成 进程id 0 开始处理业务数据5 进程id 3 开始处理业务数据7 进程id 2 空闲中,休息5秒 进程id 1 处理业务数据6 完成 进程id 0 处理业务数据5 完成 进程id 3 处理业务数据7 完成 (到这里所有的7项任务已经处理完成) 进程id 1 空闲中,休息5秒 进程id 0 空闲中,休息5秒 进程id 3 空闲中,休息5秒 进程id 2 空闲中,休息5秒
TP5.1及TP6使用workman创建多进程任务
- 1.在根目录创建 - \application\workman\controller目录,然后在该目录下新建- Worker.php,文件内容如下- <?php namespace app\workman; use think\worker\Server; class Worker extends Server { protected $host = '127.0.0.1'; protected $port = 2346; protected $protocol = 'websocket'; protected $option = [ 'count' => 4, //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍 'name' => 'think' ];
  /**
   * 收到信息
   * @param $connection
   * @param $data
   */
  public function onMessage($connection, $data)
  {
  }
  /**
   * 当连接建立时触发的回调函数
   * @param $connection
   */
  public function onConnect($connection)
  {
  }
  /**
   * 当连接断开时触发的回调函数
   * @param $connection
   */
  public function onClose($connection)
  {
  }
  /**
   * 当客户端的连接上发生错误时触发
   * @param $connection
   * @param $code
   * @param $msg
   */
  public function onError($connection, $code, $msg)
  {
      echo "error $code $msg\n";
  }
  /**
   * 每个进程启动
   * @param $worker
   */
  public function onWorkerStart($worker)
  {
      echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;
      //监听redis队列
      $redis = new \Redis();
      $redis->connect('127.0.0.1', 6379);
      while (true) {
          //读取redis队列
          $data = $redis->lPop('test-queue');
          if ($data) {
              //处理业务
              echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
              //模拟耗时任务
              sleep(5);
              echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
          } else {
              echo '进程id ' . $worker->id . ' 空闲中,休息5秒' . PHP_EOL;
              sleep(5);
          }
      }
  }}
  这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。
- 2.指定workman服务类名
  修改`config/worker_server.php`,将`worker_class`的值改为`app\workman\Worker`
‘worker_class’ => ‘app\workman\Worker’, // 自定义Workerman服务类名 支持数组定义多个服务
- 3.启动workman
  直接在根目录下运行命令`php think worker:server`或者`php think worker:server -d`即可启动,如果要调整workman参数,修改`config/worker_server.php`中的选项即可
  ```bash
  php think worker:server可以看到下面的输出
  Starting Workerman server...
  ----------------------- WORKERMAN -----------------------------
  Workerman version:3.5.35          PHP version:7.3.4
  ------------------------ WORKERS -------------------------------
  worker               listen                              processes status
  none                 websocket://127.0.0.1:2346          1         [ok]
  workman进程启动,进程id 0
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止
- 4.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法 - <?php namespace app\controller; use app\BaseController; class Index extends BaseController { //新增队列数据 public function addQueue() { $redis = new \Redis(); $redis->connect('127.0.0.1', 6379); $redis->rPush('test-queue', '1'); $redis->rPush('test-queue', '2'); $redis->rPush('test-queue', '3'); $redis->rPush('test-queue', '4'); $redis->rPush('test-queue', '5'); $redis->rPush('test-queue', '6'); $redis->rPush('test-queue', '7'); echo 'success'; } }
- 5.访问上面的将任务加入redis队列接口,直接用浏览器或者命令行curl访问 - http://网站域名/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:- Starting Workerman server... Workerman[think] start in DEBUG mode -------------------------------------------- WORKERMAN --------------------------------------------- Workerman version:3.5.35 PHP version:7.4.33 Event-Loop:\Workerman\Events\Select --------------------------------------------- WORKERS ---------------------------------------------- proto user worker listen processes status tcp root think websocket://127.0.0.1:2346 4 [OK] ---------------------------------------------------------------------------------------------------- Press Ctrl+C to stop. Start success. workman进程启动,进程id 0 workman进程启动,进程id 3 进程id 0 空闲中,休息5秒 进程id 3 空闲中,休息5秒 workman进程启动,进程id 1 进程id 1 空闲中,休息5秒 workman进程启动,进程id 2 进程id 2 空闲中,休息5秒 进程id 0 开始处理业务数据2 进程id 3 开始处理业务数据1 进程id 1 开始处理业务数据3 进程id 2 开始处理业务数据4 进程id 0 处理业务数据2 完成 进程id 0 开始处理业务数据5 进程id 3 处理业务数据1 完成 进程id 1 处理业务数据3 完成 进程id 3 开始处理业务数据6 进程id 1 开始处理业务数据7 进程id 2 处理业务数据4 完成 进程id 2 空闲中,休息5秒 进程id 0 处理业务数据5 完成 进程id 1 处理业务数据7 完成 进程id 3 处理业务数据6 完成 进程id 0 空闲中,休息5秒 进程id 1 空闲中,休息5秒 进程id 3 空闲中,休息5秒 进程id 2 空闲中,休息5秒
本作品采用《CC 协议》,转载必须注明作者和本文链接
 
           jian1098 的个人博客
 jian1098 的个人博客
         
             
             
             
           
           关于 LearnKu
                关于 LearnKu
               
                     
                     
                     粤公网安备 44030502004330号
 粤公网安备 44030502004330号 
 
推荐文章: