ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ学习笔记

2022-07-23 14:34:57  阅读:102  来源: 互联网

标签:String 队列 System 笔记 学习 RabbitMQ println channel 消息


RabbitMQ学习笔记

1、添加用户,进行登录

  1. 创建账号

     rabbitmqctl add_user 用户名 密码
  2. 设置用户角色

     rabbitmqctl set_user_tags 用户名 角色
  3. 设置用户权限

     rabbitmqctl set_permissions [-p <vhostpath>] <user> <conf> <write> <read> 
     # 例如
     rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"

     

2、Hello World!

  1. 生产者

     public class Producer {
     ​
         // 队列名
         private static final String QUEUE_NAME = "hello";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 创建连接工厂
             ConnectionFactory factory = new ConnectionFactory();
             // mq的ip
             factory.setHost("192.168.1.22");
             // mq的用户名
             factory.setUsername("admin");
             // mq的密码
             factory.setPassword("admin123");
     ​
             // 创建链接
             Connection connection = factory.newConnection();
             // 获取信道
             Channel channel = connection.createChannel();
             /**
              * 生成一个队列
              * 参数1:队列名称
              * 参数2:是否持久化
              * 参数3:是否进行消息的共享(一个队列可以由多个消费者消费)
              * 参数4:是否自动删除
              * 参数5:其他参数
              */
             channel.queueDeclare(QUEUE_NAME,true,false,false,null);
     ​
             // 消息内容
             String message = "hello world";
             /**
              * 发送消息
              * 参数1:发送哪个交换机
              * 参数2:路由的key值(队列名)
              * 参数3:其他参数
              * 参数4:消息体
              */
             channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
     ​
             System.out.println("消息发送完毕!");
        }
     }
  2. 消费者

     public class Consumer {
     ​
         // 队列名
         private static final String QUEQUE_NAME = "hello";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 创建连接
             ConnectionFactory factory = new ConnectionFactory();
             factory.setHost("192.168.1.22");
             factory.setUsername("admin");
             factory.setPassword("admin123");
             Connection connection = factory.newConnection();
     ​
             // 获取信道
             Channel channel = connection.createChannel();
     ​
             // 接收消息回调
             DeliverCallback deliverCallback = (consumerTag, message) -> {
                 System.out.println(new String(message.getBody()));
            };
             // 取消消息回调
             CancelCallback cancelCallback = consumerTag -> {
                 System.out.println("消息消费被中断!");
            };
             /**
              * 接受消息
              * 参数1:队列名
              * 参数2:消费成功后是否自动应答
              * 参数3:消费者未消费成功回调
              * 参数4:消费者消费取消回调
              */
             channel.basicConsume(QUEQUE_NAME, true, deliverCallback, cancelCallback);
        }
     }

     

3、消息应答

之前的例子中,我们在消费者中接收消息都是自动应答,也就说在接收到消息后不管有没有处理完都会应答,然后删除消息。

如果在处理的过程中出现错误,就会丢失消息。所以实际开发中我们需要手动应答

1、消息应答方法

  1. Channel.basicAck(用于肯定确认):确认成功处理消息,并丢弃消息。

  2. Channel.basicNAck(用于否定确认)

  3. Channel.basicReject(用于否定确认):与Channel.basicNAck相比,少一个参数,不处理该消息,并可以直接丢弃

 

2、Multiple

手动应答的好处是可以批量应答并且减少网络拥堵

 

3、消息重新入队

