ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

全站最硬核 百万字强肝RocketMq源码 火热更新中~(九十二)延时队列

2022-01-31 09:35:04  阅读:163  来源: 互联网

标签:全站 CommitLogDispatcherBuildConsumeQueue 调用 TRANSACTION putMessagePositionInfo Me


this.dispatcherList = new LinkedList<>();
this.dispatcherList.addLast(new CommitLogDispatcherBuildConsumeQueue());
this.dispatcherList.addLast(new CommitLogDispatcherBuildIndex());

doDispatch()会遍历CommitLogDispatcher,调用它们的dispatch()方法。其中专门用来通知ConsumeQueue的Dispatcher是CommitLogDispatcherBuildConsumeQueue

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher {

        @Override
        public void dispatch(DispatchRequest request) {
            final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
            switch (tranType) {
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    DefaultMessageStore.this.putMessagePositionInfo(request);
                    break;
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
            }
        }
    }

当ReputMessageService调用了CommitLogDispatcherBuildConsumeQueue的dispatch()后,CommitLogDispatcherBuildConsumeQueue便会调用 DefaultMessageStore.this.putMessagePositionInfo(request):

public void putMessagePositionInfo(DispatchRequest dispatchRequest) {
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    cq.putMessagePositionInfoWrapper(dispatchRequest);
}

putMessagePositionInfo()逻辑有两步:

1:调用findConsumeQueue(),根据消息的topic以及消息所属的ConsumeQueueId,找到对应的ConsumeQueue。

findConsumeQueue()会先从consumeQueueTable中查询topic的ConsumeQueueMap,如果未找到,便会为Topic创建一个新的ConcurrentMap<Integer/* queueId */, ConsumeQueue>,存放到表中。

接着在从Topic的ConcurrentMap中,根据QueueId,查询ConsumeQueue,如果未找到,便也会创建一个新的ConsumeQueue,存放到Map中。ConsumeQueue便是此时被创建的。

2:当找到消息对应的ConsumeQueue后,便调用ConsumeQueue的putMessagePositionInfoWrapper()方法,更新ConsumeQueue。

ConsumeQueue的更新

上面主要讲了ReputMessageService是如何通知ConsumeQueue的,现在我们就要看看ConsumeQueue收到通知后是如何更新的,更新逻辑就在putMessagePositionInfoWrapper()中。

putMessagePositionInfoWrapper()中调用了putMessagePositionInfo(),并引入了重试机制。

我们来看看putMessagePositionInfo()中的主要逻辑:

1:判断消息是否已经被处理过

 if (offset <= this.maxPhysicOffset) {
            return true;
 }

maxPhysicOffset记录了上一次ConsumeQueue更新的消息在CommitLog中的偏移量,如果本次消息偏移量小于maxPhysicOffset,则表明消息已经被更新过,直接返回。

标签:全站,CommitLogDispatcherBuildConsumeQueue,调用,TRANSACTION,putMessagePositionInfo,Me
来源: https://blog.csdn.net/GBS20200720/article/details/122758150

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有