Laravel Queue 消費者源码分析

基于laravel10分析
基于php artisan queue:work和基于redis驱动来分析

命令参数说明

protected $signature = 'queue:work
                            {connection? : 队列连接驱动 是连接redis还是mq....}
                            {--name=default : work进程的名称}
                            {--queue= : 队列连接名称 默认是default}
                            {--daemon : 以守护程序模式运行工作程序 (废弃)}
                            {--once : 进程只运行一个任务就直接结束进程}
                            {--stop-when-empty : 如果任务为空就结束进程}
                            {--delay=0 : 延迟失败队列的秒数 (废弃)}
                            {--backoff=0 : 遇到异常没有捕获的时候多久丢回队列}
                            {--max-jobs=0 : 最大运行任务 达到这个任务 进程将会结束}
                            {--max-time=0 : 当前进程最大运行时间 超过这个时间进程将会结束}
                            {--force : 强制在维护模式下也执行任务}
                            {--memory=128 : 内存限制(MB) 超过内存进程将会结束}
                            {--sleep=3 : 没有任务的时候sleep多少秒}
                            {--rest=0 : 任务与任务之间隔多少秒执行}
                            {--timeout=60 : 单个任务超时时间}
                            {--tries=1 : 尝试次数}';

运行命令后就会执行handle方法

public function handle()
    {
        //如果当前是维护模式且只运行一个任务 就直接结束进程
        if ($this->downForMaintenance() && $this->option('once')) {
            return $this->worker->sleep($this->option('sleep'));
        }

       //绑定事件监听
       $this->listenForEvents();

        //连接 因为只分析redis 所以这里的连接默认就是redis
        $connection = $this->argument('connection')
                       ?: $this->laravel['config']['queue.default'];

        //获取队列名称
        $queue = $this->getQueue($connection);

        if (Terminal::hasSttyAvailable()) {
            $this->components->info(
                sprintf('Processing jobs from the [%s] %s.', $queue, str('queue')->plural(explode(',', $queue)))
            );
        }

        //运行Worker
        return $this->runWorker(
            $connection, $queue
        );
    }
protected function listenForEvents()
    {
        任务开始执行
        $this->laravel['events']->listen(JobProcessing::class, function ($event) {
            $this->writeOutput($event->job, 'starting');
        });
        //任务执行完成
        $this->laravel['events']->listen(JobProcessed::class, function ($event) {
            $this->writeOutput($event->job, 'success');
        });
        //异常任务释放后
        $this->laravel['events']->listen(JobReleasedAfterException::class, function ($event) {
            $this->writeOutput($event->job, 'released_after_exception');
        });
        //任务失败(达到尝试次数)
        $this->laravel['events']->listen(JobFailed::class, function ($event) {
            $this->writeOutput($event->job, 'failed');

            $this->logFailedJob($event);
        });
    }

执行runWorker方法 这个里面会执行$this->work 对应Worker类 然后判断是不是只执行一次 来调用对应的方法 我们只分析daemon方法

protected function runWorker($connection, $queue)
    {
        return $this->worker
            ->setName($this->option('name'))
            ->setCache($this->cache)
            ->{$this->option('once') ? 'runNextJob' : 'daemon'}(
                $connection, $queue, $this->gatherWorkerOptions()
            );
    }

执行Worker类的 daemon方法

public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        //判断是否支持异步信号 如果支持 则注册信号
        if ($supportsAsyncSignals = $this->supportsAsyncSignals()) {
            $this->listenForSignals();
        }
        ...
    }

listenForSignals方法注册信号

protected function listenForSignals()
    {
        pcntl_async_signals(true);  //启用异步信号

        pcntl_signal(SIGQUIT, fn () => $this->shouldQuit = true); //退出信号
        pcntl_signal(SIGTERM, fn () => $this->shouldQuit = true); //终止信号
        pcntl_signal(SIGUSR2, fn () => $this->paused = true); //用户定义信号2(当前用户暂停任务)
        pcntl_signal(SIGCONT, fn () => $this->paused = false); //继续(用户暂停后继续任务)
    }
public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        ...
        $lastRestart = $this->getTimestampOfLastQueueRestart(); //获取最后一次重启时间
        [$startTime, $jobsProcessed] = [hrtime(true) / 1e9, 0]; //进程开始时间和初始化任务计数
        ...
    }
