RabbitMQ 入门 - 工作队列

基于 官方文档 翻译

工作队列

(using php-amqplib)

img

在第一篇教程中,我们编写了用于从命名队列发送和接收消息的程序。 在这一个章节中,我们将创建一个工作队列,用于在多个 Work Queue 之间分配耗时的任务。

工作队列(又名:任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们安排稍后完成任务。我们将任务封装为消息并将其发送到队列。 在后台运行的工作进程将弹出任务并最终执行作业。当你运行许多工人时,任务将在他们之间共享。

这个概念在 Web 应用程序中特别有用,因为在短的 HTTP 请求窗口中无法处理复杂的任务。

准备

在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。 现在我们将发送代表复杂任务的字符串。 我们没有真实世界的任务,比如要调整大小的图像或要渲染的 PDF 文件,所以让我们假装我们很忙 - 使用sleep() 函数来假装。 我们将把字符串中的点数作为它的复杂度;每一个点都会占用一秒的“工作”。 例如,Hello …描述的假任务将需要三秒钟。

我们稍微修改前面例子中的 send.php 代码,以允许从命令行发送任意消息。 这个程序会把任务安排到我们的工作队列中,所以让我们把它命名为 new_task.php:

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data);

$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent ", $data, "\n";

我们旧的 receive.php 脚本还需要进行一些更改:它需要伪造邮件正文中每个点的第二个工作。 它会从队列中弹出消息并执行任务,所以我们称之为 worker.php:

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

请注意,我们的假任务模拟执行时间。

像在教程1中一样运行它们:

# shell 1
php worker.php
# shell 2
php new_task.php "A very hard task which takes two seconds.."

循环调度

使用任务队列的优点之一是可以轻松地平行工作。 如果我们正在积累积压的工作,我们可以增加更多的工作人员,并且这种方式很容易扩展。

首先,我们试着同时运行两个 worker.php 脚本。 他们都会从队列中获取消息,但具体到底是什么? 让我们来看看。

您需要打开三个控制台。 两个将运行 worker.php 脚本。 这些控制台将成为我们的两个消费者 - C1 和 C2。

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C

在第三个我们将发布新的任务。 你可以启动消费者(consumers )发布几条消息:

# shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

让我们看看交付给我们工人的东西:

# shell 1
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'First message.'
# => [x] Received 'Third message...'
# => [x] Received 'Fifth message.....'
# shell 2
php worker.php
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Second message..'
# => [x] Received 'Fourth message....'

默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个使用者。 平均而言,每个消费者将获得相同数量的消息。 这种分配消息的方式称为循环法。 尝试与三名或更多的工人。

消息确认

做任务可能需要几秒钟的时间。 你可能想知道如果其中一个消费者开始一项长期任务并且只是部分完成而死亡会发生什么。 使用我们当前的代码,一旦 RabbitMQ 向客户发送消息,它立即将其标记为删除。 在这种情况下,如果你杀了一个工人,我们将失去刚刚处理的信息。 我们也会失去所有派发给这个特定工作人员但尚未处理的消息。

但我们不想失去任何任务。 如果一名工人死亡,我们希望将任务交付给另一名工人。

为了确保消息永不丢失,RabbitMQ 支持消息确认。消费者发回询问(acknowledgement),告诉 RabbitMQ 收到,处理了特定的消息,并且 RabbitMQ 可以自由删除它。

如果消费者死亡(其通道关闭,连接关闭或TCP连接丢失),RabbitMQ 将理解消息未被完全处理,并将对其重新排队。 如果有其他消费者同时在线,它会迅速将其重新发送给另一位消费者。 这样,即使工作人员偶尔死亡,也可以确保没有任何信息丢失。

没有任何消息超时;当消费者死亡时,RabbitMQ 将传递消息。即使处理消息需要很长时间也没关系。

消息确认默认关闭。 通过将第四个参数设置为 basic_consume 为 false(true表示不询问),并在完成任务后向工作人员发送适当的确认,现在是时候打开它们了。

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

使用这段代码,我们可以确定,即使在处理消息时使用 CTRL + C 来杀死一个工作者,也不会丢失任何东西。 工人死后不久,所有未确认的消息将被重新发送。

没有确认?

