RabbitMQ 入门案例 - Work 模式 - 轮询模式
架构图
当有多个消费者时,我们的消息会被哪个消费者消费?我们又如何均衡消费者消费信息的多少?
主要又两种模式
1、轮询模式的分发:一个消费者一条,按均分配
2、公平分发:根据消费者的消费能力进行公平分发,处理快的多处理,处理慢的少处理,按劳分配。
Work模式-轮询模式(Round-Robin)
- 类型:无
- 特点:该模式接收消息是当有多个消费者接入时,消息的分配模式是一个消费者一条,直至消息消费完成。
生产者消费者代码
public class Producer {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("生产者");
channel = connection.createChannel();
// 准备发送消息内容
for (int i = 0; i < 20; i++) {
String msg = "message:"+i;
channel.basicPublish("","queue1",null,msg.getBytes());
}
System.out.println("发送消息成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("消息发送出现异常");
} finally {
// 关闭通道释放连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者 Work1 代码
public class Work1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者-Work1");
channel = connection.createChannel();
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(800);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Work1-获取消息失败");
}
});
System.out.println("Work1-开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭通道释放连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
消费者 Work2
public class Work2 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者-Work2");
channel = connection.createChannel();
Channel finalChannel = channel;
// finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Work2-获取消息失败");
}
});
System.out.println("Work2-开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭通道释放连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
首先创建队列 queue1,然后启动 Work1 和 Work2 两个消费者,接着启动 Producer 生产者。可以看到如下效果
这就是轮询分发模式,但是会出现问题,该模式不会因为实际情况的网络带宽的延迟,服务器资源等来进行合理分配
公平分发测试
消费者代码不变,Work1 和 Work2 消费者的代码变为手动应答机制,这里以 Work1 代码为例
public class Work1 {
public static void main(String[] args) {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.33.110");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("test");
connectionFactory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者-Work1");
channel = connection.createChannel();
final Channel finalChannel = channel;
// 定义指标,qos=1,默认是没有设置,为null,所以默认为轮询分发,1 表示每次从队列中取多少条数据
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("Work1-收到的消息是:" + new String(message.getBody(), "UTF-8"));
Thread.sleep(800);
finalChannel.basicAck(message.getEnvelope().getDeliveryTag(),false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Work1-获取消息失败");
}
});
System.out.println("Work1-开始接收消息");
System.in.read();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
// 关闭通道释放连接
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
代码执行效果如下,由于 Work2 执行效率快很多,所以 Wokr2 消费了大部分的消息
本作品采用《CC 协议》,转载必须注明作者和本文链接