深入浅出RocketMQ(八):consumer消费消息源码分析(重点)

发布时间:2021-10-28 22:18:04



目录
前言1、Consumer注册监听器1.1 MessageListener1.2 ConsumeMessageService1.3 DefaultMQPushConsumerImpl
2、监听器消费消息2.1 消费消息2.2 consumeMessageDirectly2.3 ConsumeRequest#run()2.3.1 设置ackIndex2.3.2 消息重试2.3.3 消费进度更新

3、默认重试次数3.1 sendMessageBack方法3.2 执行链追踪
结尾


前言

????在上一章,我们了解了一下在RocketMQ怎么保证消息不丢失,在上一章的第四节,对消费失败这里我们只是粗略带过,那么在本章,我们来深入了解一下RocketMQ在消费消息时的源码,希望通过本章的学*,能对consumer消费消息的逻辑有个大概了解。


1、Consumer注册监听器

????先来看一段简单的代码:


consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
try {
for (Message msg : list) {
String body = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.println(body);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});

????这段代码是干嘛的,相信大家一眼就能看懂。那么大家有没有思考过:


????我们返回了这两个状态后,RocketMQ是怎么处理的?
????我们注册的消息监听器MessageListenerConcurrently又是在哪里被用到的?


????接下来我将通过分析源码的形式为讲解这两个问题。


1.1 MessageListener

????既然是监听器,那肯定会有被注册的地方,我们点进去MessageListenerConcurrently接口发现,该接口继承于MessageListener,但是再点进去会发现,这是一个空接口。





????但有趣的是,我们可以发现继承该接口的,除了MessageListenerConcurrently,还有另外一个接口:MessageListenerOrderly





????这个接口用来干嘛的应该不用我多说了,有序消费MessageListenerOrderly和并发消费MessageListenerConcurrently。


????但我们还没有找到究竟是谁注册了这两个消息监听器,既然它们都提供了消费消息的方法,那我们就看看是哪个类调用了这个方法:







1.2 ConsumeMessageService

????调用了consumeMessage方法的类有两个,另一个是测试类很明显不是我们要找到,于是我们来到ConsumeMessageConcurrentlyService类。


????有了上面的经验,我们很容易能够联想到:既然有ConsumeMessageConcurrentlyService类,那是不是也有一个Order的类呢?


????我们点进去ConsumeMessageService接口,查看它的实现类,果然和我们猜测的一样:





????在这两个实现类的成员变量里,我们发现了刚才的消息监听器。


