Laravel 队列源码解析

我总是控制不好自己的情绪。其实,情绪只是对自己无能的愤怒罢了。

Laravel 队列源码解析

开篇

日常开发使用队列的场景不少了吧,至于如何使用,我想文档已经写的很清楚了,毕业一年多了,七月份换一家新公司的时候开始使用 Laravel,因为项目中场景经常使用到 Laravel 中的队列,结合自己阅读的一丝队列的源码,写了这篇文章。(公司一直用的5.5 所以文章的版本你懂的。)

也不知道从哪讲起,那就从一个最基础的例子开始吧。创建一个最简单的任务类 SendMessage。继承Illuminate\Contracts\Queue\ShouldQueue 接口。

简单 demo 开始

<?php

namespace App\Jobs;

use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Log;

class SendMessage implements ShouldQueue
{

    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    protected $message;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct($message)
    {
        $this->message = $message;
    }

    /**
     * Execute the job.
     *
     * @return void
     */
    public function handle()
    {
        Log::info($this->message);
    }
}

这里直接分发一个任务类到队列中,并没有指定分发到哪个队列,那么会直接分发给默认的队列。直接从这里开始分析吧。

 public function test()
    {
        $msg = '吴亲库里';
        SendMessage::dispatch($msg);
    }

首先 SendMessage 并没有 dispatch 这个静态方法, 但是它 use dispatchable 这样的 Trait 类,我们可以点开 dispatchable 类查看 dispatch 方法。


trait Dispatchable
{
    /**
     * Dispatch the job with the given arguments.
     *
     * @return \Illuminate\Foundation\Bus\PendingDispatch
     */
    public static function dispatch()
{
        return new PendingDispatch(new static(...func_get_args()));
    }

    /**
     * Set the jobs that should run if this job is successful.
     *
     * @param  array  $chain
     * @return \Illuminate\Foundation\Bus\PendingChain
     */
    public static function withChain($chain)
{
        return new PendingChain(get_called_class(), $chain);
    }
}

可以看到在 dispatch 方法中 实例化另一个 PendingDispatch 类,并且根据传入的参数实例化任务 SendMessage 作为 PendingDispatch 类的参数。我们接着看,它是咋么分派任务?外层控制器现在只调用了 dispatch,看看 PendingDispatch 类中有什么


<?php

namespace Illuminate\Foundation\Bus;

use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Support\Facades\Log;

class PendingDispatch
{
    /**
     * The job.
     *
     * @var mixed
     */
    protected $job;

    /**
     * Create a new pending job dispatch.
     *
     * @param  mixed  $job
     * @return void
     */
    public function __construct($job)
{
        $this->job = $job;
    }

    /**
     * Set the desired connection for the job.
     *
     * @param  string|null  $connection
     * @return $this
     */
    public function onConnection($connection)
{
        $this->job->onConnection($connection);

        return $this;
    }

    /**
     * Set the desired queue for the job.
     *
     * @param  string|null  $queue
     * @return $this
     */
    public function onQueue($queue)
{

        $this->job->onQueue($queue);

        return $this;
    }

    /**
     * Set the desired connection for the chain.
     *
     * @param  string|null  $connection
     * @return $this
     */
    public function allOnConnection($connection)
{
        $this->job->allOnConnection($connection);

        return $this;
    }

    /**
     * Set the desired queue for the chain.
     *
     * @param  string|null  $queue
     * @return $this
     */
    public function allOnQueue($queue)
{
        $this->job->allOnQueue($queue);

        return $this;
    }

    /**
     * Set the desired delay for the job.
     *
     * @param  \DateTime|int|null  $delay
     * @return $this
     */
    public function delay($delay)
{
        $this->job->delay($delay);

        return $this;
    }

    /**
     * Set the jobs that should run if this job is successful.
     *
     * @param  array  $chain
     * @return $this
     */
    public function chain($chain)
{
        $this->job->chain($chain);

        return $this;
    }

    /**
     * Handle the object's destruction.
     *
     * @return void
     */
    public function __destruct()
{
        app(Dispatcher::class)->dispatch($this->job);
    }
}

