ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

2021-03-01

2021-03-01 20:01:18  阅读:196  来源: 互联网

标签:03 01 消费者 队列 模式 死信 消费 2021 消息


 

一.AMQP消息协议你是怎么理解的?

AMQP(高级消息队列协议),即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

AMQP工作流程

img

1、发布者发布消息,经由交换机。交换机根据路由规则将收到的消息分发给与该交换机绑定的队列。最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

2、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

3、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

4、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。

5、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

二.rabbitmq常见的消息模式有哪些

1.简单模式

    一个生产者,一个消费者,消息通过FIFO模式排队;

2.work模式

    一个生产者,多个消费者,消费者接受到的消息是不同的;
    普通的work模式采用轮询的方式向不同的消费者发送消息;
    “能者多劳”的work模式采用回复确认机制,将消息置为手动确认模式,且将预置队列数为1(prefetchCount=1);一个消费者接收到一条消息,直到处理成功确认后才会收到第二条消息;即处理能力强的消费者将接收更多的消息。

3.订阅模式

    一个生产者,一个交换机,多个队列,多个消费者,一个消息经过队列后只能被一个消费者获取,所以每个消费者都配备了队列;
    生产者生成消息后,将由交换机分发到多个队列,队列再将消息传递给后方的消费者。一个消息可以被分发给多个消息队列,所以可以达到一个消息被多个消费者消费的目的。

4.路由主题

    一个生产者,一个交换机,多个队列,多个消费者;与订阅模式类似,不同之处在于消息带有类型,不同的消费者可以订阅不同类型的消息;(消息可以自定义类型,例如“update”、“create”、“delete”、“audit.irs.corporate”等)
    消息在经过交换机时,根据消息的类型进行路由,从而被分发到不同的队列中;

5.主题模式(通配符模式)

    一个生产者,一个交换机,多个队列,多个消费者;与路由模式类似,不同之处在于,其可以通过简单通配符的方式进行消息路由,比如“#”表示所有消息类型,“*”表示单一词匹配;
    例如:“audit.irs.corporate”消息,“audit.#”可以匹配到,但“audit.*”只能匹配“audit.irs”消息。

三 rabbitmq在项目中你是怎么用的

rabbitmq整合springboot

引入依赖

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

引入配置文件

spring:
  application:
    name: springboot_rabbitmq
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: ems
    password: 12345
    virtual-host: /ems

开发生产者

@SpringBootTest(classes = RabbitMQApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    @Autowired
    AmqpTemplate amqpTemplate;
    @Test
    public void sendMessage(){
        //参数1:队列名称  参数2:消息内容
        amqpTemplate.convertAndSend("hello","hello world....");
    }
}

开发消费者

@Component//将监听器交给spring容器管理
public class MessageListener {
    @RabbitListener(queuesToDeclare=@Queue(value = "q1"))//声明当前类是一个监听器  绑定的队列的名称是q2
    public void getMessage01(String message){
        System.out.println("the message is : "+message);
    }
 }

四  rabbitmq如何保证消息在投递的过程不被丢失?

一般情况下rabbitmq消息很少丢失 但不排除这种可能  有一下几点保证

1.消息持久化

2.ACK确认机制

3.设置集群镜像模式

4.消息补偿机制

1.消息持久化

RabbitMQ 的消息默认存放在内存上面,如果不特别声明设置,消息不会持久化保存到硬盘上面的,如果节点重启或者意外crash掉,消息就会丢失。

所以就要对消息进行持久化处理。如何持久化,下面具体说明下:

要想做到消息持久化,必须满足以下三个条件,缺一不可。

1) Exchange 设置持久化

2)Queue 设置持久化

3)Message持久化发送:发送消息设置发送模式deliveryMode=2,代表持久化消息

2.ACK确认机制

多个消费者同时收取消息,比如消息接收到一半的时候,一个消费者死掉了(逻辑复杂时间太长,超时了或者消费被停机或者网络断开链接),如何保证消息不丢?

这个使用就要使用Message acknowledgment 机制,就是消费端消费完成要通知服务端,服务端才把消息从内存删除。

这样就解决了,及时一个消费者出了问题,没有同步消息给服务端,还有其他的消费端去消费,保证了消息不丢的case。

3.设置集群镜像模式

