ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ-AMQP模型详解二

2021-10-28 15:34:15  阅读:229  来源: 互联网

标签:RabbitTemplate AMQP 确认 RabbitMQ 详解 消息 message channel String


RabbitMQ-AMQP模型详解_踩踩踩从踩的博客-CSDN博客

前言

上篇文章介绍了AMQP得流程,以及介绍Vhost Host、连接  、通道 、RoutingKey、exchange、绑定、message等组件;这篇文章会继续介绍AMQP中重要的概念,生产路由不可达,以及可靠的发布 事务机制,发布确认机制,消费者独占等机制

publisher

路由不可达

当消息发送给交换器或队列,在发送中,出现没有队列。

  • 交换没有绑定队列
  • 交换没法根据消息的路由key把消息路由到队列。

可以处理的情况 但是别抛异常 只是为找到交换器之类的

  • 退回
  • 死信队列(备用交换)

退回

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props,
byte[] body)
mandatory : true 强制退回, false 不需退回,直接丢弃。 在发送数据时设置  设置返回消息的回调处理
channel.addReturnListener(returnMessage -> {
				try {
					System.out.println("收到退回消息:" + new String(returnMessage.getBody(), "UTF-8"));
				} catch (UnsupportedEncodingException e) {
					e.printStackTrace();
				}
			});

在spring中使用

  • 设置消息不可以路由退回,设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
@Bean
	public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		template.setMandatory(true); // 设置消息不可以路由退回
		// 设置消息退回回调 【注意】一个 RabbitTemplate 只能设置一个 ReturnCallback
		template.setReturnCallback(myReturnCallback());
		return template;
	}
  • replyCode broker的回应码 replyText 回应描述
private ReturnCallback myReturnCallback() {
		return new ReturnCallback() {
			@Override
			// replyCode broker的回应码 replyText 回应描述
			public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
					String routingKey) {

				// 在这里写退回处理逻辑
				System.out.println("收到回退消息 replyCode=" + replyCode + " replyText=" + replyText + " exchange=" + exchange
						+ " routingKey=" + routingKey);

				System.out.println(" 消息:" + message);
			}
		};
	}

在spring中,要重写 ReturnCallback,如果在spring中 配置文件中添加属性配置,这个是没有用的。在返回的数据可以知道

备用交换

  • policy  设置好策略,
rabbitmqctl set_policy mike "^my-direct$" '{"alternate-exchange":"my-ae"}'
#对那些交换器进行匹配  指定备用交换器
  • 代码中声明交换时通过参数指定备用交换
//声明参数 
Map<String, Object> args = new HashMap<String, Object>(); 
args.put("alternate-exchange", "my-ae"); 
//备用交换参数指定 
channel.exchangeDeclare("my-direct", "direct", false, false, args);
channel.exchangeDeclare("my-ae", "fanout"); 
channel.queueDeclare("routed"); 
channel.queueBind("routed", "my-direct", "key1"); 
channel.queueDeclare("unrouted");
channel.queueBind("unrouted", "my-ae", "");

加上备用参数进行指定上 myae上通道上去。

事务机制

怎么确认可靠发布,这就是事务机制要做的事情,保证网络传输的可靠发布。保证一个收,要么都收,无论是数据库,还是mq都是一样的,都是通过保证的。

当方法里面发布消息,并且需要做其他事情时,所以开启事务

spring 事务管理需要的组件 

事务管理器 TransactionManager  
@Configuration public class TxConfiguration { 
        @Bean 
        // 配置事务管理器 
   public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) { 
        return new RabbitTransactionManager(connectionFactory); 
    }
 }
在 spring 该怎么玩事务就怎么玩 .   RabbitTransactionManager 只能做 Rabbitmq 的消息事务管理 只能是单连接的连接工厂 如果方法中,即要做数据库又要做rabbitmq,它没办法实现的。 没有分布式事务管理器实现。 rabbitmq 中事务机制来保证消息的可靠发布,性能是比较差  这是相对的发布确认机制。 调用 开启
@Transactional
	public void send(int i) {
		// 一定要设置ChannelTransacted(true) 表示开启通道事务
		this.template.setChannelTransacted(true);
		String message = "Hello World!-" + i;
		this.template.convertAndSend(queue.getName(), message);
		System.out.println(" [x] Sent '" + message + "'");
		if (i % 2 == 0)
			throw new RuntimeException();
	}

发布确认机制

性能是事务机制的 250 倍。 发布者发布消息,一般是走异步。 channel 有三种确认模式
  • 异步流式确认 事件驱动  优点 :开销低,吞吐量大  
  • 批量发布确认 批次等待,确认不ok 一批重发  
  • 单条确认 发一条就等待确认 
broker 给出确认会有三种结果
  • ack 接收成功
  • nack 接收失败
  • 发布者收不到Broker的确认(超时)

这都是确认会出现的情况。

异步流式确认

  • 开启发布确认模式 就不能再做事务管理了
  • 待确认消息的Map
  • 指定流式确认事件回调处理
  • 从Map中移除对应的消息
  • 重发,或做其他处理
// 1 开启发布确认模式 就不能再做事务管理了
			channel.confirmSelect();
			// 2 待确认消息的Map
			Map<Long, String> messagesMap = new ConcurrentHashMap<>();

			// 3 指定流式确认事件回调处理
			channel.addConfirmListener((deliveryTag, multiple) -> { // multiple表示是否是多条的确认
				System.out.println("收到OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + ",从Map中移除消息");
				// 从Map中移除对应的消息
				messagesMap.remove(deliveryTag);
			}, (deliveryTag, multiple) -> {
				System.out.println("收到 NON OK ack:deliveryTag=" + deliveryTag + " multiple=" + multiple + " 从Map中移除消息,重发或做其他处理");
				// 从Map中移除对应的消息
				String message = messagesMap.remove(deliveryTag);
				// 重发,或做其他处理
				System.out.println("失败消息:" + message);
			});

for (int i = 1; i < 100; i++) {
				// 消息内容
				String message = "消息" + i;
				// 4 将消息放入到map中
				messagesMap.put(channel.getNextPublishSeqNo(), message);
				// 5、发送消息
				channel.basicPublish("mandatory-ex", "", true, null, message.getBytes());
				System.out.println("发布消息:" + message);

				Thread.sleep(2000L);
			}

在spring中添加 publisher-confirms 开启消息确认 

这里做流式确认 设置回调 发送

// 配置RabbitTemplate Bean
	@Bean
	public RabbitTemplate busiARabbitTemplate(ConnectionFactory connectionFactory) {
		RabbitTemplate template = new RabbitTemplate(connectionFactory);
		// 设置发布确认回调,一个RabbitTemplate只可设置一个回调。
		template.setConfirmCallback(confirmCallback());

		return template;
	}

Consumer

消费者的两种消息模式、消费者 注册 取消 、独占消费者 消费者 优先级  消息确认  pull 拉模式消费。这几种模式。

两种消费模式

  • push 推模式
  • pull 拉模式

在这两种模式下面的问题出现

push 模式

broker client 消费者 client 向 broker 注册对某个队列的消费者
// 对感兴趣的队列注册消费者,返回Server生成的consumerTag(消费者标识) String consumerTag = channel.basicConsume(queueName, true, callback, consumerTag -> {});
取消消费者注册
channel.basicCancel(consumerTag);

独占消费者

独占队列:被创建它的连接独占 这个连接上的 channel 可以共享。连接关闭,独占队列没有了。 独占消费者:消费者独占一个队列进行消息消费,适用场景: 消息一定要严格按序消费处理。 一旦独占消费者挂了的话,消息队列里面的数据就会一直存在着,因此需要备用的 在spring中 如何添加 只需要添加 exclusive设置为true就可以了

 

采用不断的重试去抢独占,也是防止被挂了。

标签:RabbitTemplate,AMQP,确认,RabbitMQ,详解,消息,message,channel,String
来源: https://blog.csdn.net/qq_33373609/article/details/121006662

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有