错过这一点是常见的错误。 这是一个很容易的错误,但后果是严重的。 当你的客户退出时(这可能看起来像随机的重新传送),消息将被重新传递,但是 RabbitMQ 将会消耗越来越多的内存,因为它将不能释放任何未消息的消息。

为了调试这种错误,您可以使用 rabbitmqctl 来打印 messages_unacknowledged 字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

消息持久性

我们已经学会了如何确保即使消费者死亡,任务也不会丢失。 但是如果 RabbitMQ 服务器停止,我们的任务仍然会丢失。

当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要。 需要做两件事来确保消息不会丢失:我们需要将队列和消息标记为持久。

首先,我们需要确保 RabbitMQ 永远不会失去我们的队列。 为了做到这一点,我们需要宣布它是持久的。 为此,我们将第三个参数传递给 queue_declare 为 true :

$channel->queue_declare('hello', false, true, false, false);

虽然这个命令本身是正确的,但它在我们目前的设置中不起作用。 那是因为我们已经定义了一个名为 hello 的队列,这个队列并不持久。 RabbitMQ 不允许您使用不同的参数重新定义现有的队列,并会向任何试图执行该操作的程序返回错误。 但是有一个快速的解决方法 - 让我们声明一个具有不同名称的队列,例如 task_queue :

$channel->queue_declare('task_queue', false, true, false, false);

该标志设置为 true 并需要应用于生产者和消费者代码。

此时我们确信,即使 RabbitMQ 重新启动,task_queue 队列也不会丢失。 现在我们需要将消息标记为持久消息 - 通过设置 AMQPMessage 作为属性数组的一部分所使用的 delivery_mode = 2 消息属性。

$msg = new AMQPMessage($data,
       array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
       );

关于消息持久性的说明

将邮件标记为永久邮件并不能完全保证邮件不会丢失。 尽管它告诉 RabbitMQ 将消息保存到磁盘,但 RabbitMQ 接收到消息并且尚未保存消息时仍有一段时间窗口。 此外,RabbitMQ 不会为每条消息执行fsync(2) - 它可能只是保存到缓存中,并没有真正写入磁盘。 持久性保证不强,但对我们简单的任务队列来说已经足够了。 如果您需要更强大的保证,那么您可以使用 publisher confirms

负载均衡

您可能已经注意到调度仍然无法完全按照我们的要求工作。 例如,在有两名工人的情况下,当所有奇怪的信息都很重,甚至信息很少时,一名工作人员会一直很忙,另一名工作人员几乎不会做任何工作。 那么, RabbitMQ 不知道任何有关这一点,并仍将均匀地发送消息。

发生这种情况是因为 RabbitMQ 只在消息进入队列时调度消息。 它没有考虑消费者未确认消息的数量。 它只是盲目地将第 n 条消息分发给第 n 个消费者。

img

为了解决这个问题,我们可以使用 basic_qos 方法和 prefetch_count = 1 设置。 这告诉 RabbitMQ 一次不要向工作人员发送多个消息。 或者换句话说,不要向工作人员发送新消息,直到它处理并确认了前一个消息。 相反,它会将其分派给不是仍然忙碌的下一个工作人员。

$channel->basic_qos(null, 1, null);

有关队列大小的说明

如果所有的工作人员都很忙,你的队伍可以填满。 你会想看看,也许会增加更多的工人,或者有其他的策略。

把它们放在一起

我们的最终代码 new_task.php 文件:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();


$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
                        array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
                      );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close();
$connection->close();

?>

(new_task.php source)

添加我们的 worker.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo " [x] Received ", $msg->body, "\n";
  sleep(substr_count($msg->body, '.'));
  echo " [x] Done", "\n";
  $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};

$channel->basic_qos(null, 1, null);
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

(worker.php source)

使用消息确认和预取可以设置工作队列。 即使 RabbitMQ 重新启动、持久性选项也可让任务继续存在。

现在我们可以继续阅读 教程 3 并学习如何向许多消费者传递相同的消息。

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 2年前 自动加精
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
《G01 Go 实战入门》
从零开始带你一步步开发一个 Go 博客项目,让你在最短的时间内学会使用 Go 进行编码。项目结构很大程度上参考了 Laravel。
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!