ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

【架构师面试-消息队列-5】-MQ消息可靠性实战源码解决方案

2021-11-24 10:02:13  阅读:188  来源: 互联网

标签:确认 签收 投递 源码 MQ 消息 监听器 交换机 架构师


1:引言

如果保证消息的可靠性?需要解决如下问题

问题1:生产者能百分之百将消息发送给消息队列!

两种意外情况:

第一,消费者发送消息给MQ失败,消息丢失;

第二,交换机路由到队列失败,路由键写错;

问题2:消费者能百分百接收到请求,且业务执行过程中还不能出错!

2:生产者确认

在使用 RabbitMQ 的时候,作为消息发送方希望杜绝任何消息丢失或者投递失败场景。RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。

confirm 确认模式

return 退回模式

rabbitmq 整个消息投递的路径为:

消息从生产者(producer)发送消息到交换机(exchange),不论是否成功,都会执行一个确认回调方法confirmCallback 。

消息从交换机(exchange)到消息队列( queue )投递失败则会执行一个返回回调方法 returnCallback。

我们将利用这两个 callback 控制消息的可靠性投递

1:confirm 确认模式

目标

演示消息确认模式效果

生产者发布消息确认模式特点,不论消息是否进入交换机均执行回调方法

实现步骤

1. 在配置文件中,开启生产者发布消息确认模式

2. 编写生产者确认回调方法

3. 在RabbitTemplate中,设置消息发布确认回调方法

4. 请求测试:

测试成功回调:

测试失败回调:

实现过程

1. 在配置文件中,开启生产者发布消息确认模式

# 开启生产者确认模式:(confirm),投递到交换机,不论失败或者成功都回调
spring.rabbitmq.publisher-confirms=true

2. 编写生产者确认回调方法

//发送消息回调确认类,实现回调接口ConfirmCallback,重写其中confirm()方法
@Component
public class MessageConfirmCallback implements
RabbitTemplate.ConfirmCallback {
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}

3. 在RabbitTemplate中,设置消息发布确认回调方法

