PHP 并发编程之 Master-Worker 模式

Master-Worker 的模式结构

Master 进程为主进程,它维护了一个 Worker 进程队列、子任务队列和子结果集。Worker 进程队列中的 Worker 进程,不停地从任务队列中提取要处理的子任务,并将子任务的处理结果写入结果集。

  • 使用多进程
  • 支持 Worker 错误重试,仅仅实现业务即可
  • 任务累积过多,自动 Fork Worker 进程
  • 常驻 Worker 进程,减少进程 Fork 开销
  • 非常驻 Worker 进程闲置,自动退出回收
  • 支持日志

Demo: 基于 Redis 生产消费队列 在 test 目录中

代码:

<?php
declare(ticks = 1);
// 必须先使用语句declare(ticks=1),否则注册的singal-handel就不会执行了
//error_reporting(E_ERROR);

abstract class MasterWorker
{

    // 子进程配置属性
    protected $maxWorkerNum; // 最多只能开启进程数
    protected $minWorkerNum; // 最少常驻子进程数
    protected $waitTaskTime; // 等待任务时间,单位秒
    protected $waitTaskLoopTimes; // 连续这么多次队列为空就退出子进程
    protected $consumeTryTimes; // 连续消费失败次数

    // 父进程专用属性
    protected $worker_list = [];
    protected $check_internal = 1;
    protected $masterExitCallback = [];

    // 子进程专用属性
    protected $autoQuit = false;
    protected $status = self::WORKER_STATUS_IDLE;
    protected $taskData; // 任务数据
    protected $workerExitCallback = [];

    // 通用属性
    protected $stop_service = false;
    protected $master = true;

    // 通用配置
    protected $logFile;

    const WORKER_STATUS_IDLE = 'idle';
    const WORKER_STATUS_FINISHED = 'finished';
    const WORKER_STATUS_EXITING = 'exiting';
    const WORKER_STATUS_WORKING = 'working';
    const WORKER_STATUS_FAIL = 'fail';
    const WORKER_STATUS_TERMINATED = 'terminated';

    public function __construct($options = [])
    {
        $this->initConfig($options);
    }

    protected function initConfig($options = [])
    {
        $defaultConfig = [
            'maxWorkerNum' => 10,
            'minWorkerNum' => 3,
            'waitTaskTime' => 0.01,
            'waitTaskLoopTimes' => 50,
            'consumeTryTimes' => 3,
            'logFile' => './master_worker.log',
        ];

        foreach ($defaultConfig as $key => $default) {
            $this->$key = array_key_exists($key, $options) ? $options[$key] : $default;
        }
    }

    public function start()
    {

        // 父进程异常,需要终止子进程
        set_exception_handler([$this, 'exceptionHandler']);

        // fork minWorkerNum 个子进程
        $this->mutiForkWorker($this->minWorkerNum);

        if ($this->getWorkerLength() <= 0) {
            $this->masterWaitExit(true, 'fork 子进程全部失败');
        }

        // 父进程监听信号
        pcntl_signal(SIGTERM, [$this, 'sig_handler']);
        pcntl_signal(SIGINT, [$this, 'sig_handler']);
        pcntl_signal(SIGQUIT, [$this, 'sig_handler']);
        pcntl_signal(SIGCHLD, [$this, 'sig_handler']);

        // 监听队列,队列比进程数多很多,则扩大进程,扩大部分的进程会空闲自动退出
        $this->checkWorkerLength();

        $this->masterWaitExit();
    }

    /**
     * Master 等待退出
     *
     * @param boolean $force 强制退出
     * @param string $msg 退出 message
     * @return void
     */
    protected function masterWaitExit($force = false, $msg = '')
    {
        // 强制发送退出信号
        $force && $this->sig_handler(SIGTERM);

        // 等到子进程退出
        while ($this->stop_service) {
            $this->checkExit($msg);
            $this->msleep($this->check_internal);
        }
    }

    protected function log($msg)
    {
        try {
            $header = $this->isMaster() ? 'Master [permanent]' : sprintf('Worker [%s]', $this->autoQuit ? 'temporary' : 'permanent');
            $this->writeLog($msg, $this->getLogFile(), $header);
        } catch (\Exception $e) {

        }
    }

    protected function mutiForkWorker($num, $autoQuit = false, $maxTryTimes = 3)
    {
        for ($i = 1; $i <= $num; ++$i) {
            $this->forkWorker($autoQuit, $maxTryTimes);
        }
    }

