6-RocketMQ发送消息
public static void main(String[] args) throws Exception {
// Instantiate with a producer group name.
DefaultMQProducer producer = new
DefaultMQProducer("GroupNameDemo");
// Specify name server addresses.
producer.setNamesrvAddr("localhost:9876");
//Launch the instance.
producer.start();
for (int i = 0; i < 100; i++) {
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("TopicDemo" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
// Call send message to deliver message to one of brokers.
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
}
消息发送的基本流程
消息发送的入口DefaultMQProducer#send()
// DefaultMQProducer.java
@Override
public SendResult send(
Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 检查消息
Validators.checkMessage(msg, this);
msg.setTopic(withNamespace(msg.getTopic()));
// 发送消息
return this.defaultMQProducerImpl.send(msg);
}
消息检验
- 检查消息内容不能为空
- 检查消息长度不能为0
- 默认情况,消息大小不能超过 4M
// Validators.java
public static void checkMessage(Message msg, DefaultMQProducer defaultMQProducer)
throws MQClientException {
if (null == msg) {
// 抛异常
}
// topic
Validators.checkTopic(msg.getTopic());
// body 检查消息内容不能为空
if (null == msg.getBody()) {
// 抛异常
}
// 检查消息长度不能为0
if (0 == msg.getBody().length) {
// 抛异常
}
// 默认情况,消息大小不能超过 4M = 1024 * 1024 * 4;
if (msg.getBody().length > defaultMQProducer.getMaxMessageSize()) {
// 抛异常
}
}
消息发送
// DefaultMQProducerImpl.java
// DEFAULT SYNC
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// this.defaultMQProducer.getSendMsgTimeout() 获取消息发送超时时间, 默认为3s
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException,RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 这里去查询主题路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 省略......
}
查询主题路由信息
如果生产者已经缓存了topic的路由信息,则直接返回。如果没有缓存,则向NameServer查询该topic的路由信息。
如果最终未能查询到路由信息,则直接抛出异常。
// DefaultMQProducerImpl.java
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
// 1. 先从缓存中获取
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok()) {
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 2. 向NameServer查询该topic的路由信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
}
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
return topicPublishInfo;
} else {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
}
}
TopicPublishInfo
信息如下
public class TopicPublishInfo {
// 是否是顺序消息
private boolean orderTopic = false;
private boolean haveTopicRouterInfo = false;
// 该主题队列的消息队列
private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>();
// 每选择一次消息队列,该值会自增1,如果Integer.MAX_VALUE,则重置为0,用于选择消息队列。
private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
private TopicRouteData topicRouteData;
}
public class TopicRouteData extends RemotingSerializable {
private String orderTopicConf;
// topic队列元数据
private List<QueueData> queueDatas;
// topic分布的broker元数据
private List<BrokerData> brokerDatas;
private HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
}
具体信息如下
{
"haveTopicRouterInfo": true,
"messageQueueList": [
{
"brokerName": "PQSZ-L0039",
"queueId": 0,
"topic": "TopicDemo"
},
{
"brokerName": "PQSZ-L0039",
"queueId": 1,
"topic": "TopicDemo"
},
{
"brokerName": "PQSZ-L0039",
"queueId": 2,
"topic": "TopicDemo"
},
{
"brokerName": "PQSZ-L0039",
"queueId": 3,
"topic": "TopicDemo"
}
],
"orderTopic": false,
"sendWhichQueue": {
"andIncrement": 1497938501
},
"topicRouteData": {
"brokerDatas": [
{
"brokerAddrs": {
"0": "10.178.42.122:10911"
},
"brokerName": "PQSZ-L0039",
"cluster": "DefaultCluster"
}
],
"filterServerTable": {},
"queueDatas": [
{
"brokerName": "PQSZ-L0039",
"perm": 6,
"readQueueNums": 4,
"topicSynFlag": 0,
"writeQueueNums": 4
}
]
}
}
选择消息队列发送消息
根据主题路由信息选择消息队列
// DefaultMQProducerImpl.java
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
// 省略......
if (topicPublishInfo != null && topicPublishInfo.ok()) {
boolean callTimeout = false;
MessageQueue mq = null;
Exception exception = null;
SendResult sendResult = null;
// 这里会设置一个请求次数
// 如果为同步: timesTotal = 1 + 2,有两次重试次数
// 如果为异步: timesTotal = 1, 重试操作会在回调接口中进行
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
String lastBrokerName = null == mq ? null : mq.getBrokerName();
// 选择消息队列, 首次进入时 lastBrokerName = null
MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
if (mqSelected != null) {
mq = mqSelected;
brokersSent[times] = mq.getBrokerName();
// 省略......
// ......这里会计算是否超时,如果超时会直接抛出异常
// 发送信息
try{
sendResult = this.sendKernelImpl(
msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
// ......
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
// ......
} catch (Exception e) {
// 这里其实会有很多种异常,为了方便理解,直接在这里写成Exception
// 消息发送失败
// 启用Broker故障延迟机制,这里会将这个BrokerName保存起来
// 默认情况,使用30s来计算Broker故障规避时长,里面会有一些计算逻辑
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
}
// 省略......
}
}
}
}
从上面代码可以发现,消息发送成功或者失败都会调用MQFaultStrategy#updateFaultItem
,如果开启了Broker故障延迟机制,里面的代码才会执行。
/**
* @param brokerName broker名称
* @param currentLatency 本次消息发送延迟时间 currentLatency
* @param isolation 是否隔离,该参数的含义如果为true,则使用30s来计算Broker故障规避时长,
* 如果为false,则使用本次消息发送延迟时间来计算Broker故障规避时长。
*
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
选择消息队列有两种方式。
1)sendLatencyFaultEnable=false,默认不启用Broker故障延迟机制。
2)sendLatencyFaultEnable=true,启用Broker故障延迟机制。
不启用Broker故障延迟机制时选择队列的方法入口在TopicPublishInfo#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
// lastBrokerName == null 说明是第一次选择消息队列
if (lastBrokerName == null) {
// 此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模
return selectOneMessageQueue();
} else {
// lastBrokerName != null, 说明上一次发送消息失败了,这一次是重试
int index = this.sendWhichQueue.getAndIncrement();
for (int i = 0; i < this.messageQueueList.size(); i++) {
int pos = Math.abs(index++) % this.messageQueueList.size();
if (pos < 0)
pos = 0;
MessageQueue mq = this.messageQueueList.get(pos);
// 上一次调用 lastBrokerName 失败了,那么这一次调用 lastBrokerName 也可能失败
// 所以这里会过滤掉上一次调用失败的那个 Broker
if (!mq.getBrokerName().equals(lastBrokerName)) {
return mq;
}
}
// 如果上面没有获取队列
// 此时直接用sendWhichQueue自增再获取值,与当前路由表中消息队列个数取模
return selectOneMessageQueue();
}
}
public MessageQueue selectOneMessageQueue() {
// 用sendWhichQueue自增再获取值
int index = this.sendWhichQueue.getAndIncrement();
// 拿 index 与队列长度取模
int pos = Math.abs(index) % this.messageQueueList.size();
//
if (pos < 0)
pos = 0;
// 返回队列
return this.messageQueueList.get(pos);
}
启用Broker故障延迟机制,选择消息队列入口在MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 验证该消息队列是否可用,里面根据当前时间做对比
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// 尝试从规避的Broker中选择一个可用的Broker,如果没有找到,将返回null
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
总结
消息队列负载机制:消息生产者在发送消息时,如果本地路由表中未缓存topic的路由信息,向NameServer发送获取路由信息请求,更新本地路由信息表,并且消息生产者每隔30s从NameServer更新路由表。
消息发送异常机制:消息发送高可用主要通过两个手段:重试与Broker规避。Broker规避就是在一次消息发送过程中发现错误,在某一时间段内,消息生产者不会选择该Broker(消息服务器)上的消息队列,提高发送消息的成功率。
本作品采用《CC 协议》,转载必须注明作者和本文链接
推荐文章: