Golang 实现 RabbitMQ 的延迟队列

读本文之前,你应该已经了解 RabbitMQ 的一些概念,如队列、交换机之类。

延迟队列简介

一个队列中的消息在延迟一段时间后才被消费者消费,这样的队列可以称之为延迟队列。

延迟队列的应用场景十分广泛,如:下单后30分钟内未付款则取消订单;在某个时间下发一条通知等。

通过死信实现延迟队列

通过Golang 实现 RabbitMQ 的死信队列的介绍,我们可以很容易的实现一个延迟队列。

  1. 将正常队列的消费者取消;
  2. 发消息时设置TTL;

通过上面两点,正常队列的消息始终不会被消费,而是等待消息TTL到期,进入死信队列,让死信消费者进行消费,从而达到延迟队列的效果。

上面看上去似乎没什么问题,实测一下就会发现消息不会“如期死亡”

当先生产一个TTL为60s的消息,再生产一个TTL为5s的消息,第二个消息并不会再5s后过期进入死信队列,而是需要等到第一个消息TTL到期后,与第一个消息一同进入死信队列。这是因为RabbitMQ 只会判断队列中的第一个消息是否过期。

通过插件实现延迟队列

架构

对于上文的问题,自然有解决方法,那就是通过 RabbitMQ 的 rabbitmq_delayed_message_exchange 插件来解决。本文不赘述 RabbitMQ和插件的安装,你可以参考此文安装或使用Docker来安装。

此插件的原理是将消息在交换机处暂存储在mnesia(一个分布式数据系统)表中,延迟投递到队列中,等到消息到期再投递到队列当中。

简单了解了插件的原理,我们便可以如此设计延迟队列。

实现

生产者实现的关键点:

1.在声明交换机时不在是direct类型,而是x-delayed-message类型,这是由插件提供的类型;

2.交换机要增加"x-delayed-type": "direct"参数设置;

3.发布消息时,要在 Headers 中设置x-delay参数,来控制消息从交换机过期时间;

err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(message),
    //Expiration: "10000", // 消息过期时间(消息级别),毫秒
    Headers: map[string]interface{}{
        "x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
    },
})

生产者完整代码:

// producter.go
package main

import (
    "fmt"
    "github.com/streadway/amqp"
    "learn_gin/go/rabbitmq/delayletter/constant"
    "learn_gin/go/rabbitmq/util"
    "strconv"
    "time"
)

func main() {
    // # ========== 1.创建连接 ==========
    mq := util.NewRabbitMQ()
    defer mq.Close()
    mqCh := mq.Channel

    // # ========== 2.设置队列(队列、交换机、绑定) ==========
    // 声明队列
    var err error
    _, err = mqCh.QueueDeclare(constant.Queue1, true, false, false, false, amqp.Table{
        // "x-message-ttl": 60000, // 消息过期时间(队列级别),毫秒
    })
    util.FailOnError(err, "创建队列失败")

    // 声明交换机
    //err = mqCh.ExchangeDeclare(Exchange1, amqp.ExchangeDirect, true, false, false, false, nil)
    err = mqCh.ExchangeDeclare(constant.Exchange1, "x-delayed-message", true, false, false, false, amqp.Table{
        "x-delayed-type": "direct", 
    })
    util.FailOnError(err, "创建交换机失败")

    // 队列绑定(将队列、routing-key、交换机三者绑定到一起)
    err = mqCh.QueueBind(constant.Queue1, constant.RoutingKey1, constant.Exchange1, false, nil)
    util.FailOnError(err, "队列、交换机、routing-key 绑定失败")

    // # ========== 4.发布消息 ==========
    message := "msg" + strconv.Itoa(int(time.Now().Unix()))
    fmt.Println(message)
    // 发布消息
    err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(message),
        //Expiration: "10000", // 消息过期时间(消息级别),毫秒
        Headers: map[string]interface{}{
            "x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
        },
    })
    util.FailOnError(err, "消息发布失败")
}

