ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ学习09--死信队列(TTL过期)

2021-11-07 10:01:53  阅读:113  来源: 互联网

标签:-- 09 队列 死信 import com public channel


1、死信的概念:

死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

 

2、死信的来源:

消息 TTL 过期

队列达到最大长度(队列满了,无法再添加数据到 mq 中)

消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

 

3、代码示例:

 

 

工厂类:

 1 package com.yas.config;
 2 
 3 import com.rabbitmq.client.Channel;
 4 import com.rabbitmq.client.Connection;
 5 import com.rabbitmq.client.ConnectionFactory;
 6 
 7 import java.io.IOException;
 8 import java.util.concurrent.TimeoutException;
 9 
10 public class RabbitMQClient {
11     public static Connection getConnection(){
12         //创建Connection工厂
13         ConnectionFactory factory = new ConnectionFactory();
14         factory.setHost("106.12.17.17");
15         factory.setPort(5672);
16         factory.setUsername("admin");
17         factory.setPassword("cs1986@0312");
18         factory.setVirtualHost("/");
19 
20         //创建Connection
21         Connection connection = null;
22         try {
23             connection = factory.newConnection();
24         } catch (IOException e) {
25             e.printStackTrace();
26         } catch (TimeoutException e) {
27             e.printStackTrace();
28         }
29         return connection;
30     }
31 
32     public static Channel getChannel(){
33         Connection connection = getConnection();
34         try {
35             return connection.createChannel();
36         } catch (IOException e) {
37             e.printStackTrace();
38         }
39         return null;
40     }
41 }

 

消息生产者代码:

 1 package com.yas.deadexchange;
 2 
 3 import com.rabbitmq.client.AMQP;
 4 import com.rabbitmq.client.BuiltinExchangeType;
 5 import com.rabbitmq.client.Channel;
 6 import com.yas.config.RabbitMQClient;
 7 import org.junit.Test;
 8 
 9 public class Publisher {
10     private static final String NORMAL_EXCHANGE = "normal_exchange";
11 
12     @Test
13     public void publish() throws Exception {
14         try (Channel channel = RabbitMQClient.getChannel()) {
15             channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
16             //设置消息的 TTL 时间
17             AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
18             //该信息是用作演示队列个数限制
19             for (int i = 1; i <11 ; i++) {
20                 String message="info"+i;
21                 //channel.basicPublish(NORMAL_EXCHANGE, message.getBytes());
22                 channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
23                 System.out.println("生产者发送消息:"+message);
24             }
25         }
26     }
27 }

 

正常队列(normal_queue)消费者:

 1 package com.yas.deadexchange;
 2 
 3 import com.rabbitmq.client.*;
 4 import com.yas.config.RabbitMQClient;
 5 import org.junit.Test;
 6 
 7 import java.util.HashMap;
 8 import java.util.Map;
 9 
10 public class Consumer01 {
11     public static final String NORMAL_EXCHANGE = "normal_exchange";
12     public static final String DEAD_EXCHANGE = "dead_exchange";
13     public static final String NORMAL_QUEUE = "normal_queue";
14     public static final String DEAD_QUEUE = "dead_queue";
15 
16     @Test
17     public void consume() throws Exception {
18         //1.获取连接对象
19         Connection connection = RabbitMQClient.getConnection();
20         //2.创建channel
21         Channel channel = connection.createChannel();
22 
23         //声明死信交换机 类型为 direct
24         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
25         //声明死信队列
26         channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
27         //死信队列绑定死信交换机与 routingkey
28         channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
29 
30 
31         //声明普通交换机 类型为 direct
32         channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
33         //正常队列绑定死信队列信息
34         Map<String, Object> params = new HashMap<>();
35         //正常队列设置死信交换机 参数 key 是固定值
36         params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
37         //正常队列设置死信 routing-key 参数 key 是固定值
38         params.put("x-dead-letter-routing-key", "lisi");
39         //声明普通队列
40         channel.queueDeclare(NORMAL_QUEUE, false, false, false, params);
41         //普通队列绑定普通交换机routing-key
42         channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
43         System.out.println("等待接收消息........... ");
44 
45         //消费者回调
46         DeliverCallback deliverCallback = (consumerTag, delivery) -> {
47             String message = new String(delivery.getBody(), "UTF-8");
48             System.out.println("Consumer01 接收到消息"+message);
49             };
50         channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {
51         });
52 
53         System.in.read();
54         //5/释放资源
55         channel.close();
56         connection.close();
57     }
58 }

测试方式:

1.正常消费,先执行消费者,开启消费监听。再开启生产者,则消息从normal_queue被消费。

2.死信队列,不开启消费者,只开启生产者,消息先发送到normal_queue,等10秒超时后,会从normal_queue转发给dead_queue。消息进入死信队列。

 

死信队列消费者:

 1 package com.yas.deadexchange;
 2 
 3 import com.rabbitmq.client.BuiltinExchangeType;
 4 import com.rabbitmq.client.Channel;
 5 import com.rabbitmq.client.Connection;
 6 import com.rabbitmq.client.DeliverCallback;
 7 import com.yas.config.RabbitMQClient;
 8 import org.junit.Test;
 9 
10 //消费死信队列中的消息
11 public class Consumer02 {
12 
13     private static final String DEAD_EXCHANGE = "dead_exchange";
14 
15     @Test
16     public void consume() throws Exception {
17         //1.获取连接对象
18         Connection connection = RabbitMQClient.getConnection();
19         //2.创建channel
20         Channel channel = connection.createChannel();
21 
22         channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
23         String deadQueue = "dead_queue";
24         channel.queueDeclare(deadQueue, false, false, false, null);
25         channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
26         System.out.println("等待接收死信队列消息........... ");
27         DeliverCallback deliverCallback = (consumerTag, delivery) ->
28         {String message = new String(delivery.getBody(), "UTF-8");
29             System.out.println("Consumer02 接收死信队列的消息" + message);
30         };
31         channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {
32         });
33 
34         System.in.read();
35         //5/释放资源
36         channel.close();
37         connection.close();
38     }
39 }

测试方式:

对于进入死信队列的信息,可以通过启动私信队列的消费者完成消费。

 

标签:--,09,队列,死信,import,com,public,channel
来源: https://www.cnblogs.com/asenyang/p/15519129.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有