SpringBoot 整合 RabbitMQ 消息回调、手动确认 
                                                    
                        
                    
                    
  
                    
                    一、配置
消息发送者和消费者
注意:
若使用 confirm-callback 需要配置
publisher-confirm-type: correlated
若使用 return-callback 需要配置
publisher-returns: true
使用 return-callback 时必须设置 mandatory 为 true
或者在配置中设置 rabbitmq.template.mandatory=true
yml 配置如下
server:
  port: 8093
spring:
  application:
    name: back-provider
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: HuDuHost
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true
如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为SpringBoot版本导致的配置项不起效,可以把 publisher-confirms: true 替换为 publisher-confirm-type: correlated
rabbitTemplate 配置如下
@Configuration
public class RabbitClusterConfig {
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;
    @Value("${spring.rabbitmq.publisher-returns}")
    private boolean publisherReturns;
    @Autowired
    private RabbitConfirmCallbackService confirmCallbackService;
    @Autowired
    private RabbitReturnCallbackService returnCallbackService;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // connectionFactory.setHost();
        // connectionFactory.setPort();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(publisherReturns);
        return connectionFactory;
    }
    @Bean
    // 如果需要对 rabbitTemplate 设置不同的回调类,需要设置原型模式,不然回调类只能有一个
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消布确认回调,即当消息达到交换机回调
        // 只有开启了 Mandatory 才能出发回调函数,无论消息吐送结果怎样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 消息(带有 RoutingKey)到达交换机,与交换机的所有所有绑定的键进行匹配,匹配不到触发回调
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        return rabbitTemplate;
    }
}
@Service
public class RabbitConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息发送成功!");
        } else {
            logger.error("消息发送异常!");
            logger.error("ConfirmCallback 相关数据: {}", correlationData);
            logger.error("ConfirmCallback 确认情况: {}", ack);
            logger.error("confirmCallback 失败原因: {}", cause);
        }
    }
}
@Service
public class RabbitReturnCallbackService implements RabbitTemplate.ReturnsCallback {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    /**
     * 消息路由失败,回调
     * 消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        logger.error("ReturnCallback 消息: " + returned.getMessage());
        logger.error("ReturnCallback 响应码: " + returned.getReplyCode());
        logger.error("ReturnCallback 回应消息: " + returned.getReplyText());
        logger.error("ReturnCallback 交换机: " + returned.getExchange());
        logger.error("ReturnCallback 路由键: " + returned.getRoutingKey());
    }
}
二、消息发送方消息模拟
我们大致模拟这几种情况
- 消息推送到 server ,但是 server 里找不到交换机
 - 消息推送到 server,找到交换机,没有找到队列
 - 消息推送到 server,交换机和队列里啥都没找到
 - 消息推送成功
 
2.1、消息推送到 server,但是照得不到交换机
@RestController
@RequestMapping("rabbitmq")
public class NotExistentExchange {
    @Autowired
    RabbitTemplate rabbitTemplate;
    @GetMapping("noExistentMessage")
    public String notExistentMessage() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", UUID.randomUUID().toString());
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        map.put("message", "无法找到交换机的消息");
        // 指向交换机和路由键
        rabbitTemplate.convertAndSend("not-existent-exchange", "routingKey", map);
        return "ok";
    }
}

结论:这种情况触发 ConfirmCallback 回调函数
2.2、找到交换机,但是没有找到队列
这里就需要新增一个交换机
@Bean
public DirectExchange callBackDirectExchange() {
  return new DirectExchange("callBackDirectExchange",true,false);
}
控制层
@RestController
@RequestMapping("rabbitmq")
public class NotQueueMessage {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/notQueue")
    public String notQueue() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", UUID.randomUUID().toString());
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        map.put("message", "无法找到队列的消息");
        rabbitTemplate.convertAndSend("callBackDirectExchange", "routingKey", map);
        return "ok";
    }
}

可以看到这种情况,两个函数都被调用了,这种情况下,消息是推送到服务器的,所以 ConfirmCallback 确认了成功,而在 ReturnCallback 回调函数的打印参数里看到,消息到交换机成功了,但是分发给队列时候,找不到队列,报 NO_ROUTE 错误
结论:这种情况出发 ConfirmCallback 和 ReturnCallback 两个回调函数
2.3、消息推送到 server,交换机和队列里啥都没找到
这种情况就和案例1很像了,3和1 的回调是一致的
结论:这种情况触发 ConfirmCallabck 回调函数
2.4、消息推送正常
在这种情况下需要新增队列,并且绑定交换机和队列
@Bean
public Queue callBackQueue() {
  return new Queue("callBackQueue",true);
}
@Bean
public Binding callBackBinding() {
  return BindingBuilder.bind(callBackQueue()).to(callBackDirectExchange()).with("routingKey");
}
控制层代码
@RestController
@RequestMapping("rabbitmq")
public class NormalMessage {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @RequestMapping("normal")
    public String normalMessage() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", UUID.randomUUID().toString());
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        map.put("message", "正常的消息");
        rabbitTemplate.convertAndSend("callBackDirectExchange", "routingKey", map);
        return "ok";
    }
}

三、消费者收到消息确认
和生产的消息确认机制不同,因为消息接收本来就在监听信息,符合条件的消息就消费下来,所以,消息接收确认机制主要三种模式
自动确认,这也是默认的消息确认情况 AcknowledgeMode.NONE
RabbitMQ 成功将消息发出立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递,所以这种情况如果消费者抛出异常,也就是消费者没有成功处理这条消息,那么就相当于消息丢失,一般这种情况我们都是使用 try-catch 捕捉异常,打印日志用于追踪数据,这样找出对应数据后在做处理根据情况确认,这个不做介绍
手动确认,这个比较关键,也就是我们配置接收消息确认机制时,多数选择的模式,消费者在收到消息后,手动调用 basic.ack、basic.nack、basic.reject 后,rabbitMQ收到这些消息,才认为本次投递成功
- basic.ack 用于确认
 - basic.nack 用于否确认,(这是AMQP 0-9-1 的 RabbitMQ扩展)
 - basic.reject 用于否确认,但是和 basic.nack 不同的是,一次只能拒绝一条消息
 
这里重点说一下 reject,因为有时候一些场景是需要重新入列的
channel.basicReject(deliveryTag,true);拒绝消费当前消息,如果第二个参数传入 true,就是将数据重新丢回队列里,那么下次还会消费这个消息。如果第二个参数设置 false,那就告诉服务器,他已经知道这个数据是因为什么原因被拒绝的,下次一次就不会再消费这个消息了
这样就出现了一个问题,如果设置为 False,被拒绝的数据就一直在队列里,如果被拒绝的越来越多,就会导致消息积压,一定要及时清理出去
channel.basicNack(deliveryTag,false,true);第一个参数依然是当前小消息到数据唯一id,第二个参数是指是否针对多条消息,如果是true,也就是说一次性针对当前通道的消息tagId小于这个消息的都拒绝,第三个参数是指是否重新入列(这里要注意,你这一次被拒绝了,下一次还是会被拒绝,所以还是会导致消息积压)
yml 配置如下
server:
  port: 8093
spring:
  application:
    name: back-provider
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: HuDuHost
    # 这里是手动开启 ack,让程序控制 MQ 消息的重发、删除和转移
    listener:
      simple:
        # 手动 ack 机制,默认是 none
        acknowledge-mode: manual
        retry:
          # 开启重试,默认为 false
          enabled: true
          # 最大重试次数,默认为 3
          max-attempts: 10
          # 重试间隔时间
          initial-interval: 2000ms
消费端代码大致如下
@Service
@Slf4j
@RabbitListener(queues = {"order.fanout.queue"})
public class OrderMqConsumer {
    @Autowired
    private DispatchService dispatchService;
    private int count = 1;
    @RabbitHandler
    public void messageConsumer2(String orderMsg, Channel channel, CorrelationData correlationData,
                                 @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        // 获取消息队列消息
        log.info("收到 MQ 消息是: {}", orderMsg);
        try {
            // 获取订单信息
            Order order = new ObjectMapper().readValue(orderMsg, Order.class);
            // 获取订单 id
            String orderId = order.getOrderId();
            // 保存订单
            dispatchService.dispatch(orderId);
            // 模拟异常
            System.out.println(0 / 1);
            // 关闭 ack
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 如果出现异常的情况下,则判断为无效应答,根据实际情况重发
            // 参数1:消息的 tag,参数2:false 多条处理,参数3:requeue 重发
            // false 不会重发,false 的话会扔掉消息,将消息转移到死信队列
            // true 会循环重发,建议使用true 的话,不要加 try / catch,否则会造成死循环,并且在配置文件中配置的重发次数失效
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
                        
                        本作品采用《CC 协议》,转载必须注明作者和本文链接
          
          
          
                关于 LearnKu