ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ快速入门及实例演示

2022-02-05 11:59:51  阅读:168  来源: 互联网

标签:演示 消费者 队列 RabbitMQ 处理 消息 channel 入门


RabbitMQ

先做起来,再去想其他。

1.MQ 消息队列概念

MQ(message queue)消息队列,FIFO先入先出。对服务器的请求先加入到消息队列中,再由消息队列来进行请求的分发。

还是一种跨进程的通信机制,用于上下游传递消息。在互联网架构中,MQ是一种非常常见的上下游“逻辑解耦 + 物理解耦”的消息通信服务。

使用MQ以后,上游只需要将消息发送给MQ,无需关注是否下游接收,MQ会监督消息的接收,不再需要依赖其他服务。

1.1 用处

1.1.1 流量消峰

举个例子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

将消息加入到MQ中,然后由MQ分发错峰处理,处理时间会增加,但是减少了服务器因过高的消息宕机的风险。

1.1.2 应用解耦

以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。

用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。

在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障,提升系统的可用性。

用户下单以后,剩下的操作,如:调取支付系统、库存系统等,不再会将消息直接发给所有的系统,而是有MQ进行下一步的操作,分别发给各个系统,当各个系统的处理完毕以后。接收返回结果,并且MQ会监督消息的执行。

1.1.3 异步处理

有些服务间调用是异步的,例如A调用B,B需要花费很长时间执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的查询api查询。或者A提供一个 callback api, B执行完之后调用 api通知A服务。

这两种方式都不是很优雅,使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。这样A服务既不用循环调用B的查询 api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息。

A调用B,执行B的方法,但是A并不知道B多久执行完,则需要不停的去查询B是否执行完毕。

通过MQ可以解决这个问题,A把需要B执行的消息发给MQ,由MQ去告知B执行,当B执行完毕以后通知MQ,最后再由MQ去通知A。

A就不需要不停的去查询B的执行进度,减少消耗。

1.2 MQ的分类

1.2.1 ActiveMQ

优点

单机吞吐量万级,时效性 ms 级,可用性高,基于主从架构实现高可用性,消息可靠性较低的概率丢失数据

缺点

官方社区现在对 ActiveMQ 5.x 维护越来越少,高吞吐量场景较少使用

1.2.2 Kafka

大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开 Kafka,这款为大数据而生的消息中间件,以其百万级 TPS 的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。目前已经被 LinkedIn,Uber, Twitter, Netflix 等大公司所采纳。

优点

性能卓越,单机写入 TPS 约在百万条/秒,最大的优点,就是吞吐量高。时效性 ms 级可用性非常高,kafka 是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用,消费者采用 Pull 方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;有优秀的第三方KafkaWeb 管理界面 Kafka-Manager;在日志领域比较成熟,被多家公司和多个开源项目使用;

功能支持

功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

缺点

Kafka 单机超过 64 个队列/分区,Load 会发生明显的飙高现象,队列越多,load 越高,发送消息响应时间变长,使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;支持消息顺序,但是一台代理宕机后,就会产生消息乱序;社区更新较慢

1.2.3.RocketMQ

RocketMQ 出自阿里巴巴的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。被阿里巴巴广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog 分发等场景。

优点

单机吞吐量十万级,可用性非常高,分布式架构,消息可以做到 0 丢失,MQ 功能较为完善,还是分布式的,扩展性好,支持 10 亿级别的消息堆积,不会因为堆积导致性能下降,源码是 java 我们可以自己阅读源码,定制自己公司的 MQ。

缺点

支持的客户端语言不多,目前是 java 及 c++,其中 c++不成熟;社区活跃度一般,没有在MQ核心中去实现 JMS 等接口,有些系统要迁移需要修改大量代码。

1.2.4 RabbitMQ

2007 年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

优点

由于 erlang 语言的高并发特性,性能较好;吞吐量到万级,MQ 功能比较完备,健壮、稳定、易用、跨平台、支持多种语言 如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持 AJAX 文档齐全;

开源提供的管理界面非常棒,用起来很好用,社区活跃度高;更新频率相当高。

需要设置具有erlang环境。

缺点

商业版需要收费,学习成本较高

1.3 MQ的选择

1.3.1 Kafka

