RabbitMQ 集群

一、RabbitMQ 集群介绍

  • 普通集群
  • 镜像集群

1.1、普通镜像模式

普通集群模式,就是将 RabbitMQ 部署到多台服务器上,每个服务器启动一个 RabbitMQ 实例,多个实例之间进行消息通信。

此时我们创建的队列 Queue,它的元数据(主要就是 Queue 的一些配置信息)会在所有的 RabbitMQ 实例中进行同步,但是队列中的消息只会存在于一个 RabbitMQ 实例上,而不会同步到其他队列。

当我们消费消息的时候,如果连接到了另外一个实例,那么那个实例会通过元数据定位到 Queue 所在的位置,然后访问 Queue 所在的实例,拉取数据过来发送给消费者。

这种集群可以提高 RabbitMQ 的消息吞吐能力,但是无法保证高可用,因为一旦一个 RabbitMQ 实例挂了,消息就没法访问了,如果消息队列做了持久化,那么等 RabbitMQ 实例恢复后,就可以继续访问了;如果消息队列没做持久化,那么消息就丢了。

大致的流程图如下图:

RabbitMQ 集群

1.2、镜像集群

它和普通集群最大的区别在于 Queue 数据和原数据不再是单独存储在一台机器上,而是同时存储在多台机器上。也就是说每个 RabbitMQ 实例都有一份镜像数据(副本数据)。每次写入消息的时候都会自动把数据同步到多台实例上去,这样一旦其中一台机器发生故障,其他机器还有一份副本数据可以继续提供服务,也就实现了高可用。

大致流程图如下图:

RabbitMQ 集群

1.3、节点类型

RabbitMQ 中的节点类型有两种:

  • RAM node:内存节点将所有的队列、交换机、绑定、用户、权限和 vhost 的元数据定义存储在内存中,好处是可以使得交换机和队列声明等操作速度更快。
  • Disk node:将元数据存储在磁盘中,单节点系统只允许磁盘类型的节点,防止重启 RabbitMQ 的时候,丢失系统的配置信息

RabbitMQ 要求在集群中至少有一个磁盘节点,所有其他节点可以是内存节点,当节点加入或者离开集群时,必须要将该变更通知到至少一个磁盘节点。如果集群中唯一的一个磁盘节点崩溃的话,集群仍然可以保持运行,但是无法进行其他操作(增删改查),直到节点恢复。为了确保集群信息的可靠性,或者在不确定使用磁盘节点还是内存节点的时候,建议直接用磁盘节点。

二、搭建普通集群

2.1、预备知识

搭建之前,有两个预备知识需要大家了解:

1.搭建集群时,节点中的 Erlang Cookie 值要一致,默认情况下,文件在 /var/lib/rabbitmq/.erlang.cookie,我们在用 docker 创建 RabbitMQ 容器时,可以为之设置相应的 Cookie 值。

2.RabbitMQ 是通过主机名来连接服务,必须保证各个主机名之间可以 ping 通。可以通过编辑 /etc/hosts 来手工添加主机名和 IP 对应关系。如果主机名 ping 不通,RabbitMQ 服务启动会失败(如果我们是在不同的服务器上搭建 RabbitMQ 集群,大家需要注意这一点,接下来的 2.2 小结,我们将通过 Docker 的容器连接 link 来实现容器之间的访问,略有不同)。

2.2、开始搭建

$ docker run -d --hostname rabbit01 --name mq01 -p 5671:5672 -p 15671:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq:3.9.22

$ docker run -d --hostname rabbit02 --name mq02 --link mq01:mylink01 -p 5672:5672 -p 15672:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq:3.9.22

$ docker run -d --hostname rabbit03 --name mq03 --link mq01:mylink02 --link mq02:mylink03 -p 5673:5672 -p 15673:15672 -e RABBITMQ_ERLANG_COOKIE="rabbitmq_cookie" rabbitmq:3.9.22

添加完参数之后会出现警告:RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.

准备一个.erlang.cookie文件,内容为rabbitmq_cookie。修改权限为读写,不修改会报权限错误erlang.cookie must be accessible by owner only

$ chmod 600 .erlang.cookie
$ docker run -d --name mq01 --hostname rabbit01 -p 15671:15672 -p 5671:5672 rabbitmq:3.9.22-management

$ docker run -d --name mq02 --hostname rabbit02 -p 15672:15672 -p 5672:5672 --link mq01:mylink01 rabbitmq:3.9.22-management

$ docker run -d --name mq03 --hostname rabbit03 -p 15673:15672 -p 5673:5672 --link mq01:mylink02 --link mq02:mylink03 rabbitmq:3.9.22-management
$ docker cp .erlang.cookie mq01:/var/lib/rabbitmq
$ docker cp .erlang.cookie mq02:/var/lib/rabbitmq
$ docker cp .erlang.cookie mq03:/var/lib/rabbitmq

重启三个节点

$ docker restart mq01 mq02 mq03

三个节点现在就启动好了,注意在 mq02 和 mq03 中,分别使用了 --link 参数来实现容器连接,另外还需要注意,mq03 容器中要既能够连接 mq01 也能够连接 mq02。