public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        ...
        while (true) {
            //判断是否应该运行任务
            if (! $this->daemonShouldRun($options, $connectionName, $queue)) {
                $status = $this->pauseWorker($options, $lastRestart);
                //如果状态不是null 则会停止进程
                if (! is_null($status)) {
                    return $this->stop($status, $options);
                }
                continue;
            }
            //重置作用域
            if (isset($this->resetScope)) {
                ($this->resetScope)();
            }
             ...
        }
        ...
    }
public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        ...
        while (true) {
             ...
            //获取任务
            $job = $this->getNextJob(
                $this->manager->connection($connectionName), $queue
            );
             ...
        }
        ...
    }

getNextJob方法

protected function getNextJob($connection, $queue)
    {
        $popJobCallback = function ($queue) use ($connection) {
            return $connection->pop($queue);
        };
        //这个是获取任务时触发事件
        $this->raiseBeforeJobPopEvent($connection->getConnectionName());

        try {
            //这个地方是触发回调 如果有设置的话 目前我也没有用到这个
            if (isset(static::$popCallbacks[$this->name])) {
                return tap(
                    (static::$popCallbacks[$this->name])($popJobCallback, $queue),
                    fn ($job) => $this->raiseAfterJobPopEvent($connection->getConnectionName(), $job)
                );
            }

            //这里就是去获取任务 因为一个队列进程可以设置多个队列名称 有优先级 排在前面的会先执行
            //所有会先等优先级高的所有任务执行完才会执行优先级低的 如果优先级高的持续在往队列里面丢 就可能会导致优先级低的执行不到 这个需要看队列的频率
            foreach (explode(',', $queue) as $queue) {
                if (! is_null($job = $popJobCallback($queue))) {
                    $this->raiseAfterJobPopEvent($connection->getConnectionName(), $job);

                    return $job;
                } 
            }
        } catch (Throwable $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);

            $this->sleep(1);
        }
    }

分析一下$popJobCallback($queue)

$popJobCallback = function ($queue) use ($connection) {
    return $connection->pop($queue);
};

看一下pop方法里面做了什么 因为我们分析的是redis驱动 所以直接找到RedisQueue这个类 然后就可以看到pop方法了

public function pop($queue = null)
    {
        //合并
        $this->migrate($prefixed = $this->getQueue($queue));
        ...
    }

看一下migrate方法做了什么事情

protected function migrate($queue)
    {
        //这里是检测是否有已经到期了的任务 放到list
        $this->migrateExpiredJobs($queue.':delayed', $queue);
        //判断是否有设置多久重试 
        if (! is_null($this->retryAfter)) {
            //判断保留队列是否有到期的任务 放到list 这个有序集合是在获取到任务的时候就会往这里面丢一个任务 为了防止任务超时或者捕捉不到特定异常的时候 丢失任务 所以retry_after尽量不要设置成null
            $this->migrateExpiredJobs($queue.':reserved', $queue);
        }
    }

我们来分析migrateExpiredJobs方法里面做了什么操作

public function migrateExpiredJobs($from, $to)
    {
      //这里写了一个lua脚本 新版本中多了一个migrationBatchSize这个参数 就是合并的时候可以限定最多获取前多少个任务 防止一次获取很大的时候 阻塞或者内存爆了
        return $this->getConnection()->eval(
            LuaScripts::migrateExpiredJobs(), 3, $from, $to, $to.':notify', $this->currentTime(), $this->migrationBatchSize
        );
    }

看一下LuaScripts::migrateExpiredJobs()这个lua脚本做了什么操作

public static function migrateExpiredJobs()
    {
        return <<<'LUA'
        //获取满足条件的任务(到期了的)
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, ARGV[2])

//判断任务是否是空的 如果不是空的
if(next(val) ~= nil) then
    //则移除已经获取到的任务
    redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)

    //循环 每次获取100个
    for i = 1, #val, 100 do
        往list中添加任务
        redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))

        for j = i, math.min(i+99, #val) do
            //这里是一个通知作用的list 后续可以看到它的作用
            redis.call('rpush', KEYS[3], 1)
        end
    end
end

return val
LUA;
    }

我们再回到pop方法后续的操作
上面同步完了到期任务到list 这里就是从list去拿任务了

 public function pop($queue = null)
    {
        ...
        [$job, $reserved] = $this->retrieveNextJob($prefixed);
        ...
    }

我们看一下retrieveNextJob方法做了什么操作

protected function retrieveNextJob($queue, $block = true)
    {
        //取任务
        $nextJob = $this->getConnection()->eval(
            LuaScripts::pop(), 3, $queue, $queue.':reserved', $queue.':notify',
            $this->availableAt($this->retryAfter)
        );
        ...
    }

