RabbitMQ实战《延迟队列》

第一次写博客,记录一下RabbitMQ的学习,如果有不正确的地方,还请及时指出

场景

订单超时未支付,关闭订单

用户下单

public function index()
{
    //创建订单
    $order = new Order();
    $order->order_sn = date('YmdHis').time();
    $order->user_id = 1;
    $order->product_id = 1;
    $order->save();

    //推送至队列
    (new OrderService())->push($order);
    //返回相关信息
    return true;

}

队列

<?php

namespace App\Service;


use App\Models\Order;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

class OrderService
{
    const HOST = '192.168.1.199';
    const PORT = '5672';
    const LOGIN = 'guest';
    const PASSWORD = 'guest';
    const VHOST = '/';

    //交换机名称
    public $exchangeName = 'laravel_exchange_name';

    //普通队列名称和路由key
    public $queueName = 'laravel_queue_name';
    public $routeKey = 'laravel_route_key';

    //延迟队列和路由
    public $delayQueueName = 'laravel_delay_queue_name';
    public $delayRouteKey = 'laravel_delay_route_key';

    //延迟时长
    public $delaySecond = 10;

    public $channel;

    public function __construct()
    {
        $connection = new AMQPStreamConnection(self::HOST,self::PORT,self::LOGIN,self::PASSWORD);
        $this->channel = $connection->channel();

        $this->init();
    }

    public function init()
    {
        // 声明交换机
        $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false);

        $this->declareConsumeQueue();
        $this->declareDelayQueue();
    }

    //消费队列
    private function declareConsumeQueue()
    {
        //声明消费队列
        $this->channel->queue_declare($this->queueName, false, true, false, false);
        //绑定交换机及队列
        $this->channel->queue_bind($this->queueName, $this->exchangeName, $this->routeKey);
    }

    //延迟队列
    private function declareDelayQueue()
    {
        //设置消息过期时间
        $tab = new AMQPTable([
            'x-dead-letter-exchange' => $this->exchangeName,    //消息过期后推送至此交换机
            'x-dead-letter-routing-key' => $this->routeKey,        //消息过期后推送至此路由地址        //也就是我们消费的正常队列    与①对应
            'x-message-ttl' => intval($this->delaySecond) * 1000, //10秒
        ]);
        //声明延迟队列
        $this->channel->queue_declare($this->delayQueueName,false,true,false,false,false,$tab);
        //绑定交换机及延迟队列
        $this->channel->queue_bind($this->delayQueueName, $this->exchangeName, $this->delayRouteKey);
    }

    //入队列
    public function push($order)
    {
        $message = json_encode([
            'id' => $order->id
        ]);

        //创建消息
        $msg = new AMQPMessage($message, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        ]);
        //推送至队列                   //消息   //交换机名称        //路由  推送至延迟队列中
        $this->channel->basic_publish($msg, $this->exchangeName, $this->delayRouteKey);
    }

    //出队列
    public function consume()
    {
        //消费  普通消费队列            //①
        $this->channel->basic_consume($this->queueName, '', false, false, false, false,
            [$this, 'process_message']);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }

    //开始消费
    public function process_message($message)
    {

        $obj = json_decode($message->body);
        try {

            $order = Order::find($obj->id);
            if (strtotime($order->created_at) + $this->delaySecond > time()){
                throw new \Exception('取消订单时间未到', 404);
            }

            //更改数据库状态
            $order->status = 10;
            $order->colsed_at = date('Y-m-d H:i:s');
            $res = $order->save();

            if (!$res){
                throw new \Exception('取消订单失败', 404);
            }

        } catch (\Exception $e) {
            //记录日志
        }
        //确认消息处理完成
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    }

}

用户下单

当用户下单以后,延迟队列就会出现一条待消费的记录,这里队列名称和我们代码中生成的名称一致,laravel_delay_queue_name

当消息过期以后,此消息就会被推送至我们设置好的队列中,也就是laravel_queue_name,从而会被消费掉,达到超时未支付,取消订单的效果

定义调度

<?php

namespace App\Console;

use App\Console\Commands\OrderNopay;
use Illuminate\Console\Scheduling\Schedule;
use Illuminate\Foundation\Console\Kernel as ConsoleKernel;

class Kernel extends ConsoleKernel
{
//省略其他代码
    protected $commands = [
        OrderNopay::class,
    ];
}
<?php

namespace App\Console\Commands;

use App\Service\OrderService;
use Illuminate\Console\Command;

class OrderNopay extends Command
{
    protected $signature = 'order:nopay';

    protected $description = '订单超时未支付';

    public function __construct()
    {
        parent::__construct();
    }

    public function handle(OrderService $order)
    {
        $order->consume();
    }
}

执行

可以使用 php artisan order:nopay命令,或者通过supervisor来跑

本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 3年前 自动加精
《L05 电商实战》
从零开发一个电商项目,功能包括电商后台、商品 & SKU 管理、购物车、订单管理、支付宝支付、微信支付、订单退款流程、优惠券等
《L04 微信小程序从零到发布》
从小程序个人账户申请开始,带你一步步进行开发一个微信小程序,直到提交微信控制台上线发布。
讨论数量: 32

你记录的没有错,一楼贴的文档地址我一看就知道怎么回事了。

如果要去两公里外的市场买菜,骑共享打车也行,走路也行。突然要去几十公里外的机场了,我打了一个出租车。这个时候有人跳出来就说,一看就没有用过共享单车,用过就不叫车了。我就明白了,此人要么沙雕要么一定是个日行千里的高手。

专业队列软件和框架自带的一些小功能,天差地别。

3年前 评论
xiaoAgiao 3年前
aruisi 3年前
aruisi 3年前
aruisi 3年前
rufo 3年前
aruisi 3年前
aruisi 3年前
sh512953070 2年前

你记录的没有错,一楼贴的文档地址我一看就知道怎么回事了。

如果要去两公里外的市场买菜,骑共享打车也行,走路也行。突然要去几十公里外的机场了,我打了一个出租车。这个时候有人跳出来就说,一看就没有用过共享单车,用过就不叫车了。我就明白了,此人要么沙雕要么一定是个日行千里的高手。

专业队列软件和框架自带的一些小功能,天差地别。

3年前 评论
xiaoAgiao 3年前
aruisi 3年前
aruisi 3年前
aruisi 3年前
rufo 3年前
aruisi 3年前
aruisi 3年前
sh512953070 2年前

支持一下,并非一定要用laravel自带的队列,自己动手也是极好的, laravel队列的有局限性,用laravel生产消费会很方便,假设生产者是go预言,消费者是php,这个时候就另换他发了。

3年前 评论
goodgood (作者) 3年前
老衲爱饮酒 (楼主) 3年前
xiaoAgiao 3年前
goodgood (作者) 3年前
aruisi 3年前
goodgood (作者) 3年前
panda-sir

写的挺好的呀 充分利用了rabbitmq队列消息可以设置过期时间的设定 :smiley:

3年前 评论
xiaoAgiao

代码结构很清洗 看着很舒服 支持一下

3年前 评论

mark一下,对我很有学习的帮助

3年前 评论

这个用TTL到死信队列里延迟 消息A 是 ttl 1000 消息b 是ttl 100 ,那消息b 会立即到死信队列中 ,当然 你这取消订单的时间是统一 如果是其他场景 用延迟插件会更好点 :stuck_out_tongue_winking_eye: 就是探讨一下 没别的意思 其实哪种思路只要达到目的就是好的

3年前 评论

楼主你用的mq版本多少?

3年前 评论
anzichen

:joy:

3年前 评论
JeffreyBool

死信队列有超时限制,久了就无效了

3年前 评论

能定义不同队列不同的过期时间吗?

2年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!