--link说明:

1.当新建一个容器时,如果没有显示指定其使用的网络,那么默认会使用bridge网络
2.当一个容器link到另一个容器时,该容器可以通过IP或容器名称访问被link的容器,而被link容器可以通过IP访问该容器,但是无法通过容器名称访问
3.当被link的容器被删除时,创建link的容器也无法正常使用
4.如果两个容器被加入到我们手动创建的网络时,那么该网络内的容器相互直接可以通过IP和名称同时访问。

接下来开始集群的配置

进入 mq01

$ docker exec -it mq01 /bin/bash
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl start_app

分别进入 mq02 和 mq03,执行如下命令。

$ docker exec -it mq02 /bin/bash
$ rabbitmqctl stop_app
$ rabbitmqctl reset
$ rabbitmqctl join_cluster --ram rabbit@rabbit01
$ rabbitmqctl start_app

进入任意一个容器内,输入以下命令查看集群状态

$ rabbitmqctl cluster_status

RabbitMQ 集群

2.3、代码测试

application.yml 配置如下

server:
  port: 9092
spring:
  rabbitmq:
    host: localhost
    port: 5671
    password: guest
    username: guest
    virtual-host: /

rabbitTemplate 配置如下

@Configuration
public class RabbitTemplateConfig {

    @Value("${spring.rabbitmq.host}")
    private String host;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.port}")
    private int port;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    // 必须是 prototype 类型
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }
}

创建队列、交换机以及绑定关系

@Configuration
public class DirectRabbitMQConfig {

    public static final String CLUSTER_EXCHANGE_NAME = "cluster_direct_exchange";
    public static final String CLUSTER_QUEUE_NAME = "cluster_direct_queue";
    public static final String CLUSTER_ROUTING_KEY = "cluster";

    @Bean
    public DirectExchange clusterDirectExchange() {
        return new DirectExchange(CLUSTER_EXCHANGE_NAME,true,false);
    }

    @Bean
    public Queue clusterDirectQueue() {
        return new Queue(CLUSTER_QUEUE_NAME,true,false,false);
    }

    @Bean
    public Binding clusterDirectBinding(@Qualifier("clusterDirectQueue") Queue clusterDirectQueue,
                                        @Qualifier("clusterDirectExchange") DirectExchange clusterDirectExchange) {
        return BindingBuilder.bind(clusterDirectQueue).to(clusterDirectExchange).with(CLUSTER_ROUTING_KEY);
    }
}

在单元测试中进行消息发送测试

public class RabbitMQTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    void directQueueTest() {
        rabbitTemplate.convertAndSend(DirectRabbitMQConfig.CLUSTER_EXCHANGE_NAME,DirectRabbitMQConfig.CLUSTER_ROUTING_KEY,"Hello Rabbitmq cluster");
    }
}

RabbitMQ 集群

消费者端代码

@Service
public class DirectClusterConsumer {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitListener(queues = {"cluster_direct_queue"})
    public void receiveClusterMessage(String message) {
        logger.info("接收到消息: " + message);
    }
}

RabbitMQ 集群

2.4、反向测试

2.4.1、停止主节点

在三个节点都启动的状态下,先发布一条消息,然后停掉主节点,也就是mq01

RabbitMQ 集群

然后动消费者服务,可以看到服务报错,无法获取到队列消息

RabbitMQ 集群

2.4.2、停止从节点

此时,停止任意子节点,这里停止mq02节点

RabbitMQ 集群

可以看到,消息队列的各项功能都不受影响。

RabbitMQ 集群

RabbitMQ 集群

三、配置镜像集群

镜像集群不需要额外搭建,只需要将队列配置为镜像队列即可。这个配置可以通过网页配置,也可以通过命令行配置。

3.1、网页配置镜像队列

点击 Admin 选项卡,然后点击右边的Policies,再点击 Add/update a policy,如下图:

RabbitMQ 集群

然后添加策略

RabbitMQ 集群

各参数含义如下:

Name:policy 的名称
Pattern:queue 的匹配模式(正则表达式)
Definition:镜像定义,主要由三个参数:ha-modeha-paramsha-sync-mode

  • ha-mode:指明镜像队列的模式,有效值为 all、exactly、nodes。其中 all 表示在集群中所有的节点上进行镜像(默认即此);exactly 表示在指定个数的节点上进行镜像,节点的个数由 ha-params 指定;nodes 表示在指定的节点上进行镜像,节点名称通过 ha-params 指定。
  • ha-params:ha-mode 模式需要用到的参数
  • ha-sync-mode:进行队列中消息的同步方式,有效值为 automatic 和 manual。
    priority 为可选参数,表示 policy 的优先级

添加完效果如下

RabbitMQ 集群

3.2、命令行配置

命令行的配置格式如下:

$ rabbitmqctl set_policy [-p vhost] [--priority priority] [--apply-to apply-to] {name} {pattern} {definition}
$ docker exec -it rabbit01 bash
$ rabbitmqctl set_policy -p / --apply-to queues my_queue_mirror "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

3.3、测试

停止mq01节点,可以看到此时消息队列集群没有任何影响

RabbitMQ 集群

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!