ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Rebalance

2021-12-16 20:30:36  阅读:139  来源: 互联网

标签:doRebalance log consumerGroup topic mqSet null Rebalance


RebalanceService

run

public void run() {
	log.info(this.getServiceName() + " service started");

	while (!this.isStopped()) {
		this.waitForRunning(waitInterval); // 等待20s,然后超时自动释放锁执行doRebalance
		this.mqClientFactory.doRebalance(); // 具体逻辑
	} 
	log.info(this.getServiceName() + " service end");
}

doRebalance

public void doRebalance() {
	for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
		MQConsumerInner impl = entry.getValue();
		if (impl != null) {
			try { // 其中之一是DefaultMQPushConsumerImpl
				impl.doRebalance(); // 具体逻辑
			} catch (Throwable e) {
				log.error("doRebalance exception", e);
			}
		}
	}
}

DefaultMQPushConsumerImpl

doRebalance

public void doRebalance() {
	if (!this.pause) { // RebalancePushImpl
		this.rebalanceImpl.doRebalance(this.isConsumeOrderly());
	}
}

RebalanceImpl

doRebalance

public void doRebalance(final boolean isOrder) { // isOrder=true 顺序消息
	Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); // key:topic  该topic订阅信息 push方式 consumerGroup=null
	if (subTable != null) {
		for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
			final String topic = entry.getKey();  //topic
			try {
				this.rebalanceByTopic(topic, isOrder);
			} catch (Throwable e) {
				if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
					log.warn("rebalanceByTopic Exception", e);
				}
			}
		}
	}

	this.truncateMessageQueueNotMyTopic();
}

rebalanceByTopic

private void rebalanceByTopic(final String topic, final boolean isOrder) {
	switch (messageModel) {
		case BROADCASTING: { // 广播
			Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
			if (mqSet != null) {
				boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
				if (changed) {
					this.messageQueueChanged(topic, mqSet, mqSet);
					log.info("messageQueueChanged {} {} {} {}",
						consumerGroup,
						topic,
						mqSet,
						mqSet);
				}
			} else {
				log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
			}
			break;
		}
		case CLUSTERING: { // 集群
			Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 根据topic获得MessageQueue  总的MessageQueue
			List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); // todo 得到某个broker下,该consumerGroup下的所有的消费者 (相当于全部消费者)
			if (null == mqSet) {
				if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
					log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
				}
			}

			if (null == cidAll) {
				log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
			}

			if (mqSet != null && cidAll != null) {
				List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
				mqAll.addAll(mqSet);

				Collections.sort(mqAll); // 根据queueId排序
				Collections.sort(cidAll);

				AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

				List<MessageQueue> allocateResult = null;
				try { // todo 重新负载均衡
					allocateResult = strategy.allocate( // todo AllocateMessageQueueStrategy
						this.consumerGroup,
						this.mQClientFactory.getClientId(), // 当前ClientId
						mqAll,
						cidAll);
				} catch (Throwable e) {
					log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
						e);
					return;
				}

				Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
				if (allocateResult != null) {
					allocateResultSet.addAll(allocateResult);
				}

				boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
				if (changed) {
					.........................
					this.messageQueueChanged(topic, mqSet, allocateResultSet);
				}
			}
			break;
		}
		default:
			break;
	}
}

标签:doRebalance,log,consumerGroup,topic,mqSet,null,Rebalance
来源: https://blog.csdn.net/kq1983/article/details/121977062

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有