当消费者接收到消息后由于某种原因与信道断开,导致还未应答。mq会把消息保存下来重新入队,保证消息不丢失。

  1. 生产者

     public class Producer {
     ​
         // 队列名
         private static final String QUEUE_NAME = "queue_ack";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 获取信道
             Channel channel = RabbitMQUtils.getChanel();
             channel.queueDeclare(QUEUE_NAME,true,false,false,null);
     ​
             // 大量发送消息
             Scanner scanner = new Scanner(System.in);
             while (scanner.hasNext()) {
                 String message = scanner.next();
     ​
                 channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                 System.out.println("发送消息:" + message);
            }
        }
     }
  2. 消费者1

     public class Consumer1 {
     ​
         // 队列名
         private static final String QUEQUE_NAME = "queue_ack";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 获取信道
             Channel channel = RabbitMQUtils.getChanel();
     ​
             // 接收消息回调
             DeliverCallback deliverCallback = (tag,msg) -> {
                 // 处理时间1秒
                 try {
                     Thread.sleep(1000);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
                 System.out.println("接收到消息内容:" + new String(msg.getBody()));
                 // 手动应答,并且不批量应答
                 channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
            };
     ​
             System.out.println("C1等待接收消息,处理时间较短...");
             // 接收消息并且手动应答
             boolean autoAck = false;
             channel.basicConsume(QUEQUE_NAME, autoAck,
                     deliverCallback,
                     tag -> System.out.println(tag + "消息者取消接收消息"));
        }
     }
  3. 消费者2

     public class Consumer2 {
     ​
         // 队列名
         private static final String QUEQUE_NAME = "queue_ack";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 获取信道
             Channel channel = RabbitMQUtils.getChanel();
     ​
             // 接收消息回调
             DeliverCallback deliverCallback = (tag,msg) -> {
                 // 处理时间1秒
                 try {
                     Thread.sleep(3000);
                } catch (InterruptedException e) {
                     e.printStackTrace();
                }
                 System.out.println("接收到消息内容:" + new String(msg.getBody()));
                 // 手动应答,并且不批量应答
                 channel.basicAck(msg.getEnvelope().getDeliveryTag(), false);
            };
     ​
             System.out.println("C1等待接收消息,处理时间较长...");
             // 接收消息并且手动应答
             boolean autoAck = false;
             channel.basicConsume(QUEQUE_NAME, autoAck,
                     deliverCallback,
                     tag -> System.out.println(tag + "消息者取消接收消息"));
        }
     }

     

4、RabbitMQ持久化

解决消息不丢失问题,将消息持久化

1、队列持久化

在声明队列时参数设置为true即可持久化

2、消息持久化

发布消息是设置属性:==MessageProperties.PERSISTENT_TEXT_PLAIN==

 

3、不公平分发

mq默认是轮询消费消息,但大部分情况有的消费者处理消息快,而有的消费者处理消息慢,这就导致处理快的消费者有很大一部分是空闲状态的。

 Channel.basicQos(int prefetchCount)

 

4、预取值

指定分发固定的消息给消费者

 

5、发布确认

1.设置队列必须持久化

2.设置消息必须持久化

3.当生产者将消息持久化成功后,发布确认。这样保证消息不会丢失

 

开启发布确认模式:

 // 开启发布确认模式
 channel.confirmSelect();

 

1、单个确认发布

发布一条,确认一条

 // 单个发布确认
 public static void singleConfirmMessage() throws Exception {
     // 获取信道
     Channel channel = RabbitMQUtils.getChanel();
     // 信道持久化
     boolean durable = true;
     channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
     // 开启发布确认模式
     channel.confirmSelect();
 ​
     long start = System.currentTimeMillis();
     // 大量发送消息
     for (int i = 0; i < MESSAGE_COUNT; i++) {
         // 发布消息并持久化
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (i + "").getBytes());
         //单个消息发布确认
         channel.waitForConfirms();
    }
     long end = System.currentTimeMillis();
     System.out.println("单个发布确认1000个耗时:" + (end - start) + "毫秒");
 }

 

2、批量确认发布

 public static void batchConfirmMessage() throws Exception {
     // 获取信道
     Channel channel = RabbitMQUtils.getChanel();
     // 信道持久化
     boolean durable = true;
     channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
     // 开启发布确认模式
     channel.confirmSelect();
 ​
     // 发送消息多少个时进行批量确认
     int count = 100;
     long start = System.currentTimeMillis();
     // 大量发送消息
     for (int i = 1; i <= MESSAGE_COUNT; i++) {
         // 发布消息并持久化
         channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, (i + "").getBytes());
 ​
         // 发布100个消息确认一次
         if (i % count == 0) {
             channel.waitForConfirms();
        }
    }
     long end = System.currentTimeMillis();
     System.out.println("批量发布确认1000个耗时:" + (end - start) + "毫秒");
 }

 

