队列任务

未匹配的标注

队列和作业

队列和作业

简介

几乎所有应用程序都可以使用队列。队列是一种很好的方法,通过将任务发送到后台或消息队列中,使时间密集型任务看起来是即时的。将不需要立即返回值的任何内容和所有不需要立即返回值的内容发送到队列中是很棒的 (例如发送电子邮件或触发 API 调用)。队列系统通过 QueueProvider 服务提供者加载到 Masonent 中。

Masonite 在适当的时候使用 pickle 对 Python 对象进行序列化和反序列化。确保您要序列化的对象没有任何最终用户提供的代码,这些代码在反序列化部分可能会序列化为Python对象。

阅读有关 pickle 漏洞利用 并确保您的特定应用程序免受任何攻击途径的保护是明智的。

默认情况下,所有配置设置都在 config / queue.py文件中。开箱即用,Masonite 支持 3 种驱动程序:

async
amqp
*database

async 驱动程序仅使用多线程将作业发送到后台。amqp 驱动程序用于任何与 AMQP 兼容的消息队列,例如RabbitMQ。如果您确实创建了驱动程序,请考虑让其在 PyPi 上可用,以便其他人也可以安装它。database 驱动程序具有一些其他驱动程序没有的功能,如果您需要更细粒度的控制。

Jobs(作业)

作业只是简单的 Python 类,它们继承了 Masonent 提供的 Queuable 类。我们可以简单地使用 craft job 命令来创建作业。

$ craft job SendWelcomeEmail

这将在 app/jobs/SendWelcomeEmail.py 中创建一个新作业。我们的作业将如下所示:

from masonite.queues import Queueable

class SendWelcomeEmail(Queueable):

    def __init__(self):
        pass

    def handle(self):
        pass

将作业添加到队列

我们可以使用 Queue 类来运行作业。让我们从控制器方法运行此作业:

from app.jobs.SendWelcomeEmail import SendWelcomeEmail
from masonite import Queue

def show(self, queue: Queue):
    queue.push(SendWelcomeEmail)

就是这样。现在,该作业将发送到队列并运行 handle 方法。

解析

注意,在上面的 show 方法中,我们只传入了类对象。并没有实例化该类。在这种情况下,Masonite 将解析作业的构造函数。所有作业构造函数都可以由容器解析,因此我们可以像往常一样简单地传递我们需要的任何东西:

from masonite.queues import Queueable
from masonite.request import Request
from masonite import Mail

class SendWelcomeEmail(Queueable):

    def __init__(self, request: Request, mail: Mail):
        self.request = request
        self.mail = mail

    def handle(self):
        pass

请记住,由容器解析的所有内容,都可以通过简单地传入位于容器中的对象的参数来从容器中检索任何内容。请阅读 服务容器 文档,了解有关容器的更多信息。

实例化

如果需要通过控制器方法传递数据,我们也可以实例化作业。这根本不会解析作业的构造函数:

from app.jobs.SendWelcomeEmail import SendWelcomeEmail
from masonite import Queue

def show(self, queue: Queue):
    var1 = 'value1'
    var2 = 'value2'

    queue.push(SendWelcomeEmail(var1, var2))

现在,我们的作业类的构造函数将如下所示:

class SendWelcomeEmail(Queueable):

    def __init__(self, var1, var2):
        self.var1 = var1
        self.var2 = var2

执行 Jobs

每当执行作业时,它仅执行 handle 方法。因此,我们可以发送欢迎电子邮件:

from masonite.queues import Queueable
from masonite.request import Request
from masonite import Mail

class SendWelcomeEmail(Queueable):

    def __init__(self, request: Request, mail: Mail):
        self.request = request
        self.mail = mail

    def handle(self):
        self.mail.driver('mailgun').to(self.request.user().email).template('mail/welcome').send()

就是这样!该作业将被加载到队列中。默认情况下,Masonite 使用 async 驱动程序,该驱动程序仅将任务发送到后台。

我们还可以通过将更多作业传递给 .push() 方法来将多个作业发送到队列:

from app.jobs.SendWelcomeEmail import SendWelcomeEmail
from app.jobs.TutorialEmail import TutorialEmail
from masonite import Queue

def show(self, queue: Queue):
    queue.push(SendWelcomeEmail, TutorialEmail('val1', 'val2'))

将变量传递到作业中

大多数时候,您将需要解析构造函数,还需要将变量传递到 handle() 方法中。这可以通过将迭代器传入 args= 关键字参数中来完成:

from masonite import Queue

def show(self, queue: Queue):
    queue.push(SendWelcomeEmail, args=['user@email.com'])

这将传递给您的 handle 方法:

from masonite.request import Request
from masonite import Mail
class SendWelcomeEmail(Queueable):

    def __init__(self, request: Request, mail: Mail):
        self.request = request
        self.mail = mail

    def handle(self, email):
        email # =='user@email.com'

传递函数或方法

您还可以使用队列驱动程序调用任意函数或方法。您需要做的就是在 push 方法中传递其引用,并在 args 参数中传递所需的任何参数,如下所示:

def run_async(obj1, obj2):
    pass

def show(self, queue: Queue):
    obj1 = SomeObject()
    obj2 = AnotherObject()
    queue.push(run_async, args=(obj1, obj2))

这将使此函数排队等待稍后调用。

请注意,您将无法获取响应值。一旦它被发送到队列,它将在以后的任意时间运行。

异步驱动程序

async 队列驱动程序将允许您将作业发送到后台以异步运行。这不需要任何第三方服务,例如下面的 amqp 驱动程序。

更改模式

异步驱动程序有 2 种不同的模式:线程多进程。两者之间的区别在于 threading 使用多个线程,而 multiprocess 使用多个进程。应使用哪种模式取决于您正在处理的作业类型。您应该根据用例研究最好的方法。

您可以在 config/queue.py 文件中更改模式:

DRIVERS = {
    'async': {
        'mode': 'threading' # or 'multiprocess'
    },
}

阻塞

在开发过程中,可能很难调试异步任务。如果引发异常,将很难捕获到异常。可能有一个作业从未运行过。

为了解决这个问题,您可以在 config/queue.py 文件中设置 blocking 属性为 True:

DRIVERS = {
    'async': {
        'mode': 'threading' # or 'multiprocess',
        'blocking': True
    },
}

阻塞基本上使异步任务同步运行。这将在您的终端内部启用一些报告,如下所示:

GET Route: /categories
 Job Ran: <Future at 0x1032cef60 state=finished returned str> 
 Job Ran: <Future at 0x1032f1a90 state=finished returned str> 
 ...

这也将同步运行任务,以便您可以在开发期间发现作业中的异常和问题。

对于生产环境,应将其设置为 False

最好将此设置项设置为等于您的 APP_DEBUG 环境变量:

from masonite import env

DRIVERS = {
    'async': {
        'mode': 'threading' # or 'multiprocess',
        'blocking': env('APP_DEBUG')
    },
}

这样,它将在开发期间始终处于阻塞状态,并在生产期间自动切换为非阻塞状态。

AMQP 驱动程序

amqp 驱动程序可用于与 RabbitMQ 服务进行通信。

安装

为了开始使用此驱动程序,您将需要在开发机器 (或生产机器,取决于运行 Masonite 的机器) 上安装 RabbitMQ。

您可以在此处找到 RabbitMQ 的安装指南

运行 RabbitMQ

安装了 RabbitMQ 后,下一步就可以运行它。如果成功运行,则在终端中看起来像这样:

$ rabbitmq-server

  ##  ##
  ##  ##      RabbitMQ 3.7.8. Copyright (C) 2007-2018 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See http://www.rabbitmq.com/
  ######  ##
  ##########  Logs: /usr/local/var/log/rabbitmq/rabbit@localhost.log
                    /usr/local/var/log/rabbitmq/rabbit@localhost_upgrade.log

              Starting broker...
 completed with 6 plugins.

Great! 现在 RabbitMQ 已启动并运行,我们可以看一下 Masonite 部分。

现在,我们需要确保正确指定了驱动程序和驱动程序配置。以下是应该连接到当前 RabbitMQ 配置的默认值。所有内容都在您的 app/queue.py 文件中

DRIVER = 'amqp'
...
DRIVERS = {
    'amqp': {
        'username': 'guest',
        'password': 'guest',
        'host': 'localhost',
        'port': '5672',
        'channel': 'default',
    }
}

如果您的 Rabbit MQ 实例需要 vhost 但没有端口,则可以添加 vhost 并将端口设置为 none。vhostport 都可以选择为 None。如果您是在本地进行开发,则应该将 vhost 排除在外。以下设置最有可能用于您的生产设置:

DRIVER = 'amqp'
...
DRIVERS = {
    'amqp': {
        'username': 'guest',
        'vhost': '/',
        'password': 'guest',
        'host': 'localhost',
        'port': None,
        'channel': 'default',
    }
}

Database 驱动程序

数据库驱动程序会将所有作业存储在名为 queue_jobs 的数据库表中,并在失败时将所有失败的作业存储在 failed_jobs 表中(如果存在)。如果 failed_jobs 表不存在,则它将不存储任何失败的作业,并且所有失败的作业都将丢失。

迁移

为了获得这两个 queue 表,您可以运行 queue:table 命令,并在其中选择要标记的表:

此命令将创建 queue_jobs 迁移,您可以在其中存储作业:

$ craft queue:table --jobs

