RabbitMQ实战《延迟队列》
第一次写博客,记录一下RabbitMQ的学习,如果有不正确的地方,还请及时指出
场景
订单超时未支付,关闭订单
用户下单
public function index()
{
//创建订单
$order = new Order();
$order->order_sn = date('YmdHis').time();
$order->user_id = 1;
$order->product_id = 1;
$order->save();
//推送至队列
(new OrderService())->push($order);
//返回相关信息
return true;
}
队列
<?php
namespace App\Service;
use App\Models\Order;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
class OrderService
{
const HOST = '192.168.1.199';
const PORT = '5672';
const LOGIN = 'guest';
const PASSWORD = 'guest';
const VHOST = '/';
//交换机名称
public $exchangeName = 'laravel_exchange_name';
//普通队列名称和路由key
public $queueName = 'laravel_queue_name';
public $routeKey = 'laravel_route_key';
//延迟队列和路由
public $delayQueueName = 'laravel_delay_queue_name';
public $delayRouteKey = 'laravel_delay_route_key';
//延迟时长
public $delaySecond = 10;
public $channel;
public function __construct()
{
$connection = new AMQPStreamConnection(self::HOST,self::PORT,self::LOGIN,self::PASSWORD);
$this->channel = $connection->channel();
$this->init();
}
public function init()
{
// 声明交换机
$this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);
$this->declareConsumeQueue();
$this->declareDelayQueue();
}
//消费队列
private function declareConsumeQueue()
{
//声明消费队列
$this->channel->queue_declare($this->queueName, false, true, false, false);
//绑定交换机及队列
$this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
}
//延迟队列
private function declareDelayQueue()
{
//设置消息过期时间
$tab = new AMQPTable([
'x-dead-letter-exchange' => $this->exchangeName, //消息过期后推送至此交换机
'x-dead-letter-routing-key' => $this->routeKey, //消息过期后推送至此路由地址 //也就是我们消费的正常队列 与①对应
'x-message-ttl' => intval($this->delaySecond) * 1000, //10秒
]);
//声明延迟队列
$this->channel->queue_declare($this->delayQueueName,false,true,false,false,false,$tab);
//绑定交换机及延迟队列
$this->channel->queue_bind($this->delayQueueName, $this->exchangeName, $this->delayRouteKey);
}
//入队列
public function push($order)
{
$message = json_encode([
'id' => $order->id
]);
//创建消息
$msg = new AMQPMessage($message, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
//推送至队列 //消息 //交换机名称 //路由 推送至延迟队列中
$this->channel->basic_publish($msg, $this->exchangeName, $this->delayRouteKey);
}
//出队列
public function consume()
{
//消费 普通消费队列 //①
$this->channel->basic_consume($this->queueName, '', false, false, false, false,
[$this, 'process_message']);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
//开始消费
public function process_message($message)
{
$obj = json_decode($message->body);
try {
$order = Order::find($obj->id);
if (strtotime($order->created_at) + $this->delaySecond > time()){
throw new \Exception('取消订单时间未到', 404);
}
//更改数据库状态
$order->status = 10;
$order->colsed_at = date('Y-m-d H:i:s');
$res = $order->save();
if (!$res){
throw new \Exception('取消订单失败', 404);
}
} catch (\Exception $e) {
//记录日志
}
//确认消息处理完成
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}
}
用户下单
当用户下单以后,延迟队列就会出现一条待消费的记录,这里队列名称和我们代码中生成的名称一致,
laravel_delay_queue_name
当消息过期以后,此消息就会被推送至我们设置好的队列中,也就是
laravel_queue_name
,从而会被消费掉,达到超时未支付,取消订单的效果
定义调度
<?php
namespace App\Console;
use App\Console\Commands\OrderNopay;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;
class Kernel extends ConsoleKernel
{
//省略其他代码
protected $commands = [
OrderNopay::class,
];
}
<?php
namespace App\Console\Commands;
use App\Service\OrderService;
use Illuminate\Console\Command;
class OrderNopay extends Command
{
protected $signature = 'order:nopay';
protected $description = '订单超时未支付';
public function __construct()
{
parent::__construct();
}
public function handle(OrderService $order)
{
$order->consume();
}
}
执行
可以使用
php artisan order:nopay
命令,或者通过supervisor
来跑
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: