2.15. rabbitmq当中的qos(补充篇)
消费者流量控制
在实际项目中使用RabbitMQ的时候,由于消费者自身处理消息的效率并不高,如果说这个时候生产者还是不断的在生产消息,一直推送消息到消费者,那么很容易引起消费者的宕机。
rabbitmq提供了一个限流机制,用于限制一次性推送到消费者客户端的消息数量,让消费者都处理完了消息之后,生产者再推送新的消息过来
限流的原因
出于以下两个方面,所以需要对消费者进行一些限流策略
【a】假设某个时候,在rabbitmq队列中已经堆积了非常非常多的消息,这个时候,如果有一个消费者启动,那么大量的消息将会一起推送到这个消费者上面,这种瞬间超大流量,很有可能导致服务器崩溃
【b】生产者生产消息的效率比消费者处理消息的效率高很多,两端之间这种效率不平衡性。所以消费端需要做一些限流措施,否则可能导致消费端性能下降,服务器卡顿甚至崩溃等现象
总结
从某种意义上说,消费者的限流策略有助于那么处理消息效率高的消费者多消费一些消息,效率低一些的消费者少推送一些消息,从而可以达到能者多劳的目的,尽可能发挥消费者处理消息的能力。在项目中,为了缓解生产者和消费者两边效率不平衡的影响,通常会对消费者进行限流处理,保证消费者端正常消费消息,尽可能避免服务器崩溃以及宕机现象。
直接上代码:
//简单模式step:3.简单模式下消费者代码
func (r *RabbitMQ) ConsumeSimple(service services.IOrderService,productService services.IProductService) {
//申请队列和生产消息当中是一样一样滴 直接复制即可
//1.申请队列,如果队列不存在,则会自动创建,如果队列存在则跳过创建直接使用 这样的好处保障队列存在,消息能发送到队列当中
_,err := r.channel.QueueDeclare(
//队列名称
r.QueueName,
//进入的消息是否持久化 进入队列如果不消费那么消息就在队列里面 如果重启服务器那么这个消息就没啦 通常设置为false
false,
//是否为自动删除 意思是最后一个消费者断开链接以后是否将消息从队列当中删除 默认设置为false不自动删除
false,
//是否具有排他性
false,
//是否阻塞 发送消息以后是否要等待消费者的响应 消费了下一个才进来 就跟golang里面的无缓冲channle一个道理 默认为非阻塞即可设置为false
false,
//其他的属性,没有则直接诶传入空即可 nil nil,
)
if err != nil {
fmt.Println(err)
}
//消费者流控 防止数据库爆库
//消息的消费需要配合Qos
r.channel.Qos(
//每次队列只消费一个消息 这个消息处理不完服务器不会发送第二个消息过来
//当前消费者一次能接受的最大消息数量
1,
//服务器传递的最大容量
0,
//如果为true 对channel可用 false则只对当前队列可用
false,
)
//2.接收消息
//建立了链接 就跟socket一样 一直在监听 从未被终止 这也就保证了下边的子协程当中程序的无线循环的成立
msgs,err := r.channel.Consume(
//队列名称
r.QueueName,
//用来区分多个消费者
"",
//是否自动应答 意思就是收到一个消息已经被消费者消费完了是否主动告诉rabbitmq服务器我已经消费完了你可以去删除这个消息啦 默认是true
false,//为false表示消费端消费了消息之后需要手动ack一下 告诉服务器我消费成功了你可以去删除服务端队列里面的值了 如果为true则是只要消费端销售了那么服务器自动删除服务端队列里面的消息
//是否具有排他性
false,
//如果设置为true表示不能将同一个connection中发送的消息传递给同个connectio中的消费者
false,
//队列消费是否阻塞 fase表示是阻塞 true表示是不阻塞
false,
nil,)
if err != nil {
fmt.Println(err)
}
//3.消费消息
forever := make(chan bool)
//启用协程处理消息
go func() {
//子协程不会结束 因为msgs有监听 会不断的有值进来 就算没值也在监听 就跟socket服务一样一直在监听从未被中断!
for d:=range msgs {
//如果为true表示确认所有未确认的消息
//如果为false表示确认当前消息
//执行完业务逻辑成功之后我们再手动ack告诉服务器你可以删除这个消息啦! 这样就保障了数据的绝对的安全不丢失!
d.Ack(false)
}
}()
log.Printf("[*] Waiting for message,To exit press CTRL+C")
//最后我们来阻塞一下 这样主程序就不会死掉
<-forever
}