RabbitMQ实战《延迟队列》
第一次写博客,记录一下RabbitMQ的学习,如果有不正确的地方,还请及时指出
场景
订单超时未支付,关闭订单
用户下单
public function index()
{
//创建订单
$order = new Order();
$order->order_sn = date('YmdHis').time();
$order->user_id = 1;
$order->product_id = 1;
$order->save();
//推送至队列
(new OrderService())->push($order);
//返回相关信息
return true;
}
队列
<?php
namespace App\Service;
use App\Models\Order;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class OrderService
{
const HOST = '192.168.1.199';
const PORT = '5672';
const LOGIN = 'guest';
const PASSWORD = 'guest';
const VHOST = '/';
//交换机名称
public $exchangeName = 'laravel_exchange_name';
//普通队列名称和路由key
public $queueName = 'laravel_queue_name';
public $routeKey = 'laravel_route_key';
//延迟队列和路由
public $delayQueueName = 'laravel_delay_queue_name';
public $delayRouteKey = 'laravel_delay_route_key';
//延迟时长
public $delaySecond = 10;
public $channel;
public function __construct()
{
$connection = new AMQPStreamConnection(self::HOST,self::PORT,self::LOGIN,self::PASSWORD);
$this->channel = $connection->channel();
$this->init();
}
public function init()
{
// 声明交换机
$this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
$this->declareConsumeQueue();
$this->declareDelayQueue();
}
//消费队列
private function declareConsumeQueue()
{
//声明消费队列
$this->channel->queue_declare($this->queueName, false, true, false, false);
//绑定交换机及队列
$this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
}
//延迟队列
private function declareDelayQueue()
{
//设置消息过期时间
$tab = new AMQPTable([
'x-dead-letter-exchange' => $this->exchangeName, //消息过期后推送至此交换机
'x-dead-letter-routing-key' => $this->routeKey, //消息过期后推送至此路由地址 //也就是我们消费的正常队列 与①对应
'x-message-ttl' => intval($this->delaySecond) * 1000, //10秒
]);
//声明延迟队列
$this->channel->queue_declare($this->delayQueueName,false,true,false,false,false,$tab);
//绑定交换机及延迟队列
$this->channel->queue_bind($this->delayQueueName, $this->exchangeName, $this->delayRouteKey);
}
//入队列
public function push($order)
{
$message = json_encode([
'id' => $order->id
]);
//创建消息
$msg = new AMQPMessage($message, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
//推送至队列 //消息 //交换机名称 //路由 推送至延迟队列中
$this->channel->basic_publish($msg, $this->exchangeName, $this->delayRouteKey);
}
//出队列
public function consume()
{
//消费 普通消费队列 //①
$this->channel->basic_consume($this->queueName, '', false, false, false, false,
[$this, 'process_message']);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
//开始消费
public function process_message($message)
{
$obj = json_decode($message->body);
try {
$order = Order::find($obj->id);
if (strtotime($order->created_at) + $this->delaySecond > time()){
throw new \Exception('取消订单时间未到', 404);
}
//更改数据库状态
$order->status = 10;
$order->colsed_at = date('Y-m-d H:i:s');
$res = $order->save();
if (!$res){
throw new \Exception('取消订单失败', 404);
}
} catch (\Exception $e) {
//记录日志
}
//确认消息处理完成
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
}
用户下单
当用户下单以后,延迟队列就会出现一条待消费的记录,这里队列名称和我们代码中生成的名称一致,
laravel_delay_queue_name
当消息过期以后,此消息就会被推送至我们设置好的队列中,也就是
laravel_queue_name
,从而会被消费掉,达到超时未支付,取消订单的效果
定义调度
<?php
namespace App\Console;
use App\Console\Commands\OrderNopay;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
class Kernel extends ConsoleKernel
{
//省略其他代码
protected $commands = [
OrderNopay::class,
];
}
<?php
namespace App\Console\Commands;
use App\Service\OrderService;
use Illuminate\Console\Command;
class OrderNopay extends Command
{
protected $signature = 'order:nopay';
protected $description = '订单超时未支付';
public function __construct()
{
parent::__construct();
}
public function handle(OrderService $order)
{
$order->consume();
}
}
执行
可以使用
php artisan order:nopay
命令,或者通过supervisor
来跑
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 4年前 自动加精
高认可度评论:
你记录的没有错,一楼贴的文档地址我一看就知道怎么回事了。
如果要去两公里外的市场买菜,骑共享打车也行,走路也行。突然要去几十公里外的机场了,我打了一个出租车。这个时候有人跳出来就说,一看就没有用过共享单车,用过就不叫车了。我就明白了,此人要么沙雕要么一定是个日行千里的高手。
专业队列软件和框架自带的一些小功能,天差地别。
写的挺好的呀 充分利用了
rabbitmq
队列消息可以设置过期时间的设定 :smiley:你记录的没有错,一楼贴的文档地址我一看就知道怎么回事了。
如果要去两公里外的市场买菜,骑共享打车也行,走路也行。突然要去几十公里外的机场了,我打了一个出租车。这个时候有人跳出来就说,一看就没有用过共享单车,用过就不叫车了。我就明白了,此人要么沙雕要么一定是个日行千里的高手。
专业队列软件和框架自带的一些小功能,天差地别。
支持一下,并非一定要用laravel自带的队列,自己动手也是极好的, laravel队列的有局限性,用laravel生产消费会很方便,假设生产者是go预言,消费者是php,这个时候就另换他发了。
代码结构很清洗 看着很舒服 支持一下
mark一下,对我很有学习的帮助
这个用TTL到死信队列里延迟 消息A 是 ttl 1000 消息b 是ttl 100 ,那消息b 会立即到死信队列中 ,当然 你这取消订单的时间是统一 如果是其他场景 用延迟插件会更好点 :stuck_out_tongue_winking_eye: 就是探讨一下 没别的意思 其实哪种思路只要达到目的就是好的
楼主你用的mq版本多少?
Mark
:joy:
死信队列有超时限制,久了就无效了
java版的死信队列:blog.csdn.net/zhangcongyi420/artic...
能定义不同队列不同的过期时间吗?