ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

Springboot整合RabbitMQ详解

2021-11-30 20:00:19  阅读:32  来源: 互联网

标签:false Springboot 队列 rabbitmq RabbitMQ Queue 详解 消息 public


RabbitMQ

文章目录


本文结合了网上一些资料借鉴,如有侵权请指出。

RabbitMQ的特点

RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:

  • 可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
  • 灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。
    分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
  • 支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
  • 多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
  • 支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
  • 可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
  • 插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。

AMQP

AMQP模型

消息(message)被发布者(publisher)发送给交换机(exchange),然后交换机将收到的消息根据路由规则分发给绑定的队列(queue)。最后AMQP代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

AMQP模型图

消息确认

从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当消息从队列投递给消费者的时候,消费者服务器需要返回一个ack(确认信息),当broker收到了确认才会将该消息删除;消息确认可以是自动的,也可以是由消费端手动确认。此外也支持生产端向broker发送消息得到broker的ack,从而针对做出响应逻辑。

AMQP是一个可编程的协议

某种意义上说AMQP的实体和路由规则是由应用本身定义的,而不是由消息代理定义。包括像声明队列和交换机,定义他们之间的绑定,订阅队列等等关于协议本身的操作。但是需要注意双方定义的冲突,否则会暴露出配置错误的问题。

RabbitMQ安装

Windows10安装

步骤

  • 到erlang官网下载win10版安装包。下载完成后傻瓜式安装。
  • 配置erlang环境变量

image-20211124135631736

image-20211124135719557

​ cmd输入erl验证安装是否成功,如下成功;ctrl+c退出

image-20211124135919422

  • 傻瓜式安装RabbitMQ服务。
    在RabbitMQ的gitHub项目中,下载window版本的服务端安装包
  • 进入安装目录,sbin目录下,执行:rabbitmq-plugins enable rabbitmq_management 命令安装管理页面的插件

image-20211124140316403

img

image-20211124140741917

Spring整合AMQP

官方中文文档

Spring AMQP (geekdoc.top)

GitHup翻译文档

GitHub - rockit-ba/spring-rabbit-: spring AMQP 实现: spring rabbit 官方中文文档翻译

Spring AMQP主要对象类及作用

作用
Queue对应RabbitMQ中Queue
AmqpTemplate接口,用于向RabbitMQ发送和接收Message
RabbitTemplateAmqpTemplate的实现类
@RabbitListener指定消息接收方,可以配置在类和方法上
@RabbitHandler指定消息接收方,只能配置在方法上,可以与@RabbitListener一起使用
Message对RabbitMQ消息的封装
Exchange对RabbitMQ的Exchange的封装,子类有TopicExchange、FanoutExchange和DirectExchange等
Binding将一个Queue绑定到某个Exchange,本身只是一个声明,并不做实际绑定操作
AmqpAdmin接口,用于Exchange和Queue的管理,比如创建/删除/绑定等,自动检查Binding类并完成绑定操作
RabbitAdminAmqpAdmin的实现类
ConnectionFactory创建Connection的工厂类,RabbitMQ也有一个名为ConnectionFactory的类但二者没有继承关系,Spring ConnectionFactory可以认为是对RabbitMQ ConnectionFactory的封装
CachingConnectionFactorySpring ConnectionFactory的实现类,可以用于缓存Channel和Connection
ConnectionSpring中用于创建Channel的连接类,RabbitMQ也有一个名为Connection的类,但二者没有继承关系,Spring Connection是对RabbitMQ Connection的封装
SimpleConnectionSpring Connection的实现类,将实际工作代理给RabbitMQ的Connection类
MessageListenerContainer接口,消费端负责与RabbitMQ服务器保持连接并将Message传递给实际的@RabbitListener/@RabbitHandler处理
RabbitListenerContainerFactory接口,用于创建MessageListenerContainer
SimpleMessageListenerContainerMessageListenerContainer的实现类
SimpleRabbitListenerContainerFactoryRabbitListenerContainerFactory的实现类
RabbitProperties用于配置Spring AMQP的Property类

Spring AMQP主要参数

