请问java消费rabbitmq队列消息,如何实现多线程并发消费消息呢?
java11
没有使用springboot
channel.exchangeDeclare(exchangeName, exchangeType, true);
// 4.监听队列
channel.queueBind("click", exchangeName, routingKey);
MyConsumer myConsumer = new MyConsumer(channel);
int prefetchCount = 100; // 设置每个消费者预取的消息数量
channel.basicQos(prefetchCount); // 设置消费者的 QoS
// 启动多个线程处理消息
for (int i = 0; i < 100; i++) {
Thread thread = new Thread(() -> {
try {
channel.basicConsume("bookmark_click", false, myConsumer);
} catch (IOException e) {
e.printStackTrace();
}
});
thread.start();
}
}
static class MyConsumer extends DefaultConsumer {
// 其他代码...
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
LocalDateTime currentTime = LocalDateTime.now();
System.out.println("Current time: " + currentTime);
getChannel().basicAck(envelope.getDeliveryTag(), false);
}
}
为什么以上代码.是一个个消息执行,每个消息需要一秒,为什么不能每一秒100个消息并发执行呢? 为什么会出现阻塞呢?