    protected function checkWorkerLength()
    {
        // 如果要退出父进程,就不执行检测
        while (! $this->stop_service) {

            $this->msleep($this->check_internal);

            // 处理进程
            $workerLength = $this->getWorkerLength();

            // 如果进程数小于最低进程数
            $this->mutiForkWorker($this->minWorkerNum - $workerLength);

            $workerLength = $this->getWorkerLength();

            // 创建常驻worker进程失败, 下次检查继续尝试创建
            if ($workerLength <= 0) {
                continue;
            }

            if ($workerLength >= $this->maxWorkerNum) {
                // 不需要增加进程
                continue;
            }

            $num = $this->calculateAddWorkerNum();

            // 不允许超过最大进程数
            $num = min($num, $this->maxWorkerNum - $workerLength);

            // 创建空闲自动退出worker进程
            $this->mutiForkWorker($num, true);

        }
    }

    protected function getWorkerLength()
    {
        return count($this->worker_list);
    }

    //信号处理函数
    public function sig_handler($sig)
    {
        switch ($sig) {
            case SIGTERM:
            case SIGINT:
            case SIGQUIT:
                // 退出: 给子进程发送退出信号,退出完成后自己退出

                // 先标记一下,子进程完全退出后才能结束
                $this->stop_service = true;

                // 给子进程发送信号
                foreach ($this->worker_list as $pid => $v) {
                    posix_kill($pid, SIGTERM);
                }

                break;
            case SIGCHLD:
                // 子进程退出, 回收子进程, 并且判断程序是否需要退出
                while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
                    // 去除子进程
                    unset($this->worker_list[$pid]);

                    // 子进程是否正常退出
                    // if (pcntl_wifexited($status)) {
                    //     //
                    // }
                }

                $this->checkExit();

                break;
            default:
                $this->default_sig_handler($sig);
                break;
        }

    }

    public function child_sig_handler($sig)
    {
        switch ($sig) {
            case SIGINT:
            case SIGQUIT:
            case SIGTERM:
                $this->stop_service = true;
                break;
            // 操作比较危险 在处理任务当初强制终止
            // case SIGTERM:
            //     // 强制退出
            //     $this->stop_service = true;
            //     $this->status = self::WORKER_STATUS_TERMINATED;
            //     $this->beforeWorkerExitHandler();
            //     $this->status = self::WORKER_STATUS_EXITING;
            //     die(1);
            //     break;
        }
    }

    protected function checkExit($msg = '')
    {
        if ($this->stop_service && empty($this->worker_list)) {
            $this->beforeMasterExitHandler();
            die($msg ?:'Master 进程结束, Worker 进程全部退出');
        }
    }

    protected function forkWorker($autoQuit = false, $maxTryTimes = 3)
    {

        $times = 1;

        do {

            $pid = pcntl_fork();

            if ($pid == -1) {
                ++$times;
            } elseif($pid) {
                $this->worker_list[$pid] = true;
                //echo 'pid:', $pid, "\n";
                return $pid;
            } else {
                // 子进程 消费
                $this->autoQuit = $autoQuit;
                $this->master = false;
                // 处理信号
                pcntl_signal(SIGTERM, [$this, 'child_sig_handler']);
                pcntl_signal(SIGINT, [$this, 'child_sig_handler']);
                pcntl_signal(SIGQUIT, [$this, 'child_sig_handler']);
                exit($this->runChild()); // worker进程结束
            }
        } while ($times <= $maxTryTimes);

        // fork 3次都失败

        return false;

    }

    /**
     * 子进程处理内容
     */
    protected function runChild()
    {
        $noDataLoopTime = 0;
        $status = 0;
        while (!$this->autoQuit || ($noDataLoopTime <= $this->waitTaskLoopTimes)) {

            // 处理退出
            if ($this->stop_service) {
                break;
            }

            $this->taskData = null;
            try {
                $this->taskData = $this->deQueue();
                if ($this->taskData) {
                    $noDataLoopTime = 1; // 重新从1开始
                    $this->status = self::WORKER_STATUS_WORKING;
                    $this->consumeByRetry($this->taskData);
                    $this->status = self::WORKER_STATUS_FINISHED;
                } else {
                    $this->status = self::WORKER_STATUS_IDLE;
                    // 避免溢出
                    $noDataLoopTime = $noDataLoopTime >= PHP_INT_MAX ? PHP_INT_MAX : ($noDataLoopTime + 1);
                    // 等待队列
                    $this->msleep($this->waitTaskTime);
                }

                $status = 0;
            } catch (\RedisException $e) {
                $this->status = self::WORKER_STATUS_FAIL;
                $this->consumeFail($this->taskData, $e);
                $status = 1;
            } catch (\Exception $e) {
                // 消费出现错误
                $this->status = self::WORKER_STATUS_FAIL;
                $this->consumeFail($this->taskData, $e);
                $status = 2;
            }
        }

        $this->beforeWorkerExitHandler();
        $this->status = self::WORKER_STATUS_EXITING;

        return $status;
    }

    /**
     * @param $data
     * @param int $tryTimes
     * @throws \Exception
     */
    protected function consumeByRetry($data, $tryTimes = 1)
    {
        $tryTimes = 1;
        $exception = null;
        // consume 返回false 为失败
        while ($tryTimes <= $this->consumeTryTimes) {
            try {
                return $this->consume($data);
            } catch (\Exception $e) {
                $exception = $e;
                ++$tryTimes;
            }
        }
        // 最后一次还报错 写日志
        if (($tryTimes > $this->consumeTryTimes) && $exception) {
            throw $exception;
        }
    }

    /**
     * @param $mixed
     * @param $filename
     * @param $header
     * @param bool $trace
     * @return bool
     * @throws \Exception
     */
    protected function writeLog($mixed, $filename, $header, $trace = false)
    {
        if (is_string($mixed)) {
            $text = $mixed;
        } else {
            $text = var_export($mixed, true);
        }
        $trace_list = "";
        if ($trace) {
            $_t = debug_backtrace();
            $trace_list = "-- TRACE : \r\n";
            foreach ($_t as $_line) {
                $trace_list .= "-- " . $_line ['file'] . "[" . $_line ['line'] . "] : " . $_line ['function'] . "()" . "\r\n";
            }
        }
        $text = "\r\n=" . $header . "==== " . strftime("[%Y-%m-%d %H:%M:%S] ") . " ===\r\n<" . getmypid() . "> : " . $text . "\r\n" . $trace_list;
        $h = fopen($filename, 'a');
        if (! $h) {
            throw new \Exception('Could not open logfile:' . $filename);
        }
        // exclusive lock, will get released when the file is closed
        if (! flock($h, LOCK_EX)) {
            return false;
        }
        if (fwrite($h, $text) === false) {
            throw new \Exception('Could not write to logfile:' . $filename);
        }
        flock($h, LOCK_UN);
        fclose($h);
        return true;
    }

    protected function msleep($time)
    {
        usleep($time * 1000000);
    }

    public function exceptionHandler($exception)
    {
        if ($this->isMaster()) {
            $msg = '父进程['.posix_getpid().']错误退出中:' . $exception->getMessage();
            $this->log($msg);
            $this->masterWaitExit(true, $msg);
        } else {
            $this->child_sig_handler(SIGTERM);
        }
    }

    public function isMaster()
    {
        return $this->master;
    }

    /**
     * 默认的 worker 数量增加处理
     * 
     * @return int
     */
    public function calculateAddWorkerNum()
    {
        $workerLength = $this->getWorkerLength();
        $taskLength = $this->getTaskLength();
        // 还不够多
        if (($taskLength / $workerLength < 3) && ($taskLength - $workerLength < 10)) {
            return 0;
        }

        // 增加一定数量的进程
        return ceil($this->maxWorkerNum - $workerLength / 2);
    }

    /**
     * 自定义日子文件
     *
     * @return string
     */
    protected function getLogFile()
    {
        return $this->logFile;
    }

    /**
     * 自定义消费错误函数
     *
     * @param [type] $data
     * @param \Exception $e
     * @return void
     */
    protected function consumeFail($data, \Exception $e)
    {
        $this->log(['data' => $data, 'errorCode' => $e->getCode(), 'errorMsg' => get_class($e) . ' : ' . $e->getMessage()]);
    }

     protected function beforeWorkerExitHandler()
     {
         foreach ($this->workerExitCallback as $callback) {
            is_callable($callback) && call_user_func($callback, $this);
         }
     }

     /**
      * 设置Worker自定义结束回调
      *
      * @param mixed  $func
      * @param boolean $prepend
      * @return void
      */
     public function setWorkerExitCallback($callback, $prepend = false)
     {
        return $this->setCallbackQueue('workerExitCallback', $callback, $prepend);
     }

     /**
     * 设置Master自定义结束回调
     *
     * @param callable $func
     * @param boolean $prepend
     * @return void
     */
    public function setMasterExitCallback(callable $callback, $prepend = false)
    {
        return $this->setCallbackQueue('masterExitCallback', $callback, $prepend);
    }

    protected function setCallbackQueue($queueName, $callback, $prepend = false)
    {
        if (! isset($this->$queueName) || ! is_array($this->$queueName)) {
            return false;
        }

        if (is_null($callback)) {
            $this->$queueName = []; // 如果传递 null 就清空
            return true;
        } elseif (! is_callable($callback)) {
            return false;
        }

        if ($prepend) {
            array_unshift($this->$queueName, $callback);
        } else {
            $this->$queueName[] = $callback;
        }

        return true;
    }

    protected function beforeMasterExitHandler()
    {
        foreach ($this->masterExitCallback as $callback) {
            is_callable($callback) && call_user_func($callback, $this);
         }
    }

    protected function default_sig_handler($sig)
    {

    }

    /**
     * 得到待处理任务数量
     */
    abstract protected function getTaskLength();

    /**
     * 出队
     * @return mixed
     */
    abstract public function deQueue();

    /**
     * 入队
     * @param $data
     * @return int
     */
    abstract public function enQueue($data);

    /**
     * 消费的具体内容
     * 不要进行失败重试
     * 会自动进行
     * 如果失败直接抛出异常
     * @param $data
     */
    abstract protected function consume($data);
}