Kafka 主要特点是基于Pull 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选 kafka 了。

1.3.2 RocketMQ

天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。

1.3.3 RabbitMQ

结合 erlang 语言本身的并发优势,性能好时效性微秒级社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。

2.RabbitMQ

2.1 概念

RabbitMQ 是一个消息中间件:它接受并转发消息。

你可以把它当做一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终会把你的快递送到收件人那里,按照这种逻辑 RabbitMQ 是一个快递站,一个快递员帮你传递快件。

RabbitMQ 与快递站的主要区别在于,它不处理快件而是接收,存储和转发消息数据。

RabbitMQ不处理消息,它只对消息进行接收、存储和转发。

2.2 四大核心概念

2.2.1 生产者

产生数据发送消息的程序是生产者。

2.2.2 交换机

交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或者是把消息丢弃,这个得有交换机类型决定。

交换机必须与队列进行绑定。进行消息的分发。

2.2.3 队列

队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中。队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据。这就是我们使用队列的方式。

2.2.4 消费者

消费与接收具有相似的含义。消费者大多时候是一个等待接收消息的程序。请注意,生产者、消费者和消息中间件很多时候并不在同一机器上。同一个应用程序既可以是生产者又是可以是消费者。

一个队列中的消息只能被一个消费者接收;同一条消息被交换机发给多个队列,那么这条消息可以被多个消费者接收。

消费者只能接收消息,分发到具体的处理者,消费者本身不处理消息。

2.3 核心部分

2.3.1 核心模式

  1. 简单模式
  2. 工作模式
  3. 发布、订阅模式
  4. 路由模式
  5. 主题模式
  6. 发布确认模式

2.3.2 工作原理

2.3.2.1 工作原理概述

2.3.2.2 名词解释
  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker

  • Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等

    每个MQ实体中可以有多个交换机和队列,每一个用户对应自己的交换机。

  • Connection:publisher/consumer 和 broker 之间的 TCP 连接

  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP-Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的

    在消息量巨大时,不为每一条消息单独建立连接,而是在 Connection 的内部建立多个信道,通过信道进行消息的传输,减轻开销。(类似线程池,减小开销、灵活复用等)

  • Connection 极大减少了操作系统建立 TCP connection 的开销

  • Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout(multicast)

  • Queue:消息最终被送到这里等待 consumer 取走

  • Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据

    将交换机和队列进行绑定

2.3.2.3 总结
  • 生产者生产消息,通过建立连接将消息发送给 MQ 实体;
  • 在 MQ 实体中通过具体的交换机和队列进行消息处理;
  • 消息分发时建立连接,发送给消费者;
  • 消费者进行下一步的处理;

3.消息应答

3.1 概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况?

RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制。

消息应答:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了,告知 RabbitMQ 可以把该消息删除了。这样就可以避免消息在转发过程中的丢失。

为了避免工作线程在接收到消息以后,没有将消息处理完成,导致在队列中的消息被删除,增加了消息应答机制。如果工作线程没有将消息处理完,就不能通知队列。只有当任务处理完以后才能通知队列,避免消息的丢失。

3.2 自动应答

消息发送后立即被认为已经传送成功,这种模式需要在**高吞吐量和数据传输安全性方面做权衡,**因为这种模式如果消息在接收到之前,消费者那边出现连接或者 Channel(信道)关闭,那么消息就丢失了。

当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用

虽然自动应答看起来很美好,但是有个问题就是当消费者接收到消息就会应答队列 – 消息已经处理,会造成在接收消息以后,后面还有处理过程,接下面的处理过程还并未处理、执行。

消费者在接收到消息就应答了队列,并不知道后续的处理情况。很有可能后续的处理情况会对消息有紧密的联系。对消费者的处理效率等要求太高,一般不用于真实开发场景。

一般使用手动应答,将自动应答关闭(设置相应的参数)。由开发人员自己设置应答的时机。

3.3 手动应答

3.3.1 相关方法(Java)
  1. Channel.basicAck():用于肯定确认

    手动确认,发送消息给队列,当前消息已经处理完,可以将其丢弃

  2. Channel.basicNack():用于否定确认

  3. Channel.basicReject():用于否定确认

    与Channel.basicNack相同的作用,但是少一个参数;不处理该消息,直接拒绝,告知 MQ 将消息丢弃

3.3.2 相关参数

对于肯定确认的方式,可以设置一个参数:Multiple 来设置是否一次性将链接中的消息一次性应答;

如果设置为 true,则链接中所有信道接收的消息,会因为这条设置为 true 的完成。(一批消息的到达会因其中的一条消息的处理完毕而整体回复给 MQ 已处理,不会关注其他消息的处理情况)

一般设置为 false,一条一条消息的应答给队列。

channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
// 第一个参数为:获取消息的在这次分发标记,消息的位置
// 第二个参数为:设置是否一次性将信道中的所有消息进行应答

设置为true,同一批消息会因为其中的一个应答给队列,而被队列视为已经处理完毕。

3.4 消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失等)导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

如果没有处理完毕,但是处理的工作线程因为某些原因而关闭,因为这条消息没有对队列进行应答,那么队列会及时的分发给其他消费者处理。

3.5 重新应答的实例

  1. 创建多个消费者

  2. 设置消费者的处理时间

    这里通过设置消费者的睡眠时间来模拟消费者的处理时间

  3. 启动多个消费者,让处理慢的消费者停止

    先让慢的消费者接收到消息,但是在没有应答队列前,让其停止

注意:如果设置了新的队列名称,一定要让生产者先启动起来,不然如果是消费者先启动就会报错。

1.创建消费者
public class Worker01 {
   private static final String QUEUE_NAME = "ack_queue";

   public static void main(String[] args) throws Exception {
      Channel channel = RabbitMQUtil.getChannel();

      /*
         * 后面的参数,通过函数式接口表达
         *  第一个函数式表达式:接收到消息的处理方式
         *  第二个函数式表达式:消息接收被取消后的处理方式
         * */

      // 打印当前正在处理的线程名字
      System.out.println("worker处理消息时间短,需要1秒");
      // 接收消息并处理
      /*
         * 注意:将自动应答关闭 -- 第二参数设置为false
         * */
      // 在具体消费者接收到消息之前设置为不公平分发
      channel.basicQos(1);
      channel.basicConsume(QUEUE_NAME,false,
                           (consumerTag, message)->{
                              // 设置线程睡眠,模拟消息的处理速度
                              try {
                                 TimeUnit.SECONDS.sleep(1);
                                 System.out.println("接收到的消息:"+new String(message.getBody()));
                                 // 第二个参数为:设置是否一次性将信道中的所有消息进行应答
                                 channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
                              } catch (InterruptedException e) {
                                 e.printStackTrace();
                              }
                           },
                           (consumerTag)->{
                              System.out.println("消息接收被取消,执行该方法...");
                           });
   }
}
2.创建生产者
public class Task {
   private static final String QUEUE_NAME = "ack_queue";

   public static void main(String[] args) throws Exception {
      // 通过工具类获取信道
      Channel channel = RabbitMQUtil.getChannel();
      // 发送消息,并设定参数
      // 第二个参数:设置持久化的状态
      channel.queueDeclare(QUEUE_NAME,false,false,false,null);

      Scanner scanner = new Scanner(System.in);
      while (scanner.hasNext()){
         String message = scanner.next();
         // 连续发送消息
         // 将第三个参数设置为支持消息持久化
         channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
         System.out.println("消息发送完成:"+message);
      }
   }

   /*
    * 异步发布确认
    * */
   public static void messagePublishAsyn() throws Exception{
      Channel channel = RabbitMQUtil.getChannel();
      // 随机创建一个队列名字
      String queueName = UUID.randomUUID().toString();
      // 声明队列
      channel.queueDeclare(queueName, false, false, false, null);
      // 开启发布确认
      channel.confirmSelect();

      /**
         * 线程安全有序的一个哈希表,适用于高并发的情况
         * 1.轻松的将序号与消息进行关联
         * 2.轻松批量删除条目 只要给到序列号
         * 3.支持并发访问
         */
      ConcurrentSkipListMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();

      /**
         * 确认收到消息的一个回调
         * 1.消息序列号
         * 2.true 可以确认小于等于当前序列号的消息
         *        false 确认当前序列号消息
         */
      ConfirmCallback ackCallback = (sequenceNumber, multiple) -> {
         if (multiple) {
            //返回的是小于等于当前序列号的未确认消息 是一个 map
            ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(sequenceNumber, true);
            //清除该部分确认消息
            confirmed.clear();
         }else{
            //只清除当前序列号的消息
            outstandingConfirms.remove(sequenceNumber);
         }
      };

      ConfirmCallback nackCallback = (sequenceNumber, multiple) -> {
         String message = outstandingConfirms.get(sequenceNumber);
         System.out.println("发布的消息"+message+"未被确认,序列号"+sequenceNumber);
      };

      /**
         * 添加一个异步确认的监听器
         * 1.确认收到消息的回调
         * 2.未收到消息的回调
         */
      channel.addConfirmListener(ackCallback, nackCallback);

      for (int i = 0; i < 1000; i++) {
         String message = "消息" + i;
         /**
             * channel.getNextPublishSeqNo()获取下一个消息的序列号
             * 通过序列号与消息体进行一个关联
             * 全部都是未确认的消息体
             */
         outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
         // 发送消息
         channel.basicPublish("", queueName, null, message.getBytes());
      }
   }
}
3.运行生产者和消费者,让慢消费者停止
// 运行结果:worker处理消息时间短,需要1秒接收到的消息:AA接收到的消息:CC接收到的消息:DD

可以看到快的消费者处理了3条,其他DD原本是另外一个消费者处理的,但是设置了手动应答并让慢消费者人为停止,最后达到了消息重新入队和换新消费者处理的结果。

4.持久化

4.1 概念

刚刚我们已经看到了如何处理任务不丢失的情况,但是如何保障当 RabbitMQ 服务停掉以后消息生产者发送过来的消息不丢失。默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。

确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化

队列的持久化和消息的持久化是分开的,并且为保证消息的持久化,队列也需要持久化。

4.2 队列持久化

在消息的发送方需要设置队列的持久化,通过在消息的发送方声明队列时,设置参数设置队列持久化。将队列设置为持久化以后,在RabbitMQ的后台管理界面也会相应的进行标识。

将队列进行持久化以后,即时RabbitMQ重启这个队列也还在,如果没有进行队列的持久化,断电即失。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IHvaIE4Y-1644032863938)(F:\StudyNotepad\img\image-20211225122440671.png)]

// 第二个参数:设置持久化的状态channel.queueDeclare(QUEUE_NAME,false,false,false,null);

注意:如果需要修改队列的持久化状态,已经声明过的队列需要修改状态,则需要先将当前队列删除,然后再创建新的队列。如果直接在没有持久化的队列上修改,会报错。

只要修改了队列的参数等代码,就需要删除原有的队列,重新创建一个新的队列。

4.3 消息持久化

消息持久化,需要在消息推送时进行参数设置,将参数设置为:MessageProperties.PERSISTENT_TEXT_PLAIN支持消息持久化。

将消息标记为持久化并不能完全保证不会丢失消息。尽管它会告诉 RabbitMQ 将消息保存到磁盘,但是这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点。此时并没有真正写入磁盘。持久性保证并不强,但是对于我们的简单任务队列而言,这已经绰绰有余了。

// 将第三个参数设置为支持消息持久化channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

消息的持久化并不是完全保证的,在持久化的过程中如果发生意外,还是有消息丢失的可能性。

4.4 不公平分发

在最简单的 RabbitMQ 分发消息采用的为轮训分发(每个队列依次分发),但是在某种场景下这种策略并不是很好,比方说有两个消费者在处理任务,其中有个消费者 1 处理任务的速度非常快,而另外一个消费者 2 处理速度却很慢,这个时候我们还是采用轮训分发的化就会到这处理速度快的这个消费者很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下其实就不太好,但是 RabbitMQ 并不知道这种情况它依然很公平的进行分发。

为了避免这种情况,我们可以设置参数 channel.basicQos(1); 改为不公平分发。

// 在具体消费者接收到消息之前设置为不公平分发channel.basicQos(1);

RabbitMQ默认是使用轮询分发,当有工作线程闲置时,不会将任务发送给闲置工作线程,而是等待下一个轮询分发的工作线程。这样会让处理快的工作线程有大量闲置时间,通过在消费者接收消息前设置参数,开启不公平分发,能者多劳。