看一下这个取任务的lua脚本做了些什么操作

public static function pop()
    {
        return <<<'LUA'
//取第一个任务
local job = redis.call('lpop', KEYS[1])
local reserved = false
//如果取到了任务
if(job ~= false) then
    //decode解析出来
    reserved = cjson.decode(job)
    //尝试次数+1
    reserved['attempts'] = reserved['attempts'] + 1
    //转成json
    reserved = cjson.encode(reserved)
    //加到保留队列
    redis.call('zadd', KEYS[2], ARGV[1], reserved)
    //通知list也取出第一个 同步
    redis.call('lpop', KEYS[3])
end
//返回结果
return {job, reserved}
LUA;

回到retrieveNextJob方法

protected function retrieveNextJob($queue, $block = true)
    {
       ...
        //如果为空 可能取失败了 就返回
        if (empty($nextJob)) {
            return [null, null];
        }

        //拿到任务和保留任务
        [$job, $reserved] = $nextJob;
        //如果没有job 且 block_for设置不是null 且$block是true(第二次会标记flase) 通知list再这里就会提现出作用了 用于检测是否有新的任务  只是用作通知 如果能拿到通知 就代表有新的任务了 
        //blpop是阻塞等待 但是不会影响redis阻塞  (根据你的队列负载调整此值要比连续轮询 Redis 数据库中的新任务更加有效 官方文档是这样解释的)
        if (! $job && ! is_null($this->blockFor) && $block &&
            $this->getConnection()->blpop([$queue.':notify'], $this->blockFor)) {
            //这里标记为false 可能原因是防止死循环 就是有多个进程都在跑同一个队列的时候 就可能会出现我拿到通知了 但是被别的任务取走了  然后又来一个 又被抢走了 导致延迟队列有序集合里面的一直没去处理 防止这种特殊情况的出现 
            return $this->retrieveNextJob($queue, false);
        }

        return [$job, $reserved];
    }

我们又再一次再回到pop方法后续的操作

 public function pop($queue = null)
    {
        ...
        //如果有保留任务的时候返回RedisJob 至于为什么要返回这个在后续会有解释
        if ($reserved) {
            return new RedisJob(
                $this->container, $this, $job,
                $reserved, $this->connectionName, $queue ?: $this->default
            );
        }
        ...
    }

pop方法执行完了 在这个方法里面做了合并任务 取任务的操作
getNextJob方法也执行完了
又回到了最开始的Worker类的daemon方法

public function daemon($connectionName, $queue, WorkerOptions $options)
    {
        ...
        while (true) {
            ...
            //如果支持异步信号 就会去注册超时信号
            if ($supportsAsyncSignals) {
                $this->registerTimeoutHandler($job, $options);
            }
            ...
        }
        ...
    }

看一下 registerTimeoutHandler方法做了什么操作

protected function registerTimeoutHandler($job, WorkerOptions $options)
    {
        //注册终止信号
        pcntl_signal(SIGALRM, function () use ($job, $options) {
            //如果有任务 这个任务就是上面的 new RedisJob
            if ($job) {
                //标记任务失败 如果达到最大尝试次数 
                $this->markJobAsFailedIfWillExceedMaxAttempts(
                    $job->getConnectionName(), $job, (int) $options->maxTries, $e = $this->timoutExceededException($job)
                );
                //标记任务失败 达到最大异常数
                $this->markJobAsFailedIfWillExceedMaxExceptions(
                    $job->getConnectionName(), $job, $e
                );
                //标记任务失败 如果设置了failOnTimeout = true 不会进行尝试
                $this->markJobAsFailedIfItShouldFailOnTimeout(
                    $job->getConnectionName(), $job, $e
                );
                //触发任务超时事件
                $this->events->dispatch(new JobTimedOut(
                    $job->getConnectionName(), $job
                ));
            }
            //结束进程
            $this->kill(static::EXIT_ERROR, $options);
        }, true);
        //注册闹钟 到时间就会触发终止信号
        pcntl_alarm(
            max($this->timeoutForJob($job, $options), 0)
        );
    }

回到 daemon方法

 public function daemon($connectionName, $queue, WorkerOptions $options)
 {
         ...
        while (true) {
            ...
            if ($job) {
                //任务数统计
                $jobsProcessed++;
                //执行任务
                $this->runJob($job, $connectionName, $options);
                //任务之间睡眠多少秒数
                if ($options->rest > 0) {
                    $this->sleep($options->rest);
                }
            } else {
                //拿不到任务就睡眠几秒
                $this->sleep($options->sleep);
            }
            //判断信号
            $status = $this->stopIfNecessary(
                $options, $lastRestart, $startTime, $jobsProcessed, $job
            );
            //如果信号不是null 则停止进程
            if (! is_null($status)) {
                return $this->stop($status, $options);
            }
        }
 }

我们来分析runJob方法 看一下这个方法做了什么操作

 protected function runJob($job, $connectionName, WorkerOptions $options)
    {
        try {
            //执行
            return $this->process($connectionName, $job, $options);
        } catch (Throwable $e) {
            $this->exceptions->report($e);

            $this->stopWorkerIfLostConnection($e);
        }
    }

看一下 process方法做了什么操作

public function process($connectionName, $job, WorkerOptions $options)
    {
        try {
             //触发事件
            $this->raiseBeforeJobEvent($connectionName, $job);
            //标记任务失败 如果达到了最大尝试次数
            $this->markJobAsFailedIfAlreadyExceedsMaxAttempts(
                $connectionName, $job, (int) $options->maxTries
            );
            //判断任务是否标记了删除
            if ($job->isDeleted()) {
                return $this->raiseAfterJobEvent($connectionName, $job);
            }
            //重点
            $job->fire();
            //触发事件
            $this->raiseAfterJobEvent($connectionName, $job);
        } catch (Throwable $e) {
            $this->handleJobException($connectionName, $job, $options, $e);
        }
    }

重点分析一下这个 $job->fire()

public function fire()
    {
        //解析有效载荷 这个有效载荷是什么结构 可以看下图
        $payload = $this->payload();
        //这个job key的值是固定的 看下图可以看出是 Illuminate\\Queue\\CallQueuedHandler@call
        //所以$class=Illuminate\\Queue\\CallQueuedHandler  $method=call
        [$class, $method] = JobName::parse($payload['job']);
        //这里就是实例化Illuminate\\Queue\\CallQueuedHandler并调用call方法
        ($this->instance = $this->resolve($class))->{$method}($this, $payload['data']);
    }

有效载荷
Queue 消費者源码分析

我们看一下 Illuminate\\Queue\\CallQueuedHandler类的call做了什么处理

public function call(Job $job, array $data)
    {
        try {
            //set任务实例如果解析成功的话
           //getCommand是拿到业务处理类 
            $command = $this->setJobInstanceIfNecessary(
                $job, $this->getCommand($data)
            );
        } catch (ModelNotFoundException $e) {
            //这个方法里面是模型不存在的时候是否需要抛出异常
            return $this->handleModelNotFound($job, $e);
        }
        //判断任务是否设置了任务执行前是否唯一锁
        if ($command instanceof ShouldBeUniqueUntilProcessing) {
            //如果是则释放唯一锁
            $this->ensureUniqueJobLockIsReleased($command);
        }
        //这里是去执行业务逻辑
        $this->dispatchThroughMiddleware($job, $command);
        //如果任务没有被释放回队列且不是执行前释放锁 则去释放锁
        //计算标记失败了也会被释放锁
        if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) {
            $this->ensureUniqueJobLockIsReleased($command);
        }
        //如果任务没有标记失败且没有被释放回队列
        if (! $job->hasFailed() && ! $job->isReleased()) {
            //如果是任务链则把下一个任务丢到队列执行
            $this->ensureNextJobInChainIsDispatched($command);
            //通知任务已成功完成
            $this->ensureSuccessfulBatchJobIsRecorded($command);
        }
        //如果没有被删除且没有被释放 代表成功了则删掉保留队列
        if (! $job->isDeletedOrReleased()) {
            $job->delete();
        }
    }

