ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RocketMq顺序消费

2020-11-22 19:58:01  阅读:270  来源: 互联网

标签:顺序 producer 队列 接口 MessageListenerOrderly 消费 message RocketMq


RocketMq如何保证消息的顺序消费:

由于RocketMq的消息都是存储在topic中,而topic中又有不同的队列,RocketMq会自动进行负载均衡处理,使消息尽量均匀的分布到不同的队列中去,而队列的属性又是先进先出,所以我们只需要确保把消息发送到同一个队列中,消费者单线程进行消费,就可以确保消息的顺序性。

producer代码:

for(int i=0;i<20;i++) {
			Message message=new Message("order_producer_topic", ("hello !"+i).getBytes());
			//顺序发送,自定义发送到那个队列的计算方式,
			try {
				producer.send(message, new MessageQueueSelector() {
					public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
						//根据传入参数计算应该放到那个队列中
						int  queueId=Integer.parseInt(arg.toString())%mqs.size();
						return mqs.get(queueId);
					}
				}, 8//该参数用于select()计算
				);
				
				//RocketMq还提供了定义好的计算队列方式  该方式是通过传入的tag 进行hash计算,然后对写队列值进行取余运算
				message.setTags("oeder message");
				producer.send(message, new SelectMessageQueueByHash(), message.getTags());
				
				//该方式是通过随机数的方式进行计算放到那个队列中, 在该方式中,第三个参数不参与计算
				producer.send(message, new SelectMessageQueueByRandoom(), "");
				
			} catch (Exception e) {
				System.out.println("顺序发送失败"+e);
			}
		}

consumer代码:

	//消费端代码      注册一个MessageListenerOrderly()进行有序监听
		consumer.registerMessageListener(new MessageListenerOrderly() {
			public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
				for (MessageExt messageExt : msgs) {
					String str=new String(messageExt.getBody());
					System.out.println("The message queue id is "+messageExt.getQueueId()+"============ message is ["+str+"]");
				}
				return ConsumeOrderlyStatus.SUCCESS;
			}
		});

producer中MessageQueueSelector接口解析

在producer中MessageQueueSelector是一个接口,可以自己实现改接口,RocketMq自己提供了三种实现方式,分别是SelectMessageQueueByHash、SelectMessageQueueByMachineRoom、SelectMessageQueueByRandoom。

其中SelectMessageQueueByHash是通过传入参数的hashcode()值与写队列数量进行取余运算,来决定该消息放到那个队列中去。

SelectMessageQueueByMachineRoom(根据机房来选择发往哪个队列,支付宝逻辑机房使用) 这是该实现类自带的注释,用于阿里自己的业务需求,可以忽略。

SelectMessageQueueByRandoom是使用随机数和写队列数量进行取余运算,随机把消息放到一个队列中去。

consumer中MessageListenerOrderly接口解析

MessageListenerOrderly和MessageListenerConcurrently都是继承MessageListener接口,区别在于MessageListenerOrderly是同一队列的消息同一时刻只能一个线程消费,

可保证消息在同一队列严格有序消费而MessageListenerConcurrently接口是同一队列的消息并行消费

标签:顺序,producer,队列,接口,MessageListenerOrderly,消费,message,RocketMq
来源: https://blog.csdn.net/wang_jun_jie/article/details/109960517

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有