只有在设置了不公平分发的消费者才能有该特性。

4.5 预取值

本身消息的发送就是异步发送的,所以在任何时候,channel 上肯定不止只有一个消息,另外来自消费者的手动确认本质上也是异步的。因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。

该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。

消息应答和 QoS 预取值对用户吞吐量有重大影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但是,在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗(随机存取存储器)应该小心使用具有无限预处理的自动确认模式或手动确认模式。

消费者消费了大量的消息如果没有确认的话,会导致消费者连接节点的内存消耗变大,所以找到合适的预取值是一个反复试验的过程,不同的负载该值取值也不同 100 到 300 范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险。预取值为 1 是最保守的。当然这将使吞吐量变得很低,特别是消费者连接延迟很严重的情况下,特别是在消费者连接等待时间较长的环境中。对于大多数应用来说,稍微高一点的值将是最佳的。

前面的不公平分发设置的参数的名称为预取值,除了1以外还可以设置为其他数字。

设置数字的大小为,这个消费者中的信道可以堆积的消息数量。比如:如果将这个消费者的预取值设置为:3,当这个消费者信道接收到一条消息在处理中时,还可以继续接收消息,最多堆积到3条。

5.死信队列

5.1 概念

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

因为一些特殊情况消费者接收到消息但是没有处理,消费者丢失。消息没有处理,但是不需要将消息重新入队,而是将这个没有处理的消息直接发送给死信队列进行处理。

5.2 造成的原因

  • 消息TTL过期
  • 队列达到最长长度
  • 消息被拒
    • basic.reject或basic.nack并且requeue = false

5.3 测试

5.3.1 概述

5.3.2 TTL过期

  1. 创建消息的生产者
    1. 设置交换机
    2. 设置消息的过期时间
  2. 创建消费者1,启动后关闭
    1. 创建两个交换机
    2. 设置当消息没有处理完以后的处理策略
    3. 绑定死信交换机
1.创建消息的生产者
/*
* 消息的发送方
* */
public class Producer {
   private static final String NORMAL_EXCHANGE = "normal_exchange";
   /*
    * 发送方设置TTL时间,模拟因TTL而造成的死信
    * */
   public static void ttl() throws Exception {
      Channel channel = RabbitMQUtil.getChannel();
      // 1.声明一个交换机
      channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
      // 2.设置TTL时间
      AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
      // 3.发送消息
      for (int i = 1; i < 11; i++) {
         String msg = "info" + i;
         channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,msg.getBytes("UTF-8"));
         System.out.println("消息已发送,发送的消息是:"+msg);
      }
   }
}
2.创建消息的消费者
/*
* 消息的接收方,通过三种情况将消息变为死信,转发给死信队列
* - TTL时间过期
* - 队列长队达到最大长度,无法再添加
* - 消息被拒收
* */
public class Consumer {
   /*
    * 定义两个交换机和两个队列,实现过期消息发送给死信队列
    * */
   private static final String NORMAL_EXCHANGE = "normal_exchange";
   private static final String DEAD_EXCHANGE = "dead_exchange";

   /*
    * 因为TTL时间过期发生死信
    * */
   public static void ttl() throws Exception {
      Channel channel = RabbitMQUtil.getChannel();
      // 1.声明两个交换机
      channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
      channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

      // 2.声明两个队列
      // 2.1 声明死信队列
      channel.queueDeclare("dead-queue",false,false,false,null);
      channel.queueBind("dead-queue",DEAD_EXCHANGE,"lisi");
      // 2.2 声明普通队列,并设置对于死信的处理策略,在最后一个参数,通过Map传入
      Map<String,Object> params = new HashMap<>();
      params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
      params.put("x-dead-letter-routing-key", "lisi");
      channel.queueDeclare("normal-queue",false,false,false,params);

      // 3.交换机和队列的绑定 队列名-交换机名-绑定的key
      channel.queueBind("normal-queue",NORMAL_EXCHANGE,"zhangsan");

      //4.接收消息
      System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
      // 消息成功接收的处理方式
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), "UTF-8");
         System.out.println("控制台打印接收到的消息"+message);
      };
      // 消息处理
      channel.basicConsume("normal-queue", true, deliverCallback, consumerTag -> {});
   }


   /*
    * 死信的接收者
    * */
   public static void ttlDead() throws Exception {
      Channel channel = RabbitMQUtil.getChannel();
      channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
      channel.queueDeclare("dead-queue",false,false,false,null);
      channel.queueBind("dead-queue",DEAD_EXCHANGE,"lisi");
      System.out.println("等待接收消息,把接收到的消息打印在屏幕........... ");
      // 消息成功接收的处理方式
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {
         String message = new String(delivery.getBody(), "UTF-8");
         System.out.println("控制台打印接收到的消息"+message);
      };
      // 消息处理
      channel.basicConsume("dead-queue", true, deliverCallback, consumerTag -> {});
   }
}

结果:当正常的消费者不能接收到消息,超过了消息的TTL以后,消息由死信队列接收到。

5.3.3 队列达到最大长度

  1. 创建消息生产者
  2. 创建消息接受者
    1. 设置参数:最大队列长度-6

和前面演示一样,在生产者中的TTL参数删除;在消费者中添加一个参数:params.put("x-max-length",6);

其他一样,最后的结果,生产者发送了10条消息,最终正常接收到6条,其余4条由死信队列接收。

5.3.4 消息被拒

  1. 创建消息生产者
  2. 创建消息接收者
    1. 在消息接收处理中设置消息的拒绝策略

结果:被拒的消息被死信队列接收。

6.延迟队列

6.1 概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

延迟队列就是死信队列中消息TTL过期产生的结果。

6.2 延迟队列的一些使用场景

  • 订单在十分钟之内未支付则自动取消
  • 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  • 用户注册成功后,如果三天内没有登陆则进行短信提醒
  • 用户发起退款,如果三天内没有得到处理则通知相关运营人员
  • 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

延迟队列就是将消息放在延迟队列中,设置一个倒计时,当时间到了以后执行这个消息,进行处理,根据处理的结果进行更新。

6.3 RabbitMQ 中的 TTL

TTL是什么呢?

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。

6.4 队列 TTL 和消息 TTL 的区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;

另外,还需要注意的一点是,如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

前一小节我们介绍了死信队列,刚刚又介绍了 TTL,至此利用 RabbitMQ 实现延时队列的两大要素已经集齐,接下来只需要将它们进行融合,再加入一点点调味料,延时队列就可以新鲜出炉了。

想想看,延时队列,不就是想要消息延迟多久被处理吗,TTL 则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

通过设置普通队列的死信队列和消息的TTL,达到延迟队列的效果。

正常情况下,消息由交换机发出,然后发给指定的队列,但是队列在消息的TTL中没有进行处理,那么该消息就会交给死信队列,然后再由死信队列进行处理,就达到了延迟队列的效果。

6.5 整合Spring Boot

  1. 导入依赖
  2. 设置队列和交换机绑定的配置文件
  3. 发送消息
  4. 设置监听器接收消息

6.6 优化延时队列

将消息延迟的TTL交给发送方而不是由消费者来定义。

6.6.1 问题

改变以后发现,消息是一条一条的发送的,当前面的消息还没有达到它的 TTL,后面的消息不会继续发送。

通过添加延迟队列插件优化。

6.6.2 安装延迟插件

在官网上下载 https://www.rabbitmq.com/community-plugins.html下载,安装到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录下。

添加了这个插件以后,除了以后能创建的队列类型以外 ,增加了一个延迟消息交换机,这样就可以把消息的延迟交给交换机,而不是像前面一样将消息的延迟交给队列导致前面的消息没有到达 TTL 后面的消息不能发送出去。

6.6.3 实例

  1. 创建延迟交换机
  2. 绑定队列
1.创建延迟交换机
/*
* 创建一个延迟消息交换机,通过插件实现
* */
@Configuration
public class DelayQueueConfig {
   public static final String DELAYED_QUEUE_NAME = "delayed.queue";
   public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
   public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";

   @Bean
   public Queue delayedQueue() {
      return new Queue(DELAYED_QUEUE_NAME);
   }