3、异步确认发布

 // 获取信道
         Channel channel = RabbitMQUtils.getChanel();
         // 信道持久化
         boolean durable = true;
         channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
         // 开启发布确认模式
         channel.confirmSelect();
 ​
         long start = System.currentTimeMillis();
         // 线程安全有序的map,适合高并发,储存消息
         ConcurrentSkipListMap<Long,String> map = new ConcurrentSkipListMap<>();
 ​
         // 发布确认成功事件
         ConfirmCallback ackCallback = (deliveryTag, multiple) -> {
             // 如果是批量
             if (multiple) {
                 // 删除已确认的消息,剩下的就是未确认的消息
                 ConcurrentNavigableMap<Long,String> comfirmed = map.headMap(deliveryTag);
                 comfirmed.clear();
            } else {
                 map.remove(deliveryTag);
            }
             System.out.println("消息发布成功:" + deliveryTag);
        };
         // 发布确认失败事件
         ConfirmCallback nackCallback = (deliveryTag, multiple) -> {
             System.out.println("未确认的消息有" + map.get(deliveryTag) + "\t未确认的消息序号:" + deliveryTag);
        };
         // 发布消息监听,哪些消息发布成功,哪些消息发布失败
         channel.addConfirmListener(ackCallback, nackCallback);
 ​
 ​
         // 大量发送消息
         for (int i = 1; i <= MESSAGE_COUNT; i++) {
             // 发布消息并持久化
             String message = i + "";
             channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
             // 记录所有要发送的消息序号和内容
             map.put(channel.getNextPublishSeqNo(), message);
        }
         long end = System.currentTimeMillis();
         System.out.println("异步发布确认1000个耗时:" + (end - start) + "毫秒");

 

 

6、交换机

1、类型

直接(direct)、主题(topic)、标题(headers)、扇出(fanout)

 

2、发布订阅模式(fanout)

 

 

3、多重绑定(direct)

 

4、主题交换机(topic)

主题交换机的routing_key不能随便写,==必须是单词列表,以点号隔开==。例如:"user.admin.guest",这些单词可以是任意单词。

*(星号)可以代替一个单词

#(井号)可以代替0个或多个单词

 

注意:

  1. 当一个队列绑定的是一个#(井号),那么这个队列将接收所有数据,有点像fanout

  2. 如果队列绑定键中没有*(星号)或#(井号),那么该队列的绑定类型就是direct。

 

7、死信队列

死信:无法被消费的消息

原因:由于特定原因导致队列中某些消息无法被消费,如果这样的消息没有被处理,就会变成死信,有死信就自然有死信队列。

应用场景:用户在商城下单成功并点击支付时,在指定时间内未支付则自动失效。

 

1、死信的来源

  1. 消息TTL过期

  2. 队列达到最大长度(队列满了,无法再添加)

  3. 消息被拒绝(basic.reject或basic.nack)并且requeue=false

 

2、代码示例

  1. 消费者1:

     public class Consumer1 {
     ​
         // 普通交换机名称
         private static final String NORMAL_EXCHANGE = "normal_exchange";
         // 普通队列名
         private static final String NORMAL_QUEUE = "normal_queue";
         // 死信交换机名称
         private static final String DEAD_EXCHANGE = "dead_exchange";
         // 死信队列名称
         private static final String DEAD_QUEUE = "dead_queue";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 获取信道
             Channel channel = RabbitMQUtils.getChanel();
             // 声明交换机类型为direct
             channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
             channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
     ​
             // 声明普通队列
             Map<String, Object> arguments = new HashMap<>();
             // 不正常消费转换到死信交换机
             arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
             // 设置死信routingKey
             arguments.put("x-dead-letter-routing-key", "lisi");
             // 设置队列最大长度
     //       arguments.put("x-max-length", 6);
             channel.queueDeclare(NORMAL_QUEUE, false, false,false,arguments);
     ​
             // 声明死信队列
             channel.queueDeclare(DEAD_QUEUE, false, false,false,null);
     ​
             // 绑定普通交换机和普通队列
             channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
             // 绑定死信交换机和死信队列
             channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
     ​
             System.out.println("C1等待接收消息...");
             // 接收消息
             DeliverCallback deliverCallback = (tag, msg) -> {
                     System.out.println("Consumer1接收到消息:" + new String(msg.getBody()));
            };
             channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, (tag, msg) -> {});
        }
     }
  2. 消费者2:

     public class Consumer2 {
     ​
         // 死信队列名称
         private static final String DEAD_QUEUE = "dead_queue";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 获取信道
             Channel channel = RabbitMQUtils.getChanel();
     ​
             System.out.println("C2等待接收消息...");
     ​
             // 接收消息事件
             DeliverCallback callback = (tag, msg) -> {
                 System.out.println("Consumer2接收到消息:" + new String(msg.getBody()));
            };
             channel.basicConsume(DEAD_QUEUE, false, callback, (tag, msg) -> System.out.println(tag + "未处理消息"));
        }
     }
  3. 生产者:

     public class Producer {
     ​
         // 普通交换机名称
         private static final String NORMAL_EXCHANGE = "normal_exchange";
     ​
         public static void main(String[] args) throws Exception {
     ​
             // 获取信道
             Channel chanel = RabbitMQUtils.getChanel();
             // 死信消息,设置TTL时间
             AMQP.BasicProperties properties = new AMQP.BasicProperties()
                    .builder()
                    .expiration("10000")
                    .build();
     ​
             // 发送消息
             for (int i = 1; i <= 10; i++) {
                 String message = i + "";
                 chanel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes("utf-8"));
                 System.out.println("发送消息成功:" + message);
            }
        }
     }

     

 

