Bus服务使用方式及源码分析
Overview
这是一篇关于 illuminate\Bus
的源码解析, 解析项目中分发 Job 的姿势和背后的原理.
本文基于 lumen 5.8
如何使用
对于 Job, 就不再介绍其使用场景了. 那么直接介绍一下一些使用姿势
通过辅助函数 (lumen-framework/src/helpers.php)
dispatch(new TestJob());
dispatch_now(new TestJob());
通过
Dispatcher
服务分发use Illuminate\Bus\Dispatcher; app(Dispatcher::class)->dispatch(new TestJob());
use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract; app(DispatcherContract::class)->dispatch(new TestJob());
通过
Facades/Bus
分发use Illuminate\Support\Facades\Bus; Bus::dispatchNow(new TestJob());
中间件支持的 Job 分发
// 为分发的 Job 设置中间件处理 class TestJob { public function handle() { echo "任务执行结束 \n"; } } // 处理 Job 的管道 $pipes = [ function($job, $next) { echo "管道处理111 \n"; $next($job); }, function($job, $next) { echo "管道处理222 \n"; $next($job); }, ]; // 执行 app(Dispatcher::class) ->pipeThrough($pipes) ->dispatchNow(new TestJob()); dd(123); // 结果输出: 管道处理111 管道处理222 任务执行结束
注意: 以上设置管道的方式只适用于
dispatchNow
处理非 Job 类任务
illuminate\Bus
为我们提供了处理非Job
类功能, 本质上就是没有该类没有实现handle
方法, 我们可以设置一个中间处理类// 老的版本 Job 类 class OldVersionJob { public function call() { echo "任务执行结束 \n"; } } // 新的版本 Job 类 class NewVersionJob { public function __construct($oldJob) { $this->oldJob = $oldJob; } public function handle() { return $this->oldJob->call(); } } $oldJob = new OldVersionJob(); app(Dispatcher::class) ->dispatchNow(new NewVersionJob($oldJob)); dd(123); // 结果输出: 任务执行结束 `` 使用场景: 1: 为了兼容处理老的任务代码 2: 想要使用分发的方式处理非 `Job` 代码
源码分析
再了解 illuminate\Bus
服务的使用姿势之后, 就让我们看看其背后的源码吧
首先再学习一个新服务的时候, 我们可以首先看看其提供的服务(ServiceProvider). 所以我看一下
Illuminate\Bus\BusServiceProvider
public function register() { // 提供的服务实际类 $this->app->singleton(Dispatcher::class, function ($app) { return new Dispatcher($app, function ($connection = null) use ($app) { return $app[QueueFactoryContract::class]->connection($connection); }); }); // 设置别名 $this->app->alias( Dispatcher::class, DispatcherContract::class ); // 设置别名 $this->app->alias( Dispatcher::class, QueueingDispatcherContract::class ); }
所以
illuminate\Bus
本质上只提供了一个Illuminate\Bus\Dispatcher
实例化对象dispatchNow
public function dispatchNow($command, $handler = null) { // $handler 为 上面介绍的第 5 种分发方式 if ($handler || $handler = $this->getCommandHandler($command)) { $callback = function ($command) use ($handler) { return $handler->handle($command); }; } else { $callback = function ($command) { return $this->container->call([$command, 'handle']); }; } // $this->pipeline 提供了 使用方式 4, 管道的支持 return $this->pipeline->send($command)->through($this->pipes)->then($callback); }
// 辅助函数的调用方式源码 if (! function_exists('dispatch_now')) { function dispatch_now($job, $handler = null) { return app(Dispatcher::class)->dispatchNow($job, $handler); } }
我们可以看到, 辅助函数
dispatch_now
本质上也是调用dispatchNow
dispatch
不同于dispatchNow
,dispatch
提供了将 Job 分发到队列的支持(当然需要实现相应可入队列的接口)public function dispatch($command) { // 判断是否有队列服务 以及 该 Job 是否可入队列 if ($this->queueResolver && $this->commandShouldBeQueued($command)) { return $this->dispatchToQueue($command); } // 否者直接分发执行 return $this->dispatchNow($command); }
// 辅助函数实现方式源码 if (! function_exists('dispatch')) { function dispatch($job) { return new PendingDispatch($job); } }
PendingDispatch 类
class PendingDispatch { protected $job; public function __construct($job) { $this->job = $job; } public function onConnection($connection) { $this->job->onConnection($connection); return $this; } public function onQueue($queue) { $this->job->onQueue($queue); return $this; } public function __destruct() { // 最后走析构函数, 然后 Job 被分发 app(Dispatcher::class)->dispatch($this->job); } }
以上大致就是 illuminate\Bus
服务的源码, 当然其中还有一些与队列操作相关的 trait 没有讲到
扩展
其实我们看 Console/Commands
目录下的类都是使用 handle
方法, 所以我们也是可以通过这样执行 Command
app(Dispatcher::class)->dispatchNow(new TestCommand())
本作品采用《CC 协议》,转载必须注明作者和本文链接