此命令将创建 failed_jobs 迁移,您可以在其中存储失败的作业:

$ craft queue:table --failed

创建这些迁移后,您可以运行 migration 命令:

$ craft migrate

延迟 Jobs

使用 database 驱动程序可以轻松地延迟作业。当前其他驱动程序不具备此功能。为了延迟作业,您可以使用 wait 关键字来启用字符串形式的时间。

def show(self, queue: Queue):
    queue.push(SendWelcomeEmail, wait="10 minutes")

启动 Worker

现在,我们可以使用 queue:work 命令启动 worker。在新的终端窗口中运行此命令可能是一个好主意,因为它将一直运行直到关闭它为止。

$ craft queue:work

这将启动 worker 并开始侦听通过 Masonent 项目传入的作业。

您还可以使用 -d--driver 选项指定要为其创建 worker 的驱动程序

$ craft queue:work --driver amqp

您也可以指定 channelchannel 对于不同的驱动程序可能意味着不同的含义。对于 amqp 驱动程序,channel 是要侦听的队列。对于 database 驱动程序,channel 是用于查找queue_jobsqueue_failed 表的连接。

$ craft queue:work --driver database --channel sqlite

发送 Jobs

像平常一样发送作业,它将通过 RabbitMQ 处理:

from app.jobs import SomeJob, AnotherJob
from masonite import Queue
...
def show(self, queue: Queue):
    # do your normal logic
    queue.push(SomeJob, AnotherJob(1,2))

您还可以通过运行以下命令指定要推送到的 channel:

queue.push(SomeJob, AnotherJob(1,2), channel="high")

失败的 Jobs

有时您的作业会失败。这可能是由于多种原因 (例如异常),但 Masonite 会尝试连续运行该作业 3 次,在两次作业之间等待 1 秒钟,直到最终调用该作业失败。

如果传递到队列中的对象不是作业 (或实现 Queueable 的类),则该作业将不会重新排队。它只会尝试运行一次。

处理失败的 Jobs

每个作业可以具有一个 failed 方法,该方法将在作业失败时被调用。您可以执行诸如修复参数和重新排队,调用其他队列,向开发团队发送电子邮件等操作。

这看起来像:

from masonite.queues import Queueable
from masonite.request import Request
from masonite import Mail

class SendWelcomeEmail(Queueable):

    def __init__(self, request: Request, mail: Mail):
        self.request = request
        self.mail = mail

    def failed(self, payload, error):
        self.mail.to('developer@company.com').send('The welcome email failed')

请务必注意,只有从 Queueable 类扩展的类才会处理失败。所有其他排队的对象将简单地死掉,并没有失败的回调。

注意 failed 方法 必须 带 2 个参数。

第一个参数是尝试运行的负载,它是如下所示的信息字典:

payload == {
    'obj': <class app.jobs.SomeJob>,
    'args': ('some_variables',), 
    'callback': 'handle', 
    'created': '2019-02-08T18:49:59.588474-05:00', 
    'ran': 3
}

错误可能类似于 division by zero(除以零)

存储失败的 Jobs

默认情况下,当作业失败时,它会消失并且无法再次运行,因为 Masonite 不存储此信息。

如果希望存储失败的作业以便以后再次运行它们,则需要创建一个 queue 表。Masonite 中这非常容易实现。

首先,您需要运行:

$ craft queue:table

这将在 databases/migrations 中创建新的迁移。然后您可以迁移它:

$ craft migrate

现在只要发生失败的作业,它就会将信息存储在这个新表中。

运行失败的 Jobs

您可以通过运行以下命令来运行所有失败的作业

$ craft queue:work --failed

这将从数据库中获取所有作业,并将其发送回队列。如果它们再次失败,则将它们重新添加到该数据库表中。

指定失败的 Jobs

您可以通过直接在作业上进行指定来修改以上设置。例如,您可能希望指定作业失败时重新运行 5 次,而不是 3 次,或者完全不应该重新运行。

在作业上指定此内容可能类似于:

from masonite.request import Request
from masonite import Mail

class SendWelcomeEmail(Queueable):

    run_again_on_fail = False

    def __init__(self, request: Request, mail: Mail):
        self.request = Request
        self.mail = Mail

    def handle(self, email):
        ...

这将不会在作业失败时尝试重新运行。

您可以通过指定 run_times 属性来指定作业失败时将重新运行的次数:

from masonite.request import Request
from masonite import Mail

class SendWelcomeEmail(Queueable):

    run_times = 5

    def __init__(self, request: Request, mail: Mail):
        self.request = Request
        self.mail = Mail

    def handle(self, email):
        ...

本文章首发在 LearnKu.com 网站上。

本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。
上一篇 下一篇
Summer
贡献者:1
讨论数量: 0
发起讨论 只看当前版本


暂无话题~