3、延迟队列

延迟队列就是死信队列的一种

来源:消息TTL过期

 

 

4、插件实现延迟队列

延迟队列的存在问题:因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢入延迟队列。如果当第一个消息延时时长很长第二个消息延时时长很短,结果是第二个消息并不会得到优先执行。

安装插件后,交换机会多出一种类型:x-delayed-message

插件原理:交换机进行延迟,延迟时间过后,再把消息放入队列。之前是队列里进行延迟。

  1. 官网下载插件:rabbitmq_delayed_message_exchange

  2. 安装插件:

     rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  3. 重启rabbitmq

     systemctl restart rabbitmq-server

 

 

8、发布确认高级

1、配置文件

spring.rabbitmq.publisher-confirm-type=correlated

  • NONE:禁止发布确认模式,是默认值。

  • CORRELATED:发布消息成功到交换机后会触发回调函数。

  • SIMPLE

 

2、配置类

 // 交换机接受到消息回调接口
 @Component
 public class ExchangeCallBack implements RabbitTemplate.ConfirmCallback {
 ​
     @Autowired
     private RabbitTemplate rabbitTemplate;
 ​
     @PostConstruct
     public void init() {
         rabbitTemplate.setConfirmCallback(this);
    }
 ​
     /**
      *
      * @param correlationData 保存回调消息的ID和相关信息
      * @param ack 交换机是否接受到消息
      * @param cause 失败的原因
      */
     @Override
     public void confirm(CorrelationData correlationData, boolean ack, String cause) {
 ​
         if (ack) {
             System.out.println("交换机收到来自ID为" + correlationData.getId() + "的消息");
        } else {
             System.out.println("交换机未接收到来自ID为" + correlationData.getId() + ",原因:" + cause);
        }
    }
 }

 

9、回退消息

问题场景:队列无法收到消息

1.配置文件

 spring:
  rabbitmq:
    publisher-returns: true

2、配置类

 @Component
 public class ExchangeCallBack implements RabbitTemplate.ReturnCallback {
 ​
     @Autowired
     private RabbitTemplate rabbitTemplate;
 ​
     @PostConstruct
     public void init() {
         rabbitTemplate.setReturnCallback(this);
    }
 ​
     /**
      * 单消息不可到达目的地时,将消息返回给生产者
      * @param message
      * @param replyCode
      * @param replyText
      * @param exchange
      * @param routingKey
      */
     @Override
     public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
         System.out.println(String.format("消息%s被交换%s机退回,返回原因:%s,路由key:%s",new String(message.getBody()), exchange, replyText, routingKey));
    }
 }

 

10、备份交换机

 

 

11、消息优先级

 

 

12、搭建集群

 

标签:String,队列,System,笔记,学习,RabbitMQ,println,channel,消息
来源: https://www.cnblogs.com/zhouqiangshuo/p/16511953.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有