标签:map correlationData String confirm rabbitmq 可靠 import message 投递
原理
rabbitmq基于confirm模式、持久化策略,使用redis作为消息的临时储存空间
首先开启rabbitmq的confirm和持久化策略(可持久的对象为交换机、队列、消息)
1 开启confirm,在配置文件application.yml中
spring: rabbitmq: host: 192.168.200.128 publisher-confirms: true #开启confirm数据保护机制
2 开启交换机、队列、消息的持久化,在rabbitmq的配置类里
@Configuration public class RabbitMQConfig { public static final String SECKILL_ORDER_QUEUE ="seckill_order"; @Bean public Queue queue(){ return new Queue(SECKILL_ORDER_QUEUE,true);//true为开启队列持久化 } }
此处没有用交换机,如果想要开启交换机持久化参考 https://www.cnblogs.com/wrc-blog/p/14340064.html
消息默认就是持久的
编写一个增强spring自带的rabbitTemplate的类
使其在向队列发消息前,先往redis中备份数据
同时实现RabbitTemplate.ConfirmCallback接口,接收confirm模式下的回调函数,根据结果执行下一步操作
package com.changgou.seckill.config; import com.alibaba.fastjson.JSON; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.UUID; //增强rabbitmq @Component public class ConfirmMessageSender implements RabbitTemplate.ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private RedisTemplate redisTemplate; public static final String MESSAGE_CONFIRM_KEY="message_confirm_key"; //构造函数 public ConfirmMessageSender(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; rabbitTemplate.setConfirmCallback(this); } //开启生产者的confirm可靠投递时,用于接收rabbit服务器返回的通知 //参数1 消息的唯一标识,需要在发送消息时就传入,参见下方自定义发送方法 //参数2 发送是否成功 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (ack){ //成功 删除redis中的相关数据 redisTemplate.delete(correlationData.getId()); redisTemplate.delete(MESSAGE_CONFIRM_KEY+correlationData.getId()); }else{ //失败 //从redis中获取失败的消息,重新发送 Map<String,String> map = ( Map<String,String>)redisTemplate.opsForHash().entries(MESSAGE_CONFIRM_KEY + correlationData.getId()); String exchange = map.get("exchange"); String routingKey = map.get("routingKey"); String message = map.get("message"); rabbitTemplate.convertAndSend(exchange,routingKey, JSON.toJSONString(message)); } } //自定义消息发送方法 先往redis里保存一份,再往消息队列里发送 public void sendMessage(String exchange,String routingKey,String message){ //将消息保存到redis中,在投递失败时重新取出再次投递 //设置消息的唯一标识,这个类由rabbit提供,用来为消息设置唯一标识,在上方confrim用于回调时区分消息 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); redisTemplate.opsForValue().set(correlationData.getId(),message); //将本次发送消息的元数据保存到redis Map<String,String> map = new HashMap<>(); map.put("exchange",exchange); map.put("routingKey",routingKey); map.put("message",message); //注意这是哈希结构,你可以想为一大一小两个map,putAll会把上边定义的map作为小map的键值 redisTemplate.opsForHash().putAll(MESSAGE_CONFIRM_KEY+correlationData.getId(),map); //携带本次消息的唯一标识,发送到消息队列 //注意这里多了一个参数4,是我们上边定义的消息唯一标识,通过confirm模式确认可靠投递时,用corrlationData来区分不同的消息 rabbitTemplate.convertAndSend(exchange,routingKey,message,correlationData); //这里的message是由外部传入的,如果是一个对象,要在外部先转为json } }
在需要进行可靠投递时,不用原生的rabbittemplate,而注入这个类,并调用sendMessage方法,回调函数会自动执行,并判断如何继续处理
1
1
标签:map,correlationData,String,confirm,rabbitmq,可靠,import,message,投递 来源: https://www.cnblogs.com/wrc-blog/p/14341684.html
本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享; 2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关; 3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关; 4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除; 5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。