Demo

基于redis 的 生产者-消费者模式

RedisProducterConsumer.php

<?php
require "../src/MasterWorker.php";
class RedisProducterConsumer extends MasterWorker
{
    const QUERY_NAME = 'query_name';

    /**
     * Master 和 Worker 的连接分开,否则会出现问题
     * 
     * @var Redis[]
     */
    protected $redis_connections = [];
    public function __construct($options = [])
    {
        parent::__construct($options);
        // 设置退出回调
        $this->setWorkerExitCallback(function ($worker) {
            $this->closeRedis();
            // 处理结束,把redis关闭
            $this->log('进程退出:' . posix_getpid());
        });
        $this->setMasterExitCallback(function ($master) {
            $this->closeRedis();
            $this->log('master 进程退出:' . posix_getpid());
        });
    }
    /**
     * 得到队列长度
     */
    protected function getTaskLength()
    {
        return (int) $this->getRedis()->lSize(static::QUERY_NAME);
    }
    /**
     * 出队
     * @return mixed
     */
    public function deQueue()
    {
        return $this->getRedis()->lPop(static::QUERY_NAME);
    }
    /**
     * 入队
     * @param $data
     * @return int
     */
    public function enQueue($data)
    {
        return $this->getRedis()->rPush(static::QUERY_NAME, (string) $data);
    }
    /**
     * 消费的具体内容
     * 不要进行失败重试
     * 会自动进行
     * 如果失败直接抛出异常
     * @param $data
     */
    protected function consume($data)
    {
        // 错误抛出异常
        //throw new Exception('错误信息');
        $this->log('消费中 ' . $data);
        $this->msleep(1);
        $this->log('消费结束:' . $data . '; 剩余个数:' . $this->getTaskLength());
    }
    /**
     * @return Redis
     */
    public function getRedis()
    {
        $index = $this->isMaster() ? 'master' : 'worker';
        // 后续使用 predis 使用redis池
        if (! isset($this->redis_connections[$index])) {
            $connection = new \Redis();
            $connection->connect('127.0.0.1', 6379, 2);
            $this->redis_connections[$index] = $connection;
        }
        return $this->redis_connections[$index];
    }
    public function closeRedis()
    {
        foreach ($this->redis_connections as $key => $connection) {
            $connection && $connection->close();
        }
    }
    protected function consumeFail($data, \Exception $e)
    {
        parent::consumeFail($data, $e);
        // 自定义操作,比如重新入队,上报错误等
    }
}

调用例子

<?php
require "./RedisProducterConsumer.php";
$producterConsumer = new RedisProducterConsumer();
// 清空任务队列
$producterConsumer->getRedis()->ltrim(RedisProducterConsumer::QUERY_NAME, 1, 0);
// 写入任务队列
for ($i = 1; $i <= 100; ++$i) {
    $producterConsumer->enQueue($i);
}
$producterConsumer->start();
// 接下来的写的代码不会执行
// 查看运行的进程
// ps aux | grep test.php
// 试一试 Ctrl + C 在执行上面产看进程命令

代码地址:https://github.com/MrSuperLi/php-master-wo...

本作品采用《CC 协议》,转载必须注明作者和本文链接
有什么想法欢迎提问或者资讯
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!