7-RocketMQ拉取消息

消费者启动

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        // 实例化消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("GroupNameDemo");
        // 设置NameServer的地址
        consumer.setNamesrvAddr("localhost:9876");
        // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息
        consumer.subscribe(
                "TopicDemo",
                "*"); // 多个用 || 分割,* 表示所有
        // 注册回调实现类来处理从broker拉取回来的消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // 标记该消息已经被成功消费
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者实例
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

第一步:构建主题的订阅关系DefaultMQPushConsumerImpl#subscribe(String topic, String subExpression),将需要订阅的主题信息存放到RebalanceImpl类中的subscriptionInner属性:

protected final ConcurrentMap<String /* topic */, SubscriptionData> subscriptionInner =
        new ConcurrentHashMap<String, SubscriptionData>();

订阅重试主题消息。RocketMQ消息重试是以消费组为单位,而不是主题,消息重试主题名为%RETRY%+消费组名。消费者在启动的时候会自动订阅该主题,参与该主题的消息队列负载。

第二步:注册回调方法。当拉取到消息时会调用这个方法来处理消息。

第三步:启动消费者。初始化MQClientInstanceRebalanceImple(消息重新负载实现类)等。向MQClientInstance注册消费者,并启动MQClientInstance,在一个JVM中的所有消费者、生产者持有同一个MQClientInstanceMQClientInstance只会启动一次。

消息拉取

消息消费有两种模式:广播模式与集群模式,广播模式比较简单,每一个消费者需要去拉取订阅主题下所有消费队列的消息,本节主要基于集群模式。在集群模式下,同一个消费组内有多个消息消费者,同一个主题存在多个消费队列,那么消费者如何进行消息队列负载呢?从上文启动流程也知道,每一个消费组内维护一个线程池来消费消息,那么这些线程又是如何分工合作的呢?

消息队列负载,通常的做法是一个消息队列在同一时间只允许被同一个消费组中的一个消息消费者消费,一个消息消费者可以同时消费多个消息队列,那么RocketMQ是如何实现的呢?

MQClientInstance#start的启动流程中可以看出,RocketMQ使用一个单独的线程PullMessageService来负责消息的拉取。

PullMessageService

PullMessageService继承了ServiceThreadServiceThread实现了Runnable接口,PullMessageServiceMQClientInstance中的属性并跟随MQClientInstance启动。

查看它的run方法

// PullMessageService.java
@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    // stopped 声明为 volatile
    // 每执行一次业务逻辑检测一下其运行状态,
    // 可以通过其他线程将stopped设置为true从而停止该线程。
    while (!this.isStopped()) {
        try {
            // 从pullRequestQueue中获取一个PullRequest消息拉取任务,
            // 如果pullRequest Queue为空,则线程将阻塞,直到有拉取任务被放入。
            PullRequest pullRequest = this.pullRequestQueue.take();
            // 调用pullMessage方法进行消息拉取
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }

    log.info(this.getServiceName() + " service end");
}

这里主要是从pullRequestQueue获取任务,pullRequestQueue的类型是LinkedBlockingQueue<PullRequest>

任务的功能就是去拉取消息。接下来需要搞清楚PullRequest是什么时候被添加的。

PullMessageService提供延迟添加与立即添加2种方式将PullRequest放入到pullRequestQueue中:

PullMessageService#executePullRequestLater

PullMessageService#executePullRequestImmediately

查看PullMessageService#executePullRequestImmediately方法的调用链可以发现,主要有两个地方会调用,一个是在RocketMQ根据PullRequest拉取任务执行完一次消息拉取任务后,又将PullRequest对象放入到pullRequestQueue,第二个是在RebalancceImpl中创建,RebalanceImpl实现了消息队列负载机制,也就是PullRequest对象真正创建的地方,具体创建逻辑下面再分析。

先来看看PullRequest类的数据结构