public class ConsumeMessageConcurrentlyService implements ConsumeMessageService {
private static final InternalLogger log = ClientLogger.getLog();
private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
private final DefaultMQPushConsumer defaultMQPushConsumer;
// MessageListenerConcurrently
private final MessageListenerConcurrently messageListener;
private final BlockingQueue consumeRequestQueue;
private final ThreadPoolExecutor consumeExecutor;
private final String consumerGroup;

????但是在构造方法我们会发现,这个消息监听器是从外部传进来的,那么我们还要继续追踪,看看这个消息监听器究竟是在哪里产生的。


1.3 DefaultMQPushConsumerImpl

????接下来的追踪就很简单了,点击构造器可以看到,除了DefaultMQPushConsumerImpl之外,其他的都是测试类:





// ……省略其他代码
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
this.consumeOrderly = true;
this.consumeMessageService =
new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
this.consumeOrderly = false;
this.consumeMessageService =
new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
// ……省略其他代码

????其它逻辑都不重要,我们看这段if…else…代码,很明显是在为我们的消息监听器初始化,而判断条件就是this.getMessageListenerInner()是否为其右边类的实例


????再点进去这个方法后,我们就会发现,这玩意返回的不就是我们一开始找的MessageListener吗?如果到这里你还有疑问,那我再给你看一个方法:





????眼熟吗?没错,这玩意就是我们*时使用消费者时给它注册的消息监听器!兜兜转转我们最终回到了原点。


2、监听器消费消息

????消息监听器是注册上去了,但它是怎么消费消息的呢?接下来我们就要在ConsumeMessageService的实现类里面做文章了。(这里我们只分析ConsumeMessageConcurrentlyService)


2.1 消费消息

????ConsumeMessageService很明显是消费消息的核心类,但是这类里面这么多方法,我们不可能说一个一个方法去看它的逻辑。


????既然消息监听器是在这里使用的,那我们只需要找找consumeMessage方法在哪里被调用了。通过查找之后,有两个方法用到了它:

????????1. consumeMessageDirectly
????????2. ConsumeRequest#run()


2.2 consumeMessageDirectly

????该方法的作用很明显,顾名思义就是直接消费消息。该类会根据consumeMessage方法返回的状态来设置返回值中消费结果,然后在ClientRemotingProcessor#consumeMessageDirectly方法中将结果告知broker。


2.3 ConsumeRequest#run()

????ConsumeRequest是一个内部类,同时实现了Runnable接口,我们主要看它run方法的逻辑。


????run方法代码很长,这里我们只关注两个地方:


????首先它会获取消费结果,然后根据消费结果设置returnType的值,再装入consumeMessageContext中。


????最后它会调用ConsumeMessageConcurrentlyService#processConsumeResult()方法,该方法的源码有点长,接下来我们分三段分析该方法。


2.3.1 设置ackIndex

????在之前的调用链中,消息消费结果status传入了该方法中,而在该方法源码,首先是对该状态进行处理:





????如果返回结果是 CONSUME_SUCCESS,此时 ackIndex = msg.size() - 1


????如果返回结果是 RECONSUME_LATER, 此时 ackIndex = -1


2.3.2 消息重试

????上面的逻辑很简单,而这个ackIndex真正的用武之地是在接下来的一段逻辑,接下来的逻辑全是重点,全是重点,全是重点





????switch的判断逻辑是根据当前消费模式来判断的,可以看到,如果是广播模式,rocketMQ会直接丢弃RECONSUME_LATER状态的消息,因为在这个for循环里只做了一个日志记录。


????而集群模式下就不一样了,根据ackIndex的结果,如果是正常消费的话,ackIndex = msg.size() - 1,也就是说for循环是不会执行的


????如果返回结果是 RECONSUME_LATER,则这批所有的消息都会发送消息给Broker,也就是这一批消息都得重新消费


????此外,如果发送 ack 消息失败,则将该任务直接在消费者这边,再次在本地处理该批消息,默认演出5s后在消费者重新消费


????为什么说默认是5s呢?我们先继续看processConsumeResult方法最后一段逻辑。


2.3.3 消费进度更新

long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

????最后一段代码的逻辑比较简单,更新消息消费进度,不管消费成功与否,上述这些消息消费成功,其实就是修改消费偏移量。(失败的,会进行重试,会创建新的消息)


3、默认重试次数

????刚才说了,默认重试时间是5s,在之前也提到过,consumer的默认重试次数是16次。但是明明rocketMQ有18个消费级别,为什么默认重试次数只有16次呢?


????要回答这个问题,我们就得进入sendMessageBack方法中寻找答案。


3.1 sendMessageBack方法

public boolean sendMessageBack(final MessageExt msg, final ConsumeConcurrentlyContext context) {
int delayLevel = context.getDelayLevelWhenNextConsume();

// Wrap topic with namespace before sending back message.
msg.setTopic(this.defaultMQPushConsumer.withNamespace(msg.getTopic()));
try {
this.defaultMQPushConsumerImpl.sendMessageBack(msg, delayLevel, context.getMessageQueue().getBrokerName());
return true;
} catch (Exception e) {
log.error("sendMessageBack exception, group: " + this.consumerGroup + " msg: " + msg.toString(), e);
}

return false;
}

????点进来该方法,一个重要的变量直接映入眼帘:delayLevel。
而我们进入getDelayLevelWhenNextConsume方法会发现,delayLevelWhenNextConsume变量的默认值是0。


????奇了怪了,既然delayLevelWhenNextConsume默认是0的话,根据上面的逻辑:


if (!result) {
msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
msgBackFailed.add(msg);
}

????就算消费失败了+1,达到最高的重试次数也应该是18呀,为什么说是16呢?难道网络上面流传的都是假的?


3.2 执行链追踪

????如果你肯多花几秒钟时间去仔细看看,就会发现这个方法并不是真正干活的方法,它内部还是调用了别人的sendMessageBack方法,那么我们继续追踪下去。


????最后来到DefaultMQPushConsumerImpl#sendMessageBack方法:


public void sendMessageBack(MessageExt msg, int delayLevel, final String brokerName)
throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
try {
String brokerAddr = (null != brokerName) ? this.mQClientFactory.findBrokerAddressInPublish(brokerName)
: RemotingHelper.parseSocketAddressAddr(msg.getStoreHost());
this.mQClientFactory.getMQClientAPIImpl().consumerSendMessageBack(brokerAddr, msg,
this.defaultMQPushConsumer.getConsumerGroup(), delayLevel, 5000, getMaxReconsumeTimes());
} catch (Exception e) {
log.error("sendMessageBack Exception, " + this.defaultMQPushConsumer.getConsumerGroup(), e);

Message newMsg = new Message(MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()), msg.getBody());

String originMsgId = MessageAccessor.getOriginMessageId(msg);
MessageAccessor.setOriginMessageId(newMsg, UtilAll.isBlank(originMsgId) ? msg.getMsgId() : originMsgId);

newMsg.setFlag(msg.getFlag());
MessageAccessor.setProperties(newMsg, msg.getProperties());
MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

this.mQClientFactory.getDefaultMQProducer().send(newMsg);
} finally {
msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQPushConsumer.getNamespace()));
}
}

????在这个方法,我们会看到一个很关键的地方:


newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());

