SpringBoot 整合 RabbitMQ 消息回调、手动确认

一、配置

消息发送者和消费者
注意:

若使用 confirm-callback 需要配置

publisher-confirm-type: correlated
若使用 return-callback 需要配置

publisher-returns: true

使用 return-callback 时必须设置 mandatory 为 true

或者在配置中设置 rabbitmq.template.mandatory=true

yml 配置如下

server:
  port: 8093

spring:
  application:
    name: back-provider
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: HuDuHost
    #确认消息已发送到交换机(Exchange)
    publisher-confirm-type: correlated
    #确认消息已发送到队列(Queue)
    publisher-returns: true

如果你们在配置确认回调,测试发现无法触发回调函数,那么存在原因也许是因为SpringBoot版本导致的配置项不起效,可以把 publisher-confirms: true 替换为 publisher-confirm-type: correlated

rabbitTemplate 配置如下

@Configuration
public class RabbitClusterConfig {
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-returns}")
    private boolean publisherReturns;

    @Autowired
    private RabbitConfirmCallbackService confirmCallbackService;
    @Autowired
    private RabbitReturnCallbackService returnCallbackService;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // connectionFactory.setHost();
        // connectionFactory.setPort();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(publisherReturns);

        return connectionFactory;
    }

    @Bean
    // 如果需要对 rabbitTemplate 设置不同的回调类,需要设置原型模式,不然回调类只能有一个
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE, proxyMode = ScopedProxyMode.TARGET_CLASS)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消布确认回调,即当消息达到交换机回调
        // 只有开启了 Mandatory 才能出发回调函数,无论消息吐送结果怎样都强制调用回调函数
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 消息(带有 RoutingKey)到达交换机,与交换机的所有所有绑定的键进行匹配,匹配不到触发回调
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        return rabbitTemplate;
    }
}
@Service
public class RabbitConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("消息发送成功!");
        } else {
            logger.error("消息发送异常!");
            logger.error("ConfirmCallback 相关数据: {}", correlationData);
            logger.error("ConfirmCallback 确认情况: {}", ack);
            logger.error("confirmCallback 失败原因: {}", cause);
        }
    }
}
@Service
public class RabbitReturnCallbackService implements RabbitTemplate.ReturnsCallback {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 消息路由失败,回调
     * 消息(带有路由键routingKey)到达交换机,与交换机的所有绑定键进行匹配,匹配不到触发回调
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        logger.error("ReturnCallback 消息: " + returned.getMessage());
        logger.error("ReturnCallback 响应码: " + returned.getReplyCode());
        logger.error("ReturnCallback 回应消息: " + returned.getReplyText());
        logger.error("ReturnCallback 交换机: " + returned.getExchange());
        logger.error("ReturnCallback 路由键: " + returned.getRoutingKey());
    }
}

二、消息发送方消息模拟

我们大致模拟这几种情况

  1. 消息推送到 server ,但是 server 里找不到交换机
  2. 消息推送到 server,找到交换机,没有找到队列
  3. 消息推送到 server,交换机和队列里啥都没找到
  4. 消息推送成功

2.1、消息推送到 server,但是照得不到交换机

@RestController
@RequestMapping("rabbitmq")
public class NotExistentExchange {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("noExistentMessage")
    public String notExistentMessage() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", UUID.randomUUID().toString());
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        map.put("message", "无法找到交换机的消息");
        // 指向交换机和路由键
        rabbitTemplate.convertAndSend("not-existent-exchange", "routingKey", map);
        return "ok";
    }
}

SpringBoot 整合 RabbitMQ 消息回调、手动确认

结论:这种情况触发 ConfirmCallback 回调函数

2.2、找到交换机,但是没有找到队列

这里就需要新增一个交换机

@Bean
public DirectExchange callBackDirectExchange() {
  return new DirectExchange("callBackDirectExchange",true,false);
}

控制层

@RestController
@RequestMapping("rabbitmq")
public class NotQueueMessage {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/notQueue")
    public String notQueue() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", UUID.randomUUID().toString());
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        map.put("message", "无法找到队列的消息");
        rabbitTemplate.convertAndSend("callBackDirectExchange", "routingKey", map);
        return "ok";
    }
}

SpringBoot 整合 RabbitMQ 消息回调、手动确认

可以看到这种情况,两个函数都被调用了,这种情况下,消息是推送到服务器的,所以 ConfirmCallback 确认了成功,而在 ReturnCallback 回调函数的打印参数里看到,消息到交换机成功了,但是分发给队列时候,找不到队列,报 NO_ROUTE 错误

结论:这种情况出发 ConfirmCallback 和 ReturnCallback 两个回调函数

2.3、消息推送到 server,交换机和队列里啥都没找到

这种情况就和案例1很像了,3和1 的回调是一致的

