队列和任务
队列和作业
Masonite 有一个强大的队列系统。对于耗时的任务非常有用,例如发送电子邮件、处理视频、开发票、 更新记录以及其它所有不需要用户等待的任务。
首先,创建一个符合要求的作业并让他运行。然后将作业“推送”到队列中,稍后需要启动“队列工作者”才能运行。你可以生成多个队列工作者,相信自己的服务器。
除了运行作业,还允许你监视所有失败的作业。作业数据将保存到数据库中,在需要时可以进行监视和重新运行。
配置
修改队列配置,来达到不同的队列系统行为。
可用的驱动程序有: async
, database
and amqp
.
完整的配置如下所示:
DRIVERS = {
"default": "async",
"database": {
"connection": "mysql",
"table": "jobs",
"failed_table": "failed_jobs",
"attempts": 3,
"poll": 5,
"tz": "UTC"
},
"amqp": {
"username": "guest",
"password": "guest",
"port": "5672",
"vhost": "",
"host": "localhost",
"channel": "default",
"queue": "masonite4",
},
"async": {
"blocking": False,
"callback": "handle",
"mode": "threading",
"workers": 1,
},
}
默认队列
default字段用来表示默认使用哪个驱动程序。这个字段的值,对应任意一个驱动的字段名。
数据库驱动
如果要使用数据库驱动,你应该先创建一个作业表:
$ python craft queue:table
这将在你的迁移目录中创建一个迁移文件
如果你要保存失败的作业,还需要创建一张失败作业的表
$ python craft queue:failed
然后执行迁移:
$ python craft migrate
这是数据库驱动程序,它通过连接数据库来处理作业.
选项 | 说明 |
---|---|
connection |
数据库名称,比如mysql |
table |
存储作业的表名 |
failed_jobs |
存储失败作业的表名,设置None ,不使用此功能. |
attempts |
如果任务执行失败,会尝试重新执行,此值就是重新执行的次数. |
poll |
查找新作业前的等待时间,以秒为单位。 |
tz |
时区 |
AMQP 驱动程序
AMQP 驱动程序用于使用 AMQP 协议的连接,例如 RabbitMQ。
可用选项包括:
选项 | 说明 |
---|---|
username |
你的 AMQP 的连接用户名 |
password |
AMQP 的连接密码 |
port |
AMQP 的连接端口 |
vhost |
你的虚拟主机的名称,可以通过你的 AMQP 连接仪表板获得此信息 |
host |
IP 地址或主机名 |
channel |
将队列作业推送到的通道 |
queue |
默认队列名 |
Async 驱动程序
async驱动程序将简单地使用进程、线程在内存中运行。这是最简单的驱动程序,因为它不需要任何特殊的软件或设置。
可用选项包括:
选项 | 说明 |
---|---|
blocking |
是否同步运行,布尔值. |
callback |
要运行的队列作业上的方法名称. |
mode |
使用进程或线程。选项是threading 或multiprocess |
workers |
进程或线程的数量. |
创建作业
为了在队列中进行工作,需要先创建一个作业(job)。 这个作业会变成一个实体,可序列化,并在稍后运行。
使用job命令创建作业:
$ python craft job CreateInvoice
现在有了一个作业类(job class),然后就可以在里面处理自己的业务了:
from masonite.queues import Queueable
class CreateInvoice(Queueable):
def handle(self):
pass
所有的业务都要包含在handle方法中:
class CreateInvoice(Queueable):
def __init__(self, order_id):
self.order_id = order_id
def handle(self):
# Generate invoice documents
pass
排队作业
你可以将作业丢入队列中进行处理:
from masonite.queues import Queue
from app.jobs.CreateInvoice import CreateInvoice
class InvoiceController:
def generate(self, queue: Queue):
# 推送一个作业进入队列,没有参数
queue.push(CreateInvoice())
# 带参数的作业
# 根据订单创建发票
queue.push(CreateInvoice(Order.find(1).id))
你还可以在 push 方法里使用关键字参数,用来指定其它的参数:
queue.push(
CreateInvoice(Order.find(1).id)
driver="async" # 驱动程序
queue="invoices" # 执行队列时的名称
)
队列工作者(Queue Workers)
我们运行一个队列,这是一个终端进程,使用 'queue:work' 命令:
$ python craft queue:work
这将使用默认配置启动程序。但你可以增加些参数,这相当于动态配置:
选项 | 说明 |
---|---|
--driver database |
驱动程序:database |
--queue invoices |
队列名: invoices |
--connection mysql |
连接目标:mysql |
--poll 5 |
延迟时间。默认为1秒 |
--attempts 5 |
重试次数,默认3次 |
带参数的运行队列,如下所示:
$ python craft queue:work --driver database --connection mysql --poll 5 --attempts 2
失败的作业
如果你的配置正确,当作业失败时,它们将进入失败的作业表。你可以在此处监控失败的原因,或者重新运行或删除。
如果你重新运行,它们将在最后被放回队列中,并以正常的排队过程重新运行。
要重新运行失败的作业,可以使用以下命令:
$ python craft queue:retry
你还可以指定一些参数:
Option | Description |
---|---|
--driver database |
指定使用的驱动程序:database |
--queue invoices |
指定使用的队列名:invoices |
--connection mysql |
指定使用的连接,这里使用mysql |
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 CC 协议,如果我们的工作有侵犯到您的权益,请及时联系我们。