????看到这里相信不用我多说了,默认的确是0,在try代码块中使用的delayLevel就是0,也就是说这条消息会立即发送出去。


????但是如果这条消息发送失败,就会抛出异常然后被捕捉到进入重试机制。而重试机制中,使用的是该条消息设置的重试时间,默认也是0,在进行+3操作后,就是默认从5s开始,也就是级别为3,最后到最大值18正好16次。


结尾

????在本章我们通过对consumer源码剖析,了解了RocketMQ消费者的消费逻辑,下一章,我们来了解一下RocketMQ的消息在broker中是怎么存储的。

相关文档

  • git&gerrit学习??安装配置
  • 微信小程序开发拼图小游戏
  • python挖矿脚本_windows应急响应 -- powershell挖矿病毒清理办法
  • 2017出国游学打包行李技巧
  • 物业客服专员的工作总结
  • 肌研水乳好用吗平价水乳战斗机
  • win10_x64 wsl2 更新包
  • 关于描写秋雨的散文
  • 一般人手机设什么密码
  • C++传地址和传值方式简要分析
  • 洗衣机波轮涡轮拆卸妙招
  • 无法确定卷版本和状态_文件或目录损坏怎么办 文件或目录损坏且无法读取解决方案 手把手教你解决...
  • 法官逐级选任制度研究
  • 爱尔兰历史地理介绍
  • WIN10+YOLOv4,windows上完美执行YOLOv4目标检测
  • ARM的FP寄存器
  • 描写神态类的成语
  • 童年傻事四年级写事作文
  • Java广联达元素平衡_数据结构?平衡二叉树(AVL树)的原理以及Java代码的完全实现...
  • 微信朋友圈快速返回顶部的小技巧
  • 雾和霾是气溶胶吗
  • 小学生中秋节手抄报资料
  • 幼儿园大班教案《食物中毒》含反思
  • 有关于感恩企业的演讲稿范文
  • 图解使用Telnet程序手工发送邮件
  • 多核技术导论之操作系统对多核处理器的支持方法
  • 全球最长跨海大桥纪实
  • 三年级母爱400字作文
  • 网线电插线怎么连接电脑没反应怎么办
  • Unused import statement
  • 猜你喜欢

  • 会议系统项目投资建议及可行性分析
  • 山东省广饶县丁庄镇中心初级中学八年级数学上册 15.1.
  • 链路汇聚
  • Android Studio简单用户界面查看密码
  • 一蓑烟雨任*生——记抚顺道顿生物有限公司董事长张蜂
  • 我国中小企业国际贸易融资问题的探索研究
  • 小学教育顾问求职简历模板
  • 高中语文 第7课《记念刘和珍君》第一课时学案 新人教版必修1
  • 中考数学总复*第一轮基础知识复*第四章图形的认识及三角形第4讲等腰三角形练册本课件
  • 最新北师版初中数学七年级下册6.1 感受可能性导学案
  • 超详细SQL SERVER 2016跨网段和局域网发布订阅配置图解和常见问题
  • 解决:Failed to connect to repository : Error performing command: git.exe ls-remote -h xxxxxxx HEAD...
  • 【最新文档】双胞胎宝宝起名有大诀窍-优秀word范文 (2页)
  • (湘教版)八年级下册数学课件:2.2.1 第2课时 *行四边
  • 山东省聊城市高三数学模拟(理)试卷(二)
  • 深度剖析木马的植入与攻击
  • 金湖县陈桥镇南堆水产养殖专业合作社(企业信用报告)- 天眼查
  • 美国孩子的记单词教学法
  • (word完整版)初二英语试题及答案(2),推荐文档
  • 优秀的人体艺术摄影作品
  • 做传递正能量的使者
  • 四川省2016年下半年监理工程师执业资格:工程师的口头指示试题
  • 长春版五年级语文长春上册5语长春第9单元单元测试卷A卷
  • 光纤与光缆技术.ppt
  • 关于旅游管理应用型本科加强实践教学环节的思考
  • 如何在英语教学中实施德育渗透
  • 法院离婚申请书范本
  • “人口与计划生育目标管理责任书”计划生育工作计划
  • 关于美德养成的故事
  • 2019-2020年八年级期末考试生物试题及答案
  • 前金所是真的吗
  • 中医药大学急救护理学第三次作业
  • 水煮臭豆腐香锅贵阳民建黔菜研究发展中心特色菜品
  • 很权威的现金流量表分析教程 精品
  • 学会调控情绪2
  • 变电运维一体化的培训纲要与发展探讨
  • 关于老中医的顺口溜
  • 福建省弘源智能科技有限公司(企业信用报告)- 天眼查
  • 长春市中天装饰设计有限公司企业信息报告-天眼查
  • 以党的十五大精神为动力 加快广东商品流通体系建设
  • 农业集约化对产权制度和价格体系的依赖性
  • 2017年普通高等学校招生全国统一考试 语文(新课标I卷)word版 (含答案)
  • 电脑版