结论:这种情况触发 ConfirmCallabck 回调函数

2.4、消息推送正常

在这种情况下需要新增队列,并且绑定交换机和队列

@Bean
public Queue callBackQueue() {
  return new Queue("callBackQueue",true);
}

@Bean
public Binding callBackBinding() {
  return BindingBuilder.bind(callBackQueue()).to(callBackDirectExchange()).with("routingKey");
}

控制层代码

@RestController
@RequestMapping("rabbitmq")
public class NormalMessage {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("normal")
    public String normalMessage() {
        HashMap<String, Object> map = new HashMap<>();
        map.put("code", UUID.randomUUID().toString());
        map.put("time", LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        map.put("message", "正常的消息");
        rabbitTemplate.convertAndSend("callBackDirectExchange", "routingKey", map);
        return "ok";
    }
}

SpringBoot 整合 RabbitMQ 消息回调、手动确认

三、消费者收到消息确认

和生产的消息确认机制不同,因为消息接收本来就在监听信息,符合条件的消息就消费下来,所以,消息接收确认机制主要三种模式

  1. 自动确认,这也是默认的消息确认情况 AcknowledgeMode.NONE
    RabbitMQ 成功将消息发出立即认为本次投递已经被正确处理,不管消费者端是否成功处理本次投递,所以这种情况如果消费者抛出异常,也就是消费者没有成功处理这条消息,那么就相当于消息丢失,一般这种情况我们都是使用 try-catch 捕捉异常,打印日志用于追踪数据,这样找出对应数据后在做处理

  2. 根据情况确认,这个不做介绍

  3. 手动确认,这个比较关键,也就是我们配置接收消息确认机制时,多数选择的模式,消费者在收到消息后,手动调用 basic.ack、basic.nack、basic.reject 后,rabbitMQ收到这些消息,才认为本次投递成功

    1. basic.ack 用于确认
    2. basic.nack 用于否确认,(这是AMQP 0-9-1 的 RabbitMQ扩展)
    3. basic.reject 用于否确认,但是和 basic.nack 不同的是,一次只能拒绝一条消息

这里重点说一下 reject,因为有时候一些场景是需要重新入列的

channel.basicReject(deliveryTag,true);拒绝消费当前消息,如果第二个参数传入 true,就是将数据重新丢回队列里,那么下次还会消费这个消息。如果第二个参数设置 false,那就告诉服务器,他已经知道这个数据是因为什么原因被拒绝的,下次一次就不会再消费这个消息了

这样就出现了一个问题,如果设置为 False,被拒绝的数据就一直在队列里,如果被拒绝的越来越多,就会导致消息积压,一定要及时清理出去

channel.basicNack(deliveryTag,false,true);第一个参数依然是当前小消息到数据唯一id,第二个参数是指是否针对多条消息,如果是true,也就是说一次性针对当前通道的消息tagId小于这个消息的都拒绝,第三个参数是指是否重新入列(这里要注意,你这一次被拒绝了,下一次还是会被拒绝,所以还是会导致消息积压)

yml 配置如下

server:
  port: 8093

spring:
  application:
    name: back-provider
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: HuDuHost
    # 这里是手动开启 ack,让程序控制 MQ 消息的重发、删除和转移
    listener:
      simple:
        # 手动 ack 机制,默认是 none
        acknowledge-mode: manual
        retry:
          # 开启重试,默认为 false
          enabled: true
          # 最大重试次数,默认为 3
          max-attempts: 10
          # 重试间隔时间
          initial-interval: 2000ms

消费端代码大致如下

@Service
@Slf4j
@RabbitListener(queues = {"order.fanout.queue"})
public class OrderMqConsumer {
    @Autowired
    private DispatchService dispatchService;

    private int count = 1;

    @RabbitHandler
    public void messageConsumer2(String orderMsg, Channel channel, CorrelationData correlationData,
                                 @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
        // 获取消息队列消息
        log.info("收到 MQ 消息是: {}", orderMsg);
        try {
            // 获取订单信息
            Order order = new ObjectMapper().readValue(orderMsg, Order.class);
            // 获取订单 id
            String orderId = order.getOrderId();
            // 保存订单
            dispatchService.dispatch(orderId);
            // 模拟异常
            System.out.println(0 / 1);
            // 关闭 ack
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // 如果出现异常的情况下,则判断为无效应答,根据实际情况重发
            // 参数1:消息的 tag,参数2:false 多条处理,参数3:requeue 重发
            // false 不会重发,false 的话会扔掉消息,将消息转移到死信队列
            // true 会循环重发,建议使用true 的话,不要加 try / catch,否则会造成死循环,并且在配置文件中配置的重发次数失效
            channel.basicNack(deliveryTag, false, false);
        }
    }
}
本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!