   //自定义交换机 我们在这里定义的是一个延迟交换机
   @Bean
   public CustomExchange delayedExchange() {
      Map<String, Object> args = new HashMap<>();
      //自定义交换机的类型
      args.put("x-delayed-type", "direct");
      // 交换机的名字-交换机的类型-持久化-自动删除-其他参数
      return new CustomExchange(DELAYED_EXCHANGE_NAME, 
                                "x-delayed-message", 
                                true, 
                                false, 
                                args);
   }

   @Bean
   public Binding bindingDelayedQueue(@Qualifier("delayedQueue") 
                                      Queue queue,
                                      @Qualifier("delayedExchange") 
                                      CustomExchange delayedExchange) {
      // 最后为构建
      return BindingBuilder.
         bind(queue).
         to(delayedExchange).
         with(DELAYED_ROUTING_KEY).
         noargs();
   }
}

结果:解决了前面的问题。

6.6.4 总结

延时队列在需要延时处理的场景下非常有用,使用RabbitMQ来实现延时队列可以很好的利用RabbitMQ的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。

另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为单个节点挂掉导致延时队列不可用或者消息丢失。

当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景。

7.其他知识点

7.1 幂等性

7.1.1概念

用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。

举个最简单的例子,那就是支付,用户购买商品后支付,支付扣款成功,但是返回结果的时候网络异常,此时钱已经扣了,用户再次点击按钮,此时会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水记录也变成了两条。在以前的单应用系统中,我们只需要把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络中断或者异常等等。

因为一些原因导致消费者已经消费了消息但是无法回复给 MQ,然后 MQ 再次分发这条消息,造成多次消费。

7.1.2消息重复消费

消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。

7.1.3 解决思路

MQ 消费者的幂等性的解决一般使用全局 ID 或者写个唯一标识比如时间戳 或者 UUID 或者订单消费者消费 MQ 中的消息也可利用 MQ 的该 id 来判断,或者可按自己的规则生成一个全局唯一 id,每次消费消息时用该 id 先判断该消息是否已消费过。

7.1.4消费端的幂等性保障

在海量订单生成的业务高峰期,生产端有可能就会重复发生了消息,这时候消费端就要实现幂等性,这就意味着我们的消息永远不会被消费多次,即使我们收到了一样的消息。业界主流的幂等性有两种操作:

  1. 唯一 ID+指纹码机制,利用数据库主键去重
  2. 利用 redis的原子性去实现。

7.1.5 唯一ID + 指纹码机制

指纹码:我们的一些规则或者时间戳加别的服务给到的唯一信息码,它并不一定是我们系统生成的,基本都是由我们的业务规则拼接而来,但是一定要保证唯一性,然后就利用查询语句进行判断这个 id 是否存在数据库中,优势就是实现简单就一个拼接,然后查询判断是否重复;

劣势就是在高并发时,如果是单个数据库就会有写入性能瓶颈当然也可以采用分库分表提升性能,但也不是我们最推荐的方式。

7.1.6 Redis 原子性

利用 redis 执行 setnx 命令,天然具有幂等性。从而实现不重复消费。

一般通过这种方式。

7.2 优先级队列

7.2.1 使用场景

假设有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,天猫商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用 redis 来存放的定时轮询,大家都知道 redis 只能用 List 做一个简简单单的消息队列,并不能实现一个优先级的场景。

通过给队列设置优先级范围,然后再发送消息的时候,设置消息的优先级。

注意:队列接收到消息一定是批量的,如果是单条消息的接收与发送,那么就不存在优先级问题,一定是多条消息同时发送给发送方队列,然后再对批消息进行优先级排序。

7.2.2 步骤

  1. 在相关队列设置参数开启优先级支持并设置优先级数值范围
  2. 在发送消息时,设置消息的优先级

在Spring Boot中创建队列时声明参数,在发送消息时设置消息优先级。

7.3 惰性队列

7.3.1 使用场景

RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。

正常情况下,队列中的消息是保存在内存中的,惰性队列则是将消息保存在磁盘中。

内存中的消息消费快,磁盘中的消息占空间少,在消费者宕机的情况下可以使用惰性队列将消息保存到磁盘中。

13.3.2 步骤

在声明队列的参数中设置为惰性队列即可。

标签:演示,消费者,队列,RabbitMQ,处理,消息,channel,入门
来源: https://blog.csdn.net/BaiKnow/article/details/122789134

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有