ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

rabbitMQ结合springboot使用

2021-01-30 20:32:51  阅读:160  来源: 互联网

标签:return springboot confirm 队列 rabbitMQ 死信 结合 消息 public


rabbitMQ结合springboot使用

导入依赖

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/>
    </parent>

    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

简单使用

消息生产

创建发送配置

/**
 * @Author: bdsbdg
 * @Date: 2021/1/30 15:15
 */
@Configuration
public class ApplicationConfig {

    // 创建队列
    @Bean
    public Queue confirmQueue(){
        return new Queue("confirm-queue", true);
    }

    // 创建交换机
    @Bean
    public Exchange confirmExchange(){
        return new DirectExchange("confirm-exchange", true, false);
    }

    // 队列绑定到交换机
    @Bean
    public Binding queueBindToExchangeByConfirm(Queue confirmQueue, Exchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with("confirm-routing-key").noargs();
    }
}

发送消息

/**
 * @Author: bdsbdg
 * @Date: 2021/1/30 15:40
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class demo {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirm() {
        // 将消息发送到指定交换机的指定路由
        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key", "测试消息发送。。。");
    }
}

消息接收

创建监听器监听指定队列的消息

@Component
public class ConsumerListener {
    @RabbitListener(queues = "confirm-queue")
    public void myListener1(String data) throws Exception {
        System.out.println("消费者接收到的消息data为:" + data);
    }
}

消息生产的可靠性投递

在使用RabbitMQ的时候,作为消息的发送方希望杜绝任何消息丢失或者投递失败的场景。如果消息投递失败,RabbitMQ为我们提供了两种模式用来控制消息的可靠投递。

  • confirm模式:

    • 首先需要开启confirm模式
    • 消息从producer到达exchange后,会执行一个confirmCallback回调函数
    • 该回调函数的方法中有个ack参数
      • ack = true,则发送成功
      • ack = false,则发送失败
  • return模式:

    • 首先需要开启return模式
    • 消息从exchange路由到queue后
      • 如果投递成功,不会执行一个returnCallback回调函数
      • 如果投递失败,则会执行一个returnCallback回调函数

消息生产者可靠性投递实现

修改配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /day2
	publisher-confirms: true	# 开启confirm模式
    publisher-returns: true		# 开启return模式

RabbitTemplate设置回调

/**
 * @Author: bdsbdg
 * @Date: 2021/1/30 15:40
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class demo {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testConfirm() {
        // confirm
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("发送消息成功");
                }else {
                    System.out.println("发送消息失败!!!"+cause);
                }
            }
        });
        
        // return
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
//                System.out.println("消息路由失败会执行该方法。。。");
//                System.out.println("发送的消息体:" + new String(message.getBody()));
                System.out.println("响应码:" + replyCode);
//                System.out.println("响应信息:" + replyText);
            }
        });
        
        
        // 将消息发送到指定交换机的指定路由	
        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key", "测试消息发送。。。");

        rabbitTemplate.convertAndSend("confirm-exchange-1", "confirm-routing-key", "测试消息发送confirm模式失败。。。");

        rabbitTemplate.convertAndSend("confirm-exchange", "confirm-routing-key-1", "测试消息发送return模式失败。。。");
    }
}

消费端ack机制

如果在处理消息的过程中,消费者的服务在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持确认机制ACK (Acknowledge)。

消费端接收到消息后有三种ack方式:

  • 不确认:ack = "none"
  • 手动确认:ack = "manual"
  • 自动确认:ack = "auto"

自动确认是指,消息一旦被consumer接收到则自动确认收到,并将相应的message从RabbitMQ的消息缓存中移除。但是在实际的业务处理中,很可能是消息被接收到了,但是业务处理出现了异常,那么消息从缓存中移除即该消息就被丢弃了。如果设置了手动确认,则需要在业务处理成功后,调用channel.basicAck()方法手动签收,如果出现了异常,则调用channel.basicNack()方法,让其自动重发消息。

ack功能实现

修改消费端配置文件

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 消息确认方式
    listener:
      simple:
        acknowledge-mode: manual  # 手动确认

修改消费端消息处理代码

@Component
public class ConsumerListener {
    @RabbitListener(queues = "confirm-queue")
    public void myListener1(String data, Message message, Channel channel) throws Exception {
//        System.out.println("消费者接收到的消息data为:" + data);
//        Thread.sleep(5000);
        byte[] body = message.getBody();
        System.out.println("消费者接收到的消息body为:" + new String(body));

        // 消息id
        long id = message.getMessageProperties().getDeliveryTag();
        System.out.println("id:"+id);

        try {
            if (id%2==0){
                int a = 1/0;
            }
            System.out.println("处理业务");
            // 业务处理成功:手动签收
            channel.basicAck(id,true);
            System.out.println("处理业务成功!!!");
        }catch (Exception e){
            System.out.println("处理业务失败!!!");
            // 业务处理失败:拒收,并且让消息重回队列
            channel.basicNack(id,true,true);
        }
    }
}

消费端限流

未设置前

设置后

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    # 确认方式
    listener:
      simple:
        acknowledge-mode: manual
        # 每次最多处理消息的个数
        prefetch: 5

TTL与DLX

TTL

  • TTL:Time To Live(存活时间/过期时间)
  • 当消息到达存活时间后,该消息还没有被消费,会自动被清除
  • RabbitMQ可以对消息设置过期时间也可以对整个队列设置过期时间
    • 如果都设置了,哪个时间先到则生效
      在队列上设置过期时间
// 修改队列创建代码即可
// 创建队列
    @Bean
    public Queue confirmTtlQueue(){
        // 创建队列 设置里面内容十秒过期
        return QueueBuilder.durable("confirm-ttl-queue").withArgument("x-message-ttl",10000) .build();
    }
    // 当该队列中的消息进入十秒后将自动销毁

在消息上设置过期时间

@Test
public void testTTL(){
    // 可以设置消息的属性消息
    MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 设置消息的过期时间 5s
            message.getMessageProperties().setExpiration("5000");
            return message;
        }
    };
    rabbitTemplate.convertAndSend("confirm-ttl-exchange","confirm-routing-key", "ttl消息", messagePostProcessor);
}

DLX

1、生成者将消息发送到交换机后,由交换机路由到指定的队列

2、当该消息成为了死信后并且将该消息发送给DLX。

成为死信的三种情况

  • 队列消息长度达到限制
  • 消费者拒签消息
  • 原队列中存在消息过期设置,消息到达超时时间未被消费

3、DLX再将这个消息路由给专门处理死信的队列,并且由对应的消费者消费

创建死信队列

// 创建死信交换机
    @Bean
    public Exchange dlxExhange(){
        return new DirectExchange("dlx-exchange");
    }

    // 创建死信队列
    @Bean
    public Queue dlxQueue(){
        return new Queue("dlx-queue");
    }

    // 将死信队列绑定到死信交换机上
    @Bean
    public Binding dlxBinding(Queue dlxQueue, Exchange dlxExhange){
        return BindingBuilder.bind(dlxQueue).to(dlxExhange).with("dlx-routing-key").noargs();
    }

创建普通消息队列时绑定死信队列

 // 创建普通队列
    @Bean
    public Queue delayQueue(){
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 20000);           // 队列过期时间
        args.put("x-max-length", 10000000);         // 队列中消息数量
        args.put("x-dead-letter-exchange", "dlx-exchange");       // 绑定死信交换机
        args.put("x-dead-letter-routing-key", "dlx-routing-key");    // 绑定死信路由器
        return QueueBuilder.durable("delay-queue").withArguments(args).build();
    }

发送消息

消息过期

实现延时队列

  • 创建指定过期时间且无消费端处理消息的普通队列
  • 监听该普通队列过期消息存放的死信队列

标签:return,springboot,confirm,队列,rabbitMQ,死信,结合,消息,public
来源: https://www.cnblogs.com/bdsbdg/p/14350360.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有