public class PullRequest {
    // 消费者组
    private String consumerGroup;
    // 待拉取消费队列
    private MessageQueue messageQueue;
    // 消息处理队列,从Broker拉取到的消息先存入Proccess Queue,
    // 然后再提交到消费者消费线程池消费。
    private ProcessQueue processQueue;
    // 待拉取的MessageQueue偏移
    private long nextOffset;
    // 是否被锁定
    private boolean lockedFirst = false;
}

下面来查看从pullRequestQueue队列中拿到PullRequest信息后是做了什么

// PullMessageService#pullMessage
private void pullMessage(final PullRequest pullRequest) {
    final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
    if (consumer != null) {
        DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
        impl.pullMessage(pullRequest);
    } else {
        log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
    }
}
  1. PullRequest中获取ProcessQueue

    • 如果处理队列当前状态被丢弃,结束本次消息拉取,如果没有被丢弃,更新ProcessQueue的lastPullTimestamp为当前时间戳;

    • 如果消费者状态异常,则将拉取任务延迟1s再次放入到PullMessageService的拉取任务队列中

    • 如果当前消费者被挂起,则将拉取任务延迟1s再次放入到PullMessageService的拉取任务队列中,结束本次消息拉取。

final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) {
    log.info("the pull request[{}] is dropped.", pullRequest.toString());
    return;
}

pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
    this.makeSureStateOK();
} catch (MQClientException e) {
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    return;
}

if (this.isPause()) {
    log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
    return;
}
  1. 进行消息拉取流控。从消息消费数量与消费间隔两个维度进行控制。

    • 未消息处理总数,如果ProcessQueue当前处理的消息条数超过了1000将触发流控,放弃本次拉取任务,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列中,每触发1000次流控后输出提示语。
    • 未处理消息大小,如果ProcessQueue当前处理的消息大小超过100MB,放弃本次拉取任务,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列中,每触发1000次流控后输出提示语。
// 获取 ProcessQueue 中的消息数量
long cachedMessageCount = processQueue.getMsgCount().get();
// 获取 ProcessQueue 中缓存的消息大小
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}

if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
        log.warn(
            "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
            this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
    }
    return;
}
  1. ProcessQueue中队列最大偏移量与最小偏离量的间距

    • 非顺序消息,默认情况不能超过2000,否则触发流控,放弃本次拉取任务,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列中

    • 顺序消息

      • 如果ProcessQueue是锁定状态,获取服务端的消费偏移量offset

        pullRequest.setNextOffset(offset)

      • 如果ProcessQueue未锁定状态,放弃本次拉取任务,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列中

if (!this.consumeOrderly) {
    if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
        this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
        if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
            log.warn(
                "the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",
                processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),
                pullRequest, queueMaxSpanFlowControlTimes);
        }
        return;
    }
} else {
    if (processQueue.isLocked()) {
        if (!pullRequest.isLockedFirst()) {
            final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
            boolean brokerBusy = offset < pullRequest.getNextOffset();
            // 省略......
            pullRequest.setLockedFirst(true);
            pullRequest.setNextOffset(offset);
        }
    } else {
        this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
        log.info("pull message later because not locked in broker, {}", pullRequest);
        return;
    }
}
  1. 拉取该主题订阅信息,如果为空,结束本次消息拉取,关于该队列的下一次拉取任务延迟3s。
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.warn("find the consumer's subscription failed, {}", pullRequest);
    return;
}
  1. 调用PullAPIWrapper.pullKernelImpl方法后与服务端交互