由于在生产者端建立队列和交换机,所以消费者并不需要特殊的设置,直接附代码。

消费者完整代码:

// consumer.go
package main

import (
    "learn_gin/go/rabbitmq/delayletter/constant"
    "learn_gin/go/rabbitmq/util"
    "log"
)

func main() {
    // # ========== 1.创建连接 ==========
    mq := util.NewRabbitMQ()
    defer mq.Close()
    mqCh := mq.Channel

    // # ========== 2.消费消息 ==========
    msgsCh, err := mqCh.Consume(constant.Queue1, "", false, false, false, false, nil)
    util.FailOnError(err, "消费队列失败")

    forever := make(chan bool)
    go func() {
        for d := range msgsCh {
            // 要实现的逻辑
            log.Printf("接收的消息: %s", d.Body)

            // 手动应答
            d.Ack(false)
            //d.Reject(true)
        }
    }()
    log.Printf("[*] Waiting for message, To exit press CTRL+C")
    <-forever
}

end!

源码Mr-houzi/go-demo

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 15

测试了一下代码,好像跑不成功 file

file

file

2年前 评论
Mr-houzi (楼主) 2年前
小花旦 (作者) 2年前
小花旦 (作者) 2年前
Mr-houzi (楼主) 2年前
Mr-houzi (楼主) 2年前

file

2年前 评论
小花旦 (作者) 2年前

// # ========== 1.创建连接 ========== mq := util.NewRabbitMQ() defer mq.Close() mqCh := mq.Channel

// # ========== 2.设置队列(队列、交换机、绑定) ==========
// 声明队列
var err error
_, err = mqCh.QueueDeclare(constant.Queue1, true, false, false, false, amqp.Table{
    // "x-message-ttl": 60000, // 消息过期时间(队列级别),毫秒
})
util.FailOnError(err, "创建队列失败")

// 声明交换机
//err = mqCh.ExchangeDeclare(Exchange1, amqp.ExchangeDirect, true, false, false, false, nil)
err = mqCh.ExchangeDeclare(constant.Exchange1, "x-delayed-message", true, false, false, false, amqp.Table{
    "x-delayed-type": "direct",
})
util.FailOnError(err, "创建交换机失败")

// 队列绑定(将队列、routing-key、交换机三者绑定到一起)
err = mqCh.QueueBind(constant.Queue1, constant.RoutingKey1, constant.Exchange1, false, nil)
util.FailOnError(err, "队列、交换机、routing-key 绑定失败")

// # ========== 4.发布消息 ==========
message := "msg" + strconv.Itoa(int(time.Now().Unix()))
fmt.Println(message)
// 发布消息
err = mqCh.Publish(constant.Exchange1, constant.RoutingKey1, false, false, amqp.Publishing{
    ContentType: "text/plain",
    Body:        []byte(message),
    //Expiration: "10000", // 消息过期时间(消息级别),毫秒
    Headers: map[string]interface{}{
        "x-delay": "5000", // 消息从交换机过期时间,毫秒(x-dead-message插件提供)
    },
})
util.FailOnError(err, "消息发布失败")
2年前 评论

========== 1.创建连接 ==========

mq := util.NewRabbitMQ()
defer mq.Close()
mqCh := mq.Channel

// # ========== 2.消费消息 ==========
msgsCh, err := mqCh.Consume(constant.Queue1, "", false, false, false, false, nil)
util.FailOnError(err, "消费队列失败")

forever := make(chan bool)
go func() {
    for d := range msgsCh {
        // 要实现的逻辑
        log.Printf("接收的消息: %s", d.Body)

        // 手动应答
        d.Ack(false)
        //d.Reject(true)
    }
}()
log.Printf("[*] Waiting for message, To exit press CTRL+C")
<-forever
2年前 评论
Jason0727 1年前

多几次也是不行

file

2年前 评论
Mr-houzi (楼主) 2年前
小花旦 (作者) 2年前

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!