Go Rabbitmq 使用

Rabbitmq 学习使用样例

使用说明需要本机安装好rabbitmq才行

  • 目录结构
├── RabbitMQ
│   └── rabbitmq.go   # rabbitmq实现方法
├── go.mod
├── go.sum
├── mainSimplePublish.go    // 发布文件
└── mainSimpleRecieve.go // 消费文件

1、创建rabbitmq实例 rabbitmq.go

package RabbitMQ

import (
    "fmt"
    "log"

    "github.com/streadway/amqp"
)

// MQURL 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/vhost
const MQURL = "amqp://goshop:123456@127.0.0.1:5672/shop"

type RabbitMQ struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    // 队列名称
    QueueName string
    // 交换机
    Exchange string
    // Key
    Key string
    // 连接信息
    Mqurl string
}

// NewRabbitMQ 创建结构体实例
func NewRabbitMQ(queueName, exchange, key string) *RabbitMQ {
    rabbitmq := &RabbitMQ{
        QueueName: queueName,
        Exchange:  exchange,
        Key:       key,
        Mqurl:     MQURL,
    }
    var err error
    // 创建rabbitmq连接
    rabbitmq.conn, err = amqp.Dial(rabbitmq.Mqurl)
    rabbitmq.failOnErr(err, "创建连接错误!")

    rabbitmq.channel, err = rabbitmq.conn.Channel()
    rabbitmq.failOnErr(err, "获取channel失败!")

    return rabbitmq
}

// Destory 断开channel和connection
func (r *RabbitMQ) Destory() {
    _ = r.channel.Close()
    _ = r.conn.Close()
}

// failOnErr 错误处理函数
func (r *RabbitMQ) failOnErr(err error, message string) {
    if err != nil {
        log.Fatalf("%s:%s", message, err)
        panic(fmt.Sprintf("%s:%s", message, err))
    }
}

// NewRabbitMQSimple
// 简单模式Step 1.创建简单模式下的RabbitMq实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
    return NewRabbitMQ(queueName, "", "")
}

// 简单模式Step 2:简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {
    // 1. 申请队列,如果队列不存在会自动创建,如何存在则跳过创建
    // 保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        // 是否持久化
        false,
        // 是否为自动删除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞
        false,
        // 额外属性
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }

    // 2.发送消息到队列中
    r.channel.Publish(
        r.Exchange,
        r.QueueName,
        // 如果为true, 会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把发送的消息返回给发送者
        false,
        // 如果为true, 当exchange发送消息到队列后发现队列上没有绑定消费者,则会把消息发还给发送者
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(message),
        },
    )
}

// ConsumeSimple 使用 goroutine 消费消息
func (r *RabbitMQ) ConsumeSimple() {
    // 1. 申请队列,如果队列不存在会自动创建,如何存在则跳过创建
    // 保证队列存在,消息能发送到队列中
    _, err := r.channel.QueueDeclare(
        r.QueueName,
        // 是否持久化
        false,
        // 是否为自动删除
        false,
        // 是否具有排他性
        false,
        // 是否阻塞
        false,
        // 额外属性
        nil,
    )
    if err != nil {
        fmt.Println(err)
    }

    // 接收消息
    msgs, err := r.channel.Consume(
        r.QueueName,
        // 用来区分多个消费者
        "",
        // 是否自动应答
        true,
        // 是否具有排他性
        false,
        // 如果设置为true,表示不能将同一个connection中发送的消息传递给这个connection中的消费者
        false,
        // 队列消费是否阻塞
        false,
        nil,
    )

    if err != nil {
        fmt.Println(err)
    }

    forever := make(chan bool)
    // 启用协和处理消息
    go func() {
        for d := range msgs {
            // 实现我们要实现的逻辑函数
            log.Printf("Received a message: %s", d.Body)
            fmt.Println(d.Body)
        }
    }()
    log.Printf("[*] Waiting for message, To exit press CTRL+C")
    <-forever
}

2、建立消费文件和生产文件

  • 消费文件 mainSimpleRecieve.go
package main

import "go-shop/RabbitMQ"

func main() {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")

    rabbitmq.ConsumeSimple()
}
  • 发布文件 mainSimplePublish.go
import (
    "fmt"
    "go-shop/RabbitMQ"
)

func main() {
    rabbitmq := RabbitMQ.NewRabbitMQSimple("imoocSimple")
    rabbitmq.PublishSimple("Hello goFrame!")
    fmt.Println("发送成功")
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
本帖由系统于 2年前 自动加精
994914376
讨论数量: 1

请教下 接收消息时候 用这一段 是啥意思啊 forever := make(chan bool)

log.Printf("[*] Waiting for message, To exit press CTRL+C") <-forever

3年前 评论

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!