参数默认值说明
基础信息
spring.rabbitmq.hostlocalhost主机
spring.rabbitmq.port5672端口
spring.rabbitmq.usernameguest用户名
spring.rabbitmq.passwordguest密码
spring.rabbitmq.virtual-host虚拟主机
spring.rabbitmq.addressesserver的地址列表(以逗号分隔),配置了该项将忽略spring.rabbitmq.host和spring.rabbitmq.port
spring.rabbitmq.requested-heartbeat请求心跳超时时间,0表示不指定;如果后面没加时间单位默认为秒
spring.rabbitmq.publisher-confirm-typenone发布确认类型,none、correlated、simple该配置只管有无投递到exchange,而不管有无发送到队列当中
spring.rabbitmq.publisher-returnsfalse是否启用发布返回
spring.rabbitmq.connection-timeout连接超时时间,0表示永不超时
缓存cache
spring.rabbitmq.cache.channel.checkout-timeout如果已达到channel缓存大小,等待获取channel的时间。 如果为0,则始终创建一个新channel。
spring.rabbitmq.cache.channel.size缓存中保持的channel数量
spring.rabbitmq.cache.connection.size缓存的connection数,只有是CONNECTION模式时生效
spring.rabbitmq.cache.connection.modechannel连接工厂缓存模式
Listener
spring.rabbitmq.listener.typesimple容器类型,simple或direct
spring.rabbitmq.listener.simple.auto-startuptrue应用启动时是否启动容器
spring.rabbitmq.listener.simple.acknowledge-modeauto消息确认方式,none、manual和auto
spring.rabbitmq.listener.simple.concurrencylistener最小消费者数
spring.rabbitmq.listener.simple.max-concurrencylistener最大消费者数
spring.rabbitmq.listener.simple.prefetch一个消费者最多可处理的nack消息数量
spring.rabbitmq.listener.simple.default-requeue-rejectedtrue被拒绝的消息是否重新入队
spring.rabbitmq.listener.simple.missing-queues-fataltrue如果容器声明的队列不可用,是否失败;或如果在运行时删除一个或多个队列,是否停止容器
spring.rabbitmq.listener.simple.idle-event-interval空闲容器事件应多久发布一次
spring.rabbitmq.listener.simple.retry.enabledfalse是否开启消费者重试
spring.rabbitmq.listener.simple.retry.max-attempts3最大重试次数
spring.rabbitmq.listener.simple.retry.max-interval10000ms最大重试间隔
spring.rabbitmq.listener.simple.retry.initial-interval1000ms第一次和第二次尝试发送消息的时间间隔
spring.rabbitmq.listener.simple.retry.multiplier1.0应用于前一个重试间隔的乘数
spring.rabbitmq.listener.simple.retry.statelesstrue重试是无状态还是有状态
spring.rabbitmq.listener.direct.consumers-per-queue每个队列消费者数量
direct类型listener其他参数同simple类型
Template
spring.rabbitmq.template.mandatoryfalse消息在没有被队列接收时是否退回,与spring.rabbitmq.publisher-returns类似, 该配置优先级高于spring.rabbitmq.publisher-returns
spring.rabbitmq.template.receive-timeoutreceive() 操作的超时时间
spring.rabbitmq.template.reply-timeoutsendAndReceive() 操作的超时时间
spring.rabbitmq.template.retry.enabledfalse发送消息是否重试
spring.rabbitmq.template.retry.max-attempts3.0发送消息最大重试次数
spring.rabbitmq.template.retry.initial-interval1000ms第一次和第二次尝试发送消息的时间间隔
spring.rabbitmq.template.retry.multiplier1.0应用于前一个重试间隔的乘数
spring.rabbitmq.template.retry.max-interval10000ms最大重试间隔

Springboot整合AMQP

消费端监听相关注解

@RabbitListener

可以作用在类或方法上,设置监听的队列。 如果未设置containerFactory(),则使用默认容器工厂。

内置许多属性提供绑定队列的关系。

  • 作用在方法上:表明该方法监听某个队列
  • 作用在类上:需配合使用@RabbitHandler,监听队列会调用@RabbitHandler注释的方法
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "directQueue-Two", durable = "false"),
    exchange = @Exchange(value = "MqSendService-One", type = "direct", durable = "false"),
    key = "One"),
    ackMode = "MANUAL"
)
public void tsJucDirectMsgTwo(@Header Message data, Channel channel){}
注意事项

必须指定监听的队列。建议声明指定绑定交换器和队列,保持和生产端一致

**方式一:**只声明监听队列(不推荐)

@RabbitListener(queues = "directQueue-One")