try {
    this.pullAPIWrapper.pullKernelImpl(
        pullRequest.getMessageQueue(), // 从哪个消息消费队列拉取消息
        subExpression, // 消息过滤表达式
        subscriptionData.getExpressionType(), // 消息表达式类型,分为TAG、SQL92。
        subscriptionData.getSubVersion(),
        pullRequest.getNextOffset(), // 消息拉取偏移量
        this.defaultMQPushConsumer.getPullBatchSize(), // 本次拉取最大消息条数,默认32条
        sysFlag,
        commitOffsetValue,
        BROKER_SUSPEND_MAX_TIME_MILLIS,
        CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,
        CommunicationMode.ASYNC, // 消息拉取模式,默认为异步拉取
        pullCallback // 从Broker拉取到消息后的回调方法。
    );
} catch (Exception e) {
    log.error("pullKernelImpl exception", e);
    // 异常后,放弃本次拉取任务,并且该队列的下一次拉取任务将在50毫秒后才加入到拉取任务队列中
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
}
  1. 首先将拉取到的消息存入ProcessQueue,然后将拉取到的消息提交到ConsumeMessageService中供消费者消费,该方法是一个异步方法,也就是PullCallBack将消息提交到ConsumeMessageService中就会立即返回,至于这些消息如何消费,PullCallBack不关注。

    然后根据pullInterval参数,如果pullInterval>0,则等待pullInterval毫秒后将PullRequest对象放入到PullMessageService的pullRequestQueue中,该消息队列的下次拉取即将被激活,达到持续消息拉取,实现准实时拉取消息的效果。

boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);

if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                                           DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
} else {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

消息队列负载与重新分布机制

PullMessageService在启动时由于LinkedBlockingQueue<PullRequest>pullRequestQueue中没有PullRequest对象,故PullMessageService线程将阻塞。

问题1:PullRequest对象在什么时候创建并加入到pullRequestQueue中以便唤醒PullMessageService线程。

问题2:集群内多个消费者是如何负载主题下的多个消费队列,并且如果有新的消费者加入时,消息队列又会如何重新分布。

查看RebalanceService类的run方法,RebalanceServiceMQClientInstance中的属性并跟随MQClientInstance启动。

public class RebalanceService extends ServiceThread {
    private static long waitInterval =
        Long.parseLong(System.getProperty(
            "rocketmq.client.rebalance.waitInterval", "20000"));
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        while (!this.isStopped()) {
            this.waitForRunning(waitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }
}

RebalanceService线程默认每隔20s执行一次mqClientFactory.doRebalance()方法,可以使用Drocketmq.client.rebalance.waitInterval=interval来改变默认值。

// MQClientInstance#doRebalance
public class MQClientInstance {
    public void doRebalance() {
        for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
            MQConsumerInner impl = entry.getValue();
            if (impl != null) {
                try {
                      // 这里面最终调用的是RebalanceImpl#doRebalance
                    impl.doRebalance();
                } catch (Throwable e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }
}

MQClientIinstance遍历已注册的消费者,对消费者执行doRebalance()方法。

public abstract class RebalanceImpl  {
    public void doRebalance(final boolean isOrder) {
          // 获取消费者所有的订阅信息
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                      // 消息队列负载与重新分配
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }
}

每个DefaultMQPushConsumerImpl都持有一个单独的RebalanceImpl对象,该方法主要是遍历订阅信息对每个主题的队列进行重新负载。RebalanceImplMap<String, SubscriptionData> subTable在调用消费者DefaultMQPushConsumerImpl#subscribe方法时填充。如果订阅信息发送变化,例如调用了unsubscribe方法,则需要将不关心的主题消费队列从processQueueTable中移除。

接下来重点分析RebalanceImpl#rebalanceByTopic来分析RocketMQ是如何针对单个主题进行消息队列重新负载(以集群模式)。

public abstract class RebalanceImpl {
    private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                // 广播模式
                break;
            }
            case CLUSTERING: {
                // 集群模式
                // 第一步:
                // 获取队列信息
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 获取该消费组内当前所有的消费者客户端ID
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                // 省略......
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
                    mqAll.addAll(mqSet);
                    // 第二步:
                    // 对消息队列排序
                    Collections.sort(mqAll);
                    // 对消费者排序
                    Collections.sort(cidAll);

                    // 第三步
                    // 队列负载策略
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        allocateResult = strategy.allocate(
                                this.consumerGroup,
                                this.mQClientFactory.getClientId(),
                                mqAll,
                                cidAll);
                    } catch (Throwable e) {
                        // 省略......
                        return;
                    }
                    Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                  // 省略......
                }
                break;
            }
            default:
                break;
        }
    }
}

