ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

broker处理发送消息判断逻辑

2021-10-28 19:04:23  阅读:257  来源: 互联网

标签:topic 逻辑 topicConfig 重试 broker 发送 requestHeader return response


SendMessageProcessor

asyncSendMessage

private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
                                                                SendMessageContext mqtraceContext,
                                                                SendMessageRequestHeader requestHeader) {
	final RemotingCommand response = preSend(ctx, request, requestHeader); // 创建response的RemotingCommand
	.........
}

preSend

判断是否还没到服务时间

 final long startTimestamp = this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp();
// 现在的时间 < startTimestamp (startAcceptSendRequestTimeStamp) 相当于现在是非工作时间 还没到服务时间
if (this.brokerController.getMessageStore().now() < startTimestamp) {
	response.setCode(ResponseCode.SYSTEM_ERROR);
	response.setRemark(String.format("broker unable to service, until %s", UtilAll.timeMillisToHumanString2(startTimestamp)));
	return response;
}

AbstractSendMessageProcessor

msgCheck

验证topic名称是否符合

// 验证名称是否合法  是否为空、是否合法的字符、长度不能大于127
if (!TopicValidator.validateTopic(requestHeader.getTopic(), response)) {
	return response;
}

验证当前topic是否在不允许发送队列中

默认有SCHEDULE_TOPIC_XXXX 如果存在是不允许发送

if (TopicValidator.isNotAllowedSendTopic(requestHeader.getTopic(), response)) {
	return response;
}

验证topic信息最终是否存在

# 如果存在直接返回topicConfig
# 如果topicConfig 不存在,DefaultTopic也不存在直接返回null
# 如果topicConfig 不存在,DefaultTopic存在,则会创建topic配置信息,从DefaultTopic复制信息
topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageMethod(
	requestHeader.getTopic(),
	requestHeader.getDefaultTopic(),
	RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
	requestHeader.getDefaultTopicQueueNums(), topicSysFlag);
# 如果是重试topic,则会创建RETRY队列
if (null == topicConfig) { // 消息消费失败时,消费者会将消息发往retry队列,等待重试
	if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //topic是%RETRY%开头的
		topicConfig = // 这里创建不判断
			this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
				requestHeader.getTopic(), 1, PermName.PERM_WRITE | PermName.PERM_READ,
				topicSysFlag); // topicConfigTable存在则直接返回,不存在则创建RETRY队列配置信息
	}
}
# 最终还是没有,则返回失败ResponseCode.TOPIC_NOT_EXIST
if (null == topicConfig) { // 到这里还是没有配置信息,则返回失败
	response.setCode(ResponseCode.TOPIC_NOT_EXIST); // topic是否存在
	response.setRemark("topic[" + requestHeader.getTopic() + "] not exist, apply first please!"
		+ FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
	return response;
}

验证queueId

验证queutId 不能大于等于该broker的读或写的最大queueId

int queueIdInt = requestHeader.getQueueId();
int idValid = Math.max(topicConfig.getWriteQueueNums(), topicConfig.getReadQueueNums());
if (queueIdInt >= idValid) { // 验证queutId 不能大于等于读或写的最大queueId  因为queueId从0开始
	String errorInfo = String.format("request queueId[%d] is illegal, %s Producer: %s",
		queueIdInt,
		topicConfig.toString(),
		RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

	log.warn(errorInfo);
	response.setCode(ResponseCode.SYSTEM_ERROR);
	response.setRemark(errorInfo);

	return response;
}

queueIdInt <0 ,则从该broker的写队列随机取1个索引

// 无效的queutId,则随机取1个

if (queueIdInt < 0) {
	queueIdInt = randomQueueId(topicConfig.getWriteQueueNums()); 
}

handleRetryAndDLQ

// 非重试Topic则返回true
private boolean handleRetryAndDLQ(SendMessageRequestHeader requestHeader, RemotingCommand response,
								  RemotingCommand request,
								  MessageExt msg, TopicConfig topicConfig) {
	String newTopic = requestHeader.getTopic();
	if (null != newTopic && newTopic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {  // 判断是否包含重试topic前缀(%RETRY%)
		String groupName = newTopic.substring(MixAll.RETRY_GROUP_TOPIC_PREFIX.length()); // 消费组
		SubscriptionGroupConfig subscriptionGroupConfig = // 获取订阅组配置
			this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupName);
		if (null == subscriptionGroupConfig) {
			response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
			response.setRemark(
				"subscription group not exist, " + groupName + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
			return false;
		}
		// 获取订阅组配置的最大重试次数  默认:16 SubscriptionGroupConfig
		int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
		if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
			maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); // 从请求头获取最大重试次数
		}
		int reconsumeTimes = requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes(); // 获取当前重试次数
		if (reconsumeTimes >= maxReconsumeTimes) { //条件满足,则已经达到最大重试次数16
			newTopic = MixAll.getDLQTopic(groupName); // 超过最大重试次数,生成死信队列topic,  生成规则: %DLQ% + groupName
			int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;
			topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,  // 不存在则创建死信队列,存在则直接返回
				DLQ_NUMS_PER_GROUP, // 1
				PermName.PERM_WRITE, 0
			);
			msg.setTopic(newTopic);
			msg.setQueueId(queueIdInt);
			if (null == topicConfig) { // 不存在死信队列,返回错误信息
				response.setCode(ResponseCode.SYSTEM_ERROR);
				response.setRemark("topic[" + newTopic + "] not exist");
				return false;
			}
		}
	}
	int sysFlag = requestHeader.getSysFlag();
	if (TopicFilterType.MULTI_TAG == topicConfig.getTopicFilterType()) {
		sysFlag |= MessageSysFlag.MULTI_TAGS_FLAG;
	}
	msg.setSysFlag(sysFlag);
	return true;
}

标签:topic,逻辑,topicConfig,重试,broker,发送,requestHeader,return,response
来源: https://blog.csdn.net/kq1983/article/details/121008315

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有