看一下dispatchThroughMiddleware方法里面做了什么操作

protected function dispatchThroughMiddleware(Job $job, $command)
    {
        //这里会检测业务类是否不存在了 就是反序列的时候得到了__PHP_Incomplete_Class这个的时候代表类不存在了
        if ($command instanceof \__PHP_Incomplete_Class) {
            throw new Exception('Job is incomplete class: '.json_encode($command));
        }
        //这里就是通过管道去运行 看是否有指定中间件 然后最终去handle方法去执行业务逻辑
        return (new Pipeline($this->container))->send($command)
                ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? []))
                ->then(function ($command) use ($job) {
                    return $this->dispatcher->dispatchNow(
                        $command, $this->resolveHandler($job, $command)
                    );
                });
    }

如果有异常被捕捉到 如果还有尝试次数和最大异常数 就会丢回队列 如果某一个没有了 就会被写入失败表里面
我们看一下有被捕捉到异常做了什么操作吧

 try {
     ...
 }  cacth(Throwable $e) {
     $this->handleJobException($connectionName, $job, $options, $e);
 }

handleJobException 看一下这个方法里面做了什么操作

protected function handleJobException($connectionName, $job, WorkerOptions $options, Throwable $e)
    {
        try {
            //如果没有标记失败
            if (! $job->hasFailed()) {
                //判断是否达到了最大尝试次数
                $this->markJobAsFailedIfWillExceedMaxAttempts(
                    $connectionName, $job, (int) $options->maxTries, $e
                );
               //判断是否到最大异常数
                $this->markJobAsFailedIfWillExceedMaxExceptions(
                    $connectionName, $job, $e
                );
            }
            $this->raiseExceptionOccurredJobEvent(
                $connectionName, $job, $e
            );
        } finally {
            //如果没有标记删除且没有被释放且没有标记失败
            if (! $job->isDeleted() && ! $job->isReleased() && ! $job->hasFailed()) {
                //释放回队列 计算backoff
                $job->release($this->calculateBackoff($job, $options));

                $this->events->dispatch(new JobReleasedAfterException(
                    $connectionName, $job
                ));
            }
        }
        throw $e;
    }

