ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

RabbitMQ消息确认机制

2022-01-14 12:00:30  阅读:107  来源: 互联网

标签:spring 确认 rabbitmq RabbitMQ deliveryTag 消息 机制 true


概念

确认机制--》可靠抵达

发送端确认

#配置文件中 开启发送端到达服务器确认
spring.rabbitmq.publisher-confirms = true
#开启发送端消息抵达队列确认
spring.rabbitmq.publisher-returns = true
#只要抵达队列,以异步发送优先回调我们这个returnConfirm
spring.rabbitmq.template.mandatory = true
@Configuration
public class MyRabbitMQConfig {

    // @Resource
    // private RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

/*    *//**
     * 定制RabbitTemplate
     * 1、服务收到消息就会回调
     *      1、spring.rabbitmq.publisher-confirms: true
     *      2、设置确认回调
     * 2、消息正确抵达队列就会进行回调
     *      1、spring.rabbitmq.publisher-returns: true
     *         spring.rabbitmq.template.mandatory: true
     *      2、设置确认回调ReturnCallback
     *
     * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
     *
     *//*
    @PostConstruct  //MyRabbitConfig对象创建完成以后,执行这个方法
    public void initRabbitTemplate() {

        *//**
         * 1、只要消息抵达Broker就ack=true
         * correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
         * ack:消息是否成功收到
         * cause:失败的原因
         *//*
        //设置确认回调
        rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
            System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
        });


        *//**
         * 只要消息没有投递给指定的队列,就触发这个失败回调
         * message:投递失败的消息详细信息
         * replyCode:回复的状态码
         * replyText:回复的文本内容
         * exchange:当时这个消息发给哪个交换机
         * routingKey:当时这个消息用哪个路邮键
         *//*
        rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
            System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
                    "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
        });
    }*/

}

消费端确认

1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端就会移除这个消息,

问题:

我们收到很多消息,自动回复给服务器ack,只有一个消息处理成功,宕机了。发生消息丢失;

解决方法:消费者手动确认模式,

#手动ack消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RbbitHandler
public void recieveMessage(Message message, OrderReturnReasonEntity content, Channel channel) throws InterruptedException{
    System,out.println("接收到消息",+content);
    byte[] body = message.getBody();
    //消息头属性信息
    MessageProperties properties = message.getMessageProperties();
    //Thread.sleep(3000);
    System.out.println("消息处理完成==》"+content.getName());
    //deliveryTag 是 Channel内按顺序自增的
    long deliveryTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("deliveryTag==>"+deliveryTag);
    
    //手动确认,批量模式
    try{
        if(deliveryTag%2==0){
            channel.basicAck(deliveryTag, false);
        }else{
            //退货 requeue=false?丢弃:发回服务器,重新入队
            //long deliveryTag, boolean multiple, boolean requeue
            channel.basicNack(deliveryTag, false, true);
            //long deliveryTag, boolean requeue
            //channle.basicReject();
        }
    }catch(Exception e){
        //网络中断
    }
}

标签:spring,确认,rabbitmq,RabbitMQ,deliveryTag,消息,机制,true
来源: https://www.cnblogs.com/aluna/p/15801314.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有