@Component
public class MessageConfirmCallback implements
RabbitTemplate.ConfirmCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 创建RabbitTemplate对象之后执行当前方法,为模板对象设置回调确认方法
* 设置消息确认回调方法
* 设置消息回退回调方法
*/
@PostConstruct
public void initRabbitTemplate(){
//设置消息确认回调方法
rabbitTemplate.setConfirmCallback(this::confirm);
}
/**
* 投递到交换机,不论投递成功还是失败都回调次方法
* @param correlationData 投递相关数据
* @param ack 是否投递到交换机
* @param cause 投递失败原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack,
String cause) {
if (ack){
System.out.println("消息进入交换机成功{}");
} else {
System.out.println("消息进入交换机失败{} , 失败原因:" + cause);
}
}
}

4. 请求测试:

测试成功回调: http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机

测试失败回调: http://localhost:8080/direct/sendMsg?

exchange=order_xxxxxxx&routingkey=order.A&msg=购买苹果手机

2:return 退回模式

目标

演示消息回退模式效果

消息回退模式特点:消息进入交换机,路由到队列过程中出现异常则执行回调方法

实现步骤

1. 在配置文件中,开启生产者发布消息回退模式

2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback

3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法

4. 在RabbitTemplate中,设置消息发布回退回调方法

5. 请求测试:

测试成功回调:

测试失败回调:

实现过程

1. 在配置文件中,开启生产者发布消息回退模式

# 开启生产者回退模式:(returns),交换机将消息路由到队列,出现异常则回调
spring.rabbitmq.publisher-returns=true

2. 在MessageConfirmCallback类中,实现接口RabbitTemplate.ReturnCallback

@Component
public class RabbitConfirm implements RabbitTemplate.ConfirmCallback
,RabbitTemplate.ReturnCallback {
//..省略
}

3. 并重写RabbitTemplate.ReturnCallback接口中returnedMessage()方法

/**
* 当消息投递到交换机,交换机路由到消息队列中出现异常,执行returnedMessaged方
法
* @param message 投递消息内容
* @param replyCode 返回错误状态码
* @param replyText 返回错误内容
* @param exchange 交换机名称
* @param routingKey 路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String
replyText, String exchange, String routingKey) {
System.out.println("交换机路由至消息队列出错:>>>>>>>");
System.out.println("交换机:"+exchange);
System.out.println("路由键:"+routingKey);
System.out.println("错误状态码:"+replyCode);
System.out.println("错误原因:"+replyText);
System.out.println("发送消息内容:"+message.toString());
System.out.println("<<<<<<<<");
}

4. 在RabbitTemplate中,设置消息发布回退回调方法

@PostConstruct

public void initRabbitTemplate(){

//设置消息确认回调方法

rabbitTemplate.setConfirmCallback(this::confirm);

//设置消息回退回调方法

rabbitTemplate.setReturnCallback(this::returnedMessage);

}

5. 请求测试失败执行returnedMessage方法: http://localhost:8080/direct/sendMsg?

exchange=order_exchange&routingkey=xxxxx&msg=购买苹果手机

3:小结

确认模式

设置publisher-confirms="true" 开启 确认模式。

实现RabbitTemplate.ConfirmCallback接口,重写confirm方法

特点:不论消息是否成功投递至交换机,都回调confirm方法,只有在发送失败时需要写业务代码进行处理。

退回模式

设置publisher-returns="true" 开启 退回模式。

实现RabbitTemplate.ReturnCallback接口,重写returnedMessage方法

特点:消息进入交换机后,只有当从exchange路由到queue失败,才去回调returnedMessage方法;

3:消费者确认(ACK)

ack指 Acknowledge,拥有确认的含义,是消费端收到消息的一种确认机制;

1:消息确认的三种类型

自动确认:acknowledge="none"

手动确认:acknowledge="manual"

根据异常情况确认:acknowledge="auto",(这种方式使用麻烦,不作讲解)

其中自动确认是指,当消息一旦被Consumer接收到,则自动确认收到,并将相应 message 从

RabbitMQ 的消息缓存中移除。但是在实际业务处理中,很可能消息接收到,业务处理出现异常,那么该消息就会丢失。

如果设置了手动确认方式,则需要在业务处理成功后,调用 channel.basicAck() ,手动签收,如果出现异常,则调用 channel.basicNack() 方法,让其自动重新发送消息。

 

自己实现消费者签收代码:自定义监听器涉及三个对象:三个对象必须注入Spring容器

1. 自定义监听器对象

2. 自定义监听器的适配器Adaptor对象

3. 监听器的容器对象

可以使用RabbitTemplate中提供的签收方式:

2:代码实现

目标

演示消费者手动确认效果

自定义消费者接收消息监听器,监听收到消息的内容,手动进行签收;当业务系统抛出异常则拒绝签收,重回队列

实现步骤

1. 搭建新的案例工程consumer-received-ack,用于演示ack消费者签收

2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现

ChannelAwareMessageListener接口

3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A

注入消息队列监听器适配器对象到ioc容器

注入消息队列监听器容器对象到ioc容器:

配置连接工厂

配置自定义监听器适配器对象

配置消息队列

开启手动签收

4. 启动消费者服务,观察控制台,消费者监听器是否与RabbitMQ建立Connection

5. 测试发送消息手动签收

6. 模拟业务逻辑出现异常情况

7. 测试异常情况,演示拒绝签收消息,消息重回队列

实现过程

1. 搭建新的案例工程consumer-received-ack,搭建过程类似于生产者确认

2. 在消费者工程中,创建自定义监听器类CustomAckConsumerListener,实现

ChannelAwareMessageListener接口

/**
* 自定义监听器,监听到消息之后,立即执行onMessage方法
*/
@Component
public class CustomAckConsumerListener implements
ChannelAwareMessageListener {
/**
* 监听到消息之后执行的方法
* @param message 消息内容
* @param channel 消息所在频道
*/
@Override
public void onMessage(Message message, Channel channel) throws
Exception {
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "utf-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
/**
* 签收消息,前提条件,必须在监听器的配置中,开启手动签收模式
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
}
}

3. 编写监听器配置类ListenerConfiguration,配置自定义监听器绑定消息队列 order.A

注入消息队列监听器适配器对象到ioc容器

注入消息队列监听器容器对象到ioc容器:

配置连接工厂

配置自定义监听器

配置消息队列

开启手动签收

/**
* 消费者监听器配置,将监听器绑定到消息队列上
*/
@Configuration
public class ListenerConfiguration {
/**
* 注入消息监听器适配器
* @param customAckConsumerListener 自定义监听器对象
*/
@Bean
public MessageListenerAdapter
messageListenerAdapter(CustomAckConsumerListener
customAckConsumerListener){
//创建自定义监听器适配器对象
return new
MessageListenerAdapter(customAckConsumerListener);
}
/**
* 注入消息监听器容器
* @param connectionFactory 连接工厂
* @param messageListenerAdapter 自定义的消息监听器适配器
*/
@Bean
public SimpleMessageListenerContainer
simpleMessageListenerContainer(
ConnectionFactory connectionFactory,
MessageListenerAdapter messageListenerAdapter){
//简单的消息监听器容器对象
SimpleMessageListenerContainer container = new
SimpleMessageListenerContainer();
//绑定消息队列
container.setQueueNames("order.A");
//设置连接工厂对象
container.setConnectionFactory(connectionFactory);
//设置消息监听器适配器
container.setMessageListener(messageListenerAdapter);
//设置手动确认消息:NONE(不确认消息),MANUAL(手动确认消息),AUTO(自
动确认消息)
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return container;
}
}

4. 启动消费者控制,观察控制台,消费者监听器是否与RabbitMQ建立Connection

5. 测试发送消息手动签收,请求地址http://localhost:8080/direct/sendMsg?exchange=order_exchange&routingkey=order.A&msg=购买苹果手机

6. 模拟业务逻辑出现异常情况,修改自定义监听器

@Override
public void onMessage(Message message, Channel channel) throws Exception
{
//获取消息内容
byte[] messageBody = message.getBody();
String msg = new String(messageBody, "utf-8");
System.out.println("接收到消息,执行具体业务逻辑{} 消息内容:"+msg);
//获取投递标签
MessageProperties messageProperties =
message.getMessageProperties();
long deliveryTag = messageProperties.getDeliveryTag();
try {
if (msg.contains("苹果")){
throw new RuntimeException("不允许卖苹果手机!!!");
}
/**
* 手动签收消息
* 参数1:消息投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
*/
channel.basicAck(deliveryTag,false);
System.out.println("手动签收完成:{}");
} catch (Exception ex){
/**
* 手动拒绝签收
* 参数1:当前消息的投递标签
* 参数2:是否批量签收:true一次性签收所有,false,只签收当前消息
* 参数3:是否重回队列,true为重回队列,false为不重回
*/
channel.basicNack(deliveryTag,false,true);
System.out.println("拒绝签收,重回队列:{}"+ex);
}
}

7. 测试异常情况,演示拒绝签收消息,消息重回队列

请求地址包含苹果,抛出异常:http://localhost:8080/direct/sendMsg?exchange=order_ex

change&routingkey=order.A&msg=购买苹果手机

控制台打印结果

2:小结

如果想手动签收消息,那么需要自定义实现消息接收监听器,实现

ChannelAwareMessageListener接口

设置AcknowledgeMode模式

none:自动

auto:异常模式

manual:手动

调用channel.basicAck方法签收消息

调用channel.basicNAck方法拒签消息

标签:确认,签收,投递,源码,MQ,消息,监听器,交换机,架构师
来源: https://blog.csdn.net/chongfa2008/article/details/121492595

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有