这里查看它的构造和析构函数。好吧从析构函数上已经能看出来,执行推送任务的在这里,app(Dispatcher::class) 这又是什么鬼?看来还得从运行机制开始看。Laravel 底层提供了一个强大的 IOC 容器,我们这里通过辅助函数 app() 的形式访问它,并通过传递参数解析出一个服务对象。这里我们传递 Dispatcher::class 得到的是一个什么服务?这个服务又是在哪里被注册进去的。让我们把目光又转移到根目录下的 index.php 文件。因为这篇文章不是说运行流程,所以一些流程会跳过。

注册服务


/*
|--------------------------------------------------------------------------
| Run The Application
|--------------------------------------------------------------------------
|
| Once we have the application, we can handle the incoming request
| through the kernel, and send the associated response back to
| the client's browser allowing them to enjoy the creative
| and wonderful application we have prepared for them.
|
*/

$kernel = $app->make(Illuminate\Contracts\Http\Kernel::class);

$response = $kernel->handle(
    $request = Illuminate\Http\Request::capture()
);

这个服务实际上执行了 handle 方法之后才有的(别问我为什么,如果和我一样笨的话,多打断点😂),这里的 $kernel 实际上得到的是一个 

Illuminate\Foundation\Http\Kernel 类,让我们进去看看这个类里面的 handle 方法。

/**
     * Handle an incoming HTTP request.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    public function handle($request)
  {
        try {
            $request->enableHttpMethodParameterOverride();

            $response = $this->sendRequestThroughRouter($request);
        } catch (Exception $e) {
            $this->reportException($e);

            $response = $this->renderException($request, $e);
        } catch (Throwable $e) {
            $this->reportException($e = new FatalThrowableError($e));

            $response = $this->renderException($request, $e);
        }

        $this->app['events']->dispatch(
            new Events\RequestHandled($request, $response)
        );

        return $response;
    }

这个方法主要是处理传入的请求,追踪一下 sendRequestThroughRouter 方法。

  /**
     * Send the given request through the middleware / router.
     *
     * @param  \Illuminate\Http\Request  $request
     * @return \Illuminate\Http\Response
     */
    protected function sendRequestThroughRouter($request)
  {
        $this->app->instance('request', $request);

        Facade::clearResolvedInstance('request');

        $this->bootstrap();

        return (new Pipeline($this->app))
                    ->send($request)
                    ->through($this->app->shouldSkipMiddleware() ? [] : $this->middleware)
                    ->then($this->dispatchToRouter());
    }

其他的代码不是我们这篇文章讨论范围之内。主要追下 bootstarp()方法。方法名就很好理解了。


/**
     * Bootstrap the application for HTTP requests.
     *
     * @return void
     */
    public function bootstrap()
   {

        if (! $this->app->hasBeenBootstrapped()) {
            $this->app->bootstrapWith($this->bootstrappers());
        }

    }
    /**
     * Get the bootstrap classes for the application.
     *
     * @return array
     */
    protected function bootstrappers()
   {
        return $this->bootstrappers;
    }
 /**
     * The bootstrap classes for the application.
     *
     * @var array
     */
    protected $bootstrappers = [
        \Illuminate\Foundation\Bootstrap\LoadEnvironmentVariables::class,
        \Illuminate\Foundation\Bootstrap\LoadConfiguration::class,
        \Illuminate\Foundation\Bootstrap\HandleExceptions::class,
        \Illuminate\Foundation\Bootstrap\RegisterFacades::class,
        \Illuminate\Foundation\Bootstrap\RegisterProviders::class,
        \Illuminate\Foundation\Bootstrap\BootProviders::class,
    ];

应用程序初始化要引导的类,是一个数组,传入到已经存在的 Application 类中的 bootstrapWith 方法中,让我们追踪一下这个方法。

    /**
     * Run the given array of bootstrap classes.
     *
     * @param  array  $bootstrappers
     * @return void
     */
    public function bootstrapWith(array $bootstrappers)
   {
        $this->hasBeenBootstrapped = true;

        foreach ($bootstrappers as $bootstrapper) {
            $this['events']->fire('bootstrapping: '.$bootstrapper, [$this]);

            $this->make($bootstrapper)->bootstrap($this);

            $this['events']->fire('bootstrapped: '.$bootstrapper, [$this]);

        }
    }

遍历传入的数组 $bootstrappers,继续追踪 $this->make 方法。

    /**
     * Resolve the given type from the container.
     *
     * (Overriding Container::make)
     *
     * @param  string  $abstract
     * @param  array  $parameters
     * @return mixed
     */
    public function make($abstract, array $parameters = [])
  {
        $abstract = $this->getAlias($abstract);

        if (isset($this->deferredServices[$abstract]) && ! isset($this->instances[$abstract])) {
            $this->loadDeferredProvider($abstract);
        }

        return parent::make($abstract, $parameters);
    }

根据传递的参数,从容器中解析给定的类型获取到实例对象。再回到上一步,调用每一个对象的 bootstrap 方法。我们主要看 RegisterProviders 中的 bootstrap 方法。


/**
     * Bootstrap the given application.
     *
     * @param  \Illuminate\Contracts\Foundation\Application  $app
     * @return void
     */
    public function bootstrap(Application $app)
   {
        $app->registerConfiguredProviders();
    }

重新回到 ApplicationregisterConfiguredProviders 方法。

/**
     * Register all of the configured providers.
     *
     * @return void
     */
    public function registerConfiguredProviders()
 {
        $providers = Collection::make($this->config['app.providers'])
                        ->partition(function ($provider) {
                            return Str::startsWith($provider, 'Illuminate\\');
                        });

        $providers->splice(1, 0, [$this->make(PackageManifest::class)->providers()]);

        (new ProviderRepository($this, new Filesystem, $this->getCachedServicesPath()))
                    ->load($providers->collapse()->toArray());

    }

注册所有配置提供的服务。因为这一块代码过多,不是本章讨论的范围(其实是我这一块有些地方还没看懂😃),所以主要看 $this->config['app.providers'],原来是要加载 app.phpproviders 里面的数组配置。

'providers' => [

        /*
         * Laravel Framework Service Providers...
         */
        Illuminate\Auth\AuthServiceProvider::class,
        Illuminate\Broadcasting\BroadcastServiceProvider::class,
        Illuminate\Bus\BusServiceProvider::class,
        Illuminate\Cache\CacheServiceProvider::class,
        Illuminate\Foundation\Providers\ConsoleSupportServiceProvider::class,
        Illuminate\Cookie\CookieServiceProvider::class,
        Illuminate\Database\DatabaseServiceProvider::class,
        Illuminate\Encryption\EncryptionServiceProvider::class,
        Illuminate\Filesystem\FilesystemServiceProvider::class,
        Illuminate\Foundation\Providers\FoundationServiceProvider::class,
        Illuminate\Hashing\HashServiceProvider::class,
        Illuminate\Mail\MailServiceProvider::class,
        Illuminate\Notifications\NotificationServiceProvider::class,
        Illuminate\Pagination\PaginationServiceProvider::class,
        Illuminate\Pipeline\PipelineServiceProvider::class,
        Illuminate\Queue\QueueServiceProvider::class,
        Illuminate\Redis\RedisServiceProvider::class,
        Illuminate\Auth\Passwords\PasswordResetServiceProvider::class,
        Illuminate\Session\SessionServiceProvider::class,
        Illuminate\Translation\TranslationServiceProvider::class,
        Illuminate\Validation\ValidationServiceProvider::class,
        Illuminate\View\ViewServiceProvider::class,

        /*
         * Package Service Providers...
         */

        /*
         * Application Service Providers...
         */
        App\Providers\AppServiceProvider::class,
        App\Providers\AuthServiceProvider::class,
        // App\Providers\BroadcastServiceProvider::class,
        App\Providers\EventServiceProvider::class,
        App\Providers\RouteServiceProvider::class,

    ],

让我们点开  BusServiceProvider ,终于找到一开始想要的东西了,原来注册的就是这个服务啊。

    /**
     * Register the service provider.
     *
     * @return void
     */
    public function register()
   {
        $this->app->singleton(Dispatcher::class, function ($app) {
            return new Dispatcher($app, function ($connection = null) use ($app) {
                return $app[QueueFactoryContract::class]->connection($connection);
            });
        });

        $this->app->alias(
            Dispatcher::class, DispatcherContract::class
        );

        $this->app->alias(
            Dispatcher::class, QueueingDispatcherContract::class
        );
    }

解析服务

所以之前的 app(Dispatcher::class) 解析的实际上是 BusServiceProvider 服务。

所以上面的析构函数实际上调用的是  Dispatcher 类中的 dispatch 方法。

    /**
     * Dispatch a command to its appropriate handler.
     *
     * @param  mixed  $command
     * @return mixed
     */
    public function dispatch($command)
   {
        if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
            return $this->dispatchToQueue($command);
        }

        return $this->dispatchNow($command);
    }

