ICode9

精准搜索请尝试: 精确搜索
首页 > 其他分享> 文章详细

11 发布确认高级

2021-11-29 19:02:08  阅读:179  来源: 互联网

标签:11 String CONFIRM 确认 高级 交换机 消息 message public


发布确认 springboot 版本

配置文件

在配置文件当中需要添加

spring.rabbitmq.publisher-confirm-type=correlated

  • NONE

禁用发布确认模式,是默认值

  • CORRELATED

发布消息成功到交换器后会触发回调方法

  • SIMPLE

经测试有两种效果,其一效果和 CORRELATED 值一样会触发回调方法,
其二在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法
等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是
waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker

//配置类  发布确认
@Configuration
public class ConfirmConfig {

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE = "confirm_queue";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";

    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }

    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue")Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }
}

消息生产者

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送的消息内容:{}",message);
		
        //改变 routingkey 让他错误
        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY+"2",message,correlationData2);

        log.info("发送的消息内容:{}",message);
    }

}

回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
    }

    /**
     *  交换机确认回调方法
     *  1.发消息  交换机收到了  回调
     *      1.1 CorrelationData  保存了回调消息的ID及相关信息
     *      1.2 交换机收到消息  ack=true
     *      1.3 cause - null
     * 2. 发消息  交换机接受失败  回调
     *      2.1 CorrelationData  保存了回调消息的ID及相关信息
     *      2.2 交换机收到消息  ack = false
     *      2.3  cause  - 失败的原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";

        if (ack){
            log.info("交换机收到了id为:{}的消息",id);
        }else {
            log.info("交换机还未收到id:{}的消息,由于原因:{}",id,cause);
        }
    }
}

消息消费者

@Slf4j
@Component
public class Consumer {

    @RabbitListener(queues = ConfirmConfig.CONFIRM_QUEUE)
    public void receiveConfirmMessage(Message message){
       String msg = new String(message.getBody());
        log.info("接收到到的队列confirm.queue消息:{}",msg);
    }

}

结果分析

发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为"key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了。

回退消息

队列收不到消息,routingKey 出错,队列消失了

在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这个事件的。

在配置类加上 spring.rabbitmq.publisher-returns=true

消息生产者代码

@Slf4j
@RestController
@RequestMapping("/confirm")
public class ProducerController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    //发消息
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        CorrelationData correlationData = new CorrelationData("1");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME,ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData);

        log.info("发送的消息内容:{}",message);

        CorrelationData correlationData2 = new CorrelationData("2");
        rabbitTemplate.convertAndSend(ConfirmConfig.CONFIRM_EXCHANGE_NAME+"123",ConfirmConfig.CONFIRM_ROUTING_KEY,message,correlationData2);

        log.info("发送的消息内容:{}",message);
    }

}

回调接口

@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        //注入
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     *  交换机确认回调方法
     *  1.发消息  交换机收到了  回调
     *      1.1 CorrelationData  保存了回调消息的ID及相关信息
     *      1.2 交换机收到消息  ack=true
     *      1.3 cause - null
     * 2. 发消息  交换机接受失败  回调
     *      2.1 CorrelationData  保存了回调消息的ID及相关信息
     *      2.2 交换机收到消息  ack = false
     *      2.3  cause  - 失败的原因
     */

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";

        if (ack){
            log.info("交换机收到了id为:{}的消息",id);
        }else {
            log.info("交换机还未收到id:{}的消息,由于原因:{}",id,cause);
        }
    }

    //当消息传递过程中不可达目的地时将消息返回给生产者
    //只有不可达目的地的时候才进行回退
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("消息{},被交换机{}退回,退回原因:{},routingKey:{}",
                new String(message.getBody()),exchange,replyText,routingKey);
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {

    }
}

备份交换机

修改配置类

@Configuration
public class ConfirmConfig {

    //交换机
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    //队列
    public static final String CONFIRM_QUEUE = "confirm_exchange";
    //RoutingKey
    public static final String CONFIRM_ROUTING_KEY = "confirm_routing_key";
    //备份交换机
    public static final String BACKUP_EXCHANGE_NAME = "backup_exchange";
    //备份队列
    public static final String BACKUP_QUEUE_NAME = "backup_queue";
    //报警队列
    public static final String WARNING_QUEUE_NAME = "warning_queue";


    //声明交换机
    @Bean
    public DirectExchange confirmExchange(){
        //确认交换机 无法确认消息 转发到备份交换机
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).durable(true)
                .withArgument("alternate-exchange",BACKUP_EXCHANGE_NAME).build();
    }

    //声明队列
    @Bean
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE).build();
    }

    //绑定
    @Bean
    public Binding queueBindingExchange(@Qualifier("confirmExchange")DirectExchange confirmExchange,
                                        @Qualifier("confirmQueue")Queue confirmQueue){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
    }

    //备份交换机
    @Bean
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    //备份队列
    @Bean
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }
    //绑定
    @Bean
    public Binding backupQueueBindingExchange(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                        @Qualifier("backupQueue")Queue backupQueue){
        return BindingBuilder.bind(backupQueue).to(backupExchange);
    }

    //绑定
    @Bean
    public Binding warningQueueBindingExchange(@Qualifier("backupExchange") FanoutExchange backupExchange,
                                              @Qualifier("warningQueue")Queue warningQueue){
        return BindingBuilder.bind(warningQueue).to(backupExchange);
    }
}

报警消费者

@Component
@Slf4j
public class WarningConsumer {

    //接受报警信息
    @RabbitListener(queues = ConfirmConfig.WARNING_QUEUE_NAME)
    public void receiveWarningMsg(Message message) {
        String msg = new String(message.getBody());
        log.error("报警发现不可路由消息:{}",msg);

    }
}

结果分析

mandatory 参数与备份交换机可以一起使用的时候,如果两者同时开启,消息究竟何去何从?谁优先级高,经过上面结果显示答案是备份交换机优先级高。

标签:11,String,CONFIRM,确认,高级,交换机,消息,message,public
来源: https://www.cnblogs.com/flypigggg/p/15620786.html

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有