Laravel 队列源码解析
“我总是控制不好自己的情绪。其实,情绪只是对自己无能的愤怒罢了。”
开篇
日常开发使用队列的场景不少了吧,至于如何使用,我想文档已经写的很清楚了,毕业一年多了,七月份换一家新公司的时候开始使用 Laravel
,因为项目中场景经常使用到 Laravel
中的队列,结合自己阅读的一丝队列的源码,写了这篇文章。(公司一直用的5.5 所以文章的版本你懂的。)
也不知道从哪讲起,那就从一个最基础的例子开始吧。创建一个最简单的任务类 SendMessage
。继承Illuminate\Contracts\Queue\ShouldQueue
接口。
简单 demo 开始
<?php
namespace App\Jobs;
use Illuminate\Bus\Queueable;
use Illuminate\Queue\SerializesModels;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Support\Facades\Log;
class SendMessage implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
protected $message;
/**
* Create a new job instance.
*
* @return void
*/
public function __construct($message)
{
$this->message = $message;
}
/**
* Execute the job.
*
* @return void
*/
public function handle()
{
Log::info($this->message);
}
}
这里直接分发一个任务类到队列中,并没有指定分发到哪个队列,那么会直接分发给默认的队列。直接从这里开始分析吧。
public function test()
{
$msg = '吴亲库里';
SendMessage::dispatch($msg);
}
首先 SendMessage
并没有 dispatch
这个静态方法, 但是它 use dispatchable
这样的 Trait
类,我们可以点开 dispatchable
类查看 dispatch
方法。
trait Dispatchable
{
/**
* Dispatch the job with the given arguments.
*
* @return \Illuminate\Foundation\Bus\PendingDispatch
*/
public static function dispatch()
{
return new PendingDispatch(new static(...func_get_args()));
}
/**
* Set the jobs that should run if this job is successful.
*
* @param array $chain
* @return \Illuminate\Foundation\Bus\PendingChain
*/
public static function withChain($chain)
{
return new PendingChain(get_called_class(), $chain);
}
}
可以看到在 dispatch
方法中 实例化另一个 PendingDispatch
类,并且根据传入的参数实例化任务 SendMessage
作为 PendingDispatch
类的参数。我们接着看,它是咋么分派任务?外层控制器现在只调用了 dispatch
,看看 PendingDispatch
类中有什么
<?php
namespace Illuminate\Foundation\Bus;
use Illuminate\Contracts\Bus\Dispatcher;
use Illuminate\Support\Facades\Log;
class PendingDispatch
{
/**
* The job.
*
* @var mixed
*/
protected $job;
/**
* Create a new pending job dispatch.
*
* @param mixed $job
* @return void
*/
public function __construct($job)
{
$this->job = $job;
}
/**
* Set the desired connection for the job.
*
* @param string|null $connection
* @return $this
*/
public function onConnection($connection)
{
$this->job->onConnection($connection);
return $this;
}
/**
* Set the desired queue for the job.
*
* @param string|null $queue
* @return $this
*/
public function onQueue($queue)
{
$this->job->onQueue($queue);
return $this;
}
/**
* Set the desired connection for the chain.
*
* @param string|null $connection
* @return $this
*/
public function allOnConnection($connection)
{
$this->job->allOnConnection($connection);
return $this;
}
/**
* Set the desired queue for the chain.
*
* @param string|null $queue
* @return $this
*/
public function allOnQueue($queue)
{
$this->job->allOnQueue($queue);
return $this;
}
/**
* Set the desired delay for the job.
*
* @param \DateTime|int|null $delay
* @return $this
*/
public function delay($delay)
{
$this->job->delay($delay);
return $this;
}
/**
* Set the jobs that should run if this job is successful.
*
* @param array $chain
* @return $this
*/
public function chain($chain)
{
$this->job->chain($chain);
return $this;
}
/**
* Handle the object's destruction.
*
* @return void
*/
public function __destruct()
{
app(Dispatcher::class)->dispatch($this->job);
}
}
这里查看它的构造和析构函数。好吧从析构函数上已经能看出来,执行推送任务的在这里,app(Dispatcher::class)
这又是什么鬼?看来还得从运行机制开始看。Laravel
底层提供了一个强大的 IOC
容器,我们这里通过辅助函数 app()
的形式访问它,并通过传递参数解析出一个服务对象。这里我们传递 Dispatcher::class
得到的是一个什么服务?这个服务又是在哪里被注册进去的。让我们把目光又转移到根目录下的 index.php
文件。因为这篇文章不是说运行流程,所以一些流程会跳过。
注册服务
/*
|--------------------------------------------------------------------------
| Run The Application
|--------------------------------------------------------------------------
|
| Once we have the application, we can handle the incoming request
| through the kernel, and send the associated response back to
| the client's browser allowing them to enjoy the creative
| and wonderful application we have prepared for them.
|
*/
$kernel = $app->make(Illuminate\Contracts\Http\Kernel::class);
$response = $kernel->handle(
$request = Illuminate\Http\Request::capture()
);
这个服务实际上执行了 handle
方法之后才有的(别问我为什么,如果和我一样笨的话,多打断点😂),这里的 $kernel
实际上得到的是一个
Illuminate\Foundation\Http\Kernel
类,让我们进去看看这个类里面的 handle
方法。
/**
* Handle an incoming HTTP request.
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
public function handle($request)
{
try {
$request->enableHttpMethodParameterOverride();
$response = $this->sendRequestThroughRouter($request);
} catch (Exception $e) {
$this->reportException($e);
$response = $this->renderException($request, $e);
} catch (Throwable $e) {
$this->reportException($e = new FatalThrowableError($e));
$response = $this->renderException($request, $e);
}
$this->app['events']->dispatch(
new Events\RequestHandled($request, $response)
);
return $response;
}
这个方法主要是处理传入的请求,追踪一下 sendRequestThroughRouter
方法。
/**
* Send the given request through the middleware / router.
*
* @param \Illuminate\Http\Request $request
* @return \Illuminate\Http\Response
*/
protected function sendRequestThroughRouter($request)
{
$this->app->instance('request', $request);
Facade::clearResolvedInstance('request');
$this->bootstrap();
return (new Pipeline($this->app))
->send($request)
->through($this->app->shouldSkipMiddleware() ? [] : $this->middleware)
->then($this->dispatchToRouter());
}
其他的代码不是我们这篇文章讨论范围之内。主要追下 bootstarp()方法。方法名就很好理解了。
/**
* Bootstrap the application for HTTP requests.
*
* @return void
*/
public function bootstrap()
{
if (! $this->app->hasBeenBootstrapped()) {
$this->app->bootstrapWith($this->bootstrappers());
}
}
/**
* Get the bootstrap classes for the application.
*
* @return array
*/
protected function bootstrappers()
{
return $this->bootstrappers;
}
/**
* The bootstrap classes for the application.
*
* @var array
*/
protected $bootstrappers = [
\Illuminate\Foundation\Bootstrap\LoadEnvironmentVariables::class,
\Illuminate\Foundation\Bootstrap\LoadConfiguration::class,
\Illuminate\Foundation\Bootstrap\HandleExceptions::class,
\Illuminate\Foundation\Bootstrap\RegisterFacades::class,
\Illuminate\Foundation\Bootstrap\RegisterProviders::class,
\Illuminate\Foundation\Bootstrap\BootProviders::class,
];
应用程序初始化要引导的类,是一个数组,传入到已经存在的 Application
类中的 bootstrapWith
方法中,让我们追踪一下这个方法。
/**
* Run the given array of bootstrap classes.
*
* @param array $bootstrappers
* @return void
*/
public function bootstrapWith(array $bootstrappers)
{
$this->hasBeenBootstrapped = true;
foreach ($bootstrappers as $bootstrapper) {
$this['events']->fire('bootstrapping: '.$bootstrapper, [$this]);
$this->make($bootstrapper)->bootstrap($this);
$this['events']->fire('bootstrapped: '.$bootstrapper, [$this]);
}
}
遍历传入的数组 $bootstrappers
,继续追踪 $this->make
方法。
/**
* Resolve the given type from the container.
*
* (Overriding Container::make)
*
* @param string $abstract
* @param array $parameters
* @return mixed
*/
public function make($abstract, array $parameters = [])
{
$abstract = $this->getAlias($abstract);
if (isset($this->deferredServices[$abstract]) && ! isset($this->instances[$abstract])) {
$this->loadDeferredProvider($abstract);
}
return parent::make($abstract, $parameters);
}
根据传递的参数,从容器中解析给定的类型获取到实例对象。再回到上一步,调用每一个对象的 bootstrap
方法。我们主要看 RegisterProviders
中的 bootstrap
方法。
/**
* Bootstrap the given application.
*
* @param \Illuminate\Contracts\Foundation\Application $app
* @return void
*/
public function bootstrap(Application $app)
{
$app->registerConfiguredProviders();
}
重新回到 Application
的 registerConfiguredProviders
方法。
/**
* Register all of the configured providers.
*
* @return void
*/
public function registerConfiguredProviders()
{
$providers = Collection::make($this->config['app.providers'])
->partition(function ($provider) {
return Str::startsWith($provider, 'Illuminate\\');
});
$providers->splice(1, 0, [$this->make(PackageManifest::class)->providers()]);
(new ProviderRepository($this, new Filesystem, $this->getCachedServicesPath()))
->load($providers->collapse()->toArray());
}
注册所有配置提供的服务。因为这一块代码过多,不是本章讨论的范围(其实是我这一块有些地方还没看懂😃),所以主要看 $this->config['app.providers']
,原来是要加载 app.php
的 providers
里面的数组配置。
'providers' => [
/*
* Laravel Framework Service Providers...
*/
Illuminate\Auth\AuthServiceProvider::class,
Illuminate\Broadcasting\BroadcastServiceProvider::class,
Illuminate\Bus\BusServiceProvider::class,
Illuminate\Cache\CacheServiceProvider::class,
Illuminate\Foundation\Providers\ConsoleSupportServiceProvider::class,
Illuminate\Cookie\CookieServiceProvider::class,
Illuminate\Database\DatabaseServiceProvider::class,
Illuminate\Encryption\EncryptionServiceProvider::class,
Illuminate\Filesystem\FilesystemServiceProvider::class,
Illuminate\Foundation\Providers\FoundationServiceProvider::class,
Illuminate\Hashing\HashServiceProvider::class,
Illuminate\Mail\MailServiceProvider::class,
Illuminate\Notifications\NotificationServiceProvider::class,
Illuminate\Pagination\PaginationServiceProvider::class,
Illuminate\Pipeline\PipelineServiceProvider::class,
Illuminate\Queue\QueueServiceProvider::class,
Illuminate\Redis\RedisServiceProvider::class,
Illuminate\Auth\Passwords\PasswordResetServiceProvider::class,
Illuminate\Session\SessionServiceProvider::class,
Illuminate\Translation\TranslationServiceProvider::class,
Illuminate\Validation\ValidationServiceProvider::class,
Illuminate\View\ViewServiceProvider::class,
/*
* Package Service Providers...
*/
/*
* Application Service Providers...
*/
App\Providers\AppServiceProvider::class,
App\Providers\AuthServiceProvider::class,
// App\Providers\BroadcastServiceProvider::class,
App\Providers\EventServiceProvider::class,
App\Providers\RouteServiceProvider::class,
],
让我们点开 BusServiceProvider
,终于找到一开始想要的东西了,原来注册的就是这个服务啊。
/**
* Register the service provider.
*
* @return void
*/
public function register()
{
$this->app->singleton(Dispatcher::class, function ($app) {
return new Dispatcher($app, function ($connection = null) use ($app) {
return $app[QueueFactoryContract::class]->connection($connection);
});
});
$this->app->alias(
Dispatcher::class, DispatcherContract::class
);
$this->app->alias(
Dispatcher::class, QueueingDispatcherContract::class
);
}
解析服务
所以之前的 app(Dispatcher::class)
解析的实际上是 BusServiceProvider
服务。
所以上面的析构函数实际上调用的是 Dispatcher
类中的 dispatch
方法。
/**
* Dispatch a command to its appropriate handler.
*
* @param mixed $command
* @return mixed
*/
public function dispatch($command)
{
if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
return $this->dispatchToQueue($command);
}
return $this->dispatchNow($command);
}
这里的 commandShouldBeQueued
方法点进去看下。
/**
* Determine if the given command should be queued.
*
* @param mixed $command
* @return bool
*/
protected function commandShouldBeQueued($command)
{
return $command instanceof ShouldQueue;
}
这里就判断任务类是否属于 ShouldQueue
的实例,因为开头我们创建的类是继承自此类的。继承此类表示我们的队列是异步执行而非同步。
首先 Laravel
会去检查任务中是否设置了 connection
属性,表示的是把此次任务发送到哪个连接中,如果未设置,使用默认的。通过设置的连接,使用一个 queueResolver
的闭包来构建应该使用哪一个队列驱动的实例。这里我并没有在任务类中设置指定的 $connetction
,所以会使用默认配置,我在一开始就配置 redis
,打印一下这个$queue
,将得到一个Illuminate\Queue\RedisQueue
的实例。直接看最后一句。
推送至指定队列
/**
* Push the command onto the given queue instance.
*
* @param \Illuminate\Contracts\Queue\Queue $queue
* @param mixed $command
* @return mixed
*/
protected function pushCommandToQueue($queue, $command)
{
if (isset($command->queue, $command->delay)) {
return $queue->laterOn($command->queue, $command->delay, $command);
}
if (isset($command->queue)) {
return $queue->pushOn($command->queue, $command);
}
if (isset($command->delay)) {
return $queue->later($command->delay, $command);
}
return $queue->push($command);
}
这个函数的作用就是将任务推送到给定的队列中,这其中会根据任务类的配置,运行对应的操作。比如第一句,延迟将任务推送到队列。这里我们的任务类什么都没配置,当然直接追踪最后一句。前面已经说了,这里我们得到的是一个 Illuminate\Queue\RedisQueue
的实例,那就直接去访问这个类中的 push
方法吧。
/**
* Push a new job onto the queue.
*
* @param object|string $job
* @param mixed $data
* @param string $queue
* @return mixed
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $data), $queue);
}
/**
* Push a raw payload onto the queue.
*
* @param string $payload
* @param string $queue
* @param array $options
* @return mixed
*/
public function pushRaw($payload, $queue = null, array $options = [])
{
$this->getConnection()->rpush($this->getQueue($queue), $payload);
return json_decode($payload, true)['id'] ?? null;
}
就一句话推送任务到队列去,继续追踪。下面的方法。可以看到 Laravel
以 redis
为队列是通过 List
的数据形式存在的,每推送一个任务,从左往右排队进入列表中,键名,够清晰了吧,因为我们并没有设置 $queue
,所以取默认值,那么我们得到的就是一个 queues:default
的字符串。
/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
return 'queues:'.($queue ?: $this->default);
}
至于值嘛,我们也可以看下,一波对于是否是对象的判断之后,通过 json_encode()
进行编码。
protected function createPayload($job, $data = '')
{
$payload = json_encode($this->createPayloadArray($job, $data));
if (JSON_ERROR_NONE !== json_last_error()) {
throw new InvalidPayloadException(
'Unable to JSON encode payload. Error code: '.json_last_error()
);
}
return $payload;
}
/**
* Create a payload array from the given job and data.
*
* @param string $job
* @param mixed $data
* @return array
*/
protected function createPayloadArray($job, $data = '')
{
return is_object($job)
? $this->createObjectPayload($job)
: $this->createStringPayload($job, $data);
}
结尾
到这里的话代码已经追踪的差不多了,当然这里面还有很多是没有提到的,比如,在运行 queue:work
之后,底层都在做什么。队列任务是如何并取出来的,work
还可以跟很多的参数,这里面都发生了什么。,延迟任务,监听机制.....,我觉得看源码虽然一开始头顶略微凉了一点,但是看多了,你就是行走中的移动文档。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: