RabbitMQ - SpringBoot 案例 - fanout 模式

整体核心

RabbitMQ - SpringBoot 案例 - fanout 模式

生产者模块

创建 springboot-rabbitmq-producer 的 springboot 项目

项目结构如下

RabbitMQ - SpringBoot 案例 - fanout 模式

web.xml 配置

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

application.yml 配置

server:
  port: 9900
  servlet:
    context-path: /producer
spring:
  application:
    name: springboot-rabbitmq-producer
  rabbitmq:
    #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
    addresses: 192.168.1.xxx:5671,192.168.1.xxx:5672,192.168.1.xxx:5673
    # host: 192.168.1.xxx
    # port: 5672
    virtual-host: /
    username: xxx
    password: xxx

代码示例

rabbitTemplate 配置

@Configuration
public class RabbitTemplateConfig {

    @Value("${spring.rabbitmq.addresses}")
    private String addresses;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);

        return connectionFactory;
    }

    @Bean
    // 如果需要对 rabbitTemplate 设置不同的回调类,需要设置原型模式,不然回调类只能有一个
    // @Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE,proxyMode = ScopedProxyMode.TARGET_CLASS)
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
}

服务层代码

@Service
public class OrderService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 模拟用户下单
     * @param userId
     * @param productId
     * @param num
     */
    public void makeOrder(String userId,String productId,int num) {
        // 1:根据id查询商品是否充足
        // 2:保存订单
        String orderId = UUID.randomUUID().toString();
        System.out.println("订单生成成功:"+orderId);
        // 3:通过 MQ 来完成消息的分发
        // 交换机,路由 key/queue 队列名称,消息内容
        String exchangeName = "fanout_order_exchange";
        String routingKey = "";
        rabbitTemplate.convertAndSend(exchangeName,routingKey,orderId);
    }
}

config 配置类

@Configuration
public class RabbitMqConfiguration {

    // 1;声明注册 fanout 模式的交换机
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange",true,false);
    }
    // 2:声明队列 sms.fanout.queue email.fanout.queue weChat.fanout.queue
    @Bean
    public Queue smsQueue () {
        return new Queue("sms.fanout.queue",true);
    }

    @Bean
    public Queue emailQueue () {
        return new Queue("email.fanout.queue",true);
    }

    @Bean
    public Queue weChatQueue () {
        return new Queue("weChat.fanout.queue",true);
    }
    // 3;完成绑定关系(队列和交换机完成绑定关系)
    @Bean
    public Binding smsBinding() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBinding() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding weChatBinding() {
        return BindingBuilder.bind(weChatQueue()).to(fanoutExchange());
    }
}

测试类

@SpringBootTest
class SpringbootRabbitmqProducerApplicationTests {

    @Autowired
    private OrderService orderService;

    @Test
    void contextLoads() {
        orderService.makeOrder("1","1",12);
    }

}

消费者模块

项目结构如下

RabbitMQ - SpringBoot 案例 - fanout 模式

和消费者模块区别在于修改一下端口

服务层代码如下

@Service
// 定义监听的队列
@RabbitListener(queues = {"email.fanout.queue"})
public class EmailConsumer {
    @RabbitHandler
    public void receiveMessage(String message) {
        System.out.println("email fanout--接收到的订单信息是:->" + message);
    }
}

@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class SMSConsumer {
  @RabbitHandler
  public void receiveMessage(String message) {
  System.out.println("sms fanout--接收到的订单信息是:->" + message);
  }
}

@Service
@RabbitListener(queues = {"weChat.fanout.queue"})
public class WeChatConsumer {
  @RabbitHandler
  public void receiveMessage(String message) {
  System.out.println("weChat fanout--接收到的订单信息是:->" + message);
  }
}

启动消费者模块,再启动生产者模块,效果如下

RabbitMQ - SpringBoot 案例 - fanout 模式

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商