ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

RocketMQ源码解析十一(Consumer上报消费进度流程(集群模式))

2021-09-28 16:01:54  阅读:240  来源: 互联网

标签:消费 requestHeader 源码 mq 进度 offset Consumer final RocketMQ


RocketMQ版本4.6.0,记录自己看源码的过程

Consumer

在消费者启动过程中,会启动MQClientInstance,而MQClientInstance中会启动多个定时任务,其中就包括定时上报消费进度:

private void startScheduledTask() {
    
    // 省略其它定时任务。。。

    // 定时持久化消费进度,默认5s
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

        @Override
        public void run() {
            try {
                MQClientInstance.this.persistAllConsumerOffset();
            } catch (Exception e) {
                log.error("ScheduledTask persistAllConsumerOffset exception", e);
            }
        }
    }, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

    // 省略其它定时任务。。。
}

/**
 * 持久化全部消费进度
 */
private void persistAllConsumerOffset() {
    // 获取该JVM上全部消费者实例
    Iterator<Entry<String, MQConsumerInner>> it = this.consumerTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, MQConsumerInner> entry = it.next();
        MQConsumerInner impl = entry.getValue();
        // 调用各个消费者的持久化接口
        impl.persistConsumerOffset();
    }
}

调用各个消费者的上报接口
DefaultMQPushConsumerImpl

/**
 * 持久化当前消费者消费进度
 */
@Override
public void persistConsumerOffset() {
    try {
        this.makeSureStateOK();
        Set<MessageQueue> mqs = new HashSet<MessageQueue>();
        // 获取该消费者分配的消息队列
        Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
        mqs.addAll(allocateMq);
        // 使用消费进度组件持久化全部队列消费进度
        this.offsetStore.persistAll(mqs);
    } catch (Exception e) {
        log.error("group: " + this.defaultMQPushConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
    }
}

使用消费进度组件上报该消费者的所有消费队列消费进度

/**
 * 持久化消费进度
 */
@Override
public void persistAll(Set<MessageQueue> mqs) {
    if (null == mqs || mqs.isEmpty())
        return;

    final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();

    // 遍历每个队列当前的消费进度
    for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {
        MessageQueue mq = entry.getKey();
        // 消费进度
        AtomicLong offset = entry.getValue();
        if (offset != null) {
            if (mqs.contains(mq)) {
                try {
                    // 更新消费进度到broker中
                    this.updateConsumeOffsetToBroker(mq, offset.get());
                    log.info("[persistAll] Group: {} ClientId: {} updateConsumeOffsetToBroker {} {}",
                        this.groupName,
                        this.mQClientFactory.getClientId(),
                        mq,
                        offset.get());
                } catch (Exception e) {
                    log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                }
            } else {
                unusedMQ.add(mq);
            }
        }
    }

    if (!unusedMQ.isEmpty()) {
        for (MessageQueue mq : unusedMQ) {
            this.offsetTable.remove(mq);
            log.info("remove unused mq, {}, {}", mq, this.groupName);
        }
    }
}

/**
 * 以单向方式更新消费进度到broker
 */
private void updateConsumeOffsetToBroker(MessageQueue mq, long offset) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    updateConsumeOffsetToBroker(mq, offset, true);
}

/**
 * Update the Consumer Offset synchronously, once the Master is off, updated to Slave, here need to be optimized.
 */
@Override
public void updateConsumeOffsetToBroker(MessageQueue mq, long offset, boolean isOneway) throws RemotingException,
    MQBrokerException, InterruptedException, MQClientException {
    // 获得一个broker地址,正常情况下都是master broker
    FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    if (null == findBrokerResult) {
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
        findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
    }

    if (findBrokerResult != null) {
        // 构建更新队列消费进度得请求数据
        UpdateConsumerOffsetRequestHeader requestHeader = new UpdateConsumerOffsetRequestHeader();
        requestHeader.setTopic(mq.getTopic());
        requestHeader.setConsumerGroup(this.groupName);
        requestHeader.setQueueId(mq.getQueueId());
        requestHeader.setCommitOffset(offset);

        if (isOneway) {
            // 单向发送更新进度请求
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffsetOneway(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        } else {
            this.mQClientFactory.getMQClientAPIImpl().updateConsumerOffset(
                findBrokerResult.getBrokerAddr(), requestHeader, 1000 * 5);
        }
    } else {
        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }
}

以单向的方式发送请求,请求码为UPDATE_CONSUMER_OFFSET

public void updateConsumerOffsetOneway(
    final String addr,
    final UpdateConsumerOffsetRequestHeader requestHeader,
    final long timeoutMillis
    ) throws RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException,
InterruptedException {
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_CONSUMER_OFFSET, requestHeader);

    this.remotingClient.invokeOneway(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
}

Broker

broker处理更新消费进度的处理器是ConsumerManageProcessor

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    switch (request.getCode()) {
        case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
            return this.getConsumerListByGroup(ctx, request);
        // 处理consumer发过来的更新消费进度的请求
        case RequestCode.UPDATE_CONSUMER_OFFSET:
            return this.updateConsumerOffset(ctx, request);
        case RequestCode.QUERY_CONSUMER_OFFSET:
            return this.queryConsumerOffset(ctx, request);
        default:
            break;
    }
    return null;
}

/**
 * 更新消息队列消费进度
 */
private RemotingCommand updateConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(UpdateConsumerOffsetResponseHeader.class);
    final UpdateConsumerOffsetRequestHeader requestHeader =
        (UpdateConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(UpdateConsumerOffsetRequestHeader.class);
    // 向消费进度管理组件提交消费进度
    this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getConsumerGroup(),
        requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

向消费进度管理组件提交消费进度,消费进度统一由消费进度管理器管理

/**
 * 用来管理消费者的消费进度
 */

public class ConsumerOffsetManager extends ConfigManager {

    /**
     * 消费进度表
     */
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

    /**
     * 上报消费进度
     * @param clientHost 消费端地址
     * @param group      消费组
     * @param topic      主题
     * @param queueId    队列id
     * @param offset     要更新的进度
     */
    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
        final long offset) {
        // topic@group
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
        // 第一次上报消费进度
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            // 更新队列的消费进度
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
            }
        }
    }
}

这部分逻辑还是很清晰的。
在这里插入图片描述

参考资料

《儒猿技术窝——从 0 开始带你成为消息中间件实战高手》

标签:消费,requestHeader,源码,mq,进度,offset,Consumer,final,RocketMQ
来源: https://blog.csdn.net/hsz2568952354/article/details/120422743

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有