该方式消费者会默认监听这个队列,如果rabbit服务端broker内不存在该队列,则会一直报错

**方式二:**保持和生产端同步,指定绑定关系

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "directQueue-One",type = "direct"),
    exchange = @Exchange(value = "MqSendService-One"),
    key = "One"
))

该方式如果broker内还未存在指定队列,则会直接创建指定的Exchange和Queue。

无队列情况出现场景:

  1. 在生产端声明了但还未发送消息情况,因为若只在生产端声明,但还未发送过消息,就不会创建对应的Exchange和Queue。
  2. broker中的队列被删除
@RabbitHandler

@RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理;

具体使用哪个方法处理,根据 MessageConverter 转换后的参数类型

@Component
@RabbitListener(queues = "consumer_queue")
public class Receiver {
 
    @RabbitHandler
    public void processMessage1(String message) {
        System.out.println(message);
    }
 
    @RabbitHandler
    public void processMessage2(byte[] message) {
        System.out.println(new String(message));
    }
    
}
@Payload

可以获取消息中的 body 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body) {
    System.out.println("body:"+body);
}
@Header,@Headers

可以获得消息中的 headers 信息

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Header String token) {
    System.out.println("body:"+body);
    System.out.println("token:"+token);
}

@RabbitListener(queues = "debug")
public void processMessage1(@Payload String body, @Headers Map<String,Object> headers) {
    System.out.println("body:"+body);
    System.out.println("Headers:"+headers);
}

快速入门

Gitee项目:Ahang/ts-rabbitmq (gitee.com)

RabbitMQ结构介绍

rabbitmq成员图.drawio

队列,交换机和绑定统称为AMQP实体(AMQP entities)

成员

ConnectionFactory、Connection

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。Connection是publisher/consumer 和 broker 之间的 TCP 连接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。

Channel

如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。

Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。

Channel 作为轻量级的Connection 极大减少了操作系统建立 TCP connection 的开销

Producer(生产者)

生产消息的一方,通过信道向指定交换机发送消息;

生产者可以在发送消息前声明Exchange、Queue以及对应关系;声明后发送消息如果无相关成员则会按照声明情况创建对应的Exchange和Queue。

若不声明直接发送则会按照默认规则发送。

Consumer(消费者)

消费消息的一方,通过监听指定队列来消费消息;

消费者同样可以声明Exchange、Queue以及对应关系,声明后如果监听发现不存在监听队列,则会按照声明创建对应的Exchange和Queue。

Exchange(交换机)

用于接受、分配消息,存在多种不同类型的交换机处理特定需求;

不做存储,消息会存储在队列中;交换机只是进行消息的接收、转发、分配。

Queue(队列)

用于存储生产者的消息;

RoutingKey(路由键)

用于生产者者指定的消息路由键规则;

是为了匹配交换机上的绑定路由键,从而找到要发送的队列。

//会去名为“Topic-Ex”的交换机匹配“One.Two.Three”的绑定路由键
rabbitTemplate.send("Topic-Ex","One.Two.Three",msg);

BindingKey(绑定键)

用于把交换器的消息绑定到队列上;

是在配置时指定交换机和队列的绑定路由键,是为了去匹配生产者发送消息指定的路由键;每个交换机和队列之间都会有一个对应的绑定路由,首先消息发送到指定交换机,再根据发送的路由规则匹配事先设置的绑定路由键,匹配到对应的绑定路由则代表消息找到对应的队列。

//生产方通过配置类指定绑定关系
@Bean
public Binding bingExchange2(){
    return BindingBuilder.bind(topicQueue2())   //绑定队列
        .to(topicExchange())       //队列绑定到哪个交换器
        .with("*");         //绑定路由key,必须指定
}

//消费方监听声明交换机和队列关系,应当与上方保持一致,否则会创建新的
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "topicQueue-One", durable = "false"),
    exchange = @Exchange(value = "Topic-Ex", type = "topic", durable = "false"),
    key = "*"))
public void tsTopicMsg(Message data, Channel channel) {
    String str = new String(data.getBody());
    System.out.println(str + "-----:" + seq);
    seq.incrementAndGet();
}

虚拟主机

每个Rabbit都能创建很多vhost,我们称之为虚拟主机,每个虚拟主机其实都是mini版的RabbitMQ,拥有自己的队列,交换器和绑定,拥有自己的权限机制。

出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念(或RocketMQ的Group)。

当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等。

vhost特性

  1. RabbitMQ默认的vhost是“/”开箱即用;
  2. 多个vhost是隔离的,多个vhost无法通讯,并且不用担心命名冲突(队列和交换器和绑定),实现了多层分离;
  3. 创建用户的时候必须指定vhost;

vhost操作

可以通过rabbitmqctl工具命令

创建

rabbitmqctl add_vhost[vhost_name]

删除vhost

rabbitmqctl delete_vhost[vhost_name]

查看所有的vhost

rabbitmqctl list_vhosts

交换机类型

多消费者情况

当一个队列被多个消费者监听,那么消息将被均匀分配到消费者,且如果某条消息阻塞不会将其他消息发到另一个空闲的消费者,消息的分配在一开始就固定了。

Direct类型(默认,匹配发送)

它会把消息路由到那些binding key与routing key完全匹配的Queue中。

它是一个一对一的模型,一条消息一定会被发到指定的一个队列(完全匹配)。

img

配置代码

@Configuration
public class RabbitDirectConfig {

    @Bean
    public Queue directQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("directQueue-One",false,false,false,null);
    }

    @Bean
    public Queue directQueue2(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("directQueue-Two",false,false,false,null);
    }

    @Bean
    public DirectExchange directExchange(){
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new DirectExchange("MqSendService-One",false,false,null);
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(directQueue())   //绑定队列
                .to(directExchange())       //队列绑定到哪个交换器
                .with("One");         //绑定路由key,必须指定
    }

    @Bean
    public Binding bingExchange2(){
        return BindingBuilder.bind(directQueue2())   //绑定队列
                .to(directExchange())       //队列绑定到哪个交换器
                .with("Two");         //绑定路由key,必须指定
    }
}

Topic类型(拓展匹配发送)

它是Direct类型的一种扩展,提供灵活的匹配规则。

  • routing key为一个句点号 " . " 分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如"One.Two"
  • binding key与routing key一样也是句点号 " . " 分隔的字符串
  • binding key中可以存在两种特殊字符 " * "" # " ,用于做模糊匹配,其中“*”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

配置代码

package cn.zh.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopicConfig {
    @Bean
    public Queue topicQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("topicQueue-One",false,false,false,null);
    }

    @Bean
    public Queue topicQueue2(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("topicQueue-Two",false,false,false,null);
    }

    @Bean
    public TopicExchange topicExchange(){
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new TopicExchange("Topic-Ex",false,false,null);
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(topicQueue())   //绑定队列
                .to(topicExchange())       //队列绑定到哪个交换器
                .with("*.Two.*");        //路由key,必须指定
    }

    @Bean
    public Binding bingExchange2(){
        return BindingBuilder.bind(topicQueue2())   //绑定队列
                .to(topicExchange())       //队列绑定到哪个交换器
                .with("#");         //路由key,必须指定
    }
}

Fanout 类型(广播发送)

它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。

它是一种一对多的类型,无法指定Binding Key,发送的一条消息会被发到绑定的所有队列。

img

配置代码

package cn.zh.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitFanoutConfig {

    @Bean
    public Queue fanoutQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("fanoutQueue-One",false,false,false,null);
    }

    @Bean
    public Queue fanoutQueue2(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        return new Queue("fanoutQueue-Two",false,false,false,null);
    }

    @Bean
    public FanoutExchange fanoutExchange(){
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        return new FanoutExchange("Fanout-Ex",false,false,null);
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(fanoutQueue())   //绑定队列
                .to(fanoutExchange());       //队列绑定到哪个交换器
    }

    @Bean
    public Binding bingExchange2(){
        return BindingBuilder.bind(fanoutQueue())   //绑定队列
                .to(fanoutExchange());       //队列绑定到哪个交换器
    }

}

Headers(键值对匹配,不常用)

headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。

在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。

该类型不常用,暂不提供代码。

Message(消息)

当执行诸如 basicPublish() 之类的操作时,内容作为字节数组参数传递,而其他属性作为单独的参数传入。

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
    
    ...
}

MessageProperties 接口定义了几个常见的属性,例如“messageId”、“timestamp”、“contentType”等等。 还可以通过调用 setHeader(String key, Object value) 方法扩展这些属性。

消息序列化

