剖析 Laravel 队列系统--推送作业到队列

译文GitHub https://github.com/yuansir/diving-laravel-...

原文链接https://divinglaravel.com/queue-system/pus...

有几种方法可以将作业推送到队列中:

Queue::push(new InvoiceEmail($order));

Bus::dispatch(new InvoiceEmail($order));

dispatch(new InvoiceEmail($order));

(new InvoiceEmail($order))->dispatch();

调用Queue facade是对应用程序使用的队列驱动的调用,如果你使用数据库队列驱动,调用push方法是调用Queue\DatabaseQueue类的push方法。

有几种有用的方法可以使用:

// 将作业推送到特定的队列
Queue::pushOn('emails', new InvoiceEmail($order));

// 在给定的秒数之后推送作业
Queue::later(60, new InvoiceEmail($order));

// 延迟后将作业推送到特定的队列
Queue::laterOn('emails', 60, new InvoiceEmail($order));

// 推送多个作业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
]);

// 推送特定队列上的多个作业
Queue::bulk([
    new InvoiceEmail($order),
    new ThankYouEmail($order)
], null, 'emails');

调用这些方法之后,所选择的队列驱动会将给定的信息存储在存储空间中,供workers按需获取。

使用命令总线

使用命令总线调度作业进行排队可以给你额外控制权; 您可以从作业类中设置选定的connection, queue, and delay 来决定命令是否应该排队或立即运行,在运行之前通过管道发送作业,实际上你甚至可以从你的作业类中处理整个队列过程。

Bug facade代理到Contracts\Bus\Dispatcher 容器别名,此别名解析为Bus\Dispatcher内的Bus\BusServiceProvider的一个实例:

$this->app->singleton(Dispatcher::class, function ($app) {
    return new Dispatcher($app, function ($connection = null) use ($app) {
        return $app[QueueFactoryContract::class]->connection($connection);
    });
});

所以Bus::dispatch() 调用的 dispatch() 方法是 Bus\Dispatcher 类的:

public function dispatch($command)
{
    if ($this->queueResolver && $this->commandShouldBeQueued($command)) {
        return $this->dispatchToQueue($command);
    } else {
        return $this->dispatchNow($command);
    }
}

该方法决定是否将给定的作业分派到队列或立即运行,commandShouldBeQueued() 方法检查作业类是否是 Contracts\Queue\ShouldQueue, 的实例,因此使用此方法,您的作业只有继承了ShouldQueue接口才会被放到队列中。

我们不会在这篇中深入dispatchNow,我们将在深入worker如何执行作业中详细讨论。 现在我们来看看总线如何调度你的工作队列:

public function dispatchToQueue($command)
{
    $connection = isset($command->connection) ? $command->connection : null;

    $queue = call_user_func($this->queueResolver, $connection);

    if (! $queue instanceof Queue) {
        throw new RuntimeException('Queue resolver did not return a Queue implementation.');
    }

    if (method_exists($command, 'queue')) {
        return $command->queue($queue, $command);
    } else {
        return $this->pushCommandToQueue($queue, $command);
    }
}

首先 Laravel会检查您的作业类中是否定义了connection 属性,使用这个属性可以设置Laravel应该将排队作业发送到哪个连接,如果未定义任何属性,将使用null属性,在这种情况下Laravel将使用默认连接。

通过设置的连接,Laravel使用一个queueResolver闭包来构建应该使用哪个队列驱动的实例,当注册调度器实例的时候这个闭包在Bus\BusServiceProvider 中被设置:

function ($connection = null) use ($app) {
    return $app[Contracts\Queue\Factory::class]->connection($connection);
}

Contracts\Queue\FactoryQueue\QueueManager的别名,换句话说,该闭包返回一个QueueManager实例,并为manager设置所使用的队列驱动需要的连接。

最后,dispatchToQueue方法检查作业类是否具有queue方法,如果调度器调用此方法,可以完全控制作业排队的方式,您可以选择队列,分配延迟,设置最大重试次数, 超时等