这里的 commandShouldBeQueued 方法点进去看下。


/**
     * Determine if the given command should be queued.
     *
     * @param  mixed  $command
     * @return bool
     */
    protected function commandShouldBeQueued($command)
{       
      return $command instanceof ShouldQueue;
    }

这里就判断任务类是否属于 ShouldQueue 的实例,因为开头我们创建的类是继承自此类的。继承此类表示我们的队列是异步执行而非同步。

首先 Laravel 会去检查任务中是否设置了 connection 属性,表示的是把此次任务发送到哪个连接中,如果未设置,使用默认的。通过设置的连接,使用一个 queueResolver 的闭包来构建应该使用哪一个队列驱动的实例。这里我并没有在任务类中设置指定的 $connetction ,所以会使用默认配置,我在一开始就配置 redis,打印一下这个$queue,将得到一个Illuminate\Queue\RedisQueue 的实例。直接看最后一句。 

推送至指定队列

    /**
     * Push the command onto the given queue instance.
     *
     * @param  \Illuminate\Contracts\Queue\Queue  $queue
     * @param  mixed  $command
     * @return mixed
     */
    protected function pushCommandToQueue($queue, $command)
  {
        if (isset($command->queue, $command->delay)) {
            return $queue->laterOn($command->queue, $command->delay, $command);
        }

        if (isset($command->queue)) {
            return $queue->pushOn($command->queue, $command);
        }

        if (isset($command->delay)) {
            return $queue->later($command->delay, $command);
        }

        return $queue->push($command);
    }

这个函数的作用就是将任务推送到给定的队列中,这其中会根据任务类的配置,运行对应的操作。比如第一句,延迟将任务推送到队列。这里我们的任务类什么都没配置,当然直接追踪最后一句。前面已经说了,这里我们得到的是一个 Illuminate\Queue\RedisQueue 的实例,那就直接去访问这个类中的 push方法吧。

    /**
     * Push a new job onto the queue.
     *
     * @param  object|string  $job
     * @param  mixed   $data
     * @param  string  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
  {
        return $this->pushRaw($this->createPayload($job, $data), $queue);
    }

    /**
     * Push a raw payload onto the queue.
     *
     * @param  string  $payload
     * @param  string  $queue
     * @param  array   $options
     * @return mixed
     */
    public function pushRaw($payload, $queue = null, array $options = [])
{
        $this->getConnection()->rpush($this->getQueue($queue), $payload);
        return json_decode($payload, true)['id'] ?? null;
    }

就一句话推送任务到队列去,继续追踪。下面的方法。可以看到 Laravelredis 为队列是通过 List 的数据形式存在的,每推送一个任务,从左往右排队进入列表中,键名,够清晰了吧,因为我们并没有设置 $queue,所以取默认值,那么我们得到的就是一个 queues:default 的字符串。

    /**
     * Get the queue or return the default.
     *
     * @param  string|null  $queue
     * @return string
     */
    public function getQueue($queue)
   {
        return 'queues:'.($queue ?: $this->default);
    }

至于值嘛,我们也可以看下,一波对于是否是对象的判断之后,通过 json_encode() 进行编码。

    protected function createPayload($job, $data = '')
  {
        $payload = json_encode($this->createPayloadArray($job, $data));

        if (JSON_ERROR_NONE !== json_last_error()) {
            throw new InvalidPayloadException(
                'Unable to JSON encode payload. Error code: '.json_last_error()
            );
        }

        return $payload;
    }

    /**
     * Create a payload array from the given job and data.
     *
     * @param  string  $job
     * @param  mixed   $data
     * @return array
     */
    protected function createPayloadArray($job, $data = '')
  {
        return is_object($job)
                    ? $this->createObjectPayload($job)
                    : $this->createStringPayload($job, $data);
    }

结尾

到这里的话代码已经追踪的差不多了,当然这里面还有很多是没有提到的,比如,在运行 queue:work 之后,底层都在做什么。队列任务是如何并取出来的,work 还可以跟很多的参数,这里面都发生了什么。,延迟任务,监听机制.....,我觉得看源码虽然一开始头顶略微凉了一点,但是看多了,你就是行走中的移动文档。

本作品采用《CC 协议》,转载必须注明作者和本文链接
吴亲库里
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 1

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
46
粉丝
117
喜欢
493
收藏
604
排名:177
访问:5.5 万
私信
所有博文
社区赞助商