自定义的要作为消息object发送的类一定要实现Serializable接口,否则将收到IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads。

从版本开始 1.5.7, 1.6.11, 1.7.4, 和 2.0.0,如果消息正文是序列化的 Serializable Java对象,执行时不再反序列化(默认), 这是为了防止不安全的反序列化。 默认情况下,仅 java.utiljava.lang类反序列化。

要恢复以前的行为,可以通过调用添加允许的类/包模式 Message.addAllowedListPatterns(…)

//通配符
Message.addAllowedListPatterns("com.zh.*.class");
//单个
Message.addAllowedListPatterns(User.class.getName());
@org.junit.jupiter.api.Test
public void test() {
    NoMessage hello = new NoMessage("hello");
    SimpleMessageConverter simpleMessageConverter = new SimpleMessageConverter();
    Message message = simpleMessageConverter.toMessage(hello, new MessageProperties());
    log.info("添加白名单之前---{}",message);
    Message.addAllowedListPatterns(NoMessage.class.getName());
    log.info("NoMessage 全限定名:{}",NoMessage.class.getName());
    log.info("添加白名单之后---{}",message);
}

输出:
添加白名单之前---(Body:'[B@6fc3e1a4(byte[89])' MessageProperties
NoMessage 全限定名:com.rabbit.producer.NoMessage
添加白名单之后---(Body:'NoMessage(content=hello)'

Queue(队列)

构建者创建

@Bean
public Queue directQueue(){
    //需要的属性可以通过构建者不断添加
    Queue queue = QueueBuilder.durable("dis").autoDelete().ttl(100).build();
    return queue;
}

构造方法new

@Bean
public DirectExchange directExchange(){
    Map<String, Object> args = new HashMap<>(3);
    //声明当前队列绑定的死信交换机
    args.put("x-dead-letter-exchange", "dead_exchange");
    //声明当前队列的死信路由 key
    args.put("x-dead-letter-routing-key", "dead");
    //声明队列的 TTL
    args.put("x-message-ttl", 10000);
    //参数介绍
    //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
    return new DirectExchange("MqSendService-One",false,false,args);
}

特性功能

Prefetch count(消息分配)

如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这样并不好,因为如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。

我们可以通过设置prefetchCount来表示该消费者在每次在该队列只能处理几个消息,比如我们设置prefetchCount=1,则该消费者每次在同一队列只能消费一条消息,消息未处理完不会被分配该队列其他消息。这样就达到能者多劳的效果。

rabbitmq:
    addresses: 127.0.0.1
    cache:
      channel:
        size: 25
# 指定消费端消息确认方式
    listener:
      simple:
        # 消费端最小并发数
        concurrency: 1
        # 消费端最大并发数
        max-concurrency: 5
        # 一次处理的消息数量
        prefetch: 2
        # 手动应答
        acknowledge-mode: manual

QOS预取值(设置未确认消息缓冲区大小)

介绍

这是RabbitMQ的一种保护机制。防止当消息激增的时候,海量的消息进入consumer而引发consumer宕机。

该值定义通道上允许的未确认消息的最大数量,这是为了防止Unacked消息缓冲区存在过多的Unacked消息。

一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息。

除非至少有一个未处理的消息被确认,例如,假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。

代码实现

这个可以通过设置消息分配数目达到效果。

listener:
  simple:
    # 消费端最小并发数
    concurrency: 1
    # 消费端最大并发数
    max-concurrency: 5
    # 一次处理的消息数量
    prefetch: 2
    # 手动应答
    acknowledge-mode: manual
缓冲区大小
  • min = concurrency * prefetch * 节点数量
  • max = max-concurrency * prefetch * 节点数量

unacked_msg_count < min 队列不会阻塞。但需要及时处理unacked的消息。 - unacked_msg_count >= min 可能会出现堵塞。 - unacked_msg_count >= max 队列一定阻塞。

死信队列

RabbitMQ的死信队列不像RocketMQ一样时原本就存在的,它需要我们自己设置一个交换机然后绑定队列,我们在语义上将其用作为存放无法消费的消息的队列。

RabbitMQ的死信是通过为普通队列设置死信参数,当该队列出现无法消费的消息,就会将这些消息转移到设置的死信队列中。

死信消息产生原因

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

RabbitMQ中的TTL

TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。

换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的TTL,那么 较小 的那个值将会被使用,有两种方式设置 TTL。

设置TTL的方式

消息设置TTL
Message msg = new Message(s.getBytes(StandardCharsets.UTF_8));
//参数四 MessagePostProcessor:用于在执行消息转换后添加/修改标头或属性。 
//它还可以用于在侦听器容器和AmqpTemplate接收消息时修改入站消息。
rabbitTemplate.convertAndSend("MqSendService-One","One",msg,correlationData->{
    correlationData.getMessageProperties().setExpiration("1000");
    return correlationData;
});


//也可在创建消息时指定
 msg.getMessageProperties().setExpiration("1000");
队列设置TTL
@Bean
public DirectExchange directExchange(){
    Map<String, Object> args = new HashMap<>(3);
    //声明队列的 TTL
    args.put("x-message-ttl", 10000);
    //参数介绍
    //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
    return new DirectExchange("MqSendService-One",false,false,args);
}


@Bean
public Queue directQueue(){
    //需要的属性可以通过构建者不断添加
    Queue queue = QueueBuilder.noDurable("TTL_Queue").ttl(100).build();
    return queue;
}
二者的区别

如果设置了队列的 TTL 属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中),

而消息设置TTL方式,消息即使过期,也不一定会被马上丢弃,因为因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

另外,还需要注意的一点是,如果 不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

代码实现

1.语义声明死信交换机

@Bean
public DirectExchange deadExchange(){
    //参数介绍
    //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
    return new DirectExchange("Dead_Exchange",false,false,null);
}

2.声明死信队列,并建立绑定关系

@Bean
public Queue directQueue(){
    //参数介绍
    //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
    return new Queue("Dead_Queue",false,false,false,null);
}

3.为正常队列设置死信参数(重点

@Bean
public Queue directQueue(){
    Map<String, Object> args = new HashMap<>(3);
    //声明当前队列绑定的死信交换机
    args.put("x-dead-letter-exchange", "dead_exchange");
    //声明当前队列的死信路由 key
    args.put("x-dead-letter-routing-key", "dead");
    //参数介绍
    //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
    return new Queue("directQueue-One",false,false,false,args);
}

@Bean
public Queue directQueue2(){
    Queue queue = QueueBuilder
        .durable("dis")
        .autoDelete()
        .ttl(100)
        .deadLetterExchange("Dead_Exchange")		//设置死信交换机参数
        .deadLetterRoutingKey("Dead")		//设置死信队列的路由key
        .build();
    return queue;
}

延迟队列

利用死信队列达到

RabbitMQ的延迟队列可以通过设置TTL的时间再配合设置死信队列的参数达到。

例:创建一个队列并设置TTL时间,但无人监听消费,那么当TTL时间达到,该消息就会进入死信队列,这时设置一个监听死信队列的消 费者,从而达到延迟消费的效果。

利用官网延迟队列插件达到

优先级队列

介绍

RabbitMQ支持为队列设置优先级,从而达到优先级高的队列中消息被优先消费。

实现代码

@Bean
public Queue directQueue2() {
    //设置队列优先级
    //args.put("x-max-priority",5)
    
    Queue queue = QueueBuilder
        //持久化并设置队列名
        .durable("dis")
        //开启队列优先级,并设置优先级数
        .maxPriority(5)
        .build();
    return queue;
}

惰性队列

介绍

默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。

惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是 支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。

代码实现

队列存在两种模式:defaultlazylazy即为惰性队列模式。

@Bean
public Queue directQueue2() {
    //设置惰性队列
    //args.put("x-queue-mode", "lazy");
    
    Queue queue = QueueBuilder
        //持久化并设置队列名
        .durable("dis")
        //设为惰性队列
        .lazy()
        .build();
    return queue;
}

灾难防护

Message acknowledgment(消息确认)

介绍

从安全角度考虑,网络是不可靠的,接收消息的应用也有可能在处理消息的时候失败。基于此原因,AMQP模块包含了一个消息确认(message acknowledgements)的概念:当消息从队列投递给消费者的时候,消费者服务器需要返回一个ack(确认信息),当broker收到了确认才会将该消息删除;消息确认可以是自动的,也可以是由消费端手动确认。此外也支持生产端向broker发送消息得到broker的ack,从而针对做出响应逻辑。

发布端消息确认(发布确认)

确认模式
  • NONE

    禁用发布确认模式,是默认值

  • CORRELATED

    发布消息成功到交换器后会触发回调方法

  • SIMPLE

    经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法;

    其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。

快速入门

1.配置文件设置发布确认方式

spring:
  application:
    name: produer-mq-7001
  rabbitmq:
    addresses: 127.0.0.1
    username: guest
    password: guest
    # 发布确认方式,默认NONE
    publisher-confirm-type: correlated

2.配置RabbitTemplate

由于发布确认需要设置回调,但是Spring默认是单例的,如果直接注入RabbitTemplate,那么在设置发布确认回调时,会被认为是重新设置回调方法;而一个RabbitTemplate只能有初始的一个发布确认回调。

public class RabbitTemplate extends RabbitAccessor implements ... {
    
    ...
        
    public void setConfirmCallback(ConfirmCallback confirmCallback) {
        Assert.state(this.confirmCallback == null || this.confirmCallback.equals(confirmCallback),
                     "Only one ConfirmCallback is supported by each RabbitTemplate");
        this.confirmCallback = confirmCallback;
    }
    
    ...
}
public abstract class Assert {
    public Assert() {
    }

    public static void state(boolean expression, String message) {
        if (!expression) {
            throw new IllegalStateException(message);
        }
    }
    
    ...
}

解决方式:

  1. 使用多例,可以达到不同的消息发布使用不同的确认回调(违背单例)

    @Bean
    @Scope("prototype")
    public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
    
  2. 使用单例,在初始时即配置确认回调(仅能有一个确认回调)

    @Bean
    public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (!b){
                    ReturnedMessage dataReturned = correlationData.getReturned();
                    String str = new String(dataReturned.getMessage().getBody());
                    System.out.println(str);
                    log.error("消息发送失败,请重试");
                    return;
                }
            }
        });
        return rabbitTemplate;
    }
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    //依赖注入 rabbitTemplate 之后再设置它的回调对象
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                if (!b){
                    ReturnedMessage dataReturned = correlationData.getReturned();
                    String str = new String(dataReturned.getMessage().getBody());
                    System.out.println(str);
                    log.error("消息发送失败,请重试");
                    return;
                }
            }
        });
    }
    

