2.6. Rabbitmq工作模式之simple模式
Simple模式
- 消息产生着将消息放入队列
- 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
Simple模式也是最简单的模式
生产者 ——> 队列 ——> 消费者
就这么简单,你可以理解为redis里面的list“队列”,另外多说一句redis里面的队列都是用list来模拟的,不要以为list就是队列,它,不是!充其量是一个列表,stream类型是redis5新出的,stream还可以够的上是一个队列,并且是乞丐版的kafka,应对一般的流量冲击还是搓搓有余的!
接下来我们会直接上golang当中操作rabbitmq的代码,提前提醒一下这很重要,因为接下里的五中模式都会基于此代码进行改造哈
因为我们需要使用rabbitmq所以在golang当中那就需要引入包,如何引入第三方包知道吧?博客:Go 之基础速学 (七) golang 里包的使用 JSON 化 struct 结构体以及 i...这篇文章里面有,自己去看,你只需要执行命令即可,但是为什么这么搞还是得看文章去,执行命令(记住:执行命令的地方一定要在goland的命令行里面去执行,否则你可能无法引用到amqp包):
go get github.com/streadway/amqp
你会发现怎么go get github.com/streadway/amqp下载不下来呢?是滴!下载不下来,我教你撒:
和我这张图片上的配置保持一致 并且go的版本号必须是1.14 这样你会避免很多的坑!
这样在代码当中你就可以引用amqp包来操作rabbitmq了!
另外需要注意的是 main 文件一定要单独一个包 其他的应用文件一个包,不然很容易出现重复载入包导致错误发生的情况!
直接上代码吧:
rbtmqone.go(一个rabbitmq各种操作的工具类文件):
package rbtmqcs
import (
"fmt"
"github.com/streadway/amqp" "log")
//url 格式 amqp://账号:密码@rabbitmq服务器地址:端口号/Virtual Host
//格式在golang语言当中是固定不变的
const MQURL = "amqp://huj:123456@10.10.16.219:5672/rbtmq"
type RabbitMQ struct {
conn *amqp.Connection //需要引入amqp包 https://learnku.com/articles/44185教会你如何引用amqp包
channel *amqp.Channel
//队列名称
QueueName string
//交换机
Exchange string
//key
Key string
//链接信息
Mqurl string
}
//创建RabbitMQ结构体实例
func NewRabbitMQ(queuename string,exchange string,key string) *RabbitMQ {
rabbitmq := &RabbitMQ{QueueName:queuename,Exchange:exchange,Key:key,Mqurl:MQURL}
var err error
//创建rabbitmq连接
rabbitmq.conn,err = amqp.Dial(rabbitmq.Mqurl) //通过amqp.Dial()方法去链接rabbitmq服务端
rabbitmq.failOnErr(err,"创建连接错误!") //调用我们自定义的failOnErr()方法去处理异常错误信息
rabbitmq.channel,err = rabbitmq.conn.Channel() //链接上rabbitmq之后通过rabbitmq.conn.Channel()去设置channel信道
rabbitmq.failOnErr(err,"获取channel失败!")
return rabbitmq
}
//断开channel和connection
//为什么要断开channel和connection 因为如果不断开他会始终使用和占用我们的channel和connection 断开是为了避免资源浪费
func (r *RabbitMQ) Destory() {
r.channel.Close() //关闭信道资源
r.conn.Close() //关闭链接资源
}
//错误处理函数
func (r *RabbitMQ) failOnErr(err error,message string) {
if err != nil {
log.Fatalf("%s:%s",message,err)
panic(fmt.Sprintf("%s:%s",message,err))
}
}
//简单模式step:1.创建简单模式下的rabbitmq实例
func NewRabbitMQSimple(queueName string) *RabbitMQ {
//simple模式下交换机为空因为会默认使用rabbitmq默认的default交换机而不是真的没有 bindkey绑定建key也是为空的
//特别注意:simple模式是最简单的rabbitmq的一种模式 他只需要传递queue队列名称过去即可 像exchange交换机会默认使用default交换机 绑定建key的会不必要传
return NewRabbitMQ(queueName,"","")
}
//简单模式step:2.简单模式下生产代码
func (r *RabbitMQ) PublishSimple(message string) {
//1.申请队列,如果队列不存在,则会自动创建,如果队列存在则跳过创建直接使用 这样的好处保障队列存在,消息能发送到队列当中
_,err := r.channel.QueueDeclare(
r.QueueName,
//进入的消息是否持久化 进入队列如果不消费那么消息就在队列里面 如果重启服务器那么这个消息就没啦 通常设置为false
false,
//是否为自动删除 意思是最后一个消费者断开链接以后是否将消息从队列当中删除 默认设置为false不自动删除
false,
//是否具有排他性
false,
//是否阻塞 发送消息以后是否要等待消费者的响应 消费了下一个才进来 就跟golang里面的无缓冲channle一个道理 默认为非阻塞即可设置为false
false,
//其他的属性,没有则直接诶传入空即可 nil nil,
)
if err != nil {
fmt.Println(err)
}
//2.发送消息到队列当中
r.channel.Publish(
//交换机 simple模式下默认为空 我们在上边已经赋值为空了 虽然为空 但其实也是在用的rabbitmq当中的default交换机运行
r.Exchange,
//队列的名称
r.QueueName,
//如果为true 会根据exchange类型和routkey规则,如果无法找到符合条件的队列那么会把发送的消息返还给发送者
false,
//如果为true,当exchange发送消息到队列后发现队列上没有绑定消费者则会把消息返还给发送者
false,
//要发送的消息
amqp.Publishing{
ContentType:"text/plain",
Body:[]byte(message),
})
}
//简单模式step:3.简单模式下消费者代码
func (r *RabbitMQ) ConsumeSimple() {
//申请队列和生产消息当中是一样一样滴 直接复制即可
//1.申请队列,如果队列不存在,则会自动创建,如果队列存在则跳过创建直接使用 这样的好处保障队列存在,消息能发送到队列当中
_,err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//进入的消息是否持久化 进入队列如果不消费那么消息就在队列里面 如果重启服务器那么这个消息就没啦 通常设置为false
false,
//是否为自动删除 意思是最后一个消费者断开链接以后是否将消息从队列当中删除 默认设置为false不自动删除
false,
//是否具有排他性
false,
//是否阻塞 发送消息以后是否要等待消费者的响应 消费了下一个才进来 就跟golang里面的无缓冲channle一个道理 默认为非阻塞即可设置为false
false,
//其他的属性,没有则直接诶传入空即可 nil nil,
)
if err != nil {
fmt.Println(err)
}
//2.接收消息
//建立了链接 就跟socket一样 一直在监听 从未被终止 这也就保证了下边的子协程当中程序的无线循环的成立
msgs,err := r.channel.Consume(
//队列名称
r.QueueName,
//用来区分多个消费者
"",
//是否自动应答 意思就是收到一个消息已经被消费者消费完了是否主动告诉rabbitmq服务器我已经消费完了你可以去删除这个消息啦 默认是true
true,
//是否具有排他性
false,
//如果设置为true表示不能将同一个connection中发送的消息传递给同个connectio中的消费者
false,
//队列消费是否阻塞 fase表示是阻塞 true表示是不阻塞
false,
nil,)
if err != nil {
fmt.Println(err)
}
//3.消费消息
forever := make(chan bool)
//启用协程处理消息
go func() {
//子协程不会结束 因为msgs有监听 会不断的有值进来 就算没值也在监听 就跟socket服务一样一直在监听从未被中断!
for d:=range msgs {
//实现我们要处理的逻辑函数
log.Printf("Recieved a message : %s",d.Body)
//fmt.Println(d.Body)
}
}()
log.Printf("[*] Waiting for message,To exit press CTRL+C")
//最后我们来阻塞一下 这样主程序就不会死掉
<-forever
}
rbtmqtwo.go 生产者往队列里面放入数据:
package main
import (
"fmt"
"rbtmq/rbtmqcs")
func main() {
rabitmq := rbtmqcs.NewRabbitMQSimple("queueone")
rabitmq.PublishSimple("hello huxiaobai12345!!!")
fmt.Println("发送成功!")
}
rabtmqthree.go 消费者从队列里面读取数据:
package main
import (
"rbtmq/rbtmqcs"
)
func main() {
rabbitmq := rbtmqcs.NewRabbitMQSimple("queueone")
rabbitmq.ConsumeSimple()
}
目录结构如下:
再来看一下视图界面吧:
就写这么多,里面很多的注释自己去看吧!代码注释已经写得非常详细啦!毕竟不是视频也许看着会比较费劲,不过没关系撒,有问题联系我15275411187 手机微信同号!
另外我们反复的强调在代码的注释里面 说simple模式下是灭有交换机的exchange,并不是这样的,其实是有的,如下图就是rbtmq这个virtual host对应的就是默认的default交换机 类型是direct