Thinkphp+workman+redis实现多进程异步任务处理

前言


PHP本身并不直接支持多线程编程,因为PHP的设计初衷是作为一个脚本语言,主要面向的是Web开发。不过我们可以使用一些扩展和库来实现多进程的功能,提高系统性能,比如workerman和swoole。通过多进程异步执行任务。

安装workman


Thinkphp5.0使用workman创建多进程任务


  • 1.在项目根目录(注意不是pubcli目录)下创建文件server.php,文件内容如下

    <?php
    define('APP_PATH', __DIR__ . '/application/');
    define('BIND_MODULE','workman/Worker');
    // 加载框架引导文件
    require __DIR__ . '/thinkphp/start.php';
  • 2.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?php
    
    namespace app\workman\controller;
    
    use think\worker\Server;
    
    class Worker extends Server
    {
        //websocket服务端地址和端口
        protected $socket = 'websocket://0.0.0.0:2346';
        //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍
        protected $processes = 4;
    
        /**
         * 收到信息
         * @param $connection
         * @param $data
         */
        public function onMessage($connection, $data)
        {
    
        }
    
        /**
         * 当连接建立时触发的回调函数
         * @param $connection
         */
        public function onConnect($connection)
        {
    
        }
    
        /**
         * 当连接断开时触发的回调函数
         * @param $connection
         */
        public function onClose($connection)
        {
    
        }
    
        /**
         * 当客户端的连接上发生错误时触发
         * @param $connection
         * @param $code
         * @param $msg
         */
        public function onError($connection, $code, $msg)
        {
            echo "error $code $msg\n";
        }
    
        /**
         * 每个进程启动
         * @param $worker
         */
        public function onWorkerStart($worker)
        {
            echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;
    
            //监听redis队列
            $redis = new \Redis();
            $redis->connect('192.168.204.128', 6379);
            while (true) {
                //读取redis队列
                $data = $redis->lPop('test-queue');
                if ($data) {
                    //处理业务
                    echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
                    //模拟耗时任务
                    sleep(5);
                    echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
                } else {
                    echo '进程id ' . $worker->id . ' 空闲中,休息5秒'. PHP_EOL;
                    sleep(5);
                }
            }
        }
    }

    这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

  • 3.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    
    namespace app\index\controller;
    
    class Index
    {
        //新增队列数据
        public function addQueue()
        {
            $redis = new \Redis();
            $redis->connect('192.168.204.128', 6379);
            $redis->rPush('test-queue', '1');
            $redis->rPush('test-queue', '2');
            $redis->rPush('test-queue', '3');
            $redis->rPush('test-queue', '4');
            $redis->rPush('test-queue', '5');
            $redis->rPush('test-queue', '6');
            $redis->rPush('test-queue', '7');
            echo 'success';
        }
    }
  • 4.启动workman

    直接在根目录下运行第一步创建的server.php

    php server.php start

    可以看到下面的输出

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5

    注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 5.访问tp框架中的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Workerman[server.php] start in DEBUG mode
    -------------------------------------------- WORKERMAN --------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ---------------------------------------------
    proto   user            worker          listen                      processes    status
    tcp     root            none            websocket://0.0.0.0:2346    4             [OK]
    ---------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    workman进程启动,进程id 2
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    (这里开始将任务加入redis队列,然后workman开始消费队列)
    进程id 0 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 3 开始处理业务数据4
    进程id 2 开始处理业务数据2
    进程id 0 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 2 处理业务数据2 完成
    进程id 1 开始处理业务数据6
    进程id 3 处理业务数据4 完成
    进程id 0 开始处理业务数据5
    进程id 3 开始处理业务数据7
    进程id 2 空闲中,休息5秒
    进程id 1 处理业务数据6 完成
    进程id 0 处理业务数据5 完成
    进程id 3 处理业务数据7 完成
    (到这里所有的7项任务已经处理完成)
    进程id 1 空闲中,休息5秒
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5