回退消息

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

此时通过设置 mandatory 参数可以在当消息传递过程中不可达目的地时将消息返回给生产者,需搭配使用 ReturnsCallback

@Bean
public RabbitTemplate getRabbitTemplate(ConnectionFactory connectionFactory){
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    //true:交换机无法将消息进行路由时,会将该消息返回给生产者
    //false:如果发现消息无法进行路由,则直接丢弃;默认false
    rabbitTemplate.setMandatory(true);
    //设置回退消息交给谁处理
    rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
        @Override
        public void returnedMessage(ReturnedMessage returned) {
            System.out.println("--------无法路由,回退处理--------");
        }
    });
    //设置确认回调
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            if (!b){
                ReturnedMessage dataReturned = correlationData.getReturned();
                String str = new String(dataReturned.getMessage().getBody());
                System.out.println(str);
                log.error("消息发送失败,请重试");
                return;
            }
        }
    });
    return rabbitTemplate;
}

消费端消息确认

消息端确认模式
  • **NONE:**不确认,即监听器监听到消息后直接确认

  • **MANUAL:**手动确认,需要消费端手动回复确认

  • **AUTO:**容器将根据监听器是正常返回还是抛出异常来发出 ack/nack,注意与NONE区分

    ​ Spring 默认requeue-rejected配置为true,所以在消费消息发生异常后该消息会重新入队。

    ​ 并且若存在消费集群,会将某个消费端Nack的消息交给其他消费者。

