SpringBoot 整合 RabbitMQ 集群 
                                                    
                        
                    
                    
  
                    
                    引入依赖
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>详细配置如下
 rabbitmq:
    addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
#    port:
    ##集群配置 addresses之间用逗号隔开
    # addresses: ip:port,ip:port
    password: admin
    username: 123456
    virtual-host: / # 连接到rabbitMQ的vhost
    requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s
    publisher-confirms: #是否启用 发布确认
    publisher-reurns: # 是否启用发布返回
    connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时
    cache:
      channel.size: # 缓存中保持的channel数量
      channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection.size: # 缓存的连接数,只有是CONNECTION模式时生效
      connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION
    listener:
      simple.auto-startup: # 是否启动时自动启动容器
      simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
      simple.concurrency: # 最小的消费者数量
      simple.max-concurrency: # 最大的消费者数量
      simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
      simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
      simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
      simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒
      simple.retry.enabled: # 监听重试是否可用
      simple.retry.max-attempts: # 最大重试次数
      simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
      simple.retry.multiplier: # 应用于上一重试间隔的乘数
      simple.retry.max-interval: # 最大重试时间间隔
      simple.retry.stateless: # 重试是有状态or无状态
    template:
      mandatory: # 启用强制信息;默认false
      receive-timeout: # receive() 操作的超时时间
      reply-timeout: # sendAndReceive() 操作的超时时间
      retry.enabled: # 发送重试是否可用
      retry.max-attempts: # 最大重试次数
      retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
      retry.multiplier: # 应用于上一重试间隔的乘数
      retry.max-interval: #最大重试时间间隔Spring AMQP的主要对象
| 类 | 作用 | 
|---|---|
| Queue | 对应RabbitMQ中Queue | 
| AmqpTemplate | 接口,用于向RabbitMQ发送和接收Message | 
| RabbitTemplate | AmqpTemplate的实现类 | 
| @RabbitListener | 指定消息接收方,可以配置在类和方法上 | 
| @RabbitHandler | 指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用 | 
| Message | 对RabbitMQ消息的封装 | 
| Exchange | 对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等 | 
| Binding | 将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作 | 
| AmqpAdmin | 接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作 | 
| RabbitAdmin | AmqpAdmin的实现类 | 
| ConnectionFactory | 创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装 | 
| CachingConnectionFactory | Spring ConnectionFactory的实现类,可以用于缓存Channel和Connection | 
| Connection | Spring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装 | 
| SimpleConnection | Spring Connection的实现类,将实际工作代理给RabbitMQ的Connection类 | 
| MessageListenerContainer | 接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理 | 
| RabbitListenerContainerFactory | 接口,用于创建MessageListenerContainer | 
| SimpleMessageListenerContainer | MessageListenerContainer的实现类 | 
| SimpleRabbitListenerContainerFactory | RabbitListenerContainerFactory的实现类 | 
| RabbitProperties | 用于配置Spring AMQP的Property类 | 
对于消息的发送方而言,需要进行如下配置
- 配置 CachingConnectionFactory
- 配置 Exchange/Queue/Binding
- 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
- 配置 RabbitTemplate 用于发送消息,RabbitTemplate通过CachingConnectionFactory 获取到 Connection,然后想指定 Exchange发送
消息的消费方需要进行如下配置
- 配置 CachingConnectionFactory
- 配置 Exchange/Queue/Binding
- 配置 RabbitAdmin 创建上一步的 Exchange/Queue/Binding
- 配置 RabbitListenerContainerFactory
- 配置 @RabbitListener/@RabbitHandler 用于接收消息
默认情况下的配置如下
| 配置项 | 默认值 | 作用 | 
|---|---|---|
| host | localhost | RabbitMQ服务器地址 | 
| port | 5672 | RabbitMQ服务器端口 | 
| username | guest | 用户名 | 
| password | guest | 密码 | 
| virtualHost | / | RabbitMQ 虚拟主机名 | 
| publisherConfirms | false | 设置是否启用生产方确认 | 
| publisherReturns | false | 设置是否启用生产方消息返回 | 
| ssl | 对象 | 配置 SSL,默认停用 | 
| template | 对象 | 设置 RabbitTemplate | 
| template.retry | 默认停用 | 设置RabbitTemplate发送消息时的重试,主要用于RabbitTemplate与RabbitMQ之间的网络连接 | 
| template.mandatory | false | 设置发送消息失败时(无接收queue)是否return 消息,与return callback一并使用 | 
| template.exchange | “” | 默认发送的exchange | 
| template.routingKey | “” | 默认发送消息时的routing key | 
| template.defaultReceiveQueue | null | 默认接收消息的queue | 
| listener.simple | 对象 | 设置SimpleRabbitListenerContainerFactory | 
| listener.direct | 对象 | 设置DirectRabbitListenerContainerFactory | 
| listener.simple.concurrency | null | 并发消费方数量 | 
| listener.simple.acknowledgeMode | AUTO | 设置消费方确认模式,这里的AUTO与RabbitMQ的自动确认不是一回事 | 
| listener.simple.prefetch | 250 | 设置消费方一次性接收消息的条数 | 
| listener.simple.defaultRequeueRejected | true | 当Listener发生异常时是否requeue | 
| listener.simple.retry | 对象 | 设置Listener的重试机制,默认停用,当启用时,Listener对于消息处理过程中的异常将进行requeue重试,超过重试次数再抛弃,此时AmqpRejectAndDontRequeueException异常也会被重试 | 
通过配置类加载
@Configuration
public class RabbitClusterConfig {
    @Value("${spring.rabbitmq.addresses}")
    private String addresses;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        // connectionFactory.setHost();
        // connectionFactory.setPort();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }
    @Bean
    // 如果需要对 rabbitTemplate 设置不同的回调类,需要设置原型模式,不然回调类只能有一个
    @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE,proxyMode = ScopedProxyMode.TARGET_CLASS)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置消布确认回调,即当消息达到交换机回调
        // rabbitTemplate.setConfirmCallback();
        // 消息(带有 RoutingKey)到达交换机,与交换机的所有所有绑定的键进行匹配,匹配不到触发回调
        // rabbitTemplate.setReturnsCallback();
        return rabbitTemplate;
    }
    public static final String CLUSTER_EXCHANGE_NAME = "cluster_direct_exchange";
    public static final String CLUSTER_QUEUE_NAME = "cluster_direct_queue";
    public static final String CLUSTER_ROUTING_KEY = "cluster";
    @Bean
    public DirectExchange clusterDirectExchange() {
        return new DirectExchange(CLUSTER_EXCHANGE_NAME,true,false);
    }
    @Bean
    public Queue clusterDirectQueue() {
        return new Queue(CLUSTER_QUEUE_NAME,true,false,false);
    }
    @Bean
    public Binding clusterDirectBinding(@Qualifier("clusterDirectQueue") Queue clusterDirectQueue,
                                        @Qualifier("clusterDirectExchange") DirectExchange clusterDirectExchange) {
        return BindingBuilder.bind(clusterDirectQueue).to(clusterDirectExchange).with(CLUSTER_ROUTING_KEY);
    }
}本作品采用《CC 协议》,转载必须注明作者和本文链接
 
           HuDu 的个人博客
 HuDu 的个人博客
         
           
           关于 LearnKu
                关于 LearnKu
               
                     
                     
                     粤公网安备 44030502004330号
 粤公网安备 44030502004330号