TP5.1及TP6使用workman创建多进程任务


  • 1.在根目录创建\application\workman\controller目录,然后在该目录下新建Worker.php,文件内容如下

    <?php
    
    namespace app\workman;
    
    use think\worker\Server;
    
    class Worker extends Server
    {
        protected $host = '127.0.0.1';
        protected $port = 2346;
        protected $protocol = 'websocket';
        protected $option = [
            'count'   => 4, //设置进程数,默认为4,根据自己的需要和服务器配置合理设置,一般设置进程数为CPU核数的1倍-3倍
            'name'    => 'think'
        ];
    
    
  /**
   * 收到信息
   * @param $connection
   * @param $data
   */
  public function onMessage($connection, $data)
  {

  }

  /**
   * 当连接建立时触发的回调函数
   * @param $connection
   */
  public function onConnect($connection)
  {

  }

  /**
   * 当连接断开时触发的回调函数
   * @param $connection
   */
  public function onClose($connection)
  {

  }

  /**
   * 当客户端的连接上发生错误时触发
   * @param $connection
   * @param $code
   * @param $msg
   */
  public function onError($connection, $code, $msg)
  {
      echo "error $code $msg\n";
  }

  /**
   * 每个进程启动
   * @param $worker
   */
  public function onWorkerStart($worker)
  {
      echo 'workman进程启动,进程id ' . $worker->id . PHP_EOL;

      //监听redis队列
      $redis = new \Redis();
      $redis->connect('127.0.0.1', 6379);
      while (true) {
          //读取redis队列
          $data = $redis->lPop('test-queue');
          if ($data) {
              //处理业务
              echo '进程id ' . $worker->id . ' 开始处理业务数据' . $data . PHP_EOL;
              //模拟耗时任务
              sleep(5);
              echo '进程id ' . $worker->id . ' 处理业务数据' . $data . ' 完成' . PHP_EOL;
          } else {
              echo '进程id ' . $worker->id . ' 空闲中,休息5秒' . PHP_EOL;
              sleep(5);
          }
      }
  }

}


  这里主要的功能就是创建一个workman的websocket服务端(使用其他tcp服务也是可以的),然后在每个进程启动的时候监听redis队列,利用这些进程异步去处理redis队列里的任务,代码里的模拟耗时任务可以直接替换成你在tp框架里写的耗时任务。

- 2.指定workman服务类名

  修改`config/worker_server.php`,将`worker_class`的值改为`app\workman\Worker`

‘worker_class’ => ‘app\workman\Worker’, // 自定义Workerman服务类名 支持数组定义多个服务


- 3.启动workman

  直接在根目录下运行命令`php think worker:server`或者`php think worker:server -d`即可启动,如果要调整workman参数,修改`config/worker_server.php`中的选项即可

  ```bash
  php think worker:server

可以看到下面的输出

  Starting Workerman server...
  ----------------------- WORKERMAN -----------------------------
  Workerman version:3.5.35          PHP version:7.3.4
  ------------------------ WORKERS -------------------------------
  worker               listen                              processes status
  none                 websocket://127.0.0.1:2346          1         [ok]
  workman进程启动,进程id 0
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5秒
  进程id 0 空闲中,休息5

注意:如果是在windows下,设置进程数是没有用的,就只会启动一个worker进程,也就是只有单进程,要体验多进程只能在Linux环境下,同时也无法守护进程,cmd窗口关掉后服务即停止

  • 4.在tp框架中将任务加入redis队列,例如我这里写一个添加redis列表元素的方法

    <?php
    namespace app\controller;
    
    use app\BaseController;
    
    class Index extends BaseController
    {
        //新增队列数据
        public function addQueue()
        {
            $redis = new \Redis();
            $redis->connect('127.0.0.1', 6379);
            $redis->rPush('test-queue', '1');
            $redis->rPush('test-queue', '2');
            $redis->rPush('test-queue', '3');
            $redis->rPush('test-queue', '4');
            $redis->rPush('test-queue', '5');
            $redis->rPush('test-queue', '6');
            $redis->rPush('test-queue', '7');
            echo 'success';
        }
    }
  • 5.访问上面的将任务加入redis队列接口,直接用浏览器或者命令行curl访问http://网站域名/index/addQueue即可,然后你就可以看到所有的redis队列将被workman进程分配并执行,以下是我启动workman->添加redis队列->workman处理->队列处理结束打印的结果:

    Starting Workerman server...
    Workerman[think] start in DEBUG mode
    -------------------------------------------- WORKERMAN ---------------------------------------------
    Workerman version:3.5.35          PHP version:7.4.33           Event-Loop:\Workerman\Events\Select
    --------------------------------------------- WORKERS ----------------------------------------------
    proto   user            worker          listen                        processes    status
    tcp     root            think           websocket://127.0.0.1:2346    4             [OK]
    ----------------------------------------------------------------------------------------------------
    Press Ctrl+C to stop. Start success.
    workman进程启动,进程id 0
    workman进程启动,进程id 3
    进程id 0 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    workman进程启动,进程id 1
    进程id 1 空闲中,休息5秒
    workman进程启动,进程id 2
    进程id 2 空闲中,休息5秒
    进程id 0 开始处理业务数据2
    进程id 3 开始处理业务数据1
    进程id 1 开始处理业务数据3
    进程id 2 开始处理业务数据4
    进程id 0 处理业务数据2 完成
    进程id 0 开始处理业务数据5
    进程id 3 处理业务数据1 完成
    进程id 1 处理业务数据3 完成
    进程id 3 开始处理业务数据6
    进程id 1 开始处理业务数据7
    进程id 2 处理业务数据4 完成
    进程id 2 空闲中,休息5秒
    进程id 0 处理业务数据5 完成
    进程id 1 处理业务数据7 完成
    进程id 3 处理业务数据6 完成
    进程id 0 空闲中,休息5秒
    进程id 1 空闲中,休息5秒
    进程id 3 空闲中,休息5秒
    进程id 2 空闲中,休息5
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 1

值得参考

4个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!