消息确认实现方式
  • 方式一:配置文件
spring:
  application:
    name: consumer-mq-7100
  rabbitmq:
    addresses: 127.0.0.1
    cache:
      channel:
        size: 25
# 指定消费端消息确认方式
    listener:
      simple:
        acknowledge-mode: manual
  • 方式二:@RabbitListener 指定
@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "directQueue-One", durable = "false"),
    exchange = @Exchange(value = "MqSendService-One", type = "direct", durable = "false"),
    key = "One"),
    ackMode = "MANUAL")			//指定消费端消息确认方式
public void tsAckDirectMsg(Message data, Channel channel) throws IOException {
    String str = new String(data.getBody());
    System.out.println(str + "-----:" + seq);
    System.out.println();
    seq.incrementAndGet();
    System.out.println(data.getMessageProperties().getDeliveryTag());
    System.out.println(channel.getChannelNumber());
    channel.basicAck(data.getMessageProperties().getDeliveryTag(),false);
}
channel.basicAck() 方法

参数:

  1. long deliveryTag:

    消息的索引。通常设为 data.getMessageProperties().getDeliveryTag()。

    每个消息在一个channel中都有唯一的一个deliveryTag,每次发送一条,deliveryTag都会+1,从0开始计数;
    确认消息传入的deliveryTag需保证和渠道内的一致,否则无法确认,该消息会被设置为 ready 状态。

    **注意:**当deliveryTag被固定一个数字m时,当m > deliveryTag就会换个渠道重新监听消费。

    ​ 无法确认的消息(deliveryTag不匹配,通道已关闭,连接已关闭或 TCP 连接丢失)会重新入队,被设为 ready 状态,如果存在其他消费者,会将消息发送 给其他消费者,否则反复尝试仅存消费者。但没进行确认的消息会被设为 Unacked

  2. boolean multiple:

    是否批量确认。

    当设为true时,会批量确认deliveryTag小于传入deliveryTag参数的消息。

