标签:消费 offsetTable 模式 ConsumerOffsetManager topic 全站 offset 源码 硬核
offset
在rocketMQ中,offset用来管理每个消息队列的不同消费组的消费进度。对offset的管理分为本地模式和远程模式,本地模式是以文本文件的形式存储在客户端,而远程模式是将数据保存到broker端,对应的数据结构分别为LocalFileOffsetStore和RemoteBrokerOffsetStore。
默认情况下,当消费模式为广播模式时,offset使用本地模式存储,因为每条消息会被所有的消费者消费,每个消费者管理自己的消费进度,各个消费者之间不存在消费进度的交集;当消费模式为集群消费时,则使用远程模式管理offset,消息会被多个消费者消费,不同的是每个消费者只负责消费其中部分消息,添加或删除消费者,都会使负载发生变动,容易造成消费进度冲突,因此需要集中管理。同时,RocketMQ也提供接口供用户自己实现offset管理(实现OffsetStore接口)。
生产环境上一般使用集群模式,这里主要记录集群模式下offset的管理,即RemoteBrokerOffsetStore。
broke端
offset的存储与加载
rocketMQ的broker端中,offset的是以json的形式持久化到磁盘文件中,文件路径为${user.home}/store/config/consumerOffset.json。其内容示例如下:
{
"offsetTable": {
"test-topic@test-group": {
"0": 88526,
"1": 88528
}
}
}
broker端启动后,会调用BrokerController.initialize()方法,方法中会对offset进行加载,consumerOffsetManager.load()。获取文件内容后,序列化为ConsumerOffsetManager对象,实质是其属性ConcurrentMap<String,ConcurrentMap<Integer, Long>> **offsetTable,**offsetTable的数据结构为ConcurrentMap,是一个线程安全的容器,key的形式为topic@group(每个topic下不同消费组的消费进度),value也是一个ConcurrentMap,key为queueId,value为消费位移(这里不是offset而是位移)。通过对全局ConsumerOffsetManager对象就可以对各个topic下不同消费组的消费位移进行获取与管理。
/**ConsumerOffsetManager.offsetTable*/
private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
/**ConsumerOffsetManager.decode*/
public void decode(String jsonString) {
if (jsonString != null) {
// 序列化成功后复制给全局ConsumerOffsetManager对象
ConsumerOffsetManager obj = RemotingSerializable.fromJson(jsonString, ConsumerOffsetManager.class);
if (obj != null) {
this.offsetTable = obj.offsetTable;
}
}
}
标签:消费,offsetTable,模式,ConsumerOffsetManager,topic,全站,offset,源码,硬核 来源: https://blog.csdn.net/GBS20200720/article/details/122750634
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。