Bus服务使用方式及源码分析

Overview

这是一篇关于 illuminate\Bus 的源码解析, 解析项目中分发 Job 的姿势和背后的原理.

本文基于 lumen 5.8

如何使用

对于 Job, 就不再介绍其使用场景了. 那么直接介绍一下一些使用姿势

  1. 通过辅助函数 (lumen-framework/src/helpers.php)

    dispatch(new TestJob());
    dispatch_now(new TestJob());
  2. 通过 Dispatcher 服务分发

    use Illuminate\Bus\Dispatcher;
    app(Dispatcher::class)->dispatch(new TestJob());
    use Illuminate\Contracts\Bus\Dispatcher as DispatcherContract;
    app(DispatcherContract::class)->dispatch(new TestJob());
  3. 通过 Facades/Bus 分发

    use Illuminate\Support\Facades\Bus;
    Bus::dispatchNow(new TestJob());
  4. 中间件支持的 Job 分发

    // 为分发的 Job 设置中间件处理
    class TestJob
    {
     public function handle()
     {
         echo "任务执行结束 \n";
     }
    }
    // 处理 Job 的管道
    $pipes = [
     function($job, $next) {
         echo "管道处理111 \n";
         $next($job);
     },
     function($job, $next) {
         echo "管道处理222 \n";
         $next($job);
     },
    ];
    // 执行
    app(Dispatcher::class)
     ->pipeThrough($pipes)
     ->dispatchNow(new TestJob());
    dd(123);
    // 结果输出:  管道处理111 管道处理222 任务执行结束

    注意: 以上设置管道的方式只适用于 dispatchNow

  5. 处理非 Job 类任务
    illuminate\Bus 为我们提供了处理非 Job 类功能, 本质上就是没有该类没有实现 handle 方法, 我们可以设置一个中间处理类

    // 老的版本 Job 类
    class OldVersionJob
    {
     public function call()
     {
         echo "任务执行结束 \n";
     }
    }
    // 新的版本 Job 类
    class NewVersionJob
    {
     public function __construct($oldJob)
     {
         $this->oldJob = $oldJob;
     }
    
     public function handle()
     {
         return $this->oldJob->call();
     }
    }
    $oldJob = new OldVersionJob();
    app(Dispatcher::class)
     ->dispatchNow(new NewVersionJob($oldJob));
    dd(123);
    // 结果输出: 任务执行结束 
    ``
    使用场景:
     1: 为了兼容处理老的任务代码
     2: 想要使用分发的方式处理非 `Job` 代码
    

源码分析

再了解 illuminate\Bus 服务的使用姿势之后, 就让我们看看其背后的源码吧

  1. 首先再学习一个新服务的时候, 我们可以首先看看其提供的服务(ServiceProvider). 所以我看一下 Illuminate\Bus\BusServiceProvider

     public function register()
     {
         // 提供的服务实际类
         $this->app->singleton(Dispatcher::class, function ($app) {
             return new Dispatcher($app, function ($connection = null) use ($app) {
                 return $app[QueueFactoryContract::class]->connection($connection);
             });
         });
    
         // 设置别名
         $this->app->alias(
             Dispatcher::class, DispatcherContract::class
         );
    
         // 设置别名
         $this->app->alias(
             Dispatcher::class, QueueingDispatcherContract::class
         );
     }

    所以 illuminate\Bus 本质上只提供了一个 Illuminate\Bus\Dispatcher 实例化对象

  2. dispatchNow

     public function dispatchNow($command, $handler = null)
     {
         // $handler 为 上面介绍的第 5 种分发方式
         if ($handler || $handler = $this->getCommandHandler($command)) {
             $callback = function ($command) use ($handler) {
                 return $handler->handle($command);
             };
         } else {
             $callback = function ($command) {
                 return $this->container->call([$command, 'handle']);
             };
         }
    
         // $this->pipeline 提供了 使用方式 4, 管道的支持
         return $this->pipeline->send($command)->through($this->pipes)->then($callback);
     }
    // 辅助函数的调用方式源码
    if (! function_exists('dispatch_now')) {
     function dispatch_now($job, $handler = null)
     {
         return app(Dispatcher::class)->dispatchNow($job, $handler);
     }
    }

    我们可以看到, 辅助函数 dispatch_now 本质上也是调用 dispatchNow

  3. dispatch
    不同于 dispatchNow, dispatch 提供了将 Job 分发到队列的支持(当然需要实现相应可入队列的接口)

     public function dispatch($command)
     {
         // 判断是否有队列服务 以及 该 Job 是否可入队列
         if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
             return $this->dispatchToQueue($command);
         }
    
         // 否者直接分发执行 
         return $this->dispatchNow($command);
     }
    // 辅助函数实现方式源码
    if (! function_exists('dispatch')) {
     function dispatch($job)
     {
         return new PendingDispatch($job);
     }
    }

    PendingDispatch 类

    class PendingDispatch
    {
     protected $job;
    
     public function __construct($job)
     {
         $this->job = $job;
     }
    
     public function onConnection($connection)
     {
         $this->job->onConnection($connection);
    
         return $this;
     }
    
     public function onQueue($queue)
     {
         $this->job->onQueue($queue);
    
         return $this;
     }
    
     public function __destruct()
     {
         // 最后走析构函数, 然后 Job 被分发
         app(Dispatcher::class)->dispatch($this->job);
     }
    }

以上大致就是 illuminate\Bus 服务的源码, 当然其中还有一些与队列操作相关的 trait 没有讲到

扩展

其实我们看 Console/Commands 目录下的类都是使用 handle 方法, 所以我们也是可以通过这样执行 Command

app(Dispatcher::class)->dispatchNow(new TestCommand())
本作品采用《CC 协议》,转载必须注明作者和本文链接
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!