channel.basicNack() 方法

参数多了一个 boolean requeue 是否重新入队,前两个参数同上。

Message durability(消息持久化)

默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它忽视队列和消息。

队列的持久化

在声明队列的时候设置持久化为 true。

需要注意的就是如果之前声明的队列不是持久化的,需要把原先队列先删除,或者重新创建一个持久化的队列,不然就会出现错误。

@Bean
public Queue directQueue(){
    //参数介绍
    //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
    return new Queue("directQueue-One",true,false,false,null);
}

交换机的持久化

同上。

备用交换机

前言

有了消息回退的功能我们可以感知到消息的投递情况,但是对于这些无法路由到的消息我们可能只能做一个记录的功能,然后再手动处理;并且消息回退会增加生产者的复杂性;那么现在如何想要实现不增加生产者的复杂性,并保证消息不丢失呢?因为消息是不可达的,所以显然无法通过死信队列机制实现。所以通过这种备用交换机的机制可以实现。

实现原理

它是通过在声明交换机的时候,为该交换机设置一个备用的交换机;当主交换机接收一条消息不可达后,会将该消息转发到备用交换机,它在将这些消息发到自己绑定的队列,一般备用交换机的类型都设置为 Fanout(广播类型)。这样我们可以统一设置一个消费者监听该交换机下的队列对其进行统一处理。

实现代码

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,谁优先级高,经测试备份交换机优先级高

@Configuration
public class RabbitDirectConfig {
    @Bean
    public Queue alternateQueue(){
        //参数介绍
        //1.队列名 2.是否持久化 3.是否独占 4.自动删除 5.其他参数
        Queue queue = QueueBuilder.durable("alternateQueue")
            .autoDelete()
            .build();
        return queue;
    }

    @Bean
    public FanoutExchange alternateExchange(){
        return new FanoutExchange("Alternate_Exchange",true,false,null);
    }

    @Bean
    public DirectExchange directExchange(){
        //        ExchangeBuilder exchange = ExchangeBuilder.directExchange("MqSendService-One")
        //                .durable(false)
        //                .autoDelete()
        //                .withArgument("alternate-exchange", "Alternate_Exchange");
        //参数介绍
        //1.交换器名 2.是否持久化 3.自动删除 4.其他参数
        Map<String,Object> args = new HashMap<>(3);
        args.put("alternate-exchange","Alternate_Exchange");
        return new DirectExchange("MqSendService-One",false,false,args);
    }

    @Bean
    public Binding bingAlternateExchange(){
        return BindingBuilder.bind(alternateQueue())   //绑定队列
            .to(alternateExchange());      //队列绑定到哪个交换器
    }

    @Bean
    public Binding bingExchange(){
        return BindingBuilder.bind(directQueue())   //绑定队列
            .to(directExchange())       //队列绑定到哪个交换器
            .with("One");        //路由key,必须指定
    }
}

标签:false,Springboot,队列,rabbitmq,RabbitMQ,Queue,详解,消息,public
来源: https://blog.csdn.net/AhangA/article/details/121641034

专注分享技术,共同学习,共同进步。侵权联系[admin#icode9.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有