ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rabbitMQ--死信队列

2022-07-19 11:06:01  阅读:136  来源: 互联网

标签:return String -- spring rabbitMQ 死信 rabbitmq message public


 


基于消费者 reject requeue设置为false 消息进入死信队列

# 应用名称
spring.application.name=rabbitmq
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=192.168.1.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次去交换机拿10条消息
spring.rabbitmq.listener.simple.prefetch=10
#开启confirm机制
spring.rabbitmq.publisher-confirm-type=correlated
#开启return机制
spring.rabbitmq.publisher-returns=true
@Configuration
public class DeadLetterConfig {
    public static final String NORMAL_EXCHANGE="normal-exchange";
    public static final String NORMAL_QUEUE="normal-queue";
    public static final String NORMAL_ROUTING_KEY="normal.#";
    public static final String DEAD_EXCHANGE="dead-exchange";
    public static final String DEAD_QUEUE="dead-queue";
    public static final String DEAD_ROUTING_KEY="dead.#";
    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }
    /*这里的队列需要绑定死信交换机*/
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}
@Component
public class DeadListener {
    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void consumer(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到normal队列消息"+msg);
        /*拒绝消息 消息进入死信队列*/
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}
    /*死信发送者*/
    @Test
    public void publish() throws IOException {
        String msg="dead letter";
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE,"normal.abc",msg);
        System.in.read();
    }

--------

基于消息的生存时间  让消息进入到死信队列

 

 

    /*死信发送者*/

    @Test
    public void publishExpire() throws IOException {
        String msg="dead letter Expire";
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("50000");//5秒钟之内没有将消息消费掉  那么消息将进入死信队列中
                return message;
            }
        });
        System.in.read();
    }
最好不要设置 这种方式消息的生存时间  因为rabbitMQ 只会监听最外围的生存时间  也就是说 当msg1 ttl 5s,msg2 ttl 10s  
如果msg2第一个到达队列,msg1第二个到达 rabbitMQ会先监听msg2的10s 然后再来查看msg1的5秒 此时msg需要10秒 然后进入死信队列中 不会被正常消费
为了解决上述问题 我们需要引入 延迟交换机来解决此类问题  插件下载地址
https://www.rabbitmq.com/community-plugins.html
下载包 支持3.8.5 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins


docker exec -it rabbitmq bash
cd /opt/rabbitmq/plugins
 cd ../sbin/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq

 

 这种方式是基于 生产者发送消息 到发送到交换机  消息会堆积在交换机  到达了指定时间 才会路由到指定的队列中  通过这种方式我们可以更方便的实现延迟消费  解决上面的问题



    @Test
    public void DelayedPublish(){
        rabbitTemplate.convertAndSend(XDelayedMessageConfig.DelayedMessage_EXCHANGE,"delayed.ach","xxx",new MessagePostProcessor(){

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(3000);
                return message;
            }
        });
    }
@Configuration
public class XDelayedMessageConfig {
    public static final String DelayedMessage_EXCHANGE="DelayedMessage-exchange";
    public static final String DelayedMessage_QUEUE="DelayedMessage-queue";
    public static final String DelayedMessage_ROUTING_KEY="delayed.#";
    /*延迟交换机*/
    @Bean
    public Exchange delayedExchange(){
        Map<String, Object> arguments=new HashMap<>();
        arguments.put("x-delayed-type", "topic");
        Exchange exchange = new CustomExchange(DelayedMessage_EXCHANGE, "x-delayed-message",true,false,arguments);
        return exchange;
    }
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DelayedMessage_QUEUE).build();
    }
    @Bean
    public Binding delayedBinding(Exchange delayedExchange, Queue delayedQueue){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DelayedMessage_ROUTING_KEY).noargs();
    }
}

延迟交换机

 

 

 

------------

 

 

 

 

 

标签:return,String,--,spring,rabbitMQ,死信,rabbitmq,message,public
来源: https://www.cnblogs.com/Lcch/p/16493281.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有