RabbitMQ 入门案例 - Work 模式 - 轮询模式

架构图

RabbitMQ 入门案例 - Work 模式 - 轮询模式

当有多个消费者时,我们的消息会被哪个消费者消费?我们又如何均衡消费者消费信息的多少?
主要又两种模式
1、轮询模式的分发:一个消费者一条,按均分配
2、公平分发:根据消费者的消费能力进行公平分发,处理快的多处理,处理慢的少处理,按劳分配。

Work模式-轮询模式(Round-Robin)

  • 类型:无
  • 特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者一条,直至消息消费完成。

生产者消费者代码

public class Producer {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("生产者");
            channel = connection.createChannel();

            // 准备发送消息内容
            for (int i = 0; i < 20; i++) {
                String msg = "message:"+i;
                channel.basicPublish("","queue1",null,msg.getBytes());
            }
            System.out.println("发送消息成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("消息发送出现异常");
        } finally {
            // 关闭通道释放连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者 Work1 代码

public class Work1 {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者-Work1");
            channel = connection.createChannel();
            Channel finalChannel = channel;
//            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    try {
                        System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(800);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("Work1-获取消息失败");
                }
            });
            System.out.println("Work1-开始接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭通道释放连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

消费者 Work2

public class Work2 {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者-Work2");
            channel = connection.createChannel();
            Channel finalChannel = channel;
//            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    try {
                        System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("Work2-获取消息失败");
                }
            });
            System.out.println("Work2-开始接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭通道释放连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

首先创建队列 queue1,然后启动 Work1 和 Work2 两个消费者,接着启动 Producer 生产者。可以看到如下效果

RabbitMQ 入门案例 - Work 模式 - 轮询模式

RabbitMQ 入门案例 - Work 模式 - 轮询模式

这就是轮询分发模式,但是会出现问题,该模式不会因为实际情况的网络带宽的延迟,服务器资源等来进行合理分配

公平分发测试

消费者代码不变,Work1 和 Work2 消费者的代码变为手动应答机制,这里以 Work1 代码为例
public class Work1 {
    public static void main(String[] args) {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("192.168.33.110");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("test");
        connectionFactory.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = connectionFactory.newConnection("消费者-Work1");
            channel = connection.createChannel();
            final Channel finalChannel = channel;
            // 定义指标,qos=1,默认是没有设置,为null,所以默认为轮询分发,1 表示每次从队列中取多少条数据
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", false, new DeliverCallback() {
                public void handle(String consumerTag, Delivery message) throws IOException {
                    try {
                        System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
                        Thread.sleep(800);
                        finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                public void handle(String consumerTag) throws IOException {
                    System.out.println("Work1-获取消息失败");
                }
            });
            System.out.println("Work1-开始接收消息");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 关闭通道释放连接
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

RabbitMQ 入门案例 - Work 模式 - 轮询模式

代码执行效果如下,由于 Work2 执行效率快很多,所以 Wokr2 消费了大部分的消息

RabbitMQ 入门案例 - Work 模式 - 轮询模式

RabbitMQ 入门案例 - Work 模式 - 轮询模式

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!
未填写
文章
247
粉丝
18
喜欢
217
收藏
62
排名:731
访问:9753
私信
所有博文
社区赞助商