主要是看markJobAsFailedIfWillExceedMaxAttemptsmarkJobAsFailedIfWillExceedMaxExceptions方法中调用的failJob方法 看一下这个方法做了什么处理

protected function failJob($job, Throwable $e)
    {
        $job->fail($e);
    }

fail方法

public function fail($e = null)
{
    //标记删除
    $this->markAsFailed();
    //如果已被删除 则往下操作
    if ($this->isDeleted()) {
        return;
    }

    try {
        //标记删除 删除保留队列
        $this->delete();
        //如果任务里面有定义failed 这里就会去解析调用
        $this->failed($e);
    } finally {
        //这里会触发任务失败事件 在最上面的说明中有注册这个事件的地方 我放到下面说明一下
        $this->resolve(Dispatcher::class)->dispatch(new JobFailed(
            $this->connectionName, $this, $e ?: new ManuallyFailedException
        ));
    }
}
//任务失败(达到尝试次数) 
$this->laravel['events']->listen(JobFailed::class,  function  ($event)  { 
    //这里只是输出到控制台
    $this->writeOutput($event->job,  'failed'); 
    //重点是这里
    $this->logFailedJob($event); 
});

logFailedJob方法

protected function logFailedJob(JobFailed $event)
{
    //这里会调用注册在容器中的类
    $this->laravel['queue.failer']->log(
    $event->connectionName,
    $event->job->getQueue(),
    $event->job->getRawBody(),
    $event->exception
);
我们一般用的是database-uuids这个驱动
$this->app->singleton('queue.failer', function ($app) {
            $config = $app['config']['queue.failed'];

            ...
            elseif (isset($config['driver']) && $config['driver'] === 'database-uuids') {
                return $this->databaseUuidFailedJobProvider($config);
            } 
            ...
        });

databaseUuidFailedJobProvider方法

protected function databaseUuidFailedJobProvider($config)
    {
        return new DatabaseUuidFailedJobProvider(
            $this->app['db'], $config['database'], $config['table']
        );
    }

所以最终是调用这个DatabaseUuidFailedJobProvider类中的log方法

public function log($connection, $queue, $payload, $exception)
    {
        //这里就会把失败的任务插入到数据库
        $this->getTable()->insert([
            'uuid' => $uuid = json_decode($payload, true)['uuid'],
            'connection' => $connection,
            'queue' => $queue,
            'payload' => $payload,
            'exception' => (string) mb_convert_encoding($exception, 'UTF-8'),
            'failed_at' => Date::now(),
        ]);

        return $uuid;
    }

以上就是 queue消费者的整个执行流程了

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 9个月前 自动加精
《L01 基础入门》
我们将带你从零开发一个项目并部署到线上,本课程教授 Web 开发中专业、实用的技能,如 Git 工作流、Laravel Mix 前端工作流等。
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 4

看不懂也收藏起来点赞一波

9个月前 评论

看不太懂

8个月前 评论
cccdz (楼主) 8个月前

老哥,go实战课程咋样啊?准备入手,还在犹豫,求解答

3个月前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!