队列



简介

在构建 Web 应用程序时,你可能需要执行一些任务,例如解析和存储上传的 CSV 文件,这些任务在典型的 Web 请求期间需要很长时间才能执行。 值得庆幸的是,Laravel 允许你轻松创建可以在后台处理的队列任务。 通过将时间密集型任务移至队列,你的应用程序可以以极快的速度响应 Web 请求,并为你的客户提供更好的用户体验。

Laravel 队列为各种不同的队列驱动提供统一的队列 API,例如 Amazon SQS、Redis,甚至是关系型数据库。

Laravel 队列的配置选项存储在 config/queue.php 文件中。在这个文件中,你可以找到框架自带的每个队列驱动的连接配置,包括 database、Amazon SQS、Redis 和 Beanstalkd 驱动,以及一个会立即执行任务的同步(sync)驱动(用于本地开发)。此外还包括一个用于丢弃排队任务的 null 队列驱动。
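作为参考,下面是 config/queue.php 结构的一个简化示意(仅保留与本文相关的键,默认连接名与各项取值请以你自己的配置文件为准):

return [
    // 默认使用的队列连接(此处假设通过 QUEUE_CONNECTION 环境变量控制)...
    'default' => env('QUEUE_CONNECTION', 'database'),

    'connections' => [
        'sync' => [
            'driver' => 'sync',
        ],

        'database' => [
            'driver' => 'database',
            'table' => 'jobs',
            'queue' => 'default',
            'retry_after' => 90,
        ],

        // redis、sqs、beanstalkd 等连接配置...
    ],
];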

[!注意]
Laravel 提供了 Horizon,一个为 Redis 队列提供漂亮仪表盘和配置系统的工具。如需了解更多信息,请查看完整的 Horizon 文档。

连接 vs 队列

在开始使用 Laravel 队列之前,理解「连接」和「队列」之间的区别非常重要。 在 config/queue.php 配置文件中,有一个 connections 连接选项。这个选项可以定义连接到哪个后台队列系统,例如: Amazon SQS 、 Beanstalk 、 或者 Redis 。然而,任何给定的队列连接都可能有多个「队列」,这些「队列」可能被认为是不同的堆栈或成堆的排队任务。


请注意, queue 配置文件中的每个连接配置示例都包含一个 queue 属性。这是将任务发送到给定连接时将被分配到的默认队列。换句话说,如果你没有显式地指定任务队列,那么该任务将被放置在连接配置的 queue 属性中定义的队列上:

use App\Jobs\ProcessPodcast;

// 这个任务将被推送到默认队列...
ProcessPodcast::dispatch();

// 这个任务将被推送到「emails」队列...
ProcessPodcast::dispatch()->onQueue('emails');

有些应用可能不需要将任务推到多个队列中,而是倾向于使用一个简单的队列。然而,如果希望对任务的处理方式进行优先级排序或分段时,将任务推送到多个队列就显得特别有用,因为 Laravel 队列工作程序允许你指定哪些队列应该按优先级处理。例如,如果你将任务推送到一个 high 队列,你可能会运行一个赋予它们更高处理优先级的 worker:

php artisan queue:work --queue=high,default

驱动程序说明和先决条件

数据库

要使用 database 队列驱动,你需要一个数据库表来保存任务。通常,这包含在 Laravel 默认的 0001_01_01_000002_create_jobs_table.php 数据库迁移 中; 然而,如果你的应用程序不包含此迁移,可以使用 make:queue-table Artisan 命令创建它:

php artisan make:queue-table

php artisan migrate

Redis

要使用 redis 队列驱动程序,需要在 config/database.php 配置文件中配置一个 redis 数据库连接。
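作为参考,config/database.php 中一个精简的 redis 连接配置大致如下(主机、端口等取值仅作示意,请以你的环境为准):

'redis' => [
    // 假设使用 phpredis 扩展作为客户端...
    'client' => env('REDIS_CLIENT', 'phpredis'),

    'default' => [
        'host' => env('REDIS_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD'),
        'port' => env('REDIS_PORT', '6379'),
        'database' => env('REDIS_DB', '0'),
    ],
],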

[!注意]
serializer 和 compression 选项不被 redis 队列驱动支持。

Redis 集群

如果你的 Redis 队列当中使用了 Redis 集群,那么你的队列名称就必须包含一个 key hash tag。这是为了确保一个给定队列的所有 Redis 键都被放在同一个哈希槽中:

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', '{default}'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => null,
    'after_commit' => false,
],

阻塞

当使用 Redis 队列时,你可以使用 block_for 配置选项来指定:在进入下一次 worker 循环、重新轮询 Redis 数据库之前,驱动程序应等待任务变为可用的最长时间。

根据你的队列负载调整此值要比连续轮询 Redis 数据库中的新任务更加有效。例如,你可以将值设置为 5 以指示驱动程序在等待任务变得可用时应该阻塞 5 秒:

'redis' => [
    'driver' => 'redis',
    'connection' => env('REDIS_QUEUE_CONNECTION', 'default'),
    'queue' => env('REDIS_QUEUE', 'default'),
    'retry_after' => env('REDIS_QUEUE_RETRY_AFTER', 90),
    'block_for' => 5,
    'after_commit' => false,
],

[!注意]
将 block_for 设置为 0 将导致队列 worker 一直阻塞,直到有任务可用为止。这也会导致诸如 SIGTERM 之类的信号在下一个任务被处理完之前无法得到处理。


其他驱动所需的先决条件

其它队列驱动需要下面的依赖。这些依赖可以通过 Composer 包管理器安装:

  • Amazon SQS: aws/aws-sdk-php ~3.0
  • Beanstalkd: pda/pheanstalk ~5.0
  • Redis: predis/predis ~2.0 或 phpredis PHP 扩展
  • MongoDB: mongodb/laravel-mongodb

创建任务

生成任务类

默认情况下,应用程序的所有的可排队任务都被存储在了 app/Jobs 目录中。如果 app/Jobs 目录不存在,当你运行 make:job Artisan 命令时,将会自动创建该目录:

php artisan make:job ProcessPodcast

生成的类将会实现 Illuminate\Contracts\Queue\ShouldQueue 接口,以告知 Laravel 该任务应被推入队列、以异步方式运行。

[!注意]
任务 stubs 可以使用 stub publishing 定制。

任务类结构

任务类非常简单,通常只包含一个 handle 方法,在队列处理任务时将会调用它。让我们看一个任务类的示例。在这个例子中,我们假设我们管理一个 podcast 服务,并且需要在上传的 podcast 文件发布之前对其进行处理:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     *  创建一个任务实例
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * 执行任务
     */
    public function handle(AudioProcessor $processor): void
    {
        // 处理上传的 podcast...
    }
}

在这个示例中,需要注意的是我们可以直接将一个 Eloquent 模型 传入到队列任务的构造函数中。由于任务使用了 Queueable trait,Eloquent 模型及其已加载的关联关系会在任务处理时被优雅地序列化与反序列化。
如果你的队列任务在构造函数中接收一个 Eloquent 模型,那么在写入队列时,只有模型的标识符会被序列化。 当任务被执行时,队列系统会自动从数据库中重新获取完整的模型实例和已加载的关联关系。
这种模型序列化的方式,可以让发送到队列驱动的任务负载体积更小,更高效。

handle 依赖注入

当队列开始处理任务时,就会调用 handle 方法。你也可以在任务的 handle 方法参数中使用「类型提示」来声明依赖。Laravel 的 服务容器 会自动注入这些依赖。

如果你想完全掌控服务容器如何将依赖注入到 handle 方法中,可以使用容器的 bindMethod 方法。bindMethod 方法接收一个回调,该回调会收到任务实例和容器实例。你可以在回调中按自己的需要调用 handle 方法。
通常,你应该在应用程序的 服务提供者 App\Providers\AppServiceProvider 的 boot 方法中调用 bindMethod:

use App\Jobs\ProcessPodcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Foundation\Application;

$this->app->bindMethod([ProcessPodcast::class, 'handle'], function (ProcessPodcast $job, Application $app) {
    return $job->handle($app->make(AudioProcessor::class));
});

[!警告]
二进制数据(例如原始图片内容)在传递到队列任务之前,应该先使用 base64_encode 函数进行编码。 否则,当任务被放入队列时,可能无法正确序列化为 JSON。


队列中的关联关系

由于所有已加载的 Eloquent 模型关联关系在任务入队时也会被一并序列化,因此序列化后的任务字符串有时可能会非常庞大。
此外,当任务被反序列化、模型关联关系重新从数据库中获取时,这些关联会被「完整地」取回。也就是说,任务入队前对模型关联应用的任何约束条件,在任务反序列化后都不会再次生效。因此,如果你只想处理某个关联关系的部分数据,应该在队列任务中重新对该关联进行约束,如下方示例所示。
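下面是一个简单的示意(假设 Podcast 模型拥有一个 episodes 关联,关联名与约束条件仅作演示):

/**
 * 执行任务.
 */
public function handle(): void
{
    // 反序列化后,关联会被「完整地」重新加载;
    // 如果只需要部分数据,应在任务内重新对关联进行约束...
    $publishedEpisodes = $this->podcast->episodes()
        ->where('published', true)
        ->get();

    // 仅处理符合约束的这部分关联数据...
}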

另外,如果你想避免模型的关联关系被序列化,可以在设置属性值时调用模型的 withoutRelations 方法。 此方法会返回一个不包含已加载关联关系的模型实例:

/**
 * 创建一个新的队列实例.
 */
public function __construct(
    Podcast $podcast,
) {
    $this->podcast = $podcast->withoutRelations();
}

如果你在使用 PHP 的构造函数属性提升(constructor property promotion),并且希望某个 Eloquent 模型在序列化时不包含关联关系,可以使用 WithoutRelations 属性(attribute):

use Illuminate\Queue\Attributes\WithoutRelations;

/**
 * 创建一个新的队列实例.
 */
public function __construct(
    #[WithoutRelations]
    public Podcast $podcast,
) {}

如果一个任务接收的是 Eloquent 模型集合或数组,而不是单个模型,那么在该任务被反序列化并执行时,集合中各模型的关联关系将不会被恢复。这是为了避免处理大量模型的任务消耗过多资源。

唯一任务

[!警告]
唯一任务需要一个支持 锁 的缓存驱动。目前,memcached、redis、dynamodb、database、file 和 array 缓存驱动都支持原子锁。并且,唯一任务约束不适用于批处理(batch)中的任务。


有时候,你可能希望在任意时刻,队列中只存在某个特定任务的一个实例。要实现这一点,只需让任务类实现 ShouldBeUnique 接口即可。这个接口不要求你在类中额外定义任何方法:

<?php

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...
}

在上面的示例中,UpdateSearchIndex 任务是「唯一的」。因此,如果该任务的另一个实例已经在队列中且尚未处理完成,那么新的任务将不会被调度。

在某些情况下,你可能希望通过一个特定的「键」来让任务保持唯一,或者希望指定一个超时时间,超过该时间后任务就不再保持唯一。要实现这一点,你可以在任务类中定义 uniqueId 和 uniqueFor 属性或方法:

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUnique;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    /**
     * 产品实例.
     *
     * @var \App\Models\Product
     */
    public $product;

    /**
     * 队列的唯一锁将在 N 秒后被释放.
     *
     * @var int
     */
    public $uniqueFor = 3600;

    /**
     * 获取该队列的唯一 ID.
     */
    public function uniqueId(): string
    {
        return $this->product->id;
    }
}

在上面的示例中,UpdateSearchIndex 任务是通过产品 ID 来保持唯一的。因此,在现有任务处理完成之前,任何带有相同产品 ID 的新任务都将被忽略。此外,如果现有任务在一小时之内仍未处理完毕,唯一锁将被释放,此时带有相同唯一键的另一个任务就可以被调度到队列中。

[!警告]
如果你的应用会从多个 Web 服务器或容器中调度任务,你需要确保所有服务器都连接到同一个中央缓存服务器,这样 Laravel 才能准确判断某个任务是否唯一。


保持任务唯一直到开始处理

默认情况下,唯一任务会在任务处理完成或所有重试都失败后才「解锁」。但在某些情况下,你可能希望任务在开始处理之前就立即解锁。要实现这一点,你的任务类应实现 ShouldBeUniqueUntilProcessing 接口,而不是 ShouldBeUnique 接口:

<?php

use App\Models\Product;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldBeUniqueUntilProcessing;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUniqueUntilProcessing
{
    // ...
}

唯一任务锁

在底层,当一个实现了 ShouldBeUnique 接口的任务被调度时,Laravel 会尝试使用 uniqueId 作为键去获取一个 锁。如果未能获取到锁,该任务将不会被调度。
该锁会在任务处理完成或所有重试尝试都失败后被释放。默认情况下,Laravel 会使用默认的缓存驱动来获取这个锁。
如果你希望使用其他缓存驱动来获取锁,可以在任务类中定义一个 uniqueVia 方法,该方法返回应使用的缓存驱动:

use Illuminate\Contracts\Cache\Repository;
use Illuminate\Support\Facades\Cache;

class UpdateSearchIndex implements ShouldQueue, ShouldBeUnique
{
    // ...

    /**
     * 获取唯一队列锁所使用的缓存驱动.
     */
    public function uniqueVia(): Repository
    {
        return Cache::driver('redis');
    }
}

[!注意]
如果你只需要限制任务的并发处理,可以改用 WithoutOverlapping 任务中间件。

任务加密

Laravel 允许你通过 加密 来保证任务数据的私密性和完整性。要开始使用,只需让任务类实现 ShouldBeEncrypted 接口。一旦任务类实现了该接口,Laravel 会在把任务推送到队列之前自动对其进行加密:

<?php

use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;

class UpdateSearchIndex implements ShouldQueue, ShouldBeEncrypted
{
    // ...
}

任务中间件

任务中间件允许你将自定义逻辑包裹在队列任务的执行过程外层,从而减少任务本身的重复代码。
例如,下面的 handle 方法利用了 Laravel 的 Redis 限流功能,使得每五秒仅允许处理一个任务:

use Illuminate\Support\Facades\Redis;

/**
 * 执行当前队列.
 */
public function handle(): void
{
    Redis::throttle('key')->block(0)->allow(1)->every(5)->then(function () {
        info('Lock obtained...');

        // 处理当前队列...
    }, function () {
        // 无法获取到锁...

        return $this->release(5);
    });
}

这段代码虽然有效,但 handle 方法的实现会显得很杂乱,因为其中充斥着 Redis 限流逻辑。此外,如果我们想对其他任务也进行限流,这段限流逻辑就必须重复编写。

所以与其在 handle 方法中进行限流,不如定义一个专门处理限流的队列中间件。
Laravel 并没有为队列中间件规定默认存放位置,你可以在应用中的任意位置放置队列中间件。
在这个示例中,我们会将中间件放在 app/Jobs/Middleware 目录下:

<?php

namespace App\Jobs\Middleware;

use Closure;
use Illuminate\Support\Facades\Redis;

class RateLimited
{
    /**
     * 处理当前队列任务.
     *
     * @param  \Closure(object): void  $next
     */
    public function handle(object $job, Closure $next): void
    {
        Redis::throttle('key')
            ->block(0)->allow(1)->every(5)
            ->then(function () use ($job, $next) {
                // 锁已获取...

                $next($job);
            }, function () use ($job) {
                // 无法获取到锁...

                $job->release(5);
            });
    }
}

如你所见,和 路由中间件 类似,队列中间件会接收当前正在处理的队列任务以及一个回调函数,该回调函数会被调用以继续处理队列任务。


创建任务中间件后,你可以在任务类的 middleware 方法中返回这些中间件,从而将它们附加到该任务上。但是,通过 make:job Artisan 命令生成的任务类默认没有该方法,因此你需要手动在任务类中添加 middleware 方法:

use App\Jobs\Middleware\RateLimited;

/**
 * 获取队列需要通过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited];
}

[!注意]
任务中间件同样可以应用于可排队的事件监听器、邮件类(mailables)和通知。

限流

以上示例展示了如何编写自定义限流队列中间件,但 Laravel 实际上已经包含了可用于队列限流的中间件。 和 路由限流器 类似,队列限流器通过 RateLimiter facade 的 for 方法进行定义。

例如,你可能希望普通用户每小时只能备份一次数据,而对高级客户不做此限制。要实现这一点,可以在 AppServiceProvider 的 boot 方法中定义一个 RateLimiter:

use Illuminate\Cache\RateLimiting\Limit;
use Illuminate\Support\Facades\RateLimiter;

/**
 * 引导应用中的所有服务.
 */
public function boot(): void
{
    RateLimiter::for('backups', function (object $job) {
        return $job->user->vipCustomer()
            ? Limit::none()
            : Limit::perHour(1)->by($job->user->id);
    });
}

在以上示例中,我们定义了按小时计的限流。当然,你也可以使用 perMinute 方法轻松定义按分钟计的限流。此外,你可以将任意值传递给限流器的 by 方法,不过这个值最常用于按客户划分限流:

return Limit::perMinute(50)->by($job->user->id);

定义好限流器后,可以通过 Illuminate\Queue\Middleware\RateLimited 中间件将其附加到任务上。每当任务超出限流阈值时,这个中间件就会根据限流时长计算合适的延迟,并将该任务重新释放回队列。

use Illuminate\Queue\Middleware\RateLimited;

/**
 * 获取队列在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimited('backups')];
}

将超出限流的任务重新释放回队列时,该任务的 attempts 总尝试次数仍会增加。你可能需要相应地调整任务类中的 tries 和 maxExceptions 属性,或者也可以使用 retryUntil 方法 来定义任务不再尝试的截止时间。

此外,使用 releaseAfter 方法,你还可以指定被重新释放的任务在再次尝试执行之前必须经过的秒数:

/**
 * 获取队列在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->releaseAfter(60)];
}

如果你不希望队列任务在被限流时被重试,可以使用 dontRelease 方法:

/**
 * 获取队列在执行前需要经过的中间件。
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new RateLimited('backups'))->dontRelease()];
}

[!注意]
如果你使用 Redis,可以使用 Illuminate\Queue\Middleware\RateLimitedWithRedis 中间件。
该中间件针对 Redis 做了优化,比基础限流中间件更高效。
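其用法与基础限流中间件一致,只需替换中间件类即可(以下写法假设你已按前文定义了名为 backups 的限流器):

use Illuminate\Queue\Middleware\RateLimitedWithRedis;

/**
 * 获取任务在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new RateLimitedWithRedis('backups')];
}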


防止任务重叠

Laravel 提供了一个 Illuminate\Queue\Middleware\WithoutOverlapping 中间件,可以基于任意键防止任务重叠执行。当队列任务需要修改某个一次只应被一个任务修改的资源时,这个中间件会很有用。

例如,假设你有一个用于更新用户信用分数的队列任务,并且你不希望同一用户 ID 的信用分数更新任务重叠执行。对此,你可以在任务的 middleware 方法中返回 WithoutOverlapping 中间件:

use Illuminate\Queue\Middleware\WithoutOverlapping;

/**
 * 获取队列在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new WithoutOverlapping($this->user->id)];
}

任何同类型的重叠任务都会被重新释放回队列。你还可以指定被释放的任务在再次尝试执行之前必须经过的秒数:

/**
 * 获取队列在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->releaseAfter(60)];
}

如果你希望立即删除任何重叠的任务,使其不会被重试,可以使用 dontRelease 方法:

/**
 * 获取队列任务在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->dontRelease()];
}

WithoutOverlapping 中间件依赖于 Laravel 的原子锁功能。有时,任务可能会意外失败或超时,导致锁没有被释放。因此,你可以使用 expireAfter 方法显式地定义锁的过期时间。
例如,下面的示例会让 Laravel 在任务开始处理三分钟后释放 WithoutOverlapping 锁:

/**
 * 获取队列任务在执行前需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new WithoutOverlapping($this->order->id))->expireAfter(180)];
}

[!警告]
WithoutOverlapping 中间件需要使用支持 锁 的缓存驱动。目前,memcached、redis、dynamodb、database、file 和 array 缓存驱动均支持原子锁。


跨任务类共享锁键

默认情况下,WithoutOverlapping 中间件仅会防止同一个类的任务重叠。因此,即使两个不同的任务类使用相同的锁键,它们仍可能重叠执行。
你可以使用 shared 方法,让 Laravel 在多个任务类之间共享锁键:

use Illuminate\Queue\Middleware\WithoutOverlapping;

class ProviderIsDown
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

class ProviderIsUp
{
    // ...

    public function middleware(): array
    {
        return [
            (new WithoutOverlapping("status:{$this->provider}"))->shared(),
        ];
    }
}

异常限流

Laravel 提供了一个 Illuminate\Queue\Middleware\ThrottlesExceptions 中间件,用于对异常进行限流。一旦任务抛出了指定次数的异常,后续所有的执行尝试都会被延迟,直到经过指定的时间间隔。该中间件对与不稳定的第三方服务交互的任务特别有用。

假设有一个队列任务需要与第三方 API 交互,而该 API 开始抛出异常。为了对这些异常进行限流,你可以在任务的 middleware 方法中返回 ThrottlesExceptions 中间件。
该中间件通常应与实现了 基于时间的尝试 的任务配合使用:

use DateTime;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取队列需要经过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [new ThrottlesExceptions(10, 5 * 60)];
}

/**
 * 获取队列的超时时间。
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(30);
}

中间件的第一个构造函数参数是任务在被限流之前允许抛出的异常次数,第二个参数是任务被限流后、再次尝试执行之前必须经过的秒数。
在上面的代码示例中,如果任务连续抛出 10 次异常,我们会等待 5 分钟后再尝试执行该任务,同时整体受 30 分钟重试时限的约束。


当任务抛出异常但尚未达到异常阈值时,通常会立即重试该任务。但是,你可以通过在将中间件附加到任务时调用 backoff 方法来指定此类任务应延迟的分钟数:

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 5 * 60))->backoff(5)];
}

在内部,这个中间件使用 Laravel 的缓存系统来实现限流,并使用任务的类名作为缓存「键」。在将中间件附加到任务时,你可以通过调用 by 方法来覆盖此键。如果你有多个任务与同一个第三方服务交互,并且希望它们共享同一个限流「桶」,这会很有用:

use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->by('key')];
}

默认情况下,这个中间件会对每一个抛出的异常进行节流。你可以在将中间件附加到任务时调用 when 方法来修改这一行为。此时,只有当传递给 when 方法的闭包返回 true 时,异常才会被节流:

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->when(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

when 方法不同(该方法会将任务放回队列或抛出异常), deleteWhen 方法允许你在指定异常发生时,将任务完全删除:

use App\Exceptions\CustomerDeletedException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * Get the middleware the job should pass through.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(2, 10 * 60))->deleteWhen(CustomerDeletedException::class)];
}

如果你希望将被节流的异常报告给应用程序的异常处理程序,可以在将中间件附加到任务时调用 report 方法来实现。
也可以为 report 方法提供一个闭包,并且只有当该闭包返回 true 时,异常才会被报告:

use Illuminate\Http\Client\HttpClientException;
use Illuminate\Queue\Middleware\ThrottlesExceptions;

/**
 * 获取任务时,应该通过的中间件.
 *
 * @return array<int, object>
 */
public function middleware(): array
{
    return [(new ThrottlesExceptions(10, 10 * 60))->report(
        fn (Throwable $throwable) => $throwable instanceof HttpClientException
    )];
}

[!注意]
如果你使用的是 Redis,可以使用 Illuminate\Queue\Middleware\ThrottlesExceptionsWithRedis 中间件。 该中间件针对 Redis 进行了优化,比基础的异常节流中间件更高效。

跳过任务

Skip 中间件允许你在不修改任务逻辑的情况下,指定某个任务跳过或删除。
Skip::when 方法会在给定条件为 true 时删除任务,而 Skip::unless 方法会在条件为 false 时删除任务:

use Illuminate\Queue\Middleware\Skip;

/**
 * 获取任务应通过的中间件.
 */
public function middleware(): array
{
    return [
        Skip::when($someCondition),
    ];
}

你还可以向 whenunless 方法传递一个 Closure,用于更复杂的条件判断:

use Illuminate\Queue\Middleware\Skip;

class ProcessPodcast implements ShouldQueue
{
    public function middleware(): array
    {
        return [
            Skip::unless(function () {
                return $this->user->subscriptionActive() && 
                       $this->podcast->isPublished();
            }),
        ];
    }

    public function handle(): void
    {
        // 任务逻辑
    }
}

例如,下面是向 Skip::when 传入闭包的另一种写法:
use Illuminate\Queue\Middleware\Skip;

/**
 * 获取任务要通过的中间件.
 */
public function middleware(): array
{
    return [
        Skip::when(function (): bool {
            return $this->shouldSkip();
        }),
    ];
}

调度任务

一旦你写好了任务类,就可以使用任务自身的 dispatch 方法来调度它。
传递给 dispatch 方法的参数会被传入任务类的构造函数:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast);

        return redirect('/podcasts');
    }
}

如果你想有条件地分派任务,你可以使用 dispatchIfdispatchUnless 方法:

ProcessPodcast::dispatchIf($accountActive, $podcast);

ProcessPodcast::dispatchUnless($accountSuspended, $podcast);

在新的 Laravel 应用程序中,默认的队列驱动是 sync 驱动。这个驱动在当前请求的前台同步执行任务,通常在本地开发中很方便。如果你想在后台处理队列任务,你可以在应用程序的 config/queue.php 配置文件中指定不同的队列驱动。

延迟调度

如果你想让某个任务不立即被队列 worker 处理,可以在调度任务时使用 delay 方法。例如,让我们指定一个任务在调度 10 分钟后才可以被处理:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 储存一个新的播客.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // ...

        ProcessPodcast::dispatch($podcast)
            ->delay(now()->addMinutes(10));

        return redirect('/podcasts');
    }
}

在某些情况下,任务可能配置了默认的延迟。如果你需要绕过该延迟并立即调度任务,可以使用 withoutDelay 方法:

ProcessPodcast::dispatch($podcast)->withoutDelay();

[!注意]
Amazon SQS 队列服务的最大延迟时间为 15 分钟。

响应发送到浏览器后调度


另外,如果你的 Web 服务器使用 FastCGI,可以使用 dispatchAfterResponse 方法将任务的调度延迟到 HTTP 响应发送到用户浏览器之后。这样即使队列任务仍在执行,用户也可以立即开始使用应用程序。这种方式通常只应该用于耗时大约一秒钟的任务,例如发送电子邮件。由于这类任务是在当前 HTTP 请求内处理的,以这种方式调度的任务不需要运行队列 worker 就能被处理:

use App\Jobs\SendNotification;

SendNotification::dispatchAfterResponse();

你也可以 dispatch 一个闭包并将 afterResponse 方法链式调用到 dispatch 帮助器上,以在发送 HTTP 响应后执行闭包:

use App\Mail\WelcomeMessage;
use Illuminate\Support\Facades\Mail;

dispatch(function () {
    Mail::to('taylor@example.com')->send(new WelcomeMessage);
})->afterResponse();

同步调度

如果你想立即(同步)调度任务,你可以使用 dispatchSync 方法。使用此方法时,任务不会排队,会在当前进程内立即执行

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatchSync($podcast);

        return redirect('/podcasts');
    }
}

队列和数据库事务

在数据库事务中调度作业是完全可以的,但你应该特别注意确保你的作业实际上能够成功执行。在事务中调度作业时,有可能作业在父事务提交之前就被工作器处理了。当这种情况发生时,你在数据库事务期间对模型或数据库记录所做的任何更新可能还没有反映在数据库中。此外,在事务中创建的任何模型或数据库记录可能还不存在于数据库中。


幸运的是,Laravel 提供了几种方法来解决这个问题。首先,你可以在队列连接的配置数组中设置 after_commit 连接选项:

'redis' => [
    'driver' => 'redis',
    // ...
    'after_commit' => true,
],

当 after_commit 选项为 true 时,你可以在数据库事务中调度作业;但是,Laravel 会等到打开的父数据库事务提交之后才会实际调度作业。当然,如果当前没有开启的数据库事务,作业将被立即调度。

如果事务因在事务期间发生的异常而回滚,那么在该事务期间调度的作业将被丢弃。

[!注意]
将 after_commit 配置选项设置为 true 还会导致任何排队的事件监听器、邮件、通知和广播事件在所有打开的数据库事务提交后被调度。

内联指定提交调度行为

如果你没有将 after_commit 队列连接配置选项设置为 true,你仍然可以指定特定作业应在所有打开的数据库事务提交后被调度。为此,你可以将 afterCommit 方法链接到你的调度操作:

use App\Jobs\ProcessPodcast;

ProcessPodcast::dispatch($podcast)->afterCommit();

类似地,如果 after_commit 配置选项被设置为 true,你可以指定特定作业应立即调度,无需等待任何打开的数据库事务提交:

ProcessPodcast::dispatch($podcast)->beforeCommit();

任务链

任务链允许你指定一组排队任务,它们会在主任务成功执行后按顺序依次运行。如果链中的某个任务失败,其余任务将不会运行。要执行一个排队的任务链,你可以使用 Bus facade 提供的 chain 方法。Laravel 的命令总线是一个更底层的组件,排队任务的调度正是构建在它之上:

use App\Jobs\OptimizePodcast;
use App\Jobs\ProcessPodcast;
use App\Jobs\ReleasePodcast;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->dispatch();

除了将作业类实例加入链中,你还可以在链中加入闭包:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    function () {
        Podcast::update(/* ... */);
    },
])->dispatch();

[!警告]
在任务中使用 $this->delete() 方法删除任务不会阻止链式任务的处理。只有当链中的任务失败时,链才会停止执行。

链式连接 & 队列

如果要指定链式任务应使用的连接和队列,可以使用 onConnectiononQueue 方法。这些方法指定应使用的队列连接和队列名称,除非为排队任务显式分配了不同的连接 / 队列:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->onConnection('redis')->onQueue('podcasts')->dispatch();

向链中添加任务

如果你想从链中的某个任务内部,向现有任务链的前面或后面追加任务,可以使用 prependToChain 和 appendToChain 方法:

/**
 * 执行任务.
 */
public function handle(): void
{
    // ...

    // 前置到当前链,在当前任务之后立即运行任务...
    $this->prependToChain(new TranscribePodcast);

    // 后置到当前链,在链末尾运行任务...
    $this->appendToChain(new TranscribePodcast);
}

链式失败

在链式任务中,你可以使用 catch 方法来指定一个闭包,该闭包在链中的任务失败时被调用。给定的回调将接收导致任务失败的 Throwable 实例:

use Illuminate\Support\Facades\Bus;
use Throwable;

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    // 链中的某个任务失败了...
})->dispatch();

[!警告]
由于链回调被序列化并在稍后由 Laravel 队列执行,因此你不应在链回调中使用 $this 变量。


自定义队列和连接

分发到特定队列

通过将任务推送到不同的队列,你可以对队列任务进行「分类」,甚至可以通过为不同队列分配不同数量的 worker 来调整它们的优先级。请记住,这并不会将任务推送到队列配置文件中定义的不同队列「连接」,而只是推送到单个连接内的特定队列。要指定队列,请在调度任务时使用 onQueue 方法:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onQueue('processing');

        return redirect('/podcasts');
    }
}

或者,你可以在任务的构造函数中调用 onQueue 方法来指定任务的队列:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的任务实例.
     */
    public function __construct()
    {
        $this->onQueue('processing');
    }
}

调度到特定连接

如果你的应用与多个队列连接交互,你可以使用 onConnection 方法来指定任务应推送到的连接:

<?php

namespace App\Http\Controllers;

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use Illuminate\Http\RedirectResponse;
use Illuminate\Http\Request;

class PodcastController extends Controller
{
    /**
     * 存储一个新的播客.
     */
    public function store(Request $request): RedirectResponse
    {
        $podcast = Podcast::create(/* ... */);

        // 创建播客...

        ProcessPodcast::dispatch($podcast)->onConnection('sqs');

        return redirect('/podcasts');
    }
}

你可以链式调用 onConnectiononQueue 方法来指定任务的连接和队列:

ProcessPodcast::dispatch($podcast)
    ->onConnection('sqs')
    ->onQueue('processing');

或者,你可以在任务的构造函数中调用 onConnection 方法来指定任务的连接:

<?php

namespace App\Jobs;

use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * 创建一个新的作业实例.
     */
    public function __construct()
    {
        $this->onConnection('sqs');
    }
}

指定任务的最大尝试次数 / 超时值

最大尝试次数

如果你的队列任务遇到错误,你可能不希望它无限期地重试。因此,Laravel 提供了多种方法来指定任务可以尝试的次数或持续时间。

指定任务最大尝试次数的一种方式是通过 Artisan 命令行的 --tries 选项。这将适用于该 worker 处理的所有任务,除非正在处理的任务自己指定了最大尝试次数:

php artisan queue:work --tries=3

如果一个任务超过其最大尝试次数,它将被视为「失败」的任务。有关处理失败任务的更多信息,请查看 处理失败任务 的文档。如果给 queue:work 命令提供了 --tries=0,任务将无限次重试。

你可以采取更细粒度的方法,通过在任务类本身定义任务可以尝试的最大次数。如果在任务上指定了最大尝试次数,它将优先于命令行上提供的 --tries 值:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 任务可尝试的次数.
     *
     * @var int
     */
    public $tries = 5;
}

如果你需要对特定任务的最大尝试次数进行动态控制,你可以在作业上定义一个 tries 方法:

/**
 * 确定任务可以尝试的次数.
 */
public function tries(): int
{
    return 5;
}

基于时间的尝试

作为定义任务在失败前可尝试次数的替代方式,你可以定义一个不再尝试该任务的截止时间。这样,任务可以在给定的时间范围内被尝试任意次数。要定义这个截止时间,请在任务类中添加一个 retryUntil 方法,该方法应返回一个 DateTime 实例:

use DateTime;

/**
 * 确定任务应当超时的时间.
 */
public function retryUntil(): DateTime
{
    return now()->addMinutes(10);
}

如果同时定义了 retryUntil 和 tries,Laravel 会优先使用 retryUntil 方法。

[!注意]
你也可以在 队列事件监听器队列通知 中定义 tries 属性或 retryUntil 方法。

最大异常数

有时你可能希望指定一个任务可以尝试多次,但如果任务的重试是由指定数量的未处理异常所触发(而不是被 release 方法直接释放),则该任务应被标记为失败。为此,你可以在任务类中定义一个 maxExceptions 属性:

<?php

namespace App\Jobs;

use Illuminate\Support\Facades\Redis;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 可以尝试任务的次数.
     *
     * @var int
     */
    public $tries = 25;

    /**
     * 失败前允许的最大未处理异常数.
     *
     * @var int
     */
    public $maxExceptions = 3;

    /**
     * 执行任务.
     */
    public function handle(): void
    {
        Redis::throttle('key')->allow(10)->every(60)->then(function () {
            // 获得锁,处理播客...
        }, function () {
            // 无法获取锁...
            return $this->release(10);
        });
    }
}

在此示例中,如果应用程序无法获得 Redis 锁,则该任务将在 10 秒后被释放,并将继续重试最多 25 次。但是,如果任务抛出三个未处理的异常,则任务将失败。

超时

通常,你大致知道你的排队任务需要多长时间。因此,Laravel 允许你指定一个「超时」值。默认情况下,超时值为 60 秒。如果任务的处理时间超过超时值指定的秒数,处理任务的工作进程将退出并报错。通常工作进程将由服务器上配置的 进程管理器 自动重启。

任务可以运行的最大秒数可以使用 Artisan 命令行上的 --timeout 开关指定:

php artisan queue:work --timeout=30

如果任务因不断超时超过其最大尝试次数,它将被标记为失败。

你还可以在任务类本身定义任务允许运行的最大秒数。如果在任务上指定了超时时间,它将优先于命令行上指定的任何超时时间:

<?php

namespace App\Jobs;

class ProcessPodcast implements ShouldQueue
{
    /**
     * 在超时之前任务可以运行的秒数.
     *
     * @var int
     */
    public $timeout = 120;
}

有时,诸如 sockets 或外部 HTTP 连接之类的 IO 阻塞过程可能不会遵守你指定的超时。因此,在使用这些功能时,你应该始终尝试使用它们自身的 API 来指定超时时间。例如,当使用 Guzzle 时,你应该总是指定连接和请求的超时值。
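下面是一个简单的示意,使用 Laravel 自带的 HTTP 客户端(其底层即 Guzzle)同时指定连接超时与请求超时(URL 仅作演示):

use Illuminate\Support\Facades\Http;

$response = Http::connectTimeout(5) // 建立连接最多等待 5 秒...
    ->timeout(10)                   // 整个请求最多等待 10 秒...
    ->get('https://example.com/api/podcasts');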

[!警告]
必须安装 PHP pcntl 扩展才能指定任务超时。此外,任务的「超时」值应始终小于其「任务到期」 值。否则,可能在任务实际完成执行或超时前,任务就会被重试。


Failing on Timeout

If you would like to indicate that a job should be marked as failed on timeout, you may define the $failOnTimeout property on the job class:

/**
 * Indicate if the job should be marked as failed on timeout.
 *
 * @var bool
 */
public $failOnTimeout = true;

Error Handling

If an exception is thrown while the job is being processed, the job will automatically be released back onto the queue so it may be attempted again. The job will continue to be released until it has been attempted the maximum number of times allowed by your application. The maximum number of attempts is defined by the --tries switch used on the queue:work Artisan command. Alternatively, the maximum number of attempts may be defined on the job class itself. More information on running the queue worker can be found below.

Manually Releasing a Job

Sometimes you may wish to manually release a job back onto the queue so that it can be attempted again at a later time. You may accomplish this by calling the release method:

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->release();
}

By default, the release method will release the job back onto the queue for immediate processing. However, you may instruct the queue to not make the job available for processing until a given number of seconds has elapsed by passing an integer or date instance to the release method:

$this->release(10);

$this->release(now()->addSeconds(10));

Manually Failing a Job

Occasionally you may need to manually mark a job as "failed". To do so, you may call the fail method:

/**
 * Execute the job.
 */
public function handle(): void
{
    // ...

    $this->fail();
}

If you would like to mark your job as failed because of an exception that you have caught, you may pass the exception to the fail method. Or, for convenience, you may pass a string error message which will be converted to an exception for you:

$this->fail($exception);

$this->fail('Something went wrong.');

[!NOTE]
For more information on failed jobs, check out the documentation on dealing with job failures.

Job Batching

Laravel's job batching feature allows you to easily execute a batch of jobs and then perform some action when the batch of jobs has completed executing. Before getting started, you should create a database migration to build a table which will contain meta information about your job batches, such as their completion percentage. This migration may be generated using the make:queue-batches-table Artisan command:

php artisan make:queue-batches-table

php artisan migrate

Defining Batchable Jobs

To define a batchable job, you should create a queueable job as normal; however, you should add the Illuminate\Bus\Batchable trait to the job class. This trait provides access to a batch method which may be used to retrieve the current batch that the job is executing within:

<?php

namespace App\Jobs;

use Illuminate\Bus\Batchable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;

class ImportCsv implements ShouldQueue
{
    use Batchable, Queueable;

    /**
     * Execute the job.
     */
    public function handle(): void
    {
        if ($this->batch()->cancelled()) {
            // Determine if the batch has been cancelled...

            return;
        }

        // Import a portion of the CSV file...
    }
}

Dispatching Batches

To dispatch a batch of jobs, you should use the batch method of the Bus facade. Of course, batching is primarily useful when combined with completion callbacks. So, you may use the then, catch, and finally methods to define completion callbacks for the batch. Each of these callbacks will receive an Illuminate\Bus\Batch instance when they are invoked. In this example, we will imagine we are queueing a batch of jobs that each process a given number of rows from a CSV file:

use App\Jobs\ImportCsv;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use Throwable;

$batch = Bus::batch([
    new ImportCsv(1, 100),
    new ImportCsv(101, 200),
    new ImportCsv(201, 300),
    new ImportCsv(301, 400),
    new ImportCsv(401, 500),
])->before(function (Batch $batch) {
    // The batch has been created but no jobs have been added...
})->progress(function (Batch $batch) {
    // A single job has completed successfully...
})->then(function (Batch $batch) {
    // All jobs completed successfully...
})->catch(function (Batch $batch, Throwable $e) {
    // First batch job failure detected...
})->finally(function (Batch $batch) {
    // The batch has finished executing...
})->dispatch();

return $batch->id;

The batch's ID, which may be accessed via the $batch->id property, may be used to query the Laravel command bus for information about the batch after it has been dispatched.

[!WARNING]
Since batch callbacks are serialized and executed at a later time by the Laravel queue, you should not use the $this variable within the callbacks. In addition, since batched jobs are wrapped within database transactions, database statements that trigger implicit commits should not be executed within the jobs.

Naming Batches

Some tools such as Laravel Horizon and Laravel Telescope may provide more user-friendly debug information for batches if batches are named. To assign an arbitrary name to a batch, you may call the name method while defining the batch:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import CSV')->dispatch();

Batch Connection and Queue

If you would like to specify the connection and queue that should be used for the batched jobs, you may use the onConnection and onQueue methods. All batched jobs must execute within the same connection and queue:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->onConnection('redis')->onQueue('imports')->dispatch();

Chains and Batches

You may define a set of chained jobs within a batch by placing the chained jobs within an array. For example, we may execute two job chains in parallel and execute a callback when both job chains have finished processing:

use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;

Bus::batch([
    [
        new ReleasePodcast(1),
        new SendPodcastReleaseNotification(1),
    ],
    [
        new ReleasePodcast(2),
        new SendPodcastReleaseNotification(2),
    ],
])->then(function (Batch $batch) {
    // ...
})->dispatch();

Conversely, you may run batches of jobs within a chain by defining batches within the chain. For example, you could first run a batch of jobs to release multiple podcasts then a batch of jobs to send the release notifications:

use App\Jobs\FlushPodcastCache;
use App\Jobs\ReleasePodcast;
use App\Jobs\SendPodcastReleaseNotification;
use Illuminate\Support\Facades\Bus;

Bus::chain([
    new FlushPodcastCache,
    Bus::batch([
        new ReleasePodcast(1),
        new ReleasePodcast(2),
    ]),
    Bus::batch([
        new SendPodcastReleaseNotification(1),
        new SendPodcastReleaseNotification(2),
    ]),
])->dispatch();

Adding Jobs to Batches

Sometimes it may be useful to add additional jobs to a batch from within a batched job. This pattern can be useful when you need to batch thousands of jobs which may take too long to dispatch during a web request. So, instead, you may wish to dispatch an initial batch of "loader" jobs that hydrate the batch with even more jobs:

$batch = Bus::batch([
    new LoadImportBatch,
    new LoadImportBatch,
    new LoadImportBatch,
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->name('Import Contacts')->dispatch();

In this example, we will use the LoadImportBatch job to hydrate the batch with additional jobs. To accomplish this, we may use the add method on the batch instance that may be accessed via the job's batch method:

use App\Jobs\ImportContacts;
use Illuminate\Support\Collection;

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->batch()->cancelled()) {
        return;
    }

    $this->batch()->add(Collection::times(1000, function () {
        return new ImportContacts;
    }));
}

[!WARNING]
You may only add jobs to a batch from within a job that belongs to the same batch.

Inspecting Batches

The Illuminate\Bus\Batch instance that is provided to batch completion callbacks has a variety of properties and methods to assist you in interacting with and inspecting a given batch of jobs:

// The UUID of the batch...
$batch->id;

// The name of the batch (if applicable)...
$batch->name;

// The number of jobs assigned to the batch...
$batch->totalJobs;

// The number of jobs that have not been processed by the queue...
$batch->pendingJobs;

// The number of jobs that have failed...
$batch->failedJobs;

// The number of jobs that have been processed thus far...
$batch->processedJobs();

// The completion percentage of the batch (0-100)...
$batch->progress();

// Indicates if the batch has finished executing...
$batch->finished();

// Cancel the execution of the batch...
$batch->cancel();

// Indicates if the batch has been cancelled...
$batch->cancelled();

Returning Batches From Routes

All Illuminate\Bus\Batch instances are JSON serializable, meaning you can return them directly from one of your application's routes to retrieve a JSON payload containing information about the batch, including its completion progress. This makes it convenient to display information about the batch's completion progress in your application's UI.

To retrieve a batch by its ID, you may use the Bus facade's findBatch method:

use Illuminate\Support\Facades\Bus;
use Illuminate\Support\Facades\Route;

Route::get('/batch/{batchId}', function (string $batchId) {
    return Bus::findBatch($batchId);
});

Cancelling Batches

Sometimes you may need to cancel a given batch's execution. This can be accomplished by calling the cancel method on the Illuminate\Bus\Batch instance:

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->user->exceedsImportLimit()) {
        return $this->batch()->cancel();
    }

    if ($this->batch()->cancelled()) {
        return;
    }
}

As you may have noticed in the previous examples, batched jobs should typically determine if their corresponding batch has been cancelled before continuing execution. However, for convenience, you may assign the SkipIfBatchCancelled middleware to the job instead. As its name indicates, this middleware will instruct Laravel to not process the job if its corresponding batch has been cancelled:

use Illuminate\Queue\Middleware\SkipIfBatchCancelled;

/**
 * Get the middleware the job should pass through.
 */
public function middleware(): array
{
    return [new SkipIfBatchCancelled];
}

Batch Failures

When a batched job fails, the catch callback (if assigned) will be invoked. This callback is only invoked for the first job that fails within the batch.

Allowing Failures

When a job within a batch fails, Laravel will automatically mark the batch as "cancelled". If you wish, you may disable this behavior so that a job failure does not automatically mark the batch as cancelled. This may be accomplished by calling the allowFailures method while dispatching the batch:

$batch = Bus::batch([
    // ...
])->then(function (Batch $batch) {
    // All jobs completed successfully...
})->allowFailures()->dispatch();

Retrying Failed Batch Jobs

For convenience, Laravel provides a queue:retry-batch Artisan command that allows you to easily retry all of the failed jobs for a given batch. The queue:retry-batch command accepts the UUID of the batch whose failed jobs should be retried:

php artisan queue:retry-batch 32dbc76c-4f82-4749-b610-a639fe0099b5

Pruning Batches

Without pruning, the job_batches table can accumulate records very quickly. To mitigate this, you should schedule the queue:prune-batches Artisan command to run daily:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches')->daily();

By default, all finished batches that are more than 24 hours old will be pruned. You may use the hours option when calling the command to determine how long to retain batch data. For example, the following command will delete all batches that finished over 48 hours ago:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48')->daily();

Sometimes, your job_batches table may accumulate batch records for batches that never completed successfully, such as batches where a job failed and that job was never retried successfully. You may instruct the queue:prune-batches command to prune these unfinished batch records using the unfinished option:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --unfinished=72')->daily();

Likewise, your job_batches table may also accumulate batch records for cancelled batches. You may instruct the queue:prune-batches command to prune these cancelled batch records using the cancelled option:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:prune-batches --hours=48 --cancelled=72')->daily();

Storing Batches in DynamoDB

Laravel also provides support for storing batch meta information in DynamoDB instead of a relational database. However, you will need to manually create a DynamoDB table to store all of the batch records.

Typically, this table should be named job_batches, but you should name the table based on the value of the queue.batching.table configuration value within your application's queue configuration file.

DynamoDB Batch Table Configuration

The job_batches table should have a string primary partition key named application and a string primary sort key named id. The application portion of the key will contain your application's name as defined by the name configuration value within your application's app configuration file. Since the application name is part of the DynamoDB table's key, you can use the same table to store job batches for multiple Laravel applications.

In addition, you may define a ttl attribute for your table if you would like to take advantage of automatic batch pruning.
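As a rough sketch, assuming the AWS SDK for PHP is installed, a table matching this key schema could be created like so (the region and billing mode below are only illustrative):

use Aws\DynamoDb\DynamoDbClient;

$client = new DynamoDbClient([
    'region' => 'us-east-1',
    'version' => 'latest',
]);

$client->createTable([
    'TableName' => 'job_batches',
    'AttributeDefinitions' => [
        ['AttributeName' => 'application', 'AttributeType' => 'S'],
        ['AttributeName' => 'id', 'AttributeType' => 'S'],
    ],
    'KeySchema' => [
        ['AttributeName' => 'application', 'KeyType' => 'HASH'],  // partition key
        ['AttributeName' => 'id', 'KeyType' => 'RANGE'],          // sort key
    ],
    'BillingMode' => 'PAY_PER_REQUEST',
]);

The ttl attribute mentioned above does not need to be declared in the key schema; DynamoDB TTL is enabled separately on a non-key attribute of the table.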

DynamoDB Configuration

Next, install the AWS SDK so that your Laravel application can communicate with Amazon DynamoDB:

composer require aws/aws-sdk-php

Then, set the queue.batching.driver configuration option's value to dynamodb. In addition, you should define key, secret, and region configuration options within the batching configuration array. These options will be used to authenticate with AWS. When using the dynamodb driver, the queue.batching.database configuration option is unnecessary:

'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
],

Pruning Batches in DynamoDB

When utilizing DynamoDB to store job batch information, the typical pruning commands used to prune batches stored in a relational database will not work. Instead, you may utilize DynamoDB's native TTL functionality to automatically remove records for old batches.

If you defined your DynamoDB table with a ttl attribute, you may define configuration parameters to instruct Laravel how to prune batch records. The queue.batching.ttl_attribute configuration value defines the name of the attribute holding the TTL, while the queue.batching.ttl configuration value defines the number of seconds after which a batch record can be removed from the DynamoDB table, relative to the last time the record was updated:

'batching' => [
    'driver' => env('QUEUE_BATCHING_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'job_batches',
    'ttl_attribute' => 'ttl',
    'ttl' => 60 * 60 * 24 * 7, // 7 days...
],

Queueing Closures

Instead of dispatching a job class to the queue, you may also dispatch a closure. This is great for quick, simple tasks that need to be executed outside of the current request cycle. When dispatching closures to the queue, the closure's code content is cryptographically signed so that it cannot be modified in transit:

$podcast = App\Podcast::find(1);

dispatch(function () use ($podcast) {
    $podcast->publish();
});

To assign a name to the queued closure which may be used by queue reporting dashboards, as well as be displayed by the queue:work command, you may use the name method:

dispatch(function () {
    // ...
})->name('Publish Podcast');

Using the catch method, you may provide a closure that should be executed if the queued closure fails to complete successfully after exhausting all of your queue's configured retry attempts:

use Throwable;

dispatch(function () use ($podcast) {
    $podcast->publish();
})->catch(function (Throwable $e) {
    // This job has failed...
});

[!WARNING]
Since catch callbacks are serialized and executed at a later time by the Laravel queue, you should not use the $this variable within catch callbacks.

Running the Queue Worker

The queue:work Command

Laravel includes an Artisan command that will start a queue worker and process new jobs as they are pushed onto the queue. You may run the worker using the queue:work Artisan command. Note that once the queue:work command has started, it will continue to run until it is manually stopped or you close your terminal:

php artisan queue:work

[!NOTE]
To keep the queue:work process running permanently in the background, you should use a process monitor such as Supervisor to ensure that the queue worker does not stop running.

You may include the -v flag when invoking the queue:work command if you would like the processed job IDs to be included in the command's output:

php artisan queue:work -v

Remember, queue workers are long-lived processes and store the booted application state in memory. As a result, they will not notice changes in your code base after they have been started. So, during your deployment process, be sure to restart your queue workers. In addition, remember that any static state created or modified by your application will not be automatically reset between jobs.

Alternatively, you may run the queue:listen command. When using the queue:listen command, you don't have to manually restart the worker when you want to reload your updated code or reset the application state; however, this command is significantly less efficient than the queue:work command:

php artisan queue:listen

Running Multiple Queue Workers

To assign multiple workers to a queue and process jobs concurrently, you should simply start multiple queue:work processes. This can either be done locally via multiple tabs in your terminal or in production using your process manager's configuration settings. When using Supervisor, you may use the numprocs configuration value.

Specifying the Connection and Queue

You may also specify which queue connection the worker should utilize. The connection name passed to the work command should correspond to one of the connections defined in your config/queue.php configuration file:

php artisan queue:work redis

By default, the queue:work command only processes jobs for the default queue on a given connection. However, you may customize your queue worker even further by only processing particular queues for a given connection. For example, if all of your emails are processed in an emails queue on your redis queue connection, you may issue the following command to start a worker that only processes that queue:

php artisan queue:work redis --queue=emails

Processing a Specified Number of Jobs

The --once option may be used to instruct the worker to only process a single job from the queue:

php artisan queue:work --once

The --max-jobs option may be used to instruct the worker to process the given number of jobs and then exit. This option may be useful when combined with Supervisor so that your workers are automatically restarted after processing a given number of jobs, releasing any memory they may have accumulated:

php artisan queue:work --max-jobs=1000

Processing All Queued Jobs and Then Exiting

The --stop-when-empty option may be used to instruct the worker to process all jobs and then exit gracefully. This option can be useful when processing Laravel queues within a Docker container if you wish to shutdown the container after the queue is empty:

php artisan queue:work --stop-when-empty

Processing Jobs for a Given Number of Seconds

The --max-time option may be used to instruct the worker to process jobs for the given number of seconds and then exit. This option may be useful when combined with Supervisor so that your workers are automatically restarted after processing jobs for a given amount of time, releasing any memory they may have accumulated:

# Process jobs for one hour and then exit...
php artisan queue:work --max-time=3600

Worker Sleep Duration

When jobs are available on the queue, the worker will keep processing jobs with no delay in between jobs. However, the sleep option determines how many seconds the worker will "sleep" if there are no jobs available. Of course, while sleeping, the worker will not process any new jobs:

php artisan queue:work --sleep=3

Maintenance Mode and Queues

While your application is in maintenance mode, no queued jobs will be handled. The jobs will continue to be handled as normal once the application is out of maintenance mode.

To force your queue workers to process jobs even if maintenance mode is enabled, you may use the --force option:

php artisan queue:work --force

Resource Considerations

Daemon queue workers do not "reboot" the framework before processing each job. Therefore, you should release any heavy resources after each job completes. For example, if you are doing image manipulation with the GD library, you should free the memory with imagedestroy when you are done processing the image.
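For example, a minimal sketch of the GD scenario mentioned above (the file paths are illustrative):

/**
 * Execute the job.
 */
public function handle(): void
{
    $image = imagecreatefromjpeg('/path/to/uploaded.jpg');

    // ... resize, crop, or otherwise manipulate the image ...

    imagejpeg($image, '/path/to/processed.jpg');

    // Free the memory before the worker moves on to the next job...
    imagedestroy($image);
}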

Queue Priorities

Sometimes you may wish to prioritize how your queues are processed. For example, in your config/queue.php configuration file, you may set the default queue for your redis connection to low. However, occasionally you may wish to push a job to a high priority queue like so:

dispatch((new Job)->onQueue('high'));

To start a worker that verifies that all of the high queue jobs are processed before continuing to any jobs on the low queue, pass a comma-delimited list of queue names to the work command:

php artisan queue:work --queue=high,low

Queue Workers and Deployment

Since queue workers are long-lived processes, they will not notice changes to your code without being restarted. So, the simplest way to deploy an application using queue workers is to restart the workers during your deployment process. You may gracefully restart all of the workers by issuing the queue:restart command:

php artisan queue:restart

This command will instruct all queue workers to gracefully exit after they finish processing their current job so that no existing jobs are lost. Since the queue workers will exit when the queue:restart command is executed, you should be running a process manager such as Supervisor to automatically restart the queue workers.

[!NOTE]
The queue uses the cache to store restart signals, so you should verify that a cache driver is properly configured for your application before using this feature.

Job Expirations and Timeouts

Job Expiration

In your config/queue.php configuration file, each queue connection defines a retry_after option. This option specifies how many seconds the queue connection should wait before retrying a job that is being processed. For example, if the value of retry_after is set to 90, the job will be released back onto the queue if it has been processing for 90 seconds without being released or deleted. Typically, you should set the retry_after value to the maximum number of seconds your jobs should reasonably take to complete processing.

[!WARNING]
The only queue connection which does not contain a retry_after value is Amazon SQS. SQS will retry the job based on the Default Visibility Timeout which is managed within the AWS console.

Worker Timeouts

The queue:work Artisan command exposes a --timeout option. By default, the --timeout value is 60 seconds. If a job is processing for longer than the number of seconds specified by the timeout value, the worker processing the job will exit with an error. Typically, the worker will be restarted automatically by a process manager configured on your server:

php artisan queue:work --timeout=60

The retry_after configuration option and the --timeout CLI option are different, but work together to ensure that jobs are not lost and that jobs are only successfully processed once.

[!WARNING]
The --timeout value should always be at least several seconds shorter than your retry_after configuration value. This will ensure that a worker processing a frozen job is always terminated before the job is retried. If your --timeout option is longer than your retry_after configuration value, your jobs may be processed twice.

Supervisor Configuration

In production, you need a way to keep your queue:work processes running. A queue:work process may stop running for a variety of reasons, such as an exceeded worker timeout or the execution of the queue:restart command.

For this reason, you need to configure a process monitor that can detect when your queue:work processes exit and automatically restart them. In addition, process monitors can allow you to specify how many queue:work processes you would like to run concurrently. Supervisor is a process monitor commonly used in Linux environments and we will discuss how to configure it in the following documentation.

Installing Supervisor

Supervisor is a process monitor for the Linux operating system, and will automatically restart your queue:work processes if they fail. To install Supervisor on Ubuntu, you may use the following command:

sudo apt-get install supervisor

[!NOTE]
If configuring and managing Supervisor yourself sounds overwhelming, consider using Laravel Cloud, which provides a fully-managed platform for running Laravel queue workers.

Configuring Supervisor

Supervisor configuration files are typically stored in the /etc/supervisor/conf.d directory. Within this directory, you may create any number of configuration files that instruct supervisor how your processes should be monitored. For example, let's create a laravel-worker.conf file that starts and monitors queue:work processes:

[program:laravel-worker]
process_name=%(program_name)s_%(process_num)02d
command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 --max-time=3600
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
user=forge
numprocs=8
redirect_stderr=true
stdout_logfile=/home/forge/app.com/worker.log
stopwaitsecs=3600

In this example, the numprocs directive will instruct Supervisor to run eight queue:work processes and monitor all of them, automatically restarting them if they fail. You should change the command directive of the configuration to reflect your desired queue connection and worker options.

[!WARNING]
You should ensure that the value of stopwaitsecs is greater than the number of seconds consumed by your longest running job. Otherwise, Supervisor may kill the job before it is finished processing.

Starting Supervisor

Once the configuration file has been created, you may update the Supervisor configuration and start the processes using the following commands:

sudo supervisorctl reread

sudo supervisorctl update

sudo supervisorctl start "laravel-worker:*"

For more information on Supervisor, consult the Supervisor documentation.

Dealing With Failed Jobs

Sometimes your queued jobs will fail. Don't worry, things don't always go as planned! Laravel includes a convenient way to specify the maximum number of times a job should be attempted. After an asynchronous job has exceeded this number of attempts, it will be inserted into the failed_jobs database table. Synchronously dispatched jobs that fail are not stored in this table and their exceptions are immediately handled by the application.

A migration to create the failed_jobs table is typically already present in new Laravel applications. However, if your application does not contain a migration for this table, you may use the make:queue-failed-table command to create the migration:

php artisan make:queue-failed-table

php artisan migrate

When running a queue worker process, you may specify the maximum number of times a job should be attempted using the --tries switch on the queue:work command. If you do not specify a value for the --tries option, jobs will only be attempted once or as many times as specified by the job class' $tries property:

php artisan queue:work redis --tries=3

Using the --backoff option, you may specify how many seconds Laravel should wait before retrying a job that has encountered an exception. By default, a job is immediately released back onto the queue so that it may be attempted again:

php artisan queue:work redis --tries=3 --backoff=3

If you would like to configure how many seconds Laravel should wait before retrying a job that has encountered an exception on a per-job basis, you may do so by defining a backoff property on your job class:

/**
 * The number of seconds to wait before retrying the job.
 *
 * @var int
 */
public $backoff = 3;

If you require more complex logic for determining the job's backoff time, you may define a backoff method on your job class:

/**
 * Calculate the number of seconds to wait before retrying the job.
 */
public function backoff(): int
{
    return 3;
}

You may easily configure "exponential" backoffs by returning an array of backoff values from the backoff method. In this example, the retry delay will be 1 second for the first retry, 5 seconds for the second retry, 10 seconds for the third retry, and 10 seconds for every subsequent retry if there are more attempts remaining:

/**
 * Calculate the number of seconds to wait before retrying the job.
 *
 * @return array<int, int>
 */
public function backoff(): array
{
    return [1, 5, 10];
}

Cleaning Up After Failed Jobs

When a particular job fails, you may want to send an alert to your users or revert any actions that were partially completed by the job. To accomplish this, you may define a failed method on your job class. The Throwable instance that caused the job to fail will be passed to the failed method:

<?php

namespace App\Jobs;

use App\Models\Podcast;
use App\Services\AudioProcessor;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Queue\Queueable;
use Throwable;

class ProcessPodcast implements ShouldQueue
{
    use Queueable;

    /**
     * Create a new job instance.
     */
    public function __construct(
        public Podcast $podcast,
    ) {}

    /**
     * Execute the job.
     */
    public function handle(AudioProcessor $processor): void
    {
        // Process uploaded podcast...
    }

    /**
     * Handle a job failure.
     */
    public function failed(?Throwable $exception): void
    {
        // Send user notification of failure, etc...
    }
}

[!WARNING]
A new instance of the job is instantiated before invoking the failed method; therefore, any class property modifications that may have occurred within the handle method will be lost.

Retrying Failed Jobs

To view all of the failed jobs that have been inserted into your failed_jobs database table, you may use the queue:failed Artisan command:

php artisan queue:failed

The queue:failed command will list the job ID, connection, queue, failure time, and other information about the job. The job ID may be used to retry the failed job. For instance, to retry a failed job that has an ID of ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece, issue the following command:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece

If necessary, you may pass multiple IDs to the command:

php artisan queue:retry ce7bb17c-cdd8-41f0-a8ec-7b4fef4e5ece 91401d2c-0784-4f43-824c-34f94a33c24d

You may also retry all of the failed jobs for a particular queue:

php artisan queue:retry --queue=name

To retry all of your failed jobs, execute the queue:retry command and pass all as the ID:

php artisan queue:retry all

If you would like to delete a failed job, you may use the queue:forget command:

php artisan queue:forget 91401d2c-0784-4f43-824c-34f94a33c24d

[!NOTE]
When using Horizon, you should use the horizon:forget command to delete a failed job instead of the queue:forget command.

To delete all of your failed jobs from the failed_jobs table, you may use the queue:flush command:

php artisan queue:flush

Ignoring Missing Models

When injecting an Eloquent model into a job, the model is automatically serialized before being placed on the queue and re-retrieved from the database when the job is processed. However, if the model has been deleted while the job was waiting to be processed by a worker, your job may fail with a ModelNotFoundException.

For convenience, you may choose to automatically delete jobs with missing models by setting your job's deleteWhenMissingModels property to true. When this property is set to true, Laravel will quietly discard the job without raising an exception:

/**
 * Delete the job if its models no longer exist.
 *
 * @var bool
 */
public $deleteWhenMissingModels = true;

Pruning Failed Jobs

You may prune the records in your application's failed_jobs table by invoking the queue:prune-failed Artisan command:

php artisan queue:prune-failed

By default, all the failed job records that are more than 24 hours old will be pruned. If you provide the --hours option to the command, only the failed job records that were inserted within the last N hours will be retained. For example, the following command will delete all the failed job records that were inserted more than 48 hours ago:

php artisan queue:prune-failed --hours=48

Storing Failed Jobs in DynamoDB

Laravel also provides support for storing your failed job records in DynamoDB instead of a relational database table. However, you must manually create a DynamoDB table to store all of the failed job records. Typically, this table should be named failed_jobs, but you should name the table according to the queue.failed.table configuration value within your application's queue configuration file.

The failed_jobs table should have a string primary partition key named application and a string primary sort key named uuid. The application portion of the key will contain your application's name as defined by the name configuration value within your application's app configuration file. Since the application name is part of the DynamoDB table's key, you can use the same table to store failed jobs for multiple Laravel applications.
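
For reference, a table with this key schema might be created using the AWS CLI along the following lines (a sketch; the on-demand billing mode is an assumption and should be adjusted for your environment):

aws dynamodb create-table \
    --table-name failed_jobs \
    --attribute-definitions \
        AttributeName=application,AttributeType=S \
        AttributeName=uuid,AttributeType=S \
    --key-schema \
        AttributeName=application,KeyType=HASH \
        AttributeName=uuid,KeyType=RANGE \
    --billing-mode PAY_PER_REQUEST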

In addition, ensure that you install the AWS SDK so that your Laravel application can communicate with Amazon DynamoDB:

composer require aws/aws-sdk-php

Next, set the queue.failed.driver configuration option's value to dynamodb. In addition, you should define key, secret, and region configuration options within the failed job configuration array. These options will be used to authenticate with AWS. When using the dynamodb driver, the queue.failed.database configuration option is unnecessary:

'failed' => [
    'driver' => env('QUEUE_FAILED_DRIVER', 'dynamodb'),
    'key' => env('AWS_ACCESS_KEY_ID'),
    'secret' => env('AWS_SECRET_ACCESS_KEY'),
    'region' => env('AWS_DEFAULT_REGION', 'us-east-1'),
    'table' => 'failed_jobs',
],

Disabling Failed Job Storage

You may instruct Laravel to discard failed jobs without storing them by setting the queue.failed.driver configuration option's value to null. Typically, this may be accomplished via the QUEUE_FAILED_DRIVER environment variable:

QUEUE_FAILED_DRIVER=null

Failed Job Events

If you would like to register an event listener that will be invoked when a job fails, you may use the Queue facade's failing method. For example, we may attach a closure to this event from the boot method of the AppServiceProvider that is included with Laravel:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobFailed;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::failing(function (JobFailed $event) {
            // $event->connectionName
            // $event->job
            // $event->exception
        });
    }
}

Clearing Jobs From Queues

[!NOTE]
When using Horizon, you should use the horizon:clear command to clear jobs from the queue instead of the queue:clear command.

If you would like to delete all jobs from the default queue of the default connection, you may do so using the queue:clear Artisan command:

php artisan queue:clear

You may also provide the connection argument and queue option to delete jobs from a specific connection and queue:

php artisan queue:clear redis --queue=emails

[!WARNING]
Clearing jobs from queues is only available for the SQS, Redis, and database queue drivers. In addition, the SQS message deletion process takes up to 60 seconds, so jobs sent to the SQS queue up to 60 seconds after you clear the queue might also be deleted.

Monitoring Your Queues

If your queue receives a sudden influx of jobs, it could become overwhelmed, leading to a long wait time for jobs to complete. If you wish, Laravel can alert you when your queue job count exceeds a specified threshold.

To get started, you should schedule the queue:monitor command to run every minute. The command accepts the names of the queues you wish to monitor as well as your desired job count threshold:

php artisan queue:monitor redis:default,redis:deployments --max=100
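
As a minimal sketch, assuming you are using Laravel's scheduler, the command could be scheduled from your application's routes/console.php file with the same queues and threshold as above:

use Illuminate\Support\Facades\Schedule;

Schedule::command('queue:monitor redis:default,redis:deployments --max=100')
    ->everyMinute();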

Scheduling this command alone is not enough to trigger a notification alerting you of the queue's overwhelmed status. When the command encounters a queue that has a job count exceeding your threshold, an Illuminate\Queue\Events\QueueBusy event will be dispatched. You may listen for this event within your application's AppServiceProvider in order to send a notification to you or your development team:

use App\Notifications\QueueHasLongWaitTime;
use Illuminate\Queue\Events\QueueBusy;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Notification;

/**
 * Bootstrap any application services.
 */
public function boot(): void
{
    Event::listen(function (QueueBusy $event) {
        Notification::route('mail', 'dev@example.com')
            ->notify(new QueueHasLongWaitTime(
                $event->connection,
                $event->queue,
                $event->size
            ));
    });
}

Testing

When testing code that dispatches jobs, you may wish to instruct Laravel to not actually execute the job itself, since the job's code can be tested directly and separately from the code that dispatches it. Of course, to test the job itself, you may instantiate a job instance and invoke the handle method directly in your test.
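
For example, a direct test of the ProcessPodcast job from earlier in this document might look like the following sketch (the Podcast model factory and the AudioProcessor service are assumptions carried over from those earlier examples):

use App\Jobs\ProcessPodcast;
use App\Models\Podcast;
use App\Services\AudioProcessor;

test('podcast can be processed', function () {
    $podcast = Podcast::factory()->create();

    // Instantiate the job and invoke its handle method directly,
    // resolving the AudioProcessor dependency from the container...
    (new ProcessPodcast($podcast))->handle(app(AudioProcessor::class));

    // Make assertions about the processed podcast...
});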

You may use the Queue facade's fake method to prevent queued jobs from actually being pushed to the queue. After calling the Queue facade's fake method, you may then assert that the application attempted to push jobs to the queue:

<?php

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;

test('orders can be shipped', function () {
    Queue::fake();

    // Perform order shipping...

    // Assert that no jobs were pushed...
    Queue::assertNothingPushed();

    // Assert a job was pushed to a given queue...
    Queue::assertPushedOn('queue-name', ShipOrder::class);

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);

    // Assert a job was not pushed...
    Queue::assertNotPushed(AnotherJob::class);

    // Assert that a Closure was pushed to the queue...
    Queue::assertClosurePushed();

    // Assert the total number of jobs that were pushed...
    Queue::assertCount(3);
});

Or, if you are using PHPUnit:

<?php

namespace Tests\Feature;

use App\Jobs\AnotherJob;
use App\Jobs\FinalJob;
use App\Jobs\ShipOrder;
use Illuminate\Support\Facades\Queue;
use Tests\TestCase;

class ExampleTest extends TestCase
{
    public function test_orders_can_be_shipped(): void
    {
        Queue::fake();

        // Perform order shipping...

        // Assert that no jobs were pushed...
        Queue::assertNothingPushed();

        // Assert a job was pushed to a given queue...
        Queue::assertPushedOn('queue-name', ShipOrder::class);

        // Assert a job was pushed twice...
        Queue::assertPushed(ShipOrder::class, 2);

        // Assert a job was not pushed...
        Queue::assertNotPushed(AnotherJob::class);

        // Assert that a Closure was pushed to the queue...
        Queue::assertClosurePushed();

        // Assert the total number of jobs that were pushed...
        Queue::assertCount(3);
    }
}

You may pass a closure to the assertPushed or assertNotPushed methods in order to assert that a job was pushed that passes a given "truth test". If at least one job was pushed that passes the given truth test, the assertion will be successful:

Queue::assertPushed(function (ShipOrder $job) use ($order) {
    return $job->order->id === $order->id;
});

Faking a Subset of Jobs

If you only need to fake specific jobs while allowing your other jobs to execute normally, you may pass the class names of the jobs that should be faked to the fake method:

test('orders can be shipped', function () {
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
});

Or, using PHPUnit:

public function test_orders_can_be_shipped(): void
{
    Queue::fake([
        ShipOrder::class,
    ]);

    // Perform order shipping...

    // Assert a job was pushed twice...
    Queue::assertPushed(ShipOrder::class, 2);
}

You may fake all jobs except for a set of specified jobs using the except method:

Queue::fake()->except([
    ShipOrder::class,
]);

Testing Job Chains

To test job chains, you will need to utilize the Bus facade's faking capabilities. The Bus facade's assertChained method may be used to assert that a chain of jobs was dispatched. The assertChained method accepts an array of chained jobs as its first argument:

use App\Jobs\RecordShipment;
use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertChained([
    ShipOrder::class,
    RecordShipment::class,
    UpdateInventory::class
]);

As you can see in the example above, the array of chained jobs may be an array of job class names. However, you may also provide an array of actual job instances. When doing so, Laravel will ensure that the job instances are of the same class and have the same property values as the chained jobs dispatched by your application:

Bus::assertChained([
    new ShipOrder,
    new RecordShipment,
    new UpdateInventory,
]);

You may use the assertDispatchedWithoutChain method to assert that a job was pushed without a chain of jobs:

Bus::assertDispatchedWithoutChain(ShipOrder::class);

Testing Chain Modifications

If a chained job prepends or appends jobs to an existing chain, you may use the job's assertHasChain method to assert that the job has the expected chain of remaining jobs:

$job = new ProcessPodcast;

$job->handle();

$job->assertHasChain([
    new TranscribePodcast,
    new OptimizePodcast,
    new ReleasePodcast,
]);

The assertDoesntHaveChain method may be used to assert that the job's remaining chain is empty:

$job->assertDoesntHaveChain();
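
For context, the job being tested might modify its own chain from within its handle method, for example by prepending a job to the remaining chain (a hypothetical sketch; the prependToChain method is part of Laravel's job chaining API, while the requiresTranscription check is illustrative only):

use App\Jobs\TranscribePodcast;

/**
 * Execute the job.
 */
public function handle(): void
{
    if ($this->podcast->requiresTranscription()) {
        // Prepend a job to the remaining chain...
        $this->prependToChain(new TranscribePodcast($this->podcast));
    }
}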

Testing Chained Batches

If your job chain contains a batch of jobs, you may assert that the chained batch matches your expectations by inserting a Bus::chainedBatch definition within your chain assertion:

use App\Jobs\ShipOrder;
use App\Jobs\UpdateInventory;
use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::assertChained([
    new ShipOrder,
    Bus::chainedBatch(function (PendingBatch $batch) {
        return $batch->jobs->count() === 3;
    }),
    new UpdateInventory,
]);

Testing Job Batches

The Bus facade's assertBatched method may be used to assert that a batch of jobs was dispatched. The closure given to the assertBatched method receives an instance of Illuminate\Bus\PendingBatch, which may be used to inspect the jobs within the batch:

use Illuminate\Bus\PendingBatch;
use Illuminate\Support\Facades\Bus;

Bus::fake();

// ...

Bus::assertBatched(function (PendingBatch $batch) {
    return $batch->name == 'import-csv' &&
           $batch->jobs->count() === 10;
});

You may use the assertBatchCount method to assert that a given number of batches were dispatched:

Bus::assertBatchCount(3);

You may use assertNothingBatched to assert that no batches were dispatched:

Bus::assertNothingBatched();

Testing Job / Batch Interaction

In addition, you may occasionally need to test an individual job's interaction with its underlying batch. For example, you may need to test if a job cancelled further processing for its batch. To accomplish this, you need to assign a fake batch to the job via the withFakeBatch method. The withFakeBatch method returns a tuple containing the job instance and the fake batch:

[$job, $batch] = (new ShipOrder)->withFakeBatch();

$job->handle();

$this->assertTrue($batch->cancelled());
$this->assertEmpty($batch->added);

Testing Job / Queue Interactions

Sometimes, you may need to test that a queued job releases itself back onto the queue. Or, you may need to test that the job deleted itself. You may test these queue interactions by instantiating the job and invoking the withFakeQueueInteractions method.

Once the job's queue interactions have been faked, you may invoke the handle method on the job. After invoking the job, the assertReleased, assertDeleted, assertNotDeleted, assertFailed, assertFailedWith, and assertNotFailed methods may be used to make assertions against the job's queue interactions:

use App\Exceptions\CorruptedAudioException;
use App\Jobs\ProcessPodcast;

$job = (new ProcessPodcast)->withFakeQueueInteractions();

$job->handle();

$job->assertReleased(delay: 30);
$job->assertDeleted();
$job->assertNotDeleted();
$job->assertFailed();
$job->assertFailedWith(CorruptedAudioException::class);
$job->assertNotFailed();

Job Events

Using the before and after methods on the Queue facade, you may specify callbacks to be executed before or after a queued job is processed. These callbacks are a great opportunity to perform additional logging or increment statistics for a dashboard. Typically, you should call these methods from the boot method of a service provider. For example, we may use the AppServiceProvider that is included with Laravel:

<?php

namespace App\Providers;

use Illuminate\Support\Facades\Queue;
use Illuminate\Support\ServiceProvider;
use Illuminate\Queue\Events\JobProcessed;
use Illuminate\Queue\Events\JobProcessing;

class AppServiceProvider extends ServiceProvider
{
    /**
     * Register any application services.
     */
    public function register(): void
    {
        // ...
    }

    /**
     * Bootstrap any application services.
     */
    public function boot(): void
    {
        Queue::before(function (JobProcessing $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });

        Queue::after(function (JobProcessed $event) {
            // $event->connectionName
            // $event->job
            // $event->job->payload()
        });
    }
}

Using the looping method on the Queue facade, you may specify callbacks that execute before the worker attempts to fetch a job from a queue. For example, you might register a closure to rollback any transactions that were left open by a previously failed job:

use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Queue;

Queue::looping(function () {
    while (DB::transactionLevel() > 0) {
        DB::rollBack();
    }
});
