SpringBoot 整合 RabbitMQ 消息回调、手动确认
一、配置
消息发送者和消费者
注意:
若使用 confirm-callback 需要配置
publisher-confirm-type: correlated
若使用 return-callback 需要配置
publisher-returns: true
使用 return-callback 时必须设置 mandatory 为 true
或者在配置中设置 rabbitmq.template.mandatory=true
yml 配置如下
server:
port: 8093
spring:
application:
name: back-provider
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: HuDuHost
#确认消息已发送到交换机(Exchange)
publisher-confirm-type: correlated
#确认消息已发送到队列(Queue)
publisher-returns: true
如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为SpringBoot版本导致的配置项不起效,可以把 publisher-confirms: true
替换为 publisher-confirm-type: correlated
rabbitTemplate 配置如下
@Configuration
public class RabbitClusterConfig {
@Value("${spring.rabbitmq.addresses}")
private String addresses;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Value("${spring.rabbitmq.virtual-host}")
private String virtualHost;
@Value("${spring.rabbitmq.publisher-returns}")
private boolean publisherReturns;
@Autowired
private RabbitConfirmCallbackService confirmCallbackService;
@Autowired
private RabbitReturnCallbackService returnCallbackService;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
// connectionFactory.setHost();
// connectionFactory.setPort();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
connectionFactory.setPublisherReturns(publisherReturns);
return connectionFactory;
}
@Bean
// 如果需要对 rabbitTemplate 设置不同的回调类,需要设置原型模式,不然回调类只能有一个
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 设置消布确认回调,即当消息达到交换机回调
// 只有开启了 Mandatory 才能出发回调函数,无论消息吐送结果怎样都强制调用回调函数
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(confirmCallbackService);
// 消息(带有 RoutingKey)到达交换机,与交换机的所有所有绑定的键进行匹配,匹配不到触发回调
rabbitTemplate.setReturnsCallback(returnCallbackService);
return rabbitTemplate;
}
}
@Service
public class RabbitConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
logger.info("消息发送成功!");
} else {
logger.error("消息发送异常!");
logger.error("ConfirmCallback 相关数据: {}", correlationData);
logger.error("ConfirmCallback 确认情况: {}", ack);
logger.error("confirmCallback 失败原因: {}", cause);
}
}
}
@Service
public class RabbitReturnCallbackService implements RabbitTemplate.ReturnsCallback {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
/**
* 消息路由失败,回调
* 消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调
*/
@Override
public void returnedMessage(ReturnedMessage returned) {
logger.error("ReturnCallback 消息: " + returned.getMessage());
logger.error("ReturnCallback 响应码: " + returned.getReplyCode());
logger.error("ReturnCallback 回应消息: " + returned.getReplyText());
logger.error("ReturnCallback 交换机: " + returned.getExchange());
logger.error("ReturnCallback 路由键: " + returned.getRoutingKey());
}
}
二、消息发送方消息模拟
我们大致模拟这几种情况
- 消息推送到 server ,但是 server 里找不到交换机
- 消息推送到 server,找到交换机,没有找到队列
- 消息推送到 server,交换机和队列里啥都没找到
- 消息推送成功
2.1、消息推送到 server,但是照得不到交换机
@RestController
@RequestMapping("rabbitmq")
public class NotExistentExchange {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("noExistentMessage")
public String notExistentMessage() {
HashMap<String, Object> map = new HashMap<>();
map.put("code", UUID.randomUUID().toString());
map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
map.put("message", "无法找到交换机的消息");
// 指向交换机和路由键
rabbitTemplate.convertAndSend("not-existent-exchange", "routingKey", map);
return "ok";
}
}
结论:这种情况触发 ConfirmCallback 回调函数
2.2、找到交换机,但是没有找到队列
这里就需要新增一个交换机
@Bean
public DirectExchange callBackDirectExchange() {
return new DirectExchange("callBackDirectExchange",true,false);
}
控制层
@RestController
@RequestMapping("rabbitmq")
public class NotQueueMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/notQueue")
public String notQueue() {
HashMap<String, Object> map = new HashMap<>();
map.put("code", UUID.randomUUID().toString());
map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
map.put("message", "无法找到队列的消息");
rabbitTemplate.convertAndSend("callBackDirectExchange", "routingKey", map);
return "ok";
}
}
可以看到这种情况,两个函数都被调用了,这种情况下,消息是推送到服务器的,所以 ConfirmCallback 确认了成功,而在 ReturnCallback 回调函数的打印参数里看到,消息到交换机成功了,但是分发给队列时候,找不到队列,报 NO_ROUTE 错误
结论:这种情况出发 ConfirmCallback 和 ReturnCallback 两个回调函数
2.3、消息推送到 server,交换机和队列里啥都没找到
这种情况就和案例1很像了,3和1 的回调是一致的
结论:这种情况触发 ConfirmCallabck 回调函数
2.4、消息推送正常
在这种情况下需要新增队列,并且绑定交换机和队列
@Bean
public Queue callBackQueue() {
return new Queue("callBackQueue",true);
}
@Bean
public Binding callBackBinding() {
return BindingBuilder.bind(callBackQueue()).to(callBackDirectExchange()).with("routingKey");
}
控制层代码
@RestController
@RequestMapping("rabbitmq")
public class NormalMessage {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("normal")
public String normalMessage() {
HashMap<String, Object> map = new HashMap<>();
map.put("code", UUID.randomUUID().toString());
map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
map.put("message", "正常的消息");
rabbitTemplate.convertAndSend("callBackDirectExchange", "routingKey", map);
return "ok";
}
}
三、消费者收到消息确认
和生产的消息确认机制不同,因为消息接收本来就在监听信息,符合条件的消息就消费下来,所以,消息接收确认机制主要三种模式
自动确认,这也是默认的消息确认情况 AcknowledgeMode.NONE
RabbitMQ 成功将消息发出立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递,所以这种情况如果消费者抛出异常,也就是消费者没有成功处理这条消息,那么就相当于消息丢失,一般这种情况我们都是使用 try-catch 捕捉异常,打印日志用于追踪数据,这样找出对应数据后在做处理根据情况确认,这个不做介绍
手动确认,这个比较关键,也就是我们配置接收消息确认机制时,多数选择的模式,消费者在收到消息后,手动调用 basic.ack、basic.nack、basic.reject 后,rabbitMQ收到这些消息,才认为本次投递成功
- basic.ack 用于确认
- basic.nack 用于否确认,(这是AMQP 0-9-1 的 RabbitMQ扩展)
- basic.reject 用于否确认,但是和 basic.nack 不同的是,一次只能拒绝一条消息
这里重点说一下 reject,因为有时候一些场景是需要重新入列的
channel.basicReject(deliveryTag,true)
;拒绝消费当前消息,如果第二个参数传入 true,就是将数据重新丢回队列里,那么下次还会消费这个消息。如果第二个参数设置 false,那就告诉服务器,他已经知道这个数据是因为什么原因被拒绝的,下次一次就不会再消费这个消息了
这样就出现了一个问题,如果设置为 False,被拒绝的数据就一直在队列里,如果被拒绝的越来越多,就会导致消息积压,一定要及时清理出去
channel.basicNack(deliveryTag,false,true)
;第一个参数依然是当前小消息到数据唯一id,第二个参数是指是否针对多条消息,如果是true,也就是说一次性针对当前通道的消息tagId小于这个消息的都拒绝,第三个参数是指是否重新入列(这里要注意,你这一次被拒绝了,下一次还是会被拒绝,所以还是会导致消息积压)
yml 配置如下
server:
port: 8093
spring:
application:
name: back-provider
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: HuDuHost
# 这里是手动开启 ack,让程序控制 MQ 消息的重发、删除和转移
listener:
simple:
# 手动 ack 机制,默认是 none
acknowledge-mode: manual
retry:
# 开启重试,默认为 false
enabled: true
# 最大重试次数,默认为 3
max-attempts: 10
# 重试间隔时间
initial-interval: 2000ms
消费端代码大致如下
@Service
@Slf4j
@RabbitListener(queues = {"order.fanout.queue"})
public class OrderMqConsumer {
@Autowired
private DispatchService dispatchService;
private int count = 1;
@RabbitHandler
public void messageConsumer2(String orderMsg, Channel channel, CorrelationData correlationData,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
// 获取消息队列消息
log.info("收到 MQ 消息是: {}", orderMsg);
try {
// 获取订单信息
Order order = new ObjectMapper().readValue(orderMsg, Order.class);
// 获取订单 id
String orderId = order.getOrderId();
// 保存订单
dispatchService.dispatch(orderId);
// 模拟异常
System.out.println(0 / 1);
// 关闭 ack
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 如果出现异常的情况下,则判断为无效应答,根据实际情况重发
// 参数1:消息的 tag,参数2:false 多条处理,参数3:requeue 重发
// false 不会重发,false 的话会扔掉消息,将消息转移到死信队列
// true 会循环重发,建议使用true 的话,不要加 try / catch,否则会造成死循环,并且在配置文件中配置的重发次数失效
channel.basicNack(deliveryTag, false, false);
}
}
}
本作品采用《CC 协议》,转载必须注明作者和本文链接