如果没有找到 queue 方法,对 pushCommandToQueue() 的调用将调用所选队列驱动上的push方法:

protected function pushCommandToQueue($queue, $command)
{
    if (isset($command->queue, $command->delay)) {
        return $queue->laterOn($command->queue, $command->delay, $command);
    }

    if (isset($command->queue)) {
        return $queue->pushOn($command->queue, $command);
    }

    if (isset($command->delay)) {
        return $queue->later($command->delay, $command);
    }

    return $queue->push($command);
}

调度器检查Job类中的 queuedelay ,并根据此选择适当的队列方法。

所以我可以设置工作类中的队列,延迟和连接?

是的,您还可以设置一个triestimeout 属性,队列驱动也将使用这些值,以下工作类示例:

class SendInvoiceEmail{
    public $connection = 'default';

    public $queue = 'emails';

    public $delay = 60;

    public $tries = 3;

    public $timeout = 20;
}

Setting job configuration on the fly

即时设置作业配置

使用 dispatch() 全局帮助方法,您可以执行以下操作:

dispatch(new InvoiceEmail($order))
        ->onConnection('default')
        ->onQueue('emails')
        ->delay(60);

这只有在您在作业类中使用 Bus\Queueable trait时才有效,此trait包含几种方法,您可以在分发作业类之前在作业类上设置一些属性,例如:

public function onQueue($queue)
{
    $this->queue = $queue;

    return $this;
}

但是在你的例子中,我们调用dispatch()的返回方法!

这是诀窍:

function dispatch($job)
{
    return new PendingDispatch($job);
}

这是在Foundation/helpers.php中的dispatch()帮助方法的定义,它返回一个Bus\PendingDispatch 的实例,并且在这个类中,我们有这样的方法:

public function onQueue($queue)
{
    $this->job->onQueue($queue);

    return $this;
}

所以当我们执行 dispatch(new JobClass())->onQueue('default'), 时,PendingDispatchonQueue 方法将调用job类上的 onQueue 方法,如前所述,作业类需要使用所有这些的 Queueable trait来工作。

那么调用Dispatcher::dispatch方法的那部分是哪里?

一旦你执行了 dispatch(new JobClass())->onQueue('default'),你将让作业实例准备好进行调度,实际的工作发生在 PendingDispatch::__destruct()中:

public function __destruct()
{
    app(Dispatcher::class)->dispatch($this->job);
}

调用此方法时,将从容器中解析 Dispatcher 的一个实例,然后调用它的dispatch()方法。 destruct()是一种PHP魔术方法,当对对象的所有引用不再存在或脚本终止时,都会调用,因为我们不会立即在 __destruct方法中存储对PendingDispatch 实例的引用,

// Here the destructor will be called rightaway
dispatch(new JobClass())->onQueue('default');

// 如果我们调用unset($temporaryVariable),那么析构函数将被调用
// 或脚本完成执行时。
$temporaryVariable = dispatch(new JobClass())->onQueue('default');

使用可调度的特征

您可以使用工作类上的 Bus\Dispatchable trait来调度您的工作,如下所示:

(new InvoiceEmail($order))->dispatch();

调度方法 Dispatchable看起来像这样:

public static function dispatch()
{
    return new PendingDispatch(new static(...func_get_args()));
}

正如你可以看到它使用一个 PendingDispatch的实例,这意味着我们可以做一些像这样的事:

(new InvoiceEmail($order))->dispatch()->onQueue('emails');

转载请注明: 转载自Ryan是菜鸟 | LNMP技术栈笔记

如果觉得本篇文章对您十分有益,何不 打赏一下

谢谢打赏

本文链接地址: 剖析Laravel队列系统--推送作业到队列

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 3年前 自动加精
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 1

怎么理解 Contracts\Queue\Factory 是 Queue\QueueManager 的别名?

5年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!