ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rabbitMQ--死信队列

2022-07-19 11:06:01  阅读:43  来源: 互联网

标签:return String -- spring rabbitMQ 死信 rabbitmq message public


 


基于消费者 reject requeue设置为false 消息进入死信队列

# 应用名称
spring.application.name=rabbitmq
# 应用服务 WEB 访问端口
server.port=8080
spring.rabbitmq.host=192.168.1.137
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#手动ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#每次去交换机拿10条消息
spring.rabbitmq.listener.simple.prefetch=10
#开启confirm机制
spring.rabbitmq.publisher-confirm-type=correlated
#开启return机制
spring.rabbitmq.publisher-returns=true
@Configuration
public class DeadLetterConfig {
    public static final String NORMAL_EXCHANGE="normal-exchange";
    public static final String NORMAL_QUEUE="normal-queue";
    public static final String NORMAL_ROUTING_KEY="normal.#";
    public static final String DEAD_EXCHANGE="dead-exchange";
    public static final String DEAD_QUEUE="dead-queue";
    public static final String DEAD_ROUTING_KEY="dead.#";
    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }
    /*这里的队列需要绑定死信交换机*/
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
    }

    @Bean
    public Binding normalBinding(Exchange normalExchange,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Exchange deadExchange,Queue deadQueue){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}
@Component
public class DeadListener {
    @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
    public void consumer(String msg, Channel channel, Message message) throws IOException {
        System.out.println("接收到normal队列消息"+msg);
        /*拒绝消息 消息进入死信队列*/
        channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
    }
}
    /*死信发送者*/
    @Test
    public void publish() throws IOException {
        String msg="dead letter";
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE,"normal.abc",msg);
        System.in.read();
    }

--------

基于消息的生存时间  让消息进入到死信队列

 

 

    /*死信发送者*/

    @Test
    public void publishExpire() throws IOException {
        String msg="dead letter Expire";
        rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("50000");//5秒钟之内没有将消息消费掉  那么消息将进入死信队列中
                return message;
            }
        });
        System.in.read();
    }
最好不要设置 这种方式消息的生存时间  因为rabbitMQ 只会监听最外围的生存时间  也就是说 当msg1 ttl 5s,msg2 ttl 10s  
如果msg2第一个到达队列,msg1第二个到达 rabbitMQ会先监听msg2的10s 然后再来查看msg1的5秒 此时msg需要10秒 然后进入死信队列中 不会被正常消费
为了解决上述问题 我们需要引入 延迟交换机来解决此类问题  插件下载地址
https://www.rabbitmq.com/community-plugins.html
下载包 支持3.8.5 https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.9/rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez
docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez rabbitmq:/opt/rabbitmq/plugins


docker exec -it rabbitmq bash
cd /opt/rabbitmq/plugins
 cd ../sbin/
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
docker restart rabbitmq

 

 这种方式是基于 生产者发送消息 到发送到交换机  消息会堆积在交换机  到达了指定时间 才会路由到指定的队列中  通过这种方式我们可以更方便的实现延迟消费  解决上面的问题



    @Test
    public void DelayedPublish(){
        rabbitTemplate.convertAndSend(XDelayedMessageConfig.DelayedMessage_EXCHANGE,"delayed.ach","xxx",new MessagePostProcessor(){

            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setDelay(3000);
                return message;
            }
        });
    }
@Configuration
public class XDelayedMessageConfig {
    public static final String DelayedMessage_EXCHANGE="DelayedMessage-exchange";
    public static final String DelayedMessage_QUEUE="DelayedMessage-queue";
    public static final String DelayedMessage_ROUTING_KEY="delayed.#";
    /*延迟交换机*/
    @Bean
    public Exchange delayedExchange(){
        Map<String, Object> arguments=new HashMap<>();
        arguments.put("x-delayed-type", "topic");
        Exchange exchange = new CustomExchange(DelayedMessage_EXCHANGE, "x-delayed-message",true,false,arguments);
        return exchange;
    }
    @Bean
    public Queue delayedQueue(){
        return QueueBuilder.durable(DelayedMessage_QUEUE).build();
    }
    @Bean
    public Binding delayedBinding(Exchange delayedExchange, Queue delayedQueue){
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DelayedMessage_ROUTING_KEY).noargs();
    }
}

延迟交换机

 

 

 

------------

 

 

 

 

 

标签:return,String,--,spring,rabbitMQ,死信,rabbitmq,message,public
来源: https://www.cnblogs.com/Lcch/p/16493281.html

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有