ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ的消息可靠性投递

2021-08-26 18:00:06  阅读:177  来源: 互联网

标签:可靠性 ACK 确认 RabbitMQ 投递 交换机 消息


目录

1.RabbitMQ的消息可靠性投递讲解

  • 什么是消息的可靠性投递
    • 保证消息百分百发送到消息队列中去
    • 详细流程
      • 保证mq节点成功接收消息
      • 消息发送端需要接收到mq服务端接收到消息的确认应答
      • 完善的消息补偿机制,发送失败的消息可以再感知并二次处理
  • RabbitMQ消息投递路径
    • 生产者-->交换机-->队列-->消费者
    • 通过两个点控制消息的可靠性投递
      • 生产者到交换机
        • 通过confirmCallback
      • 交换机到队列
        • 通过returnCallback
  • 建议
    • 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ整体效率变低,吞吐量下降严重,不是非常重要的消息不建议用消息确认机制

2.RabbitMQ的消息可靠性投递ConfirmCallback实战

  • 生产者到交换机

    • 通过ConfirmCallback
    • 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
  • 开启ConfirmCallback

    # 新版,none值是禁用发布确认模式,是默认值;correlated值是发布消息成功到交换机后会触发回调方法
    spring:
      rabbitmq:
        # 开启消息二次确认,生产者到交换机
        publisher-confirm-type: correlated
    
  • 代码示例

    @Test
    void testConfirmCallback(){
        this.rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * @param correlationData 配置
             * @param ack 交换机是否接收到消息,true是成功,false是失败
             * @param cause 失败的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("发送状态:" + ack);
            }
        });
    
        this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHAGE_NAME, "order.new", "新订单");
    }
    

3.RabbitMQ的消息可靠性投递ReturnsCallback实战

  • 交换机到队列

    • 通过ReturnsCallback
    • 消息从交换机发送到对应队列失败时触发
    • 两种模式
      • 交换机到队列不成功,则丢弃消息(默认)
      • 交换机到队列不成功,返回给消息生产者,触发ReturnsCallback
  • 开启ReturnsCallback

    spring:
      rabbitmq:
        # 开启消息二次确认,交换机到队列
        publisher-returns: true
        # true:交换机处理消息到路由失败,则会返回给生产者
        template:
          mandatory: true
    
  • 代码示例

    @Test
    void testReturnCallback(){
        this.rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                System.out.println(returnedMessage);
            }
        });
    
        this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHAGE_NAME, "order.new", "新订单");
    }
    

4.RabbitMQ的消息确认机制ACK

  • 背景:消费者从Broker中监听消息,需要确保消息被合理处理

  • RabbitMQ的ACK介绍

    • 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
    • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
    • 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除
    • 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
  • 确认方式

    • 自动确认(默认)

    • 手动确认 manual

      spring:
        rabbitmq:
          # 开启手动确认消息,如果消息重新入队,进行重试
          listener:
            simple:
              acknowledge-mode: manual
      
    • 其他(基本不用,忽略)

  • 代码示例

    package com.gen.mq;
    
    import com.gen.config.RabbitMQConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    @RabbitListener(queues = RabbitMQConfig.QUEUE)
    public class OrderMQListener {
    
        @RabbitHandler
        public void messageHandler(String body, Message message, Channel channel) throws IOException {
            System.out.println("【接收消息:】" + body);
    
            // 告诉Broker消息已经被确认
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    }
    
    
    • deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
    • basicNack和basicReject介绍
      • basicReject一次只能拒绝接收一个消息,可以设置是否requeue
      • basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue
  • 人工审核异常消息

    • 设置重试阈值,超过后确认消费成功,记录消息,人工处理

标签:可靠性,ACK,确认,RabbitMQ,投递,交换机,消息
来源: https://www.cnblogs.com/Gen2021/p/15190915.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有