第一步:从主题订阅信息缓存表中获取主题的队列信息;发送请求从Broker中获取该消费组内当前所有的消费者客户端ID,主题topic的队列可能分布在多个Broker上,那请求发往哪个Broker呢?RocketeMQ从主题的路由信息表中随机选择一个Broker。Broker为什么会存在消费组内所有消费者的信息呢?我们不妨回忆一下消费者在启动的时候会向MQClientInstance中注册消费者,然后MQClientInstance会向所有的Broker发送心跳包,心跳包中包含MQClientInstance的消费者信息。如果mqSet、cidAll任意一个为空则忽略本次消息队列负载。

第二步:首先对cidAll, mqAll排序,这个很重要,同一个消费组内看到的视图保持一致,确保同一个消费队列不会被多个消费者分配。RocketMQ消息队列分配算法接口。

第三步:队列负载策略。

RocketMQ默认提供5种分配算法。以AllocateMessageQueueAveragely为例:

如果现在有8个消息消费队列q1, q2, q3, q4, q5, q6, q7, q8,

有3个消费者c1, c2, c3,那么根据该负载算法,消息队列分配如下:

c1: q1, q2, q3

c2:q4, q5, q6

c3:q7, q8

第四步:调用updateProcessQueueTableInRebalance()方法,具体的做法是,先将分配到的消息队列集合(mqSet)与processQueueTable做一个过滤比对。

  • 上图中processQueueTable标注的红色部分,表示与分配到的消息队列集合mqSet互不包含。将这些队列设置Dropped属性为true,然后查看这些队列是否可以移除出processQueueTable缓存变量,这里具体执行removeUnnecessaryMessageQueue()方法,即每隔1s 查看是否可以获取当前消费处理队列的锁,拿到的话返回true。如果等待1s后,仍然拿不到当前消费处理队列的锁则返回false。如果返回true,则从processQueueTable缓存变量中移除对应的Entry;

  • 上图中processQueueTable的绿色部分,表示与分配到的消息队列集合mqSet的交集。判断该ProcessQueue是否已经过期了,在Pull模式的不用管,如果是Push模式的,设置Dropped属性为true,并且调用removeUnnecessaryMessageQueue()方法,像上面一样尝试移除Entry;

最后,为过滤后的消息队列集合(mqSet)中的每个MessageQueue创建一个ProcessQueue对象并存入RebalanceImpl的processQueueTable队列中(其中调用RebalanceImpl实例的computePullFromWhere(MessageQueue mq)方法获取该MessageQueue对象的下一个进度消费值offset,随后填充至接下来要创建的pullRequest对象属性中),并创建拉取请求对象—pullRequest添加到拉取列表—pullRequestList中,最后执行dispatchPullRequest()方法,将Pull消息的请求对象PullRequest依次放入PullMessageService服务线程的阻塞队列pullRequestQueue中,待该服务线程取出后向Broker端发起Pull消息的请求。其中,可以重点对比下,RebalancePushImpl和RebalancePullImpl两个实现类的dispatchPullRequest()方法不同,RebalancePullImpl类里面的该方法为空。

消息消费队列在同一消费组不同消费者之间的负载均衡,其核心设计理念是在一个消息消费队列在同一时间只允许被同一消费组内的一个消费者消费,一个消息消费者能同时消费多个消息队列。

RocketMQ消息拉取由PullMessageServiceRebalanceService共同协作完成

本作品采用《CC 协议》,转载必须注明作者和本文链接
讨论数量: 0
(= ̄ω ̄=)··· 暂无内容!

讨论应以学习和精进为目的。请勿发布不友善或者负能量的内容,与人为善,比聪明更重要!