1)单节点模式:最简单的情况,非集群模式,节点挂了,消息就不能用了。业务可能瘫痪,只能等待。
2)普通模式:默认的集群模式,某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(必须持久化消息情况下)。
3)镜像模式:把需要的队列做成镜像队列,存在于多个节点,属于RabbitMQ的HA方案

保证消息不丢失,需要采用HA 镜像模式队列。

但是:HA 镜像队列有一个很大的缺点就是:   系统的吞吐量会有所下降

4.消息补偿机制

但是作为有追求的程序员来讲,要绝对保证我的系统的稳定性,有一种危机意识。

比如:持久化的消息,保存到硬盘过程中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?

产线网络环境太复杂,所以不知数太多,消息补偿机制需要建立在消息要写入DB日志,发送日志,接受日志,两者的状态必须记录。

然后根据DB日志记录check 消息发送消费是否成功,不成功,进行消息补偿措施,重新发送消息处理。

五  解决消息重复消费的问题

1.当拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,

那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

2.当拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

 3.如果上面两种情况还不行,准备一个第三方存储,来做消费记录。以redis为例,给消息分配一个全局id,

只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

六 如何解决消息堆积的问题?

1.后台定时任务每隔72小时,删除旧的没有使用过的消息信息

2.根据不同的业务实现不同的丢弃任务,选择不同的策略淘汰任务,例如FIFO/LRU等

3.消息定时转移,或者对某些重要的 TAG 型(支付型)消息真正落库

参照aa这也可以

七 什么是死信队列  死信队列的原因是什么 如何处理死信消息?

死信队列就是没有被及时消费的消息存放的队列  

原因:1.消息被拒绝 并且不再投递

           2.TTL消息超时未消费

         3.达到队列最大长度

死信的处理方式
死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

1.丢弃,如果不是很重要,可以选择丢弃
2.记录死信入库,然后做后续的业务分析或处理

具体参照死信队列

八 如何实现消息限流

消息队列限流是指在服务器面临巨额流量时,为了进行自保,进行的一种救急措施。因为巨大的流量代表着非常多的消息,这些消息如果多到服务器处理不过来就会造成服务器瘫痪,影响用户体验,造成不良影响。所以要进行一次降级操作,把处理不了的流量隔绝在系统之外,避免它们打垮系统。基本上任何一个消息队列都有限流的功能,今天我们就来看看在 RabbitMQ 之中进行限流具体应该怎么做?

 

消息队列限流是指在服务器面临巨额流量时,为了进行自保,进行的一种救急措施。

因为巨大的流量代表着非常多的消息,这些消息如果多到服务器处理不过来就会造成服务器瘫痪,影响用户体验,造成不良影响。

所以要进行一次降级操作,把处理不了的流量隔绝在系统之外,避免它们打垮系统。

基本上任何一个消息队列都有限流的功能,今天我们就来看看在RabbitMQ之中进行限流具体应该怎么做?

RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息还未被消费确认,则不进行新消息的消费。

spring:
  rabbitmq:
    addresses: 127.0.0.1
    host: 5672
    username: guest
    password: guest
    virtual-host: /
    # 手动确认消息
    listener:
      simple:
          acknowledge-mode: manual
          prefetch: 2

我们只需要配置一下rabbitmq.listener.simple下的prefetch属性即可,为了演示方便我这里配置为两条,语义即为:如果队列中有两条以上未签收的消息,则不进行新的消息消费。

我往我的队列中发送三条信息,并不进行签收,来看看效果:

image.png

发送完显示我们系统中有三条Ready消息,这代表这三条消息还在队列中没有消费端去消费。

这时我打开消费端进行消费但依旧不进行签收,接着来看效果:

image.png

unacked=2,ready=1,这就代表有两条消息在服务端消费了但是没有签收,还有一条消息还在队列中没有往服务端推送,因为我们设置了prefetch=2,所以现在队列的最大同时在消费的消息数量为2,通过此种方式,我们就完成了消费限流。

Tip : 这种方式下消息一定要进行手动签收,因为之前的文章中我们讲过,自动签收是消息一达到消费端就进行签收了,可能我们的业务逻辑还没运行就已经进行签收了,所以自动签收状态下开启限流几乎没有作用。

 

标签:03,01,消费者,队列,模式,死信,消费,2021,消息
来源: https://blog.